Skip to content

Commit

Permalink
Check previous commit index prior to committing entries and add addit…
Browse files Browse the repository at this point in the history
…ional logging for entry commitment.
  • Loading branch information
kuujo committed Sep 11, 2017
1 parent 2b44756 commit 1afdc03
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 11 deletions.
Expand Up @@ -422,8 +422,9 @@ public MemberId getLastVotedFor() {
* Sets the commit index.
*
* @param commitIndex The commit index.
* @return the previous commit index
*/
public void setCommitIndex(long commitIndex) {
public long setCommitIndex(long commitIndex) {
checkArgument(commitIndex >= 0, "commitIndex must be positive");
long previousCommitIndex = this.commitIndex;
if (commitIndex > previousCommitIndex) {
Expand All @@ -438,6 +439,7 @@ public void setCommitIndex(long commitIndex) {
firstCommitIndex = commitIndex;
}
}
return previousCommitIndex;
}

/**
Expand Down
Expand Up @@ -310,10 +310,12 @@ private void commitEntries() {
// If the active members list is empty (a configuration change occurred between an append request/response)
// ensure all commit futures are completed and cleared.
if (members.isEmpty()) {
long previousCommitIndex = raft.getCommitIndex();
long commitIndex = raft.getLogWriter().getLastIndex();
raft.setCommitIndex(commitIndex);
completeCommits(previousCommitIndex, commitIndex);
long previousCommitIndex = raft.setCommitIndex(commitIndex);
if (commitIndex > previousCommitIndex) {
log.trace("Committed entries up to {}", commitIndex);
completeCommits(previousCommitIndex, commitIndex);
}
return;
}

Expand All @@ -325,6 +327,7 @@ private void commitEntries() {
// the index of the leader's no-op entry. Update the commit index and trigger commit futures.
long previousCommitIndex = raft.getCommitIndex();
if (commitIndex > 0 && commitIndex > previousCommitIndex && (leaderIndex > 0 && commitIndex >= leaderIndex)) {
log.trace("Committed entries up to {}", commitIndex);
raft.setCommitIndex(commitIndex);
completeCommits(previousCommitIndex, commitIndex);
}
Expand Down Expand Up @@ -502,4 +505,10 @@ protected void handleInstallResponseFailure(RaftMemberContext member, InstallReq
super.handleInstallResponseFailure(member, request, error);
}

@Override
public void close() {
super.close();
appendFutures.values().forEach(future ->
future.completeExceptionally(new IllegalStateException("Inactive state")));
}
}
Expand Up @@ -268,16 +268,12 @@ else if (lastEntry.index() == index) {
}

// Update the context commit and global indices.
long previousCommitIndex = raft.getCommitIndex();
raft.setCommitIndex(commitIndex);

if (raft.getCommitIndex() > previousCommitIndex) {
long previousCommitIndex = raft.setCommitIndex(commitIndex);
if (previousCommitIndex < commitIndex) {
log.trace("Committed entries up to index {}", commitIndex);
raft.getStateMachine().applyAll(commitIndex);
}

// Apply commits to the state machine in batch.
raft.getStateMachine().applyAll(raft.getCommitIndex());

// Return a successful append response.
succeedAppend(lastLogIndex, future);
}
Expand Down

0 comments on commit 1afdc03

Please sign in to comment.