From 0253b78f6c720152c9f62d3aaedaf04f7b80b4f8 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Thu, 26 Jun 2025 21:45:15 +0300 Subject: [PATCH 1/4] Ensure downsample tasks in stateless work with replicas --- .../xpack/downsample/Downsample.java | 1 + ...DownsampleShardPersistentTaskExecutor.java | 36 ++++++++-- ...ampleShardPersistentTaskExecutorTests.java | 68 ++++++++++++++++++- 3 files changed, 100 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java index c7d3a79bc082a..ce24ba47e2ad3 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java @@ -98,6 +98,7 @@ public List> getPersistentTasksExecutor( new DownsampleShardPersistentTaskExecutor( client, DownsampleShardTask.TASK_NAME, + clusterService.getSettings(), threadPool.executor(DOWNSAMPLE_TASK_THREAD_POOL_NAME) ) ); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java index 3f16535908430..1f24b8b61db94 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; @@ -52,15 +53,19 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Executor; +import java.util.stream.Collectors; public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecutor { private static final Logger LOGGER = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class); private final Client client; + private final boolean isStateless; - public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, final Executor executor) { + public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, Settings settings, final Executor executor) { super(taskName, executor); this.client = Objects.requireNonNull(client); + this.isStateless = DiscoveryNode.isStateless(settings); } @Override @@ -142,13 +147,13 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task"); } - final ShardRouting shardRouting = indexShardRouting.primaryShard(); - if (shardRouting.started() == false) { + final Set eligibleNodes = getEligibleNodes(indexShardRouting); + if (eligibleNodes.isEmpty()) { return NO_NODE_FOUND; } return candidateNodes.stream() - .filter(candidateNode -> candidateNode.getId().equals(shardRouting.currentNodeId())) + .filter(candidateNode -> eligibleNodes.contains(candidateNode.getId())) .findAny() .map( node -> new PersistentTasksCustomMetadata.Assignment( @@ -159,6 +164,29 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( .orElse(NO_NODE_FOUND); } + /** + * An eligible node to run the downsampling task for a shard is a node that holds + * a searchable version of this shard. + * In stateless deployment we choose only nodes that hold search shards. + * Otherwise, we choose the node that holds the primary shard. + * Visible for testing. + * @param indexShardRouting the routing of the shard to be downsampled + * @return the set of candidate nodes downsampling can run on. + */ + Set getEligibleNodes(IndexShardRoutingTable indexShardRouting) { + if (isStateless) { + return indexShardRouting.assignedShards() + .stream() + .filter(shardRouting -> shardRouting.primary() == false && shardRouting.started()) + .map(ShardRouting::currentNodeId) + .collect(Collectors.toSet()); + } + if (indexShardRouting.primaryShard().started()) { + return Set.of(indexShardRouting.primaryShard().currentNodeId()); + } + return Set.of(); + } + @Override public Executor getExecutor() { // The delegate action forks to the a downsample thread: diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java index c132912da133a..5bbee3a8cada6 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -35,9 +36,11 @@ import java.util.Set; import java.util.concurrent.Executor; +import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase { @@ -57,7 +60,12 @@ public void setup() { "metrics-app1", List.of(new Tuple<>(start, end)) ); - executor = new DownsampleShardPersistentTaskExecutor(mock(Client.class), DownsampleShardTask.TASK_NAME, mock(Executor.class)); + executor = new DownsampleShardPersistentTaskExecutor( + mock(Client.class), + DownsampleShardTask.TASK_NAME, + Settings.EMPTY, + mock(Executor.class) + ); } public void testGetAssignment() { @@ -124,7 +132,65 @@ public void testGetAssignmentMissingIndex() { assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task")); } + public void testGetStatelessAssignment() { + executor = new DownsampleShardPersistentTaskExecutor( + mock(Client.class), + DownsampleShardTask.TASK_NAME, + Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, "true").build(), + mock(Executor.class) + ); + var backingIndex = initialClusterState.metadata().getProject(projectId).dataStreams().get("metrics-app1").getWriteIndex(); + var searchNode = newNode(Set.of(DiscoveryNodeRole.SEARCH_ROLE)); + var indexNode = newNode(Set.of(DiscoveryNodeRole.INDEX_ROLE)); + var shardId = new ShardId(backingIndex, 0); + var clusterState = ClusterState.builder(initialClusterState) + .nodes(new DiscoveryNodes.Builder().add(indexNode).add(searchNode).build()) + .putRoutingTable( + projectId, + RoutingTable.builder() + .add( + IndexRoutingTable.builder(backingIndex) + .addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build()) + ) + .build() + ) + .build(); + + var params = new DownsampleShardTaskParams( + new DownsampleConfig(new DateHistogramInterval("1h")), + shardId.getIndexName(), + 1, + 1, + shardId, + Strings.EMPTY_ARRAY, + Strings.EMPTY_ARRAY, + Strings.EMPTY_ARRAY + ); + var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState); + assertThat(result.getExecutorNode(), nullValue()); + + // Assign a copy of the shard to a search node + clusterState = ClusterState.builder(clusterState) + .putRoutingTable( + projectId, + RoutingTable.builder() + .add( + IndexRoutingTable.builder(backingIndex) + .addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build()) + .addShard(shardRoutingBuilder(shardId, searchNode.getId(), false, STARTED).withRecoverySource(null).build()) + ) + .build() + ) + .build(); + result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState); + assertThat(result.getExecutorNode(), equalTo(searchNode.getId())); + } + private static DiscoveryNode newNode() { + return newNode(DiscoveryNodeRole.roles()); + } + + private static DiscoveryNode newNode(Set nodes) { return DiscoveryNodeUtils.create( "node_" + UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), From 9db54d538aec197726c45706c3231154ca2e6f5b Mon Sep 17 00:00:00 2001 From: gmarouli Date: Fri, 27 Jun 2025 14:04:52 +0300 Subject: [PATCH 2/4] Refactor based on review comments --- ...DownsampleShardPersistentTaskExecutor.java | 56 ++++++++----------- 1 file changed, 23 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java index 1f24b8b61db94..ed9b34cf69421 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java @@ -53,9 +53,7 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.Executor; -import java.util.stream.Collectors; public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecutor { private static final Logger LOGGER = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class); @@ -147,44 +145,36 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task"); } - final Set eligibleNodes = getEligibleNodes(indexShardRouting); - if (eligibleNodes.isEmpty()) { - return NO_NODE_FOUND; - } - - return candidateNodes.stream() - .filter(candidateNode -> eligibleNodes.contains(candidateNode.getId())) + // We find the nodes that hold the eligible shards. + // If the current node of such a shard is a candidate node, then we assign the task there. + // This code is inefficient, but we are relying on the laziness of the intermediate operations + // and the assumption that the first shard we examine has high chances of being assigned to a candidate node. + return indexShardRouting.activeShards() + .stream() + .filter(this::isEligible) + .map(ShardRouting::currentNodeId) + .filter(nodeId -> isCandidateNode(candidateNodes, nodeId)) .findAny() - .map( - node -> new PersistentTasksCustomMetadata.Assignment( - node.getId(), - "downsampling using node holding shard [" + shardId + "]" - ) - ) + .map(nodeId -> new PersistentTasksCustomMetadata.Assignment(nodeId, "downsampling using node holding shard [" + shardId + "]")) .orElse(NO_NODE_FOUND); } /** - * An eligible node to run the downsampling task for a shard is a node that holds - * a searchable version of this shard. - * In stateless deployment we choose only nodes that hold search shards. - * Otherwise, we choose the node that holds the primary shard. - * Visible for testing. - * @param indexShardRouting the routing of the shard to be downsampled - * @return the set of candidate nodes downsampling can run on. + * Only shards that can be searched can be used as the source of a downsampling task. + * In stateless deployment, this means that shards that CANNOT be promoted to primary can be used. + * For simplicity, in non-stateless deployments we use the primary shard. */ - Set getEligibleNodes(IndexShardRoutingTable indexShardRouting) { - if (isStateless) { - return indexShardRouting.assignedShards() - .stream() - .filter(shardRouting -> shardRouting.primary() == false && shardRouting.started()) - .map(ShardRouting::currentNodeId) - .collect(Collectors.toSet()); - } - if (indexShardRouting.primaryShard().started()) { - return Set.of(indexShardRouting.primaryShard().currentNodeId()); + private boolean isEligible(ShardRouting shardRouting) { + return shardRouting.started() && (isStateless ? shardRouting.isPromotableToPrimary() == false : shardRouting.primary()); + } + + private boolean isCandidateNode(Collection candidateNodes, String nodeId) { + for (DiscoveryNode candidateNode : candidateNodes) { + if (candidateNode.getId().equals(nodeId)) { + return true; + } } - return Set.of(); + return false; } @Override From 22d0310e87a067d3eb28c119d3045d97f692cb63 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Fri, 27 Jun 2025 14:12:45 +0300 Subject: [PATCH 3/4] Fix test to capture the "search-only" role --- .../DownsampleShardPersistentTaskExecutorTests.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java index 5bbee3a8cada6..3ef575efc4a8c 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -177,7 +178,11 @@ public void testGetStatelessAssignment() { .add( IndexRoutingTable.builder(backingIndex) .addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build()) - .addShard(shardRoutingBuilder(shardId, searchNode.getId(), false, STARTED).withRecoverySource(null).build()) + .addShard( + shardRoutingBuilder(shardId, searchNode.getId(), false, STARTED).withRecoverySource(null) + .withRole(ShardRouting.Role.SEARCH_ONLY) + .build() + ) ) .build() ) From 342e17ed0364cf9d29dcfae1c8a0ea10c19e8fc5 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Fri, 27 Jun 2025 22:53:29 +0300 Subject: [PATCH 4/4] Remove the 0 replicas to ensure we have search shards in stateless --- .../resources/rest-api-spec/test/downsample/10_basic.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml index fa3560bec516e..26b9b148afbfa 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml @@ -1555,7 +1555,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ k8s.pod.name ] @@ -1639,7 +1638,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ k8s.pod.name, k8s.pod.empty ]