From 924a0cdf43557d67209805627bd6b1ec941776f0 Mon Sep 17 00:00:00 2001
From: William Song <48054931+SzyWilliam@users.noreply.github.com>
Date: Thu, 30 May 2024 15:44:21 +0800
Subject: [PATCH] RATIS-2084. Follower reply ALREADY_INSTALLED when install old
 snapshots from leader (#1091)

---
 .../ratis/server/impl/RaftServerImpl.java     |  1 +
 .../apache/ratis/server/impl/ServerState.java |  4 ++
 .../impl/SnapshotInstallationHandler.java     |  8 ++-
 .../ratis/InstallSnapshotFromLeaderTests.java | 70 +++++++++++++++++++
 .../java/org/apache/ratis/RaftTestUtil.java   | 14 ++++
 .../server/impl/LeaderElectionTests.java      | 38 ++++------
 .../ratis/grpc/TestLeaderInstallSnapshot.java |  6 ++
 7 files changed, 112 insertions(+), 29 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index cba3bba1c6..7ec94076fb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1875,6 +1875,7 @@ CompletableFuture applyLogToStateMachine(ReferenceCountedObject
+        s.truncate(logEntry.getIndex()));
     if (logEntry.hasStateMachineLogEntry()) {
       getTransactionManager().remove(TermIndex.valueOf(logEntry));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 0f46c6b523..c49e9554f0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -387,6 +387,10 @@ void setRaftConf(RaftConfiguration conf) {
     LOG.trace("{}: {}", getMemberId(), configurationManager);
   }
 
+  void truncate(long logIndex) {
+    configurationManager.removeConfigurations(logIndex);
+  }
+
   void updateConfiguration(List<LogEntryProto> entries) {
     if (entries != null && !entries.isEmpty()) {
       configurationManager.removeConfigurations(entries.get(0).getIndex());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index f03e2d883d..4a63e64ee0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -133,6 +133,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
     if (request.hasLastRaftConfigurationLogEntryProto()) {
       // Set the configuration included in the snapshot
       final LogEntryProto proto = request.getLastRaftConfigurationLogEntryProto();
+      state.truncate(proto.getIndex());
       if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
         LOG.info("{}: set new configuration {} from snapshot", getMemberId(), proto);
         state.setRaftConf(proto);
@@ -175,9 +176,10 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest
     // Check and append the snapshot chunk. We simply put this in lock
     // considering a follower peer requiring a snapshot installation does not
     // have a lot of requests
-    Preconditions.assertTrue(state.getLog().getLastCommittedIndex() < lastIncludedIndex,
-        "%s log's commit index is %s, last included index in snapshot is %s",
-        getMemberId(), state.getLog().getLastCommittedIndex(), lastIncludedIndex);
+    if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
+      return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+          currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED);
+    }
 
     //TODO: We should only update State with installed snapshot once the request is done.
     state.installSnapshot(request);
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index 46cfebbd17..b83a7dfdd3 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -21,7 +21,10 @@
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -85,6 +88,12 @@ public void testSeparateSnapshotInstallPath() throws Exception {
     runWithNewCluster(1, this::testMultiFileInstallSnapshot);
   }
 
+  public void testInstallSnapshotLeaderSwitch() throws Exception {
+    getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        StateMachineWithSeparatedSnapshotPath.class, StateMachine.class);
+    runWithNewCluster(3, this::testInstallSnapshotDuringLeaderSwitch);
+  }
+
   private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
     try {
       int i = 0;
@@ -127,6 +136,67 @@ private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
     }
   }
 
+  private void testInstallSnapshotDuringLeaderSwitch(CLUSTER cluster) throws Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      // perform operations and force all peers to take snapshot
+      try (final RaftClient client = cluster.createClient(leaderId)) {
+        for (int i = 0; i < SNAPSHOT_TRIGGER_THRESHOLD * 2; i++) {
+          final RaftClientReply
+              reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assertions.assertTrue(reply.isSuccess());
+        }
+
+        for (final RaftPeer peer: cluster.getPeers()) {
+          final RaftClientReply snapshotReply = client.getSnapshotManagementApi(leaderId).create(3000);
+          Assertions.assertTrue(snapshotReply.isSuccess());
+        }
+      }
+      final SnapshotInfo snapshot = cluster.getLeader().getStateMachine().getLatestSnapshot();
+      Assertions.assertNotNull(snapshot);
+
+      // isolate two followers (majority) in old configuration
+      final List<RaftServer.Division> oldFollowers = cluster.getFollowers();
+      for (RaftServer.Division f: oldFollowers) {
+        RaftTestUtil.isolate(cluster, f.getId());
+      }
+
+      // add two more peers and install snapshot from leaders
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+          true);
+      try (final RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
+        Assertions.assertThrows(RaftRetryFailureException.class,
+            () -> client.admin().setConfiguration(change.allPeersInNewConf));
+      }
+
+      final SnapshotInfo snapshotInfo = cluster.getDivision(change.newPeers[0].getId())
+          .getStateMachine().getLatestSnapshot();
+      Assertions.assertNotNull(snapshotInfo);
+
+      // recover the old followers and isolate the leader to force leader switch
+      RaftTestUtil.isolate(cluster, leaderId);
+      for (RaftServer.Division f: oldFollowers) {
+        RaftTestUtil.deIsolate(cluster, f.getId());
+      }
+      RaftTestUtil.waitForLeader(cluster);
+
+      try (final RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
+        // successfully setConfiguration during leader switch
+        final RaftClientReply setConf = client.admin().setConfiguration(change.allPeersInNewConf);
+        Assertions.assertTrue(setConf.isSuccess());
+
+        RaftTestUtil.deIsolate(cluster, leaderId);
+        final RaftClientReply
+            reply = client.io().send(new RaftTestUtil.SimpleMessage("final"));
+        Assertions.assertTrue(reply.isSuccess());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   private static class StateMachineWithMultiNestedSnapshotFile
       extends SimpleStateMachine4Testing {
     File snapshotRoot;
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index f7a3f9a526..be8739ad8e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -514,6 +514,20 @@ static void blockQueueAndSetDelay(Iterable servers,
     Thread.sleep(3 * maxTimeout.toLong(TimeUnit.MILLISECONDS));
   }
 
+  static void isolate(MiniRaftCluster cluster, RaftPeerId id) {
+    try {
+      BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
+      cluster.setBlockRequestsFrom(id.toString(), true);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  static void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
+    BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
+    cluster.setBlockRequestsFrom(id.toString(), false);
+  }
+
   static Thread sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) {
     Thread t = new Thread(() -> {
       try (final RaftClient client = cluster.createClient(leaderId)) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 69791896ab..ecb4a3dc0e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -122,12 +122,12 @@ void runTestLostMajorityHeartbeats(CLUSTER cluster) throws Exception {
     final TimeDuration maxTimeout = RaftServerConfigKeys.Rpc.timeoutMax(getProperties());
     final RaftServer.Division leader = waitForLeader(cluster);
     try {
-      isolate(cluster, leader.getId());
+      RaftTestUtil.isolate(cluster, leader.getId());
       maxTimeout.sleep();
       maxTimeout.sleep();
       RaftServerTestUtil.assertLostMajorityHeartbeatsRecently(leader);
     } finally {
-      deIsolate(cluster, leader.getId());
+      RaftTestUtil.deIsolate(cluster, leader.getId());
     }
   }
 
@@ -164,12 +164,12 @@ void runTestListenerNotStartLeaderElection(CLUSTER cluster) throws Exception {
     final RaftServer.Division listener = cluster.getListeners().get(0);
     final RaftPeerId listenerId = listener.getId();
     try {
-      isolate(cluster, listenerId);
+      RaftTestUtil.isolate(cluster, listenerId);
       maxTimeout.sleep();
       maxTimeout.sleep();
       Assertions.assertEquals(RaftProtos.RaftPeerRole.LISTENER, listener.getInfo().getCurrentRole());
     } finally {
-      deIsolate(cluster, listener.getId());
+      RaftTestUtil.deIsolate(cluster, listener.getId());
     }
   }
 
@@ -247,7 +247,7 @@ public void testTransferLeaderTimeout() throws Exception {
       RaftServer.Division newLeader = followers.get(0);
 
       // isolate new leader, so that transfer leadership will timeout
-      isolate(cluster, newLeader.getId());
+      RaftTestUtil.isolate(cluster, newLeader.getId());
 
       List<RaftPeer> peers = cluster.getPeers();
 
@@ -287,7 +287,7 @@ public void testTransferLeaderTimeout() throws Exception {
       Assertions.assertEquals(leader.getId().toString(), reply.getReplierId());
       Assertions.assertTrue(reply.isSuccess());
 
-      deIsolate(cluster, newLeader.getId());
+      RaftTestUtil.deIsolate(cluster, newLeader.getId());
     }
 
     cluster.shutdown();
@@ -364,32 +364,18 @@ protected void testDisconnectLeader() throws Exception {
       try (RaftClient client = cluster.createClient(leader.getId())) {
         client.io().send(new RaftTestUtil.SimpleMessage("message"));
         Thread.sleep(1000);
-        isolate(cluster, leader.getId());
+        RaftTestUtil.isolate(cluster, leader.getId());
         RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
         Assertions.assertNotEquals(reply.getReplierId(), leader.getId().toString());
         Assertions.assertTrue(reply.isSuccess());
       } finally {
-        deIsolate(cluster, leader.getId());
+        RaftTestUtil.deIsolate(cluster, leader.getId());
       }
 
       cluster.shutdown();
     }
   }
 
-  private void isolate(MiniRaftCluster cluster, RaftPeerId id) {
-    try {
-      BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
-      cluster.setBlockRequestsFrom(id.toString(), true);
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-  private void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
-    BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
-    cluster.setBlockRequestsFrom(id.toString(), false);
-  }
-
   @Test
   public void testAddListener() throws Exception {
     try (final MiniRaftCluster cluster = newCluster(3)) {
@@ -571,7 +557,7 @@ public void testPreVote() {
       assertEquals(followers.size(), 2);
       RaftServer.Division follower = followers.get(0);
-      isolate(cluster, follower.getId());
+      RaftTestUtil.isolate(cluster, follower.getId());
 
       // send message so that the isolated follower's log lag the others
       RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
       Assertions.assertTrue(reply.isSuccess());
@@ -579,7 +565,7 @@ public void testPreVote() {
       final long savedTerm = leader.getInfo().getCurrentTerm();
       LOG.info("Wait follower {} timeout and trigger pre-vote", follower.getId());
       Thread.sleep(2000);
-      deIsolate(cluster, follower.getId());
+      RaftTestUtil.deIsolate(cluster, follower.getId());
       Thread.sleep(2000);
       // with pre-vote leader will not step down
       RaftServer.Division newleader = waitForLeader(cluster);
@@ -670,14 +656,14 @@ void runTestLeaderLease(CLUSTER cluster, long leaseTimeoutMs) throws Exception {
       Assertions.assertTrue(leader.getInfo().isLeaderReady());
       RaftServerTestUtil.assertLeaderLease(leader, true);
-      isolate(cluster, leader.getId());
+      RaftTestUtil.isolate(cluster, leader.getId());
       Thread.sleep(leaseTimeoutMs);
 
       Assertions.assertTrue(leader.getInfo().isLeader());
       Assertions.assertTrue(leader.getInfo().isLeaderReady());
       RaftServerTestUtil.assertLeaderLease(leader, false);
     } finally {
-      deIsolate(cluster, leader.getId());
+      RaftTestUtil.deIsolate(cluster, leader.getId());
     }
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
index 22c590c9dd..b85cd13535 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
@@ -46,4 +46,10 @@ public void testSeparateSnapshotInstallPath(Boolean separateHeartbeat) throws Ex
     super.testSeparateSnapshotInstallPath();
   }
 
+  @ParameterizedTest
+  @MethodSource("data")
+  public void testInstallSnapshotLeaderSwitch(Boolean separateHeartbeat) throws Exception {
+    GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat);
+    super.testInstallSnapshotLeaderSwitch();
+  }
 }