Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void testRerouteIsCalledWhenHotSpotAppears() {
WriteLoadConstraintMonitor.class.getCanonicalName(),
Level.DEBUG,
Strings.format("""
Nodes [[%s]] are hot-spotting, of 4 total cluster nodes. Reroute for hot-spotting has never previously been called. \
Nodes [[%s]] are hot-spotting, of 3 total ingest nodes. Reroute for hot-spotting has never previously been called. \
Previously hot-spotting nodes are [0 nodes]. The write thread pool queue latency threshold is [%s]. \
Triggering reroute.
""", getNodeId(dataNodeOne), TimeValue.timeValueMillis(queueLatencyThresholdMillis))
Expand Down Expand Up @@ -113,7 +113,7 @@ public void testRerouteIsCalledWhenHotSpotAppears() {
WriteLoadConstraintMonitor.class.getCanonicalName(),
Level.DEBUG,
Strings.format("""
Nodes [[*]] are hot-spotting, of 4 total cluster nodes. \
Nodes [[*]] are hot-spotting, of 3 total ingest nodes. \
Reroute for hot-spotting was last called [*] ago. Previously hot-spotting nodes are [[%s]]. \
The write thread pool queue latency threshold is [%s]. Triggering reroute.
""", getNodeId(dataNodeOne), TimeValue.timeValueMillis(queueLatencyThresholdMillis))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -78,28 +77,33 @@ public void onNewInfo(ClusterInfo clusterInfo) {

final int numberOfNodes = clusterInfo.getNodeUsageStatsForThreadPools().size();
final Set<String> writeNodeIdsExceedingQueueLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
AtomicBoolean haveWriteNodesBelowQueueLatencyThreshold = new AtomicBoolean(false);
clusterInfo.getNodeUsageStatsForThreadPools().forEach((nodeId, usageStats) -> {
if (state.getNodes().get(nodeId).getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE)) {
// Search nodes are not expected to have write load hot-spots and are not considered for shard relocation.
var haveWriteNodesBelowQueueLatencyThreshold = false;
var totalIngestNodes = 0;
for (var entry : clusterInfo.getNodeUsageStatsForThreadPools().entrySet()) {
final var nodeId = entry.getKey();
final var usageStats = entry.getValue();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just changed to a for-loop so we didn't need to create all these AtomicXXX to store the counter/boolean

final var nodeRoles = state.getNodes().get(nodeId).getRoles();
if (nodeRoles.contains(DiscoveryNodeRole.SEARCH_ROLE) || nodeRoles.contains(DiscoveryNodeRole.ML_ROLE)) {
// Search & ML nodes are not expected to have write load hot-spots and are not considered for shard relocation.
// TODO (ES-13314): consider stateful data tiers
return;
continue;
}
totalIngestNodes++;
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = usageStats.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE);
assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]";
if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() >= writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) {
writeNodeIdsExceedingQueueLatencyThreshold.add(nodeId);
} else {
haveWriteNodesBelowQueueLatencyThreshold.set(true);
haveWriteNodesBelowQueueLatencyThreshold = true;
}
});
}

