Skip to content

Commit

Permalink
Consider node shutdown in DataTierAllocationDecider (#98824)
Browse files Browse the repository at this point in the history
Prior to this commit, while DataTierAllocationDecider accounted for
Desired Nodes (if available), it did not account for Node Shutdown
information.

As of this commit, nodes that are marked as shutting down for
removal from the cluster are ignored for the purposes of computing
the available data tiers. Desired Nodes configuration, if present,
takes precedence over node shutdown, as desired nodes should be
preferred if possible.
  • Loading branch information
gwbrown committed Aug 31, 2023
1 parent 1ca66bd commit f669a1f
Show file tree
Hide file tree
Showing 8 changed files with 533 additions and 65 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/98824.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 98824
summary: Consider node shutdown in `DataTierAllocationDecider`
area: "Allocation"
type: bug
issues:
- 97207
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,16 @@ public static Type parse(String type) {
default -> throw new IllegalArgumentException("unknown shutdown type: " + type);
};
}

/**
 * Tells callers whether a node shut down with this type is leaving the cluster for good.
 *
 * @return {@code false} for {@code RESTART}; {@code true} for {@code REMOVE}, {@code REPLACE} and {@code SIGTERM}.
 */
public boolean isRemovalType() {
    // Deliberately an exhaustive switch with no default branch: adding a new shutdown type will not compile
    // until it is classified here.
    final boolean permanentlyLeaving = switch (this) {
        case RESTART -> false;
        case REMOVE, REPLACE, SIGTERM -> true;
    };
    return permanentlyLeaving;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -38,6 +39,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.cluster.routing.allocation.DataTier.ALL_DATA_TIERS;

/**
* This class holds all {@link DiscoveryNode} in the cluster and provides convenience methods to
* access, modify merge / diff discovery nodes.
Expand Down Expand Up @@ -66,7 +69,7 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode>, SimpleDiffable<D
private final IndexVersion maxDataNodeCompatibleIndexVersion;
private final IndexVersion minSupportedIndexVersion;

private final Set<String> availableRoles;
private final Map<String, Set<String>> tiersToNodeIds;

private DiscoveryNodes(
long nodeLeftGeneration,
Expand All @@ -81,7 +84,7 @@ private DiscoveryNodes(
Version minNodeVersion,
IndexVersion maxDataNodeCompatibleIndexVersion,
IndexVersion minSupportedIndexVersion,
Set<String> availableRoles
Map<String, Set<String>> tiersToNodeIds
) {
this.nodeLeftGeneration = nodeLeftGeneration;
this.nodes = nodes;
Expand All @@ -99,7 +102,7 @@ private DiscoveryNodes(
this.maxDataNodeCompatibleIndexVersion = maxDataNodeCompatibleIndexVersion;
this.minSupportedIndexVersion = minSupportedIndexVersion;
assert (localNodeId == null) == (localNode == null);
this.availableRoles = availableRoles;
this.tiersToNodeIds = tiersToNodeIds;
}

public DiscoveryNodes withMasterNodeId(@Nullable String masterNodeId) {
Expand All @@ -117,7 +120,7 @@ public DiscoveryNodes withMasterNodeId(@Nullable String masterNodeId) {
minNodeVersion,
maxDataNodeCompatibleIndexVersion,
minSupportedIndexVersion,
availableRoles
tiersToNodeIds
);
}

Expand Down Expand Up @@ -150,13 +153,12 @@ public boolean isLocalNodeElectedMaster() {
}

/**
* Checks if any node has the role with the given {@code roleName}.
* Gets a {@link Map} of node roles to node IDs which have those roles.
*
* @param roleName name to check
* @return true if any node has the role of the given name
* @return {@link Map} of node roles to node IDs which have those roles.
*/
public boolean isRoleAvailable(String roleName) {
return availableRoles.contains(roleName);
public Map<String, Set<String>> getTiersToNodeIds() {
return tiersToNodeIds;
}

/**
Expand Down Expand Up @@ -876,14 +878,28 @@ public DiscoveryNodes build() {
Objects.requireNonNullElse(minNodeVersion, Version.CURRENT.minimumCompatibilityVersion()),
Objects.requireNonNullElse(maxDataNodeCompatibleIndexVersion, IndexVersion.current()),
Objects.requireNonNullElse(minSupportedIndexVersion, IndexVersion.MINIMUM_COMPATIBLE),
dataNodes.values()
.stream()
.flatMap(n -> n.getRoles().stream())
.map(DiscoveryNodeRole::roleName)
.collect(Collectors.toUnmodifiableSet())
computeTiersToNodesMap(dataNodes)
);
}

/**
 * Builds an unmodifiable index from role name to the IDs of the given nodes that hold that role. Only the generic data
 * role and the roles listed in {@code ALL_DATA_TIERS} are indexed; a role held by none of the nodes gets no entry.
 *
 * @param dataNodes the nodes to index; only the map's values are read
 * @return an unmodifiable map whose values are unmodifiable sets of node IDs
 */
private static Map<String, Set<String>> computeTiersToNodesMap(final Map<String, DiscoveryNode> dataNodes) {
    final String genericDataRole = DiscoveryNodeRole.DATA_ROLE.roleName();
    final Map<String, Set<String>> nodeIdsByRole = new HashMap<>(ALL_DATA_TIERS.size() + 1);

    for (DiscoveryNode dataNode : dataNodes.values()) {
        if (dataNode.hasRole(genericDataRole)) {
            nodeIdsByRole.computeIfAbsent(genericDataRole, unused -> new HashSet<>()).add(dataNode.getId());
        }
        for (String tierRole : ALL_DATA_TIERS) {
            if (dataNode.hasRole(tierRole)) {
                nodeIdsByRole.computeIfAbsent(tierRole, unused -> new HashSet<>()).add(dataNode.getId());
            }
        }
    }

    // Freeze every per-role set before freezing the map itself, so nothing reachable from the result can be mutated.
    nodeIdsByRole.replaceAll((roleName, nodeIds) -> Collections.unmodifiableSet(nodeIds));
    return Collections.unmodifiableMap(nodeIdsByRole);
}

/**
 * @return true if a master node ID is set and it equals the local node ID, false otherwise
 */
public boolean isLocalNodeElectedMaster() {
    if (masterNodeId == null) {
        // No master ID is set, so the local node cannot be the elected master (even if the local node ID is also null).
        return false;
    }
    return masterNodeId.equals(localNodeId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.metadata.DesiredNodes;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
Expand Down Expand Up @@ -581,7 +582,12 @@ private IndexMetadata indexMetadata(ShardRouting shard, RoutingAllocation alloca
return allocation.metadata().getIndexSafe(shard.index());
}

private Optional<String> highestPreferenceTier(List<String> preferredTiers, DiscoveryNodes unused, DesiredNodes desiredNodes) {
private Optional<String> highestPreferenceTier(
List<String> preferredTiers,
DiscoveryNodes unused,
DesiredNodes desiredNodes,
NodesShutdownMetadata shutdownMetadata
) {
return Optional.of(highestPreferenceTier(preferredTiers));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.cluster.metadata.DesiredNode;
import org.elasticsearch.cluster.metadata.DesiredNodes;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -22,6 +23,8 @@
import org.elasticsearch.common.Strings;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -64,7 +67,12 @@ private static Decision shouldFilter(IndexMetadata indexMd, DiscoveryNode node,
}

public interface PreferredTierFunction {
Optional<String> apply(List<String> tierPreference, DiscoveryNodes nodes, DesiredNodes desiredNodes);
Optional<String> apply(
List<String> tierPreference,
DiscoveryNodes nodes,
DesiredNodes desiredNodes,
NodesShutdownMetadata shutdownMetadata
);
}

private static final Decision YES_PASSES = Decision.single(Decision.YES.type(), NAME, "node passes tier preference filters");
Expand All @@ -79,7 +87,12 @@ public static Decision shouldFilter(
if (tierPreference.isEmpty()) {
return YES_PASSES;
}
Optional<String> tier = preferredTierFunction.apply(tierPreference, allocation.nodes(), allocation.desiredNodes());
Optional<String> tier = preferredTierFunction.apply(
tierPreference,
allocation.nodes(),
allocation.desiredNodes(),
allocation.getClusterState().metadata().nodeShutdowns()
);
if (tier.isPresent()) {
String tierName = tier.get();
if (allocationAllowed(tierName, node)) {
Expand Down Expand Up @@ -136,14 +149,20 @@ private static Decision debugYesAllowed(RoutingAllocation allocation, List<Strin
* in order to know if there are planned topology changes in the cluster
* that can remove a tier that's part of the cluster now.
*/
public static Optional<String> preferredAvailableTier(List<String> prioritizedTiers, DiscoveryNodes nodes, DesiredNodes desiredNodes) {
public static Optional<String> preferredAvailableTier(
List<String> prioritizedTiers,
DiscoveryNodes nodes,
DesiredNodes desiredNodes,
NodesShutdownMetadata shutdownMetadata
) {

final var desiredNodesPreferredTier = getPreferredTierFromDesiredNodes(prioritizedTiers, nodes, desiredNodes);

if (desiredNodesPreferredTier.isPresent()) {
return desiredNodesPreferredTier;
}

return getPreferredAvailableTierFromClusterMembers(prioritizedTiers, nodes);
return getPreferredAvailableTierFromClusterMembers(prioritizedTiers, nodes, removingNodeIds(shutdownMetadata));
}

/**
Expand Down Expand Up @@ -199,9 +218,13 @@ private static boolean isDesiredNodeWithinTierJoining(String tier, DiscoveryNode
return tierNodesPresent(tier, desiredNodes.pending()) && tierNodesPresent(tier, discoveryNodes);
}

private static Optional<String> getPreferredAvailableTierFromClusterMembers(List<String> prioritizedTiers, DiscoveryNodes nodes) {
private static Optional<String> getPreferredAvailableTierFromClusterMembers(
List<String> prioritizedTiers,
DiscoveryNodes nodes,
Set<String> removingNodeIds
) {
for (String tier : prioritizedTiers) {
if (tierNodesPresent(tier, nodes)) {
if (tierNodesPresentConsideringRemovals(tier, nodes, removingNodeIds)) {
return Optional.of(tier);
}
}
Expand All @@ -219,10 +242,40 @@ static boolean tierNodesPresent(String singleTier, Collection<DesiredNode> nodes
return false;
}

/**
 * Overload used by the Desired Nodes code paths. Those paths do not consider Node Shutdown, because Desired Nodes takes
 * precedence, so the check is made with an empty set of removing node IDs.
 */
static boolean tierNodesPresent(String singleTier, DiscoveryNodes nodes) {
    final Set<String> noRemovals = Collections.emptySet();
    return tierNodesPresentConsideringRemovals(singleTier, nodes, noRemovals);
}

/**
 * Reports whether at least one node that can hold data for {@code singleTier} is absent from {@code removingNodeIds}.
 * A node qualifies if its ID is listed under the tier's own role or under the generic data role in the map returned by
 * {@code nodes.getTiersToNodeIds()}.
 *
 * @param singleTier the tier to check; asserted to be the generic data role name or a valid tier name
 * @param nodes the cluster nodes, consulted only for their role-to-node-ID map
 * @param removingNodeIds IDs of nodes to disregard when looking for a node that can hold the tier
 * @return true if some node with the tier role or the generic data role is not in {@code removingNodeIds}
 */
static boolean tierNodesPresentConsideringRemovals(String singleTier, DiscoveryNodes nodes, Set<String> removingNodeIds) {
assert singleTier.equals(DiscoveryNodeRole.DATA_ROLE.roleName()) || DataTier.validTierName(singleTier)
: "tier " + singleTier + " is an invalid tier name";
// NOTE(review): this return looks like the pre-change body retained by the diff view; if it is really present in the
// file, every statement below it is unreachable -- confirm against the actual source.
return nodes.isRoleAvailable(DiscoveryNodeRole.DATA_ROLE.roleName()) || nodes.isRoleAvailable(singleTier);
var rolesToNodes = nodes.getTiersToNodeIds();
// A role with no map entry is treated as having no nodes.
Set<String> nodesWithTier = rolesToNodes.getOrDefault(singleTier, Collections.emptySet());
Set<String> dataNodes = rolesToNodes.getOrDefault(DiscoveryNodeRole.DATA_ROLE.roleName(), Collections.emptySet());

if (removingNodeIds.isEmpty()) {
// Nothing is being removed, so any node with either role makes the tier available.
return nodesWithTier.isEmpty() == false || dataNodes.isEmpty() == false;
} else if (removingNodeIds.size() < nodesWithTier.size() || removingNodeIds.size() < dataNodes.size()) {
// There are more nodes in the tier (or more generic data nodes) than there are nodes that are being removed, so
// there's at least one node that can hold data for the preferred tier that isn't being removed
return true;
}

// A tier might be unavailable because all remaining nodes in the tier are being removed, so now we have to check if there are any
// nodes with appropriate roles that aren't being removed.
for (String nodeId : dataNodes) {
if (removingNodeIds.contains(nodeId) == false) {
return true;
}
}
for (String nodeId : nodesWithTier) {
if (removingNodeIds.contains(nodeId) == false) {
return true;
}
}
// All the nodes with roles appropriate for this tier are being removed, so this tier is not available.
return false;
}

public static boolean allocationAllowed(String tierName, DiscoveryNode node) {
Expand All @@ -244,4 +297,18 @@ public static boolean allocationAllowed(String tierName, Set<DiscoveryNodeRole>
}
return false;
}

/**
 * Collects the IDs of all nodes whose registered shutdown is of a removal type (see {@code isRemovalType()}).
 *
 * @param shutdownMetadata the shutdown entries to scan
 * @return an unmodifiable set of node IDs; empty when no shutdown is registered or none is of a removal type
 */
private static Set<String> removingNodeIds(NodesShutdownMetadata shutdownMetadata) {
    final var registeredShutdowns = shutdownMetadata.getAll();
    if (registeredShutdowns.isEmpty()) {
        return Collections.emptySet();
    }

    final Set<String> leavingNodeIds = new HashSet<>();
    for (var shutdown : registeredShutdowns.values()) {
        if (shutdown.getType().isRemovalType() == false) {
            // Shutdown types that are not removal types are skipped.
            continue;
        }
        leavingNodeIds.add(shutdown.getNodeId());
    }
    return Collections.unmodifiableSet(leavingNodeIds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
Optional<String> availableDestinationTier = DataTierAllocationDecider.preferredAvailableTier(
preferredTierConfiguration,
clusterState.getNodes(),
DesiredNodes.latestFromClusterState(clusterState)
DesiredNodes.latestFromClusterState(clusterState),
clusterState.metadata().nodeShutdowns()
);

if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
boolean present = DataTierAllocationDecider.preferredAvailableTier(
DataTier.parseTierList(tierPreference),
clusterState.nodes(),
DesiredNodes.latestFromClusterState(clusterState)
DesiredNodes.latestFromClusterState(clusterState),
clusterState.metadata().nodeShutdowns()
).isPresent();
SingleMessageFieldInfo info = present ? null : new SingleMessageFieldInfo("no nodes for tiers [" + tierPreference + "] available");
return new Result(present, info);
Expand Down

0 comments on commit f669a1f

Please sign in to comment.