diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java index e45f310bca14..6c1fe0da0da3 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java @@ -369,26 +369,6 @@ public void shutdown() { nodesToStop.forEach(node -> IgnitionManager.stop(node.name())); } - /** - * Knocks out a node so that it stops receiving messages from other nodes of the cluster. To bring a node back, - * {@link #reanimateNode(int, NodeKnockout)} should be used. - */ - public void knockOutNode(int nodeIndex, NodeKnockout knockout) { - knockout.knockOutNode(nodeIndex, this); - - knockedOutNodesIndices.add(nodeIndex); - } - - /** - * Reanimates a knocked-out node so that it starts receiving messages from other nodes of the cluster again. This nullifies the - * effect of {@link #knockOutNode(int, NodeKnockout)}. - */ - public void reanimateNode(int nodeIndex, NodeKnockout knockout) { - knockout.reanimateNode(nodeIndex, this); - - knockedOutNodesIndices.remove(nodeIndex); - } - /** * Executes an action with a {@link Session} opened via a node with the given index. * @@ -431,71 +411,52 @@ public T query(int nodeIndex, String sql, Function, T> ext } /** - * A way to make a node be separated from a cluster and stop receiving updates. + * Simulate network partition for a chosen node. More precisely, drop all messages sent to it by other cluster members. + * + *
<p>
WARNING: this should only be used carefully because 'drop all messages to a node' might break some invariants + * after the 'connectivity' is restored with {@link #removeNetworkPartitionOf(int)}. Only use this method if you + * know what you are doing! Prefer {@link #stopNode(int)}. + * + * @param nodeIndex Index of the node messages to which need to be dropped. */ - public enum NodeKnockout { - /** Stop a node to knock it out. */ - STOP { - @Override - void knockOutNode(int nodeIndex, Cluster cluster) { - cluster.stopNode(nodeIndex); - } + public void simulateNetworkPartitionOf(int nodeIndex) { + IgniteImpl recipient = node(nodeIndex); + + runningNodes() + .filter(node -> node != recipient) + .forEach(sourceNode -> { + sourceNode.dropMessages( + new AddCensorshipByRecipientConsistentId(recipient.name(), sourceNode.dropMessagesPredicate()) + ); + }); - @Override - void reanimateNode(int nodeIndex, Cluster cluster) { - cluster.startNode(nodeIndex); - } - }, - /** Emulate a network partition so that messages to the knocked-out node are dropped. */ - PARTITION_NETWORK { - @Override - void knockOutNode(int nodeIndex, Cluster cluster) { - IgniteImpl recipient = cluster.node(nodeIndex); - - cluster.runningNodes() - .filter(node -> node != recipient) - .forEach(sourceNode -> { - sourceNode.dropMessages( - new AddCensorshipByRecipientConsistentId(recipient.name(), sourceNode.dropMessagesPredicate()) - ); - }); - - LOG.info("Knocked out node " + nodeIndex + " with an artificial network partition"); - } + LOG.info("Knocked out node " + nodeIndex + " with an artificial network partition"); + } - @Override - void reanimateNode(int nodeIndex, Cluster cluster) { - IgniteImpl receiver = cluster.node(nodeIndex); + /** + * Removes the simulated 'network partition' for the given node. + * + * @param nodeIndex Index of the node. + * @see #simulateNetworkPartitionOf(int) + */ + public void removeNetworkPartitionOf(int nodeIndex) { + IgniteImpl receiver = node(nodeIndex); - cluster.runningNodes() - .filter(node -> node != receiver) - .forEach(ignite -> { - var censor = (AddCensorshipByRecipientConsistentId) ignite.dropMessagesPredicate(); + runningNodes() + .filter(node -> node != receiver) + .forEach(ignite -> { + var censor = (AddCensorshipByRecipientConsistentId) ignite.dropMessagesPredicate(); - assertNotNull(censor); + assertNotNull(censor); - if (censor.prevPredicate == null) { - ignite.stopDroppingMessages(); - } else { - ignite.dropMessages(censor.prevPredicate); - } - }); + if (censor.prevPredicate == null) { + ignite.stopDroppingMessages(); + } else { + ignite.dropMessages(censor.prevPredicate); + } + }); - LOG.info("Reanimated node " + nodeIndex + " by removing an artificial network partition"); - } - }; - - /** - * Knocks out a node so that it stops receiving messages from other nodes of the cluster. To bring a node back, - * {@link #reanimateNode(int, Cluster)} should be used. - */ - abstract void knockOutNode(int nodeIndex, Cluster cluster); - - /** - * Reanimates a knocked-out node so that it starts receiving messages from other nodes of the cluster again. This nullifies the - * effect of {@link #knockOutNode(int, Cluster)}. 
- */ - abstract void reanimateNode(int nodeIndex, Cluster cluster); + LOG.info("Reanimated node " + nodeIndex + " by removing an artificial network partition"); } private static class AddCensorshipByRecipientConsistentId implements BiPredicate { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java index b0dc26417f06..f6cf2b43ab0b 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java @@ -38,7 +38,6 @@ import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; -import org.apache.ignite.internal.Cluster.NodeKnockout; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; @@ -290,7 +289,7 @@ void nodeDoesNotLeaveLogicalTopologyImmediatelyAfterBeingLostBySwim() throws Exc entryNode.logicalTopologyService().addEventListener(listener); // Knock the node out without allowing it to say good bye. - cluster.knockOutNode(1, NodeKnockout.PARTITION_NETWORK); + cluster.simulateNetworkPartitionOf(1); // 1 second is usually insufficient on my machine to get an event, even if it's produced. On the CI we // should probably account for spurious delays due to other processes, hence 2 seconds. diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java index 567bdab214f8..c44403dd4ec8 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java @@ -53,7 +53,6 @@ import java.util.stream.IntStream; import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.ignite.internal.Cluster; -import org.apache.ignite.internal.Cluster.NodeKnockout; import org.apache.ignite.internal.IgniteIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.logger.IgniteLogger; @@ -63,7 +62,6 @@ import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException; import org.apache.ignite.internal.storage.StorageRebalanceException; import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine; -import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine; import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; @@ -127,11 +125,6 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest { */ private static final String DEFAULT_STORAGE_ENGINE = ""; - /** - * {@link NodeKnockout} that is used by tests that are indifferent for the knockout strategy being used. 
- */ - private static final NodeKnockout DEFAULT_KNOCKOUT = NodeKnockout.PARTITION_NETWORK; - @WorkDirectory private Path workDir; @@ -173,7 +166,7 @@ public void tearDown(TestInfo testInfo) throws Exception { * until {@code shouldStop} returns {@code true}, in that case this method throws {@link UnableToRetry} exception. */ private static T withRetry(Supplier action, Predicate shouldStop) { - int maxAttempts = 4; + int maxAttempts = 5; int sleepMillis = 500; for (int attempt = 1; attempt <= maxAttempts; attempt++) { @@ -184,8 +177,10 @@ private static T withRetry(Supplier action, Predicate s throw new UnableToRetry(e); } if (attempt < maxAttempts && isTransientFailure(e)) { - LOG.warn("Attempt " + attempt + " failed, going to retry", e); + LOG.warn("Attempt {} failed, going to retry", e, attempt); } else { + LOG.error("Attempt {} failed, not going to retry anymore, rethrowing", e, attempt); + throw e; } } @@ -264,29 +259,33 @@ private static List> readRows(ResultSet r } /** - * Tests that a leader successfully feeds a follower with a RAFT snapshot (using {@link NodeKnockout#STOP} strategy - * to knock-out the follower to make it require a snapshot installation). + * Tests that a leader successfully feeds a follower with a RAFT snapshot. */ @Test - void leaderFeedsFollowerWithSnapshotWithKnockoutStop() throws Exception { - testLeaderFeedsFollowerWithSnapshot(Cluster.NodeKnockout.STOP, DEFAULT_STORAGE_ENGINE); + void leaderFeedsFollowerWithSnapshot() throws Exception { + testLeaderFeedsFollowerWithSnapshot(DEFAULT_STORAGE_ENGINE); } /** - * Tests that a leader successfully feeds a follower with a RAFT snapshot (using {@link NodeKnockout#PARTITION_NETWORK} strategy - * to knock-out the follower to make it require a snapshot installation). + * Tests that a leader successfully feeds a follower with a RAFT snapshot on any of the supported storage engines. */ - @Test - void leaderFeedsFollowerWithSnapshotWithKnockoutPartitionNetwork() throws Exception { - testLeaderFeedsFollowerWithSnapshot(Cluster.NodeKnockout.PARTITION_NETWORK, DEFAULT_STORAGE_ENGINE); + // TODO: IGNITE-18481 - make sure we don't forget to add new storage engines here + @ParameterizedTest + @ValueSource(strings = { + RocksDbStorageEngine.ENGINE_NAME, + PersistentPageMemoryStorageEngine.ENGINE_NAME + // TODO: uncomment when https://issues.apache.org/jira/browse/IGNITE-19234 is fixed +// VolatilePageMemoryStorageEngine.ENGINE_NAME + }) + void leaderFeedsFollowerWithSnapshot(String storageEngine) throws Exception { + testLeaderFeedsFollowerWithSnapshot(storageEngine); } /** - * Tests that a leader successfully feeds a follower with a RAFT snapshot (using the given {@link NodeKnockout} strategy - * to knock-out the follower to make it require a snapshot installation and the given storage engine). + * Tests that a leader successfully feeds a follower with a RAFT snapshot (using the given storage engine). 
*/ - private void testLeaderFeedsFollowerWithSnapshot(NodeKnockout knockout, String storageEngine) throws Exception { - feedNode2WithSnapshotOfOneRow(knockout, storageEngine); + private void testLeaderFeedsFollowerWithSnapshot(String storageEngine) throws Exception { + feedNode2WithSnapshotOfOneRow(storageEngine); transferLeadershipOnSolePartitionTo(2); @@ -295,50 +294,42 @@ private void testLeaderFeedsFollowerWithSnapshot(NodeKnockout knockout, String s assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one")))); } - private void feedNode2WithSnapshotOfOneRow(NodeKnockout knockout) throws InterruptedException { - feedNode2WithSnapshotOfOneRow(knockout, DEFAULT_STORAGE_ENGINE); + private void feedNode2WithSnapshotOfOneRow() throws InterruptedException { + feedNode2WithSnapshotOfOneRow(DEFAULT_STORAGE_ENGINE); } - private void feedNode2WithSnapshotOfOneRow(NodeKnockout knockout, String storageEngine) throws InterruptedException { - prepareClusterForInstallingSnapshotToNode2(knockout, storageEngine); + private void feedNode2WithSnapshotOfOneRow(String storageEngine) throws InterruptedException { + prepareClusterForInstallingSnapshotToNode2(storageEngine); - reanimateNode2AndWaitForSnapshotInstalled(knockout); + reanimateNode2AndWaitForSnapshotInstalled(); } /** * Transfer the cluster to a state in which, when node 2 is reanimated from being knocked-out, the only partition * of the only table (called TEST) is transferred to it using RAFT snapshot installation mechanism. - * - * @param knockout The knock-out strategy that was used to knock-out node 2 and that will be used to reanimate it. - * @see NodeKnockout */ - private void prepareClusterForInstallingSnapshotToNode2(NodeKnockout knockout) throws InterruptedException { - prepareClusterForInstallingSnapshotToNode2(knockout, DEFAULT_STORAGE_ENGINE); + private void prepareClusterForInstallingSnapshotToNode2() throws InterruptedException { + prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE); } /** * Transfer the cluster to a state in which, when node 2 is reanimated from being knocked-out, the only partition * of the only table (called TEST) is transferred to it using RAFT snapshot installation mechanism. * - * @param knockout The knock-out strategy that should be used to knock-out node 2. * @param storageEngine Storage engine for the TEST table. - * @see NodeKnockout */ - private void prepareClusterForInstallingSnapshotToNode2(NodeKnockout knockout, String storageEngine) throws InterruptedException { - prepareClusterForInstallingSnapshotToNode2(knockout, storageEngine, theCluster -> {}); + private void prepareClusterForInstallingSnapshotToNode2(String storageEngine) throws InterruptedException { + prepareClusterForInstallingSnapshotToNode2(storageEngine, theCluster -> {}); } /** * Transfer the cluster to a state in which, when node 2 is reanimated from being knocked-out, the only partition * of the only table (called TEST) is transferred to it using RAFT snapshot installation mechanism. * - * @param knockout The knock-out strategy that should be used to knock-out node 2. * @param storageEngine Storage engine for the TEST table. * @param doOnClusterAfterInit Action executed just after the cluster is started and initialized. 
- * @see NodeKnockout */ private void prepareClusterForInstallingSnapshotToNode2( - NodeKnockout knockout, String storageEngine, Consumer doOnClusterAfterInit ) throws InterruptedException { @@ -352,12 +343,16 @@ private void prepareClusterForInstallingSnapshotToNode2( transferLeadershipOnSolePartitionTo(0); - cluster.knockOutNode(2, knockout); + knockoutNode(2); executeDmlWithRetry(0, "insert into test(key, value) values (1, 'one')"); // Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot. - causeLogTruncationOnSolePartitionLeader(); + causeLogTruncationOnSolePartitionLeader(0); + } + + private void knockoutNode(int nodeIndex) { + cluster.stopNode(nodeIndex); } private void createTestTableWith3Replicas(String storageEngine) throws InterruptedException { @@ -396,19 +391,19 @@ private void waitForTableToStart() throws InterruptedException { * with AppendEntries (because the leader does not already have the index that is required to send AppendEntries * to the lagging follower), so the leader will have to send InstallSnapshot instead. */ - private void causeLogTruncationOnSolePartitionLeader() throws InterruptedException { + private void causeLogTruncationOnSolePartitionLeader(int expectedLeaderNodeIndex) throws InterruptedException { // Doing this twice because first snapshot creation does not trigger log truncation. - doSnapshotOnSolePartitionLeader(); - doSnapshotOnSolePartitionLeader(); + doSnapshotOnSolePartitionLeader(expectedLeaderNodeIndex); + doSnapshotOnSolePartitionLeader(expectedLeaderNodeIndex); } /** * Causes a RAFT snapshot to be taken on the RAFT leader of the sole table partition that exists in the cluster. */ - private void doSnapshotOnSolePartitionLeader() throws InterruptedException { + private void doSnapshotOnSolePartitionLeader(int expectedLeaderNodeIndex) throws InterruptedException { TablePartitionId tablePartitionId = solePartitionId(); - doSnapshotOn(tablePartitionId); + doSnapshotOn(tablePartitionId, expectedLeaderNodeIndex); } /** @@ -436,9 +431,14 @@ private static List tablePartitionIds(IgniteImpl node) { /** * Takes a RAFT snapshot on the leader of the RAFT group corresponding to the given table partition. */ - private void doSnapshotOn(TablePartitionId tablePartitionId) throws InterruptedException { + private void doSnapshotOn(TablePartitionId tablePartitionId, int expectedLeaderNodeIndex) throws InterruptedException { RaftGroupService raftGroupService = cluster.leaderServiceFor(tablePartitionId); + assertThat( + "Unexpected leadership change", + raftGroupService.getServerId().getConsistentId(), is(cluster.node(expectedLeaderNodeIndex).name()) + ); + CountDownLatch snapshotLatch = new CountDownLatch(1); AtomicReference snapshotStatus = new AtomicReference<>(); @@ -456,15 +456,15 @@ private void doSnapshotOn(TablePartitionId tablePartitionId) throws InterruptedE * Reanimates (that is, reverts the effects of a knock out) node 2 and waits until a RAFT snapshot is installed * on it for the sole table partition in the cluster. 
*/ - private void reanimateNode2AndWaitForSnapshotInstalled(NodeKnockout knockout) throws InterruptedException { - reanimateNodeAndWaitForSnapshotInstalled(2, knockout); + private void reanimateNode2AndWaitForSnapshotInstalled() throws InterruptedException { + reanimateNodeAndWaitForSnapshotInstalled(2); } /** * Reanimates (that is, reverts the effects of a knock out) a node with the given index and waits until a RAFT snapshot is installed * on it for the sole table partition in the cluster. */ - private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex, NodeKnockout knockout) throws InterruptedException { + private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex) throws InterruptedException { CountDownLatch snapshotInstalledLatch = new CountDownLatch(1); var handler = new NoOpHandler() { @@ -479,7 +479,7 @@ public void publish(LogRecord record) { replicatorLogger.addHandler(handler); try { - cluster.reanimateNode(nodeIndex, knockout); + reanimateNode(nodeIndex); assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did not install a snapshot in time"); } finally { @@ -487,6 +487,10 @@ public void publish(LogRecord record) { } } + private void reanimateNode(int nodeIndex) { + cluster.startNode(nodeIndex); + } + private void transferLeadershipOnSolePartitionTo(int nodeIndex) throws InterruptedException { String nodeConsistentId = cluster.node(nodeIndex).node().name(); @@ -539,32 +543,17 @@ private static void initiateLeadershipTransferTo(String targetLeaderConsistentId /** * Tests that, if first part of a transaction (everything before COMMIT) arrives using AppendEntries, and later the whole * partition state arrives in a RAFT snapshot, then the transaction is seen as committed (i.e. its effects are seen). - * - *
<p>
{@link NodeKnockout#STOP} is used to knock out the follower which will accept the snapshot. - */ - @Test - void txSemanticsIsMaintainedWithKnockoutStop() throws Exception { - txSemanticsIsMaintainedAfterInstallingSnapshot(Cluster.NodeKnockout.STOP); - } - - /** - * Tests that, if first part of a transaction (everything before COMMIT) arrives using AppendEntries, and later the whole - * partition state arrives in a RAFT snapshot, then the transaction is seen as committed (i.e. its effects are seen). - * - *
<p>
{@link NodeKnockout#PARTITION_NETWORK} is used to knock out the follower which will accept the snapshot. */ @Test - void txSemanticsIsMaintainedWithKnockoutPartitionNetwork() throws Exception { - txSemanticsIsMaintainedAfterInstallingSnapshot(Cluster.NodeKnockout.PARTITION_NETWORK); + void txSemanticsIsMaintained() throws Exception { + txSemanticsIsMaintainedAfterInstallingSnapshot(); } /** * Tests that, if first part of a transaction (everything before COMMIT) arrives using AppendEntries, and later the whole * partition state arrives in a RAFT snapshot, then the transaction is seen as committed (i.e. its effects are seen). - * - *
<p>
The given {@link NodeKnockout} is used to knock out the follower which will accept the snapshot. */ - private void txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout knockout) throws Exception { + private void txSemanticsIsMaintainedAfterInstallingSnapshot() throws Exception { cluster.startAndInit(3); createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE); @@ -577,15 +566,15 @@ private void txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout knockou cluster.doInSession(0, session -> { executeUpdate("insert into test(key, value) values (1, 'one')", session, tx); - cluster.knockOutNode(2, knockout); + knockoutNode(2); tx.commit(); }); // Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot. - causeLogTruncationOnSolePartitionLeader(); + causeLogTruncationOnSolePartitionLeader(0); - reanimateNode2AndWaitForSnapshotInstalled(knockout); + reanimateNode2AndWaitForSnapshotInstalled(); transferLeadershipOnSolePartitionTo(2); @@ -594,27 +583,13 @@ private void txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout knockou assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one")))); } - /** - * Tests that a leader successfully feeds a follower with a RAFT snapshot on any of the supported storage engines. - */ - // TODO: IGNITE-18481 - make sure we don't forget to add new storage engines here - @ParameterizedTest - @ValueSource(strings = { - RocksDbStorageEngine.ENGINE_NAME, - PersistentPageMemoryStorageEngine.ENGINE_NAME, - VolatilePageMemoryStorageEngine.ENGINE_NAME - }) - void leaderFeedsFollowerWithSnapshot(String storageEngine) throws Exception { - testLeaderFeedsFollowerWithSnapshot(DEFAULT_KNOCKOUT, storageEngine); - } - /** * Tests that entries can still be added to a follower using AppendEntries after it gets fed with a RAFT snapshot. */ @Test @Disabled("Enable when IGNITE-18485 is fixed") void entriesKeepAppendedAfterSnapshotInstallation() throws Exception { - feedNode2WithSnapshotOfOneRow(DEFAULT_KNOCKOUT); + feedNode2WithSnapshotOfOneRow(); // this should be possibly replaced with executeDmlWithRetry. cluster.doInSession(0, session -> { @@ -637,9 +612,7 @@ void entriesKeepAppendedAfterSnapshotInstallation() throws Exception { // TODO: IGNITE-18423 - enable when ReplicationTimeoutException is fixed @Disabled("IGNITE-18423") void entriesKeepAppendedDuringSnapshotInstallation() throws Exception { - NodeKnockout knockout = DEFAULT_KNOCKOUT; - - prepareClusterForInstallingSnapshotToNode2(knockout); + prepareClusterForInstallingSnapshotToNode2(); AtomicBoolean installedSnapshot = new AtomicBoolean(false); AtomicInteger lastLoadedKey = new AtomicInteger(); @@ -654,7 +627,7 @@ void entriesKeepAppendedDuringSnapshotInstallation() throws Exception { } }); - reanimateNode2AndWaitForSnapshotInstalled(knockout); + reanimateNode2AndWaitForSnapshotInstalled(); installedSnapshot.set(true); @@ -676,22 +649,22 @@ void entriesKeepAppendedDuringSnapshotInstallation() throws Exception { // TODO: IGNITE-18423 - enable when ReplicationTimeoutException is fixed @Disabled("IGNITE-18423") void nodeCanInstallSnapshotsAfterSnapshotInstalledToIt() throws Exception { - feedNode2WithSnapshotOfOneRow(DEFAULT_KNOCKOUT); + feedNode2WithSnapshotOfOneRow(); // The leader (0) has fed the follower (2). Now, change roles: the new leader will be node 2, it will feed node 0. 
transferLeadershipOnSolePartitionTo(2); - cluster.knockOutNode(0, DEFAULT_KNOCKOUT); + knockoutNode(0); cluster.doInSession(2, session -> { executeUpdate("insert into test(key, value) values (2, 'two')", session); }); // Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot. - causeLogTruncationOnSolePartitionLeader(); + causeLogTruncationOnSolePartitionLeader(2); - reanimateNodeAndWaitForSnapshotInstalled(0, DEFAULT_KNOCKOUT); + reanimateNodeAndWaitForSnapshotInstalled(0); transferLeadershipOnSolePartitionTo(0); @@ -706,11 +679,11 @@ void nodeCanInstallSnapshotsAfterSnapshotInstalledToIt() throws Exception { */ @Test void snapshotInstallationRepeatsOnTimeout() throws Exception { - prepareClusterForInstallingSnapshotToNode2(DEFAULT_KNOCKOUT, DEFAULT_STORAGE_ENGINE, theCluster -> { + prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE, theCluster -> { theCluster.node(0).dropMessages(dropFirstSnapshotMetaResponse()); }); - reanimateNode2AndWaitForSnapshotInstalled(DEFAULT_KNOCKOUT); + reanimateNode2AndWaitForSnapshotInstalled(); } private BiPredicate dropFirstSnapshotMetaResponse() { @@ -720,8 +693,10 @@ private BiPredicate dropFirstSnapshotMetaResponse() { } private BiPredicate dropFirstSnapshotMetaResponse(AtomicBoolean sentSnapshotMetaResponse) { + String node2Name = cluster.node(2).name(); + return (targetConsistentId, message) -> { - if (Objects.equals(targetConsistentId, cluster.node(2).name()) && message instanceof SnapshotMetaResponse) { + if (Objects.equals(targetConsistentId, node2Name) && message instanceof SnapshotMetaResponse) { return sentSnapshotMetaResponse.compareAndSet(false, true); } else { return false; @@ -730,8 +705,10 @@ private BiPredicate dropFirstSnapshotMetaResponse(Atomic } private BiPredicate dropSnapshotMetaResponse(CompletableFuture sentFirstSnapshotMetaResponse) { + String node2Name = cluster.node(2).name(); + return (targetConsistentId, message) -> { - if (Objects.equals(targetConsistentId, cluster.node(2).name()) && message instanceof SnapshotMetaResponse) { + if (Objects.equals(targetConsistentId, node2Name) && message instanceof SnapshotMetaResponse) { sentFirstSnapshotMetaResponse.complete(null); // Always drop. 
@@ -775,7 +752,7 @@ public void publish(LogRecord record) { snapshotExecutorLogger.addHandler(snapshotInstallFailedDueToIdenticalRetryHandler); try { - prepareClusterForInstallingSnapshotToNode2(DEFAULT_KNOCKOUT, DEFAULT_STORAGE_ENGINE, theCluster -> { + prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE, theCluster -> { BiPredicate dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed = (recipientId, message) -> message instanceof ActionRequest && ((ActionRequest) message).command() instanceof SafeTimeSyncCommand @@ -789,7 +766,7 @@ public void publish(LogRecord record) { theCluster.node(2).dropMessages(dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed); }); - reanimateNode2AndWaitForSnapshotInstalled(DEFAULT_KNOCKOUT); + reanimateNode2AndWaitForSnapshotInstalled(); } finally { snapshotExecutorLogger.removeHandler(snapshotInstallFailedDueToIdenticalRetryHandler); } @@ -799,7 +776,7 @@ public void publish(LogRecord record) { void testChangeLeaderOnInstallSnapshotInMiddle() throws Exception { CompletableFuture sentSnapshotMetaResponseFormNode1Future = new CompletableFuture<>(); - prepareClusterForInstallingSnapshotToNode2(NodeKnockout.PARTITION_NETWORK, DEFAULT_STORAGE_ENGINE, cluster -> { + prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE, cluster -> { // Let's hang the InstallSnapshot in the "middle" from the leader with index 1. cluster.node(1).dropMessages(dropSnapshotMetaResponse(sentSnapshotMetaResponseFormNode1Future)); }); @@ -807,14 +784,14 @@ void testChangeLeaderOnInstallSnapshotInMiddle() throws Exception { // Change the leader and truncate its log so that InstallSnapshot occurs instead of AppendEntries. transferLeadershipOnSolePartitionTo(1); - causeLogTruncationOnSolePartitionLeader(); + causeLogTruncationOnSolePartitionLeader(1); CompletableFuture installSnapshotSuccessfulFuture = new CompletableFuture<>(); listenForSnapshotInstalledSuccessFromLogger(0, 2, installSnapshotSuccessfulFuture); // Return node 2. - cluster.reanimateNode(2, NodeKnockout.PARTITION_NETWORK); + reanimateNode(2); // Waiting for the InstallSnapshot from node 2 to hang in the "middle". assertThat(sentSnapshotMetaResponseFormNode1Future, willSucceedIn(1, TimeUnit.MINUTES)); @@ -873,7 +850,7 @@ public void publish(LogRecord record) { void testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Exception { CompletableFuture sentSnapshotMetaResponseFormNode0Future = new CompletableFuture<>(); - prepareClusterForInstallingSnapshotToNode2(NodeKnockout.PARTITION_NETWORK, DEFAULT_STORAGE_ENGINE, cluster -> { + prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE, cluster -> { // Let's hang the InstallSnapshot in the "middle" from the leader with index 0. cluster.node(0).dropMessages(dropSnapshotMetaResponse(sentSnapshotMetaResponseFormNode0Future)); }); @@ -883,7 +860,7 @@ void testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Ex listenForSnapshotInstalledSuccessFromLogger(1, 2, installSnapshotSuccessfulFuture); // Return node 2. - cluster.reanimateNode(2, NodeKnockout.PARTITION_NETWORK); + reanimateNode(2); // Waiting for the InstallSnapshot from node 2 to hang in the "middle". 
assertThat(sentSnapshotMetaResponseFormNode0Future, willSucceedIn(1, TimeUnit.MINUTES)); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java index 8e1f6e25fd28..6fadf6d3bb2c 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.rebalance; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK; import static org.apache.ignite.internal.SessionUtils.executeUpdate; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -124,7 +123,7 @@ void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception { assertInstanceOf(ReplicaUnavailableException.class, e.getCause()); } - cluster.knockOutNode(2, PARTITION_NETWORK); + cluster.simulateNetworkPartitionOf(2); assertTrue(waitAssignments(List.of( Set.of(0, 1, 3), @@ -137,7 +136,7 @@ void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception { assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get()); assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get()); - cluster.reanimateNode(2, PARTITION_NETWORK); + cluster.removeNetworkPartitionOf(2); assertTrue(waitAssignments(List.of( Set.of(0, 1, 2), @@ -203,10 +202,11 @@ private boolean waitAssignments(List> nodes) throws InterruptedExce private void createTestTable() throws InterruptedException { String sql1 = "create zone test_zone with " + + "partitions=1, replicas=3, " + "data_nodes_auto_adjust_scale_up=0, " + "data_nodes_auto_adjust_scale_down=0"; String sql2 = "create table test (id int primary key, value varchar(20))" - + " with partitions=1, replicas=3, primary_zone='TEST_ZONE'"; + + " with primary_zone='TEST_ZONE'"; cluster.doInSession(0, session -> { executeUpdate(sql1, session);