diff --git a/docs/changelog/138084.yaml b/docs/changelog/138084.yaml new file mode 100644 index 0000000000000..f3209c0ee1f75 --- /dev/null +++ b/docs/changelog/138084.yaml @@ -0,0 +1,5 @@ +pr: 138084 +summary: Handle Query Timeouts During Collector Initialization in `QueryPhase` +area: Search +type: bug +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java new file mode 100644 index 0000000000000..d08bdd265dcc6 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/QueryPhaseForcedTimeoutIT.java @@ -0,0 +1,247 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.search.aggregations; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.internal.ContextIndexSearcher; +import org.elasticsearch.search.query.SearchTimeoutException; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +/** + * Integration test verifying that a TimeExceededException thrown during collector + * preparation (before the actual search executes) is caught in QueryPhase and + * correctly transformed into a partial response with `timed_out=true`, empty hits, + * and empty aggregations rather than an exception. + */ +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class QueryPhaseForcedTimeoutIT extends ESIntegTestCase { + + private static final String INDEX = "index"; + + @Override + protected Collection> nodePlugins() { + return List.of(ForceTimeoutAggPlugin.class); + } + + @Before + public void setupIndex() throws Exception { + assertAcked(prepareCreate(INDEX).setMapping(""" + { + "properties": { + "kwd": { "type": "keyword" }, + "txt": { "type": "text" } + } + } + """)); + + for (int i = 0; i < 10; i++) { + IndexRequest ir = new IndexRequest(INDEX).source( + jsonBuilder().startObject().field("kwd", "value" + i).field("txt", "text " + i).endObject() + ); + client().index(ir).actionGet(); + } + indicesAdmin().prepareRefresh(INDEX).get(); + ensureGreen(INDEX); + } + + @After + public void cleanup() { + indicesAdmin().prepareDelete(INDEX).get(); + } + + /** + * Executes a search using the ForceTimeoutAggPlugin aggregation which throws + * TimeExceededException during collector preparation, and asserts that: + * - the response is returned without failure, + * - the `timed_out` flag is true, + * - hits are empty, and + * - aggregations are non-null but empty. + */ + public void testTimeoutDuringCollectorPreparationReturnsTimedOutEmptyResult() { + SearchResponse resp = null; + try { + resp = client().prepareSearch(INDEX) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(10) + .setAllowPartialSearchResults(true) + .addAggregation(new ForceTimeoutAggregationBuilder("force_timeout")) + .get(); + + assertThat(resp, notNullValue()); + assertThat("search should be marked timed_out", resp.isTimedOut(), is(true)); + assertThat("no hits returned", resp.getHits().getHits().length, equalTo(0)); + assertThat(resp.getAggregations(), notNullValue()); + assertThat("no aggr returned", resp.getAggregations().asList().isEmpty(), is(true)); + assertThat("no shard failures expected", resp.getShardFailures() == null || resp.getShardFailures().length == 0, is(true)); + } finally { + if (resp != null) { + resp.decRef(); + } + } + } + + /** + * In this test we explicitly set allow_partial_search_results=false. Under this + * setting, any shard-level failure in the query phase (including a timeout) is treated as + * a hard failure for the whole search. The coordinating node does not return a response + * with timed_out=true, instead it fails the phase and throws a + * {@link SearchPhaseExecutionException} whose cause is the underlying + * {@link SearchTimeoutException}. This test asserts that behavior. + */ + public void testTimeoutDuringCollectorPreparationDisallowPartialsThrowsException() { + SearchPhaseExecutionException ex = expectThrows( + SearchPhaseExecutionException.class, + () -> client().prepareSearch(INDEX) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(10) + .setAllowPartialSearchResults(false) + .addAggregation(new ForceTimeoutAggregationBuilder("force_timeout")) + .get() + ); + + assertNotNull("expected a cause on SearchPhaseExecutionException", ex.getCause()); + assertThat("expected inner cause to be SearchTimeoutException", ex.getCause(), instanceOf(SearchTimeoutException.class)); + } + + /** + * A minimal plugin registering a custom aggregation (ForceTimeoutAggregationBuilder) + * whose factory simulates a timeout during collector setup to test QueryPhase handling. + */ + public static class ForceTimeoutAggPlugin extends Plugin implements SearchPlugin { + public static final String NAME = "force_timeout_plugin"; + + @Override + public List getAggregations() { + return List.of(new AggregationSpec(NAME, ForceTimeoutAggregationBuilder::new, ForceTimeoutAggregationBuilder::parse)); + } + } + + /** + * Aggregation builder for the ForceTimeoutAggPlugin aggregation. + * It has no parameters and its factory immediately triggers a timeout exception + * when the search collectors are being prepared. + */ + static class ForceTimeoutAggregationBuilder extends AbstractAggregationBuilder { + + public static final String TYPE = ForceTimeoutAggPlugin.NAME; + + private Map metadata; + + ForceTimeoutAggregationBuilder(String name) { + super(name); + } + + ForceTimeoutAggregationBuilder(StreamInput in) throws IOException { + super(in); + } + + static ForceTimeoutAggregationBuilder parse(XContentParser parser, String name) { + return new ForceTimeoutAggregationBuilder(name); + } + + @Override + protected AggregatorFactory doBuild( + AggregationContext context, + AggregatorFactory parent, + AggregatorFactories.Builder subfactoriesBuilder + ) throws IOException { + return new ForceTimeoutAggregatorFactory(getName(), context, parent, factoriesBuilder, getMetadata()); + } + + @Override + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata) { + ForceTimeoutAggregationBuilder copy = new ForceTimeoutAggregationBuilder(getName()); + copy.factoriesBuilder = factoriesBuilder; + copy.setMetadata(metadata); + return copy; + } + + @Override + public Map getMetadata() { + return metadata; + } + + @Override + public BucketCardinality bucketCardinality() { + return BucketCardinality.ONE; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersion.zero(); + } + + @Override + protected void doWriteTo(StreamOutput out) { + // Empty + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) { + return builder; + } + + @Override + public String getType() { + return TYPE; + } + + /** + * Factory implementation for ForceTimeoutAggregationBuilder. + * Its createInternal() method throws a TimeExceededException + * before any actual collection occurs, simulating a timeout during setup. + */ + static class ForceTimeoutAggregatorFactory extends AggregatorFactory { + + ForceTimeoutAggregatorFactory( + String name, + AggregationContext context, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metadata + ) throws IOException { + super(name, context, parent, subFactoriesBuilder, metadata); + } + + @Override + protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) { + if (context.searcher() instanceof ContextIndexSearcher cis) { + cis.throwTimeExceededException(); + } + throw new AssertionError("unreachable"); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 5fcfb2b9766cd..b0ccd4c6effe1 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -33,6 +33,7 @@ import org.elasticsearch.search.SearchContextSourcePrinter; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.AggregationPhase; +import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; @@ -197,47 +198,71 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas ); } - CollectorManager collectorManager = QueryPhaseCollectorManager.createQueryPhaseCollectorManager( - postFilterWeight, - searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(), - searchContext, - hasFilterCollector - ); - final Runnable timeoutRunnable = getTimeoutCheck(searchContext); if (timeoutRunnable != null) { searcher.addQueryCancellation(timeoutRunnable); } - QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager); - if (searchContext.getProfilers() != null) { - searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult()); - } - queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats()); - if (searcher.timeExceeded()) { - assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set"; - SearchTimeoutException.handleTimeout( - searchContext.request().allowPartialSearchResults(), - searchContext.shardTarget(), - searchContext.queryResult() - ); - } - if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) { - queryResult.terminatedEarly(queryPhaseResult.terminatedAfter()); - } - ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); - assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor - || (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */) - : "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass(); - if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) { - queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); - queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); + try { + CollectorManager collectorManager = QueryPhaseCollectorManager + .createQueryPhaseCollectorManager( + postFilterWeight, + searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(), + searchContext, + hasFilterCollector + ); + + QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager); + + if (searchContext.getProfilers() != null) { + searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult()); + } + queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats()); + + if (searcher.timeExceeded()) { + assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set"; + SearchTimeoutException.handleTimeout( + searchContext.request().allowPartialSearchResults(), + searchContext.shardTarget(), + searchContext.queryResult() + ); + } + if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) { + queryResult.terminatedEarly(queryPhaseResult.terminatedAfter()); + } + ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); + assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor + || (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */) + : "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass(); + if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) { + queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); + queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); + } + } catch (ContextIndexSearcher.TimeExceededException tee) { + finalizeAsTimedOutResult(searchContext); } } catch (Exception e) { throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e); } } + /** + * Marks the current search as timed out and finalizes the {@link QuerySearchResult} + * with a well-formed empty response. This ensures that even when a timeout occurs + * (e.g., during collector setup or search execution), the shard still returns a + * valid result object with empty top docs and aggregations instead of throwing. + */ + private static void finalizeAsTimedOutResult(SearchContext searchContext) { + QuerySearchResult queryResult = searchContext.queryResult(); + SearchTimeoutException.handleTimeout(searchContext.request().allowPartialSearchResults(), searchContext.shardTarget(), queryResult); + + queryResult.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]); + + if (searchContext.aggregations() != null) { + queryResult.aggregations(InternalAggregations.EMPTY); + } + } + /** * Returns whether collection within the provided reader can be early-terminated if it sorts * with sortAndFormats. diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java index e19a85f4f2a0b..3f115b9747593 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTimeoutTests.java @@ -22,6 +22,8 @@ import org.apache.lucene.index.PointValues; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.BulkScorer; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.ConstantScoreScorer; import org.apache.lucene.search.ConstantScoreWeight; import org.apache.lucene.search.DocIdSetIterator; @@ -40,16 +42,37 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.util.Bits; import org.apache.lucene.util.CharsRefBuilder; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchShardTask; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.cache.bitset.BitsetFilterCache; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; +import org.elasticsearch.index.mapper.MapperMetrics; +import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.CardinalityUpperBound; +import org.elasticsearch.search.aggregations.SearchContextAggregations; +import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; +import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ContextIndexSearcher; @@ -61,12 +84,17 @@ import org.elasticsearch.search.suggest.SuggestionSearchContext; import org.elasticsearch.test.TestSearchContext; import org.elasticsearch.xcontent.Text; +import org.elasticsearch.xcontent.XContentBuilder; import org.hamcrest.Matchers; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import java.io.IOException; import java.util.Collections; +import java.util.Map; + +import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; public class QueryPhaseTimeoutTests extends IndexShardTestCase { @@ -398,6 +426,40 @@ private TestSearchContext createSearchContext(Query query, int size) throws IOEx return context; } + private SearchExecutionContext createSearchExecutionContext() { + IndexMetadata indexMetadata = IndexMetadata.builder("index") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); + // final SimilarityService similarityService = new SimilarityService(indexSettings, null, Map.of()); + final long nowInMillis = randomNonNegativeLong(); + return new SearchExecutionContext( + 0, + 0, + indexSettings, + new BitsetFilterCache(indexSettings, BitsetFilterCache.Listener.NOOP), + (ft, fdc) -> ft.fielddataBuilder(fdc).build(new IndexFieldDataCache.None(), new NoneCircuitBreakerService()), + null, + MappingLookup.EMPTY, + null, + null, + parserConfig(), + writableRegistry(), + null, + new IndexSearcher(reader), + () -> nowInMillis, + null, + null, + () -> true, + null, + Collections.emptyMap(), + MapperMetrics.NOOP + ); + } + public void testSuggestOnlyWithTimeout() throws Exception { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().suggest(new SuggestBuilder()); try (SearchContext context = createSearchContextWithSuggestTimeout(searchSourceBuilder)) { @@ -572,4 +634,248 @@ public final boolean isCacheable(LeafReaderContext ctx) { return false; } } + + /** + * Verifies that when a timeout occurs before search execution and no aggregations + * are requested, QueryPhase returns an empty partial result with timed_out=true + * and no aggregation container. + */ + public void testTimeoutNoAggsReturnsEmptyResult() throws Exception { + SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(10); + + ContextIndexSearcher base = newContextSearcher(reader); + ContextIndexSearcher throwing = new ContextIndexSearcher( + base.getIndexReader(), + base.getSimilarity(), + base.getQueryCache(), + base.getQueryCachingPolicy(), + true + ) { + @Override + public T search(Query query, CollectorManager collectorManager) { + this.throwTimeExceededException(); // simulate timeout right as search would begin + throw new AssertionError("unreachable"); + } + }; + + try (SearchContext context = createSearchContext(source, throwing, null, null, true)) { + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + + QueryPhase.execute(context); + + assertTrue("search should be marked timed_out", context.queryResult().searchTimedOut()); + assertNotNull("topDocs must be present even on timeout", context.queryResult().topDocs()); + assertEquals("no hits returned on timeout", 0, context.queryResult().topDocs().topDocs.scoreDocs.length); + assertNull("no aggs were requested so container should remain null", context.queryResult().aggregations()); + } + } + + /** + * Verifies that when both suggestions and aggregations are present in the SearchContext, + * and a timeout occurs during aggregation setup, QueryPhase returns a well-formed partial + * result: marked timed_out=true, with empty hits, an empty but non-null aggregation container, + * and a safely accessible suggest container. + */ + public void testTimeoutWithAggsAndSuggestsReturnsPartial() throws Exception { + SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) + .aggregation(new ForceTimeoutAggregationBuilder("force_timeout")) + .size(10); + + SuggestionSearchContext suggestCtx = new SuggestionSearchContext(); + suggestCtx.addSuggestion( + "suggestion", + new QueryPhaseTimeoutTests.TestSuggestionContext(new QueryPhaseTimeoutTests.TestSuggester(newContextSearcher(reader)), null) + ); + + try (SearchContext context = createSearchContext(source, newContextSearcher(reader), suggestCtx, true)) { + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + + QueryPhase.execute(context); + + assertTrue("search should be marked timed_out", context.queryResult().searchTimedOut()); + assertNotNull("topDocs must be present even on timeout", context.queryResult().topDocs()); + assertEquals("no hits returned on timeout", 0, context.queryResult().topDocs().topDocs.scoreDocs.length); + assertNotNull("aggs container must be non-null on timeout when aggs were requested", context.queryResult().aggregations()); + assertTrue("aggregations list should be empty on timeout", context.queryResult().aggregations().expand().asList().isEmpty()); + context.queryResult().suggest().iterator().forEachRemaining(Assert::assertNotNull); + } + } + + /** + * Verifies that when allowPartialSearchResults=false, a timeout is not converted + * to a partial response but instead throws a SearchTimeoutException wrapped in + * a QueryPhaseExecutionException. + */ + public void testTimeoutDisallowPartialsThrowsException() throws Exception { + SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder()) + .aggregation(new ForceTimeoutAggregationBuilder("force_timeout")) + .size(10); + + try (SearchContext context = createSearchContext(source, newContextSearcher(reader), null, false)) { + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + + // expect QueryPhase to propagate a failure instead of marking timed_out=true + QueryPhaseExecutionException ex = expectThrows(QueryPhaseExecutionException.class, () -> QueryPhase.execute(context)); + assertNotNull("expected a root cause", ex.getCause()); + assertTrue("expected the cause to be a SearchTimeoutException", ex.getCause() instanceof SearchTimeoutException); + } + } + + private TestSearchContext createSearchContext( + SearchSourceBuilder source, + ContextIndexSearcher cis, + SuggestionSearchContext suggestCtx, + boolean allowPartials + ) throws IOException { + AggregatorFactories factories; + try (AggregationTestHelper aggHelper = new AggregationTestHelper()) { + aggHelper.initPlugins(); + SearchExecutionContext sec = createSearchExecutionContext(); + AggregationContext aggCtx = aggHelper.createDefaultAggregationContext(sec.getIndexReader(), new MatchAllDocsQuery()); + factories = source.aggregations().build(aggCtx, null); + } catch (Exception e) { + throw new IOException(e); + } + + SearchContextAggregations scAggs = new SearchContextAggregations(factories, () -> { + throw new AssertionError("reduce should not be called in this early-timeout test"); + }); + + return createSearchContext(source, cis, scAggs, suggestCtx, allowPartials); + } + + private TestSearchContext createSearchContext( + SearchSourceBuilder source, + ContextIndexSearcher cis, + SearchContextAggregations aggsCtx, + SuggestionSearchContext suggestCtx, + boolean allowPartials + ) throws IOException { + TestSearchContext ctx = new TestSearchContext(createSearchExecutionContext(), indexShard, cis) { + @Override + public SearchContextAggregations aggregations() { + return aggsCtx; + } + + @Override + public SuggestionSearchContext suggest() { + return suggestCtx; + } + + @Override + public ShardSearchRequest request() { + SearchRequest sr = new SearchRequest(); + sr.allowPartialSearchResults(allowPartials); + sr.source(source); + return new ShardSearchRequest( + OriginalIndices.NONE, + sr, + indexShard.shardId(), + 0, // slice id + 1, // total slices + AliasFilter.EMPTY, + 1F, + 0, + null + ); + } + }; + + ctx.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); + return ctx; + } + + /** + * Helper extending {@link AggregatorTestCase} for creating and initializing + * aggregation contexts in tests. Handles plugin setup and resource cleanup. + */ + private static final class AggregationTestHelper extends AggregatorTestCase implements AutoCloseable { + void intiPlugins() { + super.initPlugins(); + } + + @Override + public void close() { + super.cleanupReleasables(); + } + + AggregationContext createDefaultAggregationContext(IndexReader reader, Query query) throws IOException { + return createAggregationContext( + reader, + createIndexSettings(), + query, + new NoneCircuitBreakerService(), + AggregationBuilder.DEFAULT_PREALLOCATION * 5, + DEFAULT_MAX_BUCKETS, + false, + false + ); + } + } + + /** + * Test aggregation builder that simulates a timeout during collector setup + * to verify QueryPhase timeout handling behavior. + */ + private static final class ForceTimeoutAggregationBuilder extends AbstractAggregationBuilder { + + ForceTimeoutAggregationBuilder(String name) { + super(name); + } + + @Override + protected AggregatorFactory doBuild( + AggregationContext ctx, + AggregatorFactory parent, + AggregatorFactories.Builder subfactoriesBuilder + ) throws IOException { + return new AggregatorFactory(getName(), ctx, parent, AggregatorFactories.builder(), getMetadata()) { + @Override + protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map metadata) { + if (ctx.searcher() instanceof ContextIndexSearcher cis) { + cis.throwTimeExceededException(); + } + throw new AssertionError("unreachable"); + } + }; + } + + @Override + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata) { + ForceTimeoutAggregationBuilder copy = new ForceTimeoutAggregationBuilder(getName()); + copy.factoriesBuilder = factoriesBuilder; + copy.setMetadata(metadata); + return copy; + } + + @Override + public BucketCardinality bucketCardinality() { + return BucketCardinality.ONE; + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeString(getName()); + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) { + return builder; + } + + @Override + public String getType() { + return GlobalAggregationBuilder.NAME; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersion.zero(); + } + + @Override + public boolean supportsVersion(TransportVersion version) { + return super.supportsVersion(version); + } + } }