Skip to content

Commit

Permalink
Fix more NPEs in replication algorithm.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jan 5, 2015
1 parent 93fb907 commit bbfeb34
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 22 deletions.
32 changes: 17 additions & 15 deletions core/src/main/java/net/kuujo/copycat/internal/ActiveState.java
Expand Up @@ -249,7 +249,7 @@ private AppendResponse doAppendEntries(AppendRequest request) {
// If the log contains entries after the request's previous log index
// then remove those entries to be replaced by the request entries.
if (!request.entries().isEmpty()) {
long index = request.logIndex();
long index = request.logIndex() != null ? request.logIndex() : 0;
for (ByteBuffer entry : request.entries()) {
index++;
// Replicated snapshot entries are *always* immediately logged and applied to the state machine
Expand Down Expand Up @@ -284,27 +284,29 @@ private AppendResponse doAppendEntries(AppendRequest request) {
/**
* Applies commits to the local state machine.
*/
private void doApplyCommits(long commitIndex) {
private void doApplyCommits(Long commitIndex) {
// If the synced commit index is greater than the local commit index then
// apply commits to the local state machine.
// Also, it's possible that one of the previous command applications failed
// due to asynchronous communication errors, so alternatively check if the
// local commit index is greater than last applied. If all the state machine
// commands have not yet been applied then we want to re-attempt to apply them.
if (commitIndex > context.getCommitIndex() || context.getCommitIndex() > context.getLastApplied()) {
// Update the local commit index with min(request commit, last log // index)
Long lastIndex = context.log().lastIndex();
if (lastIndex != null) {
context.setCommitIndex(Math.min(Math.max(commitIndex, context.getCommitIndex()), lastIndex));
if (commitIndex != null) {
if (commitIndex > context.getCommitIndex() || context.getCommitIndex() > context.getLastApplied()) {
// Update the local commit index with min(request commit, last log // index)
Long lastIndex = context.log().lastIndex();
if (lastIndex != null) {
context.setCommitIndex(Math.min(Math.max(commitIndex, context.getCommitIndex() != null ? context.getCommitIndex() : commitIndex), lastIndex));

// If the updated commit index indicates that commits remain to be
// applied to the state machine, iterate entries and apply them.
if (context.getCommitIndex() > context.getLastApplied()) {
// Starting after the last applied entry, iterate through new entries
// and apply them to the state machine up to the commit index.
for (long i = context.getLastApplied() + 1; i <= Math.min(context.getCommitIndex(), lastIndex); i++) {
// Apply the entry to the state machine.
applyEntry(i);
// If the updated commit index indicates that commits remain to be
// applied to the state machine, iterate entries and apply them.
if (context.getCommitIndex() > context.getLastApplied()) {
// Starting after the last applied entry, iterate through new entries
// and apply them to the state machine up to the commit index.
for (long i = context.getLastApplied() + 1; i <= Math.min(context.getCommitIndex(), lastIndex); i++) {
// Apply the entry to the state machine.
applyEntry(i);
}
}
}
}
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/java/net/kuujo/copycat/internal/LeaderState.java
Expand Up @@ -441,9 +441,7 @@ public CompletableFuture<Long> ping(Long index) {
.withTerm(context.getTerm())
.withLeader(context.getLocalMember())
.withLogIndex(index)
.withLogTerm(index != null && context.log().containsIndex(index) ? context.log()
.getEntry(index)
.getLong() : null)
.withLogTerm(index != null && context.log().containsIndex(index) ? context.log().getEntry(index).getLong() : null)
.withCommitIndex(context.getCommitIndex())
.build();
LOGGER.debug("{} - Sent {} to {}", context.getLocalMember(), request, member);
Expand Down Expand Up @@ -528,12 +526,12 @@ private void doSync(final Long prevIndex, final ByteBuffer prevEntry, final List
.withTerm(context.getTerm())
.withLeader(context.getLocalMember())
.withLogIndex(prevIndex)
.withLogTerm(prevEntry != null ? prevEntry.getLong() : 0)
.withLogTerm(prevEntry != null ? prevEntry.getLong() : null)
.withEntries(entries)
.withCommitIndex(context.getCommitIndex())
.build();

sendIndex = Math.max(sendIndex + 1, prevIndex + entries.size() + 1);
sendIndex = Math.max(sendIndex != null ? sendIndex + 1 : 0, prevIndex != null ? prevIndex + entries.size() + 1 : context.log().firstIndex() + entries.size() + 1);

LOGGER.debug("{} - Sent {} to {}", context.getLocalMember(), request, member);
appendHandler.handle(request).whenComplete((response, error) -> {
Expand All @@ -560,7 +558,7 @@ private void doSync(final Long prevIndex, final ByteBuffer prevEntry, final List
// the replica in the response to generate a new nextIndex. This allows
// us to skip repeatedly replicating one entry at a time if it's not
// necessary.
nextIndex = sendIndex = response.logIndex() != null ? Math.max(response.logIndex() + 1, context.log().firstIndex()) : prevIndex;
nextIndex = sendIndex = response.logIndex() != null ? Math.max(response.logIndex() + 1, context.log().firstIndex()) : prevIndex != null ? prevIndex : context.log().firstIndex();
doSync();
}
}
Expand Down
Expand Up @@ -110,7 +110,9 @@ public boolean containsIndex(long index) {
public ByteBuffer getEntry(long index) {
assertIsOpen();
assertContainsIndex(index);
return log.get(index);
ByteBuffer buffer = log.get(index);
if (buffer != null) buffer.rewind();
return buffer;
}

@Override
Expand Down

0 comments on commit bbfeb34

Please sign in to comment.