diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
index c737b89b80f73..e2cad43308284 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -155,6 +155,7 @@ public void allocate(RoutingAllocation allocation) {
         final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights);
         balancer.allocateUnassigned();
         balancer.moveShards();
+        balancer.moveNonPreferred();
         balancer.balance();
 
         // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
@@ -711,6 +712,107 @@ protected int comparePivot(int j) {
             return indices;
         }
 
+        /**
+         * Move started shards that are in non-preferred allocations.
+         * <p>
+         * The deciders are asked for the current allocation problems and at most one shard is relocated per pass. The
+         * problems are re-evaluated after every successful move, and the loop ends once a full pass moves no shard.
+         */
+        public void moveNonPreferred() {
+            boolean movedAShard;
+            do {
+                // must be reset on every pass, otherwise the loop could never terminate once a single shard has been moved
+                movedAShard = false;
+                for (Iterator<AllocationDeciders.AllocationProblem> problemIterator = allocation.deciders()
+                    .findAllocationProblems(allocation); problemIterator.hasNext();) {
+                    AllocationDeciders.AllocationProblem problem = problemIterator.next();
+                    if (tryResolve(problem)) {
+                        movedAShard = true;
+                        break;
+                    }
+                    logger.debug("Unable to resolve [{}]", problem);
+                }
+                // TODO: Update cluster info
+            } while (movedAShard);
+        }
+
+        /**
+         * Attempts to resolve an allocation problem by relocating one of the shards it nominates. Candidates are tried in
+         * the order the problem returns them and the first one that can be moved is relocated.
+         *
+         * @param problem the allocation problem to resolve
+         * @return {@code true} if a shard relocation was started, {@code false} if none of the candidates could be moved
+         */
+        private boolean tryResolve(AllocationDeciders.AllocationProblem problem) {
+            for (Iterator<ShardRouting> shardIterator = problem.preferredShardMovements(); shardIterator.hasNext();) {
+                ShardRouting shardRouting = shardIterator.next();
+                ProjectIndex index = projectIndex(shardRouting);
+                final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting);
+                if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
+                    final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
+                    final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
+                    sourceNode.removeShard(index, shardRouting);
+                    Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
+                        shardRouting,
+                        targetNode.getNodeId(),
+                        allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
+                        problem.relocateReason(),
+                        allocation.changes()
+                    );
+                    final ShardRouting shard = relocatingShards.v2();
+                    targetNode.addShard(projectIndex(shard), shard);
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
+                    }
+                    return true;
+                } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
+                    logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
+                }
+            }
+            return false;
+        }
+
+        /**
+         * Makes a decision on whether to move a started shard to another node. The following rules apply
+         * to the {@link MoveDecision} return object:
+         *   1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
+         *   2. If the shard's current allocation is preferred ({@link Decision.Type#YES}), no attempt will be made to move the shard and
+         *      {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null.
+         *   3. If the shard is not allowed ({@link Decision.Type#NO}), or not preferred ({@link Decision.Type#NOT_PREFERRED}) to remain
+         *      on its current node, then {@link MoveDecision#getAllocationDecision()} will be populated with the decision of moving to
+         *      another node. If {@link MoveDecision#forceMove()} returns {@code true}, then {@link MoveDecision#getTargetNode} will return
+         *      a non-null value representing a node that returned {@link Decision.Type#YES} from canAllocate, otherwise the assignedNodeId
+         *      will be null.
+         *   4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
+         *      {@link MoveDecision#getNodeDecisions} will have a non-null value.
+         */
+        public MoveDecision decideMoveNonPreferred(final ProjectIndex index, final ShardRouting shardRouting) {
+            NodeSorter sorter = nodeSorters.sorterForShard(shardRouting);
+            index.assertMatch(shardRouting);
+
+            if (shardRouting.started() == false) {
+                // we can only move started shards
+                return MoveDecision.NOT_TAKEN;
+            }
+
+            final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
+            assert sourceNode != null && sourceNode.containsShard(index, shardRouting);
+            RoutingNode routingNode = sourceNode.getRoutingNode();
+            Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
+            if ((canRemain.type() == Type.NOT_PREFERRED || canRemain.type() == Type.NO) == false) {
+                return MoveDecision.remain(canRemain);
+            }
+
+            sorter.reset(index);
+            /*
+             * the sorter holds the minimum weight node first for the shards index.
+             * We now walk through the nodes until we find a node to allocate the shard.
+             * This is not guaranteed to be balanced after this operation we still try best effort to
+             * allocate on the minimal eligible node.
+             */
+            return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocatePreferredOnly);
+        }
+
         /**
          * Move started shards that can not be allocated to a node anymore
          *
@@ -839,6 +941,19 @@ private MoveDecision decideMove(
             );
         }
 
+        /**
+         * Like {@link #decideCanAllocate} but reports {@link Type#NOT_PREFERRED} targets as {@link Decision#NO}, so that a
+         * shard is never moved onto a node that is itself not preferred.
+         */
+        private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) {
+            Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation);
+            // not-preferred means no here
+            if (decision.type() == Type.NOT_PREFERRED) {
+                return Decision.NO;
+            }
+            return decision;
+        }
+
         private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
             // don't use canRebalance as we want hard filtering rules to apply. See #17698
             return allocation.deciders().canAllocate(shardRouting, target, allocation);
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java
index 7fae18a332f0c..35ddd7874c265 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java
@@ -16,6 +16,7 @@
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
 
