
Increase search.max_buckets to 65,535 (#57042)
Increases the default search.max_buckets limit to 65,535, and only counts
buckets during the reduce phase.

Closes #51731
imotov committed Jun 3, 2020
1 parent e0a15e8 commit 8d7f389
Showing 31 changed files with 92 additions and 179 deletions.
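The commit message above says buckets are now counted only during the reduce phase. The following is a self-contained toy sketch of what that means in practice — plain Java, not Elasticsearch code, with invented shard and bucket counts: the number of buckets each shard builds while collecting no longer counts toward search.max_buckets; only the size of the merged (reduced) result is checked against the limit.

import java.util.HashMap;
import java.util.Map;

// Illustration only: under the new accounting, only the reduced result is checked
// against search.max_buckets. Numbers below are invented for the example.
public class ReducePhaseCountingSketch {
    static final int MAX_BUCKETS = 65_535; // new default of search.max_buckets

    public static void main(String[] args) {
        // Pretend three shards each return a 40,000-bucket partial result keyed by the same keys.
        long[][] shardBucketKeys = simulateShards(3, 40_000);

        // Merge the shard results, as the coordinating node does during reduce.
        Map<Long, Long> reduced = new HashMap<>();
        for (long[] shard : shardBucketKeys) {
            for (long key : shard) {
                reduced.merge(key, 1L, Long::sum); // overlapping keys collapse into one bucket
            }
        }

        // Only this final count matters for the limit now.
        if (reduced.size() > MAX_BUCKETS) {
            throw new IllegalStateException("too many buckets: " + reduced.size());
        }
        System.out.println("reduced buckets: " + reduced.size() + " (limit " + MAX_BUCKETS + ")");
    }

    private static long[][] simulateShards(int shards, int bucketsPerShard) {
        long[][] keys = new long[shards][bucketsPerShard];
        for (int s = 0; s < shards; s++) {
            for (int b = 0; b < bucketsPerShard; b++) {
                keys[s][b] = b; // every shard sees the same keys, so the buckets overlap completely
            }
        }
        return keys;
    }
}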
2 changes: 1 addition & 1 deletion docs/reference/aggregations/bucket.asciidoc
@@ -15,7 +15,7 @@ define fixed number of multiple buckets, and others dynamically create the bucke

NOTE: The maximum number of buckets allowed in a single response is limited by a
dynamic cluster setting named
<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 10,000,
<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 65,535,
requests that try to return more than the limit will fail with an exception.

include::bucket/adjacency-matrix-aggregation.asciidoc[]
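Since search.max_buckets is a dynamic cluster setting, it can be changed on a running cluster. Below is a minimal sketch using the Java high-level REST client; the client instance and the chosen value of 100,000 are illustrative assumptions, not part of this commit.

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public class MaxBucketsSettingExample {
    // Sketch: persistently raise search.max_buckets above the 65,535 default.
    static void raiseMaxBuckets(RestHighLevelClient client) throws Exception {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        request.persistentSettings(Settings.builder()
            .put("search.max_buckets", 100_000)
            .build());
        client.cluster().putSettings(request, RequestOptions.DEFAULT);
    }
}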
@@ -35,10 +35,10 @@
* An aggregation service that creates instances of {@link MultiBucketConsumer}.
* The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
* in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
* The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
* The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 65535.
*/
public class MultiBucketConsumerService {
public static final int DEFAULT_MAX_BUCKETS = 10000;
public static final int DEFAULT_MAX_BUCKETS = 65535;
public static final Setting<Integer> MAX_BUCKET_SETTING =
Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);

@@ -102,6 +102,7 @@ public static class MultiBucketConsumer implements IntConsumer {

// aggregations execute in a single thread so no atomic here
private int count;
private int callCount = 0;

public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
this.limit = limit;
@@ -110,15 +111,17 @@ public MultiBucketConsumer(int limit, CircuitBreaker breaker) {

@Override
public void accept(int value) {
count += value;
if (count > limit) {
throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
+ "] but was [" + count + "]. This limit can be set by changing the [" +
MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
if (value != 0) {
count += value;
if (count > limit) {
throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
+ "] but was [" + count + "]. This limit can be set by changing the [" +
MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
}
}

// check parent circuit breaker every 1024 buckets
if (value > 0 && (count & 0x3FF) == 0) {
// check parent circuit breaker every 1024 calls
callCount++;
if ((callCount & 0x3FF) == 0) {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}
}
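The reworked accept above keeps two concerns separate: only non-zero values are added to the running bucket count (and can trigger TooManyBucketsException), while the parent circuit breaker is checked once every 1024 calls regardless of the value. Here is a stripped-down, self-contained sketch of that logic, with the breaker reduced to a Runnable and the exception to IllegalStateException — both simplifications for illustration, not the real types.

import java.util.function.IntConsumer;

// Simplified model of the new MultiBucketConsumer accounting. Not the real class:
// the breaker stands in for CircuitBreaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets").
class SimpleBucketConsumer implements IntConsumer {
    private final int limit;
    private final Runnable breakerCheck;
    private int count;
    private int callCount;

    SimpleBucketConsumer(int limit, Runnable breakerCheck) {
        this.limit = limit;
        this.breakerCheck = breakerCheck;
    }

    @Override
    public void accept(int value) {
        // Only non-zero values count toward the bucket limit.
        if (value != 0) {
            count += value;
            if (count > limit) {
                throw new IllegalStateException("too many buckets: " + count + " > " + limit);
            }
        }
        // Give the parent circuit breaker a chance to trip every 1024 calls,
        // even when value == 0.
        callCount++;
        if ((callCount & 0x3FF) == 0) {
            breakerCheck.run();
        }
    }
}

On the collection path (see collectExistingBucket in the BucketsAggregator hunks below), callers pass 0 purely to drive that periodic breaker check; the real bucket totals are only fed in during the reduce phase.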
@@ -57,12 +57,12 @@ public BucketsAggregator(String name, AggregatorFactories factories, SearchConte
Map<String, Object> metadata) throws IOException {
super(name, factories, context, parent, metadata);
bigArrays = context.bigArrays();
docCounts = bigArrays.newIntArray(1, true);
if (context.aggregations() != null) {
multiBucketConsumer = context.aggregations().multiBucketConsumer();
} else {
multiBucketConsumer = (count) -> {};
}
docCounts = bigArrays.newIntArray(1, true);
}

/**
@@ -91,7 +91,12 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long
* Same as {@link #collectBucket(LeafBucketCollector, int, long)}, but doesn't check if the docCounts needs to be re-sized.
*/
public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
docCounts.increment(bucketOrd, 1);
if (docCounts.increment(bucketOrd, 1) == 1) {
// We calculate the final number of buckets only during the reduce phase. But we still need to
// trigger bucket consumer from time to time in order to give it a chance to check available memory and break
// the execution if we are running out. To achieve that we are passing 0 as a bucket count.
multiBucketConsumer.accept(0);
}
subCollector.collect(doc, bucketOrd);
}

@@ -137,14 +142,6 @@ public final int bucketDocCount(long bucketOrd) {
}
}

/**
* Adds {@code count} buckets to the global count for the request and fails if this number is greater than
* the maximum number of buckets allowed in a response
*/
protected final void consumeBucketsAndMaybeBreak(int count) {
multiBucketConsumer.accept(count);
}

/**
* Hook to allow taking an action before building buckets.
*/
@@ -186,7 +183,7 @@ public InternalAggregation get(int index) {
public int size() {
return aggregations.length;
}
});
});
}
return result;
}
@@ -267,7 +264,6 @@ protected final <B> void buildSubAggsForBuckets(List<B> buckets,
protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(long[] owningBucketOrds, int bucketsPerOwningBucketOrd,
BucketBuilderForFixedCount<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
consumeBucketsAndMaybeBreak(totalBuckets);
long[] bucketOrdsToCollect = new long[totalBuckets];
int bucketOrdIdx = 0;
for (long owningBucketOrd : owningBucketOrds) {
@@ -299,7 +295,7 @@ protected interface BucketBuilderForFixedCount<B> {
* @param owningBucketOrds owning bucket ordinals for which to build the results
* @param resultBuilder how to build a result from the sub aggregation results
*/
protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
SingleBucketResultBuilder resultBuilder) throws IOException {
/*
* It'd be entirely reasonable to call
@@ -328,7 +324,6 @@ protected interface SingleBucketResultBuilder {
protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds,
BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
consumeBucketsAndMaybeBreak((int) bucketOrds.size());
long[] bucketOrdsToCollect = new long[(int) bucketOrds.size()];
for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
bucketOrdsToCollect[bucketOrd] = bucketOrd;
@@ -360,7 +355,6 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(lo
throw new AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE
+ "] buckets but attempted [" + totalOrdsToCollect + "]");
}
consumeBucketsAndMaybeBreak((int) totalOrdsToCollect);
long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
int b = 0;
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
@@ -184,7 +184,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
totalBucketsToBuild++;
}
}
consumeBucketsAndMaybeBreak(totalBucketsToBuild);
long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
int builtBucketIndex = 0;
for (int ord = 0; ord < maxOrd; ord++) {
@@ -196,12 +196,10 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext);
if(reducedBucket.docCount >= 1){
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reducedBucket);
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reducedBucket));
}
}
reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
Collections.sort(reducedBuckets, Comparator.comparing(InternalBucket::getKey));

InternalAdjacencyMatrix reduced = new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata());
@@ -136,7 +136,6 @@ protected void doPostCollection() throws IOException {
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
// Composite aggregator must be at the top of the aggregation tree
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
consumeBucketsAndMaybeBreak(queue.size());
if (deferredCollectors != NO_OP_COLLECTOR) {
// Replay all documents that contain at least one top bucket (collected during the first pass).
runDeferredCollections();
@@ -193,7 +193,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
buckets.clear();
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(reduceBucket);
if (result.size() >= size) {
break;
@@ -207,7 +206,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
}
if (buckets.size() > 0) {
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(reduceBucket);
}

@@ -220,6 +218,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
reducedFormats = lastBucket.formats;
lastKey = lastBucket.getRawKey();
}
reduceContext.consumeBucketsAndMaybeBreak(result.size());
return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls,
earlyTerminated, metadata);
}
@@ -109,24 +109,23 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
consumeBucketsAndMaybeBreak(size);


BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
InternalGeoGridBucket spare = null;
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
if (spare == null) {
spare = newEmptyBucket();
}

// need a special function to keep the source bucket
// up-to-date so it can get the appropriate key
spare.hashAsLong = ordsEnum.value();
spare.docCount = bucketDocCount(ordsEnum.ord());
spare.bucketOrd = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}

topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd[ordIdx][i] = ordered.pop();
@@ -100,18 +100,14 @@ public InternalGeoGrid reduce(List<InternalAggregation> aggregations, ReduceCont
BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) {
List<InternalGeoGridBucket> sameCellBuckets = cursor.value;
InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
if (removed != null) {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
} else {
reduceContext.consumeBucketsAndMaybeBreak(1);
}
ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
}
buckets.close();
InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ordered.pop();
}
reduceContext.consumeBucketsAndMaybeBreak(list.length);
return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
}

@@ -328,7 +328,6 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
if (reduceRounding.round(top.current.key) != key) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reduced);
currentBuckets.clear();
key = reduceRounding.round(top.current.key);
@@ -348,7 +347,6 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {

if (currentBuckets.isEmpty() == false) {
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reduced);
}
}
@@ -376,22 +374,17 @@ private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRo
long roundedBucketKey = reduceRounding.round(bucket.key);
if (Double.isNaN(key)) {
key = roundedBucketKey;
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
} else if (roundedBucketKey == key) {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
} else {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
sameKeyedBuckets.clear();
key = roundedBucketKey;
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
}
}
if (sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
}
reducedBuckets = mergedBuckets;
@@ -449,7 +442,6 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red
if (lastBucket != null) {
long key = rounding.nextRoundingValue(lastBucket.key);
while (key < nextBucket.key) {
reduceContext.consumeBucketsAndMaybeBreak(1);
iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs));
key = rounding.nextRoundingValue(key);
}
@@ -515,7 +507,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
// Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
}

reduceContext.consumeBucketsAndMaybeBreak(reducedBucketsResult.buckets.size());
BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
this.bucketInfo.emptySubAggregations);

@@ -551,16 +543,13 @@ private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets,
for (int i = 0; i < reducedBuckets.size(); i++) {
Bucket bucket = reducedBuckets.get(i);
if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
sameKeyedBuckets.clear();
key = roundingInfo.rounding.round(bucket.key);
}
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
}
if (sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
}
return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
