Adjust Histogram's bucket accounting to be iterative #102172

Merged 2 commits on Nov 15, 2023
docs/changelog/102172.yaml (5 changes: 5 additions & 0 deletions)
@@ -0,0 +1,5 @@
+ pr: 102172
+ summary: Adjust Histogram's bucket accounting to be iterative
+ area: Aggregations
+ type: bug
+ issues: []
InternalMultiBucketAggregation.java
@@ -26,6 +26,18 @@ public abstract class InternalMultiBucketAggregation<
A extends InternalMultiBucketAggregation,
B extends InternalMultiBucketAggregation.InternalBucket> extends InternalAggregation implements MultiBucketsAggregation {

+ /**
+  * When we pre-count the empty buckets we report them periodically
+  * because you can configure the date_histogram to create an astounding
+  * number of buckets. It'd take a while to count that high only to abort.
+  * So we report every couple thousand buckets. It'd be simpler to report
+  * every single bucket we plan to allocate one at a time but that'd cause
+  * needless overhead on the circuit breakers. Counting a couple thousand
+  * buckets is plenty fast to fail this quickly in pathological cases and
+  * plenty large to keep the overhead minimal.
+  */
+ protected static final int REPORT_EMPTY_EVERY = 10_000;
+
public InternalMultiBucketAggregation(String name, Map<String, Object> metadata) {
super(name, metadata);
}
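For context, the batching pattern this comment describes looks roughly like the sketch below. This is a hedged illustration, not Elasticsearch code: `breaker` stands in for `reduceContext.consumeBucketsAndMaybeBreak`, which in Elasticsearch may throw a `CircuitBreakingException` to abort the reduction early.

```java
import java.util.function.IntConsumer;

class BucketAccountingSketch {
    static final int REPORT_EMPTY_EVERY = 10_000;

    // Count `total` planned buckets, reporting to the breaker in chunks of
    // 10_000: large enough to keep per-bucket overhead negligible, small
    // enough that a pathological request fails after counting at most a few
    // thousand buckets past the limit rather than after counting all of them.
    static void countBuckets(long total, IntConsumer breaker) {
        int pending = 0;
        for (long i = 0; i < total; i++) {
            if (++pending >= REPORT_EMPTY_EVERY) {
                breaker.accept(pending); // may throw and abort early
                pending = 0;
            }
        }
        breaker.accept(pending); // flush the remainder
    }
}
```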
InternalDateHistogram.java
@@ -373,18 +373,6 @@ protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext con
return createBucket(buckets.get(0).key, docCount, aggs);
}

- /**
-  * When we pre-count the empty buckets we report them periodically
-  * because you can configure the date_histogram to create an astounding
-  * number of buckets. It'd take a while to count that high only to abort.
-  * So we report every couple thousand buckets. It'd be simpler to report
-  * every single bucket we plan to allocate one at a time but that'd cause
-  * needless overhead on the circuit breakers. Counting a couple thousand
-  * buckets is plenty fast to fail this quickly in pathological cases and
-  * plenty large to keep the overhead minimal.
-  */
- private static final int REPORT_EMPTY_EVERY = 10_000;
-
private void addEmptyBuckets(List<Bucket> list, AggregationReduceContext reduceContext) {
/*
* Make sure we have space for the empty buckets we're going to add by
InternalHistogram.java
@@ -291,10 +291,11 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck
for (InternalAggregation aggregation : aggregations) {
InternalHistogram histogram = (InternalHistogram) aggregation;
if (histogram.buckets.isEmpty() == false) {
- pq.add(new IteratorAndCurrent<Bucket>(histogram.buckets.iterator()));
+ pq.add(new IteratorAndCurrent<>(histogram.buckets.iterator()));
}
}

+ int consumeBucketCount = 0;
List<Bucket> reducedBuckets = new ArrayList<>();
if (pq.size() > 0) {
// list of buckets coming from different shards that have the same key
@@ -310,6 +311,10 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
reducedBuckets.add(reduced);
+ if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
+     reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
+     consumeBucketCount = 0;
+ }
}
currentBuckets.clear();
key = top.current().key;
@@ -330,10 +335,15 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
reducedBuckets.add(reduced);
+ if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
+     reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
+     consumeBucketCount = 0;
+ }
}
}
}

+ reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
return reducedBuckets;
}

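Note the mechanics of the idiom above: because of the post-increment, each report fires once the counter exceeds REPORT_EMPTY_EVERY, and the trailing call flushes whatever remains, so every reduced bucket is reported to the breaker exactly once. A small standalone trace (hypothetical numbers, nothing assumed beyond the constant's value):

```java
public class ConsumeBucketTrace {
    public static void main(String[] args) {
        final int REPORT_EMPTY_EVERY = 10_000;
        int consumeBucketCount = 0;
        long reported = 0;
        for (int bucket = 0; bucket < 25_000; bucket++) { // pretend we reduced 25k buckets
            if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
                reported += consumeBucketCount;           // two batches of 10_001
                consumeBucketCount = 0;
            }
        }
        reported += consumeBucketCount;                   // final flush of 4_998
        System.out.println(reported);                     // prints 25000
    }
}
```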
@@ -358,26 +368,14 @@ private double round(double key) {
return Math.floor((key - emptyBucketInfo.offset) / emptyBucketInfo.interval) * emptyBucketInfo.interval + emptyBucketInfo.offset;
}

- /**
-  * When we pre-count the empty buckets we report them periodically
-  * because you can configure the histogram to create more buckets than
-  * there are atoms in the universe. It'd take a while to count that high
-  * only to abort. So we report every couple thousand buckets. It'd be
-  * simpler to report every single bucket we plan to allocate one at a time
-  * but that'd cause needless overhead on the circuit breakers. Counting a
-  * couple thousand buckets is plenty fast to fail this quickly in
-  * pathological cases and plenty large to keep the overhead minimal.
-  */
- private static final int REPORT_EMPTY_EVERY = 10_000;
-
private void addEmptyBuckets(List<Bucket> list, AggregationReduceContext reduceContext) {
/*
* Make sure we have space for the empty buckets we're going to add by
* counting all of the empties we plan to add and firing them into
* consumeBucketsAndMaybeBreak.
*/
class Counter implements DoubleConsumer {
- private int size = list.size();
+ private int size = 0;
Contributor:
Why is this changed?

Member (author):
This method only adds empty buckets. Previously, non-empty buckets were accounted for either here or, when no empty buckets were requested, in the calling method. Now non-empty buckets are accounted for before this method is invoked, so size can be initialized to 0 instead of list.size() (which would include any existing non-empty buckets).
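A condensed, hypothetical sketch of that point (not the full Elasticsearch class; `breaker` again stands in for `reduceContext.consumeBucketsAndMaybeBreak`):

```java
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;

// Counts only the empty buckets this pass will add. Starting at 0 is correct
// now that non-empty buckets are reported during reduceBuckets(); starting at
// list.size() would report them to the breaker a second time.
final class EmptyBucketCounter implements DoubleConsumer {
    private static final int REPORT_EMPTY_EVERY = 10_000;
    private final IntConsumer breaker;
    private int size = 0;

    EmptyBucketCounter(IntConsumer breaker) {
        this.breaker = breaker;
    }

    @Override
    public void accept(double key) {
        if (++size >= REPORT_EMPTY_EVERY) {
            breaker.accept(size);
            size = 0;
        }
    }
}
```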
@Override
public void accept(double key) {
@@ -456,11 +454,9 @@ private void iterateEmptyBuckets(List<Bucket> list, ListIterator<Bucket> iter, D
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
- boolean alreadyAccountedForBuckets = false;
if (reduceContext.isFinalReduce()) {
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets, reduceContext);
- alreadyAccountedForBuckets = true;
}
if (InternalOrder.isKeyDesc(order)) {
// we just need to reverse here...
@@ -474,9 +470,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
CollectionUtil.introSort(reducedBuckets, order.comparator());
}
}
- if (false == alreadyAccountedForBuckets) {
-     reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
- }
return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata());
}
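Taken together with the earlier hunks: reduceBuckets(...) now reports non-empty buckets to the circuit breaker iteratively as they are produced, and addEmptyBuckets(...) reports the empties it adds, so the final reduce() no longer needs the alreadyAccountedForBuckets flag or a trailing bulk call to consumeBucketsAndMaybeBreak.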
