diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index ec584b1d0973d..2325168daed01 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -53,7 +54,9 @@ import static java.util.stream.IntStream.range; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator.calculateUtilizationForWriteLoad; +import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -79,195 +82,282 @@ protected Collection> getMockPlugins() { * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. */ public void testHighNodeWriteLoadPreventsNewShardAllocation() { - int randomUtilizationThresholdPercent = randomIntBetween(50, 100); - int numberOfWritePoolThreads = randomIntBetween(2, 20); - float shardWriteLoad = randomFloatBetween(0.0f, 0.01f, false); - Settings settings = Settings.builder() - .put( - WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), - WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED - ) - .put( - WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), - randomUtilizationThresholdPercent + "%" - ) - .build(); + TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode(); - final String masterName = internalCluster().startMasterOnlyNode(settings); - final var dataNodes = internalCluster().startDataOnlyNodes(3, settings); - final String firstDataNodeName = dataNodes.get(0); - final String secondDataNodeName = dataNodes.get(1); - final String thirdDataNodeName = dataNodes.get(2); - final String firstDataNodeId = getNodeId(firstDataNodeName); - final String secondDataNodeId = getNodeId(secondDataNodeName); - final String thirdDataNodeId = getNodeId(thirdDataNodeName); - ensureStableCluster(4); + /** + * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write + * load stats. The stats will show the third node hot-spotting, while the second node has capacity to receive all the index shards. + */ - logger.info( - "---> first node name " - + firstDataNodeName - + " and ID " - + firstDataNodeId - + "; second node name " - + secondDataNodeName - + " and ID " - + secondDataNodeId - + "; third node name " - + thirdDataNodeName - + " and ID " - + thirdDataNodeId + final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( + harness.firstDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, + 0 + ); + final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( + harness.secondDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, + 0 ); + final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( + harness.thirdDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + (harness.randomUtilizationThresholdPercent + 1) / 100f, + 0 + ); + + MockTransportService.getInstance(harness.firstDataNodeName).< + NodeUsageStatsForThreadPoolsAction + .NodeRequest>addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) + ) + ); + MockTransportService.getInstance(harness.secondDataNodeName) + .addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.secondDiscoveryNode, secondNodeNonHotSpottingNodeStats) + ) + ); + MockTransportService.getInstance(harness.thirdDataNodeName) + .addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.thirdDiscoveryNode, thirdNodeHotSpottingNodeStats) + ) + ); /** - * Exclude assignment of shards to the second and third data nodes via the {@link FilterAllocationDecider} settings. - * Then create an index with many shards, which will all be assigned to the first data node. + * Override the {@link TransportIndicesStatsAction} action on the data nodes to supply artificial shard write load stats. The stats + * will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). */ - logger.info("---> Limit shard assignment to node " + firstDataNodeName + " by excluding the other nodes"); - updateClusterSettings( - Settings.builder().put("cluster.routing.allocation.exclude._name", secondDataNodeName + "," + thirdDataNodeName) - ); - - String indexName = randomIdentifier(); - int randomNumberOfShards = randomIntBetween(10, 20); // Pick a high number of shards, so it is clear assignment is not accidental. + IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class) + .state() + .getMetadata() + .getProject() + .index(harness.indexName); + MockTransportService.getInstance(harness.firstDataNodeName) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + shardStats.add(createShardStats(indexMetadata, i, harness.randomShardWriteLoad, harness.firstDataNodeId)); + } + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.firstDataNodeName + ); + channel.sendResponse( + instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()) + ); + }); + MockTransportService.getInstance(harness.secondDataNodeName) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + // Return no stats for the index because none are assigned to this node. + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.secondDataNodeName + ); + channel.sendResponse(instance.new NodeResponse(harness.secondDataNodeId, 0, List.of(), List.of())); + }); + MockTransportService.getInstance(harness.thirdDataNodeName) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + // Return no stats for the index because none are assigned to this node. + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.thirdDataNodeName + ); + channel.sendResponse(instance.new NodeResponse(harness.thirdDataNodeId, 0, List.of(), List.of())); + }); - // Calculate the maximum utilization a node can report while still being able to accept all relocating shards - double additionalLoadFromAllShards = calculateUtilizationForWriteLoad( - shardWriteLoad * randomNumberOfShards, - numberOfWritePoolThreads - ); - int maxUtilizationPercent = randomUtilizationThresholdPercent - (int) (additionalLoadFromAllShards * 100) - 1; + /** + * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and + * initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the + * second node, since the third is reporting as hot-spotted and should not accept any shards. + */ - var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { - var indexRoutingTable = clusterState.routingTable().index(indexName); - if (indexRoutingTable == null) { - return false; - } - return checkShardAssignment( - clusterState.getRoutingNodes(), - indexRoutingTable.getIndex(), - firstDataNodeId, - secondDataNodeId, - thirdDataNodeId, - randomNumberOfShards, - 0, - 0 - ); - }); + logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); + refreshClusterInfo(); - createIndex( - indexName, - Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomNumberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build() + logger.info( + "---> Update the filter to exclude " + harness.firstDataNodeName + " so that shards will be reassigned away to the other nodes" ); - ensureGreen(indexName); + // Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. + updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", harness.firstDataNodeName)); + + try { + safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + Index index = clusterState.routingTable().index(harness.indexName).getIndex(); + return checkShardAssignment( + clusterState.getRoutingNodes(), + index, + harness.firstDataNodeId, + harness.secondDataNodeId, + harness.thirdDataNodeId, + 0, + harness.randomNumberOfShards, + 0 + ); + })); + } catch (AssertionError error) { + ClusterState state = internalCluster().client() + .admin() + .cluster() + .prepareState(TEST_REQUEST_TIMEOUT) + .clear() + .setMetadata(true) + .setNodes(true) + .setRoutingTable(true) + .get() + .getState(); + logger.info("---> Failed to reach expected allocation state. Dumping assignments: " + state.getRoutingNodes()); + throw error; + } + } - logger.info("---> Waiting for all shards to be assigned to node " + firstDataNodeName); - safeAwait(verifyAssignmentToFirstNodeListener); + /** + * Tests that {@link AllocationDecider#canRemain} returning {@link Decision.Type#NO} for a {@code NodeX} will ignore a + * {@link AllocationDecider#canAllocate} response of {@link Decision.Type#NOT_PREFERRED} from a {@code NodeY} and reassign the shard + * when there are no better node options. + * + * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the + * balancer. Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards + * off of Node1 while Node2 and Node3 are hot-spotting, resulting in overriding not-preferred and relocating shards to Node2 and Node3. + */ + public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { + TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode(); /** - * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} and {@link TransportIndicesStatsAction} actions on the data - * nodes to supply artificial write load stats. The stats will show the third node hot-spotting, and that all shards have non-empty - * write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). + * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write + * load stats. The stats will show both the second and third nodes are hot-spotting. */ - final DiscoveryNode firstDiscoveryNode = getDiscoveryNode(firstDataNodeName); - final DiscoveryNode secondDiscoveryNode = getDiscoveryNode(secondDataNodeName); - final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName); final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( - firstDiscoveryNode, - numberOfWritePoolThreads, - randomIntBetween(0, maxUtilizationPercent) / 100f, + harness.firstDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, 0 ); final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( - secondDiscoveryNode, - numberOfWritePoolThreads, - randomIntBetween(0, maxUtilizationPercent) / 100f, + harness.secondDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + (harness.randomUtilizationThresholdPercent + 1) / 100f, 0 ); final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( - thirdDiscoveryNode, - numberOfWritePoolThreads, - (randomUtilizationThresholdPercent + 1) / 100f, + harness.thirdDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + (harness.randomUtilizationThresholdPercent + 1) / 100f, 0 ); - MockTransportService.getInstance(firstDataNodeName).addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) - ) - ); - MockTransportService.getInstance(secondDataNodeName) + MockTransportService.getInstance(harness.firstDataNodeName).< + NodeUsageStatsForThreadPoolsAction + .NodeRequest>addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) + ) + ); + MockTransportService.getInstance(harness.secondDataNodeName) .addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(secondDiscoveryNode, secondNodeNonHotSpottingNodeStats) + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.secondDiscoveryNode, secondNodeNonHotSpottingNodeStats) ) ); - MockTransportService.getInstance(thirdDataNodeName) + MockTransportService.getInstance(harness.thirdDataNodeName) .addRequestHandlingBehavior( TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(thirdDiscoveryNode, thirdNodeHotSpottingNodeStats) + new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.thirdDiscoveryNode, thirdNodeHotSpottingNodeStats) ) ); + /** + * Override the {@link TransportIndicesStatsAction} action on the data nodes to supply artificial shard write load stats. The stats + * will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). + */ + IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class) .state() .getMetadata() .getProject() - .index(indexName); - MockTransportService.getInstance(firstDataNodeName) + .index(harness.indexName); + MockTransportService.getInstance(harness.firstDataNodeName) .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { - shardStats.add(createShardStats(indexMetadata, i, shardWriteLoad, firstDataNodeId)); + shardStats.add(createShardStats(indexMetadata, i, harness.randomShardWriteLoad, harness.firstDataNodeId)); } - TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName); - channel.sendResponse(instance.new NodeResponse(firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of())); + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.firstDataNodeName + ); + channel.sendResponse( + instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()) + ); }); - MockTransportService.getInstance(secondDataNodeName) + MockTransportService.getInstance(harness.secondDataNodeName) .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { // Return no stats for the index because none are assigned to this node. - TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, secondDataNodeName); - channel.sendResponse(instance.new NodeResponse(secondDataNodeId, 0, List.of(), List.of())); + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.secondDataNodeName + ); + channel.sendResponse(instance.new NodeResponse(harness.secondDataNodeId, 0, List.of(), List.of())); }); - MockTransportService.getInstance(thirdDataNodeName) + MockTransportService.getInstance(harness.thirdDataNodeName) .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { // Return no stats for the index because none are assigned to this node. - TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, thirdDataNodeName); - channel.sendResponse(instance.new NodeResponse(thirdDataNodeId, 0, List.of(), List.of())); + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + harness.thirdDataNodeName + ); + channel.sendResponse(instance.new NodeResponse(harness.thirdDataNodeId, 0, List.of(), List.of())); }); /** - * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and - * initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the - * second node, since the third is reporting as hot-spotted and should not accept any shards. + * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired and + * initiate rebalancing. Then wait to see a cluster state update that has all the shards assigned away from the first node _despite_ + * the second and third node reporting hot-spotting: a canRemain::NO response should override a canAllocate::NOT_PREFERRED answer. */ logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); refreshClusterInfo(); logger.info( - "---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes" + "---> Update the filter to exclude " + harness.firstDataNodeName + " so that shards will be reassigned away to the other nodes" ); // Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. - updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", firstDataNodeName)); + updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", harness.firstDataNodeName)); - safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { - Index index = clusterState.routingTable().index(indexName).getIndex(); - return checkShardAssignment( - clusterState.getRoutingNodes(), - index, - firstDataNodeId, - secondDataNodeId, - thirdDataNodeId, - 0, - randomNumberOfShards, - 0 - ); - })); + try { + safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + Index index = clusterState.routingTable().index(harness.indexName).getIndex(); + if (clusterState.getRoutingNodes().node(harness.firstDataNodeId).numberOfOwningShardsForIndex(index) == 0) { + return true; + } + return false; + })); + } catch (AssertionError error) { + ClusterState state = internalCluster().client() + .admin() + .cluster() + .prepareState(TEST_REQUEST_TIMEOUT) + .clear() + .setMetadata(true) + .setNodes(true) + .setRoutingTable(true) + .get() + .getState(); + logger.info("---> Failed to reach expected allocation state. Dumping assignments: " + state.getRoutingNodes()); + throw error; + } } public void testMaxQueueLatencyMetricIsPublished() { @@ -351,6 +441,32 @@ private boolean checkShardAssignment( return true; } + private Settings enabledWriteLoadDeciderSettings(int utilizationThresholdPercent) { + return Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), + utilizationThresholdPercent + "%" + ) + // TODO (ES-12862): remove these overrides when throttling is turned off for simulations. + .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 100) + .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 100) + .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 100) + .build(); + } + + /** + * The utilization percent overhead needed to add the given amount of shard write load to a node with the given number of write pool + * threads. + */ + private int shardLoadUtilizationOverhead(float totalShardWriteLoad, int numberOfWritePoolThreads) { + float totalWriteLoadPerThread = totalShardWriteLoad / numberOfWritePoolThreads; + return (int) (100 * totalWriteLoadPerThread); + } + private DiscoveryNode getDiscoveryNode(String nodeName) { final TransportService transportService = internalCluster().getInstance(TransportService.class, nodeName); assertNotNull(transportService); @@ -401,4 +517,141 @@ private static ShardStats createShardStats(IndexMetadata indexMeta, int shardInd ); return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); } + + /** + * Sets up common test infrastructure to deduplicate code across tests. + *

