
RATIS-2084. Follower reply ALREADY_INSTALLED when install old snapshots from leader (#1091)
SzyWilliam committed May 30, 2024
1 parent bd4ab14 commit 924a0cd
Showing 7 changed files with 112 additions and 29 deletions.
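
In short: when a leader (typically a stale leader after a leader switch) tries to install a snapshot whose last included index is not newer than the follower's commit index, the follower now replies ALREADY_INSTALLED instead of failing a Preconditions assertion. The essence of the change, excerpted from the snapshot-installation hunk below:

    // before: installing an old snapshot tripped an assertion on the follower
    Preconditions.assertTrue(state.getLog().getLastCommittedIndex() < lastIncludedIndex, ...);

    // after: an old snapshot is acknowledged as already installed
    if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
      return toInstallSnapshotReplyProto(leaderId, getMemberId(),
          currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED);
    }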
@@ -1875,6 +1875,7 @@ CompletableFuture<Message> applyLogToStateMachine(ReferenceCountedObject<LogEntr
* @param logEntry the log entry being truncated
*/
void notifyTruncatedLogEntry(LogEntryProto logEntry) {
Optional.ofNullable(getState()).ifPresent(s -> s.truncate(logEntry.getIndex()));
if (logEntry.hasStateMachineLogEntry()) {
getTransactionManager().remove(TermIndex.valueOf(logEntry));

@@ -387,6 +387,10 @@ void setRaftConf(RaftConfiguration conf) {
LOG.trace("{}: {}", getMemberId(), configurationManager);
}

void truncate(long logIndex) {
configurationManager.removeConfigurations(logIndex);
}

void updateConfiguration(List<LogEntryProto> entries) {
if (entries != null && !entries.isEmpty()) {
configurationManager.removeConfigurations(entries.get(0).getIndex());
@@ -133,6 +133,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
if (request.hasLastRaftConfigurationLogEntryProto()) {
// Set the configuration included in the snapshot
final LogEntryProto proto = request.getLastRaftConfigurationLogEntryProto();
state.truncate(proto.getIndex());
if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
LOG.info("{}: set new configuration {} from snapshot", getMemberId(), proto);
state.setRaftConf(proto);
@@ -175,9 +176,10 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest
// Check and append the snapshot chunk. We simply put this in lock
// considering a follower peer requiring a snapshot installation does not
// have a lot of requests
Preconditions.assertTrue(state.getLog().getLastCommittedIndex() < lastIncludedIndex,
"%s log's commit index is %s, last included index in snapshot is %s",
getMemberId(), state.getLog().getLastCommittedIndex(), lastIncludedIndex);
if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED);
}

//TODO: We should only update State with installed snapshot once the request is done.
state.installSnapshot(request);
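
For context, ALREADY_INSTALLED tells the leader that the follower already holds at least the state covered by the snapshot, so the leader can stop re-sending it and resume ordinary log replication. A minimal hypothetical sketch of such leader-side handling (illustrative only, not the actual Ratis leader code; the type and method names below are simplified assumptions):

    // Hypothetical sketch: both SUCCESS and ALREADY_INSTALLED mean the follower
    // holds the state up to lastIncludedIndex, so replication can resume after it.
    enum Result { SUCCESS, ALREADY_INSTALLED, OTHER }

    static long nextIndexAfterInstallReply(Result result, long lastIncludedIndex, long currentNextIndex) {
      if (result == Result.SUCCESS || result == Result.ALREADY_INSTALLED) {
        return lastIncludedIndex + 1; // follower is caught up to the snapshot
      }
      return currentNextIndex; // other results are handled by separate error paths
    }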
@@ -21,7 +21,10 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -85,6 +88,12 @@ public void testSeparateSnapshotInstallPath() throws Exception {
runWithNewCluster(1, this::testMultiFileInstallSnapshot);
}

public void testInstallSnapshotLeaderSwitch() throws Exception {
getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
StateMachineWithSeparatedSnapshotPath.class, StateMachine.class);
runWithNewCluster(3, this::testInstallSnapshotDuringLeaderSwitch);
}

private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
try {
int i = 0;
@@ -127,6 +136,67 @@ private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
}
}

private void testInstallSnapshotDuringLeaderSwitch(CLUSTER cluster) throws Exception {
try {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();

// perform operations and force all peers to take snapshot
try (final RaftClient client = cluster.createClient(leaderId)) {
for (int i = 0; i < SNAPSHOT_TRIGGER_THRESHOLD * 2; i++) {
final RaftClientReply
reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
Assertions.assertTrue(reply.isSuccess());
}

for (final RaftPeer peer: cluster.getPeers()) {
final RaftClientReply snapshotReply = client.getSnapshotManagementApi(leaderId).create(3000);
Assertions.assertTrue(snapshotReply.isSuccess());
}
}
final SnapshotInfo snapshot = cluster.getLeader().getStateMachine().getLatestSnapshot();
Assertions.assertNotNull(snapshot);

// isolate two followers (majority) in old configuration
final List<RaftServer.Division> oldFollowers = cluster.getFollowers();
for (RaftServer.Division f: oldFollowers) {
RaftTestUtil.isolate(cluster, f.getId());
}

// add two more peers and install snapshot from leaders
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
true);
try (final RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
Assertions.assertThrows(RaftRetryFailureException.class,
() -> client.admin().setConfiguration(change.allPeersInNewConf));
}

final SnapshotInfo snapshotInfo = cluster.getDivision(change.newPeers[0].getId())
.getStateMachine().getLatestSnapshot();
Assertions.assertNotNull(snapshotInfo);

// recover the old followers and isolate the leader to force leader switch
RaftTestUtil.isolate(cluster, leaderId);
for (RaftServer.Division f: oldFollowers) {
RaftTestUtil.deIsolate(cluster, f.getId());
}
RaftTestUtil.waitForLeader(cluster);

try (final RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
// successfully setConfiguration during leader switch
final RaftClientReply setConf = client.admin().setConfiguration(change.allPeersInNewConf);
Assertions.assertTrue(setConf.isSuccess());

RaftTestUtil.deIsolate(cluster, leaderId);
final RaftClientReply
reply = client.io().send(new RaftTestUtil.SimpleMessage("final"));
Assertions.assertTrue(reply.isSuccess());
}
} finally {
cluster.shutdown();
}
}

private static class StateMachineWithMultiNestedSnapshotFile extends SimpleStateMachine4Testing {

File snapshotRoot;
14 changes: 14 additions & 0 deletions ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -514,6 +514,20 @@ static void blockQueueAndSetDelay(Iterable<RaftServer> servers,
Thread.sleep(3 * maxTimeout.toLong(TimeUnit.MILLISECONDS));
}

static void isolate(MiniRaftCluster cluster, RaftPeerId id) {
try {
BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
cluster.setBlockRequestsFrom(id.toString(), true);
} catch (Exception e) {
e.printStackTrace();
}
}

static void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
cluster.setBlockRequestsFrom(id.toString(), false);
}

static Thread sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) {
Thread t = new Thread(() -> {
try (final RaftClient client = cluster.createClient(leaderId)) {
@@ -122,12 +122,12 @@ void runTestLostMajorityHeartbeats(CLUSTER cluster) throws Exception {
final TimeDuration maxTimeout = RaftServerConfigKeys.Rpc.timeoutMax(getProperties());
final RaftServer.Division leader = waitForLeader(cluster);
try {
isolate(cluster, leader.getId());
RaftTestUtil.isolate(cluster, leader.getId());
maxTimeout.sleep();
maxTimeout.sleep();
RaftServerTestUtil.assertLostMajorityHeartbeatsRecently(leader);
} finally {
deIsolate(cluster, leader.getId());
RaftTestUtil.deIsolate(cluster, leader.getId());
}
}

@@ -164,12 +164,12 @@ void runTestListenerNotStartLeaderElection(CLUSTER cluster) throws Exception {
final RaftServer.Division listener = cluster.getListeners().get(0);
final RaftPeerId listenerId = listener.getId();
try {
isolate(cluster, listenerId);
RaftTestUtil.isolate(cluster, listenerId);
maxTimeout.sleep();
maxTimeout.sleep();
Assertions.assertEquals(RaftProtos.RaftPeerRole.LISTENER, listener.getInfo().getCurrentRole());
} finally {
deIsolate(cluster, listener.getId());
RaftTestUtil.deIsolate(cluster, listener.getId());
}
}

@@ -247,7 +247,7 @@ public void testTransferLeaderTimeout() throws Exception {
RaftServer.Division newLeader = followers.get(0);

// isolate new leader, so that transfer leadership will timeout
isolate(cluster, newLeader.getId());
RaftTestUtil.isolate(cluster, newLeader.getId());

List<RaftPeer> peers = cluster.getPeers();

@@ -287,7 +287,7 @@ public void testTransferLeaderTimeout() throws Exception {
Assertions.assertEquals(leader.getId().toString(), reply.getReplierId());
Assertions.assertTrue(reply.isSuccess());

deIsolate(cluster, newLeader.getId());
RaftTestUtil.deIsolate(cluster, newLeader.getId());
}

cluster.shutdown();
@@ -364,32 +364,18 @@ protected void testDisconnectLeader() throws Exception {
try (RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("message"));
Thread.sleep(1000);
isolate(cluster, leader.getId());
RaftTestUtil.isolate(cluster, leader.getId());
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assertions.assertNotEquals(reply.getReplierId(), leader.getId().toString());
Assertions.assertTrue(reply.isSuccess());
} finally {
deIsolate(cluster, leader.getId());
RaftTestUtil.deIsolate(cluster, leader.getId());
}

cluster.shutdown();
}
}

private void isolate(MiniRaftCluster cluster, RaftPeerId id) {
try {
BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
cluster.setBlockRequestsFrom(id.toString(), true);
} catch (Exception e) {
e.printStackTrace();
}
}

private void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
cluster.setBlockRequestsFrom(id.toString(), false);
}

@Test
public void testAddListener() throws Exception {
try (final MiniRaftCluster cluster = newCluster(3)) {
@@ -571,15 +557,15 @@ public void testPreVote() {
assertEquals(followers.size(), 2);

RaftServer.Division follower = followers.get(0);
isolate(cluster, follower.getId());
RaftTestUtil.isolate(cluster, follower.getId());
// send message so that the isolated follower's log lag the others
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assertions.assertTrue(reply.isSuccess());

final long savedTerm = leader.getInfo().getCurrentTerm();
LOG.info("Wait follower {} timeout and trigger pre-vote", follower.getId());
Thread.sleep(2000);
deIsolate(cluster, follower.getId());
RaftTestUtil.deIsolate(cluster, follower.getId());
Thread.sleep(2000);
// with pre-vote leader will not step down
RaftServer.Division newleader = waitForLeader(cluster);
@@ -670,14 +656,14 @@ void runTestLeaderLease(CLUSTER cluster, long leaseTimeoutMs) throws Exception {
Assertions.assertTrue(leader.getInfo().isLeaderReady());
RaftServerTestUtil.assertLeaderLease(leader, true);

isolate(cluster, leader.getId());
RaftTestUtil.isolate(cluster, leader.getId());
Thread.sleep(leaseTimeoutMs);

Assertions.assertTrue(leader.getInfo().isLeader());
Assertions.assertTrue(leader.getInfo().isLeaderReady());
RaftServerTestUtil.assertLeaderLease(leader, false);
} finally {
deIsolate(cluster, leader.getId());
RaftTestUtil.deIsolate(cluster, leader.getId());
}
}

@@ -46,4 +46,10 @@ public void testSeparateSnapshotInstallPath(Boolean separateHeartbeat) throws Ex
super.testSeparateSnapshotInstallPath();
}

@ParameterizedTest
@MethodSource("data")
public void testInstallSnapshotLeaderSwitch(Boolean separateHeartbeat) throws Exception {
GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat);
super.testInstallSnapshotLeaderSwitch();
}
}
