
fix(raft): follower resets pendingSnapshot after rejecting install request #10183

Merged · 11 commits · Aug 29, 2022
@@ -51,7 +51,6 @@
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Scheduled;
import io.camunda.zeebe.journal.JournalException;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
@@ -106,11 +105,6 @@ public synchronized CompletableFuture<Void> stop() {
.thenRun(this::stepDown);
}

@Override
protected PersistedSnapshotListener createSnapshotListener() {
return null;
}

@Override
public RaftServer.Role role() {
return RaftServer.Role.LEADER;
95 changes: 17 additions & 78 deletions atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java
@@ -34,27 +34,21 @@
import io.atomix.raft.protocol.VoteResponse;
import io.atomix.raft.snapshot.impl.SnapshotChunkImpl;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.log.RaftLog;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.utils.concurrent.ThreadContext;
import io.camunda.zeebe.journal.JournalException;
import io.camunda.zeebe.journal.JournalException.InvalidChecksum;
import io.camunda.zeebe.journal.JournalException.InvalidIndex;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import io.camunda.zeebe.snapshots.ReceivedSnapshot;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/** Passive state. */
public class PassiveRole extends InactiveRole {

private final SnapshotReplicationMetrics snapshotReplicationMetrics;
private long pendingSnapshotStartTimestamp;
private ReceivedSnapshot pendingSnapshot;
private PersistedSnapshotListener snapshotListener;
private ByteBuffer nextPendingSnapshotChunkId;

public PassiveRole(final RaftContext context) {
@@ -66,20 +60,13 @@ public PassiveRole(final RaftContext context) {

@Override
public CompletableFuture<RaftRole> start() {
snapshotListener = createSnapshotListener();

return super.start()
.thenRun(this::truncateUncommittedEntries)
.thenRun(this::addSnapshotListener)
.thenApply(v -> this);
return super.start().thenRun(this::truncateUncommittedEntries).thenApply(v -> this);
}

@Override
public CompletableFuture<Void> stop() {
abortPendingSnapshots();
if (snapshotListener != null) {
raft.getPersistedSnapshotStore().removeSnapshotListener(snapshotListener);
}

// as a safeguard, we clean up any orphaned pending snapshots
try {
@@ -100,31 +87,6 @@ private void truncateUncommittedEntries() {
raft.getLog().flush();
raft.setLastWrittenIndex(raft.getCommitIndex());
}

// to fix the edge case where we might have been stopped
// between persisting snapshot and truncating log we need to call on restart snapshot listener
// again, such that we truncate the log when necessary
final var latestSnapshot = raft.getCurrentSnapshot();
if (latestSnapshot != null && snapshotListener != null) {
snapshotListener.onNewSnapshot(latestSnapshot);
}
}

/**
* Should be overwritten by sub classes to introduce different snapshot listener.
*
* <p>If null no snapshot listener will be installed
*
* @return the snapshot listener which will be installed
*/
protected PersistedSnapshotListener createSnapshotListener() {
return new ResetWriterSnapshotListener(log, raft.getThreadContext(), raft.getLog());
}

private void addSnapshotListener() {
if (snapshotListener != null) {
raft.getPersistedSnapshotStore().addSnapshotListener(snapshotListener);
}
}

@Override
@@ -182,6 +144,7 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
}

if (!request.complete() && request.nextChunkId() == null) {
abortPendingSnapshots();
return CompletableFuture.completedFuture(
logResponse(
InstallResponse.builder()
@@ -195,6 +158,7 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
final var snapshotChunk = new SnapshotChunkImpl();
final var snapshotChunkBuffer = new UnsafeBuffer(request.data());
if (!snapshotChunk.tryWrap(snapshotChunkBuffer)) {
abortPendingSnapshots();
return CompletableFuture.completedFuture(
logResponse(
InstallResponse.builder()
@@ -229,13 +193,14 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
} else {
// fail the request if this is not the expected next chunk
if (!isExpectedChunk(request.chunkId())) {
abortPendingSnapshots();
return CompletableFuture.completedFuture(
logResponse(
InstallResponse.builder()
.withStatus(RaftResponse.Status.ERROR)
.withError(
RaftError.Type.ILLEGAL_MEMBER_STATE,
"Request chunk is was received out of order")
"Snapshot chunk is received out of order")
.build()));
}
}
@@ -265,11 +230,12 @@ public CompletableFuture<InstallResponse> onInstall(final InstallRequest request
final long elapsed = System.currentTimeMillis() - pendingSnapshotStartTimestamp;
log.debug("Committing snapshot {}", pendingSnapshot);
try {
// Reset before committing to prevent the edge case where the system crashes after
// committing the snapshot, and restart with a snapshot and invalid log.
resetLogOnReceivingSnapshot(pendingSnapshot.index());
Inline review comment (Member):
❓ With these changes here, a failure while committing the snapshot means that the broker loses data because it already reset the log prior to committing the snapshot. Isn't this problematic?

Say there is a bug where a received snapshot can't be committed. Previously, a running system would have been able to continue (just without snapshot replication, and maybe the next received snapshot can be committed again). Now, with these changes, all followers (or at least those that receive such a snapshot) would lose data immediately and only the leader would be left with the full data.
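
To make the concern above concrete, here is a minimal sketch of the ordering in question, using hypothetical, simplified types rather than the actual RaftLog/ReceivedSnapshot API: once the log has been reset, a failing persist() leaves the follower with neither the truncated entries nor a committed snapshot.

```java
// Self-contained illustration (placeholder types, not the Zeebe/Atomix API) of the
// failure mode raised in the comment: the log is truncated before persist(), so a
// persist failure cannot restore the dropped entries.
import java.util.ArrayList;
import java.util.List;

public class ResetBeforeCommitSketch {

  static final class InMemoryLog {
    private final List<String> entries = new ArrayList<>();

    void append(final String entry) {
      entries.add(entry);
    }

    // Drops all entries; conceptually the next append would start at snapshotIndex + 1.
    void reset() {
      entries.clear();
    }

    int size() {
      return entries.size();
    }
  }

  static final class PendingSnapshot {
    private final boolean persistFails;

    PendingSnapshot(final boolean persistFails) {
      this.persistFails = persistFails;
    }

    void persist() {
      if (persistFails) {
        throw new IllegalStateException("simulated persist failure");
      }
    }
  }

  public static void main(final String[] args) {
    final var log = new InMemoryLog();
    for (int i = 1; i <= 5; i++) {
      log.append("entry-" + i);
    }

    final var pendingSnapshot = new PendingSnapshot(true);

    // Order used by this PR: reset the log first, then persist the snapshot.
    log.reset();
    try {
      pendingSnapshot.persist();
    } catch (final RuntimeException e) {
      // The pending snapshot can be aborted, but the truncated entries are already gone.
      System.out.println("persist failed, remaining log entries: " + log.size()); // prints 0
    }
  }
}
```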


final var snapshot = pendingSnapshot.persist().join();
log.info("Committed snapshot {}", snapshot);
// Must be executed immediately before any other operation on this threadcontext. Hence
// don't wait for the listener to be notified by the snapshot store.
snapshotListener.onNewSnapshot(snapshot);
} catch (final Exception e) {
log.error("Failed to commit pending snapshot {}, rolling back", pendingSnapshot, e);
abortPendingSnapshots();
@@ -742,41 +708,14 @@ protected boolean completeAppend(
return succeeded;
}

private static final class ResetWriterSnapshotListener implements PersistedSnapshotListener {

private final ThreadContext threadContext;
private final RaftLog raftLog;
private final Logger log;
private void resetLogOnReceivingSnapshot(final long snapshotIndex) {
final var raftLog = raft.getLog();

ResetWriterSnapshotListener(
final Logger log, final ThreadContext threadContext, final RaftLog raftLog) {
this.log = log;
this.threadContext = threadContext;
this.raftLog = raftLog;
}

@Override
public void onNewSnapshot(final PersistedSnapshot persistedSnapshot) {
if (threadContext.isCurrentContext()) {
// this is called after the snapshot is commited
// on install requests and on Zeebe snapshot replication

final var index = persistedSnapshot.getIndex();
// It might happen that the last index is far behind our current snapshot index.
// E. g. on slower followers, we need to throw away the existing log,
// otherwise we might end with an inconsistent log (gaps between last index and
// snapshot index)
final var lastIndex = raftLog.getLastIndex();
if (lastIndex < index) {
log.info(
"Delete existing log (lastIndex '{}') and replace with received snapshot (index '{}')",
lastIndex,
index);
raftLog.reset(index + 1);
}
} else {
threadContext.execute(() -> onNewSnapshot(persistedSnapshot));
}
}
log.info(
"Delete existing log (lastIndex '{}') and replace with received snapshot (index '{}'). First entry in the log will be at index {}",
raftLog.getLastIndex(),
snapshotIndex,
snapshotIndex + 1);
raftLog.reset(snapshotIndex + 1);
}
}
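
Taken together, the change to onInstall follows a single pattern: every path that rejects an InstallRequest (no next chunk id on an incomplete install, an unparseable chunk, an out-of-order chunk) now calls abortPendingSnapshots() before answering the leader. The sketch below condenses that control flow with simplified placeholder types and paraphrased error messages; it is an illustration of the pattern, not the Atomix API.

```java
// Condensed illustration (Java 17, placeholder types): every rejection path aborts the
// pending snapshot before returning an error, so the next install attempt starts cleanly.
import java.util.Arrays;

public class InstallRejectionSketch {

  record InstallRequest(byte[] chunkId, byte[] nextChunkId, boolean complete, byte[] data) {}

  enum Status { OK, ERROR }

  record InstallResponse(Status status, String error) {}

  private byte[] expectedChunkId;
  private InstallRequest pendingSnapshot; // stands in for the partially received snapshot

  InstallResponse onInstall(final InstallRequest request) {
    if (!request.complete() && request.nextChunkId() == null) {
      return reject("Snapshot installation is not complete but no next chunk was provided");
    }
    if (request.data() == null || request.data().length == 0) {
      return reject("Snapshot chunk could not be deserialized");
    }
    if (expectedChunkId != null && !Arrays.equals(expectedChunkId, request.chunkId())) {
      return reject("Snapshot chunk is received out of order");
    }

    // Accept the chunk and remember which chunk must come next.
    pendingSnapshot = request;
    expectedChunkId = request.nextChunkId();
    // ... on the last chunk: reset the log, persist and commit the snapshot ...
    return new InstallResponse(Status.OK, null);
  }

  private InstallResponse reject(final String error) {
    // The behaviour this PR adds: never keep half-received snapshot state after a rejection.
    abortPendingSnapshot();
    return new InstallResponse(Status.ERROR, error);
  }

  private void abortPendingSnapshot() {
    pendingSnapshot = null;
    expectedChunkId = null;
  }
}
```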