Skip to content

Commit

Permalink
Voting config exclusions should work with absent nodes (#50836)
Browse files Browse the repository at this point in the history
Today the voting config exclusions API accepts node filters and resolves them
to a collection of node IDs against the current cluster membership.

This is problematic since we may want to exclude nodes that are not currently
members of the cluster. For instance:

- if attempting to remove a flaky node from the cluster you cannot reliably
  exclude it from the voting configuration since it may not reliably be a
  member of the cluster

- if `cluster.auto_shrink_voting_configuration: false` then naively shrinking
  the cluster will remove some nodes but will leaving their node IDs in the
  voting configuration. The only way to clean up the voting configuration is to
  grow the cluster back to its original size (potentially replacing some of the
  voting configuration) and then use the exclusions API.

This commit adds an alternative API that accepts node names and node IDs but
not node filters in general, and deprecates the current node-filters-based API.

Relates #47990.
  • Loading branch information
zacharymorn committed Apr 16, 2020
1 parent e64322f commit 6b299d4
Show file tree
Hide file tree
Showing 15 changed files with 674 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,70 +18,142 @@
*/
package org.elasticsearch.action.admin.cluster.configuration;

import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.logging.DeprecationLogger;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* A request to add voting config exclusions for certain master-eligible nodes, and wait for these nodes to be removed from the voting
* configuration.
*/
public class AddVotingConfigExclusionsRequest extends MasterNodeRequest<AddVotingConfigExclusionsRequest> {
public static final String DEPRECATION_MESSAGE = "nodeDescription is deprecated and will be removed, use nodeIds or nodeNames instead";
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(
LogManager.getLogger(AddVotingConfigExclusionsRequest.class));
private final String[] nodeDescriptions;
private final String[] nodeIds;
private final String[] nodeNames;
private final TimeValue timeout;

/**
* Construct a request to add voting config exclusions for master-eligible nodes matching the given descriptions, and wait for a
* Construct a request to add voting config exclusions for master-eligible nodes matching the given node names, and wait for a
* default 30 seconds for these exclusions to take effect, removing the nodes from the voting configuration.
* @param nodeDescriptions Descriptions of the nodes to add - see {@link DiscoveryNodes#resolveNodes(String...)}
* @param nodeNames Names of the nodes to add - see {@link AddVotingConfigExclusionsRequest#resolveVotingConfigExclusions(ClusterState)}
*/
public AddVotingConfigExclusionsRequest(String[] nodeDescriptions) {
this(nodeDescriptions, TimeValue.timeValueSeconds(30));
public AddVotingConfigExclusionsRequest(String... nodeNames) {
this(Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY, nodeNames, TimeValue.timeValueSeconds(30));
}

/**
* Construct a request to add voting config exclusions for master-eligible nodes matching the given descriptions, and wait for these
* nodes to be removed from the voting configuration.
* @param nodeDescriptions Descriptions of the nodes whose exclusions to add - see {@link DiscoveryNodes#resolveNodes(String...)}.
* @param nodeIds Ids of the nodes whose exclusions to add - see
* {@link AddVotingConfigExclusionsRequest#resolveVotingConfigExclusions(ClusterState)}.
* @param nodeNames Names of the nodes whose exclusions to add - see
* {@link AddVotingConfigExclusionsRequest#resolveVotingConfigExclusions(ClusterState)}.
* @param timeout How long to wait for the added exclusions to take effect and be removed from the voting configuration.
*/
public AddVotingConfigExclusionsRequest(String[] nodeDescriptions, TimeValue timeout) {
public AddVotingConfigExclusionsRequest(String[] nodeDescriptions, String[] nodeIds, String[] nodeNames, TimeValue timeout) {
if (timeout.compareTo(TimeValue.ZERO) < 0) {
throw new IllegalArgumentException("timeout [" + timeout + "] must be non-negative");
}

if (noneOrMoreThanOneIsSet(nodeDescriptions, nodeIds, nodeNames)) {
throw new IllegalArgumentException("Please set node identifiers correctly. " +
"One and only one of [node_name], [node_names] and [node_ids] has to be set");
}

if (nodeDescriptions.length > 0) {
deprecationLogger.deprecatedAndMaybeLog("voting_config_exclusion", DEPRECATION_MESSAGE);
}

this.nodeDescriptions = nodeDescriptions;
this.nodeIds = nodeIds;
this.nodeNames = nodeNames;
this.timeout = timeout;
}

public AddVotingConfigExclusionsRequest(StreamInput in) throws IOException {
super(in);
nodeDescriptions = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
nodeIds = in.readStringArray();
nodeNames = in.readStringArray();
} else {
nodeIds = Strings.EMPTY_ARRAY;
nodeNames = Strings.EMPTY_ARRAY;
}
timeout = in.readTimeValue();

if (nodeDescriptions.length > 0) {
deprecationLogger.deprecatedAndMaybeLog("voting_config_exclusion",
"nodeDescription is deprecated and will be removed, use nodeIds or nodeNames instead");
}

}

Set<VotingConfigExclusion> resolveVotingConfigExclusions(ClusterState currentState) {
final DiscoveryNodes allNodes = currentState.nodes();
final Set<VotingConfigExclusion> resolvedNodes = Arrays.stream(allNodes.resolveNodes(nodeDescriptions))
.map(allNodes::get).filter(DiscoveryNode::isMasterNode).map(VotingConfigExclusion::new).collect(Collectors.toSet());

if (resolvedNodes.isEmpty()) {
throw new IllegalArgumentException("add voting config exclusions request for " + Arrays.asList(nodeDescriptions)
+ " matched no master-eligible nodes");
Set<VotingConfigExclusion> newVotingConfigExclusions = new HashSet<>();

if (nodeDescriptions.length >= 1) {
newVotingConfigExclusions = Arrays.stream(allNodes.resolveNodes(nodeDescriptions)).map(allNodes::get)
.filter(DiscoveryNode::isMasterNode).map(VotingConfigExclusion::new).collect(Collectors.toSet());

if (newVotingConfigExclusions.isEmpty()) {
throw new IllegalArgumentException("add voting config exclusions request for " + Arrays.asList(nodeDescriptions)
+ " matched no master-eligible nodes");
}
} else if (nodeIds.length >= 1) {
for (String nodeId : nodeIds) {
if (allNodes.nodeExists(nodeId)) {
DiscoveryNode discoveryNode = allNodes.get(nodeId);
if (discoveryNode.isMasterNode()) {
newVotingConfigExclusions.add(new VotingConfigExclusion(discoveryNode));
}
} else {
newVotingConfigExclusions.add(new VotingConfigExclusion(nodeId, VotingConfigExclusion.MISSING_VALUE_MARKER));
}
}
} else {
assert nodeNames.length >= 1;
Map<String, DiscoveryNode> existingNodes = StreamSupport.stream(allNodes.spliterator(), false)
.collect(Collectors.toMap(DiscoveryNode::getName, Function.identity()));

for (String nodeName : nodeNames) {
if (existingNodes.containsKey(nodeName)){
DiscoveryNode discoveryNode = existingNodes.get(nodeName);
if (discoveryNode.isMasterNode()) {
newVotingConfigExclusions.add(new VotingConfigExclusion(discoveryNode));
}
} else {
newVotingConfigExclusions.add(new VotingConfigExclusion(VotingConfigExclusion.MISSING_VALUE_MARKER, nodeName));
}
}
}

resolvedNodes.removeIf(n -> currentState.getVotingConfigExclusions().contains(n));
return resolvedNodes;
newVotingConfigExclusions.removeIf(n -> currentState.getVotingConfigExclusions().contains(n));
return newVotingConfigExclusions;
}

Set<VotingConfigExclusion> resolveVotingConfigExclusionsAndCheckMaximum(ClusterState currentState, int maxExclusionsCount,
Expand All @@ -99,13 +171,37 @@ Set<VotingConfigExclusion> resolveVotingConfigExclusionsAndCheckMaximum(ClusterS
return resolvedExclusions;
}

private boolean noneOrMoreThanOneIsSet(String[] deprecatedNodeDescription, String[] nodeIds, String[] nodeNames) {
if (deprecatedNodeDescription.length > 0) {
return nodeIds.length > 0 || nodeNames.length > 0;
} else if (nodeIds.length > 0) {
return nodeNames.length > 0;
} else {
return nodeNames.length > 0 == false;
}
}

/**
* @return descriptions of the nodes for whom to add voting config exclusions.
*/
public String[] getNodeDescriptions() {
return nodeDescriptions;
}

/**
* @return ids of the nodes for whom to add voting config exclusions.
*/
public String[] getNodeIds() {
return nodeIds;
}

/**
* @return names of the nodes for whom to add voting config exclusions.
*/
public String[] getNodeNames() {
return nodeNames;
}

/**
* @return how long to wait after adding the exclusions for the nodes to be removed from the voting configuration.
*/
Expand All @@ -122,14 +218,20 @@ public ActionRequestValidationException validate() {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(nodeDescriptions);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeStringArray(nodeIds);
out.writeStringArray(nodeNames);
}
out.writeTimeValue(timeout);
}

@Override
public String toString() {
return "AddVotingConfigExclusionsRequest{" +
"nodeDescriptions=" + Arrays.asList(nodeDescriptions) +
", timeout=" + timeout +
"nodeDescriptions=" + Arrays.asList(nodeDescriptions) + ", " +
"nodeIds=" + Arrays.asList(nodeIds) + ", " +
"nodeNames=" + Arrays.asList(nodeNames) + ", " +
"timeout=" + timeout +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ public CoordinationMetadata build() {
}

public static class VotingConfigExclusion implements Writeable, ToXContentFragment {
public static final String MISSING_VALUE_MARKER = "_absent_";
private final String nodeId;
private final String nodeName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ assert localNodeMayWinElection(getLastAcceptedState()) :
// Package-private for testing
ClusterState improveConfiguration(ClusterState clusterState) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert validVotingConfigExclusionState(clusterState) : clusterState;

// exclude any nodes whose ID is in the voting config exclusions list ...
final Stream<String> excludedNodeIds = clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId);
Expand Down Expand Up @@ -895,6 +896,31 @@ ClusterState improveConfiguration(ClusterState clusterState) {
return clusterState;
}

/*
* Valid Voting Configuration Exclusion state criteria:
* 1. Every voting config exclusion with an ID of _absent_ should not match any nodes currently in the cluster by name
* 2. Every voting config exclusion with a name of _absent_ should not match any nodes currently in the cluster by ID
*/
static boolean validVotingConfigExclusionState(ClusterState clusterState) {
Set<VotingConfigExclusion> votingConfigExclusions = clusterState.getVotingConfigExclusions();
Set<String> nodeNamesWithAbsentId = votingConfigExclusions.stream()
.filter(e -> e.getNodeId().equals(VotingConfigExclusion.MISSING_VALUE_MARKER))
.map(VotingConfigExclusion::getNodeName)
.collect(Collectors.toSet());
Set<String> nodeIdsWithAbsentName = votingConfigExclusions.stream()
.filter(e -> e.getNodeName().equals(VotingConfigExclusion.MISSING_VALUE_MARKER))
.map(VotingConfigExclusion::getNodeId)
.collect(Collectors.toSet());
for (DiscoveryNode node : clusterState.getNodes()) {
if (node.isMasterNode() &&
(nodeIdsWithAbsentName.contains(node.getId()) || nodeNamesWithAbsentId.contains(node.getName()))) {
return false;
}
}

return true;
}

private AtomicBoolean reconfigurationTaskScheduled = new AtomicBoolean();

private void scheduleReconfigurationIfNeeded() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

Expand Down Expand Up @@ -124,6 +128,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// we only enforce major version transitions on a fully formed clusters
final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
// processing any joins
Map<String, String> joiniedNodeNameIds = new HashMap<>();
for (final Task joinTask : joiningNodes) {
if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
// noop
Expand All @@ -143,18 +148,47 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
if (node.isMasterNode()) {
joiniedNodeNameIds.put(node.getName(), node.getId());
}
} catch (IllegalArgumentException | IllegalStateException e) {
results.failure(joinTask, e);
continue;
}
}
results.success(joinTask);
}

if (nodesChanged) {
rerouteService.reroute("post-join reroute", Priority.HIGH, ActionListener.wrap(
r -> logger.trace("post-join reroute completed"),
e -> logger.debug("post-join reroute failed", e)));

if (joiniedNodeNameIds.isEmpty() == false) {
Set<CoordinationMetadata.VotingConfigExclusion> currentVotingConfigExclusions = currentState.getVotingConfigExclusions();
Set<CoordinationMetadata.VotingConfigExclusion> newVotingConfigExclusions = currentVotingConfigExclusions.stream()
.map(e -> {
// Update nodeId in VotingConfigExclusion when a new node with excluded node name joins
if (CoordinationMetadata.VotingConfigExclusion.MISSING_VALUE_MARKER.equals(e.getNodeId()) &&
joiniedNodeNameIds.containsKey(e.getNodeName())) {
return new CoordinationMetadata.VotingConfigExclusion(joiniedNodeNameIds.get(e.getNodeName()), e.getNodeName());
} else {
return e;
}
}).collect(Collectors.toSet());

// if VotingConfigExclusions did get updated
if (newVotingConfigExclusions.equals(currentVotingConfigExclusions) == false) {
CoordinationMetadata.Builder coordMetadataBuilder = CoordinationMetadata.builder(currentState.coordinationMetadata())
.clearVotingConfigExclusions();
newVotingConfigExclusions.forEach(coordMetadataBuilder::addVotingConfigExclusion);
Metadata newMetadata = Metadata.builder(currentState.metadata())
.coordinationMetadata(coordMetadataBuilder.build()).build();
return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder)
.metadata(newMetadata).build()));
}
}

return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build()));
} else {
// we must return a new cluster state instance to force publishing. This is important
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,21 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.POST;

public class RestAddVotingConfigExclusionAction extends BaseRestHandler {

private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(30L);
private static final Logger logger = LogManager.getLogger(RestAddVotingConfigExclusionAction.class);

private static final String DEPRECATION_MESSAGE = "POST /_cluster/voting_config_exclusions/{node_name} " +
"will be removed in a future version. " +
"Please use POST /_cluster/voting_config_exclusions?node_ids=... " +
"or POST /_cluster/voting_config_exclusions?node_names=... instead.";

@Override
public String getName() {
Expand All @@ -44,7 +51,8 @@ public String getName() {

@Override
public List<Route> routes() {
return List.of(new Route(POST, "/_cluster/voting_config_exclusions/{node_name}"));
return List.of(new DeprecatedRoute(POST, "/_cluster/voting_config_exclusions/{node_name}", DEPRECATION_MESSAGE),
new Route(POST, "/_cluster/voting_config_exclusions"));
}

@Override
Expand All @@ -58,10 +66,29 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
}

AddVotingConfigExclusionsRequest resolveVotingConfigExclusionsRequest(final RestRequest request) {
String nodeName = request.param("node_name");
String deprecatedNodeDescription = null;
String nodeIds = null;
String nodeNames = null;

if (request.hasParam("node_name")) {
deprecatedNodeDescription = request.param("node_name");
}

if (request.hasParam("node_ids")){
nodeIds = request.param("node_ids");
}

if (request.hasParam("node_names")){
nodeNames = request.param("node_names");
}

return new AddVotingConfigExclusionsRequest(
Strings.splitStringByCommaToArray(nodeName),
Strings.splitStringByCommaToArray(deprecatedNodeDescription),
Strings.splitStringByCommaToArray(nodeIds),
Strings.splitStringByCommaToArray(nodeNames),
TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_TIMEOUT, getClass().getSimpleName() + ".timeout")
);
}


}

0 comments on commit 6b299d4

Please sign in to comment.