Skip to content

Commit

Permalink
Ensure leader replicator references consistent nextIndex and matchInd…
Browse files Browse the repository at this point in the history
…ex within replication algorithm.
  • Loading branch information
kuujo committed Jun 11, 2015
1 parent 53a8fba commit 736e9d5
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 50 deletions.
Expand Up @@ -72,6 +72,11 @@ public void readObject(Buffer buffer, Serializer serializer) {

}

@Override
public String toString() {
return String.format("%s", getClass().getSimpleName());
}

/**
* Register client request builder.
*/
Expand Down
94 changes: 44 additions & 50 deletions raft/src/main/java/net/kuujo/copycat/raft/state/LeaderState.java
Expand Up @@ -730,16 +730,16 @@ private void commitEntries() {
// Sort the list of replicas, order by the last index that was replicated
// to the replica. This will allow us to determine the median index
// for all known replicated entries across all cluster members.
Collections.sort(replicas, (o1, o2) -> Long.compare(o2.matchIndex != 0 ? o2.matchIndex : 0l, o1.matchIndex != 0 ? o1.matchIndex : 0l));
Collections.sort(replicas, (o1, o2) -> Long.compare(o2.state.getMatchIndex() != 0 ? o2.state.getMatchIndex() : 0l, o1.state.getMatchIndex() != 0 ? o1.state.getMatchIndex() : 0l));

// Set the current commit index as the median replicated index.
// Since replicas is a list with zero based indexes, use the negation of
// the required quorum size to get the index of the replica with the least
// possible quorum replication. That replica's match index is the commit index.
// Set the commit index. Once the commit index has been set we can run
// all tasks up to the given commit.
long commitIndex = replicas.get(quorumIndex).matchIndex;
long globalIndex = replicas.get(replicas.size() - 1).matchIndex;
long commitIndex = replicas.get(quorumIndex).state.getMatchIndex();
long globalIndex = replicas.get(replicas.size() - 1).state.getMatchIndex();
if (commitIndex > 0) {
context.setCommitIndex(commitIndex);
context.setGlobalIndex(globalIndex);
Expand All @@ -757,34 +757,32 @@ private void commitEntries() {
private class Replica {
private final int id;
private final Member member;
private long nextIndex = 1;
private long matchIndex = 0;
private final MemberState state;
private boolean committing;

private Replica(int id, Member member) {
this.id = id;
this.member = member;
this.nextIndex = Math.max(context.getLog().lastIndex(), 1);
MemberState state = context.getMembers().getMember(member.id());
if (state == null) {
state = new MemberState(member.id(), Member.Type.ACTIVE, System.currentTimeMillis());
state.setNextIndex(Math.max(context.getLog().lastIndex(), 1));
}
this.state = state;
}

/**
* Triggers a commit for the replica.
*/
private void commit() {
if (!committing && isOpen()) {
MemberState member = context.getMembers().getMember(this.member.id());
if (member == null) {
member = new MemberState(this.member.id(), Member.Type.ACTIVE, System.currentTimeMillis());
context.getMembers().addMember(member);
}

// If the log is empty then send an empty commit.
// If the next index hasn't yet been set then we send an empty commit first.
// If the next index is greater than the last index then send an empty commit.
if (context.getLog().isEmpty() || nextIndex > context.getLog().lastIndex()) {
emptyCommit(member);
if (context.getLog().isEmpty() || state.getNextIndex() > context.getLog().lastIndex()) {
emptyCommit();
} else {
entriesCommit(member);
entriesCommit();
}
}
}
Expand All @@ -793,7 +791,7 @@ private void commit() {
* Gets the previous index.
*/
private long getPrevIndex() {
return nextIndex - 1;
return state.getNextIndex() - 1;
}

/**
Expand Down Expand Up @@ -837,26 +835,26 @@ private List<Entry> getEntries(long prevIndex) {
* Performs an empty commit.
*/
@SuppressWarnings("unchecked")
private void emptyCommit(MemberState member) {
private void emptyCommit() {
long prevIndex = getPrevIndex();
Entry prevEntry = getPrevEntry(prevIndex);
commit(member, prevIndex, prevEntry, Collections.EMPTY_LIST);
commit(prevIndex, prevEntry, Collections.EMPTY_LIST);
}

/**
* Performs a commit with entries.
*/
private void entriesCommit(MemberState member) {
private void entriesCommit() {
long prevIndex = getPrevIndex();
Entry prevEntry = getPrevEntry(prevIndex);
List<Entry> entries = getEntries(prevIndex);
commit(member, prevIndex, prevEntry, entries);
commit(prevIndex, prevEntry, entries);
}

/**
* Sends a commit message.
*/
private void commit(MemberState member, long prevIndex, Entry prevEntry, List<Entry> entries) {
private void commit(long prevIndex, Entry prevEntry, List<Entry> entries) {
AppendRequest request = AppendRequest.builder()
.withTerm(context.getTerm())
.withLeader(context.getCluster().member().id())
Expand All @@ -868,45 +866,45 @@ private void commit(MemberState member, long prevIndex, Entry prevEntry, List<En
.build();

committing = true;
LOGGER.debug("{} - Sent {} to {}", context.getCluster().member().id(), request, member);
LOGGER.debug("{} - Sent {} to {}", context.getCluster().member().id(), request, this.member);
this.member.<AppendRequest, AppendResponse>send(request).whenCompleteAsync((response, error) -> {
committing = false;
context.checkThread();

if (isOpen()) {
if (error == null) {
LOGGER.debug("{} - Received {} from {}", context.getCluster().member().id(), response, member);
LOGGER.debug("{} - Received {} from {}", context.getCluster().member().id(), response, this.member);
if (response.status() == Response.Status.OK) {
// Update the commit time for the replica. This will cause heartbeat futures to be triggered.
commitTime(id);

// If replication succeeded then trigger commit futures.
if (response.succeeded()) {
updateMatchIndex(member, response);
updateNextIndex(member);
updateMatchIndex(response);
updateNextIndex();

// If entries were committed to the replica then check commit indexes.
if (!entries.isEmpty()) {
commitEntries();
}

// If there are more entries to send then attempt to send another commit.
if (hasMoreEntries(member)) {
if (hasMoreEntries()) {
commit();
}
} else if (response.term() > context.getTerm()) {
transition(RaftState.FOLLOWER);
} else {
resetMatchIndex(member, response);
resetNextIndex(member);
resetMatchIndex(response);
resetNextIndex();

// If there are more entries to send then attempt to send another commit.
if (hasMoreEntries(member)) {
if (hasMoreEntries()) {
commit();
}
}
} else if (response.term() > context.getTerm()) {
LOGGER.debug("{} - Received higher term from {}", context.getCluster().member().id(), member);
LOGGER.debug("{} - Received higher term from {}", context.getCluster().member().id(), this.member);
transition(RaftState.FOLLOWER);
} else {
LOGGER.warn("{} - {}", context.getCluster().member().id(), response.error() != null ? response.error() : "");
Expand All @@ -922,55 +920,51 @@ private void commit(MemberState member, long prevIndex, Entry prevEntry, List<En
/**
* Returns a boolean value indicating whether there are more entries to send.
*/
private boolean hasMoreEntries(MemberState member) {
return member.getNextIndex() < context.getLog().lastIndex();
private boolean hasMoreEntries() {
return state.getNextIndex() < context.getLog().lastIndex();
}

/**
* Updates the match index when a response is received.
*/
private void updateMatchIndex(MemberState member, AppendResponse response) {
private void updateMatchIndex(AppendResponse response) {
// If the replica returned a valid match index then update the existing match index. Because the
// replicator pipelines replication, we perform a MAX(matchIndex, logIndex) to get the true match index.
member.setMatchIndex(Math.max(member.getMatchIndex(), response.logIndex()));
state.setMatchIndex(Math.max(state.getMatchIndex(), response.logIndex()));
}

/**
* Updates the next index when the match index is updated.
*/
private void updateNextIndex(MemberState member) {
private void updateNextIndex() {
// If the match index was set, update the next index to be greater than the match index if necessary.
// Note that because of pipelining append requests, the next index can potentially be much larger than
// the match index. We rely on the algorithm to reject invalid append requests.
member.setNextIndex(Math.max(member.getNextIndex(), Math.max(member.getMatchIndex() + 1, 1)));
state.setNextIndex(Math.max(state.getNextIndex(), Math.max(state.getMatchIndex() + 1, 1)));
}

/**
* Resets the match index when a response fails.
*/
private void resetMatchIndex(MemberState member, AppendResponse response) {
if (member.getMatchIndex() == 0) {
member.setMatchIndex(response.logIndex());
private void resetMatchIndex(AppendResponse response) {
if (state.getMatchIndex() == 0) {
state.setMatchIndex(response.logIndex());
} else if (response.logIndex() != 0) {
member.setMatchIndex(Math.max(member.getMatchIndex(), response.logIndex()));
state.setMatchIndex(Math.max(state.getMatchIndex(), response.logIndex()));
}
LOGGER.debug("{} - Reset match index for {} to {}", context.getCluster()
.member()
.id(), member, member.getMatchIndex());
LOGGER.debug("{} - Reset match index for {} to {}", context.getCluster().member().id(), member, state.getMatchIndex());
}

/**
* Resets the next index when a response fails.
*/
private void resetNextIndex(MemberState member) {
if (member.getMatchIndex() != 0) {
member.setNextIndex(member.getMatchIndex() + 1);
private void resetNextIndex() {
if (state.getMatchIndex() != 0) {
state.setNextIndex(state.getMatchIndex() + 1);
} else {
member.setNextIndex(context.getLog().firstIndex());
state.setNextIndex(context.getLog().firstIndex());
}
LOGGER.debug("{} - Reset next index for {} to {}", context.getCluster()
.member()
.id(), member, member.getNextIndex());
LOGGER.debug("{} - Reset next index for {} to {}", context.getCluster().member().id(), member, state.getNextIndex());
}

}
Expand Down

0 comments on commit 736e9d5

Please sign in to comment.