Skip to content

Commit

Permalink
Plumb aggregation builder into reduce context (#81740)
Browse files Browse the repository at this point in the history
This expands the reduce context so the builder is available. It doesn't
expand any aggregation to use it. The first aggregation to use it will
need to take care with `randomResultsToReduce`.
  • Loading branch information
nik9000 committed Feb 1, 2022
1 parent 620fe44 commit 6efc28b
Show file tree
Hide file tree
Showing 38 changed files with 673 additions and 285 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.query.QuerySearchResult;
Expand Down Expand Up @@ -65,10 +66,12 @@
@Fork(value = 1)
public class TermsReduceBenchmark {

private final TermsAggregationBuilder builder = new TermsAggregationBuilder("terms");

private final SearchPhaseController controller = new SearchPhaseController((task, req) -> new AggregationReduceContext.Builder() {
@Override
public AggregationReduceContext forPartialReduction() {
return new AggregationReduceContext.ForPartial(null, null, task);
return new AggregationReduceContext.ForPartial(null, null, task, builder);
}

@Override
Expand All @@ -77,7 +80,7 @@ public AggregationReduceContext forFinalReduction() {
Integer.MAX_VALUE,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
return new AggregationReduceContext.ForFinal(null, null, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY, task);
return new AggregationReduceContext.ForFinal(null, null, task, builder, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.ParsedAggregation;
Expand All @@ -36,6 +37,8 @@
import java.util.Map;
import java.util.function.Predicate;

import static org.mockito.Mockito.mock;

public class InternalMatrixStatsTests extends InternalAggregationTestCase<InternalMatrixStats> {

private String[] fields;
Expand Down Expand Up @@ -158,9 +161,10 @@ public void testReduceRandom() {
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
bigArrays,
mockScriptService,
() -> false,
mock(AggregationBuilder.class),
b -> {},
PipelineTree.EMPTY,
() -> false
PipelineTree.EMPTY
);
InternalMatrixStats reduced = (InternalMatrixStats) shardResults.get(0).reduce(shardResults, context);
multiPassStats.assertNearlyEqual(reduced.getResults());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
ensureSearchable();
final int numDocs = randomInt(5);
final int numDocs = between(1, 10);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < builders.length; i++) {
final boolean singleValue = randomBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,21 @@ public SearchHits getHits() {
return internalResponse.hits();
}

public Aggregations getAggregations() {
/**
* Aggregations in this response. "empty" aggregations could be
* either {@code null} or {@link InternalAggregations#EMPTY}.
*/
public @Nullable Aggregations getAggregations() {
return internalResponse.aggregations();
}

/**
* Will {@link #getAggregations()} return non-empty aggregation results?
*/
public boolean hasAggregations() {
return getAggregations() != null && getAggregations() != InternalAggregations.EMPTY;
}

public Suggest getSuggest() {
return internalResponse.suggest();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ SearchResponse getMergedResponse(Clusters clusters) {

profileResults.putAll(searchResponse.getProfileResults());

if (searchResponse.getAggregations() != null) {
if (searchResponse.hasAggregations()) {
InternalAggregations internalAggs = (InternalAggregations) searchResponse.getAggregations();
aggs.add(internalAggs);
}
Expand Down Expand Up @@ -195,7 +195,9 @@ SearchResponse getMergedResponse(Clusters clusters) {
SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats);
setSuggestShardIndex(shards, groupedSuggestions);
Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, aggReduceContextBuilder.forFinalReduction());
InternalAggregations reducedAggs = aggs.isEmpty()
? InternalAggregations.EMPTY
: InternalAggregations.topLevelReduce(aggs, aggReduceContextBuilder.forFinalReduction());
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
SearchProfileResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileResults(profileResults);
// make failures ordering consistent between ordinary search and CCS by looking at the shard they come from
Expand Down
17 changes: 4 additions & 13 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.SearchContextAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.AggregationContext.ProductionAggregationContext;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -1604,30 +1603,22 @@ public AggregationReduceContext.Builder aggReduceContextBuilder(Supplier<Boolean
return new AggregationReduceContext.Builder() {
@Override
public AggregationReduceContext forPartialReduction() {
return new AggregationReduceContext.ForPartial(bigArrays, scriptService, isCanceled);
return new AggregationReduceContext.ForPartial(bigArrays, scriptService, isCanceled, request.source().aggregations());
}

@Override
public AggregationReduceContext forFinalReduction() {
PipelineTree pipelineTree = requestToPipelineTree(request);
return new AggregationReduceContext.ForFinal(
bigArrays,
scriptService,
multiBucketConsumerService.create(),
pipelineTree,
isCanceled
isCanceled,
request.source().aggregations(),
multiBucketConsumerService.create()
);
}
};
}

private static PipelineTree requestToPipelineTree(SearchRequest request) {
if (request.source() == null || request.source().aggregations() == null) {
return PipelineTree.EMPTY;
}
return request.source().aggregations().buildPipelineTree();
}

/**
* This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
* This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@
package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.tasks.TaskCancelledException;

import java.util.function.IntConsumer;
import java.util.function.Supplier;

/**
* Dependencies used to reduce aggs.
*/
public abstract sealed class AggregationReduceContext permits AggregationReduceContext.ForPartial,AggregationReduceContext.ForFinal {
/**
* Builds {@link AggregationReduceContext}s.
Expand All @@ -35,11 +39,38 @@ public interface Builder {
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final Supplier<Boolean> isCanceled;
/**
* Builder for the agg being processed or {@code null} if this context
* was built for the top level or a pipeline aggregation.
*/
@Nullable
private final AggregationBuilder builder;
private final AggregatorFactories.Builder subBuilders;

private AggregationReduceContext(
BigArrays bigArrays,
ScriptService scriptService,
Supplier<Boolean> isCanceled,
AggregatorFactories.Builder subBuilders
) {
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.isCanceled = isCanceled;
this.builder = null;
this.subBuilders = subBuilders;
}

public AggregationReduceContext(BigArrays bigArrays, ScriptService scriptService, Supplier<Boolean> isCanceled) {
private AggregationReduceContext(
BigArrays bigArrays,
ScriptService scriptService,
Supplier<Boolean> isCanceled,
AggregationBuilder builder
) {
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.isCanceled = isCanceled;
this.builder = builder;
this.subBuilders = builder.factoriesBuilder;
}

/**
Expand All @@ -63,6 +94,14 @@ public final Supplier<Boolean> isCanceled() {
return isCanceled;
}

/**
* Builder for the agg being processed or {@code null} if this context
* was built for the top level or a pipeline aggregation.
*/
public AggregationBuilder builder() {
return builder;
}

/**
* The root of the tree of pipeline aggregations for this request.
*/
Expand All @@ -82,12 +121,35 @@ public final void consumeBucketsAndMaybeBreak(int size) {

protected abstract void consumeBucketCountAndMaybeBreak(int size);

/**
* Build a {@link AggregationReduceContext} for a sub-aggregation.
*/
public final AggregationReduceContext forAgg(String name) {
for (AggregationBuilder b : subBuilders.getAggregatorFactories()) {
if (b.getName().equals(name)) {
return forSubAgg(b);
}
}
throw new IllegalArgumentException("reducing an aggregation [" + name + "] that wasn't requested");
}

protected abstract AggregationReduceContext forSubAgg(AggregationBuilder sub);

/**
* A {@linkplain AggregationReduceContext} to perform a partial reduction.
*/
public static final class ForPartial extends AggregationReduceContext {
public ForPartial(BigArrays bigArrays, ScriptService scriptService, Supplier<Boolean> isCanceled) {
super(bigArrays, scriptService, isCanceled);
public ForPartial(
BigArrays bigArrays,
ScriptService scriptService,
Supplier<Boolean> isCanceled,
AggregatorFactories.Builder builders
) {
super(bigArrays, scriptService, isCanceled, builders);
}

public ForPartial(BigArrays bigArrays, ScriptService scriptService, Supplier<Boolean> isCanceled, AggregationBuilder builder) {
super(bigArrays, scriptService, isCanceled, builder);
}

@Override
Expand All @@ -102,6 +164,11 @@ protected void consumeBucketCountAndMaybeBreak(int size) {}
public PipelineTree pipelineTreeRoot() {
return null;
}

@Override
protected AggregationReduceContext forSubAgg(AggregationBuilder sub) {
return new ForPartial(bigArrays(), scriptService(), isCanceled(), sub);
}
}

/**
Expand All @@ -114,11 +181,24 @@ public static final class ForFinal extends AggregationReduceContext {
public ForFinal(
BigArrays bigArrays,
ScriptService scriptService,
Supplier<Boolean> isCanceled,
AggregatorFactories.Builder builders,
IntConsumer multiBucketConsumer
) {
super(bigArrays, scriptService, isCanceled, builders);
this.multiBucketConsumer = multiBucketConsumer;
this.pipelineTreeRoot = builders == null ? null : builders.buildPipelineTree();
}

public ForFinal(
BigArrays bigArrays,
ScriptService scriptService,
Supplier<Boolean> isCanceled,
AggregationBuilder builder,
IntConsumer multiBucketConsumer,
PipelineTree pipelineTreeRoot,
Supplier<Boolean> isCanceled
PipelineTree pipelineTreeRoot
) {
super(bigArrays, scriptService, isCanceled);
super(bigArrays, scriptService, isCanceled, builder);
this.multiBucketConsumer = multiBucketConsumer;
this.pipelineTreeRoot = pipelineTreeRoot;
}
Expand All @@ -137,5 +217,10 @@ protected void consumeBucketCountAndMaybeBreak(int size) {
public PipelineTree pipelineTreeRoot() {
return pipelineTreeRoot;
}

@Override
protected AggregationReduceContext forSubAgg(AggregationBuilder sub) {
return new ForFinal(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer, pipelineTreeRoot);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
aggregations.sort(INTERNAL_AGG_COMPARATOR);
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
if (first.mustReduceOnSingleInternalAgg() || aggregations.size() > 1) {
reducedAggregations.add(first.reduce(aggregations, context));
reducedAggregations.add(first.reduce(aggregations, context.forAgg(entry.getKey())));
} else {
// no need for reduce phase
reducedAggregations.add(first);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
Expand All @@ -38,6 +39,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Mockito.mock;

public class QueryPhaseResultConsumerTests extends ESTestCase {

private SearchPhaseController searchPhaseController;
Expand All @@ -49,16 +52,17 @@ public void setup() {
searchPhaseController = new SearchPhaseController((t, s) -> new AggregationReduceContext.Builder() {
@Override
public AggregationReduceContext forPartialReduction() {
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, t);
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, t, mock(AggregationBuilder.class));
}

public AggregationReduceContext forFinalReduction() {
return new AggregationReduceContext.ForFinal(
BigArrays.NON_RECYCLING_INSTANCE,
null,
t,
mock(AggregationBuilder.class),
b -> {},
PipelineAggregator.PipelineTree.EMPTY,
t
PipelineAggregator.PipelineTree.EMPTY
);
};
});
Expand Down

0 comments on commit 6efc28b

Please sign in to comment.