Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand All @@ -39,16 +41,19 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -334,4 +339,65 @@ public void testClusterInfoServiceInformationClearOnError() {
);
}
}

public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
var settings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
.build();
var masterName = internalCluster().startMasterOnlyNode(settings);
var dataNodeName = internalCluster().startDataOnlyNode(settings);
ensureStableCluster(2);
assertEquals(internalCluster().getMasterName(), masterName);
assertNotEquals(internalCluster().getMasterName(), dataNodeName);
logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);

// Track when the data node receives a poll from the master for the write thread pool's stats.
final MockTransportService dataNodeMockTransportService = MockTransportService.getInstance(dataNodeName);
final CountDownLatch nodeThreadPoolStatsPolledByMaster = new CountDownLatch(1);
dataNodeMockTransportService.addRequestHandlingBehavior(
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
(handler, request, channel, task) -> {
handler.messageReceived(request, channel, task);

if (nodeThreadPoolStatsPolledByMaster.getCount() > 0) {
logger.info("---> Data node received a request for thread pool stats");
}
nodeThreadPoolStatsPolledByMaster.countDown();
}
);

// Do some writes to create some write thread pool activity.
final String indexName = randomIdentifier();
for (int i = 0; i < randomIntBetween(1, 1000); i++) {
index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
}

// Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
InternalClusterInfoService.class,
internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
);
final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);

// Verify that the data node received a request for thread pool stats.
safeAwait(nodeThreadPoolStatsPolledByMaster);

final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);

assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be good to add a test where we block write-threads to ensure we have a queue latency here. We can do that in a follow-up, this is not essential before merge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I flagged it on my JIRA ticket so I don't forget 👍

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -92,7 +91,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -135,11 +133,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(
InternalSettingsPlugin.class,
BogusEstimatedHeapUsagePlugin.class,
BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class
);
return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class);
}

public void testLockTryingToDelete() throws Exception {
Expand Down Expand Up @@ -334,8 +328,7 @@ public void testNodeWriteLoadsArePresent() {
ClusterInfoServiceUtils.refresh(clusterInfoService);
nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools();

/** Verify that each node has usage stats reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} implementation
* generates random usage values */
/** Verify that each node has usage stats reported. */
ClusterState state = getInstanceFromNode(ClusterService.class).state();
assertEquals(state.nodes().size(), nodeThreadPoolStats.size());
for (DiscoveryNode node : state.nodes()) {
Expand All @@ -348,7 +341,7 @@ public void testNodeWriteLoadsArePresent() {
assertNotNull(writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
}
} finally {
updateClusterSettings(
Expand Down Expand Up @@ -993,61 +986,4 @@ public ClusterService getClusterService() {
return clusterService.get();
}
}

/**
 * Test-only {@link NodeUsageStatsForThreadPoolsCollector} that fabricates a random {@link NodeUsageStatsForThreadPools} entry for every
 * node found in the current cluster state.
 * <p>
 * Note: an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file declares this implementation so that the plugin
 * system can pick it up and use it for the test set-up.
 */
public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector {

    private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin;

    public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) {
        this.plugin = plugin;
    }

    /**
     * Completes the listener with an unmodifiable map, keyed by node ID, holding one randomly generated stats entry per node.
     */
    @Override
    public void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener) {
        ActionListener.completeWith(listener, () -> {
            final var clusterNodes = plugin.getClusterService().state().nodes();
            return clusterNodes.stream()
                .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId())));
        });
    }

    /**
     * Builds a stats object for the given node ID that contains a single, randomly populated entry for the WRITE thread pool.
     */
    private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) {
        // The random values are drawn in the same order in which they are passed to the constructor.
        final int randomIntValue = randomNonNegativeInt();
        final float randomFloatValue = randomFloat();
        final long randomLongValue = randomNonNegativeLong();

        final Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> perPoolStats = new HashMap<>();
        perPoolStats.put(
            ThreadPool.Names.WRITE,
            new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(randomIntValue, randomFloatValue, randomLongValue)
        );
        return new NodeUsageStatsForThreadPools(nodeId, perPoolStats);
    }
}

