Skip to content

Commit

Permalink
Ensure only unapplied committed entries are applied on leader startup.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 1, 2015
1 parent 803fa49 commit dceb49e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
15 changes: 7 additions & 8 deletions raft/src/main/java/net/kuujo/copycat/raft/state/LeaderState.java
Expand Up @@ -56,7 +56,6 @@ public synchronized CompletableFuture<AbstractState> open() {
context.getContext().execute(() -> { context.getContext().execute(() -> {
commitEntries().whenComplete((result, error) -> { commitEntries().whenComplete((result, error) -> {
if (error == null) { if (error == null) {
applyEntries();
startHeartbeatTimer(); startHeartbeatTimer();
} }
}); });
Expand Down Expand Up @@ -90,6 +89,7 @@ private CompletableFuture<Void> commitEntries() {
context.checkThread(); context.checkThread();
if (isOpen()) { if (isOpen()) {
if (error == null) { if (error == null) {
applyEntries(resultIndex);
future.complete(null); future.complete(null);
} else { } else {
transition(RaftState.FOLLOWER); transition(RaftState.FOLLOWER);
Expand All @@ -102,25 +102,24 @@ private CompletableFuture<Void> commitEntries() {
/** /**
* Applies all unapplied entries to the log. * Applies all unapplied entries to the log.
*/ */
private void applyEntries() { private void applyEntries(long index) {
if (!context.getLog().isEmpty()) { if (!context.getLog().isEmpty()) {
int count = 0; int count = 0;
long lastIndex = context.getLog().lastIndex(); for (long lastApplied = Math.max(context.getLastApplied(), context.getLog().firstIndex()); lastApplied <= index; lastApplied++) {
for (long commitIndex = Math.max(context.getCommitIndex(), context.getLog().firstIndex()); commitIndex <= lastIndex; commitIndex++) { try (Entry entry = context.getLog().getEntry(lastApplied)) {
try (Entry entry = context.getLog().getEntry(commitIndex)) {
if (entry != null) { if (entry != null) {
try { try {
context.getStateMachine().apply(entry); context.getStateMachine().apply(entry);
} catch (ApplicationException e) { } catch (ApplicationException e) {

LOGGER.info("{} - an application error occurred: {}", context.getCluster().member().id(), e);
} finally { } finally {
context.setLastApplied(commitIndex); context.setLastApplied(lastApplied);
} }
} }
} }
count++; count++;
} }
LOGGER.debug("{} - Applied {} entries to log", context.getCluster().member().id(), count); LOGGER.debug("{} - applied {} entries to log", context.getCluster().member().id(), count);
} }
} }


Expand Down
Expand Up @@ -439,9 +439,9 @@ public long getCommitIndex() {
*/ */
RaftContext setGlobalIndex(long globalIndex) { RaftContext setGlobalIndex(long globalIndex) {
if (globalIndex < 0) if (globalIndex < 0)
throw new IllegalArgumentException("recycle index must be positive"); throw new IllegalArgumentException("global index must be positive");
if (globalIndex < this.globalIndex) if (globalIndex < this.globalIndex)
throw new IllegalArgumentException("cannot decrease recycle index"); throw new IllegalArgumentException("cannot decrease global index");
this.globalIndex = globalIndex; this.globalIndex = globalIndex;
compactor.setCompactIndex(globalIndex); compactor.setCompactIndex(globalIndex);
return this; return this;
Expand Down

0 comments on commit dceb49e

Please sign in to comment.