From c18578bf5d3f3a2b8ce8aae0a877a479581a7ea6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 13 Nov 2025 08:01:45 -0800 Subject: [PATCH] Handle index deletion while querying in ES|QL (#137702) If the index is deleted while querying, ES|QL might try to get the index routing from the latest cluster state. This could throw an index-not-found exception, which is not handled properly, causing the user to never receive a response, even though execution has completed. This also results in a dangling task in the task manager. Relates #126653 Closes #135863 (cherry picked from commit 0051df80d66012a9d45b93b06a56ea153b1cbe66) --- docs/changelog/137702.yaml | 6 ++ .../xpack/esql/action/EsqlRetryIT.java | 69 +++++++++++++++++++ .../esql/plugin/DataNodeRequestSender.java | 69 +++++++++++++------ 3 files changed, 123 insertions(+), 21 deletions(-) create mode 100644 docs/changelog/137702.yaml diff --git a/docs/changelog/137702.yaml b/docs/changelog/137702.yaml new file mode 100644 index 0000000000000..3d510962f6f46 --- /dev/null +++ b/docs/changelog/137702.yaml @@ -0,0 +1,6 @@ +pr: 137702 +summary: Handle index deletion while querying in ES|QL +area: ES|QL +type: bug +issues: + - 135863 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java index 05b2211deecb8..6a5ee700151e4 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java @@ -8,6 +8,9 @@ package org.elasticsearch.xpack.esql.action; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -20,11 +23,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; public class EsqlRetryIT extends AbstractEsqlIntegTestCase { @@ -59,6 +65,37 @@ public void testRetryOnShardFailures() throws Exception { } } + public void testQueryWhileDeletingIndices() { + populateIndices(); + CountDownLatch waitForDeletion = new CountDownLatch(1); + try { + final AtomicBoolean deleted = new AtomicBoolean(); + for (String node : internalCluster().getNodeNames()) { + MockTransportService.getInstance(node) + .addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> { + if (deleted.compareAndSet(false, true)) { + deleteIndexCompletely("log-index-2"); + waitForDeletion.countDown(); + } else { + assertTrue(waitForDeletion.await(10, TimeUnit.SECONDS)); + } + handler.messageReceived(request, channel, task); + }); + } + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM log-* | STATS COUNT(timestamp) | LIMIT 1"); + request.allowPartialResults(true); + try (var resp = run(request)) { + assertTrue(resp.isPartial()); + assertThat(EsqlTestUtils.getValuesList(resp).getFirst().getFirst(), equalTo(4L)); + } + } finally { + for (String node : internalCluster().getNodeNames()) { + MockTransportService.getInstance(node).clearAllRules(); + } + } + } + private void populateIndices() { internalCluster().ensureAtLeastNumDataNodes(2); assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date")); @@ -88,4 +125,36 @@ private void closeOrFailShards(String nodeName) throws Exception { } } } + + /** + * Deletes the given index and ensures it is completely removed from the cluster state and from all nodes + */ + private void deleteIndexCompletely(String indexName) throws Exception { + assertAcked(indicesAdmin().prepareDelete(indexName)); + String[] nodeNames = internalCluster().getNodeNames(); + assertBusy(() -> { + for (String nodeName : nodeNames) { + ClusterState clusterState = internalCluster().getInstance(ClusterService.class, nodeName).state(); + for (IndexMetadata imd : clusterState.metadata().indicesAllProjects()) { + assertThat( + "Index [" + indexName + "] still exists in the cluster state on [" + nodeName + "]", + imd.getIndex().getName(), + not(equalTo(indexName)) + ); + } + } + for (String nodeName : nodeNames) { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + assertThat( + "Index [" + indexName + "] still exists on node [" + nodeName + "]", + indexShard.shardId().getIndexName(), + not(equalTo(indexName)) + ); + } + } + } + }); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 11f1a444eb20a..fda8e8bfd293a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -29,6 +29,8 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.tasks.CancellableTask; @@ -66,6 +68,8 @@ */ abstract class DataNodeRequestSender { + private static final Logger LOGGER = LogManager.getLogger(DataNodeRequestSender.class); + /** * Query order according to the * node roles. @@ -282,38 +286,51 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu final ActionListener listener = computeListener.acquireCompute(); sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() { - void onAfter(DriverCompletionInfo info) { + void onAfterRequest() { nodePermits.get(request.node).release(); if (concurrentRequests != null) { concurrentRequests.release(); } trySendingRequestsForPendingShards(targetShards, computeListener); - listener.onResponse(info); } @Override public void onResponse(DataNodeComputeResponse response) { - // remove failures of successful shards - for (ShardId shardId : request.shardIds()) { - if (response.shardLevelFailures().containsKey(shardId) == false) { - shardFailures.remove(shardId); + try { + // remove failures of successful shards + for (ShardId shardId : request.shardIds()) { + if (response.shardLevelFailures().containsKey(shardId) == false) { + shardFailures.remove(shardId); + } } + for (var entry : response.shardLevelFailures().entrySet()) { + final ShardId shardId = entry.getKey(); + trackShardLevelFailure(shardId, false, entry.getValue()); + pendingShardIds.add(shardId); + } + onAfterRequest(); + } catch (Exception ex) { + expectNoFailure("expect no failure while handling data node response", ex); + listener.onFailure(ex); + return; } - for (var entry : response.shardLevelFailures().entrySet()) { - final ShardId shardId = entry.getKey(); - trackShardLevelFailure(shardId, false, entry.getValue()); - pendingShardIds.add(shardId); - } - onAfter(response.completionInfo()); + listener.onResponse(response.completionInfo()); } @Override public void onFailure(Exception e, boolean receivedData) { - for (ShardId shardId : request.shardIds) { - trackShardLevelFailure(shardId, receivedData, e); - pendingShardIds.add(shardId); + try { + for (ShardId shardId : request.shardIds) { + trackShardLevelFailure(shardId, receivedData, e); + pendingShardIds.add(shardId); + } + onAfterRequest(); + } catch (Exception ex) { + expectNoFailure("expect no failure while handling failure of data node request", ex); + listener.onFailure(ex); + return; } - onAfter(DriverCompletionInfo.EMPTY); + listener.onResponse(DriverCompletionInfo.EMPTY); } @Override @@ -325,6 +342,11 @@ public void onSkip() { onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } } + + private void expectNoFailure(String message, Exception e) { + LOGGER.error(message, e); + assert false : new AssertionError(message, e); + } }); } @@ -515,15 +537,20 @@ Map> resolveShards(Set shardIds) { var project = projectResolver.getProjectState(clusterService.state()); var nodes = Maps.>newMapWithExpectedSize(shardIds.size()); for (var shardId : shardIds) { - nodes.put( - shardId, - project.routingTable() + List allocatedNodes; + try { + allocatedNodes = project.routingTable() .shardRoutingTable(shardId) .allShards() .filter(shard -> shard.active() && shard.isSearchable()) .map(shard -> project.cluster().nodes().get(shard.currentNodeId())) - .toList() - ); + .toList(); + } catch (Exception ignored) { + // If the target index is deleted or the target shard is not found after the query has started, + // we skip resolving its new shard routing, and that shard will not be retried. + continue; + } + nodes.put(shardId, allocatedNodes); } return nodes; }