Skip to content

Commit

Permalink
Apply AppendEntries commits asynchronously.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 10, 2015
1 parent 5409556 commit e08d061
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions raft/src/main/java/net/kuujo/copycat/raft/state/ActiveState.java
Expand Up @@ -177,8 +177,12 @@ private AppendResponse doAppendEntries(AppendRequest request) {
}

// If we've made it this far, apply commits and send a successful response.
doApplyCommits(request.commitIndex());
doApplyIndex(request.globalIndex());
context.getContext().execute(() -> {
applyCommits(request.commitIndex())
.thenRun(() -> applyIndex(request.globalIndex()))
.thenRun(request::close);
});

return AppendResponse.builder()
.withStatus(Response.Status.OK)
.withTerm(context.getTerm())
Expand All @@ -190,7 +194,8 @@ private AppendResponse doAppendEntries(AppendRequest request) {
/**
* Applies commits to the local state machine.
*/
private void doApplyCommits(long commitIndex) {
@SuppressWarnings("unchecked")
private CompletableFuture<Void> applyCommits(long commitIndex) {
// If the synced commit index is greater than the local commit index then
// apply commits to the local state machine.
// Also, it's possible that one of the previous write applications failed
Expand All @@ -199,10 +204,12 @@ private void doApplyCommits(long commitIndex) {
// commands have not yet been applied then we want to re-attempt to apply them.
if (commitIndex != 0 && !context.getLog().isEmpty()) {
if (context.getCommitIndex() == 0 || commitIndex > context.getCommitIndex()) {
LOGGER.debug("{} - Applying {} commits", context.getCluster().member().id(), commitIndex - Math.max(context.getCommitIndex(), context.getLog().firstIndex()));
long lastIndex = context.getLog().lastIndex();
int commits = (int) (Math.min(commitIndex, lastIndex) - Math.max(context.getCommitIndex(), context.getLog().firstIndex()));

LOGGER.debug("{} - Applying {} commits", context.getCluster().member().id(), commits);

// Update the local commit index with min(request commit, last log // index)
long lastIndex = context.getLog().lastIndex();
long previousCommitIndex = context.getCommitIndex();
if (lastIndex != 0) {
context.setCommitIndex(Math.min(Math.max(commitIndex, previousCommitIndex != 0 ? previousCommitIndex : commitIndex), lastIndex));
Expand All @@ -212,29 +219,33 @@ private void doApplyCommits(long commitIndex) {
if (context.getCommitIndex() > previousCommitIndex) {
// Starting after the last applied entry, iterate through new entries
// and apply them to the state machine up to the commit index.
CompletableFuture<?>[] futures = new CompletableFuture[commits];
int j = 0;
for (long i = Math.max(previousCommitIndex + 1, context.getLog().firstIndex()); i <= Math.min(context.getCommitIndex(), lastIndex); i++) {
Entry entry = context.getLog().getEntry(i);
if (entry != null) {
context.getStateMachine().apply(entry).whenCompleteAsync((result, error) -> {
futures[j++] = context.getStateMachine().apply(entry).whenCompleteAsync((result, error) -> {
if (isOpen() && error != null) {
LOGGER.info("{} - An application error occurred: {}", context.getCluster().member().id(), error);
}
entry.close();
}, context.getContext());
}
}
return CompletableFuture.allOf(futures);
}
}
}
} else {
context.setCommitIndex(commitIndex);
}
return CompletableFuture.completedFuture(null);
}

/**
* Recycles the log up to the given index.
*/
private void doApplyIndex(long globalIndex) {
private void applyIndex(long globalIndex) {
if (globalIndex > 0) {
context.setGlobalIndex(globalIndex);
}
Expand Down

0 comments on commit e08d061

Please sign in to comment.