diff --git a/atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java b/atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java
index d669ad756fe1..664e44fc54db 100644
--- a/atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java
+++ b/atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java
@@ -51,7 +51,6 @@
 import io.atomix.utils.concurrent.Futures;
 import io.atomix.utils.concurrent.Scheduled;
 import io.camunda.zeebe.journal.JournalException;
-import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.time.Instant;
@@ -106,11 +105,6 @@ public synchronized CompletableFuture stop() {
         .thenRun(this::stepDown);
   }
 
-  @Override
-  protected PersistedSnapshotListener createSnapshotListener() {
-    return null;
-  }
-
   @Override
   public RaftServer.Role role() {
     return RaftServer.Role.LEADER;
diff --git a/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java b/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java
index 8e6437d2dbdf..facff3435011 100644
--- a/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java
+++ b/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java
@@ -34,19 +34,14 @@
 import io.atomix.raft.snapshot.impl.SnapshotChunkImpl;
 import io.atomix.raft.storage.log.IndexedRaftLogEntry;
 import io.atomix.raft.storage.log.PersistedRaftRecord;
-import io.atomix.raft.storage.log.RaftLog;
 import io.atomix.raft.storage.log.RaftLogReader;
-import io.atomix.utils.concurrent.ThreadContext;
 import io.camunda.zeebe.journal.JournalException;
 import io.camunda.zeebe.journal.JournalException.InvalidChecksum;
 import io.camunda.zeebe.journal.JournalException.InvalidIndex;
-import io.camunda.zeebe.snapshots.PersistedSnapshot;
-import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
 import io.camunda.zeebe.snapshots.ReceivedSnapshot;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import org.agrona.concurrent.UnsafeBuffer;
-import org.slf4j.Logger;
 
 /** Passive state. */
 public class PassiveRole extends InactiveRole {
@@ -54,7 +49,6 @@ public class PassiveRole extends InactiveRole {
   private final SnapshotReplicationMetrics snapshotReplicationMetrics;
   private long pendingSnapshotStartTimestamp;
   private ReceivedSnapshot pendingSnapshot;
-  private PersistedSnapshotListener snapshotListener;
   private ByteBuffer nextPendingSnapshotChunkId;
 
   public PassiveRole(final RaftContext context) {
@@ -66,20 +60,13 @@ public PassiveRole(final RaftContext context) {
 
   @Override
   public CompletableFuture start() {
-    snapshotListener = createSnapshotListener();
-
-    return super.start()
-        .thenRun(this::truncateUncommittedEntries)
-        .thenRun(this::addSnapshotListener)
-        .thenApply(v -> this);
+    return super.start().thenRun(this::truncateUncommittedEntries).thenApply(v -> this);
   }
 
   @Override
   public CompletableFuture stop() {
     abortPendingSnapshots();
-    if (snapshotListener != null) {
-      raft.getPersistedSnapshotStore().removeSnapshotListener(snapshotListener);
-    }
 
     // as a safe guard, we clean up any orphaned pending snapshots
     try {
@@ -100,31 +87,6 @@ private void truncateUncommittedEntries() {
       raft.getLog().flush();
       raft.setLastWrittenIndex(raft.getCommitIndex());
     }
-
-    // to fix the edge case where we might have been stopped
-    // between persisting snapshot and truncating log we need to call on restart snapshot listener
-    // again, such that we truncate the log when necessary
-    final var latestSnapshot = raft.getCurrentSnapshot();
-    if (latestSnapshot != null && snapshotListener != null) {
-      snapshotListener.onNewSnapshot(latestSnapshot);
-    }
-  }
-
-  /**
-   * Should be overwritten by sub classes to introduce different snapshot listener.
-   *
-   * <p>If null no snapshot listener will be installed
-   *
-   * @return the snapshot listener which will be installed
-   */
-  protected PersistedSnapshotListener createSnapshotListener() {
-    return new ResetWriterSnapshotListener(log, raft.getThreadContext(), raft.getLog());
-  }
-
-  private void addSnapshotListener() {
-    if (snapshotListener != null) {
-      raft.getPersistedSnapshotStore().addSnapshotListener(snapshotListener);
-    }
   }
 
   @Override
@@ -182,6 +144,7 @@ public CompletableFuture onInstall(final InstallRequest request
     }
 
     if (!request.complete() && request.nextChunkId() == null) {
+      abortPendingSnapshots();
       return CompletableFuture.completedFuture(
           logResponse(
               InstallResponse.builder()
@@ -195,6 +158,7 @@ public CompletableFuture onInstall(final InstallRequest request
     final var snapshotChunk = new SnapshotChunkImpl();
     final var snapshotChunkBuffer = new UnsafeBuffer(request.data());
     if (!snapshotChunk.tryWrap(snapshotChunkBuffer)) {
+      abortPendingSnapshots();
       return CompletableFuture.completedFuture(
           logResponse(
               InstallResponse.builder()
@@ -229,13 +193,14 @@ public CompletableFuture onInstall(final InstallRequest request
     } else {
       // fail the request if this is not the expected next chunk
       if (!isExpectedChunk(request.chunkId())) {
+        abortPendingSnapshots();
         return CompletableFuture.completedFuture(
             logResponse(
                 InstallResponse.builder()
                     .withStatus(RaftResponse.Status.ERROR)
                     .withError(
                         RaftError.Type.ILLEGAL_MEMBER_STATE,
-                        "Request chunk is was received out of order")
+                        "Snapshot chunk is received out of order")
                     .build()));
       }
     }
@@ -265,11 +230,12 @@ public CompletableFuture onInstall(final InstallRequest request
       final long elapsed = System.currentTimeMillis() - pendingSnapshotStartTimestamp;
       log.debug("Committing snapshot {}", pendingSnapshot);
       try {
+        // Reset before committing to prevent the edge case where the system crashes after
+        // committing the snapshot and restarts with a snapshot and an invalid log.
+        resetLogOnReceivingSnapshot(pendingSnapshot.index());
+
         final var snapshot = pendingSnapshot.persist().join();
         log.info("Committed snapshot {}", snapshot);
-        // Must be executed immediately before any other operation on this threadcontext. Hence
-        // don't wait for the listener to be notified by the snapshot store.
-        snapshotListener.onNewSnapshot(snapshot);
       } catch (final Exception e) {
         log.error("Failed to commit pending snapshot {}, rolling back", pendingSnapshot, e);
         abortPendingSnapshots();
@@ -742,41 +708,14 @@ protected boolean completeAppend(
     return succeeded;
   }
 
-  private static final class ResetWriterSnapshotListener implements PersistedSnapshotListener {
-
-    private final ThreadContext threadContext;
-    private final RaftLog raftLog;
-    private final Logger log;
+  private void resetLogOnReceivingSnapshot(final long snapshotIndex) {
+    final var raftLog = raft.getLog();
 
-    ResetWriterSnapshotListener(
-        final Logger log, final ThreadContext threadContext, final RaftLog raftLog) {
-      this.log = log;
-      this.threadContext = threadContext;
-      this.raftLog = raftLog;
-    }
-
-    @Override
-    public void onNewSnapshot(final PersistedSnapshot persistedSnapshot) {
-      if (threadContext.isCurrentContext()) {
-        // this is called after the snapshot is commited
-        // on install requests and on Zeebe snapshot replication
-
-        final var index = persistedSnapshot.getIndex();
-        // It might happen that the last index is far behind our current snapshot index.
-        // E. g. on slower followers, we need to throw away the existing log,
-        // otherwise we might end with an inconsistent log (gaps between last index and
-        // snapshot index)
-        final var lastIndex = raftLog.getLastIndex();
-        if (lastIndex < index) {
-          log.info(
-              "Delete existing log (lastIndex '{}') and replace with received snapshot (index '{}')",
-              lastIndex,
-              index);
-          raftLog.reset(index + 1);
-        }
-      } else {
-        threadContext.execute(() -> onNewSnapshot(persistedSnapshot));
-      }
-    }
+    log.info(
+        "Delete existing log (lastIndex '{}') and replace with received snapshot (index '{}'). First entry in the log will be at index {}",
+        raftLog.getLastIndex(),
+        snapshotIndex,
+        snapshotIndex + 1);
+    raftLog.reset(snapshotIndex + 1);
   }
 }
diff --git a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java
index 4c2528826448..277ef837c43f 100644
--- a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java
+++ b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java
@@ -25,6 +25,7 @@
 import io.atomix.raft.partition.RaftPartitionConfig;
 import io.atomix.raft.protocol.ControllableRaftServerProtocol;
 import io.atomix.raft.roles.LeaderRole;
+import io.atomix.raft.snapshot.InMemorySnapshot;
 import io.atomix.raft.snapshot.TestSnapshotStore;
 import io.atomix.raft.storage.RaftStorage;
 import io.atomix.raft.storage.log.IndexedRaftLogEntry;
@@ -53,15 +54,17 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.jmock.lib.concurrent.DeterministicScheduler;
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Uses a DeterministicScheduler and controllable messaging layer to get a deterministic execution
- * of raft threads. Note:- Currently there is some non-determinism hidden in the raft. Hence the
- * resulting execution is not fully deterministic.
+ * of raft threads.
  */
 public final class ControllableRaftContexts {
 
+  private static final Logger LOG = LoggerFactory.getLogger("TEST");
+
   private final Map serverProtocols = new HashMap<>();
   private final Map>> messageQueue =
@@ -69,9 +72,11 @@
       new HashMap<>();
 
   private Path directory;
+  private Random random;
   private final int nodeCount;
   private final Map raftServers = new HashMap<>();
+  private final Map snapshotStores = new HashMap<>();
   private Duration electionTimeout;
   private Duration hearbeatTimeout;
   private int nextEntry = 0;
@@ -97,6 +102,7 @@ public RaftContext getRaftContext(final MemberId memberId) {
 
   public void setup(final Path directory, final Random random) throws Exception {
     this.directory = directory;
+    this.random = random;
     if (nodeCount > 0) {
       createRaftContexts(nodeCount, random);
     }
@@ -151,6 +157,7 @@ private void createRaftContexts(final int nodeCount, final Random random) {
   }
 
   public RaftContext createRaftContext(final MemberId memberId, final Random random) {
+    final RaftStorage storage = createStorage(memberId);
     final var raft =
         new RaftContext(
             memberId.id() + "-partition-1",
@@ -158,7 +165,7 @@ public RaftContext createRaftContext(final MemberId memberId, final Random rando
             memberId,
             mock(ClusterMembershipService.class),
             new ControllableRaftServerProtocol(memberId, serverProtocols, messageQueue),
-            createStorage(memberId),
+            storage,
             getRaftThreadContextFactory(memberId),
             () -> random,
             RaftElectionConfig.ofDefaultElection(),
@@ -184,12 +191,14 @@ private RaftStorage createStorage(
       final MemberId memberId, final Function configurator) {
     final var memberDirectory = getMemberDirectory(directory, memberId.toString());
+    final TestSnapshotStore persistedSnapshotStore = new TestSnapshotStore(new AtomicReference<>());
     final RaftStorage.Builder defaults =
         RaftStorage.builder()
             .withDirectory(memberDirectory)
             .withMaxSegmentSize(1024 * 10)
             .withFreeDiskSpace(100)
-            .withSnapshotStore(new TestSnapshotStore(new AtomicReference<>()));
+            .withSnapshotStore(persistedSnapshotStore);
+    snapshotStores.put(memberId, persistedSnapshotStore);
 
     return configurator.apply(defaults).build();
   }
@@ -310,6 +319,34 @@ public void clientAppendOnLeader() {
     }
   }
 
+  public void snapshotAndCompact(final MemberId memberId) {
+    final RaftContext raftContext = raftServers.get(memberId);
+    // Take snapshot at an index between lastSnapshotIndex and current commitIndex
+    final TestSnapshotStore testSnapshotStore = snapshotStores.get(memberId);
+    final var startIndex =
+        Math.max(raftContext.getLog().getFirstIndex(), testSnapshotStore.getCurrentSnapshotIndex());
+    if (startIndex >= raftContext.getCommitIndex()) {
+      // cannot take snapshot
+      return;
+    }
+
+    final long snapshotIndex =
+        random.longs(1, startIndex, raftContext.getCommitIndex()).findFirst().orElseThrow();
+
+    try (final RaftLogReader reader = raftContext.getLog().openCommittedReader()) {
+      reader.seek(snapshotIndex);
+      final long term = reader.next().term();
+
+      InMemorySnapshot.newPersistedSnapshot(
+          snapshotIndex, term, random.nextInt(9) + 1, testSnapshotStore);
+
+      LOG.info(
+          "Snapshot taken at index {}. Current commit index is {}",
+          snapshotIndex,
+          raftContext.getCommitIndex());
+    }
+
+    raftContext.getLog().deleteUntil(snapshotIndex);
+  }
+
   // ----------------------- Verifications -----------------------------
 
   // Verify that committed entries in all logs are equal
@@ -317,28 +354,36 @@ public void assertAllLogsEqual() {
     final var readers =
         raftServers.values().stream()
             .collect(Collectors.toMap(Function.identity(), s -> s.getLog().openCommittedReader()));
-    long index = 0;
-    while (true) {
+    long index =
+        raftServers.values().stream()
+                .map(s -> s.getLog().getFirstIndex())
+                .min(Long::compareTo)
+                .orElse(1L)
+            - 1;
+
+    final long commitIndexOnLeader =
+        raftServers.values().stream()
+            .map(RaftContext::getCommitIndex)
+            .max(Long::compareTo)
+            .orElseThrow();
+
+    while (index < commitIndexOnLeader) {
+      final var nextIndex = index + 1;
       final var entries =
           readers.keySet().stream()
               .filter(s -> readers.get(s).hasNext())
-              .collect(Collectors.toMap(s -> s.getName(), s -> readers.get(s).next()));
-      if (entries.size() == 0) {
-        break;
-      }
+              // only compare entries that have not been compacted
+              .filter(s -> s.getLog().getFirstIndex() <= nextIndex)
+              .collect(Collectors.toMap(RaftContext::getName, s -> readers.get(s).next()));
+
       assertThat(entries.values().stream().distinct().count())
           .withFailMessage(
               "Expected to find the same entry at a committed index on all nodes, but found %s",
               entries)
-          .isEqualTo(1);
+          .isLessThanOrEqualTo(1);
       index++;
     }
 
-    final var commitIndexOnLeader =
-        raftServers.values().stream()
-            .map(RaftContext::getCommitIndex)
-            .max(Long::compareTo)
-            .orElseThrow();
-    assertThat(index).isEqualTo(commitIndexOnLeader);
+    readers.values().forEach(RaftLogReader::close);
   }
 
@@ -389,17 +434,15 @@ boolean hasReplicatedAllEntries() {
   }
 
   public void assertAllEntriesCommittedAndReplicatedToAll() {
-    raftServers
-        .values()
-        .forEach(
-            s -> {
-              final var lastCommittedEntry = getLastCommittedEntry(s);
-              final var lastUncommittedEntry = getLastUncommittedEntry(s);
+    raftServers.forEach(
+        (memberId, raftServer) -> {
+          final var lastCommittedEntry = getLastCommittedEntry(raftServer);
+          final var lastUncommittedEntry = getLastUncommittedEntry(raftServer);
 
-              assertThat(lastCommittedEntry)
-                  .describedAs("All entries should be committed")
-                  .isEqualTo(lastUncommittedEntry);
-            });
+          assertThat(lastCommittedEntry)
+              .describedAs("All entries should be committed in %s", memberId.id())
+              .isEqualTo(lastUncommittedEntry);
+        });
 
     assertThat(hasReplicatedAllEntries())
         .describedAs("All entries are replicated to all followers")
@@ -425,4 +468,29 @@ private IndexedRaftLogEntry getLastCommittedEntry(final RaftContext s) {
       return null;
     }
   }
+
+  public void assertNoGapsInLog() {
+    raftServers.keySet().forEach(this::assertNoGapsInLog);
+  }
+
+  private void assertNoGapsInLog(final MemberId memberId) {
+    final RaftContext s = raftServers.get(memberId);
+    final long firstIndex = s.getLog().getFirstIndex();
+    long nextIndex = firstIndex;
+    try (final var reader = s.getLog().openCommittedReader()) {
+      while (reader.hasNext()) {
+        assertThat(reader.next().index())
+            .describedAs("There is no gap in the log %s", memberId.id())
+            .isEqualTo(nextIndex);
+        nextIndex++;
+      }
+    }
+
+    if (firstIndex != 1) {
+      final var currentSnapshotIndex = snapshotStores.get(memberId).getCurrentSnapshotIndex();
+      assertThat(currentSnapshotIndex)
+          .describedAs("The log is compacted in %s. Hence a snapshot must exist.", memberId.id())
+          .isGreaterThanOrEqualTo(firstIndex - 1);
+    }
+  }
 }
diff --git a/atomix/cluster/src/test/java/io/atomix/raft/RaftOperation.java b/atomix/cluster/src/test/java/io/atomix/raft/RaftOperation.java
index f2881de15f00..a1f2578702c1 100644
--- a/atomix/cluster/src/test/java/io/atomix/raft/RaftOperation.java
+++ b/atomix/cluster/src/test/java/io/atomix/raft/RaftOperation.java
@@ -48,6 +48,13 @@ public String toString() {
   }
 
   /** Returns a list of RaftOperation */
+  public static List getRaftOperationsWithSnapshot() {
+    final List defaultRaftOperation = getDefaultRaftOperations();
+    defaultRaftOperation.add(
+        RaftOperation.of("Take snapshot", ControllableRaftContexts::snapshotAndCompact));
+    return defaultRaftOperation;
+  }
+
   public static List getDefaultRaftOperations() {
     final List defaultRaftOperation = new ArrayList<>();
     defaultRaftOperation.add(
diff --git a/atomix/cluster/src/test/java/io/atomix/raft/RandomizedRaftTest.java b/atomix/cluster/src/test/java/io/atomix/raft/RandomizedRaftTest.java
index d59e26f7adfd..86e5badf0cf5 100644
--- a/atomix/cluster/src/test/java/io/atomix/raft/RandomizedRaftTest.java
+++ b/atomix/cluster/src/test/java/io/atomix/raft/RandomizedRaftTest.java
@@ -43,7 +43,8 @@ public class RandomizedRaftTest {
   private static final int OPERATION_SIZE = 10000;
   private static final Logger LOG = LoggerFactory.getLogger(RandomizedRaftTest.class);
   private ControllableRaftContexts raftContexts;
-  private List operations;
+  private List defaultOperations;
+  private List operationsWithSnapshot;
   private List raftMembers;
   private Path raftDataDirectory;
 
@@ -55,7 +56,8 @@ public void initOperations() {
             .mapToObj(String::valueOf)
             .map(MemberId::from)
             .collect(Collectors.toList());
-    operations = RaftOperation.getDefaultRaftOperations();
+    defaultOperations = RaftOperation.getDefaultRaftOperations();
+    operationsWithSnapshot = RaftOperation.getRaftOperationsWithSnapshot();
     raftMembers = servers;
   }
 
@@ -67,12 +69,48 @@ public void shutDownRaftNodes() throws IOException {
   }
 
   @Property(tries = 10, shrinking = ShrinkingMode.OFF, edgeCases = EdgeCasesMode.NONE)
-  void consistencyTest(
+  void consistencyTestWithNoSnapshot(
       @ForAll("raftOperations") final List raftOperations,
       @ForAll("raftMembers") final List raftMembers,
       @ForAll("seeds") final long seed)
       throws Exception {
 
+    consistencyTest(raftOperations, raftMembers, seed);
+  }
+
+  @Property(tries = 10, shrinking = ShrinkingMode.OFF, edgeCases = EdgeCasesMode.NONE)
+  void consistencyTestWithSnapshot(
+      @ForAll("raftOperationsWithSnapshot") final List raftOperations,
+      @ForAll("raftMembers") final List raftMembers,
+      @ForAll("seeds") final long seed)
+      throws Exception {
+
+    consistencyTest(raftOperations, raftMembers, seed);
+  }
+
+  @Property(tries = 10, shrinking = ShrinkingMode.OFF, edgeCases = EdgeCasesMode.NONE)
+  void livenessTestWithNoSnapshot(
+      @ForAll("raftOperations") final List raftOperations,
+      @ForAll("raftMembers") final List raftMembers,
+      @ForAll("seeds") final long seed)
+      throws Exception {
+
+    livenessTest(raftOperations, raftMembers, seed);
+  }
+
+  @Property(tries = 10, shrinking = ShrinkingMode.OFF, edgeCases = EdgeCasesMode.NONE)
+  void livenessTestWithSnapshot(
+      @ForAll("raftOperationsWithSnapshot") final List raftOperations,
+      @ForAll("raftMembers") final List raftMembers,
+      @ForAll("seeds") final long seed)
+      throws Exception {
+
+    livenessTest(raftOperations, raftMembers, seed);
+  }
+
+  private void consistencyTest(
+      final List raftOperations, final List raftMembers, final long seed)
+      throws Exception {
     setUpRaftNodes(new Random(seed));
 
     int step = 0;
@@ -92,15 +130,12 @@ void consistencyTest(
     }
 
     raftContexts.assertAllLogsEqual();
+    raftContexts.assertNoGapsInLog();
   }
 
-  @Property(tries = 10, shrinking = ShrinkingMode.OFF, edgeCases = EdgeCasesMode.NONE)
-  void livenessTest(
-      @ForAll("raftOperations") final List raftOperations,
-      @ForAll("raftMembers") final List raftMembers,
-      @ForAll("seeds") final long seed)
+  private void livenessTest(
+      final List raftOperations, final List raftMembers, final long seed)
       throws Exception {
-
     setUpRaftNodes(new Random(seed));
 
     // given - when there are failures such as message loss
@@ -146,11 +181,20 @@ void livenessTest(
     // Verify all entries are replicated and committed in all replicas
     raftContexts.assertAllLogsEqual();
     raftContexts.assertAllEntriesCommittedAndReplicatedToAll();
+    raftContexts.assertNoGapsInLog();
   }
 
+  /** Basic raft operations without snapshotting, compaction or restart */
   @Provide
   Arbitrary> raftOperations() {
-    final var operation = Arbitraries.of(operations);
+    final var operation = Arbitraries.of(defaultOperations);
+    return operation.list().ofSize(OPERATION_SIZE);
+  }
+
+  /** Basic raft operations with snapshotting and compaction */
+  @Provide
+  Arbitrary> raftOperationsWithSnapshot() {
+    final var operation = Arbitraries.of(operationsWithSnapshot);
     return operation.list().ofSize(OPERATION_SIZE);
   }
diff --git a/atomix/cluster/src/test/java/io/atomix/raft/snapshot/InMemorySnapshot.java b/atomix/cluster/src/test/java/io/atomix/raft/snapshot/InMemorySnapshot.java
index 11138dbb4a2f..7a3ebbbc0e95 100644
--- a/atomix/cluster/src/test/java/io/atomix/raft/snapshot/InMemorySnapshot.java
+++ b/atomix/cluster/src/test/java/io/atomix/raft/snapshot/InMemorySnapshot.java
@@ -63,7 +63,7 @@ public static InMemorySnapshot newPersistedSnapshot(
       final long index, final long term, final int size, final TestSnapshotStore snapshotStore) {
     final var snapshot = new InMemorySnapshot(snapshotStore, index, term);
     for (int i = 0; i < size; i++) {
-      snapshot.writeChunks("chunk-" + i, "test".getBytes());
+      snapshot.writeChunks("chunk-" + i, ("test-" + i).getBytes());
     }
     snapshot.persist();
     return snapshot;