Add a new cluster setting to limit the total number of buckets returned by a request #27581

Merged
merged 13 commits into elastic:master from jimczi:enhancements/max_bucket on Dec 6, 2017

Conversation

Contributor

@jimczi commented Nov 29, 2017

This commit adds a new dynamic cluster setting named `search.max_buckets` that can be used to limit the number
of buckets created per shard or by the reduce phase. Each multi-bucket aggregator can consume buckets during the final build
of the aggregation at the shard level, or during the reduce phase (final or not) on the coordinating node. When an aggregator consumes a bucket,
a global count for the request is incremented; if this count exceeds the limit, an exception is thrown (a TooManyBuckets exception).
This change adds the ability for multi-bucket aggregators to "consume" buckets against the global limit; the default is 10,000.
The consumer is opt-in, so each multi-bucket aggregator must explicitly call it when a bucket is added to the response.

Closes #27452 #26012
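
For illustration, a minimal sketch of the opt-in consumer pattern described above; the class name, field names, and the use of `IllegalStateException` as a stand-in for the TooManyBuckets exception are assumptions for the example, not the actual implementation:

```java
import java.util.function.IntConsumer;

// Sketch of a per-request bucket consumer. Multi-bucket aggregators report
// each bucket they add; the shared count is checked against the
// `search.max_buckets` limit and the request fails once it is crossed.
class MaxBucketsConsumer implements IntConsumer {
    private final int limit; // value of `search.max_buckets`, 10,000 by default
    private int count;       // buckets consumed so far by this request

    MaxBucketsConsumer(int limit) {
        this.limit = limit;
    }

    @Override
    public void accept(int value) {
        count += value;
        if (count > limit) {
            // stand-in for the TooManyBuckets exception mentioned above
            throw new IllegalStateException(
                "Trying to create too many buckets, limit is [" + limit + "]");
        }
    }
}
```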

Contributor

@colings86 left a comment
Left some minor comments but LGTM (once the build passes ;) )

this(settings, (b) -> new ReduceContext(bigArrays, scriptService, b));
}

public SearchPhaseController(Settings settings, Function<Boolean, ReduceContext> reduceContextFunction) {
Contributor

Could you add some JavaDocs here explaining what the reduceContextFunction is?
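
For example, such a Javadoc might read as follows (one possible wording inferred from the PR description; whether the boolean flags the final reduce is an assumption):

```java
/**
 * @param reduceContextFunction a function that builds the {@code ReduceContext}
 *        used to reduce aggregations; the boolean argument presumably selects
 *        between a final reduce (true) and a partial reduce (false)
 */
```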

import java.util.function.BiFunction;
import java.util.function.IntConsumer;

public class MultiBucketConsumerService {
Contributor

Can we add JavaDocs to this class to explain what it is used for?
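
One possible wording, based on the PR description (a suggestion, not the merged Javadoc):

```java
/**
 * A service that creates the consumers multi-bucket aggregators use to count
 * the buckets they add to a response. The running per-request count is checked
 * against the dynamic cluster setting {@code search.max_buckets}, and the
 * request fails once the limit is exceeded.
 */
```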

this.maxBucket = maxBucket;
}

public static class TooManyBuckets extends ElasticsearchException {
Contributor

Can we name this TooManyBucketsException? Also should this extend AggregationExecutionException?


@Override
public void accept(int value) {
count += value;
Contributor

Maybe add a note explaining that it's OK that the count is not an AtomicInteger, since aggregations execute in a single thread?
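
For instance, such a note could read (a suggested comment, not the merged wording):

```java
@Override
public void accept(int value) {
    // `count` is deliberately a plain int rather than an AtomicInteger:
    // the aggregations of a request are built and reduced on a single
    // thread, so the counter is never updated concurrently.
    count += value;
}
```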

@jimczi removed the WIP label Nov 30, 2017

Contributor Author

@jimczi commented Nov 30, 2017

Thanks @colings86
I pushed more commits to address your review and to handle all multi-bucket aggregations. Can you take another look?

Contributor

@colings86 left a comment

@jimczi I left some more comments.

@@ -131,6 +131,9 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
// global stats
spare.updateScore(significanceHeuristic);
spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
}
Contributor

Should there be an else here to remove the buckets inside spare if it's not null?

Contributor Author

There is no bucket inside spare yet. We are just selecting the top terms here. When the selection is done we build the bucket aggregation for each top term only, so we count the inner buckets only for the final top terms.
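
To make that accounting concrete, here is a self-contained sketch of the pattern under discussion (a hypothetical bounded top-N queue standing in for Lucene's PriorityQueue; all names are illustrative):

```java
import java.util.PriorityQueue;

// `insertWithOverflow` returns null exactly when the queue grew by one
// entry, i.e. when one more top term will eventually become a bucket, so
// that is the only case where a bucket should be consumed.
class TopNSketch {
    private final PriorityQueue<Long> queue = new PriorityQueue<>(); // min-heap
    private final int capacity;
    private int consumedBuckets; // stand-in for consumeBucketsAndMaybeBreak(1)

    TopNSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Returns null if the entry took a new slot, otherwise the rejected entry. */
    Long insertWithOverflow(long score) {
        if (queue.size() < capacity) {
            queue.add(score);
            return null;           // the queue grew: one more future bucket
        } else if (score > queue.peek()) {
            Long evicted = queue.poll();
            queue.add(score);
            return evicted;        // size unchanged: no new bucket yet
        }
        return score;              // rejected: no new bucket either
    }

    void offer(long score) {
        if (insertWithOverflow(score) == null) {
            consumedBuckets++;     // only count when the selection grew
        }
    }
}
```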

@@ -101,6 +101,9 @@ public SignificantLongTerms buildAggregation(long owningBucketOrdinal) throws IO

spare.bucketOrd = i;
spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
}
Contributor

as above, should there be an else here?

Contributor Author

Same here

@@ -107,6 +107,9 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws

spare.bucketOrd = i;
spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
}
Contributor

as above, should there be an else here?

@@ -204,6 +204,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0);
}
Contributor

as above, should there be an else here?

spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
}
Contributor

as above, should there be an else here?

@@ -144,6 +144,9 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
spare.bucketOrd = i;
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
}
Contributor

as above, should there be an else here?

for (Aggregation agg : bucket.getAggregations().asList()) {
if (agg instanceof MultiBucketsAggregation) {
count += countInnerBucket((MultiBucketsAggregation) agg);
}
Contributor

I think we need to check single-bucket aggregations here too, especially as they might contain more multi-bucket aggregations below them.

Contributor Author

Right, I pushed 3fd2522
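
For reference, a hedged sketch of what the recursive count might look like after that commit (method placement and exact signatures are assumptions; `SingleBucketAggregation` is the interface implemented by single-bucket aggregations such as filter or nested):

```java
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;

class BucketCountSketch {
    // Counts every bucket below `agg`, descending through multi-bucket
    // aggregations and also through single-bucket aggregations, since the
    // latter may hold more multi-bucket aggregations below them.
    static int countInnerBucket(Aggregation agg) {
        int count = 0;
        if (agg instanceof MultiBucketsAggregation) {
            for (MultiBucketsAggregation.Bucket bucket
                    : ((MultiBucketsAggregation) agg).getBuckets()) {
                ++count;
                for (Aggregation sub : bucket.getAggregations().asList()) {
                    count += countInnerBucket(sub);
                }
            }
        } else if (agg instanceof SingleBucketAggregation) {
            for (Aggregation sub
                    : ((SingleBucketAggregation) agg).getAggregations().asList()) {
                count += countInnerBucket(sub);
            }
        }
        return count;
    }
}
```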

for (Aggregation bucketAgg : bucket.getAggregations().asList()) {
if (bucketAgg instanceof MultiBucketsAggregation) {
size += countInnerBucket((MultiBucketsAggregation) bucketAgg);
}
Contributor

Same as above.

Contributor

@colings86 left a comment

LGTM

…ed by a request

The `date_histogram` is the only aggregator that respects this limit in this change. Support for the other multi-bucket aggregators
will be added if the global method is approved.
@jimczi merged commit caea6b7 into elastic:master Dec 6, 2017
@jimczi deleted the enhancements/max_bucket branch December 6, 2017 08:15
jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Dec 6, 2017
* master:
  Add a new cluster setting to limit the total number of buckets returned by a request (elastic#27581)
  Allow index settings to be reset by wildcards (elastic#27671)
  Fix UpdateMappingIntegrationIT test failures
  Correct docs for binary fields and their default for doc values (elastic#27680)
  Clarify IntelliJ IDEA Jar Hell fix (elastic#27635)
  Add validation of keystore setting names (elastic#27626)
  Prevent constructing index template without patterns (elastic#27662)
  [DOCS] Fixed typos and broken attribute.
  Add support for filtering mappings fields (elastic#27603)
  [DOCS] Added link to upgrade guide and bumped the upgrade topic up to the top level (elastic#27621)
  [Geo] Add Well Known Text (WKT) Parsing Support to ShapeBuilders
  Fix up tests now that GeoDistance.*.calculate works (elastic#27541)
  [Docs] Fix parameter name (elastic#27656)
jimczi added a commit that referenced this pull request Dec 11, 2017
…ed by a request (#27581)

@IdanWo commented Mar 17, 2018

Does this mean that the following terms aggregation query will get an error when hitting the limit of 10,000?

GET products/_search
{
  "aggs": {
    "companies": {
      "terms": {
        "field": "Company",
        "size": 10,
        "shard_size": 2147483647  // Integer.MAX_VALUE
      }
    }
  }
}