+import java.util.Collection;
 import java.util.Optional;
 import java.util.Set;
 
@@ -153,4 +154,14 @@ public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRo
     public Optional<Set<String>> getForcedInitialShardAllocationToNodes(ShardRouting shardRouting, RoutingAllocation allocation) {
         return Optional.empty();
     }
+
+    /**
+     * Get the allocation problems detected by this decider that can be fixed by moving some shards
+     *
+     * @param allocation the current routing allocation
+     * @return The problems this decider would like to resolve by moving shards elsewhere, or empty if it has none to report
+     */
+    public Optional<Collection<AllocationDeciders.AllocationProblem>> getAllocationProblems(RoutingAllocation allocation) {
+        return Optional.empty();
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java
index 6a09c894dbc7d..650379a521e89 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java
@@ -13,6 +13,7 @@
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingChangesObserver;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -20,8 +21,12 @@
 import org.elasticsearch.common.util.set.Sets;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
@@ -244,4 +249,49 @@ public Optional<Set<String>> getForcedInitialShardAllocationToNodes(ShardRouting
         }
         return result;
     }
+
+    /**
+     * Collects the allocation problems reported by all deciders.
+     *
+     * @param routingAllocation the current routing allocation
+     * @return an iterator over every reported problem, highest {@link AllocationProblem#priority()} first; problems of
+     *         equal priority keep the order in which the deciders reported them
+     */
+    public Iterator<AllocationProblem> findAllocationProblems(RoutingAllocation routingAllocation) {
+        // A sorted set keyed on priority alone would silently drop every problem whose priority ties with an earlier one,
+        // so gather them in a list and rely on the stable sort instead.
+        final List<AllocationProblem> problems = new ArrayList<>();
+        for (AllocationDecider decider : deciders) {
+            decider.getAllocationProblems(routingAllocation).ifPresent(problems::addAll);
+        }
+        problems.sort(Comparator.comparingInt(AllocationProblem::priority).reversed());
+        return problems.iterator();
+    }
+
+    /**
+     * A problem with the current allocation that a decider would like to see resolved by moving shards.
+     */
+    public interface AllocationProblem {
+
+        /**
+         * Shard movements to attempt to resolve the problem in descending priority order.
+         *
+         * @return the candidate shards, never {@code null}; an empty iterator if there is nothing to move
+         */
+        Iterator<ShardRouting> preferredShardMovements();
+
+        /**
+         * The reason for the relocation
+         *
+         * @see RoutingChangesObserver#relocationStarted(ShardRouting, ShardRouting, String)
+         */
+        String relocateReason();
+
+        /**
+         * The priority of this problem relative to others; problems with a higher priority are attempted first.
+         */
+        default int priority() {
+            return 1;
+        }
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java
index e814f570a67bb..1cc254df16145 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java
@@ -22,6 +22,11 @@
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
 /**
  * Decides whether shards can be allocated to cluster nodes, or can remain on cluster nodes, based on the target node's current write thread
  * pool usage stats and any candidate shard's write load estimate.
@@ -109,6 +114,59 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting
         return Decision.single(Decision.Type.YES, NAME, "canRemain() is not yet implemented");
     }
 
+    /**
+     * Reports every node whose write thread pool's maximum queue latency exceeds the configured threshold as a hot-spot.
+     *
+     * @param allocation the current routing allocation, used for its cluster info
+     * @return the detected hot-spots, or empty if the decider is not fully enabled or no node exceeds the threshold
+     */
+    @Override
+    public Optional<Collection<AllocationDeciders.AllocationProblem>> getAllocationProblems(RoutingAllocation allocation) {
+        if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) {
+            return Optional.empty();
+        }
+
+        final var nodeUsageStatsForThreadPools = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
+        final Collection<AllocationDeciders.AllocationProblem> hotSpots = nodeUsageStatsForThreadPools.entrySet()
+            .stream()
+            .filter(entry -> entry.getValue().threadPoolUsageStatsMap().containsKey(ThreadPool.Names.WRITE))
+            .filter(entry -> {
+                long maxQueueLatency = entry.getValue()
+                    .threadPoolUsageStatsMap()
+                    .get(ThreadPool.Names.WRITE)
+                    .maxThreadPoolQueueLatencyMillis();
+                return maxQueueLatency > writeLoadConstraintSettings.getQueueLatencyThreshold().millis();
+            })
+            .map(entry -> new HotSpot(entry.getKey()))
+            .collect(Collectors.toList());
+        return hotSpots.isEmpty() == false ? Optional.of(hotSpots) : Optional.empty();
+    }
+
+    /**
+     * A node whose write thread pool queue latency is above the configured threshold.
+     *
+     * @param nodeId the ID of the hot-spotting node
+     */
+    private record HotSpot(String nodeId) implements AllocationDeciders.AllocationProblem {
+
+        @Override
+        public Iterator<ShardRouting> preferredShardMovements() {
+            // TODO: return shards in priority order
+            // Until then offer no candidates: returning null would make any caller that iterates the result throw an NPE.
+            return java.util.Collections.emptyIterator();
+        }
+
+        @Override
+        public String relocateReason() {
+            return "hot-spotting";
+        }
+
+        @Override
+        public String toString() {
+            return "Hot-spotting on node " + nodeId;
+        }
+    }
+
     /**
      * Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node.
      * Returns the percent thread pool utilization change.