6 changes: 6 additions & 0 deletions docs/changelog/137702.yaml
@@ -0,0 +1,6 @@
pr: 137702
summary: Handle index deletion while querying in ES|QL
area: ES|QL
type: bug
issues:
- 135863
org/elasticsearch/xpack/esql/action/EsqlRetryIT.java
@@ -8,6 +8,9 @@
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
@@ -20,11 +23,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class EsqlRetryIT extends AbstractEsqlIntegTestCase {

@@ -59,6 +65,37 @@ public void testRetryOnShardFailures() throws Exception {
}
}

public void testQueryWhileDeletingIndices() {
populateIndices();
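// The first data-node request to arrive triggers the index deletion; every later
// request blocks on this latch until the deletion has fully propagated, so the
// query deliberately races the delete.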
CountDownLatch waitForDeletion = new CountDownLatch(1);
try {
final AtomicBoolean deleted = new AtomicBoolean();
for (String node : internalCluster().getNodeNames()) {
MockTransportService.getInstance(node)
.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
if (deleted.compareAndSet(false, true)) {
deleteIndexCompletely("log-index-2");
waitForDeletion.countDown();
} else {
assertTrue(waitForDeletion.await(10, TimeUnit.SECONDS));
}
handler.messageReceived(request, channel, task);
});
}
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM log-* | STATS COUNT(timestamp) | LIMIT 1");
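// Allow partial results so the query can still complete even though the deleted
// index's shards can no longer be resolved or retried.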
request.allowPartialResults(true);
try (var resp = run(request)) {
assertTrue(resp.isPartial());
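// Only documents from the surviving indices are counted; the deleted index's
// shards are skipped rather than retried.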
assertThat(EsqlTestUtils.getValuesList(resp).getFirst().getFirst(), equalTo(4L));
}
} finally {
for (String node : internalCluster().getNodeNames()) {
MockTransportService.getInstance(node).clearAllRules();
}
}
}

private void populateIndices() {
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
@@ -88,4 +125,36 @@ private void closeOrFailShards(String nodeName) throws Exception {
}
}
}

/**
* Deletes the given index and ensures it is completely removed from the cluster state and from all nodes
*/
private void deleteIndexCompletely(String indexName) throws Exception {
assertAcked(indicesAdmin().prepareDelete(indexName));
String[] nodeNames = internalCluster().getNodeNames();
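// Acknowledgement alone does not guarantee every node has finished removing the index,
// so poll until it disappears from all cluster states and all IndicesService instances.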
assertBusy(() -> {
for (String nodeName : nodeNames) {
ClusterState clusterState = internalCluster().getInstance(ClusterService.class, nodeName).state();
for (IndexMetadata imd : clusterState.metadata().indicesAllProjects()) {
assertThat(
"Index [" + indexName + "] still exists in the cluster state on [" + nodeName + "]",
imd.getIndex().getName(),
not(equalTo(indexName))
);
}
}
for (String nodeName : nodeNames) {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
assertThat(
"Index [" + indexName + "] still exists on node [" + nodeName + "]",
indexShard.shardId().getIndexName(),
not(equalTo(indexName))
);
}
}
}
});
}
}
DataNodeRequestSender.java
@@ -29,6 +29,8 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.CancellableTask;
@@ -66,6 +68,8 @@
*/
abstract class DataNodeRequestSender {

private static final Logger LOGGER = LogManager.getLogger(DataNodeRequestSender.class);

/**
* Query order according to the
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/node-roles-overview.html">node roles</a>.
@@ -282,38 +286,51 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {

void onAfter(DriverCompletionInfo info) {
void onAfterRequest() {
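// Shared tail for both the success and the failure path: release the per-node
// permit (and the global concurrency slot) before dispatching any pending shards.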
nodePermits.get(request.node).release();
if (concurrentRequests != null) {
concurrentRequests.release();
}
trySendingRequestsForPendingShards(targetShards, computeListener);
listener.onResponse(info);
}

@Override
public void onResponse(DataNodeComputeResponse response) {
// remove failures of successful shards
for (ShardId shardId : request.shardIds()) {
if (response.shardLevelFailures().containsKey(shardId) == false) {
shardFailures.remove(shardId);
try {
// remove failures of successful shards
for (ShardId shardId : request.shardIds()) {
if (response.shardLevelFailures().containsKey(shardId) == false) {
shardFailures.remove(shardId);
}
}
for (var entry : response.shardLevelFailures().entrySet()) {
final ShardId shardId = entry.getKey();
trackShardLevelFailure(shardId, false, entry.getValue());
pendingShardIds.add(shardId);
}
onAfterRequest();
} catch (Exception ex) {
expectNoFailure("expect no failure while handling data node response", ex);
listener.onFailure(ex);
return;
}
for (var entry : response.shardLevelFailures().entrySet()) {
final ShardId shardId = entry.getKey();
trackShardLevelFailure(shardId, false, entry.getValue());
pendingShardIds.add(shardId);
}
onAfter(response.completionInfo());
listener.onResponse(response.completionInfo());
}

@Override
public void onFailure(Exception e, boolean receivedData) {
for (ShardId shardId : request.shardIds) {
trackShardLevelFailure(shardId, receivedData, e);
pendingShardIds.add(shardId);
try {
for (ShardId shardId : request.shardIds) {
trackShardLevelFailure(shardId, receivedData, e);
pendingShardIds.add(shardId);
}
onAfterRequest();
} catch (Exception ex) {
expectNoFailure("expect no failure while handling failure of data node request", ex);
listener.onFailure(ex);
return;
}
onAfter(DriverCompletionInfo.EMPTY);
listener.onResponse(DriverCompletionInfo.EMPTY);
}

@Override
@@ -325,6 +342,11 @@ public void onSkip() {
onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
}
}

private void expectNoFailure(String message, Exception e) {
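// Trips an assertion when assertions are enabled (tests); in production the
// unexpected exception is logged here and then propagated by the caller.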
LOGGER.error(message, e);
assert false : new AssertionError(message, e);
}
});
}

@@ -515,15 +537,20 @@ Map<ShardId, List<DiscoveryNode>> resolveShards(Set<ShardId> shardIds) {
var project = projectResolver.getProjectState(clusterService.state());
var nodes = Maps.<ShardId, List<DiscoveryNode>>newMapWithExpectedSize(shardIds.size());
for (var shardId : shardIds) {
nodes.put(
shardId,
project.routingTable()
List<DiscoveryNode> allocatedNodes;
try {
allocatedNodes = project.routingTable()
.shardRoutingTable(shardId)
.allShards()
.filter(shard -> shard.active() && shard.isSearchable())
.map(shard -> project.cluster().nodes().get(shard.currentNodeId()))
.toList()
);
.toList();
} catch (Exception ignored) {
// If the target index is deleted or the target shard is not found after the query has started,
// we skip resolving its new shard routing, and that shard will not be retried.
continue;
}
nodes.put(shardId, allocatedNodes);
}
return nodes;
}