Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,6 @@ setup:
body:
settings:
number_of_shards: 1
number_of_replicas: 0
index:
mode: time_series
routing_path: [ k8s.pod.name ]
Expand Down Expand Up @@ -1639,7 +1638,6 @@ setup:
body:
settings:
number_of_shards: 1
number_of_replicas: 0
index:
mode: time_series
routing_path: [ k8s.pod.name, k8s.pod.empty ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
new DownsampleShardPersistentTaskExecutor(
client,
DownsampleShardTask.TASK_NAME,
clusterService.getSettings(),
threadPool.executor(DOWNSAMPLE_TASK_THREAD_POOL_NAME)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -57,10 +58,12 @@
public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecutor<DownsampleShardTaskParams> {
private static final Logger LOGGER = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class);
private final Client client;
private final boolean isStateless;

public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, final Executor executor) {
/**
 * Creates the persistent task executor for downsample shard tasks.
 *
 * @param client   client kept by this executor; must not be {@code null}
 * @param taskName task name handed to the parent {@code PersistentTasksExecutor}
 * @param settings settings inspected once through {@code DiscoveryNode.isStateless(settings)}; the result is
 *                 remembered in {@code isStateless}
 * @param executor executor handed to the parent {@code PersistentTasksExecutor}
 * @throws NullPointerException if {@code client} is {@code null}
 */
public DownsampleShardPersistentTaskExecutor(
    final Client client,
    final String taskName,
    final Settings settings,
    final Executor executor
) {
    super(taskName, executor);
    // The null check on the client intentionally runs before the settings are consulted.
    this.client = Objects.requireNonNull(client);
    this.isStateless = DiscoveryNode.isStateless(settings);
}

@Override
Expand Down Expand Up @@ -142,23 +145,38 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task");
}

final ShardRouting shardRouting = indexShardRouting.primaryShard();
if (shardRouting.started() == false) {
return NO_NODE_FOUND;
}

return candidateNodes.stream()
.filter(candidateNode -> candidateNode.getId().equals(shardRouting.currentNodeId()))
// We find the nodes that hold the eligible shards.
// If the current node of such a shard is a candidate node, then we assign the task there.
// This code is inefficient, but we are relying on the laziness of the intermediate operations
// and the assumption that the first shard we examine has high chances of being assigned to a candidate node.
return indexShardRouting.activeShards()
.stream()
.filter(this::isEligible)
.map(ShardRouting::currentNodeId)
.filter(nodeId -> isCandidateNode(candidateNodes, nodeId))
.findAny()
.map(
node -> new PersistentTasksCustomMetadata.Assignment(
node.getId(),
"downsampling using node holding shard [" + shardId + "]"
)
)
.map(nodeId -> new PersistentTasksCustomMetadata.Assignment(nodeId, "downsampling using node holding shard [" + shardId + "]"))
.orElse(NO_NODE_FOUND);
}

/**
* Only shards that can be searched can be used as the source of a downsampling task.
* In a stateless deployment, this means that only started shards that CANNOT be promoted to primary are eligible.
* For simplicity, in non-stateless deployments only the started primary shard is eligible.
*/
private boolean isEligible(ShardRouting shardRouting) {
return shardRouting.started() && (isStateless ? shardRouting.isPromotableToPrimary() == false : shardRouting.primary());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think the last parenthesized expression could be simplified with `shardRouting.isSearchable()`.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even better, I will change it.

}

/**
 * Tells whether any of the candidate nodes carries the given node id.
 *
 * @param candidateNodes the nodes to look through
 * @param nodeId         the node id to look for
 * @return {@code true} as soon as a candidate whose id equals {@code nodeId} is found, {@code false} if none matches
 */
private boolean isCandidateNode(Collection<DiscoveryNode> candidateNodes, String nodeId) {
    // anyMatch stops at the first hit, just like an early return from a loop would.
    return candidateNodes.stream().anyMatch(candidate -> candidate.getId().equals(nodeId));
}

