Skip to content

Commit

Permalink
fix(raft): reset log always on receiving a snapshot
Browse files Browse the repository at this point in the history
There were some cases where the log is not reset and leads to scenarios where a follower is not able to replicate new events. This case is explained in #10183 (comment)
  • Loading branch information
deepthidevaki committed Aug 26, 2022
1 parent 6a39814 commit 4c82bd9
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,7 @@ public PassiveRole(final RaftContext context) {
public CompletableFuture<RaftRole> start() {
snapshotListener = createSnapshotListener();

return super.start()
.thenRun(this::truncateUncommittedEntries)
.thenRun(this::addSnapshotListener)
.thenApply(v -> this);
return super.start().thenRun(this::truncateUncommittedEntries).thenApply(v -> this);
}

@Override
Expand Down Expand Up @@ -100,14 +97,6 @@ private void truncateUncommittedEntries() {
raft.getLog().flush();
raft.setLastWrittenIndex(raft.getCommitIndex());
}

// to fix the edge case where we might have been stopped
// between persisting snapshot and truncating log we need to call on restart snapshot listener
// again, such that we truncate the log when necessary
final var latestSnapshot = raft.getCurrentSnapshot();
if (latestSnapshot != null && snapshotListener != null) {
snapshotListener.onNewSnapshot(latestSnapshot);
}
}

/**
Expand Down Expand Up @@ -268,11 +257,12 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
final long elapsed = System.currentTimeMillis() - pendingSnapshotStartTimestamp;
log.debug("Committing snapshot {}", pendingSnapshot);
try {
// Reset before committing to prevent the edge case where the system crashes after
// committing the snapshot, and restart with a snapshot and invalid log.
resetLogOnReceivingSnapshot(pendingSnapshot.index());

final var snapshot = pendingSnapshot.persist().join();
log.info("Committed snapshot {}", snapshot);
// Must be executed immediately before any other operation on this threadcontext. Hence
// don't wait for the listener to be notified by the snapshot store.
snapshotListener.onNewSnapshot(snapshot);
} catch (final Exception e) {
log.error("Failed to commit pending snapshot {}, rolling back", pendingSnapshot, e);
abortPendingSnapshots();
Expand Down Expand Up @@ -745,6 +735,17 @@ protected boolean completeAppend(
return succeeded;
}

private void resetLogOnReceivingSnapshot(final long snapshotIndex) {
final var raftLog = raft.getLog();

log.info(
"Delete existing log (lastIndex '{}') and replace with received snapshot (index '{}'). First entry in the log will be at index {}",
raftLog.getLastIndex(),
snapshotIndex,
snapshotIndex + 1);
raftLog.reset(snapshotIndex + 1);
}

private static final class ResetWriterSnapshotListener implements PersistedSnapshotListener {

private final ThreadContext threadContext;
Expand Down

0 comments on commit 4c82bd9

Please sign in to comment.