Skip to content

Commit

Permalink
Add register based coordinator test suite (#93911)
Browse files Browse the repository at this point in the history
This commit adds a new register-based coordinator test suite that uses an atomic
register to handle cluster state coordination.

Closes ES-5417
  • Loading branch information
fcofdez committed Mar 8, 2023
1 parent d875dc2 commit 6bb796f
Show file tree
Hide file tree
Showing 14 changed files with 985 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public boolean isElectionQuorum(VoteCollection joinVotes) {
}

public boolean isPublishQuorum(VoteCollection votes) {
return votes.isQuorum(getLastCommittedConfiguration()) && votes.isQuorum(lastPublishedConfiguration);
return electionStrategy.isPublishQuorum(votes, getLastCommittedConfiguration(), lastPublishedConfiguration);
}

public boolean containsJoinVoteFor(DiscoveryNode node) {
Expand Down Expand Up @@ -165,7 +165,7 @@ public void setInitialState(ClusterState initialState) {
* @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object.
*/
public Join handleStartJoin(StartJoinRequest startJoinRequest) {
if (startJoinRequest.getTerm() <= getCurrentTerm()) {
if (electionStrategy.shouldJoinLeaderInTerm(getCurrentTerm(), startJoinRequest.getTerm()) == false) {
logger.debug(
"handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]",
startJoinRequest,
Expand Down Expand Up @@ -332,8 +332,7 @@ public PublishRequest handleClientValue(ClusterState clusterState) {
);
}

if (clusterState.getLastAcceptedConfiguration().equals(getLastAcceptedConfiguration()) == false
&& getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()) == false) {
if (electionStrategy.isInvalidReconfiguration(clusterState, getLastAcceptedConfiguration(), getLastCommittedConfiguration())) {
logger.debug("handleClientValue: only allow reconfiguration while not already reconfiguring");
throw new CoordinationStateRejectedException("only allow reconfiguration while not already reconfiguring");
}
Expand Down Expand Up @@ -500,6 +499,7 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
applyCommit.getVersion()
);

assert getLastAcceptedTerm() == applyCommit.getTerm() && getLastAcceptedVersion() == applyCommit.getVersion();
persistedState.markLastAcceptedStateAsCommitted();
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
private final List<PeerFinderListener> peerFinderListeners;
private final LeaderHeartbeatService leaderHeartbeatService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand All @@ -199,7 +200,9 @@ public Coordinator(
RerouteService rerouteService,
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
CircuitBreakerService circuitBreakerService
CircuitBreakerService circuitBreakerService,
Reconfigurator reconfigurator,
LeaderHeartbeatService leaderHeartbeatService
) {
this.settings = settings;
this.transportService = transportService;
Expand All @@ -220,7 +223,8 @@ public Coordinator(
rerouteService,
nodeHealthService,
joinReasonService,
circuitBreakerService
circuitBreakerService,
reconfigurator::maybeReconfigureAfterNewMasterIsElected
);
this.joinValidationService = new JoinValidationService(
settings,
Expand Down Expand Up @@ -272,7 +276,7 @@ public Coordinator(
this.nodeLeftQueue = masterService.createTaskQueue("node-left", Priority.IMMEDIATE, new NodeLeftExecutor(allocationService));
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.reconfigurator = reconfigurator;
this.clusterBootstrapService = new ClusterBootstrapService(
settings,
transportService,
Expand All @@ -299,6 +303,7 @@ public Coordinator(
this.nodeHealthService = nodeHealthService;
this.peerFinderListeners = new CopyOnWriteArrayList<>();
this.peerFinderListeners.add(clusterBootstrapService);
this.leaderHeartbeatService = leaderHeartbeatService;
}

/**
Expand Down Expand Up @@ -516,6 +521,7 @@ private void startElection() {

final StartJoinRequest startJoinRequest = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.debug("starting election with {}", startJoinRequest);
electionStrategy.onNewElection(startJoinRequest.getSourceNode(), startJoinRequest.getTerm(), getLastAcceptedState());
getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
}
}
Expand Down Expand Up @@ -549,7 +555,7 @@ private static boolean nodeMayWinElection(ClusterState lastAcceptedState, Discov

private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (getCurrentTerm() < targetTerm) {
if (electionStrategy.shouldJoinLeaderInTerm(getCurrentTerm(), targetTerm)) {
return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm)));
}
return Optional.empty();
Expand Down Expand Up @@ -822,6 +828,7 @@ void becomeCandidate(String method) {
peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
clusterFormationFailureHelper.start();

leaderHeartbeatService.stop();
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
leaderChecker.updateLeader(null);

Expand Down Expand Up @@ -864,6 +871,7 @@ private void becomeLeader() {
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
leaderHeartbeatService.start(getLocalNode(), getCurrentTerm());

assert leaderChecker.leader() == null : leaderChecker.leader();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
Expand Down Expand Up @@ -896,6 +904,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
joinAccumulator.close(mode);
joinAccumulator = new JoinHelper.FollowerJoinAccumulator();
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
leaderHeartbeatService.stop();
}

updateSingleNodeClusterChecker();
Expand Down Expand Up @@ -1663,6 +1672,10 @@ boolean cancelCommittedPublication() {
}
}

/**
 * Forwards the term and version of a state that is about to be committed to the configured
 * {@link ElectionStrategy}, by calling {@link ElectionStrategy#beforeCommit(long, long)}.
 *
 * @param term    the term of the state that is about to be committed
 * @param version the version of the state that is about to be committed
 */
void beforeCommit(long term, long version) {
electionStrategy.beforeCommit(term, version);
}

class CoordinatorPublication extends Publication {

private final ClusterStatePublicationEvent clusterStatePublicationEvent;
Expand Down Expand Up @@ -1899,7 +1912,9 @@ protected boolean isPublishQuorum(VoteCollection votes) {
protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert getCurrentTerm() >= publishResponse.getTerm();
return coordinationState.get().handlePublishResponse(sourceNode, publishResponse);
var applyCommit = coordinationState.get().handlePublishResponse(sourceNode, publishResponse);
applyCommit.ifPresent(applyCommitRequest -> beforeCommit(applyCommitRequest.getTerm(), applyCommitRequest.getVersion()));
return applyCommit;
}

@Override
Expand Down Expand Up @@ -1948,13 +1963,21 @@ protected void sendApplyCommit(
ActionListener<Empty> responseActionListener
) {
assert transportService.getThreadPool().getThreadContext().isSystemContext();
transportService.sendRequest(
destination,
COMMIT_STATE_ACTION_NAME,
applyCommit,
COMMIT_STATE_REQUEST_OPTIONS,
new ActionListenerResponseHandler<>(wrapWithMutex(responseActionListener), in -> Empty.INSTANCE, Names.CLUSTER_COORDINATION)
);
try {
transportService.sendRequest(
destination,
COMMIT_STATE_ACTION_NAME,
applyCommit,
COMMIT_STATE_REQUEST_OPTIONS,
new ActionListenerResponseHandler<>(
wrapWithMutex(responseActionListener),
in -> Empty.INSTANCE,
Names.CLUSTER_COORDINATION
)
);
} catch (Exception e) {
responseActionListener.onFailure(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -39,7 +40,7 @@ protected ElectionStrategy() {
/**
* Whether there is an election quorum from the point of view of the given local node under the provided voting configurations
*/
public final boolean isElectionQuorum(
public boolean isElectionQuorum(
DiscoveryNode localNode,
long localCurrentTerm,
long localAcceptedTerm,
Expand All @@ -61,6 +62,18 @@ && satisfiesAdditionalQuorumConstraints(
);
}

/**
 * Whether the given votes are sufficient to commit a publication. The default implementation requires the votes to form
 * a quorum in both the last committed configuration and the latest published configuration.
 *
 * @param voteCollection               the publish votes collected so far
 * @param lastCommittedConfiguration   the last committed voting configuration
 * @param latestPublishedConfiguration the voting configuration of the latest published state
 * @return {@code true} if the votes form a quorum in both configurations
 */
public boolean isPublishQuorum(
    VoteCollection voteCollection,
    VotingConfiguration lastCommittedConfiguration,
    VotingConfiguration latestPublishedConfiguration
) {
    // The committed configuration is checked first; the published one is only consulted if that check passes.
    if (voteCollection.isQuorum(lastCommittedConfiguration) == false) {
        return false;
    }
    return voteCollection.isQuorum(latestPublishedConfiguration);
}

/**
 * Decides whether a node whose current term is {@code currentTerm} should join a leader in {@code targetTerm}.
 * The default implementation only accepts a strictly greater target term.
 *
 * @param currentTerm the local node's current term
 * @param targetTerm  the term the node is being asked to join
 * @return {@code true} if {@code targetTerm} is strictly greater than {@code currentTerm}
 */
public boolean shouldJoinLeaderInTerm(long currentTerm, long targetTerm) {
    return targetTerm > currentTerm;
}

/**
* The extension point to be overridden by plugins. Defines additional constraints on the election quorum.
* @param localNode the local node for the election quorum
Expand All @@ -81,4 +94,19 @@ protected abstract boolean satisfiesAdditionalQuorumConstraints(
VotingConfiguration lastAcceptedConfiguration,
VoteCollection joinVotes
);

/**
 * Hook invoked when the local node starts a new election. The default implementation does nothing.
 *
 * @param sourceNode          the node that is starting the election
 * @param proposedTerm        the term proposed for the new election
 * @param latestAcceptedState the last cluster state accepted by the node starting the election
 */
public void onNewElection(DiscoveryNode sourceNode, long proposedTerm, ClusterState latestAcceptedState) {}

/**
 * Whether publishing the given cluster state would be an invalid reconfiguration. The default implementation rejects
 * a state whose last-accepted voting configuration differs from the current one while the current last-committed and
 * last-accepted configurations also differ from each other.
 *
 * @param clusterState               the cluster state proposed for publication
 * @param lastAcceptedConfiguration  the current last-accepted voting configuration
 * @param lastCommittedConfiguration the current last-committed voting configuration
 * @return {@code true} if the proposed state must be rejected as an invalid reconfiguration
 */
public boolean isInvalidReconfiguration(
    ClusterState clusterState,
    VotingConfiguration lastAcceptedConfiguration,
    VotingConfiguration lastCommittedConfiguration
) {
    // A state that keeps the accepted configuration unchanged is never a reconfiguration.
    if (clusterState.getLastAcceptedConfiguration().equals(lastAcceptedConfiguration)) {
        return false;
    }
    // The configuration changes: invalid only if committed and accepted configurations already differ.
    return lastCommittedConfiguration.equals(lastAcceptedConfiguration) == false;
}

/**
 * Hook that receives the term and version of a cluster state that is about to be committed.
 * The default implementation does nothing.
 *
 * @param term    the term of the state about to be committed
 * @param version the version of the state about to be committed
 */
public void beforeCommit(long term, long version) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RerouteService;
Expand Down Expand Up @@ -87,12 +88,13 @@ public class JoinHelper {
RerouteService rerouteService,
NodeHealthService nodeHealthService,
JoinReasonService joinReasonService,
CircuitBreakerService circuitBreakerService
CircuitBreakerService circuitBreakerService,
Function<ClusterState, ClusterState> maybeReconfigureAfterMasterElection
) {
this.joinTaskQueue = masterService.createTaskQueue(
"node-join",
Priority.URGENT,
new NodeJoinExecutor(allocationService, rerouteService)
new NodeJoinExecutor(allocationService, rerouteService, maybeReconfigureAfterMasterElection)
);
this.clusterApplier = clusterApplier;
this.transportService = transportService;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.coordination;

import org.elasticsearch.cluster.node.DiscoveryNode;

/**
 * Lifecycle hooks for a leader heartbeat. Implementations are told when a node becomes leader in a given term
 * via {@link #start(DiscoveryNode, long)} and when the heartbeat should cease via {@link #stop()}.
 */
public interface LeaderHeartbeatService {

    /**
     * Starts the heartbeat on behalf of the given leader.
     *
     * @param currentLeader the node acting as leader
     * @param term          the term in which {@code currentLeader} is leader
     */
    void start(DiscoveryNode currentLeader, long term);

    /**
     * Stops the heartbeat.
     */
    void stop();

    /**
     * An implementation whose {@code start} and {@code stop} do nothing.
     */
    LeaderHeartbeatService NO_OP = new LeaderHeartbeatService() {
        @Override
        public void start(DiscoveryNode currentLeader, long term) {}

        @Override
        public void stop() {}
    };
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
Expand All @@ -42,10 +43,20 @@ public class NodeJoinExecutor implements ClusterStateTaskExecutor<JoinTask> {

private final AllocationService allocationService;
private final RerouteService rerouteService;
private final Function<ClusterState, ClusterState> maybeReconfigureAfterMasterElection;

/**
 * Creates an executor that delegates to the three-argument constructor with {@link Function#identity()}, so the
 * post-election reconfiguration function returns the cluster state it is given unchanged.
 *
 * @param allocationService the allocation service used by this executor
 * @param rerouteService    the reroute service used by this executor
 */
public NodeJoinExecutor(AllocationService allocationService, RerouteService rerouteService) {
this(allocationService, rerouteService, Function.identity());
}

/**
 * Creates an executor with an explicit post-election reconfiguration function.
 *
 * @param allocationService                    the allocation service used by this executor
 * @param rerouteService                       the reroute service used by this executor
 * @param maybeReconfigureAfterMasterElection  function stored by this executor and applied to the intermediate cluster
 *                                             state when the local node becomes master; it returns the state to continue with
 */
public NodeJoinExecutor(
AllocationService allocationService,
RerouteService rerouteService,
Function<ClusterState, ClusterState> maybeReconfigureAfterMasterElection
) {
this.allocationService = allocationService;
this.rerouteService = rerouteService;
this.maybeReconfigureAfterMasterElection = maybeReconfigureAfterMasterElection;
}

@Override
Expand Down Expand Up @@ -256,6 +267,7 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
allocationService.cleanCaches();
tmpState = PersistentTasksCustomMetadata.disassociateDeadNodes(tmpState);
tmpState = maybeReconfigureAfterMasterElection.apply(tmpState);
return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -139,6 +140,10 @@ public VotingConfiguration reconfigure(
}
}

/**
 * Returns the given cluster state unchanged.
 * NOTE(review): presumably an extension point that subclasses override to adjust the state after an election; confirm.
 *
 * @param clusterState the cluster state computed while a node is becoming master
 * @return the same {@code clusterState} instance that was passed in
 */
public ClusterState maybeReconfigureAfterNewMasterIsElected(ClusterState clusterState) {
return clusterState;
}

record VotingConfigNode(String id, boolean live, boolean currentMaster, boolean inCurrentConfig)
implements
Comparable<VotingConfigNode> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.ElectionStrategy;
import org.elasticsearch.cluster.coordination.LeaderHeartbeatService;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -194,7 +196,9 @@ public DiscoveryModule(
rerouteService,
electionStrategy,
nodeHealthService,
circuitBreakerService
circuitBreakerService,
new Reconfigurator(settings, clusterSettings),
LeaderHeartbeatService.NO_OP
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down

0 comments on commit 6bb796f

Please sign in to comment.