Merged
Commits
27 commits
e2bd8a8
Add code and test class
drempapis Nov 11, 2025
81efb4c
Merge branch 'main' into handle_ContextIndexSearcher_TimeExceededExce…
drempapis Nov 12, 2025
1421c60
update
drempapis Nov 12, 2025
19e2121
update
drempapis Nov 13, 2025
6b15d74
update code
drempapis Nov 14, 2025
bfc3f02
update
drempapis Nov 14, 2025
8f6674c
Merge branch 'main' into handle_ContextIndexSearcher_TimeExceededExce…
drempapis Nov 14, 2025
cb0e1b2
Update docs/changelog/138084.yaml
drempapis Nov 14, 2025
3d81fce
update after review
drempapis Nov 14, 2025
09487e7
Merge branch 'handle_ContextIndexSearcher_TimeExceededException' of g…
drempapis Nov 14, 2025
0710406
Merge branch 'main' into handle_ContextIndexSearcher_TimeExceededExce…
drempapis Nov 19, 2025
a453b4e
Merge branch 'main' into handle_ContextIndexSearcher_TimeExceededExce…
drempapis Nov 19, 2025
83379a5
update after review - remove redundant code
drempapis Nov 20, 2025
a98b855
update after review
drempapis Nov 20, 2025
4bc1f41
[CI] Auto commit changes from spotless
Nov 20, 2025
cabadeb
update after review
drempapis Nov 20, 2025
ae30a7b
Merge branch 'handle_ContextIndexSearcher_TimeExceededException' of g…
drempapis Nov 20, 2025
7a6cbe8
Merge branch 'main' into handle_ContextIndexSearcher_TimeExceededExce…
drempapis Nov 20, 2025
6cf501c
[CI] Auto commit changes from spotless
Nov 20, 2025
ba10880
update after review
drempapis Nov 20, 2025
1ee843c
Merge branch 'handle_ContextIndexSearcher_TimeExceededException' of g…
drempapis Nov 20, 2025
d24498f
Merge branch 'main' into handle_ContextIndexSearcher_TimeExceededExce…
drempapis Nov 20, 2025
71ca632
[CI] Auto commit changes from spotless
Nov 20, 2025
c0aacb5
Merge branch 'main' into handle_ContextIndexSearcher_TimeExceededExce…
drempapis Nov 20, 2025
01a2d62
Merge branch 'main' into handle_ContextIndexSearcher_TimeExceededExce…
drempapis Nov 21, 2025
f4367c3
Merge branch 'main' into handle_ContextIndexSearcher_TimeExceededExce…
drempapis Nov 24, 2025
7ba2335
Merge branch 'main' into handle_ContextIndexSearcher_TimeExceededExce…
drempapis Nov 24, 2025
5 changes: 5 additions & 0 deletions docs/changelog/138084.yaml
@@ -0,0 +1,5 @@
pr: 138084
summary: Handle Query Timeouts During Collector Initialization in `QueryPhase`
area: Search
type: bug
issues: []
247 changes: 247 additions & 0 deletions QueryPhaseForcedTimeoutIT.java
@@ -0,0 +1,247 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.search.aggregations;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.query.SearchTimeoutException;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

/**
* Integration test verifying that a TimeExceededException thrown during collector
* preparation (before the actual search executes) is caught in QueryPhase and
* correctly transformed into a partial response with `timed_out=true`, empty hits,
* and empty aggregations rather than an exception.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class QueryPhaseForcedTimeoutIT extends ESIntegTestCase {

private static final String INDEX = "index";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(ForceTimeoutAggPlugin.class);
}

@Before
public void setupIndex() throws Exception {
assertAcked(prepareCreate(INDEX).setMapping("""
{
"properties": {
"kwd": { "type": "keyword" },
"txt": { "type": "text" }
}
}
"""));

for (int i = 0; i < 10; i++) {
IndexRequest ir = new IndexRequest(INDEX).source(
jsonBuilder().startObject().field("kwd", "value" + i).field("txt", "text " + i).endObject()
);
client().index(ir).actionGet();
}
indicesAdmin().prepareRefresh(INDEX).get();
ensureGreen(INDEX);
}

@After
public void cleanup() {
indicesAdmin().prepareDelete(INDEX).get();
}

/**
* Executes a search using the ForceTimeoutAggPlugin aggregation which throws
* TimeExceededException during collector preparation, and asserts that:
* - the response is returned without failure,
* - the `timed_out` flag is true,
* - hits are empty, and
* - aggregations are non-null but empty.
*/
public void testTimeoutDuringCollectorPreparationReturnsTimedOutEmptyResult() {
SearchResponse resp = null;
try {
resp = client().prepareSearch(INDEX)
.setQuery(QueryBuilders.matchAllQuery())
.setSize(10)
.setAllowPartialSearchResults(true)
Contributor
Can we randomize this and look for SearchTimeoutException in the case that allowPartialSearchResults=false?

Contributor Author
Good point! I added a second test because the code behaves differently depending on the value passed to setAllowPartialSearchResults (a condensed sketch of both outcomes appears after this file):

  • true: shard timeouts are treated as partial failures, so the search returns a normal SearchResponse with timed_out = true.
  • false: shard timeouts are not allowed, so the coordinating node fails the entire search with a SearchPhaseExecutionException.

Since these are two distinct code paths with different expected outcomes, each needs its own test.

.addAggregation(new ForceTimeoutAggregationBuilder("force_timeout"))
.get();

assertThat(resp, notNullValue());
assertThat("search should be marked timed_out", resp.isTimedOut(), is(true));
assertThat("no hits returned", resp.getHits().getHits().length, equalTo(0));
assertThat(resp.getAggregations(), notNullValue());
assertThat("no aggr returned", resp.getAggregations().asList().isEmpty(), is(true));
assertThat("no shard failures expected", resp.getShardFailures() == null || resp.getShardFailures().length == 0, is(true));
} finally {
if (resp != null) {
resp.decRef();
}
}
}

/**
* In this test we explicitly set allow_partial_search_results=false. Under this
* setting, any shard-level failure in the query phase (including a timeout) is treated as
* a hard failure for the whole search. The coordinating node does not return a response
with timed_out=true; instead, it fails the phase and throws a
* {@link SearchPhaseExecutionException} whose cause is the underlying
* {@link SearchTimeoutException}. This test asserts that behavior.
*/
public void testTimeoutDuringCollectorPreparationDisallowPartialsThrowsException() {
SearchPhaseExecutionException ex = expectThrows(
SearchPhaseExecutionException.class,
() -> client().prepareSearch(INDEX)
.setQuery(QueryBuilders.matchAllQuery())
.setSize(10)
.setAllowPartialSearchResults(false)
.addAggregation(new ForceTimeoutAggregationBuilder("force_timeout"))
.get()
);

assertNotNull("expected a cause on SearchPhaseExecutionException", ex.getCause());
assertThat("expected inner cause to be SearchTimeoutException", ex.getCause(), instanceOf(SearchTimeoutException.class));
}

/**
* A minimal plugin registering a custom aggregation (ForceTimeoutAggregationBuilder)
* whose factory simulates a timeout during collector setup to test QueryPhase handling.
*/
public static class ForceTimeoutAggPlugin extends Plugin implements SearchPlugin {
public static final String NAME = "force_timeout_plugin";

@Override
public List<AggregationSpec> getAggregations() {
return List.of(new AggregationSpec(NAME, ForceTimeoutAggregationBuilder::new, ForceTimeoutAggregationBuilder::parse));
}
}

/**
* Aggregation builder for the ForceTimeoutAggPlugin aggregation.
* It has no parameters and its factory immediately triggers a timeout exception
* when the search collectors are being prepared.
*/
static class ForceTimeoutAggregationBuilder extends AbstractAggregationBuilder<ForceTimeoutAggregationBuilder> {

public static final String TYPE = ForceTimeoutAggPlugin.NAME;

private Map<String, Object> metadata;

ForceTimeoutAggregationBuilder(String name) {
super(name);
}

ForceTimeoutAggregationBuilder(StreamInput in) throws IOException {
super(in);
}

static ForceTimeoutAggregationBuilder parse(XContentParser parser, String name) {
return new ForceTimeoutAggregationBuilder(name);
}

@Override
protected AggregatorFactory doBuild(
AggregationContext context,
AggregatorFactory parent,
AggregatorFactories.Builder subfactoriesBuilder
) throws IOException {
return new ForceTimeoutAggregatorFactory(getName(), context, parent, factoriesBuilder, getMetadata());
}

@Override
protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metadata) {
ForceTimeoutAggregationBuilder copy = new ForceTimeoutAggregationBuilder(getName());
copy.factoriesBuilder = factoriesBuilder;
copy.setMetadata(metadata);
return copy;
}

@Override
public Map<String, Object> getMetadata() {
return metadata;
}

@Override
public BucketCardinality bucketCardinality() {
return BucketCardinality.ONE;
}

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersion.zero();
}

@Override
protected void doWriteTo(StreamOutput out) {
// Empty
}

@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) {
return builder;
}

@Override
public String getType() {
return TYPE;
}

/**
* Factory implementation for ForceTimeoutAggregationBuilder.
* Its createInternal() method throws a TimeExceededException
* before any actual collection occurs, simulating a timeout during setup.
*/
static class ForceTimeoutAggregatorFactory extends AggregatorFactory {

ForceTimeoutAggregatorFactory(
String name,
AggregationContext context,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata
) throws IOException {
super(name, context, parent, subFactoriesBuilder, metadata);
}

@Override
protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata) {
if (context.searcher() instanceof ContextIndexSearcher cis) {
cis.throwTimeExceededException();
}
throw new AssertionError("unreachable");
}
}
}
}
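
For readers following the review thread above, here is a condensed sketch of the two outcomes the author describes, side by side. It is not part of the PR: it assumes the same ESIntegTestCase setup as QueryPhaseForcedTimeoutIT (client(), INDEX, ForceTimeoutAggregationBuilder, and the Hamcrest assertions all come from that class) and simply folds the two tests into one hypothetical helper.

    // Hypothetical helper, assuming it lives inside QueryPhaseForcedTimeoutIT.
    private void sketchBothTimeoutOutcomes(boolean allowPartial) {
        if (allowPartial) {
            // allow_partial_search_results=true: the shard timeout becomes a partial result,
            // so a normal SearchResponse comes back with timed_out=true and empty hits/aggs.
            SearchResponse resp = client().prepareSearch(INDEX)
                .setQuery(QueryBuilders.matchAllQuery())
                .setAllowPartialSearchResults(true)
                .addAggregation(new ForceTimeoutAggregationBuilder("force_timeout"))
                .get();
            try {
                assertThat(resp.isTimedOut(), is(true));
                assertThat(resp.getHits().getHits().length, equalTo(0));
            } finally {
                resp.decRef();
            }
        } else {
            // allow_partial_search_results=false: the coordinating node fails the whole search,
            // wrapping the shard-level SearchTimeoutException in a SearchPhaseExecutionException.
            SearchPhaseExecutionException ex = expectThrows(
                SearchPhaseExecutionException.class,
                () -> client().prepareSearch(INDEX)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setAllowPartialSearchResults(false)
                    .addAggregation(new ForceTimeoutAggregationBuilder("force_timeout"))
                    .get()
            );
            assertThat(ex.getCause(), instanceOf(SearchTimeoutException.class));
        }
    }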
85 changes: 55 additions & 30 deletions server/src/main/java/org/elasticsearch/search/query/QueryPhase.java
@@ -33,6 +33,7 @@
import org.elasticsearch.search.SearchContextSourcePrinter;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.AggregationPhase;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
@@ -198,47 +199,71 @@ static void addCollectorsAndSearch(SearchContext searchContext, Long timeRangeFi
);
}

CollectorManager<Collector, QueryPhaseResult> collectorManager = QueryPhaseCollectorManager.createQueryPhaseCollectorManager(
postFilterWeight,
searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(),
searchContext,
hasFilterCollector
);

final Runnable timeoutRunnable = getTimeoutCheck(searchContext);
if (timeoutRunnable != null) {
searcher.addQueryCancellation(timeoutRunnable);
}

QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager);
if (searchContext.getProfilers() != null) {
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult());
}
queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats());
if (searcher.timeExceeded()) {
assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set";
SearchTimeoutException.handleTimeout(
searchContext.request().allowPartialSearchResults(),
searchContext.shardTarget(),
searchContext.queryResult()
);
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
queryResult.terminatedEarly(queryPhaseResult.terminatedAfter());
}
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
: "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass();
if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
try {
CollectorManager<Collector, QueryPhaseResult> collectorManager = QueryPhaseCollectorManager
.createQueryPhaseCollectorManager(
postFilterWeight,
searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(),
searchContext,
hasFilterCollector
);

QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager);

if (searchContext.getProfilers() != null) {
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult());
}
queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats());

if (searcher.timeExceeded()) {
assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set";
SearchTimeoutException.handleTimeout(
searchContext.request().allowPartialSearchResults(),
searchContext.shardTarget(),
searchContext.queryResult()
);
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
queryResult.terminatedEarly(queryPhaseResult.terminatedAfter());
}
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
: "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass();
if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
}
} catch (ContextIndexSearcher.TimeExceededException tee) {
finalizeAsTimedOutResult(searchContext);
}
} catch (Exception e) {
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e);
}
}

/**
* Marks the current search as timed out and finalizes the {@link QuerySearchResult}
* with a well-formed empty response. This ensures that even when a timeout occurs
* (e.g., during collector setup or search execution), the shard still returns a
* valid result object with empty top docs and aggregations instead of throwing.
*/
private static void finalizeAsTimedOutResult(SearchContext searchContext) {
QuerySearchResult queryResult = searchContext.queryResult();
SearchTimeoutException.handleTimeout(searchContext.request().allowPartialSearchResults(), searchContext.shardTarget(), queryResult);

queryResult.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]);
Contributor
Is it necessary to set topDocs and aggs here?

Contributor Author
Yes, we need to set them. In the early-timeout path the QuerySearchResult may not have been initialized at all, and leaving topDocs/aggs unset leads to stale values during merge. Setting them explicitly ensures the shard returns a well-formed (but empty) result.

e.g.

Caused by: java.lang.IllegalStateException: topDocs already consumed
	at org.elasticsearch.search.query.QuerySearchResult.topDocs(QuerySearchResult.java:188)
	at org.elasticsearch.action.search.QueryPhaseResultConsumer.reduce(QueryPhaseResultConsumer.java:246)


if (searchContext.aggregations() != null) {
queryResult.aggregations(InternalAggregations.EMPTY);
}
}

/**
* Returns whether collection within the provided <code>reader</code> can be early-terminated if it sorts
* with <code>sortAndFormats</code>.