From 2925777ddddc5972f81c6482876b1f1d7a9507cc Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Fri, 12 Sep 2025 10:10:24 -0400 Subject: [PATCH 1/2] Don't set doc count error to 0 when batched reduction occurred --- .../aggregations/bucket/TermsDocCountErrorIT.java | 15 --------------- .../action/search/QueryPhaseResultConsumer.java | 10 ++++++---- .../aggregations/AggregationReduceContext.java | 13 ++++++++++++- .../bucket/terms/AbstractInternalTerms.java | 5 ++++- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java index a180674ba2378..a6c01852e2f16 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java @@ -13,15 +13,12 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode; import org.elasticsearch.test.ESIntegTestCase; -import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.util.ArrayList; @@ -53,18 +50,6 @@ public static String randomExecutionHint() { private static int numRoutingValues; - @Before - public void disableBatchedExecution() { - // TODO: it's practically impossible to get a 100% deterministic test with batched execution unfortunately, adjust this test to - // still do something useful with batched execution (i.e. use somewhat relaxed assertions) - updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false)); - } - - @After - public void resetSettings() { - updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey())); - } - @Override public void setupSuiteScopeCluster() throws Exception { assertAcked(indicesAdmin().prepareCreate("idx").setMapping(STRING_FIELD_NAME, "type=keyword").get()); diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index ec63d38616153..fb12fc0b560bc 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -220,6 +220,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { batchedResults = this.batchedResults; } final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size(); + final boolean hasBatchedResults = batchedResults.isEmpty() == false; final List topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null; final Deque> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null; // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level @@ -247,6 +248,10 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { if (aggsList != null) { // Add an estimate of the final reduce size breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize)); + AggregationReduceContext aggReduceContext = performFinalReduce + ? aggReduceContextBuilder.forFinalReduction() + : aggReduceContextBuilder.forPartialReduction(); + aggReduceContext.setFinalReduceHasBatchedResult(hasBatchedResults); aggs = aggregate(buffer.iterator(), new Iterator<>() { @Override public boolean hasNext() { @@ -257,10 +262,7 @@ public boolean hasNext() { public DelayableWriteable next() { return aggsList.pollFirst(); } - }, - resultSize, - performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction() - ); + }, resultSize, aggReduceContext); } else { aggs = null; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java index fbfffd21fef93..39ad713fea7eb 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java @@ -47,6 +47,7 @@ public interface Builder { @Nullable private final AggregationBuilder builder; private final AggregatorFactories.Builder subBuilders; + private boolean finalReduceHasBatchedResult; private AggregationReduceContext( BigArrays bigArrays, @@ -136,6 +137,14 @@ public final AggregationReduceContext forAgg(String name) { protected abstract AggregationReduceContext forSubAgg(AggregationBuilder sub); + public boolean doesFinalReduceHaveBatchedResult() { + return finalReduceHasBatchedResult; + } + + public void setFinalReduceHasBatchedResult(boolean finalReduceHasBatchedResult) { + this.finalReduceHasBatchedResult = finalReduceHasBatchedResult; + } + /** * A {@linkplain AggregationReduceContext} to perform a partial reduction. */ @@ -234,7 +243,9 @@ public PipelineTree pipelineTreeRoot() { @Override protected AggregationReduceContext forSubAgg(AggregationBuilder sub) { - return new ForFinal(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer, pipelineTreeRoot); + ForFinal subContext = new ForFinal(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer, pipelineTreeRoot); + subContext.setFinalReduceHasBatchedResult(doesFinalReduceHaveBatchedResult()); + return subContext; } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java index 1892cc8859639..09ecbb4098dc3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java @@ -332,7 +332,10 @@ public InternalAggregation get() { } long docCountError = -1; if (sumDocCountError != -1) { - docCountError = size == 1 ? 0 : sumDocCountError; + // If we are reducing only one aggregation (size == 1), the doc count error should be 0. + // However, the presence of a batched query result implies this is a final reduction and a partial reduction with size > 1 + // has already occurred on a data node. The doc count error should not be 0 in this case. + docCountError = size == 1 && reduceContext.doesFinalReduceHaveBatchedResult() == false ? 0 : sumDocCountError; } return create(name, result, reduceContext.isFinalReduce() ? getOrder() : thisReduceOrder, docCountError, otherDocCount); } From 6e7e209df2c05cbb359223af746fa7b2c805d9c8 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Wed, 24 Sep 2025 18:07:32 -0400 Subject: [PATCH 2/2] Rename flag --- .../action/search/QueryPhaseResultConsumer.java | 2 +- .../aggregations/AggregationReduceContext.java | 12 ++++++------ .../bucket/terms/AbstractInternalTerms.java | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index fb12fc0b560bc..9f5d82d465fb8 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -251,7 +251,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { AggregationReduceContext aggReduceContext = performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction(); - aggReduceContext.setFinalReduceHasBatchedResult(hasBatchedResults); + aggReduceContext.setHasBatchedResult(hasBatchedResults); aggs = aggregate(buffer.iterator(), new Iterator<>() { @Override public boolean hasNext() { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java index 39ad713fea7eb..50e7bf079e7be 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java @@ -47,7 +47,7 @@ public interface Builder { @Nullable private final AggregationBuilder builder; private final AggregatorFactories.Builder subBuilders; - private boolean finalReduceHasBatchedResult; + private boolean hasBatchedResult; private AggregationReduceContext( BigArrays bigArrays, @@ -137,12 +137,12 @@ public final AggregationReduceContext forAgg(String name) { protected abstract AggregationReduceContext forSubAgg(AggregationBuilder sub); - public boolean doesFinalReduceHaveBatchedResult() { - return finalReduceHasBatchedResult; + public boolean hasBatchedResult() { + return hasBatchedResult; } - public void setFinalReduceHasBatchedResult(boolean finalReduceHasBatchedResult) { - this.finalReduceHasBatchedResult = finalReduceHasBatchedResult; + public void setHasBatchedResult(boolean hasBatchedResult) { + this.hasBatchedResult = hasBatchedResult; } /** @@ -244,7 +244,7 @@ public PipelineTree pipelineTreeRoot() { @Override protected AggregationReduceContext forSubAgg(AggregationBuilder sub) { ForFinal subContext = new ForFinal(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer, pipelineTreeRoot); - subContext.setFinalReduceHasBatchedResult(doesFinalReduceHaveBatchedResult()); + subContext.setHasBatchedResult(hasBatchedResult()); return subContext; } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java index 09ecbb4098dc3..f3b02c5142e42 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java @@ -333,9 +333,9 @@ public InternalAggregation get() { long docCountError = -1; if (sumDocCountError != -1) { // If we are reducing only one aggregation (size == 1), the doc count error should be 0. - // However, the presence of a batched query result implies this is a final reduction and a partial reduction with size > 1 + // However, the presence of a batched query result implies a partial reduction with size > 1 // has already occurred on a data node. The doc count error should not be 0 in this case. - docCountError = size == 1 && reduceContext.doesFinalReduceHaveBatchedResult() == false ? 0 : sumDocCountError; + docCountError = size == 1 && reduceContext.hasBatchedResult() == false ? 0 : sumDocCountError; } return create(name, result, reduceContext.isFinalReduce() ? getOrder() : thisReduceOrder, docCountError, otherDocCount); }