From 31f150c4b62204307f171322c1a2c9e90e0eb1ae Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 10 Jul 2025 17:21:29 -0700 Subject: [PATCH 1/6] Collect node thread pool usage for shard balancing Adds a new transport action to collect usage stats from the data nodes. ClusterInfoService uses the action to pull thread pool usage information from the data nodes to the master node periodically. Also removes NodeUsageStatsForThreadPoolsCollector as an interface/plugin and replaces it with a single class implementation. Closes ES-12316 --- .../cluster/ClusterInfoServiceIT.java | 69 ++++++++- .../index/shard/IndexShardIT.java | 70 +-------- ...ster.NodeUsageStatsForThreadPoolsCollector | 10 -- .../org/elasticsearch/TransportVersions.java | 1 + .../elasticsearch/action/ActionModule.java | 2 + .../NodeUsageStatsForThreadPoolsAction.java | 143 ++++++++++++++++++ ...ortNodeUsageStatsForThreadPoolsAction.java | 120 +++++++++++++++ .../cluster/InternalClusterInfoService.java | 29 ++-- .../cluster/NodeUsageStatsForThreadPools.java | 26 ++-- ...NodeUsageStatsForThreadPoolsCollector.java | 41 +++-- .../elasticsearch/node/NodeConstruction.java | 1 + .../node/NodeServiceProvider.java | 6 +- .../cluster/ClusterInfoTests.java | 2 +- ...rnalClusterInfoServiceSchedulingTests.java | 21 +-- 14 files changed, 408 insertions(+), 133 deletions(-) delete mode 100644 server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 853ac9b877d3f..2a7a2ef8874c0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction; +import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; @@ -21,6 +22,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -39,21 +41,25 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matchers; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; import static java.util.Collections.unmodifiableSet; +import static org.elasticsearch.cluster.InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.CoreMatchers.equalTo; @@ -202,7 +208,7 @@ public void testClusterInfoServiceInformationClearOnError() { internalCluster().startNodes( 2, // manually control publishing - Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build() + Settings.builder().put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build() ); prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)).get(); ensureGreen("test"); @@ -334,4 +340,65 @@ public void testClusterInfoServiceInformationClearOnError() { ); } } + + public void testClusterInfoIncludesNodeUsageStatsForThreadPools() { + var settings = Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build(); + var masterName = internalCluster().startMasterOnlyNode(settings); + var dataNodeName = internalCluster().startDataOnlyNode(settings); + ensureStableCluster(2); + assertEquals(internalCluster().getMasterName(), masterName); + assertNotEquals(internalCluster().getMasterName(), dataNodeName); + logger.info("---> master node: " + masterName + ", data node: " + dataNodeName); + + // Track when the data node receives a poll from the master for the write thread pool's stats. + final MockTransportService dataNodeMockTransportService = MockTransportService.getInstance(dataNodeName); + final CountDownLatch nodeThreadPoolStatsPolledByMaster = new CountDownLatch(1); + dataNodeMockTransportService.addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> { + handler.messageReceived(request, channel, task); + + if (nodeThreadPoolStatsPolledByMaster.getCount() > 0) { + logger.info("---> Data node received a request for thread pool stats"); + } + nodeThreadPoolStatsPolledByMaster.countDown(); + } + ); + + // Do some writes to create some write thread pool activity. + final String indexName = randomIdentifier(); + for (int i = 0; i < randomIntBetween(1, 1000); i++) { + index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar")); + } + + // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes. + final InternalClusterInfoService masterClusterInfoService = asInstanceOf( + InternalClusterInfoService.class, + internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class) + ); + final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService); + + // Verify that the data node received a request for thread pool stats. + safeAwait(nodeThreadPoolStatsPolledByMaster); + + final Map usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools(); + logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools); + assertThat(usageStatsForThreadPools.size(), equalTo(2)); // master and data node + var dataNodeId = getNodeId(dataNodeName); + var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId); + assertNotNull(nodeUsageStatsForThreadPool); + logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool); + + assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId()); + var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); + assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats); + assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0)); + assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f)); + assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); + } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 25ae21964ba0e..9db3836fdb32f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; -import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; @@ -91,7 +90,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -133,11 +131,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return pluginList( - InternalSettingsPlugin.class, - BogusEstimatedHeapUsagePlugin.class, - BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class - ); + return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class); } public void testLockTryingToDelete() throws Exception { @@ -332,8 +326,7 @@ public void testNodeWriteLoadsArePresent() { ClusterInfoServiceUtils.refresh(clusterInfoService); nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools(); - /** Verify that each node has usage stats reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} implementation - * generates random usage values */ + /** Verify that each node has usage stats reported. */ ClusterState state = getInstanceFromNode(ClusterService.class).state(); assertEquals(state.nodes().size(), nodeThreadPoolStats.size()); for (DiscoveryNode node : state.nodes()) { @@ -346,7 +339,7 @@ public void testNodeWriteLoadsArePresent() { assertNotNull(writeThreadPoolStats); assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0)); assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f)); - assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); + assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); } } finally { updateClusterSettings( @@ -935,61 +928,4 @@ public ClusterService getClusterService() { return clusterService.get(); } } - - /** - * A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random - * {@link NodeUsageStatsForThreadPools} for each node in the cluster. - *

- * Note: there's an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file that declares this implementation so that the - * plugin system can pick it up and use it for the test set-up. - */ - public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { - - private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin; - - public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) { - this.plugin = plugin; - } - - @Override - public void collectUsageStats(ActionListener> listener) { - ActionListener.completeWith( - listener, - () -> plugin.getClusterService() - .state() - .nodes() - .stream() - .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId()))) - ); - } - - private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) { - NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( - randomNonNegativeInt(), - randomFloat(), - randomNonNegativeLong() - ); - Map statsForThreadPools = new HashMap<>(); - statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats); - return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools); - } - } - - /** - * Make a plugin to gain access to the {@link ClusterService} instance. - */ - public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin { - - private final SetOnce clusterService = new SetOnce<>(); - - @Override - public Collection createComponents(PluginServices services) { - clusterService.set(services.clusterService()); - return List.of(); - } - - public ClusterService getClusterService() { - return clusterService.get(); - } - } } diff --git a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector deleted file mode 100644 index 787ce436c3ca6..0000000000000 --- a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector +++ /dev/null @@ -1,10 +0,0 @@ -# -# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -# or more contributor license agreements. Licensed under the "Elastic License -# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side -# Public License v 1"; you may not use this file except in compliance with, at -# your election, the "Elastic License 2.0", the "GNU Affero General Public -# License v3.0 only", or the "Server Side Public License, v 1". -# - -org.elasticsearch.index.shard.IndexShardIT$BogusNodeUsageStatsForThreadPoolsCollector diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 11a0103cd22e0..95c41c9d17092 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -342,6 +342,7 @@ static TransportVersion def(int id) { public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00); public static final TransportVersion ESQL_CATEGORIZE_OPTIONS = def(9_122_0_00); public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00); + public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_124_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index f6f06f3301a6d..51091c5f0d886 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; +import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; import org.elasticsearch.action.admin.cluster.node.usage.TransportNodesUsageAction; import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction; import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction; @@ -629,6 +630,7 @@ public void reg ActionRegistry actions = new ActionRegistry(); actions.register(TransportNodesInfoAction.TYPE, TransportNodesInfoAction.class); + actions.register(TransportNodeUsageStatsForThreadPoolsAction.TYPE, TransportNodeUsageStatsForThreadPoolsAction.class); actions.register(TransportRemoteInfoAction.TYPE, TransportRemoteInfoAction.class); actions.register(TransportNodesCapabilitiesAction.TYPE, TransportNodesCapabilitiesAction.class); actions.register(TransportNodesFeaturesAction.TYPE, TransportNodesFeaturesAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java new file mode 100644 index 0000000000000..81af31dbcafbb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.node.usage; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.AbstractTransportRequest; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Defines the request/response types for {@link TransportNodeUsageStatsForThreadPoolsAction}. + */ +public class NodeUsageStatsForThreadPoolsAction { + /** + * The sender request type that will be resolved to send individual {@link NodeRequest} requests to every node in the cluster. + */ + public static class Request extends BaseNodesRequest { + public Request() { + // Send all nodes a request by specifying null. + super((String[]) null); + } + } + + /** + * Request sent to and received by a cluster node. There are no parameters needed in the node-specific request. + */ + public static class NodeRequest extends AbstractTransportRequest { + public NodeRequest(StreamInput in) throws IOException { + super(in); + } + + public NodeRequest() {} + } + + /** + * A collection of {@link NodeUsageStatsForThreadPools} responses from all the cluster nodes. + */ + public static class Response extends BaseNodesResponse { + + protected Response(StreamInput in) throws IOException { + super(in); + } + + public Response( + ClusterName clusterName, + List nodeResponses, + List nodeFailures + ) { + super(clusterName, nodeResponses, nodeFailures); + } + + /** + * Combines the responses from each node that was called into a single map (by node ID) for the final {@link Response}. + */ + public Map getAllNodeUsageStatsForThreadPools() { + Map allNodeUsageStatsForThreadPools = new HashMap<>(); + for (NodeUsageStatsForThreadPoolsAction.NodeResponse nodeResponse : getNodes()) { + // NOMERGE: Is the nodeID in NodeUsageStatsForThreadPools redundant? What is it useful for? If not, remove? + allNodeUsageStatsForThreadPools.put( + nodeResponse.getNodeUsageStatsForThreadPools().nodeId(), + nodeResponse.getNodeUsageStatsForThreadPools() + ); + } + return allNodeUsageStatsForThreadPools; + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodeResponses) throws IOException { + out.writeCollection(nodeResponses); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readCollectionAsList(NodeUsageStatsForThreadPoolsAction.NodeResponse::new); + } + + @Override + public String toString() { + return "NodeUsageStatsForThreadPoolsAction.Response{" + getNodes() + "}"; + } + } + + /** + * A {@link NodeUsageStatsForThreadPools} response from a single cluster node. + */ + public static class NodeResponse extends BaseNodeResponse { + private final NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools; + + protected NodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + super(in, node); + this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in); + } + + public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools) { + super(node); + this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools; + } + + public NodeResponse(StreamInput in) throws IOException { + super(in); + this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in); + } + + public NodeUsageStatsForThreadPools getNodeUsageStatsForThreadPools() { + return nodeUsageStatsForThreadPools; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + nodeUsageStatsForThreadPools.writeTo(out); + } + + @Override + public String toString() { + return "NodeUsageStatsForThreadPoolsAction.NodeResponse{" + + "nodeId=" + + getNode().getId() + + ", nodeUsageStatsForThreadPools=" + + nodeUsageStatsForThreadPools + + "}"; + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java new file mode 100644 index 0000000000000..710028bf54627 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.node.usage; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Collects some thread pool stats from each data node for purposes of shard allocation balancing. The specific stats are defined in + * {@link NodeUsageStatsForThreadPools}. + */ +public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesAction< + NodeUsageStatsForThreadPoolsAction.Request, + NodeUsageStatsForThreadPoolsAction.Response, + NodeUsageStatsForThreadPoolsAction.NodeRequest, + NodeUsageStatsForThreadPoolsAction.NodeResponse, + Void> { + + private static final Logger logger = LogManager.getLogger(TransportNodeUsageStatsForThreadPoolsAction.class); + + public static final String NAME = "internal:monitor/thread_pool/stats"; + public static final ActionType TYPE = new ActionType<>(NAME); + + private final ThreadPool threadPool; + private final ClusterService clusterService; + + @Inject + public TransportNodeUsageStatsForThreadPoolsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters + ) { + super( + NAME, + clusterService, + transportService, + actionFilters, + NodeUsageStatsForThreadPoolsAction.NodeRequest::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.threadPool = threadPool; + this.clusterService = clusterService; + } + + @Override + protected NodeUsageStatsForThreadPoolsAction.Response newResponse( + NodeUsageStatsForThreadPoolsAction.Request request, + List nodeResponses, + List nodeFailures + ) { + + return new NodeUsageStatsForThreadPoolsAction.Response(clusterService.getClusterName(), nodeResponses, nodeFailures); + } + + @Override + protected NodeUsageStatsForThreadPoolsAction.NodeRequest newNodeRequest(NodeUsageStatsForThreadPoolsAction.Request request) { + return new NodeUsageStatsForThreadPoolsAction.NodeRequest(); + } + + @Override + protected NodeUsageStatsForThreadPoolsAction.NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new NodeUsageStatsForThreadPoolsAction.NodeResponse(in); + } + + @Override + protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( + NodeUsageStatsForThreadPoolsAction.NodeRequest request, + Task task + ) { + logger.info("~~~TransportNodeUsageStatsForThreadPoolsAction: START"); + DiscoveryNode localNode = clusterService.localNode(); + var writeExecutor = threadPool.executor(ThreadPool.Names.WRITE); + assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor; + var trackingForWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor; + + ThreadPoolUsageStats threadPoolUsageStats = new ThreadPoolUsageStats( + trackingForWriteExecutor.getMaximumPoolSize(), + (float) trackingForWriteExecutor.pollUtilization( + TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION + ), + trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset() + ); + + logger.info("~~~TransportNodeUsageStatsForThreadPoolsAction: " + threadPoolUsageStats); + + Map perThreadPool = new HashMap<>(); + perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats); + return new NodeUsageStatsForThreadPoolsAction.NodeResponse( + localNode, + new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 89394c8fa8ba8..19ecbf0ff9f0f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -50,6 +50,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import java.util.function.Supplier; import static org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING; import static org.elasticsearch.core.Strings.format; @@ -107,6 +108,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private final ThreadPool threadPool; private final Client client; + private final Supplier clusterStateSupplier; private final List> listeners = new CopyOnWriteArrayList<>(); private final Object mutex = new Object(); @@ -139,6 +141,7 @@ public InternalClusterInfoService( this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings); this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); + this.clusterStateSupplier = clusterService::state; ClusterSettings clusterSettings = clusterService.getClusterSettings(); clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout); clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency); @@ -269,18 +272,22 @@ private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writ } private void fetchNodesUsageStatsForThreadPools() { - nodeUsageStatsForThreadPoolsCollector.collectUsageStats(ActionListener.releaseAfter(new ActionListener<>() { - @Override - public void onResponse(Map writeLoads) { - nodeThreadPoolUsageStatsPerNode = writeLoads; - } + nodeUsageStatsForThreadPoolsCollector.collectUsageStats( + client, + clusterStateSupplier.get(), + ActionListener.releaseAfter(new ActionListener<>() { + @Override + public void onResponse(Map threadPoolStats) { + nodeThreadPoolUsageStatsPerNode = threadPoolStats; + } - @Override - public void onFailure(Exception e) { - logger.warn("failed to fetch write load estimates for nodes", e); - nodeThreadPoolUsageStatsPerNode = Map.of(); - } - }, fetchRefs.acquire())); + @Override + public void onFailure(Exception e) { + logger.warn("failed to fetch thread pool usage estimates for nodes", e); + nodeThreadPoolUsageStatsPerNode = Map.of(); + } + }, fetchRefs.acquire()) + ); } private void fetchNodesEstimatedHeapUsage() { diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java index 5e84f29af8412..9b0297cd73abd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java @@ -33,7 +33,7 @@ public NodeUsageStatsForThreadPools(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(this.nodeId); - out.writeMap(threadPoolUsageStatsMap, StreamOutput::writeWriteable); + out.writeMap(this.threadPoolUsageStatsMap, StreamOutput::writeWriteable); } @Override @@ -47,6 +47,9 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; NodeUsageStatsForThreadPools other = (NodeUsageStatsForThreadPools) o; for (var entry : other.threadPoolUsageStatsMap.entrySet()) { + if (nodeId.equals(other.nodeId) == false) { + return false; + } var loadStats = threadPoolUsageStatsMap.get(entry.getKey()); if (loadStats == null || loadStats.equals(entry.getValue()) == false) { return false; @@ -70,14 +73,11 @@ public String toString() { * * @param totalThreadPoolThreads Total number of threads in the thread pool. * @param averageThreadPoolUtilization Percent of thread pool threads that are in use, averaged over some period of time. - * @param averageThreadPoolQueueLatencyMillis How much time tasks spend in the thread pool queue. Zero if there is nothing being queued - * in the write thread pool. + * @param maxThreadPoolQueueLatencyMillis The max time any task has spent in the thread pool queue. Zero if no task is queued. */ - public record ThreadPoolUsageStats( - int totalThreadPoolThreads, - float averageThreadPoolUtilization, - long averageThreadPoolQueueLatencyMillis - ) implements Writeable { + public record ThreadPoolUsageStats(int totalThreadPoolThreads, float averageThreadPoolUtilization, long maxThreadPoolQueueLatencyMillis) + implements + Writeable { public ThreadPoolUsageStats(StreamInput in) throws IOException { this(in.readVInt(), in.readFloat(), in.readVLong()); @@ -87,12 +87,12 @@ public ThreadPoolUsageStats(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeVInt(this.totalThreadPoolThreads); out.writeFloat(this.averageThreadPoolUtilization); - out.writeVLong(this.averageThreadPoolQueueLatencyMillis); + out.writeVLong(this.maxThreadPoolQueueLatencyMillis); } @Override public int hashCode() { - return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, averageThreadPoolQueueLatencyMillis); + return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, maxThreadPoolQueueLatencyMillis); } @Override @@ -101,8 +101,8 @@ public String toString() { + totalThreadPoolThreads + ", averageThreadPoolUtilization=" + averageThreadPoolUtilization - + ", averageThreadPoolQueueLatencyMillis=" - + averageThreadPoolQueueLatencyMillis + + ", maxThreadPoolQueueLatencyMillis=" + + maxThreadPoolQueueLatencyMillis + "]"; } @@ -113,7 +113,7 @@ public boolean equals(Object o) { ThreadPoolUsageStats other = (ThreadPoolUsageStats) o; return totalThreadPoolThreads == other.totalThreadPoolThreads && averageThreadPoolUtilization == other.averageThreadPoolUtilization - && averageThreadPoolQueueLatencyMillis == other.averageThreadPoolQueueLatencyMillis; + && maxThreadPoolQueueLatencyMillis == other.maxThreadPoolQueueLatencyMillis; } } // ThreadPoolUsageStats diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java index e302a4abed559..e08563a648eb1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java @@ -9,25 +9,48 @@ package org.elasticsearch.cluster; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction; +import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; +import org.elasticsearch.client.internal.Client; import java.util.Map; /** - * Collects the usage stats (like write thread pool load) estimations for each node in the cluster. + * Collects the thread pool usage stats for each node in the cluster. *

* Results are returned as a map of node ID to node usage stats. */ -public interface NodeUsageStatsForThreadPoolsCollector { - /** - * This will be used when there is no NodeUsageLoadCollector available. - */ - NodeUsageStatsForThreadPoolsCollector EMPTY = listener -> listener.onResponse(Map.of()); +public class NodeUsageStatsForThreadPoolsCollector { + public static final NodeUsageStatsForThreadPoolsCollector EMPTY = new NodeUsageStatsForThreadPoolsCollector() { + public void collectUsageStats( + Client client, + ClusterState clusterState, + ActionListener> listener + ) { + listener.onResponse(Map.of()); + } + }; /** - * Collects the write load estimates from the cluster. + * Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster. * - * @param listener The listener to receive the write load results. + * @param listener The listener to receive the usage results. */ - void collectUsageStats(ActionListener> listener); + public void collectUsageStats( + Client client, + ClusterState clusterState, + ActionListener> listener + ) { + if (clusterState.getMinTransportVersion().onOrAfter(TransportVersions.TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) { + client.execute( + TransportNodeUsageStatsForThreadPoolsAction.TYPE, + new NodeUsageStatsForThreadPoolsAction.Request(), + listener.map(response -> response.getAllNodeUsageStatsForThreadPools()) + ); + } else { + listener.onResponse(Map.of()); + } + } } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index bb28ed4a8aff5..369dc8117d953 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -60,6 +60,7 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; +import org.elasticsearch.cluster.routing.allocation.NodeUsageStatsForThreadPoolsMonitor; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.version.CompatibilityVersions; diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 326002c7d346c..3b44d6b25b7af 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -80,17 +80,13 @@ ClusterInfoService newClusterInfoService( EstimatedHeapUsageCollector.class, () -> EstimatedHeapUsageCollector.EMPTY ); - final NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector = pluginsService.loadSingletonServiceProvider( - NodeUsageStatsForThreadPoolsCollector.class, - () -> NodeUsageStatsForThreadPoolsCollector.EMPTY - ); final InternalClusterInfoService service = new InternalClusterInfoService( settings, clusterService, threadPool, client, estimatedHeapUsageCollector, - nodeUsageStatsForThreadPoolsCollector + new NodeUsageStatsForThreadPoolsCollector() ); if (DiscoveryNode.isMasterNode(settings)) { // listen for state changes (this node starts/stops being the elected master, or new nodes are added) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 814aa102ce284..26af726bccc02 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -72,7 +72,7 @@ private static Map randomNodeUsageStatsFor NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolUsageStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(/* totalThreadPoolThreads= */ randomIntBetween(1, 16), /* averageThreadPoolUtilization= */ randomFloat(), - /* averageThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) + /* maxThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) ); Map usageStatsForThreadPools = new HashMap<>(); usageStatsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 6e80e0d087993..5263108f399e7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -84,8 +84,8 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); final EstimatedHeapUsageCollector mockEstimatedHeapUsageCollector = spy(new StubEstimatedEstimatedHeapUsageCollector()); - final NodeUsageStatsForThreadPoolsCollector mockNodeUsageStatsForThreadPoolsCollector = spy( - new StubNodeUsageStatsForThreadPoolsCollector() + final NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector = spy( + new NodeUsageStatsForThreadPoolsCollector() ); final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService( settings, @@ -93,7 +93,7 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { threadPool, client, mockEstimatedHeapUsageCollector, - mockNodeUsageStatsForThreadPoolsCollector + nodeUsageStatsForThreadPoolsCollector ); clusterService.addListener(clusterInfoService); clusterInfoService.addListener(ignored -> {}); @@ -131,14 +131,14 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { for (int i = 0; i < 3; i++) { Mockito.clearInvocations(mockEstimatedHeapUsageCollector); - Mockito.clearInvocations(mockNodeUsageStatsForThreadPoolsCollector); + Mockito.clearInvocations(nodeUsageStatsForThreadPoolsCollector); final int initialRequestCount = client.requestCount; final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); runFor(deterministicTaskQueue, duration); deterministicTaskQueue.runAllRunnableTasks(); assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval - verify(mockNodeUsageStatsForThreadPoolsCollector).collectUsageStats(any()); + verify(nodeUsageStatsForThreadPoolsCollector).collectUsageStats(any(), any(), any()); } final AtomicBoolean failMaster2 = new AtomicBoolean(); @@ -163,17 +163,6 @@ public void collectClusterHeapUsage(ActionListener> listener) } } - /** - * Simple for test {@link NodeUsageStatsForThreadPoolsCollector} implementation that returns an empty map of nodeId string to - * {@link NodeUsageStatsForThreadPools}. - */ - private static class StubNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { - @Override - public void collectUsageStats(ActionListener> listener) { - listener.onResponse(Map.of()); - } - } - private static void runFor(DeterministicTaskQueue deterministicTaskQueue, long duration) { final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; while (deterministicTaskQueue.getCurrentTimeMillis() < endTime From 571ec0a52c9512d2e775cb6c790a4a12b171b096 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 17 Jul 2025 21:20:34 +0000 Subject: [PATCH 2/6] [CI] Auto commit changes from spotless --- .../src/main/java/org/elasticsearch/node/NodeConstruction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 369dc8117d953..bb28ed4a8aff5 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -60,7 +60,6 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; -import org.elasticsearch.cluster.routing.allocation.NodeUsageStatsForThreadPoolsMonitor; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.version.CompatibilityVersions; From 3c6e8fdebb24368a2a1e03c1683057deb48abe61 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 17 Jul 2025 16:07:19 -0700 Subject: [PATCH 3/6] fix test and remove debug logging --- .../usage/TransportNodeUsageStatsForThreadPoolsAction.java | 3 --- .../org/elasticsearch/xpack/security/operator/Constants.java | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java index 710028bf54627..29bc8efbbb192 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java @@ -94,7 +94,6 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( NodeUsageStatsForThreadPoolsAction.NodeRequest request, Task task ) { - logger.info("~~~TransportNodeUsageStatsForThreadPoolsAction: START"); DiscoveryNode localNode = clusterService.localNode(); var writeExecutor = threadPool.executor(ThreadPool.Names.WRITE); assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor; @@ -108,8 +107,6 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset() ); - logger.info("~~~TransportNodeUsageStatsForThreadPoolsAction: " + threadPoolUsageStats); - Map perThreadPool = new HashMap<>(); perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats); return new NodeUsageStatsForThreadPoolsAction.NodeResponse( diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 5b22da00173ad..dbf062932e750 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -641,6 +641,7 @@ public class Constants { "internal:gateway/local/started_shards", "internal:admin/indices/prevalidate_shard_path", "internal:index/metadata/migration_version/update", + "internal:monitor/thread_pool/stats", "indices:admin/migration/reindex_status", "indices:admin/data_stream/index/reindex", "indices:admin/data_stream/reindex", From 9290acf099af9aafc8f1982ffa3c5153774bbbf9 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 31 Jul 2025 23:35:04 -0700 Subject: [PATCH 4/6] remove NOMERGE --- .../cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java index 81af31dbcafbb..e656ede06ffe1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java @@ -73,7 +73,6 @@ public Response( public Map getAllNodeUsageStatsForThreadPools() { Map allNodeUsageStatsForThreadPools = new HashMap<>(); for (NodeUsageStatsForThreadPoolsAction.NodeResponse nodeResponse : getNodes()) { - // NOMERGE: Is the nodeID in NodeUsageStatsForThreadPools redundant? What is it useful for? If not, remove? allNodeUsageStatsForThreadPools.put( nodeResponse.getNodeUsageStatsForThreadPools().nodeId(), nodeResponse.getNodeUsageStatsForThreadPools() From 5505fab3c33243647e09f4b01186ad64a2e581cc Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 1 Aug 2025 11:10:44 -0700 Subject: [PATCH 5/6] send requests only to data nodes; and fix merge conflict with name change --- .../org/elasticsearch/cluster/ClusterInfoServiceIT.java | 2 +- .../node/usage/NodeUsageStatsForThreadPoolsAction.java | 9 ++++++--- .../cluster/NodeUsageStatsForThreadPoolsCollector.java | 3 ++- .../cluster/routing/ShardMovementWriteLoadSimulator.java | 2 +- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 2a7a2ef8874c0..22e9c59b8c4f4 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -388,7 +388,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() { final Map usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools(); logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools); - assertThat(usageStatsForThreadPools.size(), equalTo(2)); // master and data node + assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg var dataNodeId = getNodeId(dataNodeName); var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId); assertNotNull(nodeUsageStatsForThreadPool); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java index e656ede06ffe1..829d1fd8eceaf 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodeUsageStatsForThreadPoolsAction.java @@ -33,9 +33,12 @@ public class NodeUsageStatsForThreadPoolsAction { * The sender request type that will be resolved to send individual {@link NodeRequest} requests to every node in the cluster. */ public static class Request extends BaseNodesRequest { - public Request() { - // Send all nodes a request by specifying null. - super((String[]) null); + /** + * @param nodeIds The list of nodes to which to send individual requests and collect responses from. If the list is null, all nodes + * in the cluster will be sent a request. + */ + public Request(String[] nodeIds) { + super(nodeIds); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java index e08563a648eb1..d5862ce737d43 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java @@ -43,10 +43,11 @@ public void collectUsageStats( ClusterState clusterState, ActionListener> listener ) { + var dataNodeIds = clusterState.nodes().getDataNodes().values().stream().map(node -> node.getId()).toArray(String[]::new); if (clusterState.getMinTransportVersion().onOrAfter(TransportVersions.TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION)) { client.execute( TransportNodeUsageStatsForThreadPoolsAction.TYPE, - new NodeUsageStatsForThreadPoolsAction.Request(), + new NodeUsageStatsForThreadPoolsAction.Request(dataNodeIds), listener.map(response -> response.getAllNodeUsageStatsForThreadPools()) ); } else { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 1a729a992583c..dda98184b5f5a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -92,7 +92,7 @@ private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoo (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())), 0.0 ), - writeThreadPoolStats.averageThreadPoolQueueLatencyMillis() + writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() ); } } From f378e3400b6e890871789132040a0e7f38fff3e7 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 1 Aug 2025 11:16:28 -0700 Subject: [PATCH 6/6] undo misc. change that slipped in --- .../java/org/elasticsearch/cluster/ClusterInfoServiceIT.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 22e9c59b8c4f4..6fd3133686b64 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -59,7 +59,6 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; import static java.util.Collections.unmodifiableSet; -import static org.elasticsearch.cluster.InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.CoreMatchers.equalTo; @@ -208,7 +207,7 @@ public void testClusterInfoServiceInformationClearOnError() { internalCluster().startNodes( 2, // manually control publishing - Settings.builder().put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build() + Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build() ); prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)).get(); ensureGreen("test");