diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml index fa3560bec516e..26b9b148afbfa 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml @@ -1555,7 +1555,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ k8s.pod.name ] @@ -1639,7 +1638,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ k8s.pod.name, k8s.pod.empty ] diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java index c7d3a79bc082a..ce24ba47e2ad3 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java @@ -98,6 +98,7 @@ public List> getPersistentTasksExecutor( new DownsampleShardPersistentTaskExecutor( client, DownsampleShardTask.TASK_NAME, + clusterService.getSettings(), threadPool.executor(DOWNSAMPLE_TASK_THREAD_POOL_NAME) ) ); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java index 3f16535908430..ed9b34cf69421 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java @@ -26,6 +26,7 
@@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; @@ -57,10 +58,12 @@ public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecutor { private static final Logger LOGGER = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class); private final Client client; + private final boolean isStateless; - public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, final Executor executor) { + public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, Settings settings, final Executor executor) { super(taskName, executor); this.client = Objects.requireNonNull(client); + this.isStateless = DiscoveryNode.isStateless(settings); } @Override @@ -142,23 +145,38 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task"); } - final ShardRouting shardRouting = indexShardRouting.primaryShard(); - if (shardRouting.started() == false) { - return NO_NODE_FOUND; - } - - return candidateNodes.stream() - .filter(candidateNode -> candidateNode.getId().equals(shardRouting.currentNodeId())) + // We find the nodes that hold the eligible shards. + // If the current node of such a shard is a candidate node, then we assign the task there. + // This code is inefficient, but we are relying on the laziness of the intermediate operations + // and the assumption that the first shard we examine has a high chance of being assigned to a candidate node. 
+ return indexShardRouting.activeShards() + .stream() + .filter(this::isEligible) + .map(ShardRouting::currentNodeId) + .filter(nodeId -> isCandidateNode(candidateNodes, nodeId)) .findAny() - .map( - node -> new PersistentTasksCustomMetadata.Assignment( - node.getId(), - "downsampling using node holding shard [" + shardId + "]" - ) - ) + .map(nodeId -> new PersistentTasksCustomMetadata.Assignment(nodeId, "downsampling using node holding shard [" + shardId + "]")) .orElse(NO_NODE_FOUND); } + /** + * Only shards that can be searched can be used as the source of a downsampling task. + * In stateless deployments, this means that only shards that CANNOT be promoted to primary can be used. + * For simplicity, in non-stateless deployments we use the primary shard. + */ + private boolean isEligible(ShardRouting shardRouting) { + return shardRouting.started() && (isStateless ? shardRouting.isPromotableToPrimary() == false : shardRouting.primary()); + } + + private boolean isCandidateNode(Collection candidateNodes, String nodeId) { + for (DiscoveryNode candidateNode : candidateNodes) { + if (candidateNode.getId().equals(nodeId)) { + return true; + } + } + return false; + } + @Override public Executor getExecutor() { // The delegate action forks to the a downsample thread: diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java index c132912da133a..3ef575efc4a8c 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java @@ -18,8 +18,10 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import 
org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -35,9 +37,11 @@ import java.util.Set; import java.util.concurrent.Executor; +import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase { @@ -57,7 +61,12 @@ public void setup() { "metrics-app1", List.of(new Tuple<>(start, end)) ); - executor = new DownsampleShardPersistentTaskExecutor(mock(Client.class), DownsampleShardTask.TASK_NAME, mock(Executor.class)); + executor = new DownsampleShardPersistentTaskExecutor( + mock(Client.class), + DownsampleShardTask.TASK_NAME, + Settings.EMPTY, + mock(Executor.class) + ); } public void testGetAssignment() { @@ -124,7 +133,69 @@ public void testGetAssignmentMissingIndex() { assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task")); } + public void testGetStatelessAssignment() { + executor = new DownsampleShardPersistentTaskExecutor( + mock(Client.class), + DownsampleShardTask.TASK_NAME, + Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, "true").build(), + mock(Executor.class) + ); + var backingIndex = initialClusterState.metadata().getProject(projectId).dataStreams().get("metrics-app1").getWriteIndex(); + var searchNode = newNode(Set.of(DiscoveryNodeRole.SEARCH_ROLE)); + var indexNode = newNode(Set.of(DiscoveryNodeRole.INDEX_ROLE)); + var 
shardId = new ShardId(backingIndex, 0); + var clusterState = ClusterState.builder(initialClusterState) + .nodes(new DiscoveryNodes.Builder().add(indexNode).add(searchNode).build()) + .putRoutingTable( + projectId, + RoutingTable.builder() + .add( + IndexRoutingTable.builder(backingIndex) + .addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build()) + ) + .build() + ) + .build(); + + var params = new DownsampleShardTaskParams( + new DownsampleConfig(new DateHistogramInterval("1h")), + shardId.getIndexName(), + 1, + 1, + shardId, + Strings.EMPTY_ARRAY, + Strings.EMPTY_ARRAY, + Strings.EMPTY_ARRAY + ); + var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState); + assertThat(result.getExecutorNode(), nullValue()); + + // Assign a copy of the shard to a search node + clusterState = ClusterState.builder(clusterState) + .putRoutingTable( + projectId, + RoutingTable.builder() + .add( + IndexRoutingTable.builder(backingIndex) + .addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build()) + .addShard( + shardRoutingBuilder(shardId, searchNode.getId(), false, STARTED).withRecoverySource(null) + .withRole(ShardRouting.Role.SEARCH_ONLY) + .build() + ) + ) + .build() + ) + .build(); + result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState); + assertThat(result.getExecutorNode(), equalTo(searchNode.getId())); + } + private static DiscoveryNode newNode() { + return newNode(DiscoveryNodeRole.roles()); + } + + private static DiscoveryNode newNode(Set nodes) { return DiscoveryNodeUtils.create( "node_" + UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(),