Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,26 +369,6 @@ public void shutdown() {
nodesToStop.forEach(node -> IgnitionManager.stop(node.name()));
}

/**
* Knocks out a node so that it stops receiving messages from other nodes of the cluster. To bring a node back,
* {@link #reanimateNode(int, NodeKnockout)} should be used.
*
* <p>The actual knock-out is delegated to the given {@code knockout} strategy; afterwards the node index is recorded
* in {@code knockedOutNodesIndices}.
*
* @param nodeIndex Index of the node to knock out.
* @param knockout Strategy that performs the knock-out.
*/
public void knockOutNode(int nodeIndex, NodeKnockout knockout) {
// Apply the strategy first: if it throws, the node is not recorded as knocked out.
knockout.knockOutNode(nodeIndex, this);

knockedOutNodesIndices.add(nodeIndex);
}

/**
* Reanimates a knocked-out node so that it starts receiving messages from other nodes of the cluster again. This nullifies the
* effect of {@link #knockOutNode(int, NodeKnockout)}.
*
* <p>The reanimation is delegated to the given {@code knockout} strategy; afterwards the node index is removed from
* {@code knockedOutNodesIndices}.
*
* @param nodeIndex Index of the node to reanimate.
* @param knockout Strategy that performs the reanimation (presumably the same one that was used to knock the node out;
*     this method does not verify that).
*/
public void reanimateNode(int nodeIndex, NodeKnockout knockout) {
// Apply the strategy first: if it throws, the node stays recorded as knocked out.
knockout.reanimateNode(nodeIndex, this);

// NOTE(review): nodeIndex is an int, so this removes by value only if knockedOutNodesIndices is a Set/Collection of
// Integer; on a List it would resolve to remove(int index) and remove by position. Confirm the field type.
knockedOutNodesIndices.remove(nodeIndex);
}

/**
* Executes an action with a {@link Session} opened via a node with the given index.
*
Expand Down Expand Up @@ -431,71 +411,52 @@ public <T> T query(int nodeIndex, String sql, Function<ResultSet<SqlRow>, T> ext
}

/**
* A way to make a node be separated from a cluster and stop receiving updates.
* Simulate network partition for a chosen node. More precisely, drop all messages sent to it by other cluster members.
*
* <p>WARNING: this should only be used carefully because 'drop all messages to a node' might break some invariants
* after the 'connectivity' is restored with {@link #removeNetworkPartitionOf(int)}. Only use this method if you
* know what you are doing! Prefer {@link #stopNode(int)}.
*
* @param nodeIndex Index of the node messages to which need to be dropped.
*/
public enum NodeKnockout {
/** Stop a node to knock it out. */
STOP {
@Override
void knockOutNode(int nodeIndex, Cluster cluster) {
cluster.stopNode(nodeIndex);
}
public void simulateNetworkPartitionOf(int nodeIndex) {
IgniteImpl recipient = node(nodeIndex);

runningNodes()
.filter(node -> node != recipient)
.forEach(sourceNode -> {
sourceNode.dropMessages(
new AddCensorshipByRecipientConsistentId(recipient.name(), sourceNode.dropMessagesPredicate())
);
});

@Override
void reanimateNode(int nodeIndex, Cluster cluster) {
cluster.startNode(nodeIndex);
}
},
/** Emulate a network partition so that messages to the knocked-out node are dropped. */
PARTITION_NETWORK {
@Override
void knockOutNode(int nodeIndex, Cluster cluster) {
IgniteImpl recipient = cluster.node(nodeIndex);

cluster.runningNodes()
.filter(node -> node != recipient)
.forEach(sourceNode -> {
sourceNode.dropMessages(
new AddCensorshipByRecipientConsistentId(recipient.name(), sourceNode.dropMessagesPredicate())
);
});

LOG.info("Knocked out node " + nodeIndex + " with an artificial network partition");
}
LOG.info("Knocked out node " + nodeIndex + " with an artificial network partition");
}

@Override
void reanimateNode(int nodeIndex, Cluster cluster) {
IgniteImpl receiver = cluster.node(nodeIndex);
/**
* Removes the simulated 'network partition' for the given node.
*
* @param nodeIndex Index of the node.
* @see #simulateNetworkPartitionOf(int)
*/
public void removeNetworkPartitionOf(int nodeIndex) {
IgniteImpl receiver = node(nodeIndex);

cluster.runningNodes()
.filter(node -> node != receiver)
.forEach(ignite -> {
var censor = (AddCensorshipByRecipientConsistentId) ignite.dropMessagesPredicate();
runningNodes()
.filter(node -> node != receiver)
.forEach(ignite -> {
var censor = (AddCensorshipByRecipientConsistentId) ignite.dropMessagesPredicate();

assertNotNull(censor);
assertNotNull(censor);

if (censor.prevPredicate == null) {
ignite.stopDroppingMessages();
} else {
ignite.dropMessages(censor.prevPredicate);
}
});
if (censor.prevPredicate == null) {
ignite.stopDroppingMessages();
} else {
ignite.dropMessages(censor.prevPredicate);
}
});

LOG.info("Reanimated node " + nodeIndex + " by removing an artificial network partition");
}
};

/**
* Knocks out a node so that it stops receiving messages from other nodes of the cluster. To bring a node back,
* {@link #reanimateNode(int, Cluster)} should be used.
*/
abstract void knockOutNode(int nodeIndex, Cluster cluster);

/**
* Reanimates a knocked-out node so that it starts receiving messages from other nodes of the cluster again. This nullifies the
* effect of {@link #knockOutNode(int, Cluster)}.
*/
abstract void reanimateNode(int nodeIndex, Cluster cluster);
LOG.info("Reanimated node " + nodeIndex + " by removing an artificial network partition");
}

private static class AddCensorshipByRecipientConsistentId implements BiPredicate<String, NetworkMessage> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.internal.Cluster.NodeKnockout;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
Expand Down Expand Up @@ -290,7 +289,7 @@ void nodeDoesNotLeaveLogicalTopologyImmediatelyAfterBeingLostBySwim() throws Exc
entryNode.logicalTopologyService().addEventListener(listener);

// Knock the node out without allowing it to say goodbye.
cluster.knockOutNode(1, NodeKnockout.PARTITION_NETWORK);
cluster.simulateNetworkPartitionOf(1);

// 1 second is usually insufficient on my machine to get an event, even if it's produced. On the CI we
// should probably account for spurious delays due to other processes, hence 2 seconds.
Expand Down
Loading