@Override
public Executor getExecutor() {
// The delegate action forks to a downsample thread:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -35,9 +37,11 @@
import java.util.Set;
import java.util.concurrent.Executor;

import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;

public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase {
Expand All @@ -57,7 +61,12 @@ public void setup() {
"metrics-app1",
List.of(new Tuple<>(start, end))
);
executor = new DownsampleShardPersistentTaskExecutor(mock(Client.class), DownsampleShardTask.TASK_NAME, mock(Executor.class));
executor = new DownsampleShardPersistentTaskExecutor(
mock(Client.class),
DownsampleShardTask.TASK_NAME,
Settings.EMPTY,
mock(Executor.class)
);
}

public void testGetAssignment() {
Expand Down Expand Up @@ -124,7 +133,69 @@ public void testGetAssignmentMissingIndex() {
assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task"));
}

/**
 * Checks task assignment when the executor is created with the stateless setting enabled:
 * while only a started primary on the index node exists, no executor node is returned; once a started
 * {@code SEARCH_ONLY} copy is routed to the search node, the task is assigned to that search node.
 */
public void testGetStatelessAssignment() {
    final Settings statelessSettings = Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, "true").build();
    executor = new DownsampleShardPersistentTaskExecutor(
        mock(Client.class),
        DownsampleShardTask.TASK_NAME,
        statelessSettings,
        mock(Executor.class)
    );

    var writeIndex = initialClusterState.metadata().getProject(projectId).dataStreams().get("metrics-app1").getWriteIndex();
    // Node ids come from the test's random source, so the creation order (search node first) is kept as is.
    var searchNode = newNode(Set.of(DiscoveryNodeRole.SEARCH_ROLE));
    var indexNode = newNode(Set.of(DiscoveryNodeRole.INDEX_ROLE));
    var shardId = new ShardId(writeIndex, 0);
    var candidateNodes = Set.of(indexNode, searchNode);

    // First state: the only copy of the shard is the started primary on the index node.
    var primaryOnlyRouting = IndexRoutingTable.builder(writeIndex)
        .addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build());
    var primaryOnlyState = ClusterState.builder(initialClusterState)
        .nodes(new DiscoveryNodes.Builder().add(indexNode).add(searchNode).build())
        .putRoutingTable(projectId, RoutingTable.builder().add(primaryOnlyRouting).build())
        .build();

    var taskParams = new DownsampleShardTaskParams(
        new DownsampleConfig(new DateHistogramInterval("1h")),
        shardId.getIndexName(),
        1,
        1,
        shardId,
        Strings.EMPTY_ARRAY,
        Strings.EMPTY_ARRAY,
        Strings.EMPTY_ARRAY
    );

    var assignmentWithoutSearchCopy = executor.getAssignment(taskParams, candidateNodes, primaryOnlyState);
    assertThat(assignmentWithoutSearchCopy.getExecutorNode(), nullValue());

    // Second state: assign a copy of the shard to a search node
    var primaryAndSearchRouting = IndexRoutingTable.builder(writeIndex)
        .addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build())
        .addShard(
            shardRoutingBuilder(shardId, searchNode.getId(), false, STARTED).withRecoverySource(null)
                .withRole(ShardRouting.Role.SEARCH_ONLY)
                .build()
        );
    var stateWithSearchCopy = ClusterState.builder(primaryOnlyState)
        .putRoutingTable(projectId, RoutingTable.builder().add(primaryAndSearchRouting).build())
        .build();

    var assignmentWithSearchCopy = executor.getAssignment(taskParams, candidateNodes, stateWithSearchCopy);
    assertThat(assignmentWithSearchCopy.getExecutorNode(), equalTo(searchNode.getId()));
}

/** Creates a test node that is given every role returned by {@code DiscoveryNodeRole.roles()}. */
private static DiscoveryNode newNode() {
    final var everyRole = DiscoveryNodeRole.roles();
    return newNode(everyRole);
}

private static DiscoveryNode newNode(Set<DiscoveryNodeRole> nodes) {
return DiscoveryNodeUtils.create(
"node_" + UUIDs.randomBase64UUID(random()),
buildNewFakeTransportAddress(),
Expand Down