Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/106247.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 106247
summary: Fix a downsample persistent task assignment bug
area: Downsampling
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -135,7 +136,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
// If during re-assignment the source index was deleted, then we need to break out.
// Returning NO_NODE_FOUND just keeps the persistent task until the source index appears again (which would never happen)
// So let's return a node and then in the node operation we would just fail and stop this persistent task
var indexShardRouting = clusterState.routingTable().shardRoutingTable(params.shardId().getIndexName(), params.shardId().id());
var indexShardRouting = findShardRoutingTable(shardId, clusterState);
if (indexShardRouting == null) {
var node = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task");
Expand Down Expand Up @@ -176,6 +177,14 @@ private void delegate(final AllocatedPersistentTask task, final DownsampleShardT
);
}

private static IndexShardRoutingTable findShardRoutingTable(ShardId shardId, ClusterState clusterState) {
var indexRoutingTable = clusterState.routingTable().index(shardId.getIndexName());
if (indexRoutingTable != null) {
return indexRoutingTable.shard(shardId.getId());
}
return null;
}

static void realNodeOperation(
Client client,
IndicesService indicesService,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.downsample.DownsampleShardTask;
import org.junit.Before;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase {

private ClusterState initialClusterState;
private DownsampleShardPersistentTaskExecutor executor;

@Before
public void setup() {
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Instant start = now.minus(2, ChronoUnit.HOURS);
Instant end = now.plus(40, ChronoUnit.MINUTES);
initialClusterState = DataStreamTestHelper.getClusterStateWithDataStream("metrics-app1", List.of(new Tuple<>(start, end)));
executor = new DownsampleShardPersistentTaskExecutor(mock(Client.class), DownsampleShardTask.TASK_NAME, mock(Executor.class));
}

public void testGetAssignment() {
var backingIndex = initialClusterState.metadata().dataStreams().get("metrics-app1").getWriteIndex();
var node = newNode();
var shardId = new ShardId(backingIndex, 0);
var clusterState = ClusterState.builder(initialClusterState)
.nodes(new DiscoveryNodes.Builder().add(node).build())
.routingTable(
RoutingTable.builder()
.add(
IndexRoutingTable.builder(backingIndex)
.addShard(shardRoutingBuilder(shardId, node.getId(), true, STARTED).withRecoverySource(null).build())
)
)
.build();

var params = new DownsampleShardTaskParams(
new DownsampleConfig(new DateHistogramInterval("1h")),
shardId.getIndexName(),
1,
1,
shardId,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY
);
var result = executor.getAssignment(params, Set.of(node), clusterState);
assertThat(result.getExecutorNode(), equalTo(node.getId()));
}

public void testGetAssignmentMissingIndex() {
var backingIndex = initialClusterState.metadata().dataStreams().get("metrics-app1").getWriteIndex();
var node = newNode();
var shardId = new ShardId(backingIndex, 0);
var clusterState = ClusterState.builder(initialClusterState)
.nodes(new DiscoveryNodes.Builder().add(node).build())
.routingTable(
RoutingTable.builder()
.add(
IndexRoutingTable.builder(backingIndex)
.addShard(shardRoutingBuilder(shardId, node.getId(), true, STARTED).withRecoverySource(null).build())
)
)
.build();

var missingShardId = new ShardId(new Index("another_index", "uid"), 0);
var params = new DownsampleShardTaskParams(
new DownsampleConfig(new DateHistogramInterval("1h")),
missingShardId.getIndexName(),
1,
1,
missingShardId,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY
);
var result = executor.getAssignment(params, Set.of(node), clusterState);
assertThat(result.getExecutorNode(), equalTo(node.getId()));
assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task"));
}

private static DiscoveryNode newNode() {
return DiscoveryNodeUtils.create(
"node_" + UUIDs.randomBase64UUID(random()),
buildNewFakeTransportAddress(),
Map.of(),
DiscoveryNodeRole.roles()
);
}

}