From e2bd8a84ec89c95712b9e2ac3f27576568a5dac3 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Tue, 11 Nov 2025 17:19:21 +0200 Subject: [PATCH 01/14] Add code and test class --- .../timeout/QueryPhaseForcedTimeoutIT.java | 243 ++++++++++++++++++ .../search/query/QueryPhase.java | 66 +++-- 2 files changed, 284 insertions(+), 25 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/timeout/QueryPhaseForcedTimeoutIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/timeout/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/timeout/QueryPhaseForcedTimeoutIT.java new file mode 100644 index 0000000000000..c7ef792c7a756 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/timeout/QueryPhaseForcedTimeoutIT.java @@ -0,0 +1,243 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.search.aggregations.timeout; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.CardinalityUpperBound; +import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.internal.ContextIndexSearcher; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +/** + * Integration test verifying that a TimeExceededException thrown during collector + * preparation (before the actual search executes) is caught in QueryPhase and + * correctly transformed into a partial response with `timed_out=true`, empty hits, + * and empty aggregations rather than an exception. + */ +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class QueryPhaseForcedTimeoutIT extends ESIntegTestCase { + + private static final String INDEX = "index"; + + @Override + protected Collection> nodePlugins() { + return List.of(ForceTimeoutAggPlugin.class); + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .build(); + } + + @Before + public void setupIndex() throws Exception { + assertAcked(prepareCreate(INDEX).setMapping(""" + { + "properties": { + "kwd": { "type": "keyword" }, + "txt": { "type": "text" } + } + } + """)); + + for (int i = 0; i < 10; i++) { + IndexRequest ir = new IndexRequest(INDEX) + .source(jsonBuilder() + .startObject() + .field("kwd", "value" + i) + .field("txt", "text " + i) + .endObject()); + client().index(ir).actionGet(); + } + indicesAdmin().prepareRefresh(INDEX).get(); + } + + @After + public void cleanup() { + indicesAdmin().prepareDelete(INDEX).get(); + } + + /** + * Executes a search using the ForceTimeoutAggPlugin aggregation which throws + * TimeExceededException during collector preparation, and asserts that: + * - the response is returned without failure, + * - the `timed_out` flag is true, + * - hits are empty, and + * - aggregations are non-null but empty. + */ + public void testTimeoutDuringCollectorPreparationReturnsTimedOutEmptyResult() { + SearchResponse resp = null; + try { + resp = client().prepareSearch(INDEX) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(0) + .setAllowPartialSearchResults(true) + .addAggregation(new ForceTimeoutAggregationBuilder("force_timeout")) + .get(); + + + assertThat(resp, notNullValue()); + assertThat("search should be marked timed_out", resp.isTimedOut(), is(true)); + assertThat("no hits returned", resp.getHits().getHits().length, equalTo(0)); + assertThat(resp.getAggregations(), notNullValue()); + assertThat("no aggr returned", resp.getAggregations().asList().isEmpty(), is(true)); + assertThat("no shard failures expected", resp.getShardFailures() == null || resp.getShardFailures().length == 0, is(true)); + } finally { + if (resp != null) { + resp.decRef(); + } + } + } + + /** + * A minimal plugin registering a custom aggregation (ForceTimeoutAggregationBuilder) + * whose factory simulates a timeout during collector setup to test QueryPhase handling. + */ + public static class ForceTimeoutAggPlugin extends Plugin implements SearchPlugin { + public static final String NAME = "force_timeout_plugin"; + + @Override + public List getAggregations() { + return List.of( + new AggregationSpec( + NAME, + ForceTimeoutAggregationBuilder::new, + ForceTimeoutAggregationBuilder::parse + ) + ); + } + } + + /** + * Aggregation builder for the ForceTimeoutAggPlugin aggregation. + * It has no parameters and its factory immediately triggers a timeout exception + * when the search collectors are being prepared. + */ + static class ForceTimeoutAggregationBuilder extends AggregationBuilder { + + public static final String TYPE = ForceTimeoutAggPlugin.NAME; + + protected Map metadata; + + ForceTimeoutAggregationBuilder(String name) { + super(name); + } + + ForceTimeoutAggregationBuilder(StreamInput in) throws IOException { + super(in.readString()); + } + + public static ForceTimeoutAggregationBuilder parse(XContentParser parser, String name) { + return new ForceTimeoutAggregationBuilder(name); + } + + @Override + protected AggregatorFactory build(AggregationContext context, AggregatorFactory parent) throws IOException { + return new ForceTimeoutAggregatorFactory(getName(), context, parent, factoriesBuilder, getMetadata()); + } + + @Override + public AggregationBuilder subAggregation(AggregationBuilder aggregation) { + this.factoriesBuilder.addAggregator(aggregation); + return this; + } + + @Override + public AggregationBuilder subAggregation(PipelineAggregationBuilder aggregation) { + this.factoriesBuilder.addPipelineAggregator(aggregation); + return this; + } + + @Override + public AggregationBuilder subAggregations(AggregatorFactories.Builder subFactories) { + this.factoriesBuilder = subFactories; + return this; + } + + @Override + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata) { + ForceTimeoutAggregationBuilder copy = new ForceTimeoutAggregationBuilder(getName()); + copy.factoriesBuilder = factoriesBuilder; + copy.setMetadata(metadata); + return copy; + } + + @Override + public AggregationBuilder setMetadata(Map metadata) { + this.metadata = metadata; + return this; + } + + @Override public Map getMetadata() { return metadata == null ? Map.of() : metadata; } + @Override public BucketCardinality bucketCardinality() { return BucketCardinality.ONE; } + @Override public String getWriteableName() { return TYPE; } + @Override public TransportVersion getMinimalSupportedVersion() { return TransportVersion.current(); } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(getName()); } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return builder; } + @Override public String getType() { return TYPE; } + + + /** + * Factory implementation for ForceTimeoutAggregationBuilder. + * Its createInternal() method throws a TimeExceededException + * before any actual collection occurs, simulating a timeout during setup. + */ + static class ForceTimeoutAggregatorFactory extends AggregatorFactory { + + ForceTimeoutAggregatorFactory( + String name, + AggregationContext context, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metadata + ) throws IOException { + super(name, context, parent, subFactoriesBuilder, metadata); + } + + @Override + protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) { + if(context.searcher() instanceof ContextIndexSearcher cis) { + cis.throwTimeExceededException(); + } + throw new AssertionError("unreachable"); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 7736c05b89728..f94417c976f2e 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -33,6 +33,7 @@ import org.elasticsearch.search.SearchContextSourcePrinter; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.AggregationPhase; +import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; @@ -198,41 +199,56 @@ static void addCollectorsAndSearch(SearchContext searchContext, Long timeRangeFi ); } - CollectorManager collectorManager = QueryPhaseCollectorManager.createQueryPhaseCollectorManager( - postFilterWeight, - searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(), - searchContext, - hasFilterCollector - ); - final Runnable timeoutRunnable = getTimeoutCheck(searchContext); if (timeoutRunnable != null) { searcher.addQueryCancellation(timeoutRunnable); } - QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager); - if (searchContext.getProfilers() != null) { - searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult()); - } - queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats()); - if (searcher.timeExceeded()) { - assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set"; + try { + CollectorManager collectorManager = QueryPhaseCollectorManager.createQueryPhaseCollectorManager( + postFilterWeight, + searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(), + searchContext, + hasFilterCollector + ); + + QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager); + + if (searchContext.getProfilers() != null) { + searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult()); + } + queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats()); + + if (searcher.timeExceeded()) { + assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set"; + SearchTimeoutException.handleTimeout( + searchContext.request().allowPartialSearchResults(), + searchContext.shardTarget(), + searchContext.queryResult() + ); + } + if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) { + queryResult.terminatedEarly(queryPhaseResult.terminatedAfter()); + } + ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); + assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor + || (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */) + : "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass(); + if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) { + queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); + queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); + } + } catch (ContextIndexSearcher.TimeExceededException tee) { SearchTimeoutException.handleTimeout( searchContext.request().allowPartialSearchResults(), searchContext.shardTarget(), searchContext.queryResult() ); - } - if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) { - queryResult.terminatedEarly(queryPhaseResult.terminatedAfter()); - } - ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); - assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor - || (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */) - : "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass(); - if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) { - queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); - queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); + queryResult.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]); + + if (searchContext.aggregations() != null) { + queryResult.aggregations(InternalAggregations.EMPTY); + } } } catch (Exception e) { throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e); From 1421c60a01656812233b36aa74af5e8a9c8cdabe Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Wed, 12 Nov 2025 12:41:28 +0200 Subject: [PATCH 02/14] update --- .../QueryPhaseForcedTimeoutIT.java | 8 +--- .../search/query/QueryPhase.java | 38 ++++++++++++++----- 2 files changed, 29 insertions(+), 17 deletions(-) rename server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/{timeout => }/QueryPhaseForcedTimeoutIT.java (95%) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/timeout/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java similarity index 95% rename from server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/timeout/QueryPhaseForcedTimeoutIT.java rename to server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java index c7ef792c7a756..fa8755ed44420 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/timeout/QueryPhaseForcedTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java @@ -7,7 +7,7 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -package org.elasticsearch.search.aggregations.timeout; +package org.elasticsearch.search.aggregations; import org.elasticsearch.TransportVersion; import org.elasticsearch.action.index.IndexRequest; @@ -18,12 +18,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; -import org.elasticsearch.search.aggregations.AggregationBuilder; -import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.AggregatorFactory; -import org.elasticsearch.search.aggregations.CardinalityUpperBound; -import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.test.ESIntegTestCase; diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index f94417c976f2e..3f7f58b7df31e 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -214,6 +214,11 @@ static void addCollectorsAndSearch(SearchContext searchContext, Long timeRangeFi QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager); + if (queryPhaseResult == null) { + finalizeAsTimedOutResult(searchContext); + return; + } + if (searchContext.getProfilers() != null) { searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult()); } @@ -239,22 +244,35 @@ static void addCollectorsAndSearch(SearchContext searchContext, Long timeRangeFi queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); } } catch (ContextIndexSearcher.TimeExceededException tee) { - SearchTimeoutException.handleTimeout( - searchContext.request().allowPartialSearchResults(), - searchContext.shardTarget(), - searchContext.queryResult() - ); - queryResult.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]); - - if (searchContext.aggregations() != null) { - queryResult.aggregations(InternalAggregations.EMPTY); - } + finalizeAsTimedOutResult(searchContext); + return; } } catch (Exception e) { throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e); } } + /** + * Marks the current search as timed out and finalizes the {@link QuerySearchResult} + * with a well-formed empty response. This ensures that even when a timeout occurs + * (e.g., during collector setup or search execution), the shard still returns a + * valid result object with empty top docs and aggregations instead of throwing. + */ + private static void finalizeAsTimedOutResult(SearchContext searchContext) { + QuerySearchResult queryResult = searchContext.queryResult(); + SearchTimeoutException.handleTimeout( + searchContext.request().allowPartialSearchResults(), + searchContext.shardTarget(), + queryResult + ); + + queryResult.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]); + + if (searchContext.aggregations() != null) { + queryResult.aggregations(InternalAggregations.EMPTY); + } + } + /** * Returns whether collection within the provided reader can be early-terminated if it sorts * with sortAndFormats. From 19e212139ff263c330a6dcf93291785f03e1d7de Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Thu, 13 Nov 2025 17:36:10 +0200 Subject: [PATCH 03/14] update --- .../QueryPhaseForcedTimeoutIT.java | 66 ++-- .../search/query/QueryPhase.java | 19 +- .../bucket/BucketsAggregatorTests.java | 2 +- .../search/query/QueryPhaseTimeoutTests.java | 336 +++++++++++++++++- .../aggregations/AggregatorTestCase.java | 2 +- 5 files changed, 383 insertions(+), 42 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java index fa8755ed44420..b5ee3eed046c1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java @@ -55,9 +55,7 @@ protected Collection> nodePlugins() { @Override public Settings indexSettings() { - return Settings.builder() - .put(super.indexSettings()) - .build(); + return Settings.builder().put(super.indexSettings()).build(); } @Before @@ -72,12 +70,9 @@ public void setupIndex() throws Exception { """)); for (int i = 0; i < 10; i++) { - IndexRequest ir = new IndexRequest(INDEX) - .source(jsonBuilder() - .startObject() - .field("kwd", "value" + i) - .field("txt", "text " + i) - .endObject()); + IndexRequest ir = new IndexRequest(INDEX).source( + jsonBuilder().startObject().field("kwd", "value" + i).field("txt", "text " + i).endObject() + ); client().index(ir).actionGet(); } indicesAdmin().prepareRefresh(INDEX).get(); @@ -99,14 +94,13 @@ public void cleanup() { public void testTimeoutDuringCollectorPreparationReturnsTimedOutEmptyResult() { SearchResponse resp = null; try { - resp = client().prepareSearch(INDEX) + resp = client().prepareSearch(INDEX) .setQuery(QueryBuilders.matchAllQuery()) .setSize(0) .setAllowPartialSearchResults(true) .addAggregation(new ForceTimeoutAggregationBuilder("force_timeout")) .get(); - assertThat(resp, notNullValue()); assertThat("search should be marked timed_out", resp.isTimedOut(), is(true)); assertThat("no hits returned", resp.getHits().getHits().length, equalTo(0)); @@ -129,13 +123,7 @@ public static class ForceTimeoutAggPlugin extends Plugin implements SearchPlugin @Override public List getAggregations() { - return List.of( - new AggregationSpec( - NAME, - ForceTimeoutAggregationBuilder::new, - ForceTimeoutAggregationBuilder::parse - ) - ); + return List.of(new AggregationSpec(NAME, ForceTimeoutAggregationBuilder::new, ForceTimeoutAggregationBuilder::parse)); } } @@ -199,14 +187,40 @@ public AggregationBuilder setMetadata(Map metadata) { return this; } - @Override public Map getMetadata() { return metadata == null ? Map.of() : metadata; } - @Override public BucketCardinality bucketCardinality() { return BucketCardinality.ONE; } - @Override public String getWriteableName() { return TYPE; } - @Override public TransportVersion getMinimalSupportedVersion() { return TransportVersion.current(); } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(getName()); } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return builder; } - @Override public String getType() { return TYPE; } + @Override + public Map getMetadata() { + return metadata == null ? Map.of() : metadata; + } + + @Override + public BucketCardinality bucketCardinality() { + return BucketCardinality.ONE; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersion.current(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(getName()); + } + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + + @Override + public String getType() { + return TYPE; + } /** * Factory implementation for ForceTimeoutAggregationBuilder. @@ -227,7 +241,7 @@ static class ForceTimeoutAggregatorFactory extends AggregatorFactory { @Override protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) { - if(context.searcher() instanceof ContextIndexSearcher cis) { + if (context.searcher() instanceof ContextIndexSearcher cis) { cis.throwTimeExceededException(); } throw new AssertionError("unreachable"); diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 3f7f58b7df31e..bab8c90c56f87 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -205,12 +205,13 @@ static void addCollectorsAndSearch(SearchContext searchContext, Long timeRangeFi } try { - CollectorManager collectorManager = QueryPhaseCollectorManager.createQueryPhaseCollectorManager( - postFilterWeight, - searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(), - searchContext, - hasFilterCollector - ); + CollectorManager collectorManager = QueryPhaseCollectorManager + .createQueryPhaseCollectorManager( + postFilterWeight, + searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(), + searchContext, + hasFilterCollector + ); QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager); @@ -260,11 +261,7 @@ static void addCollectorsAndSearch(SearchContext searchContext, Long timeRangeFi */ private static void finalizeAsTimedOutResult(SearchContext searchContext) { QuerySearchResult queryResult = searchContext.queryResult(); - SearchTimeoutException.handleTimeout( - searchContext.request().allowPartialSearchResults(), - searchContext.shardTarget(), - queryResult - ); + SearchTimeoutException.handleTimeout(searchContext.request().allowPartialSearchResults(), searchContext.shardTarget(), queryResult); queryResult.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java index fb4c62ad66f19..145b58aefb1f2 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java @@ -37,7 +37,7 @@ public class BucketsAggregatorTests extends AggregatorTestCase { private List toRelease = new ArrayList<>(); @Override - protected AggregationContext createAggregationContext(IndexReader indexSearcher, Query query, MappedFieldType... fieldTypes) + public AggregationContext createAggregationContext(IndexReader indexSearcher, Query query, MappedFieldType... fieldTypes) throws IOException { AggregationContext context = super.createAggregationContext(indexSearcher, query, fieldTypes); // Generally, we should avoid doing this, but this test doesn't do anything with reduction, so it should be safe here diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java index e3eee4dea92f0..24823e666a08b 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java @@ -22,6 +22,8 @@ import org.apache.lucene.index.PointValues; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.BulkScorer; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.ConstantScoreScorer; import org.apache.lucene.search.ConstantScoreWeight; import org.apache.lucene.search.DocIdSetIterator; @@ -40,11 +42,13 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.util.Bits; import org.apache.lucene.util.CharsRefBuilder; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.index.IndexSettings; @@ -59,6 +63,15 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.CardinalityUpperBound; +import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.SearchContextAggregations; +import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ContextIndexSearcher; @@ -70,12 +83,14 @@ import org.elasticsearch.search.suggest.SuggestionSearchContext; import org.elasticsearch.test.TestSearchContext; import org.elasticsearch.xcontent.Text; +import org.elasticsearch.xcontent.XContentBuilder; import org.hamcrest.Matchers; import org.junit.AfterClass; import org.junit.BeforeClass; import java.io.IOException; import java.util.Collections; +import java.util.Map; public class QueryPhaseTimeoutTests extends IndexShardTestCase { @@ -430,7 +445,7 @@ private SearchExecutionContext createSearchExecutionContext() { parserConfig(), writableRegistry(), null, - null, + new IndexSearcher(reader), () -> nowInMillis, null, null, @@ -499,7 +514,7 @@ public ShardSearchRequest request() { return context; } - private static final class TestSuggester extends Suggester { + public static final class TestSuggester extends Suggester { private final ContextIndexSearcher contextIndexSearcher; TestSuggester(ContextIndexSearcher contextIndexSearcher) { @@ -523,7 +538,7 @@ protected TestSuggestion emptySuggestion(String name, TestSuggestionContext sugg } } - private static final class TestSuggestionContext extends SuggestionSearchContext.SuggestionContext { + public static final class TestSuggestionContext extends SuggestionSearchContext.SuggestionContext { TestSuggestionContext(Suggester suggester, SearchExecutionContext searchExecutionContext) { super(suggester, searchExecutionContext); } @@ -615,4 +630,319 @@ public final boolean isCacheable(LeafReaderContext ctx) { return false; } } + + /** + * Verifies that when a timeout occurs before search execution and no aggregations + * are requested, QueryPhase returns an empty partial result with timed_out=true + * and no aggregation container. + */ + public void testTimeoutNoAggsReturnsEmptyResult() throws Exception { + ContextIndexSearcher base = newContextSearcher(reader); + ContextIndexSearcher throwing = new ContextIndexSearcher( + base.getIndexReader(), + base.getSimilarity(), + base.getQueryCache(), + base.getQueryCachingPolicy(), + true + ) { + @Override + public T search(Query query, CollectorManager collectorManager) { + this.throwTimeExceededException(); // simulate timeout right as search would begin + throw new AssertionError("unreachable"); + } + }; + + SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(0); + + try (SearchContext context = createSearchContext(source, throwing, null, null, true)) { + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + + QueryPhase.execute(context); + + assertTrue("search should be marked timed_out", context.queryResult().searchTimedOut()); + assertNotNull("topDocs must be present even on timeout", context.queryResult().topDocs()); + assertEquals("no hits returned on timeout", 0, context.queryResult().topDocs().topDocs.scoreDocs.length); + assertNull("no aggs were requested so container should remain null", context.queryResult().aggregations()); + } + } + + /** + * Verifies that when a timeout occurs during aggregation setup, the search response + * is returned as a partial result: marked timed_out=true, with empty hits and an + * empty but non-null aggregation container. + */ + public void testAggTimeoutReturnsEmptyAggsAndHits() throws Exception { + SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) + .aggregation(new ForceTimeoutAggregationBuilder("force_timeout")) + .size(0); + + try (SearchContext context = createSearchContext(source, newContextSearcher(reader), null, true)) { + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + assertNotNull("aggregations should be present in the context", context.aggregations()); + + QueryPhase.execute(context); + + assertTrue("search should be marked timed_out", context.queryResult().searchTimedOut()); + assertNotNull("topDocs must be present even on timeout", context.queryResult().topDocs()); + assertEquals("no hits returned on timeout", 0, context.queryResult().topDocs().topDocs.scoreDocs.length); + assertNotNull( + "aggregations container must be non-null on timeout when aggs were requested", + context.queryResult().aggregations() + ); + assertTrue("aggregations list should be empty on timeout", context.queryResult().aggregations().expand().asList().isEmpty()); + } + } + + /** + * Simulates the search layer returning null from ContextIndexSearcher.search() + * and verifies that QueryPhase converts it into a valid partial response instead + * of failing — with timed_out=true and empty topDocs/aggregations. + */ + public void testNullSearchResultHandledAsEmptyPartial() throws Exception { + ContextIndexSearcher base = newContextSearcher(reader); + ContextIndexSearcher nullReturning = new ContextIndexSearcher( + base.getIndexReader(), + base.getSimilarity(), + base.getQueryCache(), + base.getQueryCachingPolicy(), + true + ) { + @Override + public T search(Query query, CollectorManager collectorManager) { + return null; // simulate lower layer returning null + } + }; + + SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) + .aggregation(new ForceTimeoutAggregationBuilder("noop")) + .size(0); + + try (SearchContext context = createSearchContext(source, nullReturning, null, true)) { + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + + QueryPhase.execute(context); + + assertTrue("search should be marked timed_out", context.queryResult().searchTimedOut()); + assertNotNull("topDocs must be present even on timeout", context.queryResult().topDocs()); + assertEquals("no hits returned on timeout", 0, context.queryResult().topDocs().topDocs.scoreDocs.length); + assertNotNull("aggs container must be non-null on timeout when aggs were requested", context.queryResult().aggregations()); + assertTrue("aggregations list should be empty on timeout", context.queryResult().aggregations().expand().asList().isEmpty()); + } + } + + /** + * Verifies that when both suggestions and aggregations are present in the SearchContext, + * a timeout still results in a well-formed partial response — timed_out=true, empty hits, + * and safe handling of the suggest container. + */ + public void testTimeoutWithSuggestsReturnsPartial() throws Exception { + SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) + .aggregation(new ForceTimeoutAggregationBuilder("force_timeout")) + .size(0); + + SuggestionSearchContext suggestCtx = new SuggestionSearchContext(); + suggestCtx.addSuggestion( + "suggestion", + new QueryPhaseTimeoutTests.TestSuggestionContext(new QueryPhaseTimeoutTests.TestSuggester(newContextSearcher(reader)), null) + ); + + try (SearchContext context = createSearchContext(source, newContextSearcher(reader), suggestCtx, true)) { + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + + QueryPhase.execute(context); + + assertTrue("search should be marked timed_out", context.queryResult().searchTimedOut()); + assertNotNull("topDocs must be present even on timeout", context.queryResult().topDocs()); + assertEquals("no hits returned on timeout", 0, context.queryResult().topDocs().topDocs.scoreDocs.length); + assertNotNull("aggs container must be non-null on timeout when aggs were requested", context.queryResult().aggregations()); + assertTrue("aggregations list should be empty on timeout", context.queryResult().aggregations().expand().asList().isEmpty()); + + if (context.queryResult().suggest() != null) { + assertTrue("suggest container readable", context.queryResult().suggest().size() >= 0); + } + } + } + + /** + * Verifies that when allowPartialSearchResults=false, a timeout is not converted + * to a partial response but instead throws a SearchTimeoutException wrapped in + * a QueryPhaseExecutionException. + */ + public void testTimeoutDisallowPartialsThrowsException() throws Exception { + SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) + .aggregation(new ForceTimeoutAggregationBuilder("force_timeout")) + .size(0); + + try (SearchContext context = createSearchContext(source, newContextSearcher(reader), null, false)) { + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + + // expect QueryPhase to propagate a failure instead of marking timed_out=true + QueryPhaseExecutionException ex = expectThrows(QueryPhaseExecutionException.class, () -> QueryPhase.execute(context)); + assertNotNull("expected a root cause", ex.getCause()); + assertTrue("expected the cause to be a SearchTimeoutException", ex.getCause() instanceof SearchTimeoutException); + } + } + + private TestSearchContext createSearchContext( + SearchSourceBuilder source, + ContextIndexSearcher cis, + SuggestionSearchContext suggestCtx, + boolean allowPartials + ) throws IOException { + AggregatorFactories.Builder aggsBuilder = AggregatorFactories.builder() + .addAggregator(new ForceTimeoutAggregationBuilder("force_timeout")); + + AggregatorFactories factories; + try (AggregationTestHelper aggHelper = new AggregationTestHelper()) { + aggHelper.init(); + SearchExecutionContext sec = createSearchExecutionContext(); + AggregationContext aggCtx = aggHelper.createAggregationContext(sec.getIndexReader(), new MatchAllDocsQuery()); + factories = aggsBuilder.build(aggCtx, null); + } catch (Exception e) { + throw new IOException(e); + } + + SearchContextAggregations scAggs = new SearchContextAggregations(factories, () -> { + throw new AssertionError("reduce should not be called in this early-timeout test"); + }); + + return createSearchContext(source, cis, scAggs, suggestCtx, allowPartials); + } + + private TestSearchContext createSearchContext( + SearchSourceBuilder source, + ContextIndexSearcher cis, + SearchContextAggregations aggsCtx, + SuggestionSearchContext suggestCtx, + boolean allowPartials + ) throws IOException { + TestSearchContext ctx = new TestSearchContext(createSearchExecutionContext(), indexShard, cis) { + @Override + public SearchContextAggregations aggregations() { + return aggsCtx; + } + + @Override + public SuggestionSearchContext suggest() { + return suggestCtx; + } + + @Override + public ShardSearchRequest request() { + SearchRequest sr = new SearchRequest(); + sr.allowPartialSearchResults(allowPartials); + sr.source(source); + return new ShardSearchRequest( + OriginalIndices.NONE, + sr, + indexShard.shardId(), + 0, // slice id + 1, // total slices + AliasFilter.EMPTY, + 1F, + 0, + null + ); + } + }; + + ctx.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); + return ctx; + } + + /** + * Helper extending {@link AggregatorTestCase} for creating and initializing + * aggregation contexts in tests. Handles plugin setup and resource cleanup. + */ + private static final class AggregationTestHelper extends AggregatorTestCase implements AutoCloseable { + void init() { + super.initPlugins(); + } + + @Override + public void close() { + super.cleanupReleasables(); + } + } + + /** + * Test aggregation builder that simulates a timeout during collector setup + * to verify QueryPhase timeout handling behavior. + */ + private static final class ForceTimeoutAggregationBuilder extends AggregationBuilder { + ForceTimeoutAggregationBuilder(String name) { + super(name); + } + + @Override + public String getType() { + return "force_timeout"; + } + + @Override + public String getWriteableName() { + return "force_timeout"; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersion.current(); + } + + @Override + public BucketCardinality bucketCardinality() { + return BucketCardinality.ONE; + } + + @Override + protected AggregatorFactory build(AggregationContext ctx, AggregatorFactory parent) throws IOException { + return new AggregatorFactory(getName(), ctx, parent, AggregatorFactories.builder(), getMetadata()) { + @Override + protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) { + if (ctx.searcher() instanceof ContextIndexSearcher cis) { + cis.throwTimeExceededException(); + } + throw new AssertionError("unreachable"); + } + }; + } + + @Override + public AggregationBuilder setMetadata(Map metadata) { + return null; + } + + @Override + public Map getMetadata() { + return Map.of(); + } + + @Override + public AggregationBuilder subAggregation(AggregationBuilder aggregation) { + return null; + } + + @Override + public AggregationBuilder subAggregation(PipelineAggregationBuilder aggregation) { + return null; + } + + @Override + public AggregationBuilder subAggregations(AggregatorFactories.Builder subFactories) { + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) { + return null; + } + + @Override + public void writeTo(StreamOutput out) {} + + @Override + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata) { + return null; + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index f199fcaabd29b..f24147666bff3 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -275,7 +275,7 @@ private A createAggregator(AggregatorFactories.Builder bu * Deprecated - this will be made private in a future update */ @Deprecated - protected AggregationContext createAggregationContext(IndexReader indexReader, Query query, MappedFieldType... fieldTypes) + public AggregationContext createAggregationContext(IndexReader indexReader, Query query, MappedFieldType... fieldTypes) throws IOException { return createAggregationContext( indexReader, From 6b15d740c95fe9ecb7c97e1afaa36bab9cb75bb7 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Fri, 14 Nov 2025 09:59:44 +0200 Subject: [PATCH 04/14] update code --- .../aggregations/QueryPhaseForcedTimeoutIT.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java index b5ee3eed046c1..5e7b37146b58a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java @@ -53,11 +53,6 @@ protected Collection> nodePlugins() { return List.of(ForceTimeoutAggPlugin.class); } - @Override - public Settings indexSettings() { - return Settings.builder().put(super.indexSettings()).build(); - } - @Before public void setupIndex() throws Exception { assertAcked(prepareCreate(INDEX).setMapping(""" @@ -76,6 +71,7 @@ public void setupIndex() throws Exception { client().index(ir).actionGet(); } indicesAdmin().prepareRefresh(INDEX).get(); + ensureGreen(INDEX); } @After @@ -96,7 +92,7 @@ public void testTimeoutDuringCollectorPreparationReturnsTimedOutEmptyResult() { try { resp = client().prepareSearch(INDEX) .setQuery(QueryBuilders.matchAllQuery()) - .setSize(0) + .setSize(10) .setAllowPartialSearchResults(true) .addAggregation(new ForceTimeoutAggregationBuilder("force_timeout")) .get(); @@ -136,7 +132,7 @@ static class ForceTimeoutAggregationBuilder extends AggregationBuilder { public static final String TYPE = ForceTimeoutAggPlugin.NAME; - protected Map metadata; + private Map metadata; ForceTimeoutAggregationBuilder(String name) { super(name); @@ -146,7 +142,7 @@ static class ForceTimeoutAggregationBuilder extends AggregationBuilder { super(in.readString()); } - public static ForceTimeoutAggregationBuilder parse(XContentParser parser, String name) { + static ForceTimeoutAggregationBuilder parse(XContentParser parser, String name) { return new ForceTimeoutAggregationBuilder(name); } @@ -189,7 +185,7 @@ public AggregationBuilder setMetadata(Map metadata) { @Override public Map getMetadata() { - return metadata == null ? Map.of() : metadata; + return metadata; } @Override From bfc3f02349cab9f20dcf8210299f097868fc9504 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Fri, 14 Nov 2025 10:36:47 +0200 Subject: [PATCH 05/14] update --- .../QueryPhaseForcedTimeoutIT.java | 1 - .../search/query/QueryPhaseTimeoutTests.java | 160 ++++++++---------- 2 files changed, 70 insertions(+), 91 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java index 5e7b37146b58a..36ede24aaee29 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java index 24823e666a08b..c97d2b92dccc2 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java @@ -86,6 +86,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.hamcrest.Matchers; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import java.io.IOException; @@ -514,7 +515,7 @@ public ShardSearchRequest request() { return context; } - public static final class TestSuggester extends Suggester { + private static final class TestSuggester extends Suggester { private final ContextIndexSearcher contextIndexSearcher; TestSuggester(ContextIndexSearcher contextIndexSearcher) { @@ -538,7 +539,7 @@ protected TestSuggestion emptySuggestion(String name, TestSuggestionContext sugg } } - public static final class TestSuggestionContext extends SuggestionSearchContext.SuggestionContext { + private static final class TestSuggestionContext extends SuggestionSearchContext.SuggestionContext { TestSuggestionContext(Suggester suggester, SearchExecutionContext searchExecutionContext) { super(suggester, searchExecutionContext); } @@ -637,6 +638,8 @@ public final boolean isCacheable(LeafReaderContext ctx) { * and no aggregation container. */ public void testTimeoutNoAggsReturnsEmptyResult() throws Exception { + SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(10); + ContextIndexSearcher base = newContextSearcher(reader); ContextIndexSearcher throwing = new ContextIndexSearcher( base.getIndexReader(), @@ -652,8 +655,6 @@ public T search(Query query, CollectorManager col } }; - SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(0); - try (SearchContext context = createSearchContext(source, throwing, null, null, true)) { context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); @@ -667,29 +668,33 @@ public T search(Query query, CollectorManager col } /** - * Verifies that when a timeout occurs during aggregation setup, the search response - * is returned as a partial result: marked timed_out=true, with empty hits and an - * empty but non-null aggregation container. + * Verifies that when both suggestions and aggregations are present in the SearchContext, + * and a timeout occurs during aggregation setup, QueryPhase returns a well-formed partial + * result: marked timed_out=true, with empty hits, an empty but non-null aggregation container, + * and a safely accessible suggest container. */ - public void testAggTimeoutReturnsEmptyAggsAndHits() throws Exception { + public void testTimeoutWithAggsAndSuggestsReturnsPartial() throws Exception { SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) .aggregation(new ForceTimeoutAggregationBuilder("force_timeout")) - .size(0); + .size(10); - try (SearchContext context = createSearchContext(source, newContextSearcher(reader), null, true)) { + SuggestionSearchContext suggestCtx = new SuggestionSearchContext(); + suggestCtx.addSuggestion( + "suggestion", + new QueryPhaseTimeoutTests.TestSuggestionContext(new QueryPhaseTimeoutTests.TestSuggester(newContextSearcher(reader)), null) + ); + + try (SearchContext context = createSearchContext(source, newContextSearcher(reader), suggestCtx, true)) { context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); - assertNotNull("aggregations should be present in the context", context.aggregations()); QueryPhase.execute(context); assertTrue("search should be marked timed_out", context.queryResult().searchTimedOut()); assertNotNull("topDocs must be present even on timeout", context.queryResult().topDocs()); assertEquals("no hits returned on timeout", 0, context.queryResult().topDocs().topDocs.scoreDocs.length); - assertNotNull( - "aggregations container must be non-null on timeout when aggs were requested", - context.queryResult().aggregations() - ); + assertNotNull("aggs container must be non-null on timeout when aggs were requested", context.queryResult().aggregations()); assertTrue("aggregations list should be empty on timeout", context.queryResult().aggregations().expand().asList().isEmpty()); + context.queryResult().suggest().iterator().forEachRemaining(Assert::assertNotNull); } } @@ -699,6 +704,10 @@ public void testAggTimeoutReturnsEmptyAggsAndHits() throws Exception { * of failing — with timed_out=true and empty topDocs/aggregations. */ public void testNullSearchResultHandledAsEmptyPartial() throws Exception { + SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) + .aggregation(new ForceTimeoutAggregationBuilder("force_timeout")) + .size(10); + ContextIndexSearcher base = newContextSearcher(reader); ContextIndexSearcher nullReturning = new ContextIndexSearcher( base.getIndexReader(), @@ -713,10 +722,6 @@ public T search(Query query, CollectorManager col } }; - SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) - .aggregation(new ForceTimeoutAggregationBuilder("noop")) - .size(0); - try (SearchContext context = createSearchContext(source, nullReturning, null, true)) { context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); @@ -730,39 +735,6 @@ public T search(Query query, CollectorManager col } } - /** - * Verifies that when both suggestions and aggregations are present in the SearchContext, - * a timeout still results in a well-formed partial response — timed_out=true, empty hits, - * and safe handling of the suggest container. - */ - public void testTimeoutWithSuggestsReturnsPartial() throws Exception { - SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) - .aggregation(new ForceTimeoutAggregationBuilder("force_timeout")) - .size(0); - - SuggestionSearchContext suggestCtx = new SuggestionSearchContext(); - suggestCtx.addSuggestion( - "suggestion", - new QueryPhaseTimeoutTests.TestSuggestionContext(new QueryPhaseTimeoutTests.TestSuggester(newContextSearcher(reader)), null) - ); - - try (SearchContext context = createSearchContext(source, newContextSearcher(reader), suggestCtx, true)) { - context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); - - QueryPhase.execute(context); - - assertTrue("search should be marked timed_out", context.queryResult().searchTimedOut()); - assertNotNull("topDocs must be present even on timeout", context.queryResult().topDocs()); - assertEquals("no hits returned on timeout", 0, context.queryResult().topDocs().topDocs.scoreDocs.length); - assertNotNull("aggs container must be non-null on timeout when aggs were requested", context.queryResult().aggregations()); - assertTrue("aggregations list should be empty on timeout", context.queryResult().aggregations().expand().asList().isEmpty()); - - if (context.queryResult().suggest() != null) { - assertTrue("suggest container readable", context.queryResult().suggest().size() >= 0); - } - } - } - /** * Verifies that when allowPartialSearchResults=false, a timeout is not converted * to a partial response but instead throws a SearchTimeoutException wrapped in @@ -771,7 +743,7 @@ public void testTimeoutWithSuggestsReturnsPartial() throws Exception { public void testTimeoutDisallowPartialsThrowsException() throws Exception { SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) .aggregation(new ForceTimeoutAggregationBuilder("force_timeout")) - .size(0); + .size(10); try (SearchContext context = createSearchContext(source, newContextSearcher(reader), null, false)) { context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); @@ -789,15 +761,12 @@ private TestSearchContext createSearchContext( SuggestionSearchContext suggestCtx, boolean allowPartials ) throws IOException { - AggregatorFactories.Builder aggsBuilder = AggregatorFactories.builder() - .addAggregator(new ForceTimeoutAggregationBuilder("force_timeout")); - AggregatorFactories factories; try (AggregationTestHelper aggHelper = new AggregationTestHelper()) { - aggHelper.init(); + aggHelper.initPlugins(); SearchExecutionContext sec = createSearchExecutionContext(); AggregationContext aggCtx = aggHelper.createAggregationContext(sec.getIndexReader(), new MatchAllDocsQuery()); - factories = aggsBuilder.build(aggCtx, null); + factories = source.aggregations().build(aggCtx, null); } catch (Exception e) { throw new IOException(e); } @@ -855,7 +824,7 @@ public ShardSearchRequest request() { * aggregation contexts in tests. Handles plugin setup and resource cleanup. */ private static final class AggregationTestHelper extends AggregatorTestCase implements AutoCloseable { - void init() { + void intiPlugins() { super.initPlugins(); } @@ -870,79 +839,90 @@ public void close() { * to verify QueryPhase timeout handling behavior. */ private static final class ForceTimeoutAggregationBuilder extends AggregationBuilder { + private Map metadata; + ForceTimeoutAggregationBuilder(String name) { super(name); } @Override - public String getType() { - return "force_timeout"; + protected AggregatorFactory build(AggregationContext ctx, AggregatorFactory parent) throws IOException { + return new AggregatorFactory(getName(), ctx, parent, AggregatorFactories.builder(), getMetadata()) { + @Override + protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) { + if (ctx.searcher() instanceof ContextIndexSearcher cis) { + cis.throwTimeExceededException(); + } + throw new AssertionError("unreachable"); + } + }; } @Override - public String getWriteableName() { - return "force_timeout"; + public AggregationBuilder subAggregation(AggregationBuilder aggregation) { + this.factoriesBuilder.addAggregator(aggregation); + return this; } @Override - public TransportVersion getMinimalSupportedVersion() { - return TransportVersion.current(); + public AggregationBuilder subAggregation(PipelineAggregationBuilder aggregation) { + this.factoriesBuilder.addPipelineAggregator(aggregation); + return this; } @Override - public BucketCardinality bucketCardinality() { - return BucketCardinality.ONE; + public AggregationBuilder subAggregations(AggregatorFactories.Builder subFactories) { + this.factoriesBuilder = subFactories; + return this; } @Override - protected AggregatorFactory build(AggregationContext ctx, AggregatorFactory parent) throws IOException { - return new AggregatorFactory(getName(), ctx, parent, AggregatorFactories.builder(), getMetadata()) { - @Override - protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) { - if (ctx.searcher() instanceof ContextIndexSearcher cis) { - cis.throwTimeExceededException(); - } - throw new AssertionError("unreachable"); - } - }; + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata) { + ForceTimeoutAggregationBuilder copy = new ForceTimeoutAggregationBuilder(getName()); + copy.factoriesBuilder = factoriesBuilder; + copy.setMetadata(metadata); + return copy; } @Override public AggregationBuilder setMetadata(Map metadata) { - return null; + this.metadata = metadata; + return this; } @Override public Map getMetadata() { - return Map.of(); + return metadata; } @Override - public AggregationBuilder subAggregation(AggregationBuilder aggregation) { - return null; + public BucketCardinality bucketCardinality() { + return BucketCardinality.ONE; } @Override - public AggregationBuilder subAggregation(PipelineAggregationBuilder aggregation) { - return null; + public String getWriteableName() { + return "force-timeout`"; } @Override - public AggregationBuilder subAggregations(AggregatorFactories.Builder subFactories) { - return null; + public TransportVersion getMinimalSupportedVersion() { + return TransportVersion.current(); } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) { - return null; + public void writeTo(StreamOutput out) throws IOException { + out.writeString(getName()); } @Override - public void writeTo(StreamOutput out) {} + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } @Override - protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata) { - return null; + public String getType() { + return "force-timeout`"; } } } From cb0e1b2635a245852d9185348a82f789fe554295 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Fri, 14 Nov 2025 11:12:51 +0200 Subject: [PATCH 06/14] Update docs/changelog/138084.yaml --- docs/changelog/138084.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/138084.yaml diff --git a/docs/changelog/138084.yaml b/docs/changelog/138084.yaml new file mode 100644 index 0000000000000..f3209c0ee1f75 --- /dev/null +++ b/docs/changelog/138084.yaml @@ -0,0 +1,5 @@ +pr: 138084 +summary: Handle Query Timeouts During Collector Initialization in `QueryPhase` +area: Search +type: bug +issues: [] From 3d81fce478c95006dfe19a39476dc0bbddaa94d7 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Fri, 14 Nov 2025 11:20:39 +0200 Subject: [PATCH 07/14] update after review --- .../bucket/BucketsAggregatorTests.java | 2 +- .../search/query/QueryPhaseTimeoutTests.java | 17 ++++++++++++++++- .../search/aggregations/AggregatorTestCase.java | 2 +- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java index 145b58aefb1f2..fb4c62ad66f19 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java @@ -37,7 +37,7 @@ public class BucketsAggregatorTests extends AggregatorTestCase { private List toRelease = new ArrayList<>(); @Override - public AggregationContext createAggregationContext(IndexReader indexSearcher, Query query, MappedFieldType... fieldTypes) + protected AggregationContext createAggregationContext(IndexReader indexSearcher, Query query, MappedFieldType... fieldTypes) throws IOException { AggregationContext context = super.createAggregationContext(indexSearcher, query, fieldTypes); // Generally, we should avoid doing this, but this test doesn't do anything with reduction, so it should be safe here diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java index c97d2b92dccc2..a8fa3bdbde5c0 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java @@ -93,6 +93,8 @@ import java.util.Collections; import java.util.Map; +import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; + public class QueryPhaseTimeoutTests extends IndexShardTestCase { private static Directory dir; @@ -765,7 +767,7 @@ private TestSearchContext createSearchContext( try (AggregationTestHelper aggHelper = new AggregationTestHelper()) { aggHelper.initPlugins(); SearchExecutionContext sec = createSearchExecutionContext(); - AggregationContext aggCtx = aggHelper.createAggregationContext(sec.getIndexReader(), new MatchAllDocsQuery()); + AggregationContext aggCtx = aggHelper.createDefaultAggregationContext(sec.getIndexReader(), new MatchAllDocsQuery()); factories = source.aggregations().build(aggCtx, null); } catch (Exception e) { throw new IOException(e); @@ -832,6 +834,19 @@ void intiPlugins() { public void close() { super.cleanupReleasables(); } + + AggregationContext createDefaultAggregationContext(IndexReader reader, Query query) throws IOException { + return createAggregationContext( + reader, + createIndexSettings(), + query, + new NoneCircuitBreakerService(), + AggregationBuilder.DEFAULT_PREALLOCATION * 5, + DEFAULT_MAX_BUCKETS, + false, + false + ); + } } /** diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index f24147666bff3..f199fcaabd29b 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -275,7 +275,7 @@ private A createAggregator(AggregatorFactories.Builder bu * Deprecated - this will be made private in a future update */ @Deprecated - public AggregationContext createAggregationContext(IndexReader indexReader, Query query, MappedFieldType... fieldTypes) + protected AggregationContext createAggregationContext(IndexReader indexReader, Query query, MappedFieldType... fieldTypes) throws IOException { return createAggregationContext( indexReader, From 83379a5514d532d05ff96ad3d986e68686debad8 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Thu, 20 Nov 2025 17:36:04 +0200 Subject: [PATCH 08/14] update after review - remove redundant code --- .../src/main/java/org/elasticsearch/search/query/QueryPhase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index bab8c90c56f87..1688f0080814b 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -246,7 +246,6 @@ static void addCollectorsAndSearch(SearchContext searchContext, Long timeRangeFi } } catch (ContextIndexSearcher.TimeExceededException tee) { finalizeAsTimedOutResult(searchContext); - return; } } catch (Exception e) { throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e); From a98b85522eccb33854643473523e0d1ef50d73d5 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Thu, 20 Nov 2025 19:40:05 +0200 Subject: [PATCH 09/14] update after review --- .../QueryPhaseForcedTimeoutIT.java | 45 +++----------- .../search/query/QueryPhaseTimeoutTests.java | 62 ++++++------------- 2 files changed, 28 insertions(+), 79 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java index 36ede24aaee29..a0b6b7e226d21 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java @@ -127,7 +127,7 @@ public List getAggregations() { * It has no parameters and its factory immediately triggers a timeout exception * when the search collectors are being prepared. */ - static class ForceTimeoutAggregationBuilder extends AggregationBuilder { + static class ForceTimeoutAggregationBuilder extends AbstractAggregationBuilder { public static final String TYPE = ForceTimeoutAggPlugin.NAME; @@ -138,7 +138,7 @@ static class ForceTimeoutAggregationBuilder extends AggregationBuilder { } ForceTimeoutAggregationBuilder(StreamInput in) throws IOException { - super(in.readString()); + super(in); } static ForceTimeoutAggregationBuilder parse(XContentParser parser, String name) { @@ -146,28 +146,12 @@ static ForceTimeoutAggregationBuilder parse(XContentParser parser, String name) } @Override - protected AggregatorFactory build(AggregationContext context, AggregatorFactory parent) throws IOException { + protected AggregatorFactory doBuild(AggregationContext context, + AggregatorFactory parent, + AggregatorFactories.Builder subfactoriesBuilder) throws IOException { return new ForceTimeoutAggregatorFactory(getName(), context, parent, factoriesBuilder, getMetadata()); } - @Override - public AggregationBuilder subAggregation(AggregationBuilder aggregation) { - this.factoriesBuilder.addAggregator(aggregation); - return this; - } - - @Override - public AggregationBuilder subAggregation(PipelineAggregationBuilder aggregation) { - this.factoriesBuilder.addPipelineAggregator(aggregation); - return this; - } - - @Override - public AggregationBuilder subAggregations(AggregatorFactories.Builder subFactories) { - this.factoriesBuilder = subFactories; - return this; - } - @Override protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata) { ForceTimeoutAggregationBuilder copy = new ForceTimeoutAggregationBuilder(getName()); @@ -176,12 +160,6 @@ protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBu return copy; } - @Override - public AggregationBuilder setMetadata(Map metadata) { - this.metadata = metadata; - return this; - } - @Override public Map getMetadata() { return metadata; @@ -192,23 +170,18 @@ public BucketCardinality bucketCardinality() { return BucketCardinality.ONE; } - @Override - public String getWriteableName() { - return TYPE; - } - @Override public TransportVersion getMinimalSupportedVersion() { - return TransportVersion.current(); + return TransportVersion.zero(); } @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(getName()); + protected void doWriteTo(StreamOutput out) { + // Empty } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) { return builder; } diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java index a8fa3bdbde5c0..471b75e379f55 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java @@ -63,14 +63,16 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.CardinalityUpperBound; -import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.aggregations.SearchContextAggregations; +import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; @@ -853,15 +855,18 @@ AggregationContext createDefaultAggregationContext(IndexReader reader, Query que * Test aggregation builder that simulates a timeout during collector setup * to verify QueryPhase timeout handling behavior. */ - private static final class ForceTimeoutAggregationBuilder extends AggregationBuilder { - private Map metadata; + private static final class ForceTimeoutAggregationBuilder extends AbstractAggregationBuilder { ForceTimeoutAggregationBuilder(String name) { super(name); } @Override - protected AggregatorFactory build(AggregationContext ctx, AggregatorFactory parent) throws IOException { + protected AggregatorFactory doBuild( + AggregationContext ctx, + AggregatorFactory parent, + AggregatorFactories.Builder subfactoriesBuilder + ) throws IOException { return new AggregatorFactory(getName(), ctx, parent, AggregatorFactories.builder(), getMetadata()) { @Override protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) { @@ -873,24 +878,6 @@ protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound car }; } - @Override - public AggregationBuilder subAggregation(AggregationBuilder aggregation) { - this.factoriesBuilder.addAggregator(aggregation); - return this; - } - - @Override - public AggregationBuilder subAggregation(PipelineAggregationBuilder aggregation) { - this.factoriesBuilder.addPipelineAggregator(aggregation); - return this; - } - - @Override - public AggregationBuilder subAggregations(AggregatorFactories.Builder subFactories) { - this.factoriesBuilder = subFactories; - return this; - } - @Override protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata) { ForceTimeoutAggregationBuilder copy = new ForceTimeoutAggregationBuilder(getName()); @@ -899,45 +886,34 @@ protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBu return copy; } - @Override - public AggregationBuilder setMetadata(Map metadata) { - this.metadata = metadata; - return this; - } - - @Override - public Map getMetadata() { - return metadata; - } - @Override public BucketCardinality bucketCardinality() { return BucketCardinality.ONE; } @Override - public String getWriteableName() { - return "force-timeout`"; + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeString(getName()); } @Override - public TransportVersion getMinimalSupportedVersion() { - return TransportVersion.current(); + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) { + return builder; } @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(getName()); + public String getType() { + return GlobalAggregationBuilder.NAME; } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder; + public TransportVersion getMinimalSupportedVersion() { + return TransportVersion.zero(); } @Override - public String getType() { - return "force-timeout`"; + public boolean supportsVersion(TransportVersion version) { + return super.supportsVersion(version); } } } From 4bc1f415225bb07746573e261fdc796606b5c32b Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 20 Nov 2025 17:48:06 +0000 Subject: [PATCH 10/14] [CI] Auto commit changes from spotless --- .../search/aggregations/QueryPhaseForcedTimeoutIT.java | 8 +++++--- .../search/query/QueryPhaseTimeoutTests.java | 1 - 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java index a0b6b7e226d21..458b8a764ebca 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java @@ -146,9 +146,11 @@ static ForceTimeoutAggregationBuilder parse(XContentParser parser, String name) } @Override - protected AggregatorFactory doBuild(AggregationContext context, - AggregatorFactory parent, - AggregatorFactories.Builder subfactoriesBuilder) throws IOException { + protected AggregatorFactory doBuild( + AggregationContext context, + AggregatorFactory parent, + AggregatorFactories.Builder subfactoriesBuilder + ) throws IOException { return new ForceTimeoutAggregatorFactory(getName(), context, parent, factoriesBuilder, getMetadata()); } diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java index 471b75e379f55..021609886e7c4 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java @@ -72,7 +72,6 @@ import org.elasticsearch.search.aggregations.CardinalityUpperBound; import org.elasticsearch.search.aggregations.SearchContextAggregations; import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; From cabadeb3294b0ad5411a14fe05004d0e4285c5b4 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Thu, 20 Nov 2025 19:58:09 +0200 Subject: [PATCH 11/14] update after review --- .../QueryPhaseForcedTimeoutIT.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java index a0b6b7e226d21..5cd80ca88cebb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -19,6 +20,8 @@ import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.internal.ContextIndexSearcher; +import org.elasticsearch.search.query.QueryPhaseExecutionException; +import org.elasticsearch.search.query.SearchTimeoutException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; @@ -33,6 +36,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -109,6 +113,29 @@ public void testTimeoutDuringCollectorPreparationReturnsTimedOutEmptyResult() { } } + /** + * In this test we explicitly set allow_partial_search_results=false. Under this + * setting, any shard-level failure in the query phase (including a timeout) is treated as + * a hard failure for the whole search. The coordinating node does not return a response + * with timed_out=true, instead it fails the phase and throws a + * {@link SearchPhaseExecutionException} whose cause is the underlying + * {@link SearchTimeoutException}. This test asserts that behavior. + */ + public void testTimeoutDuringCollectorPreparationDisallowPartialsThrowsException() { + SearchPhaseExecutionException ex = expectThrows( + SearchPhaseExecutionException.class, + () -> client().prepareSearch(INDEX) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(10) + .setAllowPartialSearchResults(false) + .addAggregation(new ForceTimeoutAggregationBuilder("force_timeout")) + .get() + ); + + assertNotNull("expected a cause on SearchPhaseExecutionException", ex.getCause()); + assertThat("expected inner cause to be SearchTimeoutException", ex.getCause(), instanceOf(SearchTimeoutException.class)); + } + /** * A minimal plugin registering a custom aggregation (ForceTimeoutAggregationBuilder) * whose factory simulates a timeout during collector setup to test QueryPhase handling. From 6cf501cd98bc944abb8a194ea7fa20b0a61a38ed Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 20 Nov 2025 18:12:52 +0000 Subject: [PATCH 12/14] [CI] Auto commit changes from spotless --- .../search/aggregations/QueryPhaseForcedTimeoutIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java index e63edc5c6d155..d08bdd265dcc6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java @@ -20,7 +20,6 @@ import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.internal.ContextIndexSearcher; -import org.elasticsearch.search.query.QueryPhaseExecutionException; import org.elasticsearch.search.query.SearchTimeoutException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.XContentBuilder; From ba108801bb50f94379a861e145bf049080537702 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Thu, 20 Nov 2025 20:16:32 +0200 Subject: [PATCH 13/14] update after review --- .../QueryPhaseForcedTimeoutIT.java | 1 - .../search/query/QueryPhase.java | 7 +--- .../search/query/QueryPhaseTimeoutTests.java | 37 ------------------- 3 files changed, 1 insertion(+), 44 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java index e63edc5c6d155..d08bdd265dcc6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java @@ -20,7 +20,6 @@ import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.internal.ContextIndexSearcher; -import org.elasticsearch.search.query.QueryPhaseExecutionException; import org.elasticsearch.search.query.SearchTimeoutException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.XContentBuilder; diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 1688f0080814b..b21630898837c 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -214,12 +214,7 @@ static void addCollectorsAndSearch(SearchContext searchContext, Long timeRangeFi ); QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager); - - if (queryPhaseResult == null) { - finalizeAsTimedOutResult(searchContext); - return; - } - + if (searchContext.getProfilers() != null) { searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult()); } diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java index 021609886e7c4..91cfbf8a6eaeb 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java @@ -701,43 +701,6 @@ public void testTimeoutWithAggsAndSuggestsReturnsPartial() throws Exception { } } - /** - * Simulates the search layer returning null from ContextIndexSearcher.search() - * and verifies that QueryPhase converts it into a valid partial response instead - * of failing — with timed_out=true and empty topDocs/aggregations. - */ - public void testNullSearchResultHandledAsEmptyPartial() throws Exception { - SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) - .aggregation(new ForceTimeoutAggregationBuilder("force_timeout")) - .size(10); - - ContextIndexSearcher base = newContextSearcher(reader); - ContextIndexSearcher nullReturning = new ContextIndexSearcher( - base.getIndexReader(), - base.getSimilarity(), - base.getQueryCache(), - base.getQueryCachingPolicy(), - true - ) { - @Override - public T search(Query query, CollectorManager collectorManager) { - return null; // simulate lower layer returning null - } - }; - - try (SearchContext context = createSearchContext(source, nullReturning, null, true)) { - context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); - - QueryPhase.execute(context); - - assertTrue("search should be marked timed_out", context.queryResult().searchTimedOut()); - assertNotNull("topDocs must be present even on timeout", context.queryResult().topDocs()); - assertEquals("no hits returned on timeout", 0, context.queryResult().topDocs().topDocs.scoreDocs.length); - assertNotNull("aggs container must be non-null on timeout when aggs were requested", context.queryResult().aggregations()); - assertTrue("aggregations list should be empty on timeout", context.queryResult().aggregations().expand().asList().isEmpty()); - } - } - /** * Verifies that when allowPartialSearchResults=false, a timeout is not converted * to a partial response but instead throws a SearchTimeoutException wrapped in From 71ca632a5318dd2160b95fb084a1ccd11be3510e Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 20 Nov 2025 18:25:20 +0000 Subject: [PATCH 14/14] [CI] Auto commit changes from spotless --- .../main/java/org/elasticsearch/search/query/QueryPhase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index b21630898837c..47cb93ceb3f3f 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -214,7 +214,7 @@ static void addCollectorsAndSearch(SearchContext searchContext, Long timeRangeFi ); QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager); - + if (searchContext.getProfilers() != null) { searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult()); }