Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up partial reduce of terms aggregations #53216

Merged
merged 4 commits into from
Mar 10, 2020

Conversation

jimczi
Copy link
Contributor

@jimczi jimczi commented Mar 6, 2020

This change optimizes the merge of terms aggregations by removing
the priority queue that was used to collect all the buckets during
a non-final reduction. We don't need to keep the result sorted since
the merge of buckets in a subsequent reduce can modify the order.
I wrote a small micro-benchmark to test the change and the speed ups
are significative for small merge buffer sizes:

########## Master:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt     Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5          10000         1000        1000  avgt   10  2459,690 ± 198,682  ms/op
TermsReduceBenchmark.reduceTopHits            16          10000         1000        1000  avgt   10  1030,620 ±  91,544  ms/op
TermsReduceBenchmark.reduceTopHits            32          10000         1000        1000  avgt   10   558,608 ±  44,915  ms/op
TermsReduceBenchmark.reduceTopHits           128          10000         1000        1000  avgt   10   287,333 ±   8,342  ms/op
TermsReduceBenchmark.reduceTopHits           512          10000         1000        1000  avgt   10   257,325 ±  54,515  ms/op

########## Patch:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt    Score    Error  Units
TermsReduceBenchmark.reduceTopHits             5          10000         1000        1000  avgt   10  805,611 ± 14,630  ms/op
TermsReduceBenchmark.reduceTopHits            16          10000         1000        1000  avgt   10  378,851 ± 17,929  ms/op
TermsReduceBenchmark.reduceTopHits            32          10000         1000        1000  avgt   10  261,094 ± 10,176  ms/op
TermsReduceBenchmark.reduceTopHits           128          10000         1000        1000  avgt   10  241,051 ± 19,558  ms/op
TermsReduceBenchmark.reduceTopHits           512          10000         1000        1000  avgt   10  231,643 ±  6,170  ms/op

The code for the benchmark can be found here.
It seems to be up to 3x faster for terms aggregations that return 10,000 unique terms (1000 terms per shard).
For a cardinality of 100,000 terms, this patch is up to 5x faster:

########## Patch:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5         100000         1000        1000  avgt   10  12791,083 ± 397,128  ms/op
TermsReduceBenchmark.reduceTopHits            16         100000         1000        1000  avgt   10   3974,939 ± 324,617  ms/op
TermsReduceBenchmark.reduceTopHits            32         100000         1000        1000  avgt   10   2186,285 ± 267,124  ms/op
TermsReduceBenchmark.reduceTopHits           128         100000         1000        1000  avgt   10    914,657 ± 160,784  ms/op
TermsReduceBenchmark.reduceTopHits           512         100000         1000        1000  avgt   10    604,198 ± 145,457  ms/op

########## Master:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5         100000         1000        1000  avgt   10  60696,107 ± 929,944  ms/op
TermsReduceBenchmark.reduceTopHits            16         100000         1000        1000  avgt   10  16292,894 ± 783,398  ms/op
TermsReduceBenchmark.reduceTopHits            32         100000         1000        1000  avgt   10   7705,444 ±  77,588  ms/op
TermsReduceBenchmark.reduceTopHits           128         100000         1000        1000  avgt   10   2156,685 ±  88,795  ms/op
TermsReduceBenchmark.reduceTopHits           512         100000         1000        1000  avgt   10    760,273 ±  53,738  ms/op

The merge of buckets can also be optimized. Currently we use an hash map to merge buckets coming from different shards so this can be costly if the number of unique terms is high. Instead, we could always sort the shard terms result by key and perform a merge sort to reduce the results. This would save memory and make the merge more linear in terms
of complexity in the coordinating node at the expense of an additional sort in the shards.
I plan to test this possible optimization in a follow up.

Relates #51857

This change optimizes the merge of terms aggregations by removing
the priority queue that was used to collect all the buckets during
a non-final reduction. We don't need to keep the result sorted since
the merge of buckets in a subsequent reduce can modify the order.
I wrote a small micro-benchmark to test the change and the speed ups
are significative for small merge buffer sizes:

