From 440ad8aac851fd9c68c48e4da3b57b91442c1ac6 Mon Sep 17 00:00:00 2001 From: jimczi Date: Fri, 6 Mar 2020 10:28:49 +0100 Subject: [PATCH 1/3] Speed up partial reduce of terms aggregations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change optimizes the merge of terms aggregations by removing the priority queue that was used to collect all the buckets during a non-final reduction. We don't need to keep the result sorted since the merge of buckets in a subsequent reduce can modify the order. I wrote a small micro-benchmark to test the change and the speed-ups are significant for small merge buffer sizes: ```` ########## Master: Benchmark (bufferSize) (cardinality) (numShards) (topNSize) Mode Cnt Score Error Units TermsReduceBenchmark.reduceTopHits 5 10000 1000 1000 avgt 10 2459,690 ± 198,682 ms/op TermsReduceBenchmark.reduceTopHits 16 10000 1000 1000 avgt 10 1030,620 ± 91,544 ms/op TermsReduceBenchmark.reduceTopHits 32 10000 1000 1000 avgt 10 558,608 ± 44,915 ms/op TermsReduceBenchmark.reduceTopHits 128 10000 1000 1000 avgt 10 287,333 ± 8,342 ms/op TermsReduceBenchmark.reduceTopHits 512 10000 1000 1000 avgt 10 257,325 ± 54,515 ms/op ########## Patch: Benchmark (bufferSize) (cardinality) (numShards) (topNSize) Mode Cnt Score Error Units TermsReduceBenchmark.reduceTopHits 5 10000 1000 1000 avgt 10 805,611 ± 14,630 ms/op TermsReduceBenchmark.reduceTopHits 16 10000 1000 1000 avgt 10 378,851 ± 17,929 ms/op TermsReduceBenchmark.reduceTopHits 32 10000 1000 1000 avgt 10 261,094 ± 10,176 ms/op TermsReduceBenchmark.reduceTopHits 128 10000 1000 1000 avgt 10 241,051 ± 19,558 ms/op TermsReduceBenchmark.reduceTopHits 512 10000 1000 1000 avgt 10 231,643 ± 6,170 ms/op ```` The code for the benchmark can be found [here](). It seems to be up to 3x faster for terms aggregations that return 10,000 unique terms (1000 terms per shard). 
For a cardinality of 100,000 terms, this patch is up to 5x faster: ```` ########## Patch: Benchmark (bufferSize) (cardinality) (numShards) (topNSize) Mode Cnt Score Error Units TermsReduceBenchmark.reduceTopHits 5 100000 1000 1000 avgt 10 12791,083 ± 397,128 ms/op TermsReduceBenchmark.reduceTopHits 16 100000 1000 1000 avgt 10 3974,939 ± 324,617 ms/op TermsReduceBenchmark.reduceTopHits 32 100000 1000 1000 avgt 10 2186,285 ± 267,124 ms/op TermsReduceBenchmark.reduceTopHits 128 100000 1000 1000 avgt 10 914,657 ± 160,784 ms/op TermsReduceBenchmark.reduceTopHits 512 100000 1000 1000 avgt 10 604,198 ± 145,457 ms/op ########## Master: Benchmark (bufferSize) (cardinality) (numShards) (topNSize) Mode Cnt Score Error Units TermsReduceBenchmark.reduceTopHits 5 100000 1000 1000 avgt 10 60696,107 ± 929,944 ms/op TermsReduceBenchmark.reduceTopHits 16 100000 1000 1000 avgt 10 16292,894 ± 783,398 ms/op TermsReduceBenchmark.reduceTopHits 32 100000 1000 1000 avgt 10 7705,444 ± 77,588 ms/op TermsReduceBenchmark.reduceTopHits 128 100000 1000 1000 avgt 10 2156,685 ± 88,795 ms/op TermsReduceBenchmark.reduceTopHits 512 100000 1000 1000 avgt 10 760,273 ± 53,738 ms/op ```` The merge of buckets can also be optimized. Currently we use a hash map to merge buckets coming from different shards, so this can be costly if the number of unique terms is high. Instead, we could always sort the shard terms result by key and perform a merge sort to reduce the results. This would save memory and make the merge more linear in terms of complexity in the coordinating node at the expense of an additional sort in the shards. I plan to test this possible optimization in a follow-up. 
Relates #51857 --- .../bucket/terms/InternalTerms.java | 56 ++++++++++++------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index d8f74b1b8e42f..e38e7349fb57f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -255,31 +255,47 @@ public InternalAggregation reduce(List aggregations, Reduce } } - final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()); - final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); - for (List sameTermBuckets : buckets.values()) { - final B b = reduceBucket(sameTermBuckets, reduceContext); - if (sumDocCountError == -1) { - b.docCountError = -1; - } else { - b.docCountError += sumDocCountError; + final B[] list; + if (reduceContext.isFinalReduce()) { + final int size = Math.min(requiredSize, buckets.size()); + final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); + for (List sameTermBuckets : buckets.values()) { + final B b = reduceBucket(sameTermBuckets, reduceContext); + if (sumDocCountError == -1) { + b.docCountError = -1; + } else { + b.docCountError += sumDocCountError; + } + if (b.docCount >= minDocCount) { + B removed = ordered.insertWithOverflow(b); + if (removed != null) { + otherDocCount += removed.getDocCount(); + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed)); + } else { + reduceContext.consumeBucketsAndMaybeBreak(1); + } + } else { + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b)); + } } - if (b.docCount >= minDocCount || reduceContext.isFinalReduce() == false) { - B removed = ordered.insertWithOverflow(b); - if 
(removed != null) { - otherDocCount += removed.getDocCount(); - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed)); + list = createBucketsArray(ordered.size()); + for (int i = ordered.size() - 1; i >= 0; i--) { + list[i] = ordered.pop(); + } + } else { + list = createBucketsArray(buckets.size()); + int pos = 0; + for (List sameTermBuckets : buckets.values()) { + final B b = reduceBucket(sameTermBuckets, reduceContext); + reduceContext.consumeBucketsAndMaybeBreak(1); + if (sumDocCountError == -1) { + b.docCountError = -1; } else { - reduceContext.consumeBucketsAndMaybeBreak(1); + b.docCountError += sumDocCountError; } - } else { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b)); + list[pos++] = b; } } - B[] list = createBucketsArray(ordered.size()); - for (int i = ordered.size() - 1; i >= 0; i--) { - list[i] = ordered.pop(); - } long docCountError; if (sumDocCountError == -1) { docCountError = -1; From b4944b4a5fe018c2e8a7e4340b6b1fa7e459386f Mon Sep 17 00:00:00 2001 From: jimczi Date: Fri, 6 Mar 2020 10:30:48 +0100 Subject: [PATCH 2/3] add comment --- .../search/aggregations/bucket/terms/InternalTerms.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index e38e7349fb57f..d469c61f79338 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -283,6 +283,7 @@ public InternalAggregation reduce(List aggregations, Reduce list[i] = ordered.pop(); } } else { + // we don't need to prune buckets on non-final reduce since we keep them all. 
list = createBucketsArray(buckets.size()); int pos = 0; for (List sameTermBuckets : buckets.values()) { From d67d34a38e02e9b0c4a1baa7b9ca56f8aa5a14df Mon Sep 17 00:00:00 2001 From: jimczi Date: Tue, 10 Mar 2020 09:11:37 +0100 Subject: [PATCH 3/3] address review --- .../search/aggregations/bucket/terms/InternalTerms.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index d469c61f79338..ba786f9414778 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -283,7 +283,8 @@ public InternalAggregation reduce(List aggregations, Reduce list[i] = ordered.pop(); } } else { - // we don't need to prune buckets on non-final reduce since we keep them all. + // keep all buckets on partial reduce + // TODO: we could prune the buckets when sorting by key list = createBucketsArray(buckets.size()); int pos = 0; for (List sameTermBuckets : buckets.values()) {