+ * Starts three data nodes and creates an index with many shards, then forces shard assignment to only the first data node via the + * {@link FilterAllocationDecider}. + */ + private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() { + int randomUtilizationThresholdPercent = randomIntBetween(50, 100); + int randomNumberOfWritePoolThreads = randomIntBetween(2, 20); + float randomShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false); + Settings settings = enabledWriteLoadDeciderSettings(randomUtilizationThresholdPercent); + + internalCluster().startMasterOnlyNode(settings); + final var dataNodes = internalCluster().startDataOnlyNodes(3, settings); + final String firstDataNodeName = dataNodes.get(0); + final String secondDataNodeName = dataNodes.get(1); + final String thirdDataNodeName = dataNodes.get(2); + final String firstDataNodeId = getNodeId(firstDataNodeName); + final String secondDataNodeId = getNodeId(secondDataNodeName); + final String thirdDataNodeId = getNodeId(thirdDataNodeName); + ensureStableCluster(4); + + final DiscoveryNode firstDiscoveryNode = getDiscoveryNode(firstDataNodeName); + final DiscoveryNode secondDiscoveryNode = getDiscoveryNode(secondDataNodeName); + final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName); + + logger.info( + "---> first node name " + + firstDataNodeName + + " and ID " + + firstDataNodeId + + "; second node name " + + secondDataNodeName + + " and ID " + + secondDataNodeId + + "; third node name " + + thirdDataNodeName + + " and ID " + + thirdDataNodeId + ); + + logger.info( + "---> utilization threshold: " + + randomUtilizationThresholdPercent + + ", write threads: " + + randomNumberOfWritePoolThreads + + ", individual shard write loads: " + + randomShardWriteLoad + ); + + /** + * Exclude assignment of shards to the second and third data nodes via the {@link FilterAllocationDecider} settings. + * Then create an index with many shards, which will all be assigned to the first data node. + */ + + logger.info("---> Limit shard assignment to node " + firstDataNodeName + " by excluding the other nodes"); + updateClusterSettings( + Settings.builder().put("cluster.routing.allocation.exclude._name", secondDataNodeName + "," + thirdDataNodeName) + ); + + String indexName = randomIdentifier(); + int randomNumberOfShards = randomIntBetween(10, 20); // Pick a high number of shards, so it is clear assignment is not accidental. + + // Calculate the maximum utilization a node can report while still being able to accept all relocating shards + int shardWriteLoadOverhead = shardLoadUtilizationOverhead( + randomShardWriteLoad * randomNumberOfShards, + randomNumberOfWritePoolThreads + ); + int maxUtilBelowThresholdThatAllowsAllShardsToRelocate = randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1; + + var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + var indexRoutingTable = clusterState.routingTable().index(indexName); + if (indexRoutingTable == null) { + return false; + } + return checkShardAssignment( + clusterState.getRoutingNodes(), + indexRoutingTable.getIndex(), + firstDataNodeId, + secondDataNodeId, + thirdDataNodeId, + randomNumberOfShards, + 0, + 0 + ); + }); + + createIndex( + indexName, + Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomNumberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + ensureGreen(indexName); + + logger.info("---> Waiting for all [" + randomNumberOfShards + "] shards to be assigned to node " + firstDataNodeName); + safeAwait(verifyAssignmentToFirstNodeListener); + + return new TestHarness( + firstDataNodeName, + secondDataNodeName, + thirdDataNodeName, + firstDataNodeId, + secondDataNodeId, + thirdDataNodeId, + firstDiscoveryNode, + secondDiscoveryNode, + thirdDiscoveryNode, + randomUtilizationThresholdPercent, + randomNumberOfWritePoolThreads, + randomShardWriteLoad, + indexName, + randomNumberOfShards, + maxUtilBelowThresholdThatAllowsAllShardsToRelocate + ); + } + + /** + * Carries set-up state from {@link #setUpThreeTestNodesAndAllIndexShardsOnFirstNode()} to the testing logic. + */ + record TestHarness( + String firstDataNodeName, + String secondDataNodeName, + String thirdDataNodeName, + String firstDataNodeId, + String secondDataNodeId, + String thirdDataNodeId, + DiscoveryNode firstDiscoveryNode, + DiscoveryNode secondDiscoveryNode, + DiscoveryNode thirdDiscoveryNode, + int randomUtilizationThresholdPercent, + int randomNumberOfWritePoolThreads, + float randomShardWriteLoad, + String indexName, + int randomNumberOfShards, + int maxUtilBelowThresholdThatAllowsAllShardsToRelocate + ) {}; + } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java index 17c28ffe78b47..368451661fba0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java @@ -179,6 +179,7 @@ public Decision getCanRemainDecision() { * the result of this method is meaningless, as no rebalance decision was taken. If {@link #isDecisionTaken()} * returns {@code false}, then invoking this method will throw an {@code IllegalStateException}. */ + // @VisibleForTesting public boolean canRebalanceCluster() { checkDecisionState(); return clusterRebalanceDecision != null && clusterRebalanceDecision.type() == Type.YES; @@ -192,6 +193,7 @@ public boolean canRebalanceCluster() { * If {@link #isDecisionTaken()} returns {@code false}, then invoking this method will throw an * {@code IllegalStateException}. */ + // @VisibleForTesting @Nullable public Decision getClusterRebalanceDecision() { checkDecisionState(); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 1255a1c784d92..3ae392661e98e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -861,7 +861,7 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar assert sourceNode != null && sourceNode.containsShard(index, shardRouting); RoutingNode routingNode = sourceNode.getRoutingNode(); Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - if (canRemain.type() != Decision.Type.NO) { + if (canRemain.type() != Decision.Type.NO && canRemain.type() != Decision.Type.NOT_PREFERRED) { return MoveDecision.remain(canRemain); } @@ -901,7 +901,11 @@ private MoveDecision decideMove( if (explain) { nodeResults.add(new NodeAllocationResult(currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking)); } - // TODO maybe we can respect throttling here too? + // TODO (ES-12633): test that nothing moves when the source is not-preferred and the target is not-preferred. + if (allocationDecision.type() == Type.NOT_PREFERRED && remainDecision.type() == Type.NOT_PREFERRED) { + // Relocating a shard from one NOT_PREFERRED node to another would not improve the situation. + continue; + } if (allocationDecision.type().higherThan(bestDecision)) { bestDecision = allocationDecision.type(); if (bestDecision == Type.YES) { @@ -911,6 +915,10 @@ private MoveDecision decideMove( // no need to continue iterating break; } + } else if (bestDecision == Type.NOT_PREFERRED) { + assert remainDecision.type() != Type.NOT_PREFERRED; + // If we don't ever find a YES decision, we'll settle for NOT_PREFERRED as preferable to NO. + targetNode = target; } } } @@ -1221,7 +1229,7 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, ProjectIn continue; } final Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation); - if (allocationDecision.type() == Type.NO) { + if (allocationDecision.type() == Type.NO || allocationDecision.type() == Type.NOT_PREFERRED) { continue; } @@ -1407,7 +1415,7 @@ public boolean containsShard(ShardRouting shard) { public static final class NodeSorter extends IntroSorter { final ModelNode[] modelNodes; - /* the nodes weights with respect to the current weight function / index */ + /** The nodes weights with respect to the current weight function / index */ final float[] weights; private final WeightFunction function; private ProjectIndex index; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index c55e2a23ab8fa..38be34c0ff610 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -503,9 +503,10 @@ private void moveShards() { final var routingNode = routingNodes.node(shardRouting.currentNodeId()); final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - if (canRemainDecision.type() != Decision.Type.NO) { - // it's desired elsewhere but technically it can remain on its current node. Defer its movement until later on to give - // priority to shards that _must_ move. + if (canRemainDecision.type() != Decision.Type.NO && canRemainDecision.type() != Decision.Type.NOT_PREFERRED) { + // If movement is throttled, a future reconciliation round will see a resolution. For now, leave it alone. + // Reconciliation treats canRemain NOT_PREFERRED answers as YES because the DesiredBalance computation already decided + // how to handle the situation. continue; } @@ -650,6 +651,7 @@ private DiscoveryNode findRelocationTarget( Set desiredNodeIds, BiFunction canAllocateDecider ) { + DiscoveryNode chosenNode = null; for (final var nodeId : desiredNodeIds) { // TODO consider ignored nodes here too? if (nodeId.equals(shardRouting.currentNodeId())) { @@ -661,12 +663,24 @@ private DiscoveryNode findRelocationTarget( } final var decision = canAllocateDecider.apply(shardRouting, node); logger.trace("relocate {} to {}: {}", shardRouting, nodeId, decision); + + // Assign shards to the YES nodes first. This way we might delay moving shards to NOT_PREFERRED nodes until after shards are + // first moved away. The DesiredBalance could be moving shards away from a hot node as well as moving shards to it, and it's + // better to offload shards first. if (decision.type() == Decision.Type.YES) { - return node.node(); + chosenNode = node.node(); + // As soon as we get any YES, we return it. + break; + } else if (decision.type() == Decision.Type.NOT_PREFERRED && chosenNode == null) { + // If the best answer is not-preferred, then the shard will still be assigned. It is okay to assign to a not-preferred + // node because the desired balance computation had a reason to override it: when there aren't any better nodes to + // choose and the shard cannot remain where it is, we accept not-preferred. NOT_PREFERRED is essentially a YES for + // reconciliation. + chosenNode = node.node(); } } - return null; + return chosenNode; } private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index e814f570a67bb..8c427227e4717 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -73,10 +73,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing shardRouting.shardId() ); logger.debug(explain); - return Decision.single(Decision.Type.NO, NAME, explain); + return Decision.single(Decision.Type.NOT_PREFERRED, NAME, explain); } - if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) { + var newWriteThreadPoolUtilization = calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad); + if (newWriteThreadPoolUtilization >= nodeWriteThreadPoolLoadThreshold) { // The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard. // This could lead to a hot spot on this node and is undesirable. String explain = Strings.format( @@ -92,10 +93,22 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing nodeWriteThreadPoolStats.totalThreadPoolThreads() ); logger.debug(explain); - return Decision.single(Decision.Type.NO, NAME, explain); + return Decision.single(Decision.Type.NOT_PREFERRED, NAME, explain); } - return Decision.YES; + String explanation = Strings.format( + "Shard [%s] in index [%s] can be assigned to node [%s]. The node's utilization would become [%s]", + shardRouting.shardId(), + shardRouting.index(), + node.nodeId(), + newWriteThreadPoolUtilization + ); + + if (logger.isTraceEnabled()) { + logger.trace(explanation); + } + + return allocation.decision(Decision.YES, NAME, explanation); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index 12bfd8a0a4789..5b844cc1da635 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -104,7 +104,7 @@ public void testWriteLoadDeciderCanAllocate() { ); assertEquals( "Assigning a new shard to a node that is above the threshold should fail", - Decision.Type.NO, + Decision.Type.NOT_PREFERRED, writeLoadDecider.canAllocate( testHarness.shardRouting2, testHarness.exceedingThresholdRoutingNode, @@ -128,7 +128,7 @@ public void testWriteLoadDeciderCanAllocate() { ); assertEquals( "Assigning a new shard that would cause the node to exceed capacity should fail", - Decision.Type.NO, + Decision.Type.NOT_PREFERRED, writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.nearThresholdRoutingNode, testHarness.routingAllocation) .type() );