/**
 * Plugin whose only job is to capture the {@link ClusterService} instance handed to it at component-creation time, so that it can be
 * retrieved later through {@link #getClusterService()}.
 */
public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin {

    // Assigned in createComponents().
    private final SetOnce<ClusterService> capturedClusterService = new SetOnce<>();

    /**
     * Records the cluster service from the supplied services; this plugin contributes no components of its own.
     */
    @Override
    public Collection<?> createComponents(PluginServices services) {
        capturedClusterService.set(services.clusterService());
        return List.of();
    }

    /**
     * @return the value currently held for the captured {@link ClusterService}
     */
    public ClusterService getClusterService() {
        return capturedClusterService.get();
    }
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ static TransportVersion def(int id) {
public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00);
public static final TransportVersion TO_CHILD_BLOCK_JOIN_QUERY = def(9_133_0_00);
public static final TransportVersion ML_INFERENCE_AI21_COMPLETION_ADDED = def(9_134_0_00);
public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_135_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction;
Expand Down Expand Up @@ -629,6 +630,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
ActionRegistry actions = new ActionRegistry();

actions.register(TransportNodesInfoAction.TYPE, TransportNodesInfoAction.class);
actions.register(TransportNodeUsageStatsForThreadPoolsAction.TYPE, TransportNodeUsageStatsForThreadPoolsAction.class);
actions.register(TransportRemoteInfoAction.TYPE, TransportRemoteInfoAction.class);
actions.register(TransportNodesCapabilitiesAction.TYPE, TransportNodesCapabilitiesAction.class);
actions.register(TransportNodesFeaturesAction.TYPE, TransportNodesFeaturesAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.admin.cluster.node.usage;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.AbstractTransportRequest;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Defines the request/response types for {@link TransportNodeUsageStatsForThreadPoolsAction}.
*/
public class NodeUsageStatsForThreadPoolsAction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a TransportNodesStatsAction which can produce thread-pool usage. Do we need a separate action for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The calls to collect the thread pool stats are destructive. For example, collecting the max queue latency seen since the last call and then resetting max seen to zero. Pool utilization is also destructive, resetting an execution time tracker after collection. So we can't hook the new stats up to the TransportNodesStatsAction API and have random callers clearing the state we'll need for allocation.

Copy link
Contributor

@nicktindall nicktindall Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're building a whole separate set of polling messages just because of the destructive-ness of the utilisation read, can we instead just limit the re-calculation to no more often than 30s or something (returning the most recent calculated value until that interval has passed) and have all readers poll from a single monitor? Or instead just switch to having a separate dedicated utilisation polling task on the data node, and make the reads non-destructive.

That sounds like a tidier alternative to me. We can discuss tomorrow at the catch up, but it seems to me like having separate monitoring state is now cascading into more work/maintenance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we instead just limit the re-calculation to no more often than 30s or something (returning the most recent calculated value until that interval has passed) and have all readers poll from a single monitor?

If we make it time based, recalculated on the data node every 30 seconds, and then the master node polls every 30 seconds, operations can race such that the master node sees the same value twice and misses a value that is just about to be calculated.

Or instead just switch to having a separate dedicated utilisation polling task on the data node, and make the reads non-destructive.

For this we'd have to build a component on the data node to asynchronously run a thread every 30 seconds to calculate a new value. Not obviously easier than building a TransportAction (which is also already implemented). Transport actions are apparently lightweight and there isn't concern about adding more, FWIW.

There was a discussion on this subject back in the 07-01 meeting (David, Pooya and I) (minutes 10-32). We were thinking EWMA at the time. We did like the idea of using the node stats api. The concern became public documentation of the new values because we were uncertain whether we'd want to change what metrics were sent in future. We can't change publicly visible stats. Arguably the stats we have now are easier to explain, though I'm not sure they're final yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the new action is more about having a separate action, request/response that we can play around with and include things into that may never go to node-stats. There is also not too much overhead to a new action so I prefer to have it as such. One can argue that node-stats should maybe not be used from cluster-info - rather we should have a dedicated action - but let us not go there now.

/**
 * The request issued by the sender. It is resolved into individual {@link NodeRequest} requests, one for each targeted node in the
 * cluster.
 */
public static class Request extends BaseNodesRequest {

    /**
     * Creates a request that targets the given nodes.
     *
     * @param nodeIds The nodes that should each be sent an individual request and whose responses should be collected. A null value
     *                means that every node in the cluster will be sent a request.
     */
    public Request(String[] nodeIds) {
        super(nodeIds);
    }
}

/**
 * The per-node request: sent to, and received by, a single cluster node. It carries no parameters of its own.
 */
public static class NodeRequest extends AbstractTransportRequest {

    /**
     * Creates an empty node-level request.
     */
    public NodeRequest() {
        super();
    }

    /**
     * Reads a node-level request from the given stream.
     */
    public NodeRequest(StreamInput in) throws IOException {
        super(in);
    }
}

/**
 * The aggregate response: the {@link NodeUsageStatsForThreadPools} results gathered from the cluster nodes, together with any node
 * failures.
 */
public static class Response extends BaseNodesResponse<NodeResponse> {

    protected Response(StreamInput in) throws IOException {
        super(in);
    }

    public Response(ClusterName clusterName, List<NodeResponse> nodeResponses, List<FailedNodeException> nodeFailures) {
        super(clusterName, nodeResponses, nodeFailures);
    }

    /**
     * Flattens the per-node responses into one map for this {@link Response}, keyed by the node ID reported in each node's stats.
     */
    public Map<String, NodeUsageStatsForThreadPools> getAllNodeUsageStatsForThreadPools() {
        final Map<String, NodeUsageStatsForThreadPools> statsByNodeId = new HashMap<>();
        for (NodeResponse perNodeResponse : getNodes()) {
            final NodeUsageStatsForThreadPools nodeStats = perNodeResponse.getNodeUsageStatsForThreadPools();
            statsByNodeId.put(nodeStats.nodeId(), nodeStats);
        }
        return statsByNodeId;
    }

    @Override
    protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodeResponses) throws IOException {
        out.writeCollection(nodeResponses);
    }

    @Override
    protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
        // Each element is deserialized via the single-argument NodeResponse(StreamInput) constructor.
        return in.readCollectionAsList(NodeResponse::new);
    }

    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("NodeUsageStatsForThreadPoolsAction.Response{");
        description.append(getNodes());
        description.append("}");
        return description.toString();
    }
}

