From db96b404ef5b3d34645d0ee8ae9120cb634d75dd Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Wed, 24 Aug 2022 17:55:45 +0200 Subject: [PATCH 01/12] test(raft): allow comparing compacted logs When comparing compacted logs, not all entries exist in all logs. So compare only the entries that exist. (cherry picked from commit 773dfa2dff1b5c670e9856b09ad31576a8d8e39e) --- .../atomix/raft/ControllableRaftContexts.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java index 4c2528826448..b83f10d4c790 100644 --- a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java +++ b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java @@ -57,8 +57,7 @@ /** * Uses a DeterministicScheduler and controllable messaging layer to get a deterministic execution - * of raft threads. Note:- Currently there is some non-determinism hidden in the raft. Hence the - * resulting execution is not fully deterministic. + * of raft threads. */ public final class ControllableRaftContexts { @@ -317,12 +316,23 @@ public void assertAllLogsEqual() { final var readers = raftServers.values().stream() .collect(Collectors.toMap(Function.identity(), s -> s.getLog().openCommittedReader())); - long index = 0; + long index = + raftServers.values().stream() + .map(s -> s.getLog().getFirstIndex()) + .min(Long::compareTo) + .orElse(1L) + - 1; + + readers.values().forEach(r -> r.seek(-1)); // seek to first + while (true) { + final var nextIndex = index + 1; final var entries = readers.keySet().stream() .filter(s -> readers.get(s).hasNext()) - .collect(Collectors.toMap(s -> s.getName(), s -> readers.get(s).next())); + // only compared not compacted entries + .filter(s -> s.getLog().getFirstIndex() <= nextIndex) + .collect(Collectors.toMap(RaftContext::getName, s -> readers.get(s).next())); if (entries.size() == 0) { break; } From 015866b1a3c598f795d067c659c55b3c6c129900 Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Thu, 25 Aug 2022 10:57:35 +0200 Subject: [PATCH 02/12] test(raft): extend RandomizedRaftTest with snapshot and compaction (cherry picked from commit 5db293229aa06b59a34cbdaa04b44fcc7b2823ad) --- .../atomix/raft/ControllableRaftContexts.java | 83 ++++++++++++++++--- .../java/io/atomix/raft/RaftOperation.java | 7 ++ .../io/atomix/raft/RandomizedRaftTest.java | 64 +++++++++++--- 3 files changed, 131 insertions(+), 23 deletions(-) diff --git a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java index b83f10d4c790..d8c3603add06 100644 --- a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java +++ b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java @@ -25,6 +25,7 @@ import io.atomix.raft.partition.RaftPartitionConfig; import io.atomix.raft.protocol.ControllableRaftServerProtocol; import io.atomix.raft.roles.LeaderRole; +import io.atomix.raft.snapshot.InMemorySnapshot; import io.atomix.raft.snapshot.TestSnapshotStore; import io.atomix.raft.storage.RaftStorage; import io.atomix.raft.storage.log.IndexedRaftLogEntry; @@ -53,6 +54,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import org.jmock.lib.concurrent.DeterministicScheduler; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** @@ -61,6 +63,8 @@ */ public final class ControllableRaftContexts { + private static final Logger LOG = LoggerFactory.getLogger("TEST"); + private final Map serverProtocols = new HashMap<>(); private final Map>> messageQueue = new HashMap<>(); @@ -68,9 +72,11 @@ public final class ControllableRaftContexts { new HashMap<>(); private Path directory; + private Random random; private final int nodeCount; private final Map raftServers = new HashMap<>(); + private final Map snapshotStores = new HashMap<>(); private Duration electionTimeout; private Duration hearbeatTimeout; private int nextEntry = 0; @@ -96,6 +102,7 @@ public RaftContext getRaftContext(final MemberId memberId) { public void setup(final Path directory, final Random random) throws Exception { this.directory = directory; + this.random = random; if (nodeCount > 0) { createRaftContexts(nodeCount, random); } @@ -150,6 +157,7 @@ private void createRaftContexts(final int nodeCount, final Random random) { } public RaftContext createRaftContext(final MemberId memberId, final Random random) { + final RaftStorage storage = createStorage(memberId); final var raft = new RaftContext( memberId.id() + "-partition-1", @@ -157,7 +165,7 @@ public RaftContext createRaftContext(final MemberId memberId, final Random rando memberId, mock(ClusterMembershipService.class), new ControllableRaftServerProtocol(memberId, serverProtocols, messageQueue), - createStorage(memberId), + storage, getRaftThreadContextFactory(memberId), () -> random, RaftElectionConfig.ofDefaultElection(), @@ -183,12 +191,14 @@ private RaftStorage createStorage( final MemberId memberId, final Function configurator) { final var memberDirectory = getMemberDirectory(directory, memberId.toString()); + final TestSnapshotStore persistedSnapshotStore = new TestSnapshotStore(new AtomicReference<>()); final RaftStorage.Builder defaults = RaftStorage.builder() .withDirectory(memberDirectory) .withMaxSegmentSize(1024 * 10) .withFreeDiskSpace(100) - .withSnapshotStore(new TestSnapshotStore(new AtomicReference<>())); + .withSnapshotStore(persistedSnapshotStore); + snapshotStores.put(memberId, persistedSnapshotStore); return configurator.apply(defaults).build(); } @@ -309,6 +319,33 @@ public void clientAppendOnLeader() { } } + public void snapshotAndCompact(final MemberId memberId) { + final RaftContext raftContext = raftServers.get(memberId); + // Take snapshot at an index between lastSnapshotIndex and current commitIndex + final TestSnapshotStore testSnapshotStore = snapshotStores.get(memberId); + final var startIndex = + Math.max(raftContext.getLog().getFirstIndex(), testSnapshotStore.getCurrentSnapshotIndex()); + if (startIndex >= raftContext.getCommitIndex()) { + // cannot take snapshot + return; + } + final long snapshotIndex = random.nextLong(startIndex, raftContext.getCommitIndex()); + try (final RaftLogReader reader = raftContext.getLog().openCommittedReader()) { + reader.seek(snapshotIndex); + final long term = reader.next().term(); + + InMemorySnapshot.newPersistedSnapshot( + snapshotIndex, term, random.nextInt(2, 10), testSnapshotStore); + + LOG.info( + "Snapshot taken at index {}. Current commit index is {}", + snapshotIndex, + raftContext.getCommitIndex()); + } + + raftContext.getLog().deleteUntil(snapshotIndex); + } + // ----------------------- Verifications ----------------------------- // Verify that committed entries in all logs are equal @@ -325,7 +362,13 @@ public void assertAllLogsEqual() { readers.values().forEach(r -> r.seek(-1)); // seek to first - while (true) { + final long commitIndexOnLeader = + raftServers.values().stream() + .map(RaftContext::getCommitIndex) + .max(Long::compareTo) + .orElseThrow(); + + while (index < commitIndexOnLeader) { final var nextIndex = index + 1; final var entries = readers.keySet().stream() @@ -333,22 +376,15 @@ public void assertAllLogsEqual() { // only compared not compacted entries .filter(s -> s.getLog().getFirstIndex() <= nextIndex) .collect(Collectors.toMap(RaftContext::getName, s -> readers.get(s).next())); - if (entries.size() == 0) { - break; - } + assertThat(entries.values().stream().distinct().count()) .withFailMessage( "Expected to find the same entry at a committed index on all nodes, but found %s", entries) - .isEqualTo(1); + .isLessThanOrEqualTo(1); index++; } - final var commitIndexOnLeader = - raftServers.values().stream() - .map(RaftContext::getCommitIndex) - .max(Long::compareTo) - .orElseThrow(); - assertThat(index).isEqualTo(commitIndexOnLeader); + readers.values().forEach(RaftLogReader::close); } @@ -435,4 +471,25 @@ private IndexedRaftLogEntry getLastCommittedEntry(final RaftContext s) { return null; } } + + public void assertNoGapsInLog() { + raftServers.keySet().forEach(this::assertNoGapsInLog); + } + + private void assertNoGapsInLog(final MemberId memberId) { + final RaftContext s = raftServers.get(memberId); + final long firstIndex = s.getLog().getFirstIndex(); + long nextIndex = firstIndex; + try (final var reader = s.getLog().openCommittedReader()) { + while (reader.hasNext()) { + assertThat(reader.next().index()).isEqualTo(nextIndex); + nextIndex++; + } + } + + if (firstIndex != 1) { + final var currentSnapshotIndex = snapshotStores.get(memberId).getCurrentSnapshotIndex(); + assertThat(currentSnapshotIndex).isGreaterThanOrEqualTo(firstIndex - 1); + } + } } diff --git a/atomix/cluster/src/test/java/io/atomix/raft/RaftOperation.java b/atomix/cluster/src/test/java/io/atomix/raft/RaftOperation.java index f2881de15f00..a1f2578702c1 100644 --- a/atomix/cluster/src/test/java/io/atomix/raft/RaftOperation.java +++ b/atomix/cluster/src/test/java/io/atomix/raft/RaftOperation.java @@ -48,6 +48,13 @@ public String toString() { } /** Returns a list of RaftOperation */ + public static List getRaftOperationsWithSnapshot() { + final List defaultRaftOperation = getDefaultRaftOperations(); + defaultRaftOperation.add( + RaftOperation.of("Take snapshot", ControllableRaftContexts::snapshotAndCompact)); + return defaultRaftOperation; + } + public static List getDefaultRaftOperations() { final List defaultRaftOperation = new ArrayList<>(); defaultRaftOperation.add( diff --git a/atomix/cluster/src/test/java/io/atomix/raft/RandomizedRaftTest.java b/atomix/cluster/src/test/java/io/atomix/raft/RandomizedRaftTest.java index d59e26f7adfd..86e5badf0cf5 100644 --- a/atomix/cluster/src/test/java/io/atomix/raft/RandomizedRaftTest.java +++ b/atomix/cluster/src/test/java/io/atomix/raft/RandomizedRaftTest.java @@ -43,7 +43,8 @@ public class RandomizedRaftTest { private static final int OPERATION_SIZE = 10000; private static final Logger LOG = LoggerFactory.getLogger(RandomizedRaftTest.class); private ControllableRaftContexts raftContexts; - private List operations; + private List defaultOperations; + private List operationsWithSnapshot; private List raftMembers; private Path raftDataDirectory; @@ -55,7 +56,8 @@ public void initOperations() { .mapToObj(String::valueOf) .map(MemberId::from) .collect(Collectors.toList()); - operations = RaftOperation.getDefaultRaftOperations(); + defaultOperations = RaftOperation.getDefaultRaftOperations(); + operationsWithSnapshot = RaftOperation.getRaftOperationsWithSnapshot(); raftMembers = servers; } @@ -67,12 +69,48 @@ public void shutDownRaftNodes() throws IOException { } @Property(tries = 10, shrinking = ShrinkingMode.OFF, edgeCases = EdgeCasesMode.NONE) - void consistencyTest( + void consistencyTestWithNoSnapshot( @ForAll("raftOperations") final List raftOperations, @ForAll("raftMembers") final List raftMembers, @ForAll("seeds") final long seed) throws Exception { + consistencyTest(raftOperations, raftMembers, seed); + } + + @Property(tries = 10, shrinking = ShrinkingMode.OFF, edgeCases = EdgeCasesMode.NONE) + void consistencyTestWithSnapshot( + @ForAll("raftOperationsWithSnapshot") final List raftOperations, + @ForAll("raftMembers") final List raftMembers, + @ForAll("seeds") final long seed) + throws Exception { + + consistencyTest(raftOperations, raftMembers, seed); + } + + @Property(tries = 10, shrinking = ShrinkingMode.OFF, edgeCases = EdgeCasesMode.NONE) + void livenessTestWithNoSnapshot( + @ForAll("raftOperations") final List raftOperations, + @ForAll("raftMembers") final List raftMembers, + @ForAll("seeds") final long seed) + throws Exception { + + livenessTest(raftOperations, raftMembers, seed); + } + + @Property(tries = 10, shrinking = ShrinkingMode.OFF, edgeCases = EdgeCasesMode.NONE) + void livenessTestWithSnapshot( + @ForAll("raftOperationsWithSnapshot") final List raftOperations, + @ForAll("raftMembers") final List raftMembers, + @ForAll("seeds") final long seed) + throws Exception { + + livenessTest(raftOperations, raftMembers, seed); + } + + private void consistencyTest( + final List raftOperations, final List raftMembers, final long seed) + throws Exception { setUpRaftNodes(new Random(seed)); int step = 0; @@ -92,15 +130,12 @@ void consistencyTest( } raftContexts.assertAllLogsEqual(); + raftContexts.assertNoGapsInLog(); } - @Property(tries = 10, shrinking = ShrinkingMode.OFF, edgeCases = EdgeCasesMode.NONE) - void livenessTest( - @ForAll("raftOperations") final List raftOperations, - @ForAll("raftMembers") final List raftMembers, - @ForAll("seeds") final long seed) + private void livenessTest( + final List raftOperations, final List raftMembers, final long seed) throws Exception { - setUpRaftNodes(new Random(seed)); // given - when there are failures such as message loss @@ -146,11 +181,20 @@ void livenessTest( // Verify all entries are replicated and committed in all replicas raftContexts.assertAllLogsEqual(); raftContexts.assertAllEntriesCommittedAndReplicatedToAll(); + raftContexts.assertNoGapsInLog(); } + /** Basic raft operations with out snapshotting, compaction or restart */ @Provide Arbitrary> raftOperations() { - final var operation = Arbitraries.of(operations); + final var operation = Arbitraries.of(defaultOperations); + return operation.list().ofSize(OPERATION_SIZE); + } + + /** Basic raft operation with snapshotting and compaction */ + @Provide + Arbitrary> raftOperationsWithSnapshot() { + final var operation = Arbitraries.of(operationsWithSnapshot); return operation.list().ofSize(OPERATION_SIZE); } From 10eca8eb859a442071b53fe476e80257be9f726a Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Thu, 25 Aug 2022 12:09:13 +0200 Subject: [PATCH 03/12] fix(raft): reset snapshot replication on error in follower When follower responds with any error, leader restarts send the snapshot from the initial chunk. When the follower has not resets its state, then the follower is not expecting the initial chunk. As a result, the follower reject the requests and this will continue endlessly. (cherry picked from commit 9aba732358778c277557eba97d47d4893ef73dc5) --- .../src/main/java/io/atomix/raft/roles/PassiveRole.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java b/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java index 8e6437d2dbdf..839e3973d353 100644 --- a/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java +++ b/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java @@ -182,6 +182,7 @@ public CompletableFuture onInstall(final InstallRequest request } if (!request.complete() && request.nextChunkId() == null) { + abortPendingSnapshots(); return CompletableFuture.completedFuture( logResponse( InstallResponse.builder() @@ -195,6 +196,7 @@ public CompletableFuture onInstall(final InstallRequest request final var snapshotChunk = new SnapshotChunkImpl(); final var snapshotChunkBuffer = new UnsafeBuffer(request.data()); if (!snapshotChunk.tryWrap(snapshotChunkBuffer)) { + abortPendingSnapshots(); return CompletableFuture.completedFuture( logResponse( InstallResponse.builder() @@ -229,6 +231,7 @@ public CompletableFuture onInstall(final InstallRequest request } else { // fail the request if this is not the expected next chunk if (!isExpectedChunk(request.chunkId())) { + abortPendingSnapshots(); return CompletableFuture.completedFuture( logResponse( InstallResponse.builder() From 1e35bc2c8c1e8b44056f5e1bb86798658507cb65 Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Thu, 25 Aug 2022 13:16:49 +0200 Subject: [PATCH 04/12] test(raft): write different content to each chunk to improve test reliability (cherry picked from commit fe11e344c221091288d621f71fd3526b1479d7f8) --- .../src/test/java/io/atomix/raft/snapshot/InMemorySnapshot.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atomix/cluster/src/test/java/io/atomix/raft/snapshot/InMemorySnapshot.java b/atomix/cluster/src/test/java/io/atomix/raft/snapshot/InMemorySnapshot.java index 11138dbb4a2f..7a3ebbbc0e95 100644 --- a/atomix/cluster/src/test/java/io/atomix/raft/snapshot/InMemorySnapshot.java +++ b/atomix/cluster/src/test/java/io/atomix/raft/snapshot/InMemorySnapshot.java @@ -63,7 +63,7 @@ public static InMemorySnapshot newPersistedSnapshot( final long index, final long term, final int size, final TestSnapshotStore snapshotStore) { final var snapshot = new InMemorySnapshot(snapshotStore, index, term); for (int i = 0; i < size; i++) { - snapshot.writeChunks("chunk-" + i, "test".getBytes()); + snapshot.writeChunks("chunk-" + i, ("test-" + i).getBytes()); } snapshot.persist(); return snapshot; From 2847be67e55a8d95b8a3bb63c3dc7acefa41d9c5 Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Fri, 26 Aug 2022 09:37:23 +0200 Subject: [PATCH 05/12] test(raft): improve assertion failure message (cherry picked from commit 6a39814f32659703588b11f3a118d04a24751656) --- .../atomix/raft/ControllableRaftContexts.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java index d8c3603add06..9cf3b269eb88 100644 --- a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java +++ b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java @@ -435,17 +435,15 @@ boolean hasReplicatedAllEntries() { } public void assertAllEntriesCommittedAndReplicatedToAll() { - raftServers - .values() - .forEach( - s -> { - final var lastCommittedEntry = getLastCommittedEntry(s); - final var lastUncommittedEntry = getLastUncommittedEntry(s); - - assertThat(lastCommittedEntry) - .describedAs("All entries should be committed") - .isEqualTo(lastUncommittedEntry); - }); + raftServers.forEach( + (memberId, raftServer) -> { + final var lastCommittedEntry = getLastCommittedEntry(raftServer); + final var lastUncommittedEntry = getLastUncommittedEntry(raftServer); + + assertThat(lastCommittedEntry) + .describedAs("All entries should be committed in %s", memberId.id()) + .isEqualTo(lastUncommittedEntry); + }); assertThat(hasReplicatedAllEntries()) .describedAs("All entries are replicated to all followers") From 14679bdc833e5ea0a8cdd429cc9f37ceb30ae37a Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Fri, 26 Aug 2022 16:15:26 +0200 Subject: [PATCH 06/12] fix(raft): reset log always on receiving a snapshot There were some cases where the log is not reset and leads to scenarios where a follower is not able to replicate new events. This case is explained in https://github.com/camunda/zeebe/pull/10183#issuecomment-1228527414 (cherry picked from commit 4c82bd96c4290a3c6ccba9067bf2ef1c0ce0dbc6) --- .../io/atomix/raft/roles/PassiveRole.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java b/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java index 839e3973d353..ced5821c5457 100644 --- a/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java +++ b/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java @@ -68,10 +68,7 @@ public PassiveRole(final RaftContext context) { public CompletableFuture start() { snapshotListener = createSnapshotListener(); - return super.start() - .thenRun(this::truncateUncommittedEntries) - .thenRun(this::addSnapshotListener) - .thenApply(v -> this); + return super.start().thenRun(this::truncateUncommittedEntries).thenApply(v -> this); } @Override @@ -100,14 +97,6 @@ private void truncateUncommittedEntries() { raft.getLog().flush(); raft.setLastWrittenIndex(raft.getCommitIndex()); } - - // to fix the edge case where we might have been stopped - // between persisting snapshot and truncating log we need to call on restart snapshot listener - // again, such that we truncate the log when necessary - final var latestSnapshot = raft.getCurrentSnapshot(); - if (latestSnapshot != null && snapshotListener != null) { - snapshotListener.onNewSnapshot(latestSnapshot); - } } /** @@ -268,11 +257,12 @@ public CompletableFuture onInstall(final InstallRequest request final long elapsed = System.currentTimeMillis() - pendingSnapshotStartTimestamp; log.debug("Committing snapshot {}", pendingSnapshot); try { + // Reset before committing to prevent the edge case where the system crashes after + // committing the snapshot, and restart with a snapshot and invalid log. + resetLogOnReceivingSnapshot(pendingSnapshot.index()); + final var snapshot = pendingSnapshot.persist().join(); log.info("Committed snapshot {}", snapshot); - // Must be executed immediately before any other operation on this threadcontext. Hence - // don't wait for the listener to be notified by the snapshot store. - snapshotListener.onNewSnapshot(snapshot); } catch (final Exception e) { log.error("Failed to commit pending snapshot {}, rolling back", pendingSnapshot, e); abortPendingSnapshots(); @@ -745,6 +735,17 @@ protected boolean completeAppend( return succeeded; } + private void resetLogOnReceivingSnapshot(final long snapshotIndex) { + final var raftLog = raft.getLog(); + + log.info( + "Delete existing log (lastIndex '{}') and replace with received snapshot (index '{}'). First entry in the log will be at index {}", + raftLog.getLastIndex(), + snapshotIndex, + snapshotIndex + 1); + raftLog.reset(snapshotIndex + 1); + } + private static final class ResetWriterSnapshotListener implements PersistedSnapshotListener { private final ThreadContext threadContext; From e02082680d5a3d54fc3d80a130f85d02abd2a252 Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Fri, 26 Aug 2022 16:24:12 +0200 Subject: [PATCH 07/12] refactor(raft): remove unused snapshot listener (cherry picked from commit 43c96cd240dcb7bbd46a93948d8ae3aabfa7d5b3) --- .../java/io/atomix/raft/roles/LeaderRole.java | 6 -- .../io/atomix/raft/roles/PassiveRole.java | 65 ------------------- 2 files changed, 71 deletions(-) diff --git a/atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java b/atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java index d669ad756fe1..664e44fc54db 100644 --- a/atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java +++ b/atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java @@ -51,7 +51,6 @@ import io.atomix.utils.concurrent.Futures; import io.atomix.utils.concurrent.Scheduled; import io.camunda.zeebe.journal.JournalException; -import io.camunda.zeebe.snapshots.PersistedSnapshotListener; import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; @@ -106,11 +105,6 @@ public synchronized CompletableFuture stop() { .thenRun(this::stepDown); } - @Override - protected PersistedSnapshotListener createSnapshotListener() { - return null; - } - @Override public RaftServer.Role role() { return RaftServer.Role.LEADER; diff --git a/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java b/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java index ced5821c5457..4e2f74fd89d6 100644 --- a/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java +++ b/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java @@ -34,19 +34,14 @@ import io.atomix.raft.snapshot.impl.SnapshotChunkImpl; import io.atomix.raft.storage.log.IndexedRaftLogEntry; import io.atomix.raft.storage.log.PersistedRaftRecord; -import io.atomix.raft.storage.log.RaftLog; import io.atomix.raft.storage.log.RaftLogReader; -import io.atomix.utils.concurrent.ThreadContext; import io.camunda.zeebe.journal.JournalException; import io.camunda.zeebe.journal.JournalException.InvalidChecksum; import io.camunda.zeebe.journal.JournalException.InvalidIndex; -import io.camunda.zeebe.snapshots.PersistedSnapshot; -import io.camunda.zeebe.snapshots.PersistedSnapshotListener; import io.camunda.zeebe.snapshots.ReceivedSnapshot; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import org.agrona.concurrent.UnsafeBuffer; -import org.slf4j.Logger; /** Passive state. */ public class PassiveRole extends InactiveRole { @@ -54,7 +49,6 @@ public class PassiveRole extends InactiveRole { private final SnapshotReplicationMetrics snapshotReplicationMetrics; private long pendingSnapshotStartTimestamp; private ReceivedSnapshot pendingSnapshot; - private PersistedSnapshotListener snapshotListener; private ByteBuffer nextPendingSnapshotChunkId; public PassiveRole(final RaftContext context) { @@ -66,7 +60,6 @@ public PassiveRole(final RaftContext context) { @Override public CompletableFuture start() { - snapshotListener = createSnapshotListener(); return super.start().thenRun(this::truncateUncommittedEntries).thenApply(v -> this); } @@ -74,9 +67,6 @@ public CompletableFuture start() { @Override public CompletableFuture stop() { abortPendingSnapshots(); - if (snapshotListener != null) { - raft.getPersistedSnapshotStore().removeSnapshotListener(snapshotListener); - } // as a safe guard, we clean up any orphaned pending snapshots try { @@ -99,23 +89,6 @@ private void truncateUncommittedEntries() { } } - /** - * Should be overwritten by sub classes to introduce different snapshot listener. - * - *

If null no snapshot listener will be installed - * - * @return the snapshot listener which will be installed - */ - protected PersistedSnapshotListener createSnapshotListener() { - return new ResetWriterSnapshotListener(log, raft.getThreadContext(), raft.getLog()); - } - - private void addSnapshotListener() { - if (snapshotListener != null) { - raft.getPersistedSnapshotStore().addSnapshotListener(snapshotListener); - } - } - @Override public RaftServer.Role role() { return RaftServer.Role.PASSIVE; @@ -745,42 +718,4 @@ private void resetLogOnReceivingSnapshot(final long snapshotIndex) { snapshotIndex + 1); raftLog.reset(snapshotIndex + 1); } - - private static final class ResetWriterSnapshotListener implements PersistedSnapshotListener { - - private final ThreadContext threadContext; - private final RaftLog raftLog; - private final Logger log; - - ResetWriterSnapshotListener( - final Logger log, final ThreadContext threadContext, final RaftLog raftLog) { - this.log = log; - this.threadContext = threadContext; - this.raftLog = raftLog; - } - - @Override - public void onNewSnapshot(final PersistedSnapshot persistedSnapshot) { - if (threadContext.isCurrentContext()) { - // this is called after the snapshot is commited - // on install requests and on Zeebe snapshot replication - - final var index = persistedSnapshot.getIndex(); - // It might happen that the last index is far behind our current snapshot index. - // E. g. on slower followers, we need to throw away the existing log, - // otherwise we might end with an inconsistent log (gaps between last index and - // snapshot index) - final var lastIndex = raftLog.getLastIndex(); - if (lastIndex < index) { - log.info( - "Delete existing log (lastIndex '{}') and replace with received snapshot (index '{}')", - lastIndex, - index); - raftLog.reset(index + 1); - } - } else { - threadContext.execute(() -> onNewSnapshot(persistedSnapshot)); - } - } - } } From e835194e2ce6efa3ba27182287bcca4df0eda75f Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Fri, 26 Aug 2022 17:00:17 +0200 Subject: [PATCH 08/12] test(raft): improve assertion failure message (cherry picked from commit d79b730a998605ed5d333086c19eb169ffa61178) --- .../java/io/atomix/raft/ControllableRaftContexts.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java index 9cf3b269eb88..7c9a8e1827ee 100644 --- a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java +++ b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java @@ -480,14 +480,18 @@ private void assertNoGapsInLog(final MemberId memberId) { long nextIndex = firstIndex; try (final var reader = s.getLog().openCommittedReader()) { while (reader.hasNext()) { - assertThat(reader.next().index()).isEqualTo(nextIndex); + assertThat(reader.next().index()) + .describedAs("There is no gap in the log %s", memberId.id()) + .isEqualTo(nextIndex); nextIndex++; } } if (firstIndex != 1) { final var currentSnapshotIndex = snapshotStores.get(memberId).getCurrentSnapshotIndex(); - assertThat(currentSnapshotIndex).isGreaterThanOrEqualTo(firstIndex - 1); + assertThat(currentSnapshotIndex) + .describedAs("The log is compacted in %s. Hence a snapshot must exist.") + .isGreaterThanOrEqualTo(firstIndex - 1); } } } From b61a62ec325d22a3f70370ea2b545cd4287ae83e Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Fri, 26 Aug 2022 17:09:42 +0200 Subject: [PATCH 09/12] test(raft): remove unnecessary seek (cherry picked from commit 1362765fd050e87a91f4913d0ad6c2b500dd79ad) --- .../src/test/java/io/atomix/raft/ControllableRaftContexts.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java index 7c9a8e1827ee..2d5b299997d0 100644 --- a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java +++ b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java @@ -360,8 +360,6 @@ public void assertAllLogsEqual() { .orElse(1L) - 1; - readers.values().forEach(r -> r.seek(-1)); // seek to first - final long commitIndexOnLeader = raftServers.values().stream() .map(RaftContext::getCommitIndex) From 5b221b33cc658127f4e1ade16f9ae72bfb61c08b Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Mon, 29 Aug 2022 08:48:31 +0200 Subject: [PATCH 10/12] refactor(raft): fix typo in error message (cherry picked from commit 455a09c3f849adde3fd5dad16fd382ff3812dae6) --- .../cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java b/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java index 4e2f74fd89d6..facff3435011 100644 --- a/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java +++ b/atomix/cluster/src/main/java/io/atomix/raft/roles/PassiveRole.java @@ -200,7 +200,7 @@ public CompletableFuture onInstall(final InstallRequest request .withStatus(RaftResponse.Status.ERROR) .withError( RaftError.Type.ILLEGAL_MEMBER_STATE, - "Request chunk is was received out of order") + "Snapshot chunk is received out of order") .build())); } } From 00fad5f92dfc6a334daa2efd14d123d8991ed1ae Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Mon, 29 Aug 2022 14:16:28 +0200 Subject: [PATCH 11/12] test(raft): allow snapshot with one chunk in randomized tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Ole Schönburg (cherry picked from commit 69df0fac65c909a3e97a0c39174cd857ad2d9680) --- .../src/test/java/io/atomix/raft/ControllableRaftContexts.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java index 2d5b299997d0..8e8d17ca4ad4 100644 --- a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java +++ b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java @@ -335,7 +335,7 @@ public void snapshotAndCompact(final MemberId memberId) { final long term = reader.next().term(); InMemorySnapshot.newPersistedSnapshot( - snapshotIndex, term, random.nextInt(2, 10), testSnapshotStore); + snapshotIndex, term, random.nextInt(1, 10), testSnapshotStore); LOG.info( "Snapshot taken at index {}. Current commit index is {}", From d8162c9a53ce446c98e501b2e425fef001c755ed Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Mon, 29 Aug 2022 16:24:59 +0200 Subject: [PATCH 12/12] test(raft): use java 11 compatible Random --- .../test/java/io/atomix/raft/ControllableRaftContexts.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java index 8e8d17ca4ad4..277ef837c43f 100644 --- a/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java +++ b/atomix/cluster/src/test/java/io/atomix/raft/ControllableRaftContexts.java @@ -329,13 +329,14 @@ public void snapshotAndCompact(final MemberId memberId) { // cannot take snapshot return; } - final long snapshotIndex = random.nextLong(startIndex, raftContext.getCommitIndex()); + final long snapshotIndex = + random.longs(1, startIndex, raftContext.getCommitIndex()).findFirst().orElseThrow(); try (final RaftLogReader reader = raftContext.getLog().openCommittedReader()) { reader.seek(snapshotIndex); final long term = reader.next().term(); InMemorySnapshot.newPersistedSnapshot( - snapshotIndex, term, random.nextInt(1, 10), testSnapshotStore); + snapshotIndex, term, random.nextInt(9) + 1, testSnapshotStore); LOG.info( "Snapshot taken at index {}. Current commit index is {}",