Skip to content

Commit

Permalink
Complete placeholder AtomicRegister Coordinator tests (#106512)
Browse files Browse the repository at this point in the history
Porting over Coordinator tests to
Stateless/AtomisRegisterCoordinatorTests
  • Loading branch information
DiannaHohensee committed Mar 21, 2024
1 parent f617694 commit 49d40c3
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,13 @@ private void closePrevotingRound() {
}
}

/**
* Updates {@link #maxTermSeen} if greater.
*
* Every time a new term is found, either from another node requesting election, or this node trying to run for election, always update
* the max term number. The max term may not reflect an actual election, but rather an election attempt by some node in the
* cluster.
*/
private void updateMaxTermSeen(final long term) {
synchronized (mutex) {
maxTermSeen = Math.max(maxTermSeen, term);
Expand Down Expand Up @@ -549,6 +556,13 @@ private void startElection() {
}
}

/**
* Broadcasts a request to all 'discoveredNodes' in the cluster to elect 'candidateMasterNode' as the new master.
*
* @param candidateMasterNode the node running for election
* @param term the new proposed master term
* @param discoveredNodes all the nodes to which to send the request
*/
private void broadcastStartJoinRequest(DiscoveryNode candidateMasterNode, long term, List<DiscoveryNode> discoveredNodes) {
electionStrategy.onNewElection(candidateMasterNode, term, new ActionListener<>() {
@Override
Expand Down Expand Up @@ -670,6 +684,9 @@ public void onFailure(Exception e) {
});
}

/**
* Validates a request to join the new cluster. Runs on the candidate node running for election to master.
*/
private void validateJoinRequest(JoinRequest joinRequest, ActionListener<Void> validateListener) {

// Before letting the node join the cluster, ensure:
Expand Down Expand Up @@ -753,6 +770,9 @@ private void sendJoinPing(DiscoveryNode discoveryNode, TransportRequestOptions.T
);
}

/**
* Processes the request to join the cluster. Received by the node running for election to master.
*/
private void processJoinRequest(JoinRequest joinRequest, ActionListener<Void> joinListener) {
assert Transports.assertNotTransportThread("blocking on coordinator mutex and maybe doing IO to increase term");
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@
import static org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService.HEARTBEAT_FREQUENCY;
import static org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService.MAX_MISSED_HEARTBEATS;

/**
* Tests that the Coordinator code runs correctly relying on atomic register compare-and-swap. Stateless will use implementations of atomic
* register CAS in the cloud blob stores.
*
* StatelessCoordinationTests extends AtomicRegisterCoordinatorTests for testing, inheriting all the tests but using different
* {@link ElectionStrategy} implementations, etc.
*/
@TestLogging(reason = "these tests do a lot of log-worthy things but we usually don't care", value = "org.elasticsearch:FATAL")
public class AtomicRegisterCoordinatorTests extends CoordinatorTests {

Expand Down Expand Up @@ -86,23 +93,23 @@ public void testAckListenerReceivesNacksIfLeaderStandsDown() {
}

@Override
@AwaitsFix(bugUrl = "ES-5645")
public void testAckListenerReceivesNacksIfPublicationTimesOut() {
// The leader still has access to the register, therefore it acknowledges the state update
testAckListenerReceivesNacksIfPublicationTimesOut(true);
}

@Override
@AwaitsFix(bugUrl = "ES-5645")
public void testClusterCannotFormWithFailingJoinValidation() {
// A single node can form a cluster in this case
public void testClusterCannotFormWithFailingJoinValidation() throws Exception {
// A single node can form a cluster if it is able to join (vote for) its own cluster, so we must disable all nodes from successfully
// joining a cluster.
clusterCannotFormWithFailingJoinValidation(true);
}

@Override
@AwaitsFix(bugUrl = "ES-5645")
@AwaitsFix(bugUrl = "ES-8099")
public void testCannotJoinClusterWithDifferentUUID() {
// The cluster2 leader is considered dead since we only run the nodes in cluster 1
// therefore the node coming from cluster 2 ends up taking over the old master in cluster 2
// TODO: add more checks to avoid forming a mixed cluster between register based and traditional clusters
// Placeholder to implement a test wherein the blob store cluster state is suddenly swapped out with a different cluster's state
// with a different UUID. The cluster nodes should recognize the UUID change and refuse to load the foreign cluster state.
}

@Override
Expand Down Expand Up @@ -192,6 +199,9 @@ protected CoordinatorStrategy createCoordinatorStrategy() {
return new AtomicRegisterCoordinatorStrategy();
}

/**
* Strategy used to inject custom behavior into the {@link AbstractCoordinatorTestCase} test infrastructure.
*/
class AtomicRegisterCoordinatorStrategy implements CoordinatorStrategy {
private final AtomicLong currentTermRef = new AtomicLong();
private final AtomicReference<Heartbeat> heartBeatRef = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@

@TestLogging(reason = "these tests do a lot of log-worthy things but we usually don't care", value = "org.elasticsearch:FATAL")
public class CoordinatorTests extends AbstractCoordinatorTestCase {

public void testCanUpdateClusterStateAfterStabilisation() {
try (Cluster cluster = new Cluster(randomIntBetween(1, 5))) {
cluster.runRandomly();
Expand Down Expand Up @@ -637,6 +636,10 @@ public void testAckListenerReceivesNoAckFromHangingFollower() {
}

public void testAckListenerReceivesNacksIfPublicationTimesOut() {
testAckListenerReceivesNacksIfPublicationTimesOut(false);
}

protected void testAckListenerReceivesNacksIfPublicationTimesOut(boolean expectLeaderAcksSuccessfullyInStateless) {
try (Cluster cluster = new Cluster(3)) {
cluster.runRandomly();
cluster.stabilise();
Expand All @@ -651,12 +654,19 @@ public void testAckListenerReceivesNacksIfPublicationTimesOut() {
assertFalse("expected no immediate ack from " + leader, ackCollector.hasAcked(leader));
assertFalse("expected no immediate ack from " + follower0, ackCollector.hasAcked(follower0));
assertFalse("expected no immediate ack from " + follower1, ackCollector.hasAcked(follower1));

follower0.heal();
follower1.heal();
cluster.stabilise();
assertTrue("expected eventual nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0));
assertTrue("expected eventual nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1));
assertTrue("expected eventual nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader));
if (expectLeaderAcksSuccessfullyInStateless) {
// A stateless leader directly updates the cluster state in the remote blob store: it does not require communication with
// the other cluster nodes to procceed with an update commit to the cluster state.
assertTrue("expected ack from leader, " + leader, ackCollector.hasAckedSuccessfully(leader));
} else {
assertTrue("expected eventual nack from leader, " + leader, ackCollector.hasAckedUnsuccessfully(leader));
}
}
}

Expand Down Expand Up @@ -1271,21 +1281,50 @@ public void testNodeCannotJoinIfJoinValidationFailsOnJoiningNode() {
}
}

public void testClusterCannotFormWithFailingJoinValidation() {
public void testClusterCannotFormWithFailingJoinValidation() throws Exception {
clusterCannotFormWithFailingJoinValidation(false);
}

/**
* Forms a random sized cluster and then disables join validation on either a random majority subset or all cluster nodes. Then checks
* that election fails.
*
* @param failJoinOnAllNodes this controls whether to fail join on all nodes or only a majority subset. The atomic register CAS election
* strategy will succeed in electing a master if any node can vote (even the master candidate voting for
* itself).
* @throws Exception
*/
protected void clusterCannotFormWithFailingJoinValidation(boolean failJoinOnAllNodes) throws Exception {
try (Cluster cluster = new Cluster(randomIntBetween(1, 5))) {
// fail join validation on a majority of nodes in the initial configuration
randomValueOtherThanMany(
nodes -> cluster.initialConfiguration.hasQuorum(
nodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getId).collect(Collectors.toSet())
) == false,
() -> randomSubsetOf(cluster.clusterNodes)
).forEach(cn -> cn.extraJoinValidators.add((discoveryNode, clusterState) -> {
List<ClusterNode> clusterNodesToFailJoin;
if (failJoinOnAllNodes) {
// The AtomicRegister strategy succeeds if a master candidate votes for itself, so we must disable all nodes from passing
// join validation so that none of them can self-elect.
clusterNodesToFailJoin = cluster.clusterNodes;
} else {
// Fetch a random subset of cluster nodes that form a quorum (majority subset).
clusterNodesToFailJoin = randomValueOtherThanMany(
nodes -> cluster.initialConfiguration.hasQuorum(
nodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getId).collect(Collectors.toSet())
) == false,
() -> randomSubsetOf(cluster.clusterNodes)
);
}

// Fail join validation on the set of nodes so that election will fail in the initial configuration.
clusterNodesToFailJoin.forEach(cn -> cn.extraJoinValidators.add((discoveryNode, clusterState) -> {
throw new IllegalArgumentException("join validation failed");
}));

cluster.bootstrapIfNecessary();

// Run the cluster for 10 seconds to give the cluster time to elect a master.
// It's possible stabilisation takes longer, but essentially impossible that it _always_ takes longer.
cluster.runFor(10000, "failing join validation");

assertTrue(cluster.clusterNodes.stream().allMatch(cn -> cn.getLastAppliedClusterState().version() == 0));

// Now clear the validation failures and verify that the cluster stabilizes.
for (ClusterNode clusterNode : cluster.clusterNodes) {
clusterNode.extraJoinValidators.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ protected static int defaultInt(Setting<Integer> setting) {
// Then a commit of the new leader's first cluster state
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY;

/**
* An estimate for the max time needed to stabilize a cluster. Takes into account delays for various communications involved in
* leader elections.
* */
public static final long DEFAULT_STABILISATION_TIME =
// If leader just blackholed, need to wait for this to be detected
(defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING)) * defaultInt(
Expand Down Expand Up @@ -549,6 +553,9 @@ private void updateCommittedStates() {
}
}

/**
* Uses a default period of time in which to wait for cluster stabilisation, and then verifies that a master has been elected.
*/
public void stabilise() {
stabilise(DEFAULT_STABILISATION_TIME, true);
}
Expand Down Expand Up @@ -942,6 +949,9 @@ protected long transportDelayMillis(String actionName) {
return 0;
}

/**
* Mimics a cluster node for testing.
*/
public final class ClusterNode {
private static final Logger logger = LogManager.getLogger(ClusterNode.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,18 @@ public String toString() {
}
}

/**
* Adds the list of class loggers to this {@link MockLogAppender}.
*
* Stops ({@link #stop()}) and runs some checks on the {@link MockLogAppender} once the returned object is released.
*/
public Releasable capturing(Class<?>... classes) {
return appendToLoggers(Arrays.stream(classes).map(LogManager::getLogger).toList());
}

/**
* Same as above except takes string class names of each logger.
*/
public Releasable capturing(String... names) {
return appendToLoggers(Arrays.stream(names).map(LogManager::getLogger).toList());
}
Expand Down

0 comments on commit 49d40c3

Please sign in to comment.