Skip to content

Commit

Permalink
Improve cardinality measure used to build aggs (#56533) (#59107)
Browse files Browse the repository at this point in the history
This makes a `parentCardinality` available to every `Aggregator`'s ctor
so it can make intelligent choices about how it collects bucket values.
This replaces `collectsFromSingleBucket` and is similar to it but:
1. It supports `NONE`, `ONE`, and `MANY` values and is generally
   extensible if we decide we can use more precise counts.
2. It is more accurate. `collectsFromSingleBucket` assumed that all
   sub-aggregations live under multi-bucket aggregations. This is
   normally true but `parentCardinality` is properly carried forward
   for single bucket aggregations like `filter` and for multi-bucket
   aggregations configured in single-bucket for like `range` with a
   single range.

While I was touching every aggregation I renamed `doCreateInternal` to
`createMapped` because that seemed like a much better name and it was
right there, next to the change I was already making.

Relates to #56487

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
nik9000 and elasticmachine committed Jul 8, 2020
1 parent 90c8d3f commit a29d351
Show file tree
Hide file tree
Showing 110 changed files with 792 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.support.ArrayValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
Expand Down Expand Up @@ -60,7 +61,7 @@ protected Aggregator createUnmapped(SearchContext searchContext,
protected Aggregator doCreateInternal(Map<String, ValuesSource> valuesSources,
SearchContext searchContext,
Aggregator parent,
boolean collectsFromSingleBucket,
CardinalityUpperBound cardinality,
Map<String, Object> metadata) throws IOException {
Map<String, ValuesSource.Numeric> typedValuesSources = new HashMap<>(valuesSources.size());
for (Map.Entry<String, ValuesSource> entry : valuesSources.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
Expand All @@ -45,7 +46,7 @@ public ArrayValuesSourceAggregatorFactory(String name, Map<String, ValuesSourceC
@Override
public Aggregator createInternal(SearchContext searchContext,
Aggregator parent,
boolean collectsFromSingleBucket,
CardinalityUpperBound cardinality,
Map<String, Object> metadata) throws IOException {
HashMap<String, ValuesSource> valuesSources = new HashMap<>();

Expand All @@ -58,17 +59,29 @@ public Aggregator createInternal(SearchContext searchContext,
if (valuesSources.isEmpty()) {
return createUnmapped(searchContext, parent, metadata);
}
return doCreateInternal(valuesSources, searchContext, parent, collectsFromSingleBucket, metadata);
return doCreateInternal(valuesSources, searchContext, parent, cardinality, metadata);
}

/**
* Create the {@linkplain Aggregator} when none of the configured
* fields can be resolved to a {@link ValuesSource}.
*/
protected abstract Aggregator createUnmapped(SearchContext searchContext,
Aggregator parent,
Map<String, Object> metadata) throws IOException;

/**
* Create the {@linkplain Aggregator} when any of the configured
* fields can be resolved to a {@link ValuesSource}.
*
* @param cardinality Upper bound of the number of {@code owningBucketOrd}s
* that the {@link Aggregator} created by this method
* will be asked to collect.
*/
protected abstract Aggregator doCreateInternal(Map<String, ValuesSource> valuesSources,
SearchContext searchContext,
Aggregator parent,
boolean collectsFromSingleBucket,
CardinalityUpperBound cardinality,
Map<String, Object> metadata) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.NonCollectingAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
Expand Down Expand Up @@ -69,7 +70,7 @@ public InternalAggregation buildEmptyAggregation() {

@Override
protected Aggregator doCreateInternal(SearchContext searchContext, Aggregator parent,
boolean collectsFromSingleBucket,
CardinalityUpperBound cardinality,
Map<String, Object> metadata) throws IOException {

ValuesSource rawValuesSource = config.getValuesSource();
Expand All @@ -80,7 +81,7 @@ protected Aggregator doCreateInternal(SearchContext searchContext, Aggregator pa
WithOrdinals valuesSource = (WithOrdinals) rawValuesSource;
long maxOrd = valuesSource.globalMaxOrd(searchContext.searcher());
return new ParentToChildrenAggregator(name, factories, searchContext, parent, childFilter,
parentFilter, valuesSource, maxOrd, collectsFromSingleBucket, metadata);
parentFilter, valuesSource, maxOrd, cardinality, metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.ParseField;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
Expand All @@ -40,8 +41,8 @@ public class ChildrenToParentAggregator extends ParentJoinAggregator {
public ChildrenToParentAggregator(String name, AggregatorFactories factories,
SearchContext context, Aggregator parent, Query childFilter,
Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource,
long maxOrd, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, context, parent, childFilter, parentFilter, valuesSource, maxOrd, collectsFromSingleBucket, metadata);
long maxOrd, CardinalityUpperBound cardinality, Map<String, Object> metadata) throws IOException {
super(name, factories, context, parent, childFilter, parentFilter, valuesSource, maxOrd, cardinality, metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.NonCollectingAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
Expand Down Expand Up @@ -58,8 +59,7 @@ public ParentAggregatorFactory(String name,
}

@Override
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent,
Map<String, Object> metadata) throws IOException {
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
return new NonCollectingAggregator(name, searchContext, parent, metadata) {
@Override
public InternalAggregation buildEmptyAggregation() {
Expand All @@ -70,7 +70,7 @@ public InternalAggregation buildEmptyAggregation() {

@Override
protected Aggregator doCreateInternal(SearchContext searchContext, Aggregator children,
boolean collectsFromSingleBucket,
CardinalityUpperBound cardinality,
Map<String, Object> metadata) throws IOException {

ValuesSource rawValuesSource = config.getValuesSource();
Expand All @@ -81,7 +81,7 @@ protected Aggregator doCreateInternal(SearchContext searchContext, Aggregator ch
WithOrdinals valuesSource = (WithOrdinals) rawValuesSource;
long maxOrd = valuesSource.globalMaxOrd(searchContext.searcher());
return new ChildrenToParentAggregator(name, factories, searchContext, children, childFilter,
parentFilter, valuesSource, maxOrd, collectsFromSingleBucket, metadata);
parentFilter, valuesSource, maxOrd, cardinality, metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
Expand Down Expand Up @@ -68,9 +69,13 @@ public ParentJoinAggregator(String name,
Query outFilter,
ValuesSource.Bytes.WithOrdinals valuesSource,
long maxOrd,
boolean collectsFromSingleBucket,
CardinalityUpperBound cardinality,
Map<String, Object> metadata) throws IOException {
super(name, factories, context, parent, metadata);
/*
* We have to use MANY to work around
* https://github.com/elastic/elasticsearch/issues/59097
*/
super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata);

if (maxOrd > Integer.MAX_VALUE) {
throw new IllegalStateException("the number of parent [" + maxOrd + "] + is greater than the allowed limit " +
Expand All @@ -82,9 +87,9 @@ public ParentJoinAggregator(String name,
this.outFilter = context.searcher().createWeight(context.searcher().rewrite(outFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
this.valuesSource = valuesSource;
boolean singleAggregator = parent == null;
collectionStrategy = singleAggregator && collectsFromSingleBucket
collectionStrategy = singleAggregator && cardinality == CardinalityUpperBound.ONE
? new DenseCollectionStrategy(maxOrd, context.bigArrays())
: new SparseCollectionStrategy(context.bigArrays(), collectsFromSingleBucket);
: new SparseCollectionStrategy(context.bigArrays(), cardinality);
}

@Override
Expand Down Expand Up @@ -215,8 +220,8 @@ public void close() {
protected class SparseCollectionStrategy implements CollectionStrategy {
private final LongKeyedBucketOrds ordsHash;

public SparseCollectionStrategy(BigArrays bigArrays, boolean collectsFromSingleBucket) {
ordsHash = LongKeyedBucketOrds.build(bigArrays, collectsFromSingleBucket);
public SparseCollectionStrategy(BigArrays bigArrays, CardinalityUpperBound cardinality) {
ordsHash = LongKeyedBucketOrds.build(bigArrays, cardinality);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.ParseField;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
Expand All @@ -36,8 +37,8 @@ public class ParentToChildrenAggregator extends ParentJoinAggregator {
public ParentToChildrenAggregator(String name, AggregatorFactories factories,
SearchContext context, Aggregator parent, Query childFilter,
Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource,
long maxOrd, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, context, parent, parentFilter, childFilter, valuesSource, maxOrd, collectsFromSingleBucket, metadata);
long maxOrd, CardinalityUpperBound cardinality, Map<String, Object> metadata) throws IOException {
super(name, factories, context, parent, parentFilter, childFilter, valuesSource, maxOrd, cardinality, metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,19 @@ public PipelineTree buildPipelineTree() {
}

/**
* Rough measure of how many buckets this aggregation can return. Just
* "zero", "one", and "many".
* A rough count of the number of buckets that {@link Aggregator}s built
* by this builder will contain per parent bucket used to validate sorts
* and pipeline aggregations. Just "zero", "one", and "many".
* <p>
* Unlike {@link CardinalityUpperBound} which is <strong>total</strong>
* instead of <strong>per parent bucket</strong>.
*/
public enum BucketCardinality {
NONE, ONE, MANY;
}
/**
* Do aggregations built by this builder contain buckets? If so, do they
* contain *always* contain a single bucket?
* A rough count of the number of buckets that {@link Aggregator}s built
* by this builder will contain per owning parent bucket.
*/
public abstract BucketCardinality bucketCardinality();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,18 @@ public abstract class AggregatorBase extends Aggregator {
* @param factories The factories for all the sub-aggregators under this aggregator
* @param context The aggregation context
* @param parent The parent aggregator (may be {@code null} for top level aggregators)
* @param subAggregatorCardinality Upper bound of the number of buckets that sub aggregations will collect
* @param metadata The metadata associated with this aggregator
*/
protected AggregatorBase(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
Map<String, Object> metadata) throws IOException {
CardinalityUpperBound subAggregatorCardinality, Map<String, Object> metadata) throws IOException {
this.name = name;
this.metadata = metadata;
this.parent = parent;
this.context = context;
this.breakerService = context.bigArrays().breakerService();
assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead";
this.subAggregators = factories.createSubAggregators(context, this);
this.subAggregators = factories.createSubAggregators(context, this, subAggregatorCardinality);
context.addReleasable(this, Lifetime.PHASE);
final SearchShardTarget shardTarget = context.shardTarget();
// Register a safeguard to highlight any invalid construction logic (call to this constructor without subsequent preCollection call)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,15 @@ private AggregatorFactories(AggregatorFactory[] factories) {
/**
* Create all aggregators so that they can be consumed with multiple
* buckets.
* @param cardinality Upper bound of the number of {@code owningBucketOrd}s
* that {@link Aggregator}s created by this method will
* be asked to collect.
*/
public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent) throws IOException {
public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent, CardinalityUpperBound cardinality)
throws IOException {
Aggregator[] aggregators = new Aggregator[countAggregators()];
for (int i = 0; i < factories.length; ++i) {
// TODO: sometimes even sub aggregations always get called with bucket 0, eg. if
// you have a terms agg under a top-level filter agg. We should have a way to
// propagate the fact that only bucket 0 will be collected with single-bucket
// aggs
final boolean collectsFromSingleBucket = false;
Aggregator factory = factories[i].create(searchContext, parent, collectsFromSingleBucket);
Aggregator factory = factories[i].create(searchContext, parent, cardinality);
Profilers profilers = factory.context().getProfilers();
if (profilers != null) {
factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
Expand All @@ -207,9 +206,11 @@ public Aggregator[] createTopLevelAggregators(SearchContext searchContext) throw
// These aggregators are going to be used with a single bucket ordinal, no need to wrap the PER_BUCKET ones
Aggregator[] aggregators = new Aggregator[factories.length];
for (int i = 0; i < factories.length; i++) {
// top-level aggs only get called with bucket 0
final boolean collectsFromSingleBucket = true;
Aggregator factory = factories[i].create(searchContext, null, collectsFromSingleBucket);
/*
* Top level aggs only collect from owningBucketOrd 0 which is
* *exactly* what CardinalityUpperBound.ONE *means*.
*/
Aggregator factory = factories[i].create(searchContext, null, CardinalityUpperBound.ONE);
Profilers profilers = factory.context().getProfilers();
if (profilers != null) {
factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,20 @@ public void doValidate() {

protected abstract Aggregator createInternal(SearchContext searchContext,
Aggregator parent,
boolean collectsFromSingleBucket,
CardinalityUpperBound cardinality,
Map<String, Object> metadata) throws IOException;

/**
* Creates the aggregator
* Creates the aggregator.
*
*
* @param searchContext
* The search context
* @param parent
* The parent aggregator (if this is a top level factory, the
* parent will be {@code null})
* @param collectsFromSingleBucket
* If true then the created aggregator will only be collected
* with {@code 0} as a bucket ordinal. Some factories can take
* advantage of this in order to return more optimized
* implementations.
*
* @return The created aggregator
* @param parent The parent aggregator (if this is a top level factory, the
* parent will be {@code null})
* @param cardinality Upper bound of the number of {@code owningBucketOrd}s
* that the {@link Aggregator} created by this method
* will be asked to collect.
*/
public final Aggregator create(SearchContext searchContext, Aggregator parent, boolean collectsFromSingleBucket) throws IOException {
return createInternal(searchContext, parent, collectsFromSingleBucket, this.metadata);
public final Aggregator create(SearchContext searchContext, Aggregator parent, CardinalityUpperBound cardinality) throws IOException {
return createInternal(searchContext, parent, cardinality, this.metadata);
}

public AggregatorFactory getParent() {
Expand Down

0 comments on commit a29d351

Please sign in to comment.