````
########## Master:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt     Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5          10000         1000        1000  avgt   10  2459,690 ± 198,682  ms/op
TermsReduceBenchmark.reduceTopHits            16          10000         1000        1000  avgt   10  1030,620 ±  91,544  ms/op
TermsReduceBenchmark.reduceTopHits            32          10000         1000        1000  avgt   10   558,608 ±  44,915  ms/op
TermsReduceBenchmark.reduceTopHits           128          10000         1000        1000  avgt   10   287,333 ±   8,342  ms/op
TermsReduceBenchmark.reduceTopHits           512          10000         1000        1000  avgt   10   257,325 ±  54,515  ms/op

########## Patch:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt    Score    Error  Units
TermsReduceBenchmark.reduceTopHits             5          10000         1000        1000  avgt   10  805,611 ± 14,630  ms/op
TermsReduceBenchmark.reduceTopHits            16          10000         1000        1000  avgt   10  378,851 ± 17,929  ms/op
TermsReduceBenchmark.reduceTopHits            32          10000         1000        1000  avgt   10  261,094 ± 10,176  ms/op
TermsReduceBenchmark.reduceTopHits           128          10000         1000        1000  avgt   10  241,051 ± 19,558  ms/op
TermsReduceBenchmark.reduceTopHits           512          10000         1000        1000  avgt   10  231,643 ±  6,170  ms/op
````

The code for the benchmark can be found [here](). It seems to be up to 3x faster for terms aggregations
that return 10,000 unique terms (1000 terms per shard). For a cardinality of 100,000 terms, this patch is up to 5x faster:

````
########## Patch:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5         100000         1000        1000  avgt   10  12791,083 ± 397,128  ms/op
TermsReduceBenchmark.reduceTopHits            16         100000         1000        1000  avgt   10   3974,939 ± 324,617  ms/op
TermsReduceBenchmark.reduceTopHits            32         100000         1000        1000  avgt   10   2186,285 ± 267,124  ms/op
TermsReduceBenchmark.reduceTopHits           128         100000         1000        1000  avgt   10    914,657 ± 160,784  ms/op
TermsReduceBenchmark.reduceTopHits           512         100000         1000        1000  avgt   10    604,198 ± 145,457  ms/op

########## Master:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5         100000         1000        1000  avgt   10  60696,107 ± 929,944  ms/op
TermsReduceBenchmark.reduceTopHits            16         100000         1000        1000  avgt   10  16292,894 ± 783,398  ms/op
TermsReduceBenchmark.reduceTopHits            32         100000         1000        1000  avgt   10   7705,444 ±  77,588  ms/op
TermsReduceBenchmark.reduceTopHits           128         100000         1000        1000  avgt   10   2156,685 ±  88,795  ms/op
TermsReduceBenchmark.reduceTopHits           512         100000         1000        1000  avgt   10    760,273 ±  53,738  ms/op
````

The merge of buckets can also be optimized. Currently we use an hash map to merge buckets coming from different shards
so this can be costly if the number of unique terms is high. Instead, we could always sort the shard terms result
by key and perform a merge sort to reduce the results. This would save memory and make the merge more linear in terms
of complexity in the coordinating node at the expense of an additional sort in the shards. I plan to test this possible
optimization in a follow up.

Relates elastic#51857
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-analytics-geo (:Analytics/Aggregations)

Copy link
Member

@nik9000 nik9000 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Could you add a comment about why we need to keep all the buckets until final reduction? I could see wanting the priority queue/throwing away buckets in some cases. Like, say, you are sorting on key. But if you are sorting on doc count or something then you can't throw them away. And we don't have any way to expose that to this code. I think. Is that right?

@jimczi
Copy link
Contributor Author

jimczi commented Mar 6, 2020

Like, say, you are sorting on key. But if you are sorting on doc count or something then you can't throw them away. And we don't have any way to expose that to this code. I think. Is that right?

Yes, ideally the intermediate buckets (shard and partial reduce) should sort by key so that we can perform a merge sort on reduce and prune on partial reduce if the final sort is also by key. That's the follow up I mentioned in the description.

@nik9000
Copy link
Member

nik9000 commented Mar 6, 2020

