Speed up partial reduce of terms aggregations (#53216)
This change optimizes the merge of terms aggregations by removing
the priority queue that was used to collect all the buckets during
a non-final reduction. We don't need to keep the result sorted at that
stage since the merge of buckets in a subsequent reduce can modify the
order anyway. I wrote a small micro-benchmark to test the change, and
the speedups are significant for small merge buffer sizes:

````
########## Master:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt     Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5          10000         1000        1000  avgt   10  2459,690 ± 198,682  ms/op
TermsReduceBenchmark.reduceTopHits            16          10000         1000        1000  avgt   10  1030,620 ±  91,544  ms/op
TermsReduceBenchmark.reduceTopHits            32          10000         1000        1000  avgt   10   558,608 ±  44,915  ms/op
TermsReduceBenchmark.reduceTopHits           128          10000         1000        1000  avgt   10   287,333 ±   8,342  ms/op
TermsReduceBenchmark.reduceTopHits           512          10000         1000        1000  avgt   10   257,325 ±  54,515  ms/op

########## Patch:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt    Score    Error  Units
TermsReduceBenchmark.reduceTopHits             5          10000         1000        1000  avgt   10  805,611 ± 14,630  ms/op
TermsReduceBenchmark.reduceTopHits            16          10000         1000        1000  avgt   10  378,851 ± 17,929  ms/op
TermsReduceBenchmark.reduceTopHits            32          10000         1000        1000  avgt   10  261,094 ± 10,176  ms/op
TermsReduceBenchmark.reduceTopHits           128          10000         1000        1000  avgt   10  241,051 ± 19,558  ms/op
TermsReduceBenchmark.reduceTopHits           512          10000         1000        1000  avgt   10  231,643 ±  6,170  ms/op
````

The code for the benchmark can be found [here](). The reduce seems to be up to 3x faster for terms aggregations
that return 10,000 unique terms (1,000 terms per shard). For a cardinality of 100,000 terms, this patch is up to 5x faster:

````
########## Patch:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5         100000         1000        1000  avgt   10  12791,083 ± 397,128  ms/op
TermsReduceBenchmark.reduceTopHits            16         100000         1000        1000  avgt   10   3974,939 ± 324,617  ms/op
TermsReduceBenchmark.reduceTopHits            32         100000         1000        1000  avgt   10   2186,285 ± 267,124  ms/op
TermsReduceBenchmark.reduceTopHits           128         100000         1000        1000  avgt   10    914,657 ± 160,784  ms/op
TermsReduceBenchmark.reduceTopHits           512         100000         1000        1000  avgt   10    604,198 ± 145,457  ms/op

########## Master:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5         100000         1000        1000  avgt   10  60696,107 ± 929,944  ms/op
TermsReduceBenchmark.reduceTopHits            16         100000         1000        1000  avgt   10  16292,894 ± 783,398  ms/op
TermsReduceBenchmark.reduceTopHits            32         100000         1000        1000  avgt   10   7705,444 ±  77,588  ms/op
TermsReduceBenchmark.reduceTopHits           128         100000         1000        1000  avgt   10   2156,685 ±  88,795  ms/op
TermsReduceBenchmark.reduceTopHits           512         100000         1000        1000  avgt   10    760,273 ±  53,738  ms/op
````
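
Since the benchmark code itself is not included above, here is a rough sketch of how a JMH harness along these lines could be structured. It does not use Elasticsearch internals: the per-shard results are plain term-to-count maps, the reduce is a simple hash-map merge, `topNSize` is not modeled, and the class and parameter names (`SimplifiedTermsReduceBenchmark`, `bufferSize`, `cardinality`, `numShards`) are illustrative assumptions that merely mirror the JMH output above, not the actual `TermsReduceBenchmark`:

````java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;

// Hypothetical, self-contained stand-in for the real TermsReduceBenchmark:
// it only reproduces the shape of the experiment (shard count, buffer size, cardinality).
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
public class SimplifiedTermsReduceBenchmark {

    @Param({"5", "16", "32", "128", "512"})
    public int bufferSize;

    @Param({"10000", "100000"})
    public int cardinality;

    @Param({"1000"})
    public int numShards;

    private List<Map<String, Long>> shardResults;

    @Setup
    public void setup() {
        // Build one synthetic (term -> docCount) result per shard, 1,000 buckets each,
        // drawing terms from a dictionary of `cardinality` distinct values.
        Random random = new Random(42);
        shardResults = new ArrayList<>(numShards);
        for (int shard = 0; shard < numShards; shard++) {
            Map<String, Long> result = new HashMap<>();
            for (int i = 0; i < 1000; i++) {
                result.merge("term-" + random.nextInt(cardinality), 1L, Long::sum);
            }
            shardResults.add(result);
        }
    }

    @Benchmark
    public Map<String, Long> reduceIncrementally() {
        // Merge shard results `bufferSize` at a time, mimicking the partial reduces
        // the coordinating node performs before the single final reduce.
        List<Map<String, Long>> pending = new ArrayList<>(shardResults);
        while (pending.size() > bufferSize) {
            List<Map<String, Long>> next = new ArrayList<>();
            for (int from = 0; from < pending.size(); from += bufferSize) {
                int to = Math.min(from + bufferSize, pending.size());
                Map<String, Long> merged = new HashMap<>();
                for (Map<String, Long> shard : pending.subList(from, to)) {
                    shard.forEach((term, count) -> merged.merge(term, count, Long::sum));
                }
                next.add(merged);
            }
            pending = next;
        }
        Map<String, Long> finalResult = new HashMap<>();
        for (Map<String, Long> shard : pending) {
            shard.forEach((term, count) -> finalResult.merge(term, count, Long::sum));
        }
        return finalResult;
    }
}
````

The real benchmark reduces actual terms aggregations through the aggregation framework, which is where the priority-queue removal matters; the sketch above only shows how the buffer-size sweep and the incremental reduction rounds could be wired up in JMH.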

The merge of buckets can also be optimized. We currently use a hash map to merge buckets coming from different shards, which can be costly
when the number of unique terms is high. Instead, we could always sort the shard terms results by key and perform a merge sort to reduce the
results. This would save memory and make the complexity of the merge more linear on the coordinating node, at the expense of an additional
sort on the shards. I plan to test this possible optimization in a follow-up.
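
As a rough illustration of that follow-up idea (this is not part of this change and does not use Elasticsearch types), a coordinating node receiving shard results already sorted by key could combine them with a k-way merge, sketched here with a min-heap over the per-shard iterators; `SortedBucketMerger` and its helper are hypothetical names:

````java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

final class SortedBucketMerger {

    /** Merges per-shard (term, docCount) lists that are each sorted by term. */
    static List<Map.Entry<String, Long>> merge(List<List<Map.Entry<String, Long>>> shardResults) {
        // Min-heap over the current head of each shard iterator, ordered by term.
        PriorityQueue<PeekingIterator> heads =
            new PriorityQueue<>(Comparator.comparing((PeekingIterator it) -> it.head.getKey()));
        for (List<Map.Entry<String, Long>> shard : shardResults) {
            Iterator<Map.Entry<String, Long>> it = shard.iterator();
            if (it.hasNext()) {
                heads.add(new PeekingIterator(it));
            }
        }
        List<Map.Entry<String, Long>> merged = new ArrayList<>();
        while (heads.isEmpty() == false) {
            PeekingIterator top = heads.poll();
            String term = top.head.getKey();
            long docCount = top.head.getValue();
            // Pull the same term from every other shard and sum the doc counts.
            while (heads.isEmpty() == false && heads.peek().head.getKey().equals(term)) {
                PeekingIterator same = heads.poll();
                docCount += same.head.getValue();
                if (same.advance()) {
                    heads.add(same);
                }
            }
            if (top.advance()) {
                heads.add(top);
            }
            merged.add(new AbstractMap.SimpleEntry<>(term, docCount));
        }
        return merged;
    }

    /** Iterator wrapper that exposes its current element so the heap can order by it. */
    private static final class PeekingIterator {
        final Iterator<Map.Entry<String, Long>> delegate;
        Map.Entry<String, Long> head;

        PeekingIterator(Iterator<Map.Entry<String, Long>> delegate) {
            this.delegate = delegate;
            this.head = delegate.next();
        }

        boolean advance() {
            if (delegate.hasNext()) {
                head = delegate.next();
                return true;
            }
            return false;
        }
    }
}
````

The output still has one bucket per unique term, but there is no hash table to maintain: each shard bucket is visited once in key order, so the work is roughly linear in the total number of shard buckets plus the heap operations.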

Relates #51857
jimczi committed Mar 10, 2020
1 parent d54d7f2 commit ae6c25b
Showing 1 changed file with 38 additions and 20 deletions.
````diff
@@ -255,31 +255,49 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
             }
         }
 
-        final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size());
-        final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator());
-        for (List<B> sameTermBuckets : buckets.values()) {
-            final B b = reduceBucket(sameTermBuckets, reduceContext);
-            if (sumDocCountError == -1) {
-                b.docCountError = -1;
-            } else {
-                b.docCountError += sumDocCountError;
+        final B[] list;
+        if (reduceContext.isFinalReduce()) {
+            final int size = Math.min(requiredSize, buckets.size());
+            final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator());
+            for (List<B> sameTermBuckets : buckets.values()) {
+                final B b = reduceBucket(sameTermBuckets, reduceContext);
+                if (sumDocCountError == -1) {
+                    b.docCountError = -1;
+                } else {
+                    b.docCountError += sumDocCountError;
+                }
+                if (b.docCount >= minDocCount) {
+                    B removed = ordered.insertWithOverflow(b);
+                    if (removed != null) {
+                        otherDocCount += removed.getDocCount();
+                        reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
+                    } else {
+                        reduceContext.consumeBucketsAndMaybeBreak(1);
+                    }
+                } else {
+                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
+                }
             }
-            if (b.docCount >= minDocCount || reduceContext.isFinalReduce() == false) {
-                B removed = ordered.insertWithOverflow(b);
-                if (removed != null) {
-                    otherDocCount += removed.getDocCount();
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
+            list = createBucketsArray(ordered.size());
+            for (int i = ordered.size() - 1; i >= 0; i--) {
+                list[i] = ordered.pop();
+            }
+        } else {
+            // keep all buckets on partial reduce
+            // TODO: we could prune the buckets when sorting by key
+            list = createBucketsArray(buckets.size());
+            int pos = 0;
+            for (List<B> sameTermBuckets : buckets.values()) {
+                final B b = reduceBucket(sameTermBuckets, reduceContext);
+                reduceContext.consumeBucketsAndMaybeBreak(1);
+                if (sumDocCountError == -1) {
+                    b.docCountError = -1;
                 } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
+                    b.docCountError += sumDocCountError;
                 }
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
+                list[pos++] = b;
             }
         }
-        B[] list = createBucketsArray(ordered.size());
-        for (int i = ordered.size() - 1; i >= 0; i--) {
-            list[i] = ordered.pop();
-        }
         long docCountError;
         if (sumDocCountError == -1) {
             docCountError = -1;
````
