Use the breadth first collection mode for significant terms aggs. (#29042)

This helps avoid memory issues when computing deep sub-aggregations. Because it
should be rare to use sub-aggregations with significant terms, we opted to always
choose breadth first as opposed to exposing a `collect_mode` option.

Closes #28652.
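
For orientation, here is a minimal sketch (hypothetical index and field names, written in the same test-client style as the new integration test below, with the usual static imports from AggregationBuilders and QueryBuilders assumed) of the kind of request this change targets. Under depth_first collection the nested sub-aggregations would be built for every candidate term; with breadth_first they are computed only for the terms that make it into the result.

// Hypothetical request shape: significant_terms with deep sub-aggregations.
SearchResponse response = client().prepareSearch("logs")
    .setQuery(QueryBuilders.termQuery("level", "error"))
    .addAggregation(
        significantTerms("suspicious").field("message")
            .subAggregation(
                terms("by_host").field("host")
                    .subAggregation(topHits("examples").size(1))))
    .get();
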
matarrese authored and jtibshirani committed Apr 11, 2019
1 parent 0f49684 commit 79c7a57
Showing 6 changed files with 71 additions and 8 deletions.
@@ -542,6 +542,12 @@ It is possible (although rarely required) to filter the values for which buckets
`exclude` parameters which are based on a regular expression string or arrays of exact terms. This functionality mirrors the features
described in the <<search-aggregations-bucket-terms-aggregation,terms aggregation>> documentation.

==== Collect mode

To avoid memory issues, the `significant_terms` aggregation always computes child aggregations in `breadth_first` mode.
A description of the different collection modes can be found in the
<<search-aggregations-bucket-terms-aggregation-collect, terms aggregation>> documentation.
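
For contrast, a short Java API sketch (hypothetical field names, usual aggregation imports assumed): the terms aggregation exposes the collection mode explicitly through `collectMode`, while the significant_terms builder has no such option and now always defers sub-aggregation collection.

// terms exposes collect_mode; breadth_first defers sub-aggregation collection
// until the top buckets are known.
TermsAggregationBuilder tags = AggregationBuilders.terms("tags")
    .field("tag")
    .collectMode(Aggregator.SubAggCollectionMode.BREADTH_FIRST)
    .subAggregation(AggregationBuilders.avg("avg_score").field("score"));

// significant_terms offers no collect_mode setting; it now always behaves as if
// breadth_first were chosen.
SignificantTermsAggregationBuilder sigTags = AggregationBuilders.significantTerms("sig_tags")
    .field("tag")
    .subAggregation(AggregationBuilders.avg("avg_score").field("score"));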

==== Execution hint

There are different mechanisms by which terms aggregations can be executed:
@@ -775,6 +775,7 @@ fields, then use `copy_to` in your mapping to create a new dedicated field at
index time which contains the values from both fields. You can aggregate on
this single field, which will benefit from the global ordinals optimization.

[[search-aggregations-bucket-terms-aggregation-collect]]
==== Collect mode

Deferring calculation of child aggregations
@@ -65,7 +65,7 @@ public GlobalOrdinalsSignificantTermsAggregator(String name,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, context, parent,
- forceRemapGlobalOrds, SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
+ forceRemapGlobalOrds, SubAggCollectionMode.BREADTH_FIRST, false, pipelineAggregators, metaData);
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
this.numCollectedDocs = 0;
@@ -146,12 +146,19 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
}

final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
final long[] survivingBucketOrds = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
}

runDeferredCollections(survivingBucketOrds);

for (SignificantStringTerms.Bucket bucket : list) {
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
}

return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
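
The hunk above shows the two-pass pattern that each of the three aggregators in this commit now follows: while pruning to the top buckets, record only the surviving bucket ordinals; replay the deferred child collection for those ordinals; and only then attach the sub-aggregation results. A simplified, standalone sketch of that pattern (toy types, not the actual Elasticsearch classes) follows.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

final class DeferredCollectionSketch {

    static final class Bucket {
        final long bucketOrd;
        Object aggregations; // filled in only after the deferred replay

        Bucket(long bucketOrd) {
            this.bucketOrd = bucketOrd;
        }
    }

    // Stand-in for runDeferredCollections(long[]): replay the deferred child
    // collection, but only for the buckets that survived pruning.
    static void runDeferredCollections(long[] survivingBucketOrds) {
        System.out.println("replaying child collection for " + survivingBucketOrds.length + " buckets");
    }

    // Stand-in for bucketAggregations(long): build the child results for one bucket ordinal.
    static Object bucketAggregations(long bucketOrd) {
        return "sub-aggs for bucket " + bucketOrd;
    }

    static Bucket[] buildTopBuckets(Deque<Bucket> ordered) {
        Bucket[] list = new Bucket[ordered.size()];
        long[] survivingBucketOrds = new long[ordered.size()];
        for (int i = ordered.size() - 1; i >= 0; i--) {
            Bucket bucket = ordered.pop();
            survivingBucketOrds[i] = bucket.bucketOrd; // pass 1: remember the survivors only
            list[i] = bucket;
        }

        runDeferredCollections(survivingBucketOrds);   // replay children for the survivors

        for (Bucket bucket : list) {
            bucket.aggregations = bucketAggregations(bucket.bucketOrd); // pass 2: attach results
        }
        return list;
    }

    public static void main(String[] args) {
        Deque<Bucket> ordered = new ArrayDeque<>(List.of(new Bucket(3), new Bucket(7)));
        for (Bucket b : buildTopBuckets(ordered)) {
            System.out.println(b.bucketOrd + " -> " + b.aggregations);
        }
    }
}
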
@@ -50,7 +50,7 @@ public SignificantLongTermsAggregator(String name, AggregatorFactories factories
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

super(name, factories, valuesSource, format, null, bucketCountThresholds, context, parent,
- SubAggCollectionMode.DEPTH_FIRST, false, includeExclude, pipelineAggregators, metaData);
+ SubAggCollectionMode.BREADTH_FIRST, false, includeExclude, pipelineAggregators, metaData);
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
}
@@ -106,12 +106,20 @@ public SignificantLongTerms buildAggregation(long owningBucketOrdinal) throws IO
}
}

- final SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];
+ SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];
final long[] survivingBucketOrds = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantLongTerms.Bucket bucket = ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
}

runDeferredCollections(survivingBucketOrds);

for (SignificantLongTerms.Bucket bucket : list) {
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
}

return new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
pipelineAggregators(), metaData(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
}
@@ -57,7 +57,7 @@ public SignificantStringTermsAggregator(String name, AggregatorFactories factori
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, aggregationContext, parent,
- SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
+ SubAggCollectionMode.BREADTH_FIRST, false, pipelineAggregators, metaData);
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
}
@@ -113,12 +113,20 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
}

final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
final long[] survivingBucketOrds = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
}

runDeferredCollections(survivingBucketOrds);

for (SignificantStringTerms.Bucket bucket : list) {
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be
// recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
}

return new SignificantStringTerms( name, bucketCountThresholds.getRequiredSize(),
@@ -30,6 +30,7 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.plugins.Plugin;
@@ -38,6 +39,7 @@
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
@@ -543,6 +545,37 @@ public void testScoresEqualForPositiveAndNegative(SignificanceHeuristic heuristi
}
}

/**
* A simple test that adds a sub-aggregation to a significant terms aggregation,
* to help check that sub-aggregation collection is handled correctly.
*/
public void testSubAggregations() throws Exception {
indexEqualTestData();

QueryBuilder query = QueryBuilders.termsQuery(TEXT_FIELD, "a", "b");
AggregationBuilder subAgg = terms("class").field(CLASS_FIELD);
AggregationBuilder agg = significantTerms("significant_terms")
.field(TEXT_FIELD)
.executionHint(randomExecutionHint())
.significanceHeuristic(new ChiSquare(true, true))
.minDocCount(1).shardSize(1000).size(1000)
.subAggregation(subAgg);

SearchResponse response = client().prepareSearch("test")
.setQuery(query)
.addAggregation(agg)
.get();
assertSearchResponse(response);

SignificantTerms sigTerms = response.getAggregations().get("significant_terms");
assertThat(sigTerms.getBuckets().size(), equalTo(2));

for (SignificantTerms.Bucket bucket : sigTerms) {
StringTerms terms = bucket.getAggregations().get("class");
assertThat(terms.getBuckets().size(), equalTo(2));
}
}

private void indexEqualTestData() throws ExecutionException, InterruptedException {
assertAcked(prepareCreate("test")
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