Yes, ideally the intermediate buckets (shard and partial reduce) should sort by key so that we can perform a merge sort on reduce and prune on partial reduce if the final sort is also by key. That's the follow up I mentioned in the description.

👍

@jimczi jimczi merged commit f153f19 into elastic:master Mar 10, 2020
@jimczi jimczi deleted the terms_partial_reduce_optim branch March 10, 2020 12:24
jimczi added a commit that referenced this pull request Mar 10, 2020
This change optimizes the merge of terms aggregations by removing
the priority queue that was used to collect all the buckets during
a non-final reduction. We don't need to keep the result sorted since
the merge of buckets in a subsequent reduce can modify the order.
I wrote a small micro-benchmark to test the change and the speed ups
are significative for small merge buffer sizes:

````
########## Master:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt     Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5          10000         1000        1000  avgt   10  2459,690 ± 198,682  ms/op
TermsReduceBenchmark.reduceTopHits            16          10000         1000        1000  avgt   10  1030,620 ±  91,544  ms/op
TermsReduceBenchmark.reduceTopHits            32          10000         1000        1000  avgt   10   558,608 ±  44,915  ms/op
TermsReduceBenchmark.reduceTopHits           128          10000         1000        1000  avgt   10   287,333 ±   8,342  ms/op
TermsReduceBenchmark.reduceTopHits           512          10000         1000        1000  avgt   10   257,325 ±  54,515  ms/op

########## Patch:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt    Score    Error  Units
TermsReduceBenchmark.reduceTopHits             5          10000         1000        1000  avgt   10  805,611 ± 14,630  ms/op
TermsReduceBenchmark.reduceTopHits            16          10000         1000        1000  avgt   10  378,851 ± 17,929  ms/op
TermsReduceBenchmark.reduceTopHits            32          10000         1000        1000  avgt   10  261,094 ± 10,176  ms/op
TermsReduceBenchmark.reduceTopHits           128          10000         1000        1000  avgt   10  241,051 ± 19,558  ms/op
TermsReduceBenchmark.reduceTopHits           512          10000         1000        1000  avgt   10  231,643 ±  6,170  ms/op
````

The code for the benchmark can be found [here](). It seems to be up to 3x faster for terms aggregations
that return 10,000 unique terms (1000 terms per shard). For a cardinality of 100,000 terms, this patch is up to 5x faster:

````
########## Patch:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5         100000         1000        1000  avgt   10  12791,083 ± 397,128  ms/op
TermsReduceBenchmark.reduceTopHits            16         100000         1000        1000  avgt   10   3974,939 ± 324,617  ms/op
TermsReduceBenchmark.reduceTopHits            32         100000         1000        1000  avgt   10   2186,285 ± 267,124  ms/op
TermsReduceBenchmark.reduceTopHits           128         100000         1000        1000  avgt   10    914,657 ± 160,784  ms/op
TermsReduceBenchmark.reduceTopHits           512         100000         1000        1000  avgt   10    604,198 ± 145,457  ms/op

########## Master:
Benchmark                           (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score     Error  Units
TermsReduceBenchmark.reduceTopHits             5         100000         1000        1000  avgt   10  60696,107 ± 929,944  ms/op
TermsReduceBenchmark.reduceTopHits            16         100000         1000        1000  avgt   10  16292,894 ± 783,398  ms/op
TermsReduceBenchmark.reduceTopHits            32         100000         1000        1000  avgt   10   7705,444 ±  77,588  ms/op
TermsReduceBenchmark.reduceTopHits           128         100000         1000        1000  avgt   10   2156,685 ±  88,795  ms/op
TermsReduceBenchmark.reduceTopHits           512         100000         1000        1000  avgt   10    760,273 ±  53,738  ms/op
````

The merge of buckets can also be optimized. Currently we use an hash map to merge buckets coming from different shards so this can be costly if the number of unique terms is high. Instead, we could always sort the shard terms result by key and perform a merge sort to reduce the results. This would save memory and make the merge more linear in terms
of complexity in the coordinating node at the expense of an additional sort in the shards. I plan to test this possible optimization in a follow up.

Relates #51857
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants