diff --git a/docs/changelog/137702.yaml b/docs/changelog/137702.yaml
new file mode 100644
index 0000000000000..3d510962f6f46
--- /dev/null
+++ b/docs/changelog/137702.yaml
@@ -0,0 +1,6 @@
+pr: 137702
+summary: Handle index deletion while querying in ES|QL
+area: ES|QL
+type: bug
+issues:
+ - 135863
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java
index 05b2211deecb8..6a5ee700151e4 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java
@@ -8,6 +8,9 @@
package org.elasticsearch.xpack.esql.action;
import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
@@ -20,11 +23,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
public class EsqlRetryIT extends AbstractEsqlIntegTestCase {
@@ -59,6 +65,37 @@ public void testRetryOnShardFailures() throws Exception {
}
}
+ public void testQueryWhileDeletingIndices() {
+ populateIndices();
+ CountDownLatch waitForDeletion = new CountDownLatch(1);
+ try {
+ final AtomicBoolean deleted = new AtomicBoolean();
+ for (String node : internalCluster().getNodeNames()) {
+ MockTransportService.getInstance(node)
+ .addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
+ if (deleted.compareAndSet(false, true)) {
+ deleteIndexCompletely("log-index-2");
+ waitForDeletion.countDown();
+ } else {
+ assertTrue(waitForDeletion.await(10, TimeUnit.SECONDS));
+ }
+ handler.messageReceived(request, channel, task);
+ });
+ }
+ EsqlQueryRequest request = new EsqlQueryRequest();
+ request.query("FROM log-* | STATS COUNT(timestamp) | LIMIT 1");
+ request.allowPartialResults(true);
+ try (var resp = run(request)) {
+ assertTrue(resp.isPartial());
+ assertThat(EsqlTestUtils.getValuesList(resp).getFirst().getFirst(), equalTo(4L));
+ }
+ } finally {
+ for (String node : internalCluster().getNodeNames()) {
+ MockTransportService.getInstance(node).clearAllRules();
+ }
+ }
+ }
+
private void populateIndices() {
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
@@ -88,4 +125,36 @@ private void closeOrFailShards(String nodeName) throws Exception {
}
}
}
+
+ /**
+ * Deletes the given index and ensures it is completely removed from the cluster state and from all nodes
+ */
+ private void deleteIndexCompletely(String indexName) throws Exception {
+ assertAcked(indicesAdmin().prepareDelete(indexName));
+ String[] nodeNames = internalCluster().getNodeNames();
+ assertBusy(() -> {
+ for (String nodeName : nodeNames) {
+ ClusterState clusterState = internalCluster().getInstance(ClusterService.class, nodeName).state();
+ for (IndexMetadata imd : clusterState.metadata().indicesAllProjects()) {
+ assertThat(
+ "Index [" + indexName + "] still exists in the cluster state on [" + nodeName + "]",
+ imd.getIndex().getName(),
+ not(equalTo(indexName))
+ );
+ }
+ }
+ for (String nodeName : nodeNames) {
+ final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
+ for (IndexService indexService : indicesService) {
+ for (IndexShard indexShard : indexService) {
+ assertThat(
+ "Index [" + indexName + "] still exists on node [" + nodeName + "]",
+ indexShard.shardId().getIndexName(),
+ not(equalTo(indexName))
+ );
+ }
+ }
+ }
+ });
+ }
}
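The interception in testQueryWhileDeletingIndices() above relies on a small first-caller-wins pattern: the first data-node request to arrive deletes log-index-2 and opens a latch, while every later request waits on that latch, so the remaining shards are computed against the post-deletion cluster state. A minimal, self-contained sketch of that pattern, with hypothetical names and no Elasticsearch types:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only (not part of the PR): exactly one caller performs the one-time action;
// all other callers block until it has completed.
class FirstCallerDoesTheWork {
    private final AtomicBoolean done = new AtomicBoolean();
    private final CountDownLatch latch = new CountDownLatch(1);

    void beforeHandling(Runnable oneTimeAction) throws InterruptedException {
        if (done.compareAndSet(false, true)) {
            oneTimeAction.run();   // e.g. delete the index mid-query
            latch.countDown();     // unblock everyone else
        } else if (latch.await(10, TimeUnit.SECONDS) == false) {
            throw new IllegalStateException("one-time action did not complete in time");
        }
    }
}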
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
index 11f1a444eb20a..fda8e8bfd293a 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
@@ -29,6 +29,8 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.CancellableTask;
@@ -66,6 +68,8 @@
*/
abstract class DataNodeRequestSender {
+ private static final Logger LOGGER = LogManager.getLogger(DataNodeRequestSender.class);
+
/**
* Query order according to the
* node roles.
@@ -282,38 +286,51 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
- void onAfter(DriverCompletionInfo info) {
+ void onAfterRequest() {
nodePermits.get(request.node).release();
if (concurrentRequests != null) {
concurrentRequests.release();
}
trySendingRequestsForPendingShards(targetShards, computeListener);
- listener.onResponse(info);
}
@Override
public void onResponse(DataNodeComputeResponse response) {
- // remove failures of successful shards
- for (ShardId shardId : request.shardIds()) {
- if (response.shardLevelFailures().containsKey(shardId) == false) {
- shardFailures.remove(shardId);
+ try {
+ // remove failures of successful shards
+ for (ShardId shardId : request.shardIds()) {
+ if (response.shardLevelFailures().containsKey(shardId) == false) {
+ shardFailures.remove(shardId);
+ }
}
+ for (var entry : response.shardLevelFailures().entrySet()) {
+ final ShardId shardId = entry.getKey();
+ trackShardLevelFailure(shardId, false, entry.getValue());
+ pendingShardIds.add(shardId);
+ }
+ onAfterRequest();
+ } catch (Exception ex) {
+ expectNoFailure("expect no failure while handling data node response", ex);
+ listener.onFailure(ex);
+ return;
}
- for (var entry : response.shardLevelFailures().entrySet()) {
- final ShardId shardId = entry.getKey();
- trackShardLevelFailure(shardId, false, entry.getValue());
- pendingShardIds.add(shardId);
- }
- onAfter(response.completionInfo());
+ listener.onResponse(response.completionInfo());
}
@Override
public void onFailure(Exception e, boolean receivedData) {
- for (ShardId shardId : request.shardIds) {
- trackShardLevelFailure(shardId, receivedData, e);
- pendingShardIds.add(shardId);
+ try {
+ for (ShardId shardId : request.shardIds) {
+ trackShardLevelFailure(shardId, receivedData, e);
+ pendingShardIds.add(shardId);
+ }
+ onAfterRequest();
+ } catch (Exception ex) {
+ expectNoFailure("expect no failure while handling failure of data node request", ex);
+ listener.onFailure(ex);
+ return;
}
- onAfter(DriverCompletionInfo.EMPTY);
+ listener.onResponse(DriverCompletionInfo.EMPTY);
}
@Override
@@ -325,6 +342,11 @@ public void onSkip() {
onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
}
}
+
+ private void expectNoFailure(String message, Exception e) {
+ LOGGER.error(message, e);
+ assert false : new AssertionError(message, e);
+ }
});
}
@@ -515,15 +537,20 @@ Map<ShardId, List<DiscoveryNode>> resolveShards(Set<ShardId> shardIds) {
var project = projectResolver.getProjectState(clusterService.state());
var nodes = Maps.<ShardId, List<DiscoveryNode>>newMapWithExpectedSize(shardIds.size());
for (var shardId : shardIds) {
- nodes.put(
- shardId,
- project.routingTable()
+ List<DiscoveryNode> allocatedNodes;
+ try {
+ allocatedNodes = project.routingTable()
.shardRoutingTable(shardId)
.allShards()
.filter(shard -> shard.active() && shard.isSearchable())
.map(shard -> project.cluster().nodes().get(shard.currentNodeId()))
- .toList()
- );
+ .toList();
+ } catch (Exception ignored) {
+ // If the target index is deleted or the target shard is not found after the query has started,
+ // we skip resolving its new shard routing, and that shard will not be retried.
+ continue;
+ }
+ nodes.put(shardId, allocatedNodes);
}
return nodes;
}
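The restructuring of the node listener above addresses robustness rather than a wrong result: previously onAfter() both did the per-request bookkeeping and completed the compute listener, so an exception thrown while tracking shard failures or re-queuing pending shards (for example, a shard-routing lookup failing for an index deleted mid-query, before the resolveShards guard was added) could escape the callback before the listener was notified, and the query could fail outright or stall instead of returning a partial result. The new code runs the bookkeeping in a guard, fails the listener explicitly on an unexpected exception, and only reports the response once the bookkeeping has succeeded. A minimal sketch of that guard pattern, using hypothetical stand-in types instead of ES's ActionListener:

import java.util.function.Consumer;

// Sketch only (not part of the PR): complete the caller exactly once, even if the
// per-request bookkeeping fails unexpectedly.
class GuardedCallbackSketch {
    static <T> void completeGuarded(Runnable bookkeeping, T result, Consumer<T> onResponse, Consumer<Exception> onFailure) {
        try {
            bookkeeping.run();       // e.g. track failures, release permits, trigger retries
        } catch (Exception e) {
            onFailure.accept(e);     // surface the failure instead of leaving the caller waiting
            return;
        }
        onResponse.accept(result);   // notify the caller outside the guarded block
    }
}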