Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel broker merges on fork join pool #8578

Conversation

clintropolis
Copy link
Member

@clintropolis clintropolis commented Sep 24, 2019

Description

Implementation of proposal #8577 (see for more details).

In deviation from the proposal, in this PR this is now on by default. All query types except for scan are merged on a dedicated 'async' ForkJoinPool.

ParallelMergeCombiningSequence


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths.
  • added integration tests.
  • been tested in a test Druid cluster.

Key changed/added classes in this PR
  • ParallelMergeCombiningSequence
  • CachingClusteredClient

@clintropolis clintropolis changed the title Broker parallel merge combine artisanal small batch parallel broker merges Sep 24, 2019
@clintropolis clintropolis changed the title parallel broker merges parallel broker merges on fork join pool Sep 24, 2019
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class LifecycleForkJoinPool extends ForkJoinPool
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be the cause of mysterious java 9+ compilation issues causing CI failures (my favorite part is how the compiler error doesn't at all specify what is causing the error). ForkJoinPool has the @Contended annotation, which from javadocs looks like it leaks to subclasses I guess?

 * <p>The class level {@code @Contended} annotation is not inherited and has
 * no effect on the fields declared in any sub-classes. The effects of all
 * {@code @Contended} annotations, however, remain in force for all
 * subclass instances, providing isolation of all the defined contention
 * groups. Contention group tags are not inherited, and the same tag used
 * in a superclass and subclass, represent distinct contention groups.

This might be fixed by something like:

<compilerArgs>
  <arg>--add-exports</arg>
  <arg>java.base/jdk.internal.misc=ALL-UNNAMED</arg>
</compilerArgs>

added to jdk9+ profile, but would also require removing all source/target directives into a jdk8 profile I think, since:

 error: option --add-exports not allowed with target 8

happens otherwise.

Not sure what is the best thing to do here. it could probably be worked around by I guess having a lifecycle'd module that wraps a ForkJoinPool for the provider instead of extending ForkJoinPool, since just using ForkJoinPool doesn't seem to cause this error in jdk9+, but I'm not sure if that is the better solution, or if we should try to add the compiler args.

Copy link
Member Author

@clintropolis clintropolis Sep 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually a bit less certain that this class is the issue... if I pull it into another project I don't seem to have an issue compiling with a newer java...

However, commenting out this class and using a plain ForkJoinPool does allow druid-processing to compile correctly with jdk11.

@leventov leventov removed their request for review September 30, 2019 16:08
@leventov
Copy link
Member

I withdraw from reviewing this, just one note: I'm strongly in favor of making the new behavior opt-out rather than opt-in, and removing the old behavior after a few versions of Druid released. We don't need more configurations here, parallel merges should be good for everybody.

The transition period of a few Druid versions is just a safety check against regressions in this feature which might be surfaced only in the production environment.

@clintropolis clintropolis removed the WIP label Oct 31, 2019
@jihoonson
Copy link
Contributor

Thank you for the detailed benchmark results! It looks great but I wonder how the default configuration works under more realistic load. For example, it would be more realistic if there are like 80% of light queries and 20% of heavy queries that have a shorter delay and a larger delay, respectively.

I think the performance degradation under load spikes falls within what I consider acceptable range. Particularly important, I find the degradation to be relatively predictable so that we can update the cluster tuning guide (in a future PR) to best advise how to tune your brokers for a given cluster size and load pattern.

Sounds nice.

@clintropolis
Copy link
Member Author

clintropolis commented Oct 31, 2019

Thank you for the detailed benchmark results! It looks great but I wonder how the default configuration works under more realistic load. For example, it would be more realistic if there are like 80% of light queries and 20% of heavy queries that have a shorter delay and a larger delay, respectively.

This sounds good. I think i went a bit hard on this PR in the benchmarks I have presented so far in terms of targeting the worst cases which aren't super realistic, which I think maybe looks a lot scarier than a typical heavy load will appear practice. The existing worst case benchmarks are basically depicting what happens if a bunch of moderate to large result set sized queries all happen simultaneously and even more all simultaneously have work to do instead of some of them blocking waiting for input, which should very rarely (if ever) happen in the real world.

I will throw together another benchmark to try and plot out a more realistic heavy load case to see how that looks.

@clintropolis
Copy link
Member Author

clintropolis commented Nov 4, 2019

more realistic worst case

I reworked the JMH thread based benchmark to use thread groups to examine what happens in a more realistic scenario, with the newly renamed ParallelMergeCombiningSequenceThreadedBenchmark. I find this benchmark to be a fair bit less scary than the previous 'worst case' benchmarks, which focused on an impossible scenario because I really wanted to dig in and see where and how the wheels fell off.

This benchmark models a more 'typical' heavy load, where the majority of the queries are smaller result-sets with shorter blocking times and a smaller subset are larger result sets with longer initial blocking times. By using thread groups we can look at performance for these 'classes' of queries as load increases.

This set was collected with a ratio of 1 'moderately large' query for every 8 'small' queries, where 'moderately large' is defined as input sequence row counts of 50k-75k rows and blocking for 1-2.5 seconds before yielding results, and 'small' is defined as input sequence row counts of 500-10k and blocking for 50-200ms. Keep in mind while reviewing the result that I collected data on a significantly higher level of parallelism than I would expect a 16 core machine to be realistically configured to handle. I would probably configure an m5.8xl with no more than 64 http threads, but collected data points up to 128 concurrent sequences being processed just to see where things went.

The first plot shows the merge time (y axis) growth as concurrency (x axis) increases, animated to show the differences for a given number of input sequences (analagous to cluster size).

thread-groups-typical-distribution-1-8-small

Note that the x axis is the total concurrency count, not the number of threads of this particular group. Also worth pointing out is that the degradation of performance happens at a significantly higher level of concurrency than the previous (unrealistic) worse case performance, but in terms of characteristics, it does share some aspects with the previous plots, such as 8 input sequences being a lot more performant than say 64, and after a certain threshold, the performance of the parallel approach crosses the limit of the same threaded serial merge approach.

The larger 'queries' tell a similar tale:

thread-groups-typical-distribution-1-8-moderately-large

The differences here when the parallel merge sequence crosses the threshold look to me a fair bit less dramatic than the 'small' sequences, but keep in mind the 'big jump' in the small sequences only amount to a few hundred milliseconds, so it's not quite as dramatic as it appears.

The final plot shows the overall average between both groups:

thread-groups-typical-distribution-1-8-average

which I find a bit less useful than the other 2 plots, but included anyway for completeness.

@clintropolis clintropolis removed the WIP label Nov 4, 2019
@clintropolis
Copy link
Member Author

clintropolis commented Nov 5, 2019

simulated heavy load

I collected another round of data using the same benchmarks as my 'more realistic worst case' comment, but this time plotting what happens when a large number of queries all start within a 500ms spread, which might be a more typical heavy load, rather than simulating a large concurrent spike of simultaneous queries like the last set of results.

In this scenario, parallel merges outperform the same threaded merges until much higher concurrency than the concurrent spike model. This is at least partially driven by the fact that each individual thread can make a better estimate about utilization than is possible in the spike model.

'small' sequences

thread-groups-typical-distribution-small-500ms

'moderately large' sequences

thread-groups-typical-distribution-moderately-large-500ms

overall average

thread-groups-typical-distribution-average-500ms

I think future work could focus on making the concurrent spike behavior a bit more chillax through a variety of means, but I find these results to be 'good enough' for now.

Anyone want to see any other scenarios?

@himanshug
Copy link
Contributor

himanshug commented Nov 5, 2019

@clintropolis thanks for all the benchmarks, I haven't had the opportunity to look at the new developments yet but get back to reviewing this week.

one thing, I am not sure if taken care or not, many people run the druid processes inside docker containers where Runtime.getRuntime().availableProcessors() returns the available processors from host and not from the "container". ( https://bugs.openjdk.java.net/browse/JDK-8140793 , I think it has been changed in jdk10) . Given the sensitivity of performance to availableProcessors() returned value, it might be good to make that area a bit configurable if not already. I will hopefully offer more specific suggestion when reviewing again.

@clintropolis
Copy link
Member Author

clintropolis commented Nov 5, 2019

@clintropolis thanks for all the benchmarks, I haven't had the opportunity to look at the new developments yet but get back to reviewing this week.

Yeah, no problem, thanks for asking the hard questions to make me collect them, the result is the PR is in a better state than before them 🤘. The production part of the code hasn't really changed much in the last couple of weeks other than a few lines to change behavior of the parallelism computing method, and mostly changes to the default values.

Given the sensitivity of performance to availableProcessors() returned value, it might be good to make that area a bit configurable if not already. I will hopefully offer more specific suggestion when reviewing again.

This stuff should all be controllable via configs, druid.processing.merge.pool.parallelism to control the FJP pool size, and druid.merge.pool.defaultMaxQueryParallelism to control individual query max parallelism (this one can also be set on the query context through parallelMergeParallelism).

In a follow-up I think I will also try to add additional information to the cluster tuning guide docs, since I have a pretty good idea how this implementation performs now which I think we can use to advise operators.

Copy link
Contributor

@jihoonson jihoonson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, the last benchmark results look reasonable to me. Thanks for all your efforts!

Copy link
Contributor

@himanshug himanshug left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing the tests for and tuning the defaults.

As discussed in #8578 (comment) , There needs to be follow up work to expose some metrics to see how things behave on real clusters for particular use cases.

@jihoonson jihoonson merged commit 7aafcf8 into apache:master Nov 7, 2019
@clintropolis
Copy link
Member Author

Thanks for the review everyone!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants