IGNITE-15705 Implemented election timeout auto-adjusting mechanism. Fixes #481

Signed-off-by: Slava Koptilin <slava.koptilin@gmail.com>
alievmirza authored and sk0x50 committed Jan 26, 2022
1 parent 1eff57b commit f51281b2d5b697be98b03de8582839529fd7e192
Showing 11 changed files with 404 additions and 60 deletions.
@@ -18,6 +18,7 @@

import static java.util.stream.Collectors.toList;
import static org.apache.ignite.raft.jraft.core.TestCluster.ELECTION_TIMEOUT_MILLIS;
import static org.apache.ignite.raft.jraft.test.TestUtils.sender;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -51,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -99,6 +101,7 @@
import org.apache.ignite.raft.jraft.util.Bits;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
import org.apache.ignite.utils.ClusterServiceTestUtils;
@@ -1488,20 +1491,15 @@ public void testLeaderFail() throws Exception {

List<Node> followers = cluster.getFollowers();

- for (Node follower : followers) {
-     NodeImpl follower0 = (NodeImpl) follower;
-     DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
-     RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
-     rpcClientEx.blockMessages((msg, nodeId) -> {
-         if (msg instanceof RpcRequests.RequestVoteRequest) {
-             RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
-
-             return !msg0.preVote();
-         }
-
-         return false;
-     });
- }
+ blockMessagesOnFollowers(followers, (msg, nodeId) -> {
+     if (msg instanceof RpcRequests.RequestVoteRequest) {
+         RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest) msg;
+
+         return !msg0.preVote();
+     }
+
+     return false;
+ });

// stop leader
LOG.warn("Stop leader {}", leader.getNodeId().getPeerId());
@@ -1511,12 +1509,7 @@ public void testLeaderFail() throws Exception {
assertFalse(followers.isEmpty());
sendTestTaskAndWait("follower apply ", followers.get(0), -1); // Should fail, because no leader.

- for (Node follower : followers) {
-     NodeImpl follower0 = (NodeImpl) follower;
-     DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
-     RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
-     rpcClientEx.stopBlock();
- }
+ stopBlockingMessagesOnFollowers(followers);

// elect new leader
cluster.waitLeader();
@@ -2747,7 +2740,7 @@ public void testLeaderPropagatedBeforeVote() throws Exception {

// Block only one vote message.
for (NodeImpl node : nodes) {
- RpcClientEx rpcClientEx = TestUtils.sender(node);
+ RpcClientEx rpcClientEx = sender(node);
rpcClientEx.recordMessages((msg, nodeId) -> true);
rpcClientEx.blockMessages((msg, nodeId) -> {
if (msg instanceof RpcRequests.RequestVoteRequest) {
@@ -2776,7 +2769,7 @@ public void testLeaderPropagatedBeforeVote() throws Exception {
Node leader = cluster.getLeader();
cluster.ensureLeader(leader);

- RpcClientEx client = TestUtils.sender(leader);
+ RpcClientEx client = sender(leader);

client.stopBlock(1); // Unblock vote message.

@@ -3367,20 +3360,15 @@ public void testBlockedElection() throws Exception {

List<Node> followers = cluster.getFollowers();

- for (Node follower : followers) {
-     NodeImpl follower0 = (NodeImpl) follower;
-     DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
-     RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
-     rpcClientEx.blockMessages((msg, nodeId) -> {
-         if (msg instanceof RpcRequests.RequestVoteRequest) {
-             RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
-
-             return !msg0.preVote();
-         }
-
-         return false;
-     });
- }
+ blockMessagesOnFollowers(followers, (msg, nodeId) -> {
+     if (msg instanceof RpcRequests.RequestVoteRequest) {
+         RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest) msg;
+
+         return !msg0.preVote();
+     }
+
+     return false;
+ });

LOG.warn("Stop leader {}, curTerm={}", leader.getNodeId().getPeerId(), ((NodeImpl) leader).getCurrentTerm());

@@ -3392,17 +3380,84 @@ public void testBlockedElection() throws Exception {

assertNull(cluster.getLeader());

stopBlockingMessagesOnFollowers(followers);

// elect new leader
cluster.waitLeader();
leader = cluster.getLeader();
LOG.info("Elect new leader is {}, curTerm={}", leader.getLeaderId(), ((NodeImpl) leader).getCurrentTerm());
}

@Test
public void testElectionTimeoutAutoAdjustWhenBlockedAllMessages() throws Exception {
testElectionTimeoutAutoAdjustWhenBlockedMessages((msg, nodeId) -> true);
}

@Test
public void testElectionTimeoutAutoAdjustWhenBlockedRequestVoteMessages() throws Exception {
testElectionTimeoutAutoAdjustWhenBlockedMessages((msg, nodeId) -> {
if (msg instanceof RpcRequests.RequestVoteRequest) {
RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest) msg;

return !msg0.preVote();
}

return false;
});
}

private void testElectionTimeoutAutoAdjustWhenBlockedMessages(BiPredicate<Object, String> blockingPredicate) throws Exception {
List<PeerId> peers = TestUtils.generatePeers(4);
int maxElectionRoundsWithoutAdjusting = 3;

cluster = new TestCluster("unittest", dataPath, peers, new LinkedHashSet<>(), ELECTION_TIMEOUT_MILLIS,
opts -> opts.setElectionTimeoutStrategy(new ExponentialBackoffTimeoutStrategy(11_000, maxElectionRoundsWithoutAdjusting)),
testInfo);

for (PeerId peer : peers) {
assertTrue(cluster.start(peer.getEndpoint()));
}

cluster.waitLeader();

Node leader = cluster.getLeader();

int initElectionTimeout = leader.getOptions().getElectionTimeoutMs();

LOG.warn("Current leader {}, electTimeout={}", leader.getNodeId().getPeerId(), leader.getOptions().getElectionTimeoutMs());

List<Node> followers = cluster.getFollowers();

for (Node follower : followers) {
NodeImpl follower0 = (NodeImpl) follower;
DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
rpcClientEx.stopBlock();

assertEquals(initElectionTimeout, follower0.getOptions().getElectionTimeoutMs());
}

blockMessagesOnFollowers(followers, blockingPredicate);

LOG.warn("Stop leader {}, curTerm={}", leader.getNodeId().getPeerId(), ((NodeImpl) leader).getCurrentTerm());

assertTrue(cluster.stop(leader.getNodeId().getPeerId().getEndpoint()));

assertNull(cluster.getLeader());

assertTrue(waitForCondition(() -> followers.stream().allMatch(f -> f.getOptions().getElectionTimeoutMs() > initElectionTimeout),
(long) maxElectionRoundsWithoutAdjusting
// need to multiply by 2 because stepDown happens after the voteTimer timeout
* (initElectionTimeout + followers.get(0).getOptions().getRaftOptions().getMaxElectionDelayMs()) * 2));

stopBlockingMessagesOnFollowers(followers);

// elect new leader
cluster.waitLeader();
leader = cluster.getLeader();
LOG.info("Elect new leader is {}, curTerm={}", leader.getLeaderId(), ((NodeImpl) leader).getCurrentTerm());

LOG.info("Elected new leader is {}, curTerm={}", leader.getLeaderId(), ((NodeImpl) leader).getCurrentTerm());

assertTrue(
waitForCondition(() -> followers.stream().allMatch(f -> f.getOptions().getElectionTimeoutMs() == initElectionTimeout),
3_000));
}

/**
@@ -3686,4 +3741,18 @@ public void run(Status status, long theIndex, byte[] reqCtx) {
latch.await();
return success.get();
}

private void blockMessagesOnFollowers(List<Node> followers, BiPredicate<Object, String> blockingPredicate) {
for (Node follower : followers) {
RpcClientEx rpcClientEx = sender(follower);
rpcClientEx.blockMessages(blockingPredicate);
}
}

private void stopBlockingMessagesOnFollowers(List<Node> followers) {
for (Node follower : followers) {
RpcClientEx rpcClientEx = sender(follower);
rpcClientEx.stopBlock();
}
}
}
@@ -62,12 +62,10 @@ public class Loza implements IgniteComponent {
private static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);

/** Timeout. */
- // TODO: IGNITE-15705 Correct value should be investigated
- private static final int TIMEOUT = 10000;
+ private static final int RETRY_TIMEOUT = 10000;

/** Network timeout. */
- // TODO: IGNITE-15705 Correct value should be investigated
- private static final int NETWORK_TIMEOUT = 3000;
+ private static final int RPC_TIMEOUT = 3000;

/** Retry delay. */
private static final int DELAY = 200;
@@ -205,8 +203,8 @@ private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(String grou
groupId,
clusterNetSvc,
FACTORY,
- TIMEOUT,
- NETWORK_TIMEOUT,
+ RETRY_TIMEOUT,
+ RPC_TIMEOUT,
peers,
true,
DELAY,
@@ -274,7 +272,7 @@ private CompletableFuture<RaftGroupService> updateRaftGroupInternal(String group
groupId,
clusterNetSvc,
FACTORY,
- TIMEOUT,
+ RETRY_TIMEOUT,
peers,
true,
DELAY,
@@ -323,8 +321,8 @@ private CompletableFuture<Void> changePeersInternal(String groupId, List<Cluster
groupId,
clusterNetSvc,
FACTORY,
- 10 * TIMEOUT,
- 10 * NETWORK_TIMEOUT,
+ 10 * RETRY_TIMEOUT,
+ 10 * RPC_TIMEOUT,
expectedPeers,
true,
DELAY,
@@ -64,6 +64,7 @@
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.raft.jraft.util.JDKMarshaller;
import org.jetbrains.annotations.Nullable;

@@ -121,8 +122,20 @@ public JraftServerImpl(ClusterService service, Path dataPath, NodeOptions opts)
this.opts.setSharedPools(true);

if (opts.getServerName() == null) {
-     opts.setServerName(service.localConfiguration().getName());
- }
+     this.opts.setServerName(service.localConfiguration().getName());
+ }

/*
Timeout increasing strategy for the election timeout. Adjusting happens according to
{@link org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy} when a leader is not elected after several
consecutive unsuccessful election rounds, which can be controlled through the {@code roundsWithoutAdjusting} parameter of
{@link org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy}.
The maximum timeout value that {@link org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy} can produce
must be greater than the membership protocol's timeout for removing a failed node from the cluster.
In our case, 11s should be enough, since it is greater than the suspicion timeout
for a 1000-node cluster with a ping interval of 500 ms.
*/
this.opts.setElectionTimeoutStrategy(new ExponentialBackoffTimeoutStrategy(11_000, 3));
}

/** {@inheritDoc} */
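
For orientation, below is a minimal sketch of the kind of backoff such a strategy applies. It is an illustration only, not the source of ExponentialBackoffTimeoutStrategy; the class name ElectionBackoffSketch and the nextTimeout(currentTimeoutMs, round) signature are assumptions made for the example, and only the constructor arguments (a maximum timeout of 11_000 ms and 3 rounds without adjusting) come from the diff above.

// Illustration only: a minimal sketch of an exponential-backoff election timeout,
// NOT the actual org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy source.
class ElectionBackoffSketch {
    private final int maxTimeoutMs;           // hard cap, 11_000 ms in the diff above
    private final int roundsWithoutAdjusting; // failed rounds tolerated before growing, 3 in the diff above

    ElectionBackoffSketch(int maxTimeoutMs, int roundsWithoutAdjusting) {
        this.maxTimeoutMs = maxTimeoutMs;
        this.roundsWithoutAdjusting = roundsWithoutAdjusting;
    }

    /** Hypothetical hook: returns the timeout to use for the given failed-election round. */
    int nextTimeout(int currentTimeoutMs, int round) {
        // Within the tolerated number of rounds, keep the configured election timeout.
        if (round <= roundsWithoutAdjusting) {
            return currentTimeoutMs;
        }
        // Afterwards double the timeout, never exceeding the configured maximum.
        return Math.min(currentTimeoutMs * 2, maxTimeoutMs);
    }
}

Under these assumptions, the election timeout stays at its configured value for the first few failed rounds and then grows toward the 11 s cap, which matches what testElectionTimeoutAutoAdjustWhenBlockedMessages waits for before checking that the timeout returns to its initial value once a new leader is elected.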
