From f81f9f960c1393cd5c022d89f1cafe7563fa99e4 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 1 Jul 2025 13:08:20 -0700 Subject: [PATCH 1/6] Add node write load to the ClusterInfo Sets up ClusterInfoService to collect node write load and pass it into ClusterInfo. The node write load stats are not yet supplied, they'll be zero/empty in the ClusterInfo for now. Relates ES-11990 --- .../index/shard/IndexShardIT.java | 101 +++++++++++++++++- ...g.elasticsearch.cluster.WriteLoadCollector | 10 ++ .../org/elasticsearch/TransportVersions.java | 1 + .../elasticsearch/cluster/ClusterInfo.java | 40 +++++-- .../cluster/ClusterInfoSimulator.java | 5 +- .../cluster/InternalClusterInfoService.java | 48 ++++++++- .../elasticsearch/cluster/NodeWriteLoad.java | 71 ++++++++++++ .../cluster/WriteLoadCollector.java | 33 ++++++ .../node/NodeServiceProvider.java | 8 +- .../cluster/ClusterInfoTests.java | 18 +++- ...rnalClusterInfoServiceSchedulingTests.java | 23 +++- .../ExpectedShardSizeEstimatorTests.java | 1 + .../AllocationStatsServiceTests.java | 1 + .../allocation/DiskThresholdMonitorTests.java | 2 +- .../ExpectedShardSizeAllocationTests.java | 3 +- .../BalancedShardsAllocatorTests.java | 3 +- .../ClusterAllocationSimulationTests.java | 2 +- .../allocator/ClusterBalanceStatsTests.java | 1 + .../allocator/ClusterInfoSimulatorTests.java | 1 + .../DesiredBalanceComputerTests.java | 4 +- .../DesiredBalanceReconcilerTests.java | 1 + .../decider/DiskThresholdDeciderTests.java | 2 +- .../DiskThresholdDeciderUnitTests.java | 5 + .../MockInternalClusterInfoService.java | 2 +- .../ReactiveStorageDeciderService.java | 1 + ...oscalingCalculateCapacityServiceTests.java | 4 +- .../FrozenStorageDeciderServiceTests.java | 2 +- .../ProactiveStorageDeciderServiceTests.java | 2 +- .../ReactiveStorageDeciderServiceTests.java | 4 +- ...nsportNodeDeprecationCheckActionTests.java | 1 + 30 files changed, 372 insertions(+), 28 deletions(-) create mode 100644 server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.WriteLoadCollector create mode 100644 server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/WriteLoadCollector.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 6dafab431500e..b3a031f4f1070 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -8,6 +8,8 @@ */ package org.elasticsearch.index.shard; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.SetOnce; @@ -22,6 +24,8 @@ import org.elasticsearch.cluster.EstimatedHeapUsage; import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; +import org.elasticsearch.cluster.NodeWriteLoad; +import org.elasticsearch.cluster.WriteLoadCollector; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; @@ -29,6 +33,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import 
org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -117,14 +122,16 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class IndexShardIT extends ESSingleNodeTestCase { + private static final Logger logger = LogManager.getLogger(IndexShardIT.class); @Override protected Collection> getPlugins() { - return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class); + return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class, BogusWriteLoadCollectorPlugin.class); } public void testLockTryingToDelete() throws Exception { @@ -295,6 +302,47 @@ public void testHeapUsageEstimateIsPresent() { } } + public void testNodeWriteLoadsArePresent() { + InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); + ClusterInfoServiceUtils.refresh(clusterInfoService); + Map nodeWriteLoads = clusterInfoService.getClusterInfo().getNodeWriteLoads(); + assertNotNull(nodeWriteLoads); + /** Not collecting stats yet because allocation write load stats collection is disabled by default. + * see {@link WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING} */ + assertTrue(nodeWriteLoads.isEmpty()); + + // Enable collection for node write loads. + updateClusterSettings( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build() + ); + try { + // Force a ClusterInfo refresh to run collection of the node write loads. + ClusterInfoServiceUtils.refresh(clusterInfoService); + nodeWriteLoads = clusterInfoService.getClusterInfo().getNodeWriteLoads(); + + /** Verify that each node has a write load reported. 
The test {@link BogusWriteLoadCollector} generates random load values */ + ClusterState state = getInstanceFromNode(ClusterService.class).state(); + assertEquals(state.nodes().size(), nodeWriteLoads.size()); + for (DiscoveryNode node : state.nodes()) { + assertTrue(nodeWriteLoads.containsKey(node.getId())); + NodeWriteLoad nodeWriteLoad = nodeWriteLoads.get(node.getId()); + assertThat(nodeWriteLoad.nodeId(), equalTo(node.getId())); + assertThat(nodeWriteLoad.totalWriteThreadPoolThreads(), greaterThanOrEqualTo(0)); + assertThat(nodeWriteLoad.percentWriteThreadPoolUtilization(), greaterThanOrEqualTo(0)); + assertThat(nodeWriteLoad.maxTaskTimeInWriteQueueMillis(), greaterThanOrEqualTo(0L)); + } + } finally { + updateClusterSettings( + Settings.builder().putNull(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey()).build() + ); + } + } + public void testIndexCanChangeCustomDataPath() throws Exception { final String index = "test-custom-data-path"; final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10)); @@ -875,4 +923,55 @@ public ClusterService getClusterService() { return clusterService.get(); } } + + /** + * A simple {@link WriteLoadCollector} implementation that creates and returns random {@link NodeWriteLoad} for each node in the + * cluster. + *
+ * Note: there's an 'org.elasticsearch.cluster.WriteLoadCollector' file that declares this implementation so that the plugin system can + * pick it up and use it for the test set-up. + */ + public static class BogusWriteLoadCollector implements WriteLoadCollector { + + private final BogusWriteLoadCollectorPlugin plugin; + + public BogusWriteLoadCollector(BogusWriteLoadCollectorPlugin plugin) { + this.plugin = plugin; + } + + @Override + public void collectWriteLoads(ActionListener> listener) { + ActionListener.completeWith( + listener, + () -> plugin.getClusterService() + .state() + .nodes() + .stream() + .collect( + Collectors.toUnmodifiableMap( + DiscoveryNode::getId, + node -> new NodeWriteLoad(node.getId(), randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeLong()) + ) + ) + ); + } + } + + /** + * Make a plugin to gain access to the {@link ClusterService} instance. + */ + public static class BogusWriteLoadCollectorPlugin extends Plugin implements ClusterPlugin { + + private final SetOnce clusterService = new SetOnce<>(); + + @Override + public Collection createComponents(PluginServices services) { + clusterService.set(services.clusterService()); + return List.of(); + } + + public ClusterService getClusterService() { + return clusterService.get(); + } + } } diff --git a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.WriteLoadCollector b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.WriteLoadCollector new file mode 100644 index 0000000000000..3bc75accc0095 --- /dev/null +++ b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.WriteLoadCollector @@ -0,0 +1,10 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the "Elastic License +# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side +# Public License v 1"; you may not use this file except in compliance with, at +# your election, the "Elastic License 2.0", the "GNU Affero General Public +# License v3.0 only", or the "Server Side Public License, v 1". +# + +org.elasticsearch.index.shard.IndexShardIT$BogusWriteLoadCollector diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 431e2bf1a20eb..e3df75a407e0e 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -326,6 +326,7 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00); public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00); public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00); + public static final TransportVersion NODE_WRITE_LOAD_IN_CLUSTER_INFO = def(9_113_0_00); /* * STOP! READ THIS FIRST! 
No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index e8cb0421979c5..cc14d1fde5ddd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -58,9 +58,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { final Map dataPath; final Map reservedSpace; final Map estimatedHeapUsages; + final Map nodeWriteLoads; protected ClusterInfo() { - this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } /** @@ -73,6 +74,7 @@ protected ClusterInfo() { * @param dataPath the shard routing to datapath mapping * @param reservedSpace reserved space per shard broken down by node and data path * @param estimatedHeapUsages estimated heap usage broken down by node + * @param nodeWriteLoads estimated node-level write load broken down by node * @see #shardIdentifierFromRouting */ public ClusterInfo( @@ -82,7 +84,8 @@ public ClusterInfo( Map shardDataSetSizes, Map dataPath, Map reservedSpace, - Map estimatedHeapUsages + Map estimatedHeapUsages, + Map nodeWriteLoads ) { this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage); this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage); @@ -91,6 +94,7 @@ public ClusterInfo( this.dataPath = Map.copyOf(dataPath); this.reservedSpace = Map.copyOf(reservedSpace); this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages); + this.nodeWriteLoads = Map.copyOf(nodeWriteLoads); } public ClusterInfo(StreamInput in) throws IOException { @@ -107,6 +111,11 @@ public ClusterInfo(StreamInput in) throws IOException { } else { this.estimatedHeapUsages = Map.of(); } + if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) { + this.nodeWriteLoads = in.readImmutableMap(NodeWriteLoad::new); + } else { + this.nodeWriteLoads = Map.of(); + } } @Override @@ -124,6 +133,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) { out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable); } + if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) { + out.writeMap(this.nodeWriteLoads, StreamOutput::writeWriteable); + } } /** @@ -204,8 +216,8 @@ public Iterator toXContentChunked(ToXContent.Params params return builder.endObject(); // NodeAndPath }), endArray() // end "reserved_sizes" - // NOTE: We don't serialize estimatedHeapUsages at this stage, to avoid - // committing to API payloads until the feature is settled + // NOTE: We don't serialize estimatedHeapUsages/nodeWriteLoads at this stage, to avoid + // committing to API payloads until the features are settled ); } @@ -220,6 +232,13 @@ public Map getEstimatedHeapUsages() { return estimatedHeapUsages; } + /** + * Returns a map containing the node-level write load estimate for each node by node ID. + */ + public Map getNodeWriteLoads() { + return nodeWriteLoads; + } + /** * Returns a node id to disk usage mapping for the path that has the least available space on the node. * Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space. 
@@ -311,12 +330,21 @@ public boolean equals(Object o) { && shardSizes.equals(that.shardSizes) && shardDataSetSizes.equals(that.shardDataSetSizes) && dataPath.equals(that.dataPath) - && reservedSpace.equals(that.reservedSpace); + && reservedSpace.equals(that.reservedSpace) + && nodeWriteLoads.equals(that.nodeWriteLoads); } @Override public int hashCode() { - return Objects.hash(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, shardDataSetSizes, dataPath, reservedSpace); + return Objects.hash( + leastAvailableSpaceUsage, + mostAvailableSpaceUsage, + shardSizes, + shardDataSetSizes, + dataPath, + reservedSpace, + nodeWriteLoads + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index b47b15f545ed8..3f95fc3c63dc1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -34,6 +34,7 @@ public class ClusterInfoSimulator { private final Map shardDataSetSizes; private final Map dataPath; private final Map estimatedHeapUsages; + private final Map nodeWriteLoads; public ClusterInfoSimulator(RoutingAllocation allocation) { this.allocation = allocation; @@ -43,6 +44,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) { this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes); this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath); this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages(); + this.nodeWriteLoads = allocation.clusterInfo().getNodeWriteLoads(); } /** @@ -156,7 +158,8 @@ public ClusterInfo getClusterInfo() { shardDataSetSizes, dataPath, Map.of(), - estimatedHeapUsages + estimatedHeapUsages, + nodeWriteLoads ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 066667dfaba84..5e5b21469d51a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings.WriteLoadDeciderStatus; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -50,6 +51,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import static org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING; import static org.elasticsearch.core.Strings.format; /** @@ -92,6 +94,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private volatile boolean diskThresholdEnabled; private volatile boolean estimatedHeapThresholdEnabled; + private volatile WriteLoadDeciderStatus writeLoadConstraintEnabled; private volatile TimeValue updateFrequency; private volatile TimeValue fetchTimeout; @@ -99,6 +102,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private volatile Map mostAvailableSpaceUsages; private volatile Map maxHeapPerNode; private volatile Map 
estimatedHeapUsagePerNode; + private volatile Map nodeWriteLoadPerNode; private volatile IndicesStatsSummary indicesStatsSummary; private final ThreadPool threadPool; @@ -108,6 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private final Object mutex = new Object(); private final List> nextRefreshListeners = new ArrayList<>(); private final EstimatedHeapUsageCollector estimatedHeapUsageCollector; + private final WriteLoadCollector writeLoadCollector; private AsyncRefresh currentRefresh; private RefreshScheduler refreshScheduler; @@ -118,16 +123,19 @@ public InternalClusterInfoService( ClusterService clusterService, ThreadPool threadPool, Client client, - EstimatedHeapUsageCollector estimatedHeapUsageCollector + EstimatedHeapUsageCollector estimatedHeapUsageCollector, + WriteLoadCollector writeLoadCollector ) { this.leastAvailableSpaceUsages = Map.of(); this.mostAvailableSpaceUsages = Map.of(); this.maxHeapPerNode = Map.of(); this.estimatedHeapUsagePerNode = Map.of(); + this.nodeWriteLoadPerNode = Map.of(); this.indicesStatsSummary = IndicesStatsSummary.EMPTY; this.threadPool = threadPool; this.client = client; this.estimatedHeapUsageCollector = estimatedHeapUsageCollector; + this.writeLoadCollector = writeLoadCollector; this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings); this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); @@ -142,6 +150,8 @@ public InternalClusterInfoService( CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED, this::setEstimatedHeapThresholdEnabled ); + + clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled); } private void setDiskThresholdEnabled(boolean diskThresholdEnabled) { @@ -152,6 +162,10 @@ private void setEstimatedHeapThresholdEnabled(boolean estimatedHeapThresholdEnab this.estimatedHeapThresholdEnabled = estimatedHeapThresholdEnabled; } + private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus writeLoadConstraintEnabled) { + this.writeLoadConstraintEnabled = writeLoadConstraintEnabled; + } + private void setFetchTimeout(TimeValue fetchTimeout) { this.fetchTimeout = fetchTimeout; } @@ -204,6 +218,7 @@ void execute() { maybeFetchIndicesStats(diskThresholdEnabled); maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled); maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled); + maybeFetchNodesWriteLoadStats(writeLoadConstraintEnabled); } } @@ -242,6 +257,32 @@ private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) { } } + private void maybeFetchNodesWriteLoadStats(WriteLoadDeciderStatus writeLoadConstraintEnabled) { + if (writeLoadConstraintEnabled != WriteLoadDeciderStatus.DISABLED) { + try (var ignored = threadPool.getThreadContext().clearTraceContext()) { + fetchNodesWriteLoadStats(); + } + } else { + logger.trace("skipping collecting shard/node write load estimates from cluster, feature currently disabled"); + nodeWriteLoadPerNode = Map.of(); + } + } + + private void fetchNodesWriteLoadStats() { + writeLoadCollector.collectWriteLoads(ActionListener.releaseAfter(new ActionListener<>() { + @Override + public void onResponse(Map writeLoads) { + nodeWriteLoadPerNode = writeLoads; + } + + @Override + public void onFailure(Exception e) { + logger.warn("failed to fetch write load estimates for nodes", e); + nodeWriteLoadPerNode = 
Map.of(); + } + }, fetchRefs.acquire())); + } + private void fetchNodesEstimatedHeapUsage() { estimatedHeapUsageCollector.collectClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() { @Override @@ -486,6 +527,8 @@ public ClusterInfo getClusterInfo() { estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage)); } }); + final Map nodeWriteLoads = new HashMap<>(); + nodeWriteLoadPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeWriteLoads.put(nodeId, nodeWriteLoad); }); return new ClusterInfo( leastAvailableSpaceUsages, mostAvailableSpaceUsages, @@ -493,7 +536,8 @@ public ClusterInfo getClusterInfo() { indicesStatsSummary.shardDataSetSizes, indicesStatsSummary.dataPath, indicesStatsSummary.reservedSpace, - estimatedHeapUsages + estimatedHeapUsages, + nodeWriteLoads ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java b/server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java new file mode 100644 index 0000000000000..463b82077d795 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * Record representing an estimate of a node's write load. The estimation is based on the usage of the node's write thread pool. + * + * @param nodeId Node ID. + * @param totalWriteThreadPoolThreads Total number of threads in the write thread pool. + * @param percentWriteThreadPoolUtilization Percent of write thread pool threads that are in use, averaged over some period of time. + * @param maxTaskTimeInWriteQueueMillis How long the oldest task (next to be run) in the write thread pool queue has been queued. Zero if + * there is no write thread pool queue. 
+ */ +public record NodeWriteLoad( + String nodeId, + int totalWriteThreadPoolThreads, + int percentWriteThreadPoolUtilization, + long maxTaskTimeInWriteQueueMillis +) implements Writeable { + + public NodeWriteLoad(StreamInput in) throws IOException { + this(in.readString(), in.readVInt(), in.readVInt(), in.readVLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.nodeId); + out.writeVInt(this.totalWriteThreadPoolThreads); + out.writeVInt(this.percentWriteThreadPoolUtilization); + out.writeVLong(this.maxTaskTimeInWriteQueueMillis); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeWriteLoad other = (NodeWriteLoad) o; + return nodeId.equals(other.nodeId) + && totalWriteThreadPoolThreads == other.totalWriteThreadPoolThreads + && percentWriteThreadPoolUtilization == other.percentWriteThreadPoolUtilization + && maxTaskTimeInWriteQueueMillis == other.maxTaskTimeInWriteQueueMillis; + } + + @Override + public String toString() { + return getClass().getSimpleName() + + "{nodeId=[" + + nodeId + + "], totalWriteThreadPoolThreads=[" + + totalWriteThreadPoolThreads + + "], percentWriteThreadPoolUtilization=[" + + percentWriteThreadPoolUtilization + + "], maxTaskTimeInWriteQueueMillis=[" + + maxTaskTimeInWriteQueueMillis + + "]}"; + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/WriteLoadCollector.java b/server/src/main/java/org/elasticsearch/cluster/WriteLoadCollector.java new file mode 100644 index 0000000000000..d1e6161cc064a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/WriteLoadCollector.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.action.ActionListener; + +import java.util.Map; + +/** + * Collects the write load estimates for each node in the cluster. + *
+ * Results are returned as a map of node ID to node write load. + */ +public interface WriteLoadCollector { + /** + * This will be used when there is no WriteLoadCollector available. + */ + WriteLoadCollector EMPTY = listener -> listener.onResponse(Map.of()); + + /** + * Collects the write load estimates from the cluster. + * + * @param listener The listener to receive the write load results. + */ + void collectWriteLoads(ActionListener> listener); +} diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 53c26dc679677..4b24085f844dd 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; +import org.elasticsearch.cluster.WriteLoadCollector; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -79,12 +80,17 @@ ClusterInfoService newClusterInfoService( EstimatedHeapUsageCollector.class, () -> EstimatedHeapUsageCollector.EMPTY ); + final WriteLoadCollector writeLoadCollector = pluginsService.loadSingletonServiceProvider( + WriteLoadCollector.class, + () -> WriteLoadCollector.EMPTY + ); final InternalClusterInfoService service = new InternalClusterInfoService( settings, clusterService, threadPool, client, - estimatedHeapUsageCollector + estimatedHeapUsageCollector, + writeLoadCollector ); if (DiscoveryNode.isMasterNode(settings)) { // listen for state changes (this node starts/stops being the elected master, or new nodes are added) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 25932c9e8b9f3..2673f18918d1c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -42,7 +42,8 @@ public static ClusterInfo randomClusterInfo() { randomDataSetSizes(), randomRoutingToDataPath(), randomReservedSpace(), - randomNodeHeapUsage() + randomNodeHeapUsage(), + randomNodeWriteLoads() ); } @@ -62,6 +63,21 @@ private static Map randomNodeHeapUsage() { return nodeHeapUsage; } + private static Map randomNodeWriteLoads() { + int numEntries = randomIntBetween(0, 128); + Map nodeWriteLoads = new HashMap<>(numEntries); + for (int i = 0; i < numEntries; i++) { + String nodeIdKey = randomAlphaOfLength(32); + final NodeWriteLoad nodeWriteLoad = new NodeWriteLoad(/* nodeId= */ nodeIdKey, + /* totalWriteThreadPoolThreads= */ randomIntBetween(1, 16), + /* percentWriteThreadPoolUtilization= */randomIntBetween(0, 100), + /* maxTaskTimeInWriteQueueMillis= */ randomLongBetween(0, 50000) + ); + nodeWriteLoads.put(nodeIdKey, nodeWriteLoad); + } + return nodeWriteLoads; + } + private static Map randomDiskUsage() { int numEntries = randomIntBetween(0, 128); Map builder = new HashMap<>(numEntries); diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index ea9c940793778..a9bbb90407800 100644 --- 
a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; @@ -55,7 +56,11 @@ public void testScheduling() { final Settings.Builder settingsBuilder = Settings.builder() .put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName()) - .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true); + .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ); if (randomBoolean()) { settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms"); } @@ -79,12 +84,14 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); final EstimatedHeapUsageCollector mockEstimatedHeapUsageCollector = spy(new StubEstimatedEstimatedHeapUsageCollector()); + final WriteLoadCollector mockWriteLoadCollector = spy(new StubWriteLoadCollector()); final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService( settings, clusterService, threadPool, client, - mockEstimatedHeapUsageCollector + mockEstimatedHeapUsageCollector, + mockWriteLoadCollector ); clusterService.addListener(clusterInfoService); clusterInfoService.addListener(ignored -> {}); @@ -122,12 +129,14 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { for (int i = 0; i < 3; i++) { Mockito.clearInvocations(mockEstimatedHeapUsageCollector); + Mockito.clearInvocations(mockWriteLoadCollector); final int initialRequestCount = client.requestCount; final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); runFor(deterministicTaskQueue, duration); deterministicTaskQueue.runAllRunnableTasks(); assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval + verify(mockWriteLoadCollector).collectWriteLoads(any()); } final AtomicBoolean failMaster2 = new AtomicBoolean(); @@ -152,6 +161,16 @@ public void collectClusterHeapUsage(ActionListener> listener) } } + /** + * Simple for test {@link WriteLoadCollector} implementation that returns an empty map of nodeId string to {@link NodeWriteLoad}. 
+ */ + private static class StubWriteLoadCollector implements WriteLoadCollector { + @Override + public void collectWriteLoads(ActionListener> listener) { + listener.onResponse(Map.of()); + } + } + private static void runFor(DeterministicTaskQueue deterministicTaskQueue, long duration) { final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; while (deterministicTaskQueue.getCurrentTimeMillis() < endTime diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java index 754b4d2b22d0d..96e277284a659 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java @@ -206,6 +206,7 @@ private static ClusterInfo createClusterInfo(ShardRouting shard, Long size) { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 14e0aaa253749..8b54cecb29580 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -79,6 +79,7 @@ public void testShardStats() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index df0fa875a7249..170677ff3632f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -1580,7 +1580,7 @@ private static ClusterInfo clusterInfo( Map diskUsages, Map reservedSpace ) { - return new ClusterInfo(diskUsages, Map.of(), Map.of(), Map.of(), Map.of(), reservedSpace, Map.of()); + return new ClusterInfo(diskUsages, Map.of(), Map.of(), Map.of(), Map.of(), reservedSpace, Map.of(), Map.of()); } private static DiscoveryNode newFrozenOnlyNode(String nodeId) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java index f1a2b4b1358fe..5ab57a2bba607 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java @@ -259,11 +259,12 @@ private static ClusterInfo createClusterInfoWith(ShardId shardId, long size) { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); } private static ClusterInfo createClusterInfo(Map diskUsage, Map shardSizes) { - return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of()); + return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java 
b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 8ab031aa53fe1..50ace093019ed 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -609,6 +609,7 @@ public void testShardSizeDiscrepancyWithinIndex() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ) ); @@ -705,7 +706,7 @@ private RoutingAllocation createRoutingAllocation(ClusterState clusterState) { } private static ClusterInfo createClusterInfo(Map indexSizes) { - return new ClusterInfo(Map.of(), Map.of(), indexSizes, Map.of(), Map.of(), Map.of(), Map.of()); + return new ClusterInfo(Map.of(), Map.of(), indexSizes, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } private static IndexMetadata.Builder anIndex(String name) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java index 277521c5832a1..44e9eb7ff5232 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java @@ -561,7 +561,7 @@ public ClusterInfo getClusterInfo() { dataPath.put(new ClusterInfo.NodeAndShard(shardRouting.currentNodeId(), shardRouting.shardId()), "/data"); } - return new ClusterInfo(diskSpaceUsage, diskSpaceUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of()); + return new ClusterInfo(diskSpaceUsage, diskSpaceUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of(), Map.of()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java index 80fe603488fd3..73d0d7ac00796 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java @@ -346,6 +346,7 @@ private ClusterInfo createClusterInfo(List> shardSizes) { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java index b67e248999ced..921e9046a57e5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java @@ -697,6 +697,7 @@ public ClusterInfo build() { Map.of(), Map.of(), reservedSpace, + Map.of(), Map.of() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java index a0d28ce124584..9c8147f507961 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java @@ -690,7 +690,7 @@ public void testDesiredBalanceShouldConvergeInABigCluster() { .stream() .collect(toMap(Map.Entry::getKey, it -> new DiskUsage(it.getKey(), it.getKey(), "/data", diskSize, diskSize - it.getValue()))); - var clusterInfo = new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of()); + var clusterInfo = new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of(), Map.of()); var settings = Settings.EMPTY; @@ -1196,7 +1196,7 @@ public ClusterInfoTestBuilder withReservedSpace(String nodeId, long size, ShardI } public ClusterInfo build() { - return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of()); + return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of(), Map.of()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 844912cba4c17..37646d376f8fd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -620,6 +620,7 @@ public void testUnassignedAllocationPredictsDiskUsage() { ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), + ImmutableOpenMap.of(), ImmutableOpenMap.of() ); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 5467d313834b8..f85c2678e04e7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -1406,7 +1406,7 @@ static class DevNullClusterInfo extends ClusterInfo { Map shardSizes, Map reservedSpace ) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of()); + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of(), Map.of()); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 7da75f61da801..dc98cf5349c09 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -109,6 +109,7 @@ public void testCanAllocateUsesMaxAvailableSpace() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -181,6 +182,7 @@ private void doTestCannotAllocateDueToLackOfDiskResources(boolean testMaxHeadroo Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -327,6 +329,7 @@ private void doTestCanRemainUsesLeastAvailableSpace(boolean testMaxHeadroom) { Map.of(), shardRoutingMap, Map.of(), + 
Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -847,6 +850,7 @@ public void testDecidesYesIfWatermarksIgnored() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -915,6 +919,7 @@ public void testCannotForceAllocateOver100PercentUsage() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 01b11ce97460a..bd7c29eb579d4 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -43,7 +43,7 @@ public static class TestPlugin extends Plugin {} private volatile BiFunction diskUsageFunction; public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { - super(settings, clusterService, threadPool, client, EstimatedHeapUsageCollector.EMPTY); + super(settings, clusterService, threadPool, client, EstimatedHeapUsageCollector.EMPTY, WriteLoadCollector.EMPTY); } public void setDiskUsageFunctionAndRefresh(BiFunction diskUsageFn) { diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index e451b1d45817d..6c4066a447b67 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -960,6 +960,7 @@ private ExtendedClusterInfo(Map extraShardSizes, ClusterInfo info) Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); this.delegate = info; diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java index 12f7dde103c9c..a5ce2ff894817 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java @@ -262,7 +262,7 @@ public void testContext() { } } state = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build(); - info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); context = new AutoscalingCalculateCapacityService.DefaultAutoscalingDeciderContext( roleNames, state, @@ -311,7 +311,7 @@ public void testContext() { ) ); - info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); context = new AutoscalingCalculateCapacityService.DefaultAutoscalingDeciderContext( roleNames, state, diff --git 
a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java index 37295ebf44208..e28f2b7ba2ec8 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java @@ -109,7 +109,7 @@ public Tuple sizeAndClusterInfo(IndexMetadata indexMetadata) // add irrelevant shards noise for completeness (should not happen IRL). sizes.put(new ShardId(index, i), randomLongBetween(0, Integer.MAX_VALUE)); } - ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), Map.of(), sizes, Map.of(), Map.of(), Map.of()); + ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), Map.of(), sizes, Map.of(), Map.of(), Map.of(), Map.of()); return Tuple.tuple(totalSize, info); } } diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java index b252fdf5564db..f2eaa830cdb20 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java @@ -397,7 +397,7 @@ private ClusterInfo randomClusterInfo(ClusterState state) { for (var id : state.nodes().getDataNodes().keySet()) { diskUsage.put(id, new DiskUsage(id, id, "/test", Long.MAX_VALUE, Long.MAX_VALUE)); } - return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of()); + return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } private ClusterState.Builder applyCreatedDates( diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java index 2ee94340f6d2c..94364e3e90c27 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java @@ -379,7 +379,7 @@ public void validateSizeOf(ClusterState clusterState, ShardRouting subjectShard, } private ReactiveStorageDeciderService.AllocationState createAllocationState(Map shardSize, ClusterState clusterState) { - ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), shardSize, Map.of(), Map.of(), Map.of(), Map.of()); + ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), shardSize, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); ReactiveStorageDeciderService.AllocationState allocationState = new ReactiveStorageDeciderService.AllocationState( clusterState, null, @@ -544,7 +544,7 @@ public void testUnmovableSize() { } var diskUsages = Map.of(nodeId, new DiskUsage(nodeId, null, null, ByteSizeUnit.KB.toBytes(100), ByteSizeUnit.KB.toBytes(5))); - ClusterInfo info = new ClusterInfo(diskUsages, diskUsages, shardSize, Map.of(), Map.of(), Map.of(), Map.of()); + ClusterInfo info = 
new ClusterInfo(diskUsages, diskUsages, shardSize, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); ReactiveStorageDeciderService.AllocationState allocationState = new ReactiveStorageDeciderService.AllocationState( clusterState, diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java index 40a564088aee6..905dd93c3ff1b 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java @@ -174,6 +174,7 @@ public void testCheckDiskLowWatermark() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); DeprecationIssue issue = TransportNodeDeprecationCheckAction.checkDiskLowWatermark( From ac350d9de75b010d7acc0fc5660bc14271a19c78 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 2 Jul 2025 16:08:11 -0700 Subject: [PATCH 2/6] change names and types --- .../index/shard/IndexShardIT.java | 4 +-- .../elasticsearch/cluster/NodeWriteLoad.java | 26 +++++++++---------- .../cluster/ClusterInfoTests.java | 4 +-- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index b3a031f4f1070..79f05badccc53 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -333,8 +333,8 @@ public void testNodeWriteLoadsArePresent() { NodeWriteLoad nodeWriteLoad = nodeWriteLoads.get(node.getId()); assertThat(nodeWriteLoad.nodeId(), equalTo(node.getId())); assertThat(nodeWriteLoad.totalWriteThreadPoolThreads(), greaterThanOrEqualTo(0)); - assertThat(nodeWriteLoad.percentWriteThreadPoolUtilization(), greaterThanOrEqualTo(0)); - assertThat(nodeWriteLoad.maxTaskTimeInWriteQueueMillis(), greaterThanOrEqualTo(0L)); + assertThat(nodeWriteLoad.averageWriteThreadPoolUtilization(), greaterThanOrEqualTo(0.0f)); + assertThat(nodeWriteLoad.averageWriteThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); } } finally { updateClusterSettings( diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java b/server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java index 463b82077d795..82e9fc70fff6e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java @@ -20,15 +20,15 @@ * * @param nodeId Node ID. * @param totalWriteThreadPoolThreads Total number of threads in the write thread pool. - * @param percentWriteThreadPoolUtilization Percent of write thread pool threads that are in use, averaged over some period of time. - * @param maxTaskTimeInWriteQueueMillis How long the oldest task (next to be run) in the write thread pool queue has been queued. Zero if - * there is no write thread pool queue. + * @param averageWriteThreadPoolUtilization Percent of write thread pool threads that are in use, averaged over some period of time. + * @param averageWriteThreadPoolQueueLatencyMillis How much time tasks spend in the write thread pool queue. Zero if there is nothing being + * queued in the write thread pool. 
*/ public record NodeWriteLoad( String nodeId, int totalWriteThreadPoolThreads, - int percentWriteThreadPoolUtilization, - long maxTaskTimeInWriteQueueMillis + float averageWriteThreadPoolUtilization, + long averageWriteThreadPoolQueueLatencyMillis ) implements Writeable { public NodeWriteLoad(StreamInput in) throws IOException { @@ -39,8 +39,8 @@ public NodeWriteLoad(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeString(this.nodeId); out.writeVInt(this.totalWriteThreadPoolThreads); - out.writeVInt(this.percentWriteThreadPoolUtilization); - out.writeVLong(this.maxTaskTimeInWriteQueueMillis); + out.writeDouble(this.averageWriteThreadPoolUtilization); + out.writeVLong(this.averageWriteThreadPoolQueueLatencyMillis); } @Override @@ -50,8 +50,8 @@ public boolean equals(Object o) { NodeWriteLoad other = (NodeWriteLoad) o; return nodeId.equals(other.nodeId) && totalWriteThreadPoolThreads == other.totalWriteThreadPoolThreads - && percentWriteThreadPoolUtilization == other.percentWriteThreadPoolUtilization - && maxTaskTimeInWriteQueueMillis == other.maxTaskTimeInWriteQueueMillis; + && averageWriteThreadPoolUtilization == other.averageWriteThreadPoolUtilization + && averageWriteThreadPoolQueueLatencyMillis == other.averageWriteThreadPoolQueueLatencyMillis; } @Override @@ -61,10 +61,10 @@ public String toString() { + nodeId + "], totalWriteThreadPoolThreads=[" + totalWriteThreadPoolThreads - + "], percentWriteThreadPoolUtilization=[" - + percentWriteThreadPoolUtilization - + "], maxTaskTimeInWriteQueueMillis=[" - + maxTaskTimeInWriteQueueMillis + + "], averageWriteThreadPoolUtilization=[" + + averageWriteThreadPoolUtilization + + "], averageWriteThreadPoolQueueLatencyMillis=[" + + averageWriteThreadPoolQueueLatencyMillis + "]}"; } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 2673f18918d1c..656011a7dc865 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -70,8 +70,8 @@ private static Map randomNodeWriteLoads() { String nodeIdKey = randomAlphaOfLength(32); final NodeWriteLoad nodeWriteLoad = new NodeWriteLoad(/* nodeId= */ nodeIdKey, /* totalWriteThreadPoolThreads= */ randomIntBetween(1, 16), - /* percentWriteThreadPoolUtilization= */randomIntBetween(0, 100), - /* maxTaskTimeInWriteQueueMillis= */ randomLongBetween(0, 50000) + /* averageWriteThreadPoolUtilization= */randomIntBetween(0, 100), + /* averageWriteThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) ); nodeWriteLoads.put(nodeIdKey, nodeWriteLoad); } From 9c7e04cff46a54d7f3c4c5d1651338eb7a51f42e Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 3 Jul 2025 12:47:51 -0700 Subject: [PATCH 3/6] change NodeLoad data structure --- .../index/shard/IndexShardIT.java | 65 ++++++---- ...sticsearch.cluster.NodeUsageLoadCollector} | 2 +- .../elasticsearch/cluster/ClusterInfo.java | 24 ++-- .../cluster/ClusterInfoSimulator.java | 6 +- .../cluster/InternalClusterInfoService.java | 24 ++-- .../cluster/NodeExecutionLoad.java | 121 ++++++++++++++++++ ...ector.java => NodeUsageLoadCollector.java} | 12 +- .../elasticsearch/cluster/NodeWriteLoad.java | 71 ---------- .../node/NodeServiceProvider.java | 10 +- .../cluster/ClusterInfoTests.java | 17 ++- ...rnalClusterInfoServiceSchedulingTests.java | 14 +- .../MockInternalClusterInfoService.java | 2 +- 12 files changed, 216 
insertions(+), 152 deletions(-) rename server/src/internalClusterTest/resources/META-INF/services/{org.elasticsearch.cluster.WriteLoadCollector => org.elasticsearch.cluster.NodeUsageLoadCollector} (86%) create mode 100644 server/src/main/java/org/elasticsearch/cluster/NodeExecutionLoad.java rename server/src/main/java/org/elasticsearch/cluster/{WriteLoadCollector.java => NodeUsageLoadCollector.java} (63%) delete mode 100644 server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 79f05badccc53..ed03ada7dce1f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -24,8 +24,8 @@ import org.elasticsearch.cluster.EstimatedHeapUsage; import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; -import org.elasticsearch.cluster.NodeWriteLoad; -import org.elasticsearch.cluster.WriteLoadCollector; +import org.elasticsearch.cluster.NodeExecutionLoad; +import org.elasticsearch.cluster.NodeUsageLoadCollector; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; @@ -78,6 +78,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentType; import org.junit.Assert; @@ -90,6 +91,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -131,7 +133,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class, BogusWriteLoadCollectorPlugin.class); + return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class, BogusNodeUsageLoadCollectorPlugin.class); } public void testLockTryingToDelete() throws Exception { @@ -305,11 +307,11 @@ public void testHeapUsageEstimateIsPresent() { public void testNodeWriteLoadsArePresent() { InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); ClusterInfoServiceUtils.refresh(clusterInfoService); - Map nodeWriteLoads = clusterInfoService.getClusterInfo().getNodeWriteLoads(); - assertNotNull(nodeWriteLoads); + Map nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeExecutionStats(); + assertNotNull(nodeThreadPoolStats); /** Not collecting stats yet because allocation write load stats collection is disabled by default. * see {@link WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING} */ - assertTrue(nodeWriteLoads.isEmpty()); + assertTrue(nodeThreadPoolStats.isEmpty()); // Enable collection for node write loads. updateClusterSettings( @@ -323,18 +325,21 @@ public void testNodeWriteLoadsArePresent() { try { // Force a ClusterInfo refresh to run collection of the node write loads. 
ClusterInfoServiceUtils.refresh(clusterInfoService); - nodeWriteLoads = clusterInfoService.getClusterInfo().getNodeWriteLoads(); + nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeExecutionStats(); - /** Verify that each node has a write load reported. The test {@link BogusWriteLoadCollector} generates random load values */ + /** Verify that each node has a write load reported. The test {@link BogusNodeUsageLoadCollector} generates random load values */ ClusterState state = getInstanceFromNode(ClusterService.class).state(); - assertEquals(state.nodes().size(), nodeWriteLoads.size()); + assertEquals(state.nodes().size(), nodeThreadPoolStats.size()); for (DiscoveryNode node : state.nodes()) { - assertTrue(nodeWriteLoads.containsKey(node.getId())); - NodeWriteLoad nodeWriteLoad = nodeWriteLoads.get(node.getId()); - assertThat(nodeWriteLoad.nodeId(), equalTo(node.getId())); - assertThat(nodeWriteLoad.totalWriteThreadPoolThreads(), greaterThanOrEqualTo(0)); - assertThat(nodeWriteLoad.averageWriteThreadPoolUtilization(), greaterThanOrEqualTo(0.0f)); - assertThat(nodeWriteLoad.averageWriteThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); + assertTrue(nodeThreadPoolStats.containsKey(node.getId())); + NodeExecutionLoad nodeExecutionLoad = nodeThreadPoolStats.get(node.getId()); + assertThat(nodeExecutionLoad.nodeId(), equalTo(node.getId())); + NodeExecutionLoad.ThreadPoolUsageStats writeThreadPoolStats = nodeExecutionLoad.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE); + assertNotNull(writeThreadPoolStats); + assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0)); + assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f)); + assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); } } finally { updateClusterSettings( @@ -925,42 +930,48 @@ public ClusterService getClusterService() { } /** - * A simple {@link WriteLoadCollector} implementation that creates and returns random {@link NodeWriteLoad} for each node in the + * A simple {@link NodeUsageLoadCollector} implementation that creates and returns random {@link NodeExecutionLoad} for each node in the * cluster. *
<p>
* Note: there's an 'org.elasticsearch.cluster.WriteLoadCollector' file that declares this implementation so that the plugin system can * pick it up and use it for the test set-up. */ - public static class BogusWriteLoadCollector implements WriteLoadCollector { + public static class BogusNodeUsageLoadCollector implements NodeUsageLoadCollector { - private final BogusWriteLoadCollectorPlugin plugin; + private final BogusNodeUsageLoadCollectorPlugin plugin; - public BogusWriteLoadCollector(BogusWriteLoadCollectorPlugin plugin) { + public BogusNodeUsageLoadCollector(BogusNodeUsageLoadCollectorPlugin plugin) { this.plugin = plugin; } @Override - public void collectWriteLoads(ActionListener> listener) { + public void collectUsageStats(ActionListener> listener) { ActionListener.completeWith( listener, () -> plugin.getClusterService() .state() .nodes() .stream() - .collect( - Collectors.toUnmodifiableMap( - DiscoveryNode::getId, - node -> new NodeWriteLoad(node.getId(), randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeLong()) - ) - ) + .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeLoad(node.getId()))) + ); + } + + private NodeExecutionLoad makeRandomNodeLoad(String nodeId) { + NodeExecutionLoad.ThreadPoolUsageStats writeThreadPoolStats = new NodeExecutionLoad.ThreadPoolUsageStats( + randomNonNegativeInt(), + randomFloat(), + randomNonNegativeLong() ); + Map statsForThreadPools = new HashMap<>(); + statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats); + return new NodeExecutionLoad(nodeId, statsForThreadPools); } } /** * Make a plugin to gain access to the {@link ClusterService} instance. */ - public static class BogusWriteLoadCollectorPlugin extends Plugin implements ClusterPlugin { + public static class BogusNodeUsageLoadCollectorPlugin extends Plugin implements ClusterPlugin { private final SetOnce clusterService = new SetOnce<>(); diff --git a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.WriteLoadCollector b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageLoadCollector similarity index 86% rename from server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.WriteLoadCollector rename to server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageLoadCollector index 3bc75accc0095..85865e1455af0 100644 --- a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.WriteLoadCollector +++ b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageLoadCollector @@ -7,4 +7,4 @@ # License v3.0 only", or the "Server Side Public License, v 1". 
# -org.elasticsearch.index.shard.IndexShardIT$BogusWriteLoadCollector +org.elasticsearch.index.shard.IndexShardIT$BogusNodeUsageLoadCollector diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index cc14d1fde5ddd..2f1dacdb331ec 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -58,7 +58,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { final Map dataPath; final Map reservedSpace; final Map estimatedHeapUsages; - final Map nodeWriteLoads; + final Map nodeExecutionStats; protected ClusterInfo() { this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); @@ -74,7 +74,7 @@ protected ClusterInfo() { * @param dataPath the shard routing to datapath mapping * @param reservedSpace reserved space per shard broken down by node and data path * @param estimatedHeapUsages estimated heap usage broken down by node - * @param nodeWriteLoads estimated node-level write load broken down by node + * @param nodeExecutionStats estimated node-level execution load broken down by node * @see #shardIdentifierFromRouting */ public ClusterInfo( @@ -85,7 +85,7 @@ public ClusterInfo( Map dataPath, Map reservedSpace, Map estimatedHeapUsages, - Map nodeWriteLoads + Map nodeExecutionStats ) { this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage); this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage); @@ -94,7 +94,7 @@ public ClusterInfo( this.dataPath = Map.copyOf(dataPath); this.reservedSpace = Map.copyOf(reservedSpace); this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages); - this.nodeWriteLoads = Map.copyOf(nodeWriteLoads); + this.nodeExecutionStats = Map.copyOf(nodeExecutionStats); } public ClusterInfo(StreamInput in) throws IOException { @@ -112,9 +112,9 @@ public ClusterInfo(StreamInput in) throws IOException { this.estimatedHeapUsages = Map.of(); } if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) { - this.nodeWriteLoads = in.readImmutableMap(NodeWriteLoad::new); + this.nodeExecutionStats = in.readImmutableMap(NodeExecutionLoad::new); } else { - this.nodeWriteLoads = Map.of(); + this.nodeExecutionStats = Map.of(); } } @@ -134,7 +134,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable); } if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) { - out.writeMap(this.nodeWriteLoads, StreamOutput::writeWriteable); + out.writeMap(this.nodeExecutionStats, StreamOutput::writeWriteable); } } @@ -216,7 +216,7 @@ public Iterator toXContentChunked(ToXContent.Params params return builder.endObject(); // NodeAndPath }), endArray() // end "reserved_sizes" - // NOTE: We don't serialize estimatedHeapUsages/nodeWriteLoads at this stage, to avoid + // NOTE: We don't serialize estimatedHeapUsages/nodeExecutionStats at this stage, to avoid // committing to API payloads until the features are settled ); } @@ -235,8 +235,8 @@ public Map getEstimatedHeapUsages() { /** * Returns a map containing the node-level write load estimate for each node by node ID. 
*/ - public Map getNodeWriteLoads() { - return nodeWriteLoads; + public Map getNodeExecutionStats() { + return nodeExecutionStats; } /** @@ -331,7 +331,7 @@ public boolean equals(Object o) { && shardDataSetSizes.equals(that.shardDataSetSizes) && dataPath.equals(that.dataPath) && reservedSpace.equals(that.reservedSpace) - && nodeWriteLoads.equals(that.nodeWriteLoads); + && nodeExecutionStats.equals(that.nodeExecutionStats); } @Override @@ -343,7 +343,7 @@ public int hashCode() { shardDataSetSizes, dataPath, reservedSpace, - nodeWriteLoads + nodeExecutionStats ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index 3f95fc3c63dc1..e528b3bb6468c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -34,7 +34,7 @@ public class ClusterInfoSimulator { private final Map shardDataSetSizes; private final Map dataPath; private final Map estimatedHeapUsages; - private final Map nodeWriteLoads; + private final Map nodeExecutionStats; public ClusterInfoSimulator(RoutingAllocation allocation) { this.allocation = allocation; @@ -44,7 +44,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) { this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes); this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath); this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages(); - this.nodeWriteLoads = allocation.clusterInfo().getNodeWriteLoads(); + this.nodeExecutionStats = allocation.clusterInfo().getNodeExecutionStats(); } /** @@ -159,7 +159,7 @@ public ClusterInfo getClusterInfo() { dataPath, Map.of(), estimatedHeapUsages, - nodeWriteLoads + nodeExecutionStats ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 5e5b21469d51a..8cdc3e8ba98e4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -102,7 +102,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private volatile Map mostAvailableSpaceUsages; private volatile Map maxHeapPerNode; private volatile Map estimatedHeapUsagePerNode; - private volatile Map nodeWriteLoadPerNode; + private volatile Map nodeExecutionLoadPerNode; private volatile IndicesStatsSummary indicesStatsSummary; private final ThreadPool threadPool; @@ -112,7 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private final Object mutex = new Object(); private final List> nextRefreshListeners = new ArrayList<>(); private final EstimatedHeapUsageCollector estimatedHeapUsageCollector; - private final WriteLoadCollector writeLoadCollector; + private final NodeUsageLoadCollector nodeUsageLoadCollector; private AsyncRefresh currentRefresh; private RefreshScheduler refreshScheduler; @@ -124,18 +124,18 @@ public InternalClusterInfoService( ThreadPool threadPool, Client client, EstimatedHeapUsageCollector estimatedHeapUsageCollector, - WriteLoadCollector writeLoadCollector + NodeUsageLoadCollector nodeUsageLoadCollector ) { this.leastAvailableSpaceUsages = Map.of(); this.mostAvailableSpaceUsages = Map.of(); this.maxHeapPerNode = Map.of(); this.estimatedHeapUsagePerNode = Map.of(); - 
this.nodeWriteLoadPerNode = Map.of(); + this.nodeExecutionLoadPerNode = Map.of(); this.indicesStatsSummary = IndicesStatsSummary.EMPTY; this.threadPool = threadPool; this.client = client; this.estimatedHeapUsageCollector = estimatedHeapUsageCollector; - this.writeLoadCollector = writeLoadCollector; + this.nodeUsageLoadCollector = nodeUsageLoadCollector; this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings); this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); @@ -264,21 +264,21 @@ private void maybeFetchNodesWriteLoadStats(WriteLoadDeciderStatus writeLoadConst } } else { logger.trace("skipping collecting shard/node write load estimates from cluster, feature currently disabled"); - nodeWriteLoadPerNode = Map.of(); + nodeExecutionLoadPerNode = Map.of(); } } private void fetchNodesWriteLoadStats() { - writeLoadCollector.collectWriteLoads(ActionListener.releaseAfter(new ActionListener<>() { + nodeUsageLoadCollector.collectUsageStats(ActionListener.releaseAfter(new ActionListener<>() { @Override - public void onResponse(Map writeLoads) { - nodeWriteLoadPerNode = writeLoads; + public void onResponse(Map writeLoads) { + nodeExecutionLoadPerNode = writeLoads; } @Override public void onFailure(Exception e) { logger.warn("failed to fetch write load estimates for nodes", e); - nodeWriteLoadPerNode = Map.of(); + nodeExecutionLoadPerNode = Map.of(); } }, fetchRefs.acquire())); } @@ -527,8 +527,8 @@ public ClusterInfo getClusterInfo() { estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage)); } }); - final Map nodeWriteLoads = new HashMap<>(); - nodeWriteLoadPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeWriteLoads.put(nodeId, nodeWriteLoad); }); + final Map nodeWriteLoads = new HashMap<>(); + nodeExecutionLoadPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeWriteLoads.put(nodeId, nodeWriteLoad); }); return new ClusterInfo( leastAvailableSpaceUsages, mostAvailableSpaceUsages, diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeExecutionLoad.java b/server/src/main/java/org/elasticsearch/cluster/NodeExecutionLoad.java new file mode 100644 index 0000000000000..8332f3073cea2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/NodeExecutionLoad.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + * Record of a node's thread pool usage stats. Maps thread pool stats by thread pool name. + * + * @param nodeId The node ID. + * @param threadPoolUsageStatsMap A map of thread pool name ({@link org.elasticsearch.threadpool.ThreadPool.Names}) to the thread pool's + * usage stats ({@link ThreadPoolUsageStats}). 
+ */ +public record NodeExecutionLoad(String nodeId, Map threadPoolUsageStatsMap) implements Writeable { + + public NodeExecutionLoad(StreamInput in) throws IOException { + this(in.readString(), in.readMap(ThreadPoolUsageStats::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.nodeId); + out.writeMap(threadPoolUsageStatsMap, StreamOutput::writeWriteable); + } + + @Override + public int hashCode() { + return Objects.hash(nodeId, threadPoolUsageStatsMap); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeExecutionLoad other = (NodeExecutionLoad) o; + for (var entry : other.threadPoolUsageStatsMap.entrySet()) { + var loadStats = threadPoolUsageStatsMap.get(entry.getKey()); + if (loadStats == null || loadStats.equals(entry.getValue()) == false) { + return false; + } + } + return true; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(getClass().getSimpleName() + "{nodeId=" + nodeId + ", threadPoolUsageStatsMap=["); + for (var entry : threadPoolUsageStatsMap.entrySet()) { + builder.append("{ThreadPool.Names=" + entry.getKey() + ", ThreadPoolUsageStats=" + entry.getValue() + "}"); + } + builder.append("]}"); + return builder.toString(); + } + + /** + * Record representing an estimate of a node's thread pool usage. + * + * @param totalThreadPoolThreads Total number of threads in the thread pool. + * @param averageThreadPoolUtilization Percent of thread pool threads that are in use, averaged over some period of time. + * @param averageThreadPoolQueueLatencyMillis How much time tasks spend in the thread pool queue. Zero if there is nothing being queued + * in the write thread pool. 
+ */ + public record ThreadPoolUsageStats( + int totalThreadPoolThreads, + float averageThreadPoolUtilization, + long averageThreadPoolQueueLatencyMillis + ) implements Writeable { + + public ThreadPoolUsageStats(StreamInput in) throws IOException { + this(in.readVInt(), in.readFloat(), in.readVLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(this.totalThreadPoolThreads); + out.writeDouble(this.averageThreadPoolUtilization); + out.writeVLong(this.averageThreadPoolQueueLatencyMillis); + } + + @Override + public int hashCode() { + return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, averageThreadPoolQueueLatencyMillis); + } + + @Override + public String toString() { + return "[totalThreadPoolThreads=" + + totalThreadPoolThreads + + ", averageThreadPoolUtilization=" + + averageThreadPoolUtilization + + ", averageThreadPoolQueueLatencyMillis=" + + averageThreadPoolQueueLatencyMillis + + "]"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ThreadPoolUsageStats other = (ThreadPoolUsageStats) o; + return totalThreadPoolThreads == other.totalThreadPoolThreads + && averageThreadPoolUtilization == other.averageThreadPoolUtilization + && averageThreadPoolQueueLatencyMillis == other.averageThreadPoolQueueLatencyMillis; + } + + } // ThreadPoolUsageStats + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/WriteLoadCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageLoadCollector.java similarity index 63% rename from server/src/main/java/org/elasticsearch/cluster/WriteLoadCollector.java rename to server/src/main/java/org/elasticsearch/cluster/NodeUsageLoadCollector.java index d1e6161cc064a..6d73a57201942 100644 --- a/server/src/main/java/org/elasticsearch/cluster/WriteLoadCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageLoadCollector.java @@ -14,20 +14,20 @@ import java.util.Map; /** - * Collects the write load estimates for each node in the cluster. + * Collects the usage stats (like write thread pool load) estimations for each node in the cluster. *
<p>
- * Results are returned as a map of node ID to node write load. + * Results are returned as a map of node ID to node usage stats. */ -public interface WriteLoadCollector { +public interface NodeUsageLoadCollector { /** - * This will be used when there is no WriteLoadCollector available. + * This will be used when there is no NodeUsageLoadCollector available. */ - WriteLoadCollector EMPTY = listener -> listener.onResponse(Map.of()); + NodeUsageLoadCollector EMPTY = listener -> listener.onResponse(Map.of()); /** * Collects the write load estimates from the cluster. * * @param listener The listener to receive the write load results. */ - void collectWriteLoads(ActionListener> listener); + void collectUsageStats(ActionListener> listener); } diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java b/server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java deleted file mode 100644 index 82e9fc70fff6e..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/NodeWriteLoad.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.cluster; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; - -import java.io.IOException; - -/** - * Record representing an estimate of a node's write load. The estimation is based on the usage of the node's write thread pool. - * - * @param nodeId Node ID. - * @param totalWriteThreadPoolThreads Total number of threads in the write thread pool. - * @param averageWriteThreadPoolUtilization Percent of write thread pool threads that are in use, averaged over some period of time. - * @param averageWriteThreadPoolQueueLatencyMillis How much time tasks spend in the write thread pool queue. Zero if there is nothing being - * queued in the write thread pool. 
- */ -public record NodeWriteLoad( - String nodeId, - int totalWriteThreadPoolThreads, - float averageWriteThreadPoolUtilization, - long averageWriteThreadPoolQueueLatencyMillis -) implements Writeable { - - public NodeWriteLoad(StreamInput in) throws IOException { - this(in.readString(), in.readVInt(), in.readVInt(), in.readVLong()); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(this.nodeId); - out.writeVInt(this.totalWriteThreadPoolThreads); - out.writeDouble(this.averageWriteThreadPoolUtilization); - out.writeVLong(this.averageWriteThreadPoolQueueLatencyMillis); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NodeWriteLoad other = (NodeWriteLoad) o; - return nodeId.equals(other.nodeId) - && totalWriteThreadPoolThreads == other.totalWriteThreadPoolThreads - && averageWriteThreadPoolUtilization == other.averageWriteThreadPoolUtilization - && averageWriteThreadPoolQueueLatencyMillis == other.averageWriteThreadPoolQueueLatencyMillis; - } - - @Override - public String toString() { - return getClass().getSimpleName() - + "{nodeId=[" - + nodeId - + "], totalWriteThreadPoolThreads=[" - + totalWriteThreadPoolThreads - + "], averageWriteThreadPoolUtilization=[" - + averageWriteThreadPoolUtilization - + "], averageWriteThreadPoolQueueLatencyMillis=[" - + averageWriteThreadPoolQueueLatencyMillis - + "]}"; - } - -} diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 4b24085f844dd..5b189b417f2be 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -14,7 +14,7 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; -import org.elasticsearch.cluster.WriteLoadCollector; +import org.elasticsearch.cluster.NodeUsageLoadCollector; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -80,9 +80,9 @@ ClusterInfoService newClusterInfoService( EstimatedHeapUsageCollector.class, () -> EstimatedHeapUsageCollector.EMPTY ); - final WriteLoadCollector writeLoadCollector = pluginsService.loadSingletonServiceProvider( - WriteLoadCollector.class, - () -> WriteLoadCollector.EMPTY + final NodeUsageLoadCollector nodeUsageLoadCollector = pluginsService.loadSingletonServiceProvider( + NodeUsageLoadCollector.class, + () -> NodeUsageLoadCollector.EMPTY ); final InternalClusterInfoService service = new InternalClusterInfoService( settings, @@ -90,7 +90,7 @@ ClusterInfoService newClusterInfoService( threadPool, client, estimatedHeapUsageCollector, - writeLoadCollector + nodeUsageLoadCollector ); if (DiscoveryNode.isMasterNode(settings)) { // listen for state changes (this node starts/stops being the elected master, or new nodes are added) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 656011a7dc865..3b3c60b6c9405 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -13,6 +13,7 @@ import 
org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.HashMap; import java.util.Map; @@ -63,17 +64,19 @@ private static Map randomNodeHeapUsage() { return nodeHeapUsage; } - private static Map randomNodeWriteLoads() { + private static Map randomNodeWriteLoads() { int numEntries = randomIntBetween(0, 128); - Map nodeWriteLoads = new HashMap<>(numEntries); + Map nodeWriteLoads = new HashMap<>(numEntries); for (int i = 0; i < numEntries; i++) { String nodeIdKey = randomAlphaOfLength(32); - final NodeWriteLoad nodeWriteLoad = new NodeWriteLoad(/* nodeId= */ nodeIdKey, - /* totalWriteThreadPoolThreads= */ randomIntBetween(1, 16), - /* averageWriteThreadPoolUtilization= */randomIntBetween(0, 100), - /* averageWriteThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) + NodeExecutionLoad.ThreadPoolUsageStats writeThreadPoolStats = new NodeExecutionLoad.ThreadPoolUsageStats( + /* totalThreadPoolThreads= */ randomIntBetween(1, 16), + /* averageThreadPoolUtilization= */ randomFloat(), + /* averageThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) ); - nodeWriteLoads.put(nodeIdKey, nodeWriteLoad); + Map statsForThreadPools = new HashMap<>(); + statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats); + nodeWriteLoads.put(ThreadPool.Names.WRITE, new NodeExecutionLoad(nodeIdKey, statsForThreadPools)); } return nodeWriteLoads; } diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index a9bbb90407800..4005124c175b4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -84,14 +84,14 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); final EstimatedHeapUsageCollector mockEstimatedHeapUsageCollector = spy(new StubEstimatedEstimatedHeapUsageCollector()); - final WriteLoadCollector mockWriteLoadCollector = spy(new StubWriteLoadCollector()); + final NodeUsageLoadCollector mockNodeUsageLoadCollector = spy(new StubNodeUsageLoadCollector()); final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService( settings, clusterService, threadPool, client, mockEstimatedHeapUsageCollector, - mockWriteLoadCollector + mockNodeUsageLoadCollector ); clusterService.addListener(clusterInfoService); clusterInfoService.addListener(ignored -> {}); @@ -129,14 +129,14 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { for (int i = 0; i < 3; i++) { Mockito.clearInvocations(mockEstimatedHeapUsageCollector); - Mockito.clearInvocations(mockWriteLoadCollector); + Mockito.clearInvocations(mockNodeUsageLoadCollector); final int initialRequestCount = client.requestCount; final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); runFor(deterministicTaskQueue, duration); deterministicTaskQueue.runAllRunnableTasks(); assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per 
interval - verify(mockWriteLoadCollector).collectWriteLoads(any()); + verify(mockNodeUsageLoadCollector).collectUsageStats(any()); } final AtomicBoolean failMaster2 = new AtomicBoolean(); @@ -162,11 +162,11 @@ public void collectClusterHeapUsage(ActionListener> listener) } /** - * Simple for test {@link WriteLoadCollector} implementation that returns an empty map of nodeId string to {@link NodeWriteLoad}. + * Simple for test {@link NodeUsageLoadCollector} implementation that returns an empty map of nodeId string to {@link NodeExecutionLoad}. */ - private static class StubWriteLoadCollector implements WriteLoadCollector { + private static class StubNodeUsageLoadCollector implements NodeUsageLoadCollector { @Override - public void collectWriteLoads(ActionListener> listener) { + public void collectUsageStats(ActionListener> listener) { listener.onResponse(Map.of()); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index bd7c29eb579d4..8b147fb93809c 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -43,7 +43,7 @@ public static class TestPlugin extends Plugin {} private volatile BiFunction diskUsageFunction; public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { - super(settings, clusterService, threadPool, client, EstimatedHeapUsageCollector.EMPTY, WriteLoadCollector.EMPTY); + super(settings, clusterService, threadPool, client, EstimatedHeapUsageCollector.EMPTY, NodeUsageLoadCollector.EMPTY); } public void setDiskUsageFunctionAndRefresh(BiFunction diskUsageFn) { From 57832173df3ba14ef29a5fed0ae455e904c6f30f Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 8 Jul 2025 15:24:10 -0700 Subject: [PATCH 4/6] rename collector --- .../index/shard/IndexShardIT.java | 20 +++++++++++-------- ...ter.NodeUsageStatsForThreadPoolsCollector} | 2 +- .../cluster/InternalClusterInfoService.java | 8 ++++---- ...odeUsageStatsForThreadPoolsCollector.java} | 4 ++-- .../node/NodeServiceProvider.java | 10 +++++----- ...rnalClusterInfoServiceSchedulingTests.java | 14 +++++++------ .../MockInternalClusterInfoService.java | 2 +- 7 files changed, 33 insertions(+), 27 deletions(-) rename server/src/internalClusterTest/resources/META-INF/services/{org.elasticsearch.cluster.NodeUsageLoadCollector => org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector} (84%) rename server/src/main/java/org/elasticsearch/cluster/{NodeUsageLoadCollector.java => NodeUsageStatsForThreadPoolsCollector.java} (88%) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index b9ec9e7d8058d..f7de03f44c0a9 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -24,8 +24,8 @@ import org.elasticsearch.cluster.EstimatedHeapUsage; import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; -import org.elasticsearch.cluster.NodeUsageLoadCollector; import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import 
org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; @@ -133,7 +133,11 @@ public class IndexShardIT extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class, BogusNodeUsageLoadCollectorPlugin.class); + return pluginList( + InternalSettingsPlugin.class, + BogusEstimatedHeapUsagePlugin.class, + BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class + ); } public void testLockTryingToDelete() throws Exception { @@ -328,7 +332,7 @@ public void testNodeWriteLoadsArePresent() { ClusterInfoServiceUtils.refresh(clusterInfoService); nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools(); - /** Verify that each node has a write load reported. The test {@link BogusNodeUsageLoadCollector} generates random load values */ + /** Verify that each node has a write load reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} generates random load values */ ClusterState state = getInstanceFromNode(ClusterService.class).state(); assertEquals(state.nodes().size(), nodeThreadPoolStats.size()); for (DiscoveryNode node : state.nodes()) { @@ -932,17 +936,17 @@ public ClusterService getClusterService() { } /** - * A simple {@link NodeUsageLoadCollector} implementation that creates and returns random {@link NodeUsageStatsForThreadPools} for each node in the + * A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random {@link NodeUsageStatsForThreadPools} for each node in the * cluster. *
<p>
* Note: there's an 'org.elasticsearch.cluster.WriteLoadCollector' file that declares this implementation so that the plugin system can * pick it up and use it for the test set-up. */ - public static class BogusNodeUsageLoadCollector implements NodeUsageLoadCollector { + public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { - private final BogusNodeUsageLoadCollectorPlugin plugin; + private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin; - public BogusNodeUsageLoadCollector(BogusNodeUsageLoadCollectorPlugin plugin) { + public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) { this.plugin = plugin; } @@ -973,7 +977,7 @@ private NodeUsageStatsForThreadPools makeRandomNodeLoad(String nodeId) { /** * Make a plugin to gain access to the {@link ClusterService} instance. */ - public static class BogusNodeUsageLoadCollectorPlugin extends Plugin implements ClusterPlugin { + public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin { private final SetOnce clusterService = new SetOnce<>(); diff --git a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageLoadCollector b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector similarity index 84% rename from server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageLoadCollector rename to server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector index 85865e1455af0..787ce436c3ca6 100644 --- a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageLoadCollector +++ b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector @@ -7,4 +7,4 @@ # License v3.0 only", or the "Server Side Public License, v 1". 
# -org.elasticsearch.index.shard.IndexShardIT$BogusNodeUsageLoadCollector +org.elasticsearch.index.shard.IndexShardIT$BogusNodeUsageStatsForThreadPoolsCollector diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index abe9d1afb58b5..86306c950568d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -112,7 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private final Object mutex = new Object(); private final List> nextRefreshListeners = new ArrayList<>(); private final EstimatedHeapUsageCollector estimatedHeapUsageCollector; - private final NodeUsageLoadCollector nodeUsageLoadCollector; + private final NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector; private AsyncRefresh currentRefresh; private RefreshScheduler refreshScheduler; @@ -124,7 +124,7 @@ public InternalClusterInfoService( ThreadPool threadPool, Client client, EstimatedHeapUsageCollector estimatedHeapUsageCollector, - NodeUsageLoadCollector nodeUsageLoadCollector + NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector ) { this.leastAvailableSpaceUsages = Map.of(); this.mostAvailableSpaceUsages = Map.of(); @@ -135,7 +135,7 @@ public InternalClusterInfoService( this.threadPool = threadPool; this.client = client; this.estimatedHeapUsageCollector = estimatedHeapUsageCollector; - this.nodeUsageLoadCollector = nodeUsageLoadCollector; + this.nodeUsageStatsForThreadPoolsCollector = nodeUsageStatsForThreadPoolsCollector; this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings); this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); @@ -269,7 +269,7 @@ private void maybeFetchNodesThreadPoolUsageStats(WriteLoadDeciderStatus writeLoa } private void fetchNodesThreadPoolUsageStats() { - nodeUsageLoadCollector.collectUsageStats(ActionListener.releaseAfter(new ActionListener<>() { + nodeUsageStatsForThreadPoolsCollector.collectUsageStats(ActionListener.releaseAfter(new ActionListener<>() { @Override public void onResponse(Map writeLoads) { nodeThreadPoolUsageStatsPerNode = writeLoads; diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageLoadCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java similarity index 88% rename from server/src/main/java/org/elasticsearch/cluster/NodeUsageLoadCollector.java rename to server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java index 2725c40fb917b..e302a4abed559 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageLoadCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java @@ -18,11 +18,11 @@ *
<p>
* Results are returned as a map of node ID to node usage stats. */ -public interface NodeUsageLoadCollector { +public interface NodeUsageStatsForThreadPoolsCollector { /** * This will be used when there is no NodeUsageLoadCollector available. */ - NodeUsageLoadCollector EMPTY = listener -> listener.onResponse(Map.of()); + NodeUsageStatsForThreadPoolsCollector EMPTY = listener -> listener.onResponse(Map.of()); /** * Collects the write load estimates from the cluster. diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index aa849bcc0949c..326002c7d346c 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -14,7 +14,7 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; -import org.elasticsearch.cluster.NodeUsageLoadCollector; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -80,9 +80,9 @@ ClusterInfoService newClusterInfoService( EstimatedHeapUsageCollector.class, () -> EstimatedHeapUsageCollector.EMPTY ); - final NodeUsageLoadCollector nodeUsageLoadCollector = pluginsService.loadSingletonServiceProvider( - NodeUsageLoadCollector.class, - () -> NodeUsageLoadCollector.EMPTY + final NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector = pluginsService.loadSingletonServiceProvider( + NodeUsageStatsForThreadPoolsCollector.class, + () -> NodeUsageStatsForThreadPoolsCollector.EMPTY ); final InternalClusterInfoService service = new InternalClusterInfoService( settings, @@ -90,7 +90,7 @@ ClusterInfoService newClusterInfoService( threadPool, client, estimatedHeapUsageCollector, - nodeUsageLoadCollector + nodeUsageStatsForThreadPoolsCollector ); if (DiscoveryNode.isMasterNode(settings)) { // listen for state changes (this node starts/stops being the elected master, or new nodes are added) diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 77a1e209455e9..29a511f95581a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -84,14 +84,16 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); final EstimatedHeapUsageCollector mockEstimatedHeapUsageCollector = spy(new StubEstimatedEstimatedHeapUsageCollector()); - final NodeUsageLoadCollector mockNodeUsageLoadCollector = spy(new StubNodeUsageLoadCollector()); + final NodeUsageStatsForThreadPoolsCollector mockNodeUsageStatsForThreadPoolsCollector = spy( + new StubNodeUsageStatsForThreadPoolsCollector() + ); final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService( settings, clusterService, threadPool, client, mockEstimatedHeapUsageCollector, - mockNodeUsageLoadCollector + mockNodeUsageStatsForThreadPoolsCollector ); clusterService.addListener(clusterInfoService); 
clusterInfoService.addListener(ignored -> {}); @@ -129,14 +131,14 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { for (int i = 0; i < 3; i++) { Mockito.clearInvocations(mockEstimatedHeapUsageCollector); - Mockito.clearInvocations(mockNodeUsageLoadCollector); + Mockito.clearInvocations(mockNodeUsageStatsForThreadPoolsCollector); final int initialRequestCount = client.requestCount; final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); runFor(deterministicTaskQueue, duration); deterministicTaskQueue.runAllRunnableTasks(); assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval - verify(mockNodeUsageLoadCollector).collectUsageStats(any()); + verify(mockNodeUsageStatsForThreadPoolsCollector).collectUsageStats(any()); } final AtomicBoolean failMaster2 = new AtomicBoolean(); @@ -162,9 +164,9 @@ public void collectClusterHeapUsage(ActionListener> listener) } /** - * Simple for test {@link NodeUsageLoadCollector} implementation that returns an empty map of nodeId string to {@link NodeUsageStatsForThreadPools}. + * Simple for test {@link NodeUsageStatsForThreadPoolsCollector} implementation that returns an empty map of nodeId string to {@link NodeUsageStatsForThreadPools}. */ - private static class StubNodeUsageLoadCollector implements NodeUsageLoadCollector { + private static class StubNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { @Override public void collectUsageStats(ActionListener> listener) { listener.onResponse(Map.of()); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 8b147fb93809c..6b6136c6c861b 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -43,7 +43,7 @@ public static class TestPlugin extends Plugin {} private volatile BiFunction diskUsageFunction; public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { - super(settings, clusterService, threadPool, client, EstimatedHeapUsageCollector.EMPTY, NodeUsageLoadCollector.EMPTY); + super(settings, clusterService, threadPool, client, EstimatedHeapUsageCollector.EMPTY, NodeUsageStatsForThreadPoolsCollector.EMPTY); } public void setDiskUsageFunctionAndRefresh(BiFunction diskUsageFn) { From b0640716ff5266d715e488ac2716b4d6373d12ca Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 8 Jul 2025 16:01:28 -0700 Subject: [PATCH 5/6] tidying up rename --- .../index/shard/IndexShardIT.java | 17 ++++++------- .../org/elasticsearch/TransportVersions.java | 2 +- .../elasticsearch/cluster/ClusterInfo.java | 4 ++-- .../cluster/InternalClusterInfoService.java | 14 +++++------ .../cluster/ClusterInfoTests.java | 24 +++++++++---------- ...rnalClusterInfoServiceSchedulingTests.java | 3 ++- 6 files changed, 33 insertions(+), 31 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index f7de03f44c0a9..25ae21964ba0e 100644 --- 
a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -328,11 +328,12 @@ public void testNodeWriteLoadsArePresent() { .build() ); try { - // Force a ClusterInfo refresh to run collection of the node write loads. + // Force a ClusterInfo refresh to run collection of the node thread pool usage stats. ClusterInfoServiceUtils.refresh(clusterInfoService); nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools(); - /** Verify that each node has a write load reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} generates random load values */ + /** Verify that each node has usage stats reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} implementation + * generates random usage values */ ClusterState state = getInstanceFromNode(ClusterService.class).state(); assertEquals(state.nodes().size(), nodeThreadPoolStats.size()); for (DiscoveryNode node : state.nodes()) { @@ -936,11 +937,11 @@ public ClusterService getClusterService() { } /** - * A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random {@link NodeUsageStatsForThreadPools} for each node in the - * cluster. + * A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random + * {@link NodeUsageStatsForThreadPools} for each node in the cluster. *
<p>
- * Note: there's an 'org.elasticsearch.cluster.WriteLoadCollector' file that declares this implementation so that the plugin system can - * pick it up and use it for the test set-up. + * Note: there's an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file that declares this implementation so that the + * plugin system can pick it up and use it for the test set-up. */ public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { @@ -958,11 +959,11 @@ public void collectUsageStats(ActionListener makeRandomNodeLoad(node.getId()))) + .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId()))) ); } - private NodeUsageStatsForThreadPools makeRandomNodeLoad(String nodeId) { + private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) { NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( randomNonNegativeInt(), randomFloat(), diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index da492c6952178..4241a04ace3fb 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -335,7 +335,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES = def(9_116_0_00); public static final TransportVersion ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS = def(9_117_0_00); public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_EMBEDDING_TYPE = def(9_118_0_00); - public static final TransportVersion NODE_WRITE_LOAD_IN_CLUSTER_INFO = def(9_119_0_00); + public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_119_0_00); /* * STOP! READ THIS FIRST! 
No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 89fab67566632..7c627638e9b0b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -111,7 +111,7 @@ public ClusterInfo(StreamInput in) throws IOException { } else { this.estimatedHeapUsages = Map.of(); } - if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) { + if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) { this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::new); } else { this.nodeUsageStatsForThreadPools = Map.of(); @@ -133,7 +133,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) { out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable); } - if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) { + if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) { out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 86306c950568d..89394c8fa8ba8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -218,7 +218,7 @@ void execute() { maybeFetchIndicesStats(diskThresholdEnabled); maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled); maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled); - maybeFetchNodesThreadPoolUsageStats(writeLoadConstraintEnabled); + maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled); } } @@ -257,10 +257,10 @@ private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) { } } - private void maybeFetchNodesThreadPoolUsageStats(WriteLoadDeciderStatus writeLoadConstraintEnabled) { + private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writeLoadConstraintEnabled) { if (writeLoadConstraintEnabled != WriteLoadDeciderStatus.DISABLED) { try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchNodesThreadPoolUsageStats(); + fetchNodesUsageStatsForThreadPools(); } } else { logger.trace("skipping collecting shard/node write load estimates from cluster, feature currently disabled"); @@ -268,7 +268,7 @@ private void maybeFetchNodesThreadPoolUsageStats(WriteLoadDeciderStatus writeLoa } } - private void fetchNodesThreadPoolUsageStats() { + private void fetchNodesUsageStatsForThreadPools() { nodeUsageStatsForThreadPoolsCollector.collectUsageStats(ActionListener.releaseAfter(new ActionListener<>() { @Override public void onResponse(Map writeLoads) { @@ -527,8 +527,8 @@ public ClusterInfo getClusterInfo() { estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage)); } }); - final Map nodeWriteLoads = new HashMap<>(); - nodeThreadPoolUsageStatsPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeWriteLoads.put(nodeId, nodeWriteLoad); }); + final Map nodeThreadPoolUsageStats = new HashMap<>(); + 
nodeThreadPoolUsageStatsPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeThreadPoolUsageStats.put(nodeId, nodeWriteLoad); }); return new ClusterInfo( leastAvailableSpaceUsages, mostAvailableSpaceUsages, @@ -537,7 +537,7 @@ public ClusterInfo getClusterInfo() { indicesStatsSummary.dataPath, indicesStatsSummary.reservedSpace, estimatedHeapUsages, - nodeWriteLoads + nodeThreadPoolUsageStats ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 61fd61cba18b3..814aa102ce284 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -44,7 +44,7 @@ public static ClusterInfo randomClusterInfo() { randomRoutingToDataPath(), randomReservedSpace(), randomNodeHeapUsage(), - randomNodeWriteLoads() + randomNodeUsageStatsForThreadPools() ); } @@ -64,21 +64,21 @@ private static Map randomNodeHeapUsage() { return nodeHeapUsage; } - private static Map randomNodeWriteLoads() { + private static Map randomNodeUsageStatsForThreadPools() { int numEntries = randomIntBetween(0, 128); - Map nodeWriteLoads = new HashMap<>(numEntries); + Map nodeUsageStatsForThreadPools = new HashMap<>(numEntries); for (int i = 0; i < numEntries; i++) { String nodeIdKey = randomAlphaOfLength(32); - NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( - /* totalThreadPoolThreads= */ randomIntBetween(1, 16), - /* averageThreadPoolUtilization= */ randomFloat(), - /* averageThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) - ); - Map statsForThreadPools = new HashMap<>(); - statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats); - nodeWriteLoads.put(ThreadPool.Names.WRITE, new NodeUsageStatsForThreadPools(nodeIdKey, statsForThreadPools)); + NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolUsageStats = + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(/* totalThreadPoolThreads= */ randomIntBetween(1, 16), + /* averageThreadPoolUtilization= */ randomFloat(), + /* averageThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) + ); + Map usageStatsForThreadPools = new HashMap<>(); + usageStatsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); + nodeUsageStatsForThreadPools.put(ThreadPool.Names.WRITE, new NodeUsageStatsForThreadPools(nodeIdKey, usageStatsForThreadPools)); } - return nodeWriteLoads; + return nodeUsageStatsForThreadPools; } private static Map randomDiskUsage() { diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 29a511f95581a..6e80e0d087993 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -164,7 +164,8 @@ public void collectClusterHeapUsage(ActionListener> listener) } /** - * Simple for test {@link NodeUsageStatsForThreadPoolsCollector} implementation that returns an empty map of nodeId string to {@link NodeUsageStatsForThreadPools}. + * Simple for test {@link NodeUsageStatsForThreadPoolsCollector} implementation that returns an empty map of nodeId string to + * {@link NodeUsageStatsForThreadPools}. 
*/ private static class StubNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { @Override From 90858f83338884297e4fec037a5fce435d295c9e Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 8 Jul 2025 17:05:50 -0700 Subject: [PATCH 6/6] fix serialization bug --- .../org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java index d4a2c354c6237..5e84f29af8412 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java @@ -86,7 +86,7 @@ public ThreadPoolUsageStats(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(this.totalThreadPoolThreads); - out.writeDouble(this.averageThreadPoolUtilization); + out.writeFloat(this.averageThreadPoolUtilization); out.writeVLong(this.averageThreadPoolQueueLatencyMillis); }
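For context on PATCH 6/6 ("fix serialization bug"): ThreadPoolUsageStats reads the utilization field with in.readFloat(), but the previous writeTo wrote it with out.writeDouble(...), so a reader decoded only four of the eight bytes and every later field started at the wrong offset. The sketch below is only an illustration of that failure mode, not Elasticsearch code; it substitutes plain java.io data streams for StreamInput/StreamOutput and fixed-width int/long fields for the VInt/VLong encoding, with made-up values, but it shows the same asymmetry the commit removes by switching the writer to writeFloat.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FloatDoubleWireMismatchDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(8);        // totalThreadPoolThreads
            out.writeDouble(0.75);  // the bug: utilization written as an 8-byte double
            out.writeLong(42L);     // averageThreadPoolQueueLatencyMillis
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            int threads = in.readInt();
            float utilization = in.readFloat(); // reader consumes only 4 of the 8 bytes written above
            long latency = in.readLong();       // starts 4 bytes early: tail of the double plus head of the long
            System.out.printf("threads=%d utilization=%f latency=%d%n", threads, utilization, latency);
            // threads is correct; utilization and latency come out wrong, and 4 unread
            // bytes are left in the stream for whatever field is read next.
        }
    }
}

Keeping the reader and writer symmetric (writeFloat paired with readFloat, as the commit does) avoids this, which is also why the earlier NodeWriteLoad version that paired readVInt with writeDouble for the same field had the same class of problem.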