/**
 * The response produced by one cluster node, wrapping that node's {@link NodeUsageStatsForThreadPools}.
 */
public static class NodeResponse extends BaseNodeResponse {
    private final NodeUsageStatsForThreadPools usageStats;

    /**
     * Creates a response for the given node from already available stats.
     */
    public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools) {
        super(node);
        this.usageStats = nodeUsageStatsForThreadPools;
    }

    /**
     * Reads a response from the stream: first the base node response, then the stats.
     */
    public NodeResponse(StreamInput in) throws IOException {
        super(in);
        this.usageStats = new NodeUsageStatsForThreadPools(in);
    }

    /**
     * Reads a response from the stream for the supplied node: first the base node response, then the stats.
     */
    protected NodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
        super(in, node);
        this.usageStats = new NodeUsageStatsForThreadPools(in);
    }

    public NodeUsageStatsForThreadPools getNodeUsageStatsForThreadPools() {
        return usageStats;
    }

    /**
     * Writes the base node response followed by the stats, mirroring the order used by the stream-reading constructors.
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        usageStats.writeTo(out);
    }

    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("NodeUsageStatsForThreadPoolsAction.NodeResponse{");
        description.append("nodeId=").append(getNode().getId());
        description.append(", nodeUsageStatsForThreadPools=").append(usageStats);
        description.append("}");
        return description.toString();
    }
}

}
Loading