From 88fac67200cff3f69a2ff339f9a2a4e7af9200d8 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 2 Sep 2025 09:48:36 -0700 Subject: [PATCH 01/18] wip --- .../cluster/routing/allocation/MoveDecision.java | 3 +++ .../allocator/BalancedShardsAllocator.java | 16 +++++++++++++--- .../allocator/DesiredBalanceReconciler.java | 2 +- .../decider/WriteLoadConstraintDecider.java | 4 ++-- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java index 17c28ffe78b47..36fc9aeb8bb27 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java @@ -179,6 +179,8 @@ public Decision getCanRemainDecision() { * the result of this method is meaningless, as no rebalance decision was taken. If {@link #isDecisionTaken()} * returns {@code false}, then invoking this method will throw an {@code IllegalStateException}. */ + // TODO NOMERGE NOTE: Can I leverage this to block rebalancing a hot-spot back to the hot node? Nope, testing only, darn thing. + // @VisibleForTesting public boolean canRebalanceCluster() { checkDecisionState(); return clusterRebalanceDecision != null && clusterRebalanceDecision.type() == Type.YES; @@ -192,6 +194,7 @@ public boolean canRebalanceCluster() { * If {@link #isDecisionTaken()} returns {@code false}, then invoking this method will throw an * {@code IllegalStateException}. */ + // @VisibleForTesting @Nullable public Decision getClusterRebalanceDecision() { checkDecisionState(); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index c737b89b80f73..3c5dc619fbf0b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -196,7 +196,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f if (shard.unassigned()) { allocateUnassignedDecision = balancer.decideAllocateUnassigned(index, shard); } else { - moveDecision = balancer.decideMove(index, shard); + moveDecision = balancer.decideMove(index, shard); //// this is the allocation explain path? Make sure that NOT_PREFERRED is surfaced in explain response if (moveDecision.isDecisionTaken() && moveDecision.canRemain()) { moveDecision = balancer.decideRebalance(index, shard, moveDecision.getCanRemainDecision()); } @@ -776,7 +776,7 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar assert sourceNode != null && sourceNode.containsShard(index, shardRouting); RoutingNode routingNode = sourceNode.getRoutingNode(); Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - if (canRemain.type() != Decision.Type.NO) { + if (canRemain.type() != Decision.Type.NO && canRemain.type() != Decision.Type.NOT_PREFERRED) { return MoveDecision.remain(canRemain); } @@ -787,6 +787,9 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar * This is not guaranteed to be balanced after this operation we still try best effort to * allocate on the minimal eligible node. */ + // TODO NOMERGE: how do we ensure that rebalancing doesn't return the hot shard back to the overloaded node? + // This might be the other ticket's problem + // NOMERGE NOTE: the MoveDecision may have a targetNode set. MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocate); if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) { final boolean shardsOnReplacedNode = allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId(), REPLACE); @@ -816,7 +819,10 @@ private MoveDecision decideMove( if (explain) { nodeResults.add(new NodeAllocationResult(currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking)); } - // TODO maybe we can respect throttling here too? + if (allocationDecision.type() == Type.NOT_PREFERRED && remainDecision.type() == Type.NOT_PREFERRED) { + // Relocating a shard from one NOT_PREFERRED node to another would not improve the situation, so skip it. + continue; + } if (allocationDecision.type().higherThan(bestDecision)) { bestDecision = allocationDecision.type(); if (bestDecision == Type.YES) { @@ -826,6 +832,10 @@ private MoveDecision decideMove( // no need to continue iterating break; } + } else if (bestDecision == Type.NOT_PREFERRED) { + assert remainDecision.type() != Type.NOT_PREFERRED; + // If we don't ever get a YES decision, we'll go with NOT_PREFERRED. + targetNode = target; } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index c55e2a23ab8fa..f3f3ceac2aaf2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -502,7 +502,7 @@ private void moveShards() { } final var routingNode = routingNodes.node(shardRouting.currentNodeId()); - final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); + final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); ////////// CAN-REMAIN if (canRemainDecision.type() != Decision.Type.NO) { // it's desired elsewhere but technically it can remain on its current node. Defer its movement until later on to give // priority to shards that _must_ move. diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index e814f570a67bb..8a83d48b14222 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -73,7 +73,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing shardRouting.shardId() ); logger.debug(explain); - return Decision.single(Decision.Type.NO, NAME, explain); + return Decision.single(Decision.Type.NOT_PREFERRED, NAME, explain); } if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) { @@ -92,7 +92,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing nodeWriteThreadPoolStats.totalThreadPoolThreads() ); logger.debug(explain); - return Decision.single(Decision.Type.NO, NAME, explain); + return Decision.single(Decision.Type.NOT_PREFERRED, NAME, explain); } return Decision.YES; From c9c2645a5e2f8441ce24e0c94851951c3b3973d8 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 3 Sep 2025 16:24:33 -0700 Subject: [PATCH 02/18] comment cleanup --- .../elasticsearch/cluster/routing/allocation/MoveDecision.java | 1 - .../routing/allocation/allocator/BalancedShardsAllocator.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java index 36fc9aeb8bb27..368451661fba0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java @@ -179,7 +179,6 @@ public Decision getCanRemainDecision() { * the result of this method is meaningless, as no rebalance decision was taken. If {@link #isDecisionTaken()} * returns {@code false}, then invoking this method will throw an {@code IllegalStateException}. */ - // TODO NOMERGE NOTE: Can I leverage this to block rebalancing a hot-spot back to the hot node? Nope, testing only, darn thing. // @VisibleForTesting public boolean canRebalanceCluster() { checkDecisionState(); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 3c5dc619fbf0b..f6e8f28a52337 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -196,7 +196,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f if (shard.unassigned()) { allocateUnassignedDecision = balancer.decideAllocateUnassigned(index, shard); } else { - moveDecision = balancer.decideMove(index, shard); //// this is the allocation explain path? Make sure that NOT_PREFERRED is surfaced in explain response + moveDecision = balancer.decideMove(index, shard); if (moveDecision.isDecisionTaken() && moveDecision.canRemain()) { moveDecision = balancer.decideRebalance(index, shard, moveDecision.getCanRemainDecision()); } From 0f837c5ce25a9e8ded3776aa736eb46ca618dff6 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 4 Sep 2025 10:30:53 -0700 Subject: [PATCH 03/18] change for rebalancing, skip if not-preferred --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index f6e8f28a52337..02b0b55ae9f4f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -1140,7 +1140,7 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, ProjectIn continue; } final Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation); - if (allocationDecision.type() == Type.NO) { + if (allocationDecision.type() == Type.NO || allocationDecision.type() == Type.NOT_PREFERRED) { continue; } @@ -1326,7 +1326,7 @@ public boolean containsShard(ShardRouting shard) { public static final class NodeSorter extends IntroSorter { final ModelNode[] modelNodes; - /* the nodes weights with respect to the current weight function / index */ + /** The nodes weights with respect to the current weight function / index */ final float[] weights; private final WeightFunction function; private ProjectIndex index; From 4fef486863cf41631fe700e3057f0b5eb4e357e0 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 4 Sep 2025 10:56:56 -0700 Subject: [PATCH 04/18] change write load decider to not-preferred answers; handle not-preferred in reconciliation --- .../allocation/allocator/BalancedShardsAllocator.java | 7 ++----- .../allocation/allocator/DesiredBalanceReconciler.java | 7 +++---- .../decider/WriteLoadConstraintDeciderTests.java | 4 ++-- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 02b0b55ae9f4f..13aceee34460b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -787,9 +787,6 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar * This is not guaranteed to be balanced after this operation we still try best effort to * allocate on the minimal eligible node. */ - // TODO NOMERGE: how do we ensure that rebalancing doesn't return the hot shard back to the overloaded node? - // This might be the other ticket's problem - // NOMERGE NOTE: the MoveDecision may have a targetNode set. MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocate); if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) { final boolean shardsOnReplacedNode = allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId(), REPLACE); @@ -820,7 +817,7 @@ private MoveDecision decideMove( nodeResults.add(new NodeAllocationResult(currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking)); } if (allocationDecision.type() == Type.NOT_PREFERRED && remainDecision.type() == Type.NOT_PREFERRED) { - // Relocating a shard from one NOT_PREFERRED node to another would not improve the situation, so skip it. + // Relocating a shard from one NOT_PREFERRED node to another would not improve the situation. continue; } if (allocationDecision.type().higherThan(bestDecision)) { @@ -834,7 +831,7 @@ private MoveDecision decideMove( } } else if (bestDecision == Type.NOT_PREFERRED) { assert remainDecision.type() != Type.NOT_PREFERRED; - // If we don't ever get a YES decision, we'll go with NOT_PREFERRED. + // If we don't ever find a YES decision, we'll settle for NOT_PREFERRED as preferable to NO. targetNode = target; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index f3f3ceac2aaf2..de11ca6ae39d0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -502,10 +502,9 @@ private void moveShards() { } final var routingNode = routingNodes.node(shardRouting.currentNodeId()); - final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); ////////// CAN-REMAIN - if (canRemainDecision.type() != Decision.Type.NO) { - // it's desired elsewhere but technically it can remain on its current node. Defer its movement until later on to give - // priority to shards that _must_ move. + final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); + if (canRemainDecision.type() != Decision.Type.NO && canRemainDecision.type() != Decision.Type.NOT_PREFERRED) { + // If movement is throttled, a future reconciliation round will see a resolution. For now, leave it alone. continue; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index 12bfd8a0a4789..5b844cc1da635 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -104,7 +104,7 @@ public void testWriteLoadDeciderCanAllocate() { ); assertEquals( "Assigning a new shard to a node that is above the threshold should fail", - Decision.Type.NO, + Decision.Type.NOT_PREFERRED, writeLoadDecider.canAllocate( testHarness.shardRouting2, testHarness.exceedingThresholdRoutingNode, @@ -128,7 +128,7 @@ public void testWriteLoadDeciderCanAllocate() { ); assertEquals( "Assigning a new shard that would cause the node to exceed capacity should fail", - Decision.Type.NO, + Decision.Type.NOT_PREFERRED, writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.nearThresholdRoutingNode, testHarness.routingAllocation) .type() ); From e3da02478ec07c7bf8edd1564fbb1086c46fb02e Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 4 Sep 2025 17:38:34 -0700 Subject: [PATCH 05/18] test code refactor, plus testing todos --- .../decider/WriteLoadConstraintDeciderIT.java | 255 ++++++++++++------ .../allocator/BalancedShardsAllocator.java | 1 + .../allocator/DesiredBalanceReconciler.java | 1 + 3 files changed, 172 insertions(+), 85 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index ec584b1d0973d..5b1ca22a7468b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; @@ -71,6 +72,23 @@ protected Collection> getMockPlugins() { ); } + /** + * Tests that {@link AllocationDecider#canRemain} returning {@link Decision.Type#NO} for a {@code NodeX} will ignore a + * {@link AllocationDecider#canAllocate} response of {@link Decision.Type#NOT_PREFERRED} from {@code NodeY} and reassign the shard when + * there are no better node options. + */ + public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { + + } + + /** + * Tests that rebalancing will not override {@link Decision.Type#NOT_PREFERRED} in order to correct a node weight imbalance above the + * {@link BalancedShardsAllocator#THRESHOLD_SETTING} in a cluster. + */ + public void testThatRebalancingWillNotOverrideNotPreferred() { + + } + /** * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the * balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. @@ -79,64 +97,27 @@ protected Collection> getMockPlugins() { * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. */ public void testHighNodeWriteLoadPreventsNewShardAllocation() { - int randomUtilizationThresholdPercent = randomIntBetween(50, 100); - int numberOfWritePoolThreads = randomIntBetween(2, 20); - float shardWriteLoad = randomFloatBetween(0.0f, 0.01f, false); - Settings settings = Settings.builder() - .put( - WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), - WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED - ) - .put( - WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), - randomUtilizationThresholdPercent + "%" - ) - .build(); - - final String masterName = internalCluster().startMasterOnlyNode(settings); - final var dataNodes = internalCluster().startDataOnlyNodes(3, settings); - final String firstDataNodeName = dataNodes.get(0); - final String secondDataNodeName = dataNodes.get(1); - final String thirdDataNodeName = dataNodes.get(2); - final String firstDataNodeId = getNodeId(firstDataNodeName); - final String secondDataNodeId = getNodeId(secondDataNodeName); - final String thirdDataNodeId = getNodeId(thirdDataNodeName); - ensureStableCluster(4); - - logger.info( - "---> first node name " - + firstDataNodeName - + " and ID " - + firstDataNodeId - + "; second node name " - + secondDataNodeName - + " and ID " - + secondDataNodeId - + "; third node name " - + thirdDataNodeName - + " and ID " - + thirdDataNodeId - ); + TestHarness harness = setUpTestNodes(); /** * Exclude assignment of shards to the second and third data nodes via the {@link FilterAllocationDecider} settings. * Then create an index with many shards, which will all be assigned to the first data node. */ - logger.info("---> Limit shard assignment to node " + firstDataNodeName + " by excluding the other nodes"); + logger.info("---> Limit shard assignment to node " + harness.firstDataNodeName + " by excluding the other nodes"); updateClusterSettings( - Settings.builder().put("cluster.routing.allocation.exclude._name", secondDataNodeName + "," + thirdDataNodeName) + Settings.builder().put("cluster.routing.allocation.exclude._name", harness.secondDataNodeName + "," + harness.thirdDataNodeName) ); String indexName = randomIdentifier(); int randomNumberOfShards = randomIntBetween(10, 20); // Pick a high number of shards, so it is clear assignment is not accidental. // Calculate the maximum utilization a node can report while still being able to accept all relocating shards - double additionalLoadFromAllShards = calculateUtilizationForWriteLoad( - shardWriteLoad * randomNumberOfShards, - numberOfWritePoolThreads + int shardWriteLoadOverhead = shardLoadUtilizationOverhead( + harness.randomShardWriteLoad * randomNumberOfShards, + harness.randomNumberOfWritePoolThreads ); - int maxUtilizationPercent = randomUtilizationThresholdPercent - (int) (additionalLoadFromAllShards * 100) - 1; + int maxUtilizationPercentBelowThreshold = harness.randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1; var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { var indexRoutingTable = clusterState.routingTable().index(indexName); @@ -146,9 +127,9 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { return checkShardAssignment( clusterState.getRoutingNodes(), indexRoutingTable.getIndex(), - firstDataNodeId, - secondDataNodeId, - thirdDataNodeId, + harness.firstDataNodeId, + harness.secondDataNodeId, + harness.thirdDataNodeId, randomNumberOfShards, 0, 0 @@ -161,7 +142,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { ); ensureGreen(indexName); - logger.info("---> Waiting for all shards to be assigned to node " + firstDataNodeName); + logger.info("---> Waiting for all shards to be assigned to node " + harness.firstDataNodeName); safeAwait(verifyAssignmentToFirstNodeListener); /** @@ -170,46 +151,45 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { * write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). */ - final DiscoveryNode firstDiscoveryNode = getDiscoveryNode(firstDataNodeName); - final DiscoveryNode secondDiscoveryNode = getDiscoveryNode(secondDataNodeName); - final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName); final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( - firstDiscoveryNode, - numberOfWritePoolThreads, - randomIntBetween(0, maxUtilizationPercent) / 100f, + harness.firstDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + randomIntBetween(0, maxUtilizationPercentBelowThreshold) / 100f, 0 ); final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( - secondDiscoveryNode, - numberOfWritePoolThreads, - randomIntBetween(0, maxUtilizationPercent) / 100f, + harness.secondDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + randomIntBetween(0, maxUtilizationPercentBelowThreshold) / 100f, 0 ); final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( - thirdDiscoveryNode, - numberOfWritePoolThreads, - (randomUtilizationThresholdPercent + 1) / 100f, + harness.thirdDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + (harness.randomUtilizationThresholdPercent + 1) / 100f, 0 ); - MockTransportService.getInstance(firstDataNodeName).addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) - ) - ); - MockTransportService.getInstance(secondDataNodeName) + MockTransportService.getInstance(harness.firstDataNodeName).< + NodeUsageStatsForThreadPoolsAction + .NodeRequest>addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) + ) + ); + MockTransportService.getInstance(harness.secondDataNodeName) .addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(secondDiscoveryNode, secondNodeNonHotSpottingNodeStats) + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.secondDiscoveryNode, secondNodeNonHotSpottingNodeStats) ) ); - MockTransportService.getInstance(thirdDataNodeName) + MockTransportService.getInstance(harness.thirdDataNodeName) .addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(thirdDiscoveryNode, thirdNodeHotSpottingNodeStats) + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.thirdDiscoveryNode, thirdNodeHotSpottingNodeStats) ) ); @@ -218,26 +198,37 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { .getMetadata() .getProject() .index(indexName); - MockTransportService.getInstance(firstDataNodeName) + MockTransportService.getInstance(harness.firstDataNodeName) .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { - shardStats.add(createShardStats(indexMetadata, i, shardWriteLoad, firstDataNodeId)); + shardStats.add(createShardStats(indexMetadata, i, harness.randomShardWriteLoad, harness.firstDataNodeId)); } - TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName); - channel.sendResponse(instance.new NodeResponse(firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of())); + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.firstDataNodeName + ); + channel.sendResponse( + instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()) + ); }); - MockTransportService.getInstance(secondDataNodeName) + MockTransportService.getInstance(harness.secondDataNodeName) .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { // Return no stats for the index because none are assigned to this node. - TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, secondDataNodeName); - channel.sendResponse(instance.new NodeResponse(secondDataNodeId, 0, List.of(), List.of())); + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.secondDataNodeName + ); + channel.sendResponse(instance.new NodeResponse(harness.secondDataNodeId, 0, List.of(), List.of())); }); - MockTransportService.getInstance(thirdDataNodeName) + MockTransportService.getInstance(harness.thirdDataNodeName) .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { // Return no stats for the index because none are assigned to this node. - TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, thirdDataNodeName); - channel.sendResponse(instance.new NodeResponse(thirdDataNodeId, 0, List.of(), List.of())); + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.thirdDataNodeName + ); + channel.sendResponse(instance.new NodeResponse(harness.thirdDataNodeId, 0, List.of(), List.of())); }); /** @@ -250,19 +241,19 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { refreshClusterInfo(); logger.info( - "---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes" + "---> Update the filter to exclude " + harness.firstDataNodeName + " so that shards will be reassigned away to the other nodes" ); // Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. - updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", firstDataNodeName)); + updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", harness.firstDataNodeName)); safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { Index index = clusterState.routingTable().index(indexName).getIndex(); return checkShardAssignment( clusterState.getRoutingNodes(), index, - firstDataNodeId, - secondDataNodeId, - thirdDataNodeId, + harness.firstDataNodeId, + harness.secondDataNodeId, + harness.thirdDataNodeId, 0, randomNumberOfShards, 0 @@ -351,6 +342,28 @@ private boolean checkShardAssignment( return true; } + private Settings enabledWriteLoadDeciderSettings(int utilizationThresholdPercent) { + return Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), + utilizationThresholdPercent + "%" + ) + .build(); + } + + /** + * The utilization percent overhead needed to add the given amount of shard write load to a node with the given number of write pool + * threads. + */ + private int shardLoadUtilizationOverhead(float totalShardWriteLoad, int numberOfWritePoolThreads) { + float totalWriteLoadPerThread = totalShardWriteLoad / numberOfWritePoolThreads; + return (int) (100 * totalWriteLoadPerThread); + } + private DiscoveryNode getDiscoveryNode(String nodeName) { final TransportService transportService = internalCluster().getInstance(TransportService.class, nodeName); assertNotNull(transportService); @@ -401,4 +414,76 @@ private static ShardStats createShardStats(IndexMetadata indexMeta, int shardInd ); return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); } + + /** + * Carries set-up state from {@link #setUpTestNodes()} to the testing logic. + */ + record TestHarness( + String firstDataNodeName, + String secondDataNodeName, + String thirdDataNodeName, + String firstDataNodeId, + String secondDataNodeId, + String thirdDataNodeId, + DiscoveryNode firstDiscoveryNode, + DiscoveryNode secondDiscoveryNode, + DiscoveryNode thirdDiscoveryNode, + int randomUtilizationThresholdPercent, + int randomNumberOfWritePoolThreads, + float randomShardWriteLoad + ) {}; + + /** + * Sets up common test infrastructure to deduplicate code across tests. + */ + private TestHarness setUpTestNodes() { + int randomUtilizationThresholdPercent = randomIntBetween(50, 100); + int randomNumberOfWritePoolThreads = randomIntBetween(2, 20); + float randomShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false); + Settings settings = enabledWriteLoadDeciderSettings(randomUtilizationThresholdPercent); + + internalCluster().startMasterOnlyNode(settings); + final var dataNodes = internalCluster().startDataOnlyNodes(3, settings); + final String firstDataNodeName = dataNodes.get(0); + final String secondDataNodeName = dataNodes.get(1); + final String thirdDataNodeName = dataNodes.get(2); + final String firstDataNodeId = getNodeId(firstDataNodeName); + final String secondDataNodeId = getNodeId(secondDataNodeName); + final String thirdDataNodeId = getNodeId(thirdDataNodeName); + ensureStableCluster(4); + + logger.info( + "---> first node name " + + firstDataNodeName + + " and ID " + + firstDataNodeId + + "; second node name " + + secondDataNodeName + + " and ID " + + secondDataNodeId + + "; third node name " + + thirdDataNodeName + + " and ID " + + thirdDataNodeId + ); + + final DiscoveryNode firstDiscoveryNode = getDiscoveryNode(firstDataNodeName); + final DiscoveryNode secondDiscoveryNode = getDiscoveryNode(secondDataNodeName); + final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName); + + return new TestHarness( + firstDataNodeName, + secondDataNodeName, + thirdDataNodeName, + firstDataNodeId, + secondDataNodeId, + thirdDataNodeId, + firstDiscoveryNode, + secondDiscoveryNode, + thirdDiscoveryNode, + randomUtilizationThresholdPercent, + randomNumberOfWritePoolThreads, + randomShardWriteLoad + ); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 13aceee34460b..310c1a5963f94 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -816,6 +816,7 @@ private MoveDecision decideMove( if (explain) { nodeResults.add(new NodeAllocationResult(currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking)); } + // TODO (ES-12633): test that nothing moves when the source is not-preferred and the target is not-preferred. if (allocationDecision.type() == Type.NOT_PREFERRED && remainDecision.type() == Type.NOT_PREFERRED) { // Relocating a shard from one NOT_PREFERRED node to another would not improve the situation. continue; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index de11ca6ae39d0..dcf39db9e80e1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -503,6 +503,7 @@ private void moveShards() { final var routingNode = routingNodes.node(shardRouting.currentNodeId()); final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); + // TODO (ES-12633): exercise canRemain to say NOT_PREFERRED, without other decider influence, and see that a shard is moved. if (canRemainDecision.type() != Decision.Type.NO && canRemainDecision.type() != Decision.Type.NOT_PREFERRED) { // If movement is throttled, a future reconciliation round will see a resolution. For now, leave it alone. continue; From a07e654f775a0b309420ab1394b1ee266c51fa04 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 5 Sep 2025 17:49:10 -0700 Subject: [PATCH 06/18] test improvements; obviate THROTTLE handling issue by overriding concurrency settings --- .../decider/WriteLoadConstraintDeciderIT.java | 65 ++++++++++++++----- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index 5b1ca22a7468b..fde2a55737904 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -58,6 +58,9 @@ import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; +import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class WriteLoadConstraintDeciderIT extends ESIntegTestCase { @@ -142,7 +145,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { ); ensureGreen(indexName); - logger.info("---> Waiting for all shards to be assigned to node " + harness.firstDataNodeName); + logger.info("---> Waiting for all [" + randomNumberOfShards + "] shards to be assigned to node " + harness.firstDataNodeName); safeAwait(verifyAssignmentToFirstNodeListener); /** @@ -246,19 +249,34 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar // Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", harness.firstDataNodeName)); - safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { - Index index = clusterState.routingTable().index(indexName).getIndex(); - return checkShardAssignment( - clusterState.getRoutingNodes(), - index, - harness.firstDataNodeId, - harness.secondDataNodeId, - harness.thirdDataNodeId, - 0, - randomNumberOfShards, - 0 - ); - })); + try { + safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + Index index = clusterState.routingTable().index(indexName).getIndex(); + return checkShardAssignment( + clusterState.getRoutingNodes(), + index, + harness.firstDataNodeId, + harness.secondDataNodeId, + harness.thirdDataNodeId, + 0, + randomNumberOfShards, + 0 + ); + })); + } catch (AssertionError error) { + ClusterState state = internalCluster().client() + .admin() + .cluster() + .prepareState(TEST_REQUEST_TIMEOUT) + .clear() + .setMetadata(true) + .setNodes(true) + .setRoutingTable(true) + .get() + .getState(); + logger.info("---> Failed to reach expected allocation state. Dumping assignments: " + state.getRoutingNodes()); + throw error; + } } public void testMaxQueueLatencyMetricIsPublished() { @@ -352,6 +370,10 @@ private Settings enabledWriteLoadDeciderSettings(int utilizationThresholdPercent WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), utilizationThresholdPercent + "%" ) + // TODO (ES-12862): remove these overrides when throttling is turned off for simulations. + .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 100) + .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 100) + .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 100) .build(); } @@ -452,6 +474,10 @@ private TestHarness setUpTestNodes() { final String thirdDataNodeId = getNodeId(thirdDataNodeName); ensureStableCluster(4); + final DiscoveryNode firstDiscoveryNode = getDiscoveryNode(firstDataNodeName); + final DiscoveryNode secondDiscoveryNode = getDiscoveryNode(secondDataNodeName); + final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName); + logger.info( "---> first node name " + firstDataNodeName @@ -467,9 +493,14 @@ private TestHarness setUpTestNodes() { + thirdDataNodeId ); - final DiscoveryNode firstDiscoveryNode = getDiscoveryNode(firstDataNodeName); - final DiscoveryNode secondDiscoveryNode = getDiscoveryNode(secondDataNodeName); - final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName); + logger.info( + "---> utilization threshold: " + + randomUtilizationThresholdPercent + + ", write threads: " + + randomNumberOfWritePoolThreads + + ", individual shard write loads: " + + randomShardWriteLoad + ); return new TestHarness( firstDataNodeName, From 3c16fa04f8f31d4415205d950eb502ff9fa1ad80 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 5 Sep 2025 17:50:43 -0700 Subject: [PATCH 07/18] bit of write load decider explanation / logging change --- .../decider/WriteLoadConstraintDecider.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index 8a83d48b14222..13204007d3830 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -76,7 +76,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return Decision.single(Decision.Type.NOT_PREFERRED, NAME, explain); } - if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) { + var newWriteThreadPoolUtilization = calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad); + if (newWriteThreadPoolUtilization >= nodeWriteThreadPoolLoadThreshold) { // The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard. // This could lead to a hot spot on this node and is undesirable. String explain = Strings.format( @@ -95,7 +96,27 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return Decision.single(Decision.Type.NOT_PREFERRED, NAME, explain); } - return Decision.YES; + if (logger.isTraceEnabled()) { + logger.trace( + Strings.format( + "Shard [%s] in index [%s] can be assigned to node [%s]. The node's utilization would become [%s]", + shardRouting.shardId(), + shardRouting.index(), + node.nodeId(), + newWriteThreadPoolUtilization + ) + ); + } + + return allocation.decision( + Decision.YES, + NAME, + "Shard [%s] in index [%s] can be assigned to node [%s]. The node's utilization would become [%s]", + shardRouting.shardId(), + shardRouting.index(), + node.nodeId(), + newWriteThreadPoolUtilization + ); } @Override From 5c049383cf0a5c65df0025bcda7f1b0a574c29a9 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 9 Sep 2025 12:32:38 -0700 Subject: [PATCH 08/18] second test working; add reconciliation handling of NOT_PREFERRED, interpret as YES but delay until the YES assignments are exhausted --- .../decider/WriteLoadConstraintDeciderIT.java | 296 +++++++++++++----- .../allocator/DesiredBalanceReconciler.java | 19 +- 2 files changed, 233 insertions(+), 82 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index fde2a55737904..1806ebdfd25c2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -81,89 +81,166 @@ protected Collection> getMockPlugins() { * there are no better node options. */ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { + TestHarness harness = setUpTestNodes(); - } + /** + * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write + * load stats. The stats will show both the second and third nodes are hot-spotting. + */ - /** - * Tests that rebalancing will not override {@link Decision.Type#NOT_PREFERRED} in order to correct a node weight imbalance above the - * {@link BalancedShardsAllocator#THRESHOLD_SETTING} in a cluster. - */ - public void testThatRebalancingWillNotOverrideNotPreferred() { + final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( + harness.firstDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsRelocation) / 100f, + 0 + ); + final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( + harness.secondDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + (harness.randomUtilizationThresholdPercent + 1) / 100f, + 0 + ); + final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( + harness.thirdDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + (harness.randomUtilizationThresholdPercent + 1) / 100f, + 0 + ); - } + MockTransportService.getInstance(harness.firstDataNodeName).< + NodeUsageStatsForThreadPoolsAction + .NodeRequest>addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) + ) + ); + MockTransportService.getInstance(harness.secondDataNodeName) + .addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.secondDiscoveryNode, secondNodeNonHotSpottingNodeStats) + ) + ); + MockTransportService.getInstance(harness.thirdDataNodeName) + .addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.thirdDiscoveryNode, thirdNodeHotSpottingNodeStats) + ) + ); - /** - * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the - * balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. - * - * Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of - * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. - */ - public void testHighNodeWriteLoadPreventsNewShardAllocation() { - TestHarness harness = setUpTestNodes(); + /** + * Override the {@link TransportIndicesStatsAction} action on the data nodes to supply artificial shard write load stats. The stats + * will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). + */ + + IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class) + .state() + .getMetadata() + .getProject() + .index(harness.indexName); + MockTransportService.getInstance(harness.firstDataNodeName) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + shardStats.add(createShardStats(indexMetadata, i, harness.randomShardWriteLoad, harness.firstDataNodeId)); + } + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.firstDataNodeName + ); + channel.sendResponse( + instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()) + ); + }); + MockTransportService.getInstance(harness.secondDataNodeName) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + // Return no stats for the index because none are assigned to this node. + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.secondDataNodeName + ); + channel.sendResponse(instance.new NodeResponse(harness.secondDataNodeId, 0, List.of(), List.of())); + }); + MockTransportService.getInstance(harness.thirdDataNodeName) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + // Return no stats for the index because none are assigned to this node. + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.thirdDataNodeName + ); + channel.sendResponse(instance.new NodeResponse(harness.thirdDataNodeId, 0, List.of(), List.of())); + }); /** - * Exclude assignment of shards to the second and third data nodes via the {@link FilterAllocationDecider} settings. - * Then create an index with many shards, which will all be assigned to the first data node. + * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and + * initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned away from + * the first node _despite_ the second and third node reporting hot-spotting. */ - logger.info("---> Limit shard assignment to node " + harness.firstDataNodeName + " by excluding the other nodes"); - updateClusterSettings( - Settings.builder().put("cluster.routing.allocation.exclude._name", harness.secondDataNodeName + "," + harness.thirdDataNodeName) + logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); + final InternalClusterInfoService clusterInfoService = asInstanceOf( + InternalClusterInfoService.class, + internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName()) ); + ClusterInfoServiceUtils.refresh(clusterInfoService); - String indexName = randomIdentifier(); - int randomNumberOfShards = randomIntBetween(10, 20); // Pick a high number of shards, so it is clear assignment is not accidental. - - // Calculate the maximum utilization a node can report while still being able to accept all relocating shards - int shardWriteLoadOverhead = shardLoadUtilizationOverhead( - harness.randomShardWriteLoad * randomNumberOfShards, - harness.randomNumberOfWritePoolThreads + logger.info( + "---> Update the filter to exclude " + harness.firstDataNodeName + " so that shards will be reassigned away to the other nodes" ); - int maxUtilizationPercentBelowThreshold = harness.randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1; + // Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. + updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", harness.firstDataNodeName)); - var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { - var indexRoutingTable = clusterState.routingTable().index(indexName); - if (indexRoutingTable == null) { + try { + safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + Index index = clusterState.routingTable().index(harness.indexName).getIndex(); + if (clusterState.getRoutingNodes().node(harness.firstDataNodeId).numberOfOwningShardsForIndex(index) == 0) { + return true; + } return false; - } - return checkShardAssignment( - clusterState.getRoutingNodes(), - indexRoutingTable.getIndex(), - harness.firstDataNodeId, - harness.secondDataNodeId, - harness.thirdDataNodeId, - randomNumberOfShards, - 0, - 0 - ); - }); - - createIndex( - indexName, - Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomNumberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build() - ); - ensureGreen(indexName); + })); + } catch (AssertionError error) { + ClusterState state = internalCluster().client() + .admin() + .cluster() + .prepareState(TEST_REQUEST_TIMEOUT) + .clear() + .setMetadata(true) + .setNodes(true) + .setRoutingTable(true) + .get() + .getState(); + logger.info("---> Failed to reach expected allocation state. Dumping assignments: " + state.getRoutingNodes()); + throw error; + } + } - logger.info("---> Waiting for all [" + randomNumberOfShards + "] shards to be assigned to node " + harness.firstDataNodeName); - safeAwait(verifyAssignmentToFirstNodeListener); + /** + * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the + * balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. + * + * Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of + * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. + */ + public void testHighNodeWriteLoadPreventsNewShardAllocation() { + TestHarness harness = setUpTestNodes(); /** - * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} and {@link TransportIndicesStatsAction} actions on the data - * nodes to supply artificial write load stats. The stats will show the third node hot-spotting, and that all shards have non-empty - * write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). + * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write + * load stats. The stats will show the third node hot-spotting, while the second node has capacity to receive all the index shards. */ final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( harness.firstDiscoveryNode, harness.randomNumberOfWritePoolThreads, - randomIntBetween(0, maxUtilizationPercentBelowThreshold) / 100f, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsRelocation) / 100f, 0 ); final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( harness.secondDiscoveryNode, harness.randomNumberOfWritePoolThreads, - randomIntBetween(0, maxUtilizationPercentBelowThreshold) / 100f, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsRelocation) / 100f, 0 ); final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -196,11 +273,16 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { ) ); + /** + * Override the {@link TransportIndicesStatsAction} action on the data nodes to supply artificial shard write load stats. The stats + * will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). + */ + IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class) .state() .getMetadata() .getProject() - .index(indexName); + .index(harness.indexName); MockTransportService.getInstance(harness.firstDataNodeName) .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); @@ -251,7 +333,7 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar try { safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { - Index index = clusterState.routingTable().index(indexName).getIndex(); + Index index = clusterState.routingTable().index(harness.indexName).getIndex(); return checkShardAssignment( clusterState.getRoutingNodes(), index, @@ -259,7 +341,7 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar harness.secondDataNodeId, harness.thirdDataNodeId, 0, - randomNumberOfShards, + harness.randomNumberOfShards, 0 ); })); @@ -437,26 +519,11 @@ private static ShardStats createShardStats(IndexMetadata indexMeta, int shardInd return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); } - /** - * Carries set-up state from {@link #setUpTestNodes()} to the testing logic. - */ - record TestHarness( - String firstDataNodeName, - String secondDataNodeName, - String thirdDataNodeName, - String firstDataNodeId, - String secondDataNodeId, - String thirdDataNodeId, - DiscoveryNode firstDiscoveryNode, - DiscoveryNode secondDiscoveryNode, - DiscoveryNode thirdDiscoveryNode, - int randomUtilizationThresholdPercent, - int randomNumberOfWritePoolThreads, - float randomShardWriteLoad - ) {}; - /** * Sets up common test infrastructure to deduplicate code across tests. + *

+ * Starts three data nodes and creates an index with many shards, then forces shard assignment to only the first data node via the + * {@link FilterAllocationDecider}. */ private TestHarness setUpTestNodes() { int randomUtilizationThresholdPercent = randomIntBetween(50, 100); @@ -502,6 +569,52 @@ private TestHarness setUpTestNodes() { + randomShardWriteLoad ); + /** + * Exclude assignment of shards to the second and third data nodes via the {@link FilterAllocationDecider} settings. + * Then create an index with many shards, which will all be assigned to the first data node. + */ + + logger.info("---> Limit shard assignment to node " + firstDataNodeName + " by excluding the other nodes"); + updateClusterSettings( + Settings.builder().put("cluster.routing.allocation.exclude._name", secondDataNodeName + "," + thirdDataNodeName) + ); + + String indexName = randomIdentifier(); + int randomNumberOfShards = randomIntBetween(10, 20); // Pick a high number of shards, so it is clear assignment is not accidental. + + // Calculate the maximum utilization a node can report while still being able to accept all relocating shards + int shardWriteLoadOverhead = shardLoadUtilizationOverhead( + randomShardWriteLoad * randomNumberOfShards, + randomNumberOfWritePoolThreads + ); + int maxUtilBelowThresholdThatAllowsAllShardsRelocation = randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1; + + var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + var indexRoutingTable = clusterState.routingTable().index(indexName); + if (indexRoutingTable == null) { + return false; + } + return checkShardAssignment( + clusterState.getRoutingNodes(), + indexRoutingTable.getIndex(), + firstDataNodeId, + secondDataNodeId, + thirdDataNodeId, + randomNumberOfShards, + 0, + 0 + ); + }); + + createIndex( + indexName, + Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomNumberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + ensureGreen(indexName); + + logger.info("---> Waiting for all [" + randomNumberOfShards + "] shards to be assigned to node " + firstDataNodeName); + safeAwait(verifyAssignmentToFirstNodeListener); + return new TestHarness( firstDataNodeName, secondDataNodeName, @@ -514,7 +627,32 @@ private TestHarness setUpTestNodes() { thirdDiscoveryNode, randomUtilizationThresholdPercent, randomNumberOfWritePoolThreads, - randomShardWriteLoad + randomShardWriteLoad, + indexName, + randomNumberOfShards, + maxUtilBelowThresholdThatAllowsAllShardsRelocation ); } + + /** + * Carries set-up state from {@link #setUpTestNodes()} to the testing logic. + */ + record TestHarness( + String firstDataNodeName, + String secondDataNodeName, + String thirdDataNodeName, + String firstDataNodeId, + String secondDataNodeId, + String thirdDataNodeId, + DiscoveryNode firstDiscoveryNode, + DiscoveryNode secondDiscoveryNode, + DiscoveryNode thirdDiscoveryNode, + int randomUtilizationThresholdPercent, + int randomNumberOfWritePoolThreads, + float randomShardWriteLoad, + String indexName, + int randomNumberOfShards, + int maxUtilBelowThresholdThatAllowsAllShardsRelocation + ) {}; + } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index dcf39db9e80e1..3755df502d700 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -650,6 +650,7 @@ private DiscoveryNode findRelocationTarget( Set desiredNodeIds, BiFunction canAllocateDecider ) { + DiscoveryNode chosenNode = null; for (final var nodeId : desiredNodeIds) { // TODO consider ignored nodes here too? if (nodeId.equals(shardRouting.currentNodeId())) { @@ -660,13 +661,25 @@ private DiscoveryNode findRelocationTarget( continue; } final var decision = canAllocateDecider.apply(shardRouting, node); - logger.trace("relocate {} to {}: {}", shardRouting, nodeId, decision); + logger.info("relocate {} to {}: {}", shardRouting, nodeId, decision); + + // Assign shards to the YES nodes first. This way we might delay moving shards to NOT_PREFERRED nodes until after shards are + // first moved away. The DesiredBalance could be moving shards away from a hot node as well as moving shards to it, and it's + // better to offload shards first. if (decision.type() == Decision.Type.YES) { - return node.node(); + chosenNode = node.node(); + // As soon as we get any YES, we return it. + break; + } else if (decision.type() == Decision.Type.NOT_PREFERRED && chosenNode == null) { + // If the best answer is not-preferred, then the shard will still be assigned. It is okay to assign to a not-preferred + // node because the desired balance computation had a reason to override it: when there aren't any better nodes to + // choose and the shard cannot remain where it is, we accept not-preferred. NOT_PREFERRED is essentially a YES for + // reconciliation. + chosenNode = node.node(); } } - return null; + return chosenNode; } private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) { From 89e2dd06f906c004bc9a2f2ba8a508c6d88126db Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 9 Sep 2025 12:41:28 -0700 Subject: [PATCH 09/18] fix test file after force rebase --- .../decider/WriteLoadConstraintDeciderIT.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index 1806ebdfd25c2..0a5d9c82af635 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -24,7 +25,6 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics; -import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; @@ -54,13 +54,12 @@ import static java.util.stream.IntStream.range; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator.calculateUtilizationForWriteLoad; -import static org.hamcrest.Matchers.everyItem; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasSize; import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING; import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING; import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class WriteLoadConstraintDeciderIT extends ESIntegTestCase { @@ -180,11 +179,7 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar */ logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); - final InternalClusterInfoService clusterInfoService = asInstanceOf( - InternalClusterInfoService.class, - internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName()) - ); - ClusterInfoServiceUtils.refresh(clusterInfoService); + refreshClusterInfo(); logger.info( "---> Update the filter to exclude " + harness.firstDataNodeName + " so that shards will be reassigned away to the other nodes" From 2621ac1588eccf9bfe5fe14f5c15b8b7fc5cab63 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 9 Sep 2025 12:50:47 -0700 Subject: [PATCH 10/18] tidy up test file --- .../decider/WriteLoadConstraintDeciderIT.java | 96 ++++++++++--------- 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index 0a5d9c82af635..0b28b2de13e23 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -75,28 +75,30 @@ protected Collection> getMockPlugins() { } /** - * Tests that {@link AllocationDecider#canRemain} returning {@link Decision.Type#NO} for a {@code NodeX} will ignore a - * {@link AllocationDecider#canAllocate} response of {@link Decision.Type#NOT_PREFERRED} from {@code NodeY} and reassign the shard when - * there are no better node options. + * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the + * balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. + * + * Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of + * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. */ - public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { - TestHarness harness = setUpTestNodes(); + public void testHighNodeWriteLoadPreventsNewShardAllocation() { + TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode(); /** * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write - * load stats. The stats will show both the second and third nodes are hot-spotting. + * load stats. The stats will show the third node hot-spotting, while the second node has capacity to receive all the index shards. */ final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( harness.firstDiscoveryNode, harness.randomNumberOfWritePoolThreads, - randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsRelocation) / 100f, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, 0 ); final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( harness.secondDiscoveryNode, harness.randomNumberOfWritePoolThreads, - (harness.randomUtilizationThresholdPercent + 1) / 100f, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, 0 ); final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -109,11 +111,11 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { MockTransportService.getInstance(harness.firstDataNodeName).< NodeUsageStatsForThreadPoolsAction .NodeRequest>addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) - ) - ); + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) + ) + ); MockTransportService.getInstance(harness.secondDataNodeName) .addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", @@ -174,8 +176,8 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar /** * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and - * initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned away from - * the first node _despite_ the second and third node reporting hot-spotting. + * initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the + * second node, since the third is reporting as hot-spotted and should not accept any shards. */ logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); @@ -190,10 +192,16 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar try { safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { Index index = clusterState.routingTable().index(harness.indexName).getIndex(); - if (clusterState.getRoutingNodes().node(harness.firstDataNodeId).numberOfOwningShardsForIndex(index) == 0) { - return true; - } - return false; + return checkShardAssignment( + clusterState.getRoutingNodes(), + index, + harness.firstDataNodeId, + harness.secondDataNodeId, + harness.thirdDataNodeId, + 0, + harness.randomNumberOfShards, + 0 + ); })); } catch (AssertionError error) { ClusterState state = internalCluster().client() @@ -212,30 +220,32 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar } /** - * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the - * balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. + * Tests that {@link AllocationDecider#canRemain} returning {@link Decision.Type#NO} for a {@code NodeX} will ignore a + * {@link AllocationDecider#canAllocate} response of {@link Decision.Type#NOT_PREFERRED} from a {@code NodeY} and reassign the shard + * when there are no better node options. * - * Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of - * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. + * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the + * balancer. Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards + * off of Node1 while Node2 and Node3 are hot-spotting, resulting in overriding not-preferred and relocating shards to Node2 and Node3. */ - public void testHighNodeWriteLoadPreventsNewShardAllocation() { - TestHarness harness = setUpTestNodes(); + public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { + TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode(); /** * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write - * load stats. The stats will show the third node hot-spotting, while the second node has capacity to receive all the index shards. + * load stats. The stats will show both the second and third nodes are hot-spotting. */ final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( harness.firstDiscoveryNode, harness.randomNumberOfWritePoolThreads, - randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsRelocation) / 100f, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, 0 ); final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( harness.secondDiscoveryNode, harness.randomNumberOfWritePoolThreads, - randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsRelocation) / 100f, + (harness.randomUtilizationThresholdPercent + 1) / 100f, 0 ); final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -312,9 +322,9 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar }); /** - * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and - * initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the - * second node, since the third is reporting as hot-spotted and should not accept any shards. + * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired and + * initiate rebalancing. Then wait to see a cluster state update that has all the shards assigned away from the first node _despite_ + * the second and third node reporting hot-spotting: a canRemain::NO response should override a canAllocate::NOT_PREFERRED answer. */ logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); @@ -329,16 +339,10 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar try { safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { Index index = clusterState.routingTable().index(harness.indexName).getIndex(); - return checkShardAssignment( - clusterState.getRoutingNodes(), - index, - harness.firstDataNodeId, - harness.secondDataNodeId, - harness.thirdDataNodeId, - 0, - harness.randomNumberOfShards, - 0 - ); + if (clusterState.getRoutingNodes().node(harness.firstDataNodeId).numberOfOwningShardsForIndex(index) == 0) { + return true; + } + return false; })); } catch (AssertionError error) { ClusterState state = internalCluster().client() @@ -520,7 +524,7 @@ private static ShardStats createShardStats(IndexMetadata indexMeta, int shardInd * Starts three data nodes and creates an index with many shards, then forces shard assignment to only the first data node via the * {@link FilterAllocationDecider}. */ - private TestHarness setUpTestNodes() { + private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() { int randomUtilizationThresholdPercent = randomIntBetween(50, 100); int randomNumberOfWritePoolThreads = randomIntBetween(2, 20); float randomShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false); @@ -582,7 +586,7 @@ private TestHarness setUpTestNodes() { randomShardWriteLoad * randomNumberOfShards, randomNumberOfWritePoolThreads ); - int maxUtilBelowThresholdThatAllowsAllShardsRelocation = randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1; + int maxUtilBelowThresholdThatAllowsAllShardsToRelocate = randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1; var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { var indexRoutingTable = clusterState.routingTable().index(indexName); @@ -625,12 +629,12 @@ private TestHarness setUpTestNodes() { randomShardWriteLoad, indexName, randomNumberOfShards, - maxUtilBelowThresholdThatAllowsAllShardsRelocation + maxUtilBelowThresholdThatAllowsAllShardsToRelocate ); } /** - * Carries set-up state from {@link #setUpTestNodes()} to the testing logic. + * Carries set-up state from {@link #setUpThreeTestNodesAndAllIndexShardsOnFirstNode()} to the testing logic. */ record TestHarness( String firstDataNodeName, @@ -647,7 +651,7 @@ record TestHarness( float randomShardWriteLoad, String indexName, int randomNumberOfShards, - int maxUtilBelowThresholdThatAllowsAllShardsRelocation + int maxUtilBelowThresholdThatAllowsAllShardsToRelocate ) {}; } From d04e073dbbc994c42e64c8405846b9df425b67fe Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 9 Sep 2025 12:53:08 -0700 Subject: [PATCH 11/18] improve test change readability --- .../decider/WriteLoadConstraintDeciderIT.java | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index 0b28b2de13e23..a78b66d21a009 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -75,18 +75,20 @@ protected Collection> getMockPlugins() { } /** - * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the - * balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. + * Tests that {@link AllocationDecider#canRemain} returning {@link Decision.Type#NO} for a {@code NodeX} will ignore a + * {@link AllocationDecider#canAllocate} response of {@link Decision.Type#NOT_PREFERRED} from a {@code NodeY} and reassign the shard + * when there are no better node options. * - * Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of - * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. + * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the + * balancer. Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards + * off of Node1 while Node2 and Node3 are hot-spotting, resulting in overriding not-preferred and relocating shards to Node2 and Node3. */ - public void testHighNodeWriteLoadPreventsNewShardAllocation() { + public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode(); /** * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write - * load stats. The stats will show the third node hot-spotting, while the second node has capacity to receive all the index shards. + * load stats. The stats will show both the second and third nodes are hot-spotting. */ final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -98,7 +100,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( harness.secondDiscoveryNode, harness.randomNumberOfWritePoolThreads, - randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, + (harness.randomUtilizationThresholdPercent + 1) / 100f, 0 ); final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -111,11 +113,11 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { MockTransportService.getInstance(harness.firstDataNodeName).< NodeUsageStatsForThreadPoolsAction .NodeRequest>addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) - ) - ); + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) + ) + ); MockTransportService.getInstance(harness.secondDataNodeName) .addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", @@ -175,9 +177,9 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar }); /** - * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and - * initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the - * second node, since the third is reporting as hot-spotted and should not accept any shards. + * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired and + * initiate rebalancing. Then wait to see a cluster state update that has all the shards assigned away from the first node _despite_ + * the second and third node reporting hot-spotting: a canRemain::NO response should override a canAllocate::NOT_PREFERRED answer. */ logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); @@ -192,16 +194,10 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar try { safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { Index index = clusterState.routingTable().index(harness.indexName).getIndex(); - return checkShardAssignment( - clusterState.getRoutingNodes(), - index, - harness.firstDataNodeId, - harness.secondDataNodeId, - harness.thirdDataNodeId, - 0, - harness.randomNumberOfShards, - 0 - ); + if (clusterState.getRoutingNodes().node(harness.firstDataNodeId).numberOfOwningShardsForIndex(index) == 0) { + return true; + } + return false; })); } catch (AssertionError error) { ClusterState state = internalCluster().client() @@ -220,20 +216,18 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar } /** - * Tests that {@link AllocationDecider#canRemain} returning {@link Decision.Type#NO} for a {@code NodeX} will ignore a - * {@link AllocationDecider#canAllocate} response of {@link Decision.Type#NOT_PREFERRED} from a {@code NodeY} and reassign the shard - * when there are no better node options. - * * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the - * balancer. Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards - * off of Node1 while Node2 and Node3 are hot-spotting, resulting in overriding not-preferred and relocating shards to Node2 and Node3. + * balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. + * + * Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of + * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. */ - public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { + public void testHighNodeWriteLoadPreventsNewShardAllocation() { TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode(); /** * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write - * load stats. The stats will show both the second and third nodes are hot-spotting. + * load stats. The stats will show the third node hot-spotting, while the second node has capacity to receive all the index shards. */ final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -245,7 +239,7 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( harness.secondDiscoveryNode, harness.randomNumberOfWritePoolThreads, - (harness.randomUtilizationThresholdPercent + 1) / 100f, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, 0 ); final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -258,11 +252,11 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { MockTransportService.getInstance(harness.firstDataNodeName).< NodeUsageStatsForThreadPoolsAction .NodeRequest>addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) - ) - ); + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) + ) + ); MockTransportService.getInstance(harness.secondDataNodeName) .addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", @@ -322,9 +316,9 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar }); /** - * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired and - * initiate rebalancing. Then wait to see a cluster state update that has all the shards assigned away from the first node _despite_ - * the second and third node reporting hot-spotting: a canRemain::NO response should override a canAllocate::NOT_PREFERRED answer. + * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and + * initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the + * second node, since the third is reporting as hot-spotted and should not accept any shards. */ logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); @@ -339,10 +333,16 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar try { safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { Index index = clusterState.routingTable().index(harness.indexName).getIndex(); - if (clusterState.getRoutingNodes().node(harness.firstDataNodeId).numberOfOwningShardsForIndex(index) == 0) { - return true; - } - return false; + return checkShardAssignment( + clusterState.getRoutingNodes(), + index, + harness.firstDataNodeId, + harness.secondDataNodeId, + harness.thirdDataNodeId, + 0, + harness.randomNumberOfShards, + 0 + ); })); } catch (AssertionError error) { ClusterState state = internalCluster().client() From 515175db3de27de7a9dc95d0c3923b4b46a637b1 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 9 Sep 2025 12:54:48 -0700 Subject: [PATCH 12/18] improve test change readability again --- .../decider/WriteLoadConstraintDeciderIT.java | 94 +++++++++---------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index a78b66d21a009..40b748249ca8c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -75,20 +75,18 @@ protected Collection> getMockPlugins() { } /** - * Tests that {@link AllocationDecider#canRemain} returning {@link Decision.Type#NO} for a {@code NodeX} will ignore a - * {@link AllocationDecider#canAllocate} response of {@link Decision.Type#NOT_PREFERRED} from a {@code NodeY} and reassign the shard - * when there are no better node options. - * * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the - * balancer. Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards - * off of Node1 while Node2 and Node3 are hot-spotting, resulting in overriding not-preferred and relocating shards to Node2 and Node3. + * balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. + * + * Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of + * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. */ - public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { + public void testHighNodeWriteLoadPreventsNewShardAllocation() { TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode(); /** * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write - * load stats. The stats will show both the second and third nodes are hot-spotting. + * load stats. The stats will show the third node hot-spotting, while the second node has capacity to receive all the index shards. */ final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -100,7 +98,7 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( harness.secondDiscoveryNode, harness.randomNumberOfWritePoolThreads, - (harness.randomUtilizationThresholdPercent + 1) / 100f, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, 0 ); final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -113,11 +111,11 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { MockTransportService.getInstance(harness.firstDataNodeName).< NodeUsageStatsForThreadPoolsAction .NodeRequest>addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) - ) - ); + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) + ) + ); MockTransportService.getInstance(harness.secondDataNodeName) .addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", @@ -177,9 +175,9 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar }); /** - * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired and - * initiate rebalancing. Then wait to see a cluster state update that has all the shards assigned away from the first node _despite_ - * the second and third node reporting hot-spotting: a canRemain::NO response should override a canAllocate::NOT_PREFERRED answer. + * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and + * initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the + * second node, since the third is reporting as hot-spotted and should not accept any shards. */ logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); @@ -194,10 +192,16 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar try { safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { Index index = clusterState.routingTable().index(harness.indexName).getIndex(); - if (clusterState.getRoutingNodes().node(harness.firstDataNodeId).numberOfOwningShardsForIndex(index) == 0) { - return true; - } - return false; + return checkShardAssignment( + clusterState.getRoutingNodes(), + index, + harness.firstDataNodeId, + harness.secondDataNodeId, + harness.thirdDataNodeId, + 0, + harness.randomNumberOfShards, + 0 + ); })); } catch (AssertionError error) { ClusterState state = internalCluster().client() @@ -214,20 +218,22 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar throw error; } } - + /** - * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the - * balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. + * Tests that {@link AllocationDecider#canRemain} returning {@link Decision.Type#NO} for a {@code NodeX} will ignore a + * {@link AllocationDecider#canAllocate} response of {@link Decision.Type#NOT_PREFERRED} from a {@code NodeY} and reassign the shard + * when there are no better node options. * - * Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of - * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. + * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the + * balancer. Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards + * off of Node1 while Node2 and Node3 are hot-spotting, resulting in overriding not-preferred and relocating shards to Node2 and Node3. */ - public void testHighNodeWriteLoadPreventsNewShardAllocation() { + public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode(); /** * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write - * load stats. The stats will show the third node hot-spotting, while the second node has capacity to receive all the index shards. + * load stats. The stats will show both the second and third nodes are hot-spotting. */ final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -239,7 +245,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( harness.secondDiscoveryNode, harness.randomNumberOfWritePoolThreads, - randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, + (harness.randomUtilizationThresholdPercent + 1) / 100f, 0 ); final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( @@ -252,11 +258,11 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { MockTransportService.getInstance(harness.firstDataNodeName).< NodeUsageStatsForThreadPoolsAction .NodeRequest>addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) - ) - ); + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) + ) + ); MockTransportService.getInstance(harness.secondDataNodeName) .addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", @@ -316,9 +322,9 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar }); /** - * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and - * initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the - * second node, since the third is reporting as hot-spotted and should not accept any shards. + * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired and + * initiate rebalancing. Then wait to see a cluster state update that has all the shards assigned away from the first node _despite_ + * the second and third node reporting hot-spotting: a canRemain::NO response should override a canAllocate::NOT_PREFERRED answer. */ logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); @@ -333,16 +339,10 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar try { safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { Index index = clusterState.routingTable().index(harness.indexName).getIndex(); - return checkShardAssignment( - clusterState.getRoutingNodes(), - index, - harness.firstDataNodeId, - harness.secondDataNodeId, - harness.thirdDataNodeId, - 0, - harness.randomNumberOfShards, - 0 - ); + if (clusterState.getRoutingNodes().node(harness.firstDataNodeId).numberOfOwningShardsForIndex(index) == 0) { + return true; + } + return false; })); } catch (AssertionError error) { ClusterState state = internalCluster().client() From 0aba59a83365eead809188a781dad38086804226 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 9 Sep 2025 12:57:32 -0700 Subject: [PATCH 13/18] return log level after debugging --- .../allocation/decider/WriteLoadConstraintDeciderIT.java | 2 +- .../routing/allocation/allocator/DesiredBalanceReconciler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index 40b748249ca8c..0b28b2de13e23 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -218,7 +218,7 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar throw error; } } - + /** * Tests that {@link AllocationDecider#canRemain} returning {@link Decision.Type#NO} for a {@code NodeX} will ignore a * {@link AllocationDecider#canAllocate} response of {@link Decision.Type#NOT_PREFERRED} from a {@code NodeY} and reassign the shard diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 3755df502d700..d6500981ab639 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -661,7 +661,7 @@ private DiscoveryNode findRelocationTarget( continue; } final var decision = canAllocateDecider.apply(shardRouting, node); - logger.info("relocate {} to {}: {}", shardRouting, nodeId, decision); + logger.trace("relocate {} to {}: {}", shardRouting, nodeId, decision); // Assign shards to the YES nodes first. This way we might delay moving shards to NOT_PREFERRED nodes until after shards are // first moved away. The DesiredBalance could be moving shards away from a hot node as well as moving shards to it, and it's From b609df874b5f24fe471fa844f3ad57fea80ff5ce Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 9 Sep 2025 20:05:10 +0000 Subject: [PATCH 14/18] [CI] Auto commit changes from spotless --- .../decider/WriteLoadConstraintDeciderIT.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index 0b28b2de13e23..2325168daed01 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -111,11 +111,11 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { MockTransportService.getInstance(harness.firstDataNodeName).< NodeUsageStatsForThreadPoolsAction .NodeRequest>addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) - ) - ); + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) + ) + ); MockTransportService.getInstance(harness.secondDataNodeName) .addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", From 8dd940ccb0e37e567e621d90a684b63af6d724bc Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 9 Sep 2025 15:03:44 -0700 Subject: [PATCH 15/18] add an assert for protection --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 310c1a5963f94..4a69926db37ac 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -832,6 +832,7 @@ private MoveDecision decideMove( } } else if (bestDecision == Type.NOT_PREFERRED) { assert remainDecision.type() != Type.NOT_PREFERRED; + assert bestDecision != Type.YES; // If we don't ever find a YES decision, we'll settle for NOT_PREFERRED as preferable to NO. targetNode = target; } From fd217dbb9c02ae8a9e20046e22c118b171a36b10 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 9 Sep 2025 15:09:36 -0700 Subject: [PATCH 16/18] remove todo, consider canRemain not-preferred as YES during reconciliation, not reconciliation's job --- .../routing/allocation/allocator/DesiredBalanceReconciler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index d6500981ab639..38be34c0ff610 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -503,9 +503,10 @@ private void moveShards() { final var routingNode = routingNodes.node(shardRouting.currentNodeId()); final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - // TODO (ES-12633): exercise canRemain to say NOT_PREFERRED, without other decider influence, and see that a shard is moved. if (canRemainDecision.type() != Decision.Type.NO && canRemainDecision.type() != Decision.Type.NOT_PREFERRED) { // If movement is throttled, a future reconciliation round will see a resolution. For now, leave it alone. + // Reconciliation treats canRemain NOT_PREFERRED answers as YES because the DesiredBalance computation already decided + // how to handle the situation. continue; } From 18fa1f2caf3be61a87c9aceeb30abea0181380c0 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 10 Sep 2025 09:09:36 -0700 Subject: [PATCH 17/18] improve string use, remove duplicate code --- .../decider/WriteLoadConstraintDecider.java | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index 13204007d3830..8c427227e4717 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -96,27 +96,19 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return Decision.single(Decision.Type.NOT_PREFERRED, NAME, explain); } - if (logger.isTraceEnabled()) { - logger.trace( - Strings.format( - "Shard [%s] in index [%s] can be assigned to node [%s]. The node's utilization would become [%s]", - shardRouting.shardId(), - shardRouting.index(), - node.nodeId(), - newWriteThreadPoolUtilization - ) - ); - } - - return allocation.decision( - Decision.YES, - NAME, + String explanation = Strings.format( "Shard [%s] in index [%s] can be assigned to node [%s]. The node's utilization would become [%s]", shardRouting.shardId(), shardRouting.index(), node.nodeId(), newWriteThreadPoolUtilization ); + + if (logger.isTraceEnabled()) { + logger.trace(explanation); + } + + return allocation.decision(Decision.YES, NAME, explanation); } @Override From d048da09ba4baba0ebeaff0f00b9917c0169c66c Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 12 Sep 2025 10:49:18 -0700 Subject: [PATCH 18/18] remove redundant assert --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 4a69926db37ac..310c1a5963f94 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -832,7 +832,6 @@ private MoveDecision decideMove( } } else if (bestDecision == Type.NOT_PREFERRED) { assert remainDecision.type() != Type.NOT_PREFERRED; - assert bestDecision != Type.YES; // If we don't ever find a YES decision, we'll settle for NOT_PREFERRED as preferable to NO. targetNode = target; }