Skip to content

Commit

Permalink
Fix autoexpand during node replace (#98891)
Browse files Browse the repository at this point in the history
Prior to this change NodeReplacementAllocationDecider was unconditionally skipping both replacement source and target nodes when calculation auto-expand replicas. This is fixed by autoexpanding to the replacement node if source node already had shards of the index

Backport of PR #96281 amended for 7.17.x

Closes #89527

Co-authored-by: Ievgen Degtiarenko <ievgen.degtiarenko@elastic.co>
  • Loading branch information
kingherc and idegtiarenko committed Aug 28, 2023
1 parent 0df52c8 commit f7af19b
Show file tree
Hide file tree
Showing 6 changed files with 537 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,20 @@ public DiscoveryNode findByAddress(TransportAddress address) {
return null;
}

/**
* Check if a node with provided name exists
*
* @return {@code true} node identified with provided name exists or {@code false} otherwise
*/
public boolean hasByName(String name) {
for (DiscoveryNode node : nodes.values()) {
if (node.getName().equals(name)) {
return true;
}
}
return false;
}

/**
* Returns the version of the node with the oldest version in the cluster that is not a client node
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
import java.util.Map;
import java.util.Optional;

/**
* An allocation decider that ensures that all the shards allocated to the node scheduled for removal are relocated to the replacement node.
* It also ensures that auto-expands replicas are expanded to only the replacement source or target (not both at the same time)
* and only of the shards that were already present on the source node.
*/
public class NodeReplacementAllocationDecider extends AllocationDecider {

public static final String NAME = "node_replacement";
Expand All @@ -37,8 +42,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
Decision.Type.YES,
NAME,
"node [%s] is replacing node [%s], and may receive shards from it",
shardRouting.currentNodeId(),
node.nodeId()
node.nodeId(),
shardRouting.currentNodeId()
);
} else if (isReplacementSource(allocation, shardRouting.currentNodeId())) {
return Decision.single(
Expand Down Expand Up @@ -96,22 +101,54 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
return NO_REPLACEMENTS;
} else if (isReplacementTargetName(allocation, node.getName())) {
final SingleNodeShutdownMetadata shutdown = allocation.replacementTargetShutdowns().get(node.getName());
return Decision.single(
Decision.Type.NO,
NAME,
"node [%s] is a node replacement target for node [%s], "
+ "shards cannot auto expand to be on it until the replacement is complete",
node.getId(),
shutdown == null ? null : shutdown.getNodeId()
);
final String sourceNodeId = shutdown != null ? shutdown.getNodeId() : null;
final boolean hasShardsAllocatedOnSourceOrTarget = hasShardOnNode(indexMetadata, node.getId(), allocation)
|| (sourceNodeId != null && hasShardOnNode(indexMetadata, sourceNodeId, allocation));

if (hasShardsAllocatedOnSourceOrTarget) {
return allocation.decision(
Decision.YES,
NAME,
"node [%s] is a node replacement target for node [%s], "
+ "shard can auto expand to it as it was already present on the source node",
node.getId(),
sourceNodeId
);
} else {
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is a node replacement target for node [%s], "
+ "shards cannot auto expand to be on it until the replacement is complete",
node.getId(),
sourceNodeId
);
}
} else if (isReplacementSource(allocation, node.getId())) {
return Decision.single(
Decision.Type.NO,
NAME,
"node [%s] is being replaced by [%s], shards cannot auto expand to be on it",
node.getId(),
getReplacementName(allocation, node.getId())
);
final SingleNodeShutdownMetadata shutdown = allocation.metadata().nodeShutdowns().get(node.getId());
final String replacementNodeName = shutdown != null ? shutdown.getTargetNodeName() : null;
final boolean hasShardOnSource = hasShardOnNode(indexMetadata, node.getId(), allocation)
&& shutdown != null
&& allocation.nodes().hasByName(replacementNodeName) == false;

if (hasShardOnSource) {
return allocation.decision(
Decision.YES,
NAME,
"node [%s] is being replaced by [%s], shards can auto expand to be on it "
+ "while replacement node has not joined the cluster",
node.getId(),
replacementNodeName
);
} else {
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is being replaced by [%s], shards cannot auto expand to be on it",
node.getId(),
replacementNodeName
);
}
} else {
return Decision.single(
Decision.Type.YES,
Expand All @@ -121,6 +158,11 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
}
}

private static boolean hasShardOnNode(IndexMetadata indexMetadata, String nodeId, RoutingAllocation allocation) {
RoutingNode node = allocation.routingNodes().node(nodeId);
return node != null && node.numberOfOwningShardsForIndex(indexMetadata.getIndex()) >= 1;
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (replacementFromSourceToTarget(allocation, shardRouting.currentNodeId(), node.node().getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
node.getId()
);
case REPLACE:
if (allocation.nodes().hasByName(thisNodeShutdownMetadata.getTargetNodeName()) == false) {
return allocation.decision(
Decision.YES,
NAME,
"node [%s] is preparing to be removed from the cluster, but replacement is not yet present",
node.getId()
);
} else {
return allocation.decision(Decision.NO, NAME, "node [%s] is preparing for removal from the cluster", node.getId());
}
case REMOVE:
return allocation.decision(Decision.NO, NAME, "node [%s] is preparing for removal from the cluster", node.getId());
default:
Expand Down

0 comments on commit f7af19b

Please sign in to comment.