Skip to content

Commit

Permalink
Schedule log compaction for a computed time interval to decrease the …
Browse files Browse the repository at this point in the history
…chance snapshots need to be replicated to followers.
  • Loading branch information
kuujo committed Sep 5, 2017
1 parent f70997c commit ce78448
Showing 1 changed file with 44 additions and 16 deletions.
Expand Up @@ -81,6 +81,7 @@ public class RaftServiceManager implements AutoCloseable {
private final Map<String, DefaultServiceContext> services = new HashMap<>();
private long lastPrepared;
private long lastCompacted;
private long compactTime;

public RaftServiceManager(RaftContext raft, ScheduledExecutorService threadPool, ThreadContext threadContext) {
this.raft = checkNotNull(raft, "state cannot be null");
Expand All @@ -91,7 +92,7 @@ public RaftServiceManager(RaftContext raft, ScheduledExecutorService threadPool,
this.logger = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftServer.class)
.addValue(raft.getName())
.build());
scheduleCompaction();
scheduleSnapshots();
}

/**
Expand Down Expand Up @@ -529,21 +530,40 @@ private CompletableFuture<OperationResult> applyQuery(Indexed<QueryEntry> entry)
}

/**
* Schedules a log compaction iteration.
* Schedules a snapshot iteration.
*/
private void scheduleCompaction() {
threadContext.schedule(COMPACT_INTERVAL, this::compactLog);
private void scheduleSnapshots() {
threadContext.schedule(COMPACT_INTERVAL, this::snapshotServices);
}

/**
* Schedules a log compaction.
*/
private void scheduleCompaction(long lastApplied) {
// Wait half the time since the last compaction before compacting the logs to ensure followers can keep
// up with the leader without replicating snapshots.
long compactTime = System.currentTimeMillis();
long lastCompactTime = this.compactTime;
this.compactTime = compactTime;
if (lastCompactTime > 0) {
Duration duration = Duration.ofMillis((compactTime - lastCompactTime) / 2);
logger.trace("Scheduling compaction for {}", duration);
threadContext.schedule(duration, () -> compactLogs(lastApplied));
} else {
scheduleSnapshots();
}
}

/**
* Compacts the log if necessary.
*/
@SuppressWarnings("unchecked")
private void compactLog() {
private void snapshotServices() {
long lastApplied = raft.getLastApplied();

// Only take snapshots if segments can be removed from the log below the lastApplied index.
if (raft.getLog().isCompactable(lastApplied) && raft.getLog().getCompactableIndex(lastApplied) > lastCompacted) {
logger.debug("Snapshotting services");

// Update the index at which the log was last compacted.
this.lastCompacted = lastApplied;
Expand All @@ -561,18 +581,26 @@ private void compactLog() {

// Wait for snapshots in all state machines to be completed before compacting the log at the last applied index.
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.whenCompleteAsync((result, error) -> {
logger.info("Compacting logs up to index {}", lastApplied);
try {
log.compact(lastApplied);
} catch (Exception e) {
logger.error("An exception occurred during log compaction: {}", e);
} finally {
scheduleCompaction();
}
}, threadContext);
.whenComplete((result, error) -> scheduleCompaction(lastApplied));
} else {
scheduleCompaction();
scheduleSnapshots();
}
}

/**
* Compacts logs up to the given index.
*
* @param compactIndex the index to which to compact logs
*/
private void compactLogs(long compactIndex) {
logger.debug("Compacting logs up to index {}", compactIndex);
try {
log.compact(compactIndex);
} catch (Exception e) {
logger.error("An exception occurred during log compaction: {}", e);
} finally {
// Immediately attempt to take new snapshots since compaction is already run after a time interval.
snapshotServices();
}
}

Expand Down

0 comments on commit ce78448

Please sign in to comment.