From 3517abe08d19983bb10427c1f750d8ea138bd6c7 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 13 Oct 2025 14:53:58 +1100 Subject: [PATCH 1/4] Add WriteLoadConstraintMonitorIT --- .../WriteLoadConstraintMonitorIT.java | 211 ++++++++++++++++++ .../WriteLoadConstraintMonitor.java | 2 +- 2 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java new file mode 100644 index 0000000000000..fc10667c91ceb --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java @@ -0,0 +1,211 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.allocation; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction; +import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; +import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TestTransportChannel; + +import java.util.Collection; +import java.util.Map; +import java.util.stream.Collectors; + +@ESIntegTestCase.ClusterScope(numDataNodes = 0) +public class WriteLoadConstraintMonitorIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); + } + + @TestLogging( + value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:TRACE", + reason = "so we can see what the monitor is doing" + ) + public void testRerouteIsCalledWhenHotSpotAppears() { + final long queueLatencyThresholdMillis = randomLongBetween(50_000, 100_000); + final int utilisationThresholdPercent = randomIntBetween(50, 100); + internalCluster().startMasterOnlyNode(enabledWriteLoadDeciderSettings(utilisationThresholdPercent, queueLatencyThresholdMillis)); + final String dataNodeOne = internalCluster().startDataOnlyNode( + enabledWriteLoadDeciderSettings(utilisationThresholdPercent, queueLatencyThresholdMillis) + ); + final String dataNodeTwo = internalCluster().startDataOnlyNode( + enabledWriteLoadDeciderSettings(utilisationThresholdPercent, queueLatencyThresholdMillis) + ); + + // Unmodified cluster info should detect no hot-spotting nodes + MockLog.awaitLogger( + ESIntegTestCase::refreshClusterInfo, + WriteLoadConstraintMonitor.class, + new MockLog.SeenEventExpectation( + "no hot-spots detected", + WriteLoadConstraintMonitor.class.getCanonicalName(), + Level.TRACE, + "No hot-spotting nodes detected" + ) + ); + + // Simulate hot-spotting on a node + simulateHotSpottingOnNode(dataNodeOne, queueLatencyThresholdMillis); + + // Single node hot-spotting should trigger reroute + MockLog.awaitLogger( + ESIntegTestCase::refreshClusterInfo, + WriteLoadConstraintMonitor.class, + new MockLog.SeenEventExpectation( + "hot spot detected message", + WriteLoadConstraintMonitor.class.getCanonicalName(), + Level.DEBUG, + Strings.format(""" + Nodes [[%s]] are hot-spotting, of 3 total cluster nodes. Reroute for hot-spotting has never previously been called. \ + Previously hot-spotting nodes are [0 nodes]. The write thread pool queue latency threshold is [%s]. \ + Triggering reroute. + """, getNodeId(dataNodeOne), TimeValue.timeValueMillis(queueLatencyThresholdMillis)) + ) + ); + + // We should skip another re-route if no additional nodes are hot-spotting + MockLog.awaitLogger( + ESIntegTestCase::refreshClusterInfo, + WriteLoadConstraintMonitor.class, + new MockLog.SeenEventExpectation( + "reroute skipped due to being called recently", + WriteLoadConstraintMonitor.class.getCanonicalName(), + Level.DEBUG, + Strings.format( + "Not calling reroute because we called reroute [*] ago and there are no new hot spots", + getNodeId(dataNodeOne), + TimeValue.timeValueMillis(queueLatencyThresholdMillis) + ) + ) + ); + + // Simulate hot-spotting on an additional node + simulateHotSpottingOnNode(dataNodeTwo, queueLatencyThresholdMillis); + + // Additional node hot-spotting should trigger reroute + MockLog.awaitLogger( + ESIntegTestCase::refreshClusterInfo, + WriteLoadConstraintMonitor.class, + new MockLog.SeenEventExpectation( + "hot spot detected message", + WriteLoadConstraintMonitor.class.getCanonicalName(), + Level.DEBUG, + Strings.format(""" + Nodes [[*]] are hot-spotting, of 3 total cluster nodes. \ + Reroute for hot-spotting was last called [*] ago. Previously hot-spotting nodes are [[%s]]. \ + The write thread pool queue latency threshold is [%s]. Triggering reroute. + """, getNodeId(dataNodeOne), TimeValue.timeValueMillis(queueLatencyThresholdMillis)) + ) + ); + + // Clear simulated hot-spotting + MockTransportService.getInstance(dataNodeOne).clearAllRules(); + MockTransportService.getInstance(dataNodeTwo).clearAllRules(); + + // We should again detect no hot-spotting nodes + MockLog.awaitLogger( + ESIntegTestCase::refreshClusterInfo, + WriteLoadConstraintMonitor.class, + new MockLog.SeenEventExpectation( + "no hot-spots detected", + WriteLoadConstraintMonitor.class.getCanonicalName(), + Level.TRACE, + "No hot-spotting nodes detected" + ) + ); + } + + private void simulateHotSpottingOnNode(String nodeName, long queueLatencyThresholdMillis) { + MockTransportService.getInstance(nodeName) + .addRequestHandlingBehavior(TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", (handler, request, channel, task) -> { + handler.messageReceived( + request, + new TestTransportChannel(new ChannelActionListener<>(channel).delegateFailure((l, response) -> { + NodeUsageStatsForThreadPoolsAction.NodeResponse r = (NodeUsageStatsForThreadPoolsAction.NodeResponse) response; + l.onResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse( + r.getNode(), + new NodeUsageStatsForThreadPools( + r.getNodeUsageStatsForThreadPools().nodeId(), + addQueueLatencyToWriteThreadPool( + r.getNodeUsageStatsForThreadPools().threadPoolUsageStatsMap(), + queueLatencyThresholdMillis + ) + ) + ) + ); + })), + task + ); + }); + } + + private Map addQueueLatencyToWriteThreadPool( + Map stringThreadPoolUsageStatsMap, + long queueLatencyThresholdMillis + ) { + return stringThreadPoolUsageStatsMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> { + NodeUsageStatsForThreadPools.ThreadPoolUsageStats originalStats = e.getValue(); + if (e.getKey().equals(ThreadPool.Names.WRITE)) { + return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + originalStats.totalThreadPoolThreads(), + originalStats.averageThreadPoolUtilization(), + randomLongBetween(queueLatencyThresholdMillis * 2, queueLatencyThresholdMillis * 3) + ); + } + return originalStats; + })); + + } + + /** + * Enables the write load decider and overrides other write load decider settings. + * @param utilizationThresholdPercent Sets the write thread pool utilization threshold, controlling when canAllocate starts returning + * not-preferred. + * @param queueLatencyThresholdMillis Sets the queue latency threshold, controlling when canRemain starts returning not-preferred. + */ + private Settings enabledWriteLoadDeciderSettings(int utilizationThresholdPercent, long queueLatencyThresholdMillis) { + return Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), + utilizationThresholdPercent + "%" + ) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING.getKey(), + TimeValue.timeValueMillis(queueLatencyThresholdMillis) + ) + // Make the re-route interval large so we can test it + .put(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(5)) + // Disable rebalancing so that testing can see Decider change outcomes only. + .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") + .build(); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java index c0d8bd4e06e48..c2deff8608593 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java @@ -111,7 +111,7 @@ public void onNewInfo(ClusterInfo clusterInfo) { ? "has never previously been called" : "was last called [" + TimeValue.timeValueMillis(timeSinceLastRerouteMillis) + "] ago", nodeSummary(lastSetOfHotSpottedNodes), - state.nodes().size() + writeLoadConstraintSettings.getQueueLatencyThreshold() ); } final String reason = "hot-spotting detected by write load constraint monitor"; From f50e9427d1de687d8f291633529a8fbe77eeb9fd Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 13 Oct 2025 14:58:04 +1100 Subject: [PATCH 2/4] Get rid of unnecessary settings --- .../WriteLoadConstraintMonitorIT.java | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java index fc10667c91ceb..edf4b5e18f08a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; -import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; @@ -47,14 +46,10 @@ protected Collection> nodePlugins() { ) public void testRerouteIsCalledWhenHotSpotAppears() { final long queueLatencyThresholdMillis = randomLongBetween(50_000, 100_000); - final int utilisationThresholdPercent = randomIntBetween(50, 100); - internalCluster().startMasterOnlyNode(enabledWriteLoadDeciderSettings(utilisationThresholdPercent, queueLatencyThresholdMillis)); - final String dataNodeOne = internalCluster().startDataOnlyNode( - enabledWriteLoadDeciderSettings(utilisationThresholdPercent, queueLatencyThresholdMillis) - ); - final String dataNodeTwo = internalCluster().startDataOnlyNode( - enabledWriteLoadDeciderSettings(utilisationThresholdPercent, queueLatencyThresholdMillis) - ); + final Settings settings = enabledWriteLoadDeciderSettings(queueLatencyThresholdMillis); + internalCluster().startMasterOnlyNode(settings); + final String dataNodeOne = internalCluster().startDataOnlyNode(settings); + final String dataNodeTwo = internalCluster().startDataOnlyNode(settings); // Unmodified cluster info should detect no hot-spotting nodes MockLog.awaitLogger( @@ -184,28 +179,20 @@ private Map addQueueL /** * Enables the write load decider and overrides other write load decider settings. - * @param utilizationThresholdPercent Sets the write thread pool utilization threshold, controlling when canAllocate starts returning - * not-preferred. * @param queueLatencyThresholdMillis Sets the queue latency threshold, controlling when canRemain starts returning not-preferred. */ - private Settings enabledWriteLoadDeciderSettings(int utilizationThresholdPercent, long queueLatencyThresholdMillis) { + private Settings enabledWriteLoadDeciderSettings(long queueLatencyThresholdMillis) { return Settings.builder() .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED ) - .put( - WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), - utilizationThresholdPercent + "%" - ) .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING.getKey(), TimeValue.timeValueMillis(queueLatencyThresholdMillis) ) // Make the re-route interval large so we can test it .put(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(5)) - // Disable rebalancing so that testing can see Decider change outcomes only. - .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") .build(); } } From 433b3801e7b46b307e91a782c4ab4fe129bd6759 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 13 Oct 2025 14:59:27 +1100 Subject: [PATCH 3/4] Comment --- .../cluster/allocation/WriteLoadConstraintMonitorIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java index edf4b5e18f08a..aa92625c6ac3e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java @@ -45,6 +45,7 @@ protected Collection> nodePlugins() { reason = "so we can see what the monitor is doing" ) public void testRerouteIsCalledWhenHotSpotAppears() { + // Set the threshold very high so we don't get any non-synthetic hot-spotting occurring final long queueLatencyThresholdMillis = randomLongBetween(50_000, 100_000); final Settings settings = enabledWriteLoadDeciderSettings(queueLatencyThresholdMillis); internalCluster().startMasterOnlyNode(settings); From 5a6d399e64df6e434b547b34c710dcba78c64aeb Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 13 Oct 2025 15:06:50 +1100 Subject: [PATCH 4/4] Javadoc --- .../cluster/allocation/WriteLoadConstraintMonitorIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java index aa92625c6ac3e..14de086c1bf2b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java @@ -179,8 +179,8 @@ private Map addQueueL } /** - * Enables the write load decider and overrides other write load decider settings. - * @param queueLatencyThresholdMillis Sets the queue latency threshold, controlling when canRemain starts returning not-preferred. + * Enables the write-load decider and overrides other write load decider settings. + * @param queueLatencyThresholdMillis Exceeding this is what makes the monitor call re-route */ private Settings enabledWriteLoadDeciderSettings(long queueLatencyThresholdMillis) { return Settings.builder()