if (writeNodeIdsExceedingQueueLatencyThreshold.isEmpty()) {
logger.trace("No hot-spotting write nodes detected");
return;
}
if (haveWriteNodesBelowQueueLatencyThreshold.get() == false) {
if (haveWriteNodesBelowQueueLatencyThreshold == false) {
logger.debug("""
Nodes [{}] are above the queue latency threshold, but there are no write nodes below the threshold. \
Cannot rebalance shards.""", nodeSummary(writeNodeIdsExceedingQueueLatencyThreshold));
Expand All @@ -118,11 +122,11 @@ public void onNewInfo(ClusterInfo clusterInfo) {
if (logger.isDebugEnabled()) {
logger.debug(
"""
Nodes [{}] are hot-spotting, of {} total cluster nodes. Reroute for hot-spotting {}. \
Nodes [{}] are hot-spotting, of {} total ingest nodes. Reroute for hot-spotting {}. \
Previously hot-spotting nodes are [{}]. The write thread pool queue latency threshold is [{}]. Triggering reroute.
""",
nodeSummary(writeNodeIdsExceedingQueueLatencyThreshold),
state.nodes().size(),
totalIngestNodes,
lastRerouteTimeMillis == 0
? "has never previously been called"
: "was last called [" + TimeValue.timeValueMillis(timeSinceLastRerouteMillis) + "] ago",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public void testRerouteIsNotCalledInAnAllNodesAreHotSpottingCluster() {
final TestState testState = createTestStateWithNumberOfNodesAndHotSpots(
numberOfIndexNodes,
randomIntBetween(1, 5), // Search nodes should not be considered to address write load hot-spots.
randomIntBetween(1, 5), // ML nodes should not be considered to address write load hot-spots.
numberOfIndexNodes
);
final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor(
Expand Down Expand Up @@ -282,7 +283,7 @@ public void testRerouteIsCalledBeforeMinimumIntervalHasPassedIfNewNodesBecomeHot
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsWithExtraHotSpot = new HashMap<>();
for (var entry : testState.clusterInfo.getNodeUsageStatsForThreadPools().entrySet()) {
if (thresholdIncreased.get() == false
&& nonSearchNodeBelowQueueLatencyThreshold(
&& indexingNodeBelowQueueLatencyThreshold(
testState.clusterState,
entry.getKey(),
entry.getValue(),
Expand Down Expand Up @@ -316,13 +317,15 @@ && nonSearchNodeBelowQueueLatencyThreshold(
verify(testState.mockRerouteService).reroute(anyString(), eq(Priority.NORMAL), any());
}

private boolean nonSearchNodeBelowQueueLatencyThreshold(
private boolean indexingNodeBelowQueueLatencyThreshold(
ClusterState clusterState,
String nodeId,
NodeUsageStatsForThreadPools nodeUsageStats,
long latencyThresholdMillis
) {
return clusterState.getNodes().get(nodeId).getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE) == false
final var nodeRoles = clusterState.getNodes().get(nodeId).getRoles();
return nodeRoles.contains(DiscoveryNodeRole.SEARCH_ROLE) == false
&& nodeRoles.contains(DiscoveryNodeRole.ML_ROLE) == false
&& nodeUsageStats.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE)
.maxThreadPoolQueueLatencyMillis() < latencyThresholdMillis;
Expand All @@ -331,12 +334,18 @@ private boolean nonSearchNodeBelowQueueLatencyThreshold(
private TestState createRandomTestStateThatWillTriggerReroute() {
int numberOfNodes = randomIntBetween(3, 10);
int numberOfHotSpottingNodes = numberOfNodes - 2; // Leave at least 2 non-hot-spotting nodes.
return createTestStateWithNumberOfNodesAndHotSpots(numberOfNodes, randomIntBetween(0, 5), numberOfHotSpottingNodes);
return createTestStateWithNumberOfNodesAndHotSpots(
numberOfNodes,
randomIntBetween(0, 5), // search nodes
randomIntBetween(0, 2), // ML nodes
numberOfHotSpottingNodes
);
}

private TestState createTestStateWithNumberOfNodesAndHotSpots(
int numberOfIndexNodes,
int numberOfSearchNodes,
int numberOfMLNodes,
int numberOfHotSpottingNodes
) {
assert numberOfHotSpottingNodes <= numberOfIndexNodes;
Expand All @@ -350,8 +359,9 @@ private TestState createTestStateWithNumberOfNodesAndHotSpots(
final ClusterState state = ClusterStateCreationUtils.buildServerlessRoleNodes(
randomIdentifier(), // index name
randomIntBetween(1, numberOfIndexNodes), // num shard primaries
numberOfIndexNodes, // number of index role nodes
numberOfSearchNodes // number of search role nodes
numberOfIndexNodes,
numberOfSearchNodes,
numberOfMLNodes
);

final RerouteService rerouteService = mock(RerouteService.class);
Expand Down Expand Up @@ -400,7 +410,8 @@ private static ClusterSettings createClusterSettings(
/**
* Create a {@link ClusterInfo} with the specified number of hot spotting index nodes,
* all other index nodes will have no queue latency and have utilization below the specified
* high-utilization threshold. Any search nodes in the cluster will have zero usage write load stats.
* high-utilization threshold. Any search or ML nodes in the cluster will have zero usage
* write load stats.
*
* @param state The cluster state
* @param numberOfNodesHotSpotting The number of nodes that should be hot-spotting
Expand Down Expand Up @@ -429,8 +440,8 @@ private static ClusterInfo createClusterInfoWithHotSpots(
final AtomicInteger hotSpottingNodes = new AtomicInteger(numberOfNodesHotSpotting);
return ClusterInfo.builder()
.nodeUsageStatsForThreadPools(state.nodes().stream().collect(Collectors.toMap(DiscoveryNode::getId, node -> {
if (node.getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE)) {
// Search nodes are skipped for write load hot-spots.
if (node.getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE) || node.getRoles().contains(DiscoveryNodeRole.ML_ROLE)) {
// Search & ML nodes are skipped for write load hot-spots.
return new NodeUsageStatsForThreadPools(node.getId(), ZERO_USAGE_THREAD_POOL_USAGE_MAP);
}
if (hotSpottingNodes.getAndDecrement() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,17 @@ public static ClusterState state(ProjectId projectId, String indexName, int numb
}

/**
* Creates cluster state with an index that has #(numberOfPrimaries) primary shards in the started state and no replicas. The cluster
* state contains #(numberOfIndexNodes) nodes with {@link DiscoveryNodeRole#INDEX_ROLE}, assigning the primary shards to those nodes,
* and #(numberOfSearchNodes) nodes with {@link DiscoveryNodeRole#SEARCH_ROLE}.
* Creates cluster state with an index that has {@code numberOfPrimaries} primary shards in the started state and no replicas. The
* cluster state contains {@code numberOfIndexNodes} nodes with {@link DiscoveryNodeRole#INDEX_ROLE}, assigning the primary shards
* to those nodes, {@code numberOfSearchNodes} nodes with {@link DiscoveryNodeRole#SEARCH_ROLE}, and {@code numberOfMLNodes} with
* {@link DiscoveryNodeRole#ML_ROLE}.
*/
public static ClusterState buildServerlessRoleNodes(
String indexName,
int numberOfPrimaries,
int numberOfIndexNodes,
int numberOfSearchNodes
int numberOfSearchNodes,
int numberOfMLNodes
) {
ProjectId projectId = Metadata.DEFAULT_PROJECT_ID;
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
Expand All @@ -356,6 +358,10 @@ public static ClusterState buildServerlessRoleNodes(
final DiscoveryNode node = DiscoveryNodeUtils.builder("search_" + i).roles(Set.of(DiscoveryNodeRole.SEARCH_ROLE)).build();
discoBuilder = discoBuilder.add(node);
}
for (int i = 0; i < numberOfMLNodes; i++) {
final DiscoveryNode node = DiscoveryNodeUtils.builder("ml_" + i).roles(Set.of(DiscoveryNodeRole.ML_ROLE)).build();
discoBuilder = discoBuilder.add(node);
}
discoBuilder.localNodeId(randomFrom(indexNodeIds));
discoBuilder.masterNodeId(randomFrom(indexNodeIds));
IndexState index = buildIndex(indexName, numberOfPrimaries, indexNodeIds);
Expand Down