
parallel reduce support on coordinator and parameterized #58377

Conversation

@hackerwin7 hackerwin7 commented Jun 19, 2020

relates to #51857

If a search request fans out to an extremely large number of shard requests and the batched reduce size is relatively low, subsequent shard query results may block on the synchronized consumeInternal().

This PR opens a discussion on removing that synchronization so that partial reduces can run in parallel.

@hackerwin7
Author

hackerwin7 commented Jun 19, 2020

QueryPhaseParallelResultConsumer

Compared to QueryPhaseResultConsumer, which performs a single-threaded, incrementally batched reduce, QueryPhaseParallelResultConsumer uses a blocking queue to collect shard query results, so a shard thread can quickly call consumeParallel() and immediately move on to executeNext() in AbstractSearchAsyncAction.performPhaseOnShard().
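To illustrate, here is a minimal sketch of the enqueue-and-return path described above; it is not the actual PR code, and ShardResult stands in for the real shard query result type:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified, hypothetical sketch of the queue-based consumer.
class ParallelResultConsumer<ShardResult> {
    private final BlockingQueue<ShardResult> queue = new LinkedBlockingQueue<>();
    private final ExecutorService reduceExecutor;

    ParallelResultConsumer(ExecutorService reduceExecutor) {
        this.reduceExecutor = reduceExecutor;
    }

    // Called by the shard thread: enqueue and return right away, so the caller can
    // immediately continue with executeNext() for the next shard.
    void consumeParallel(ShardResult result) {
        queue.add(result);
        // Hand the partial-reduce work to a dedicated executor instead of blocking here.
        reduceExecutor.execute(this::maybePartialReduce);
    }

    private void maybePartialReduce() {
        // Drain a small batch from the queue and merge it; omitted in this sketch.
    }
}
```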

PartialReduceTask

A task submitted to a thread pool (Names.REDUCE_PARTIAL_PARALLEL) that executes the reduce in parallel.

checkParitalReduceAggsDone()

Waits until all required parallel reduces are done, after which reducedQueryPhase() can be called to execute the final reduce.
Currently the condition is that only one reduced result remains in the blocking queue; maybe that is a little too strict?
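A hedged sketch of that completion condition, with illustrative names rather than the PR's actual fields:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: the final reduce may start once every shard has responded and
// the parallel partial reduces have collapsed the queue down to a single element.
final class PartialReduceCheck {
    static <R> boolean partialReduceAggsDone(BlockingQueue<R> reducedResults, AtomicInteger pendingShards) {
        return pendingShards.get() == 0 && reducedResults.size() == 1;
    }
}
```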

SearchRequest

Adds a parallel_reduce parameter that switches the request to QueryPhaseParallelResultConsumer, reducing in parallel instead of using QueryPhaseResultConsumer's batched reduce process.
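Since parallel_reduce is introduced by this PR, any client-side setter is still hypothetical; a sketch of how a request might opt in (setBatchedReduceSize() is an existing SearchRequest method, the parallel flag is not):

```java
import org.elasticsearch.action.search.SearchRequest;

class ParallelReduceRequestExample {
    static SearchRequest build() {
        SearchRequest request = new SearchRequest("daily-index-*");   // index pattern is made up
        request.setBatchedReduceSize(2);       // existing setter: small partial-reduce batches
        // request.setParallelReduce(true);    // proposed flag; setter name assumed from the PR description
        return request;
    }
}
```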

…et(i,null)

Change-Id: I056246581124f52bf33cb2f88e4c51929e8d9e21
Change-Id: I308a4925d7b94ca268860a185f79f655f9557945
Change-Id: If7830e4f01876b35c277f4d7eb5bfa53c3ed03cb
@dliappis dliappis added the :Analytics/Aggregations Aggregations label Jun 23, 2020
@elasticmachine
Collaborator

Pinging @elastic/es-analytics-geo (:Analytics/Aggregations)

@elasticmachine elasticmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Jun 23, 2020
@dliappis dliappis added the :Search/Search Search-related issues that do not fall into other categories label Jun 23, 2020
@elasticmachine
Collaborator

Pinging @elastic/es-search (:Search/Search)

@elasticmachine elasticmachine added the Team:Search Meta label for search team label Jun 23, 2020
Contributor

@jimczi jimczi left a comment


Thanks for contributing @hackerwin7. I am not sure that we want to allow parallel reduction of shard responses, though. The current implementation ensures that the reduction is incremental in order to avoid cascading merges.
I am already working on a change to move the execution of the partial reduce into the search thread pool:
#58461
That change is important since it allows a partial reduce to throw an exception that cancels the search request, but it does not allow any parallelism. We want to be able to control the memory used by partial reductions, so I think it would be preferable to look at other alternatives to speed up partial merges in general. Do you have an example aggregation that is much slower when the batched reduce size is small? In my experiments the overhead is minimal, since most of the time is spent waiting for shard responses.

@hackerwin7
Author

hackerwin7 commented Jun 24, 2020

@jimczi
Thanks for your reply.
The example query looks like this: search one daily index per day over a month (30 indices), where a single index has approximately 20 million docs.
The query pattern is:

aggs.date_histogram > aggs.terms > aggs.terms > aggs.terms > [aggs.metrics-1, aggs.metrics-2, ..... aggs.metrics-60]

There are five layers of aggs, and the bucket count is extremely huge.
Profiling the batched=5 query shows that most threads are blocked in the synchronized consumeInternal(), which is why I want to remove the single-threaded synchronized reduce.
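For reference, the shape of such a query could be built with the AggregationBuilders API roughly as below; all field and aggregation names here are made up:

```java
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

class HeavyAggExample {
    // date_histogram > terms > terms > terms > ~60 metric sub-aggregations
    static DateHistogramAggregationBuilder buildAggs() {
        TermsAggregationBuilder innerTerms = AggregationBuilders.terms("level4").field("field_c");
        for (int i = 1; i <= 60; i++) {
            innerTerms.subAggregation(AggregationBuilders.avg("metric_" + i).field("value_" + i));
        }
        return AggregationBuilders.dateHistogram("per_day")
            .field("timestamp")
            .calendarInterval(DateHistogramInterval.DAY)
            .subAggregation(AggregationBuilders.terms("level2").field("field_a")
                .subAggregation(AggregationBuilders.terms("level3").field("field_b")
                    .subAggregation(innerTerms)));
    }
}
```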

Thanks for contributing @hackerwin7. I am not sure that we want to allow parallel reduction of shard responses, though. The current implementation ensures that the reduction is incremental in order to avoid cascading merges.

I agree with this. I tuned the parallel patch; here is my second comparison test:

  1. The parallel reduce query's took time is close to the default (ArraySearchPhaseResults) reduce, and much faster than a batched=small search query.
  2. Parallel reduce causes a higher YGC count than the default.
  3. batched=small (QueryPhaseResultConsumer) executes extremely slowly because of blocked threads.

Change-Id: Iafaac6235d7c4fdb3116bfa3914ede6f3e428efe
@hackerwin7 hackerwin7 force-pushed the hackerwin7/parallel_partial_reduce_parameter branch from f48fc4c to 168d366 Compare June 24, 2020 04:32
@hackerwin7
Author

hackerwin7 commented Jun 24, 2020

Updated the parallel reduction to a two-phase execution (see the sketch after this list):

  • First phase: reduce small batches of shard query results (2 or 3) into intermediate reduced results
  • Second phase: drain all the intermediate reduced results from the queue and execute the final reduce
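A minimal sketch of the two phases, with illustrative names and types rather than the PR's code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.BinaryOperator;

final class TwoPhaseReduceSketch {
    // Phase 1: runs repeatedly on the reduce thread pool while shard results stream in.
    static <R> void partialReduce(BlockingQueue<R> queue, BinaryOperator<R> merge) {
        List<R> batch = new ArrayList<>(3);
        queue.drainTo(batch, 3);                              // take 2 or 3 buffered results
        batch.stream().reduce(merge).ifPresent(queue::add);   // enqueue the intermediate result
    }

    // Phase 2: runs once, after every shard has responded.
    static <R> R finalReduce(BlockingQueue<R> queue, BinaryOperator<R> merge) {
        List<R> all = new ArrayList<>();
        queue.drainTo(all);                                   // drain every intermediate result
        return all.stream().reduce(merge).orElseThrow(IllegalStateException::new);
    }
}
```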

@hackerwin7
Author

hackerwin7 commented Jun 24, 2020

I am not sure that we want to allow parallel reduction of shard responses, though

I think that if a search request fans out to an extremely large number of shard requests (thousands or tens of thousands), we may need a parallel reduce for that case.
I will run a test with a huge number of shard requests to see the performance.

Change-Id: I192ac8f45a5d9695ae47eacc6b244052cb9f4993
Change-Id: Ib619715ae27c07a64bd14f40753e3af244aeeba3
Change-Id: I7793634c17086817bcfca98d17e73f789d9218ea
@hackerwin7
Author

hackerwin7 commented Jul 6, 2020

I tested a search request spanning 1500 shards with a huge data set; here are the comparison results:

Search mode           YGC count  Old GC count  Took
default (batch=512)   1          0             125.913 s
parallel (batch=2)    15         0             38.318 s

@hackerwin7 hackerwin7 requested a review from jimczi July 6, 2020 12:00
Change-Id: I7639038369919afd0819a29ea167b2142e9d28c4
Change-Id: Ie21cf0cc71c1edb2157c6fea84c4f8ec780d7555
@hackerwin7
Author

hackerwin7 commented Jul 7, 2020

I ran a benchmark comparing the default (batch=512) mode against the parallel mode with buffSize=1024, 512, ..., 2:

default

Benchmark                            (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score     Error  Units
SearchReduceBenchmarkBatched.reduce         100000          100        1000  avgt   10      1.042 ±   0.047  ms/op
SearchReduceBenchmarkBatched.reduce         100000          500        1000  avgt   10      1.106 ±   0.042  ms/op
SearchReduceBenchmarkBatched.reduce         100000          800        1000  avgt   10   1160.578 ± 106.793  ms/op
SearchReduceBenchmarkBatched.reduce         100000         1000        1000  avgt   10   1380.126 ± 108.007  ms/op
SearchReduceBenchmarkBatched.reduce         100000         2000        1000  avgt   10   2811.232 ± 108.483  ms/op
SearchReduceBenchmarkBatched.reduce         100000         5000        1000  avgt   10   6663.553 ± 157.251  ms/op
SearchReduceBenchmarkBatched.reduce         100000        10000        1000  avgt   10  13345.544 ± 322.421  ms/op

parallel (varying buffSize)

Benchmark                             (buffSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score      Error  Units
SearchReduceBenchmarkParallel.reduce        1024         100000          100        1000  avgt   10    159.404 ±   23.284  ms/op
SearchReduceBenchmarkParallel.reduce        1024         100000          500        1000  avgt   10    663.442 ±   50.141  ms/op
SearchReduceBenchmarkParallel.reduce        1024         100000          800        1000  avgt   10   1059.180 ±   70.510  ms/op
SearchReduceBenchmarkParallel.reduce        1024         100000         1000        1000  avgt   10   1446.776 ±  154.090  ms/op
SearchReduceBenchmarkParallel.reduce        1024         100000         2000        1000  avgt   10   2330.898 ±  130.852  ms/op
SearchReduceBenchmarkParallel.reduce        1024         100000         5000        1000  avgt   10   4003.519 ±  259.088  ms/op
SearchReduceBenchmarkParallel.reduce        1024         100000        10000        1000  avgt   10   6198.209 ±  333.585  ms/op
SearchReduceBenchmarkParallel.reduce         512         100000          100        1000  avgt   10    158.167 ±   11.597  ms/op
SearchReduceBenchmarkParallel.reduce         512         100000          500        1000  avgt   10    658.384 ±   30.304  ms/op
SearchReduceBenchmarkParallel.reduce         512         100000          800        1000  avgt   10   1093.640 ±   89.986  ms/op
SearchReduceBenchmarkParallel.reduce         512         100000         1000        1000  avgt   10   1204.028 ±   84.124  ms/op
SearchReduceBenchmarkParallel.reduce         512         100000         2000        1000  avgt   10   1961.237 ±   98.439  ms/op
SearchReduceBenchmarkParallel.reduce         512         100000         5000        1000  avgt   10   3474.373 ±  242.222  ms/op
SearchReduceBenchmarkParallel.reduce         512         100000        10000        1000  avgt   10   5852.566 ±  396.123  ms/op
SearchReduceBenchmarkParallel.reduce         256         100000          100        1000  avgt   10    156.004 ±   14.392  ms/op
SearchReduceBenchmarkParallel.reduce         256         100000          500        1000  avgt   10    706.420 ±   58.107  ms/op
SearchReduceBenchmarkParallel.reduce         256         100000          800        1000  avgt   10    849.890 ±   58.857  ms/op
SearchReduceBenchmarkParallel.reduce         256         100000         1000        1000  avgt   10   1071.556 ±  114.170  ms/op
SearchReduceBenchmarkParallel.reduce         256         100000         2000        1000  avgt   10   1710.307 ±  102.870  ms/op
SearchReduceBenchmarkParallel.reduce         256         100000         5000        1000  avgt   10   3495.000 ±  213.657  ms/op
SearchReduceBenchmarkParallel.reduce         256         100000        10000        1000  avgt   10   6020.631 ±  199.768  ms/op
SearchReduceBenchmarkParallel.reduce         128         100000          100        1000  avgt   10    152.607 ±   16.764  ms/op
SearchReduceBenchmarkParallel.reduce         128         100000          500        1000  avgt   10    675.716 ±   65.031  ms/op
SearchReduceBenchmarkParallel.reduce         128         100000          800        1000  avgt   10    835.427 ±   62.215  ms/op
SearchReduceBenchmarkParallel.reduce         128         100000         1000        1000  avgt   10   1081.593 ±   79.905  ms/op
SearchReduceBenchmarkParallel.reduce         128         100000         2000        1000  avgt   10   1717.664 ±  135.461  ms/op
SearchReduceBenchmarkParallel.reduce         128         100000         5000        1000  avgt   10   3734.803 ±  270.677  ms/op
SearchReduceBenchmarkParallel.reduce         128         100000        10000        1000  avgt   10   7028.719 ±  420.056  ms/op
SearchReduceBenchmarkParallel.reduce          64         100000          100        1000  avgt   10    192.205 ±   16.250  ms/op
SearchReduceBenchmarkParallel.reduce          64         100000          500        1000  avgt   10    628.311 ±   66.455  ms/op
SearchReduceBenchmarkParallel.reduce          64         100000          800        1000  avgt   10    934.562 ±   48.704  ms/op
SearchReduceBenchmarkParallel.reduce          64         100000         1000        1000  avgt   10   1138.550 ±   97.303  ms/op
SearchReduceBenchmarkParallel.reduce          64         100000         2000        1000  avgt   10   1861.344 ±  235.446  ms/op
SearchReduceBenchmarkParallel.reduce          64         100000         5000        1000  avgt   10   4140.063 ±  237.067  ms/op
SearchReduceBenchmarkParallel.reduce          64         100000        10000        1000  avgt   10   8744.300 ±  347.165  ms/op
SearchReduceBenchmarkParallel.reduce          32         100000          100        1000  avgt   10    165.103 ±   18.215  ms/op
SearchReduceBenchmarkParallel.reduce          32         100000          500        1000  avgt   10    609.916 ±   80.499  ms/op
SearchReduceBenchmarkParallel.reduce          32         100000          800        1000  avgt   10    965.669 ±   82.598  ms/op
SearchReduceBenchmarkParallel.reduce          32         100000         1000        1000  avgt   10   1138.667 ±   89.693  ms/op
SearchReduceBenchmarkParallel.reduce          32         100000         2000        1000  avgt   10   2126.192 ±  246.528  ms/op
SearchReduceBenchmarkParallel.reduce          32         100000         5000        1000  avgt   10   4911.856 ±  301.119  ms/op
SearchReduceBenchmarkParallel.reduce          32         100000        10000        1000  avgt   10  11474.884 ±  576.317  ms/op
SearchReduceBenchmarkParallel.reduce          16         100000          100        1000  avgt   10    168.171 ±   13.428  ms/op
SearchReduceBenchmarkParallel.reduce          16         100000          500        1000  avgt   10    671.869 ±   74.748  ms/op
SearchReduceBenchmarkParallel.reduce          16         100000          800        1000  avgt   10   1070.933 ±  164.470  ms/op
SearchReduceBenchmarkParallel.reduce          16         100000         1000        1000  avgt   10   1305.430 ±  189.270  ms/op
SearchReduceBenchmarkParallel.reduce          16         100000         2000        1000  avgt   10   2620.392 ±  312.015  ms/op
SearchReduceBenchmarkParallel.reduce          16         100000         5000        1000  avgt   10   6105.427 ±  397.724  ms/op
SearchReduceBenchmarkParallel.reduce          16         100000        10000        1000  avgt   10  15135.228 ±  517.475  ms/op
SearchReduceBenchmarkParallel.reduce           8         100000          100        1000  avgt   10    162.596 ±   10.073  ms/op
SearchReduceBenchmarkParallel.reduce           8         100000          500        1000  avgt   10    721.860 ±   91.378  ms/op
SearchReduceBenchmarkParallel.reduce           8         100000          800        1000  avgt   10   1221.285 ±  142.668  ms/op
SearchReduceBenchmarkParallel.reduce           8         100000         1000        1000  avgt   10   1378.757 ±  177.750  ms/op
SearchReduceBenchmarkParallel.reduce           8         100000         2000        1000  avgt   10   2977.201 ±  254.309  ms/op
SearchReduceBenchmarkParallel.reduce           8         100000         5000        1000  avgt   10   8016.127 ± 1109.922  ms/op
SearchReduceBenchmarkParallel.reduce           8         100000        10000        1000  avgt   10  19572.966 ±  797.254  ms/op
SearchReduceBenchmarkParallel.reduce           4         100000          100        1000  avgt   10    172.249 ±   25.957  ms/op
SearchReduceBenchmarkParallel.reduce           4         100000          500        1000  avgt   10    750.778 ±   86.484  ms/op
SearchReduceBenchmarkParallel.reduce           4         100000          800        1000  avgt   10   1298.646 ±  136.009  ms/op
SearchReduceBenchmarkParallel.reduce           4         100000         1000        1000  avgt   10   1796.165 ±  273.239  ms/op
SearchReduceBenchmarkParallel.reduce           4         100000         2000        1000  avgt   10   4147.806 ±  555.031  ms/op
SearchReduceBenchmarkParallel.reduce           4         100000         5000        1000  avgt   10  12445.461 ± 1448.686  ms/op
SearchReduceBenchmarkParallel.reduce           4         100000        10000        1000  avgt   10  27187.155 ±  738.094  ms/op
SearchReduceBenchmarkParallel.reduce           2         100000          100        1000  avgt   10    161.388 ±   10.592  ms/op
SearchReduceBenchmarkParallel.reduce           2         100000          500        1000  avgt   10    783.003 ±   82.187  ms/op
SearchReduceBenchmarkParallel.reduce           2         100000          800        1000  avgt   10   1397.900 ±  305.982  ms/op
SearchReduceBenchmarkParallel.reduce           2         100000         1000        1000  avgt   10   1903.888 ±  388.740  ms/op
SearchReduceBenchmarkParallel.reduce           2         100000         2000        1000  avgt   10   4538.188 ±  804.478  ms/op
SearchReduceBenchmarkParallel.reduce           2         100000         5000        1000  avgt   10  15182.142 ± 1005.977  ms/op
SearchReduceBenchmarkParallel.reduce           2         100000        10000        1000  avgt   10  33388.568 ± 2066.813  ms/op

The benchmark code can be found here.

Change-Id: Ice2bc1ad7a6f70e3a71ac294b981cebdad2b9e8c
@hackerwin7
Author

hackerwin7 commented Jul 7, 2020

There would be two merge modes for a search request:

  • incremental mode: reduce with the batched size incrementally in a single thread
  • parallel mode: reduce with the batched size in parallel across multiple threads (better suited to merges involving huge numbers of shards and buckets)

@jimczi
Contributor

jimczi commented Apr 23, 2021

(Partial) reduction of shard responses should be a linear operation so the parallelization feels odd. I am going to close this PR because we don't want to optimize for very large aggregations. You should reduce the number of terms or levels that you return. We also made the terms reduce faster in the latest releases so you might see an improvement even when using a single thread to do the reduction.

@jimczi jimczi closed this Apr 23, 2021