parallel broker merges #8577

Closed
clintropolis opened this issue Sep 24, 2019 · 0 comments

clintropolis commented Sep 24, 2019

Motivation

Brokers should be able to merge query result sets in parallel, adaptively and automatically, based on current overall utilization. The "merge/combine" of sequences constitutes the bulk of the real work that Brokers perform. This currently takes place within a single thread from the HTTP thread pool, which, while fair-ish, means that we are also potentially under-utilizing additional cores on the server if the majority of queries are blocked waiting for results. Using a divide and conquer approach to perform this combining merge of results in parallel should often dramatically reduce the time this operation takes, and should also make broker resource utilization more predictable at the same time.

Proposed changes

To achieve this, we will introduce a new opt-in mode to enable parallel merging of results by Druid brokers using a fork-join pool in 'async' mode. This proposal is the result of running with the basic idea captured here #6629 (review), and building on the backs of the good work done in #5913 and #6629, creating a couple of prototype implementations, and performing a large number of experiments.

The primary change suggested by this proposal is to push some or all of the work currently done by QueryToolchest.mergeResults down into the Sequence merge currently done in CachingClusteredClient, for any QueryToolchest that implements createMergeFn. Note that in my current plans QueryToolchest.mergeResults will still be called and not modified; it just has a lot less work to do because some or all of the results will already be merged.

My current approach uses a 2 layer hierarchy, where the first layer merges sub-sets of input sequences and produces output to a blocking queue, and a single task for the second layer that merges input from the blocking queue outputs of the first layer into a single output blocking queue. The level of parallelism for layer 1 will be chosen automatically based on current 'merge' pool utilization, and the fork-join tasks will self-tune to perform a limited number of operations per task, before yielding their results and forking a new task to continue the work when the new task is scheduled.
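As a rough illustration of how the input sequences might be divided among layer 1 tasks, here is a minimal sketch. The class and method names are hypothetical, and reserving one slot of the chosen parallelism for the layer 2 task is an assumption inferred from the benchmark notes (parallelism 4 using 3 layer 1 tasks); the prototype may partition differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not taken from the prototype branch.
final class PartitionSketch {
  // Splits the inputs into contiguous groups, one group per layer 1 task.
  // Assumption: one slot of 'parallelism' is kept for the single layer 2 task.
  static <T> List<List<T>> partitionForLayer1(List<T> inputs, int parallelism) {
    int layer1Tasks = Math.max(1, Math.min(parallelism - 1, inputs.size()));
    int base = inputs.size() / layer1Tasks;   // minimum group size
    int extra = inputs.size() % layer1Tasks;  // first 'extra' groups get one more
    List<List<T>> groups = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < layer1Tasks; i++) {
      int size = base + (i < extra ? 1 : 0);
      groups.add(new ArrayList<>(inputs.subList(start, start + size)));
      start += size;
    }
    return groups;
  }
}
```

With 8 input sequences and a parallelism of 4, this yields three groups of sizes 3, 3 and 2, each of which would feed one layer 1 merge task.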

ParallelMergeCombiningSequence

In a nod to query vectorization which happens at the segment level for historical processes, and more importantly, to minimize the number of blocking operations within fork-join pool tasks, the results from the input sequences will be yielded in small batches, processed in batches, and of course added to the output blocking queues batch at a time. While I haven't yet spent the time to find the ideal small batch size, batching seems to work dramatically better than processing a single result at a time, which in some of my initial experiments was even slower in parallel than the existing serial approach due to high lock contention.

A prototype implementation based on the experiments so far (but still missing a few features) is available here: #8578. The design will be described using the terms from this branch, but I consider everything fair game and am willing to change it based on discussion in this proposal.

result merging on the fork-join pool

A new class ResultBatch<T> will capture this notion of result batches, wrapping a Queue<T>, as well as the idea of a 'terminal' object in order to communicate to downstream fork-join tasks that a sequence is completed. To simplify the processing of results without directly dealing with these batches, a cursor pattern that allows easily processing individual results from the batches:

class BatchedResultsCursor<T> implements ForkJoinPool.ManagedBlocker, Comparable<BatchedResultsCursor<T>>

is also introduced, with implementations for Yielder<ResultBatch<T>> and BlockingQueue<ResultBatch<T>> to allow using the same types of worker tasks for both layer 1 and layer 2. The yielder cursors and blocking queue cursors operate slightly differently in that the yielder cursors are created 'pre-loaded' by virtue of converting the input sequences into accumulating yielders.
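To make the batch and cursor idea concrete, here is a minimal sketch of a blocking-queue cursor. `BatchSketch` and `QueueCursorSketch` are hypothetical stand-ins, not the prototype's ResultBatch or BatchedResultsCursor, and the Comparable part of the real cursor is left out. It only shows how a terminal marker and `ForkJoinPool.ManagedBlocker` could fit together.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

// Hypothetical stand-in for ResultBatch<T>: a small queue of results, or a terminal marker.
final class BatchSketch<T> {
  final Queue<T> values; // null marks the terminal batch

  private BatchSketch(Queue<T> values) { this.values = values; }

  static <T> BatchSketch<T> of(List<T> items) { return new BatchSketch<>(new ArrayDeque<>(items)); }
  static <T> BatchSketch<T> terminal() { return new BatchSketch<>(null); }
  boolean isTerminal() { return values == null; }
}

// Hypothetical cursor over a blocking queue of batches. Implementing ManagedBlocker
// tells the fork-join pool that this task may block, so the pool may activate a
// spare worker while the cursor waits for the next batch.
final class QueueCursorSketch<T> implements ForkJoinPool.ManagedBlocker {
  private final BlockingQueue<BatchSketch<T>> queue;
  private BatchSketch<T> current;

  QueueCursorSketch(BlockingQueue<BatchSketch<T>> queue) { this.queue = queue; }

  @Override
  public boolean block() throws InterruptedException {
    if (current == null) {
      current = queue.take(); // blocking wait for the next batch
    }
    return true; // no further blocking is necessary
  }

  @Override
  public boolean isReleasable() {
    if (current == null) {
      current = queue.poll(); // cheap non-blocking attempt first
    }
    return current != null;
  }

  // Returns the next individual result, or null once the terminal batch has been seen.
  T next() {
    while (true) {
      if (current == null) {
        try {
          ForkJoinPool.managedBlock(this);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting for a batch", e);
        }
      }
      if (current.isTerminal()) {
        return null;
      }
      T value = current.values.poll();
      if (value != null) {
        return value;
      }
      current = null; // batch exhausted, fetch the next one
    }
  }
}
```

Callers see a stream of individual results and never touch the batches directly, which is the point of the cursor pattern described above.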

At the outer level, parallel merging will be exposed to CachingClusteredClient through a new sequence type:

class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>

which will wrap all of the work done on the fork-join pool in the form of a yielding sequence, to allow easy integration through existing interfaces. The ParallelMergeCombiningSequence operates on a List<Sequence<T>> baseSequences to merge and combine results given an Ordering<T> orderingFn and BinaryOperator<T> combineFn to order and combine results from the sequences. The HTTP thread has a 'background' combining sequence that builds a sequence from an iterator over the batched results from the output blocking queue of the layer 2 task.
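The merge-and-combine semantics driven by the ordering and combining functions can be sketched serially as below. This is an illustration only: it uses plain lists and `java.util.Comparator` in place of Druid's Sequence and Guava's Ordering, the names are hypothetical, and it does none of the batching or fork-join scheduling the proposal describes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BinaryOperator;

// Hypothetical serial sketch of "merge sorted inputs, combine equal neighbours".
final class MergeCombineSketch {
  // Wrapper that remembers the current head of an input so inputs can be ordered by it.
  private static final class Head<T> {
    T value;
    final Iterator<T> rest;

    Head(Iterator<T> it) {
      this.rest = it;
      this.value = it.next();
    }
  }

  // Each input must already be sorted according to 'ordering'.
  static <T> List<T> mergeCombine(List<List<T>> sortedInputs, Comparator<T> ordering, BinaryOperator<T> combineFn) {
    Comparator<Head<T>> byHead = (a, b) -> ordering.compare(a.value, b.value);
    PriorityQueue<Head<T>> pq = new PriorityQueue<>(byHead);
    for (List<T> input : sortedInputs) {
      Iterator<T> it = input.iterator();
      if (it.hasNext()) {
        pq.add(new Head<>(it));
      }
    }
    List<T> out = new ArrayList<>();
    T pending = null; // result currently being accumulated
    while (!pq.isEmpty()) {
      Head<T> head = pq.poll();
      T next = head.value;
      if (head.rest.hasNext()) {
        head.value = head.rest.next();
        pq.add(head); // re-insert with its new head value
      }
      if (pending == null) {
        pending = next;
      } else if (ordering.compare(pending, next) == 0) {
        pending = combineFn.apply(pending, next); // same ordering key: combine
      } else {
        out.add(pending); // key changed: emit the accumulated result
        pending = next;
      }
    }
    if (pending != null) {
      out.add(pending);
    }
    return out;
  }
}
```

For example, merging `[(1,5), (3,1)]` with `[(1,2), (2,7), (3,4)]`, ordering on the first field and summing the second, gives `[(1,7), (2,7), (3,5)]`.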

Converting the sequence to a yielder will create a single RecursiveAction to run on the fork-join pool:

class MergeCombinePartitioningAction<T> extends RecursiveAction

which is responsible for computing the level of parallelism, partitioning the set of input sequences between the chosen level of parallelism, creating sets of BatchedResultsCursor and spawning the layer 1 and layer 2 initialization tasks:

class PrepareMergeCombineInputsAction<T> extends RecursiveAction

which serve to block until the initial batch of results is produced and ready to process for each cursor, allowing the cursors to be placed in a PriorityQueue and sorted using the Ordering function. Once the results are available for all cursors in a PrepareMergeCombineInputsAction, the PriorityQueue will be fed into the main worker task of the ParallelMergeCombiningSequence:

class MergeCombineAction<T> extends RecursiveAction

which does the actual merging of results. Results with the same ordering are combined with the combining function while applicable, before being added to an output ResultBatch to be pushed to an output blocking queue. MergeCombineAction will "yield" after processing n inputs, where n is initially 1024, and is subsequently set by measuring the time it takes to process n inputs and computing the ideal n to run for 10ms. The new n is used for the next MergeCombineAction that is executed, continuing the work of processing the BatchedResultsCursor from the PriorityQueue until everything is completely drained, at which point a 'terminal' result batch is added to indicate to downstream processors that the stream is complete.
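The self-tuning of n can be read as a simple proportional rescale: if processing n inputs took t, the next task aims for roughly n × 10ms / t inputs. A minimal sketch follows, assuming hypothetical names and assuming a lower clamp of 1 and a fallback to the initial value on unusable measurements, neither of which is stated in the proposal.

```java
// Hypothetical sketch of the yield-count tuning; names and clamping are assumptions.
final class YieldTuningSketch {
  static final int INITIAL_YIELD_AFTER = 1024;       // initial n from the proposal
  static final long TARGET_TASK_NANOS = 10_000_000L; // 10ms target run time per task

  // Given that 'processed' inputs took 'elapsedNanos', estimate how many inputs
  // the next task should process so that it runs for about the target time.
  static int nextYieldAfter(int processed, long elapsedNanos) {
    if (processed <= 0 || elapsedNanos <= 0) {
      return INITIAL_YIELD_AFTER; // no usable measurement, fall back to the default
    }
    long ideal = (TARGET_TASK_NANOS * processed) / elapsedNanos;
    return (int) Math.max(1L, Math.min(ideal, (long) Integer.MAX_VALUE));
  }
}
```

Under these assumptions, a task that handled 1024 inputs in 5ms would hand 2048 to its successor, while one that needed 20ms would hand over 512.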

The level of parallelization in the prototype is currently very greedy. It is naively chosen by picking the maximum of available processors or remaining fork-join tasks, with a test oriented query context parameter to limit lower than available processors. I say naively because this should probably consider not just the remaining fork-join task slots, but how many queries are currently being processed, to attempt to save additional slots when a small number of queries are dominating the pool, but further experimentation and discussion I think might be required to pick an optimal strategy, as well as investigating the content mentioned in #8357.

Prioritization

The current prototype is lacking any sort of prioritization or gated access to the fork-join pool. Testing so far shows that unrestricted access to the fork-join pool is perhaps 'good enough' for the initial PR or perhaps in general, and prioritization should be perhaps handled elsewhere (or solely pushed down to the historicals as it is currently done). Unrestricted scheduling of tasks I think should achieve the interleaving suggested in #8356, by nature of the algorithm in use where work is done in small chunks and continuously scheduling additional tasks to run on the pool to complete the work.

However, if unrestricted access to the pool proves to be insufficient after further testing, I have considered 2 approaches we could take to handle this and account for query priority. The first is a sort of prioritized, first-come first-served blocking mechanism over a fixed number of slots, effectively a limit on the maximum number of queries merging concurrently, which would block before spawning fork-join tasks and release the slots when the merge is complete.
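A bare-bones version of the slot idea could look like the sketch below. It is an assumption-heavy illustration: `MergeSlotsSketch` is a hypothetical name, and a fair `java.util.concurrent.Semaphore` gives only the first-come first-served part, so query priority is not handled here at all.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical gate limiting how many queries may be merging at once.
final class MergeSlotsSketch {
  private final Semaphore slots;

  MergeSlotsSketch(int maxConcurrentMerges) {
    // 'true' requests fair (FIFO) ordering among waiting callers.
    this.slots = new Semaphore(maxConcurrentMerges, true);
  }

  // Blocks until a slot is free, runs the merge, and always releases the slot.
  <R> R runMerge(Supplier<R> merge) {
    slots.acquireUninterruptibly();
    try {
      return merge.get();
    } finally {
      slots.release();
    }
  }

  int availableSlots() {
    return slots.availablePermits();
  }
}
```

The blocking would happen on the calling (HTTP) thread before any fork-join tasks are spawned, so the pool itself never sees more than the configured number of concurrent merges.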

While I haven't spent a significant amount of time thinking about this yet, a more elaborate mechanism I can imagine is some sort of customized fork-join pool implementation, where the 'execute' method goes through a prioritized queue, so that lower priority queries can be stalled in favor of higher priorities.

Semi-related, with the work broken up into small chunks like this, it seems like there could be even more elaborate strategies of constricting and expanding the number of parallel merge tasks based on pool utilization by just regrouping BatchedResultsCursor, but this would require much further investigation and testing than I think should be part of the initial effort on this.

Rationale

This seems like a much more flexible approach to dividing up the work for result merging at the broker level than the previous attempt in #6629, and a bit less invasive than the changes of #5913. I don't think the concept of the broker performing a magic parallel result merge is objectionable to anyone, so experimentation was necessary in order to prove the approach viable. The results so far appear very promising; testing on my 4 physical/8 hyperthreaded core laptop has yielded the following results using the benchmarks added in #8089:

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.groupByQuery                8              0                 all             75000  avgt    5   294667.810 ±   15159.044  us/op
CachingClusteredClientBenchmark.groupByQuery                8              1                 all             75000  avgt    5   289990.615 ±   12078.017  us/op
CachingClusteredClientBenchmark.groupByQuery                8              4                 all             75000  avgt    5   165992.136 ±    5120.743  us/op
CachingClusteredClientBenchmark.groupByQuery                8              0              minute             75000  avgt    5   665222.595 ±   34855.138  us/op
CachingClusteredClientBenchmark.groupByQuery                8              1              minute             75000  avgt    5   601011.456 ±   31143.896  us/op
CachingClusteredClientBenchmark.groupByQuery                8              4              minute             75000  avgt    5   505897.930 ±  252049.184  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.timeseriesQuery             8              0                 all             75000  avgt    5    10430.144 ±     808.633  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              1                 all             75000  avgt    5    10451.555 ±     673.152  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              4                 all             75000  avgt    5     3092.491 ±     102.575  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              1              minute             75000  avgt    5    35844.012 ±    1422.124  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              0              minute             75000  avgt    5    35463.967 ±    2678.709  us/op
CachingClusteredClientBenchmark.timeseriesQuery             8              4              minute             75000  avgt    5    10490.847 ±     532.253  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.topNQuery                   8              0                 all             75000  avgt    5    17020.785 ±    1591.048  us/op
CachingClusteredClientBenchmark.topNQuery                   8              1                 all             75000  avgt    5    17236.979 ±    1742.515  us/op
CachingClusteredClientBenchmark.topNQuery                   8              4                 all             75000  avgt    5     5182.325 ±     275.617  us/op
CachingClusteredClientBenchmark.topNQuery                   8              0              minute             75000  avgt    5   440288.390 ±   36510.682  us/op
CachingClusteredClientBenchmark.topNQuery                   8              1              minute             75000  avgt    5   462569.201 ±   40711.373  us/op
CachingClusteredClientBenchmark.topNQuery                   8              4              minute             75000  avgt    5   213399.804 ±   24344.845  us/op


Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.groupByQuery               32              0                 all             75000  avgt    5  1392022.470 ±   37345.957  us/op
CachingClusteredClientBenchmark.groupByQuery               32              1                 all             75000  avgt    5  1355029.496 ±   74562.324  us/op
CachingClusteredClientBenchmark.groupByQuery               32              4                 all             75000  avgt    5   789772.717 ±   15048.352  us/op
CachingClusteredClientBenchmark.groupByQuery               32              1              minute             75000  avgt    5  3039476.943 ±   96495.702  us/op
CachingClusteredClientBenchmark.groupByQuery               32              0              minute             75000  avgt    5  3066853.419 ±   83639.422  us/op
CachingClusteredClientBenchmark.groupByQuery               32              4              minute             75000  avgt    5  1677438.195 ±  263601.667  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.timeseriesQuery            32              0                 all             75000  avgt    5    42082.949 ±    3386.983  us/op
CachingClusteredClientBenchmark.timeseriesQuery            32              1                 all             75000  avgt    5    41423.214 ±    3115.364  us/op
CachingClusteredClientBenchmark.timeseriesQuery            32              4                 all             75000  avgt    5    14438.213 ±     404.668  us/op
CachingClusteredClientBenchmark.timeseriesQuery            32              0              minute             75000  avgt    5   159062.877 ±   13616.178  us/op
CachingClusteredClientBenchmark.timeseriesQuery            32              1              minute             75000  avgt    5   148730.323 ±   13239.695  us/op
CachingClusteredClientBenchmark.timeseriesQuery            32              4              minute             75000  avgt    5    50602.164 ±    1653.594  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.topNQuery                  32              0                 all             75000  avgt    5    68737.166 ±    5021.642  us/op
CachingClusteredClientBenchmark.topNQuery                  32              1                 all             75000  avgt    5    68601.619 ±    1731.964  us/op
CachingClusteredClientBenchmark.topNQuery                  32              4                 all             75000  avgt    5    23354.736 ±     136.128  us/op
CachingClusteredClientBenchmark.topNQuery                  32              0              minute             75000  avgt    5  2209146.790 ±   26481.535  us/op
CachingClusteredClientBenchmark.topNQuery                  32              1              minute             75000  avgt    5  2072088.508 ±   23812.286  us/op
CachingClusteredClientBenchmark.topNQuery                  32              4              minute             75000  avgt    5   702360.662 ±   13024.822  us/op


Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.groupByQuery               64              0                 all             75000  avgt    5  2982378.361 ±  111827.750  us/op
CachingClusteredClientBenchmark.groupByQuery               64              1                 all             75000  avgt    5  3093978.766 ±  173486.476  us/op
CachingClusteredClientBenchmark.groupByQuery               64              4                 all             75000  avgt    5  1866266.877 ±   72350.259  us/op
CachingClusteredClientBenchmark.groupByQuery               64              0              minute             75000  avgt    5  6771130.653 ±  389650.338  us/op
CachingClusteredClientBenchmark.groupByQuery               64              1              minute             75000  avgt    5  6531315.426 ±  350824.451  us/op
CachingClusteredClientBenchmark.groupByQuery               64              4              minute             75000  avgt    5  3915926.485 ± 1945271.888  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.timeseriesQuery            64              0                 all             75000  avgt    5    83402.353 ±    8059.262  us/op
CachingClusteredClientBenchmark.timeseriesQuery            64              1                 all             75000  avgt    5    82654.846 ±    6609.039  us/op
CachingClusteredClientBenchmark.timeseriesQuery            64              4                 all             75000  avgt    5    29818.524 ±     531.522  us/op
CachingClusteredClientBenchmark.timeseriesQuery            64              0              minute             75000  avgt    5   319373.950 ±   34796.043  us/op
CachingClusteredClientBenchmark.timeseriesQuery            64              1              minute             75000  avgt    5   313583.213 ±   23267.252  us/op
CachingClusteredClientBenchmark.timeseriesQuery            64              4              minute             75000  avgt    5   107551.460 ±    2252.579  us/op

Benchmark                                        (numServers)  (parallelism)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score         Error  Units
CachingClusteredClientBenchmark.topNQuery                  64              0                 all             75000  avgt    5   134204.767 ±    1053.392  us/op
CachingClusteredClientBenchmark.topNQuery                  64              1                 all             75000  avgt    5   140975.988 ±    9803.699  us/op
CachingClusteredClientBenchmark.topNQuery                  64              4                 all             75000  avgt    5    49865.418 ±    1392.572  us/op
CachingClusteredClientBenchmark.topNQuery                  64              0              minute             75000  avgt    5  4927134.934 ±  526932.265  us/op
CachingClusteredClientBenchmark.topNQuery                  64              1              minute             75000  avgt    5  4371840.961 ±  422021.155  us/op
CachingClusteredClientBenchmark.topNQuery                  64              4              minute             75000  avgt    5  1571932.840 ±   11563.777  us/op

Parallelism 0 is the existing caching clustered client merge strategy, parallelism 1 is doing a serial merge on the fork-join pool, and 4 is using 3 layer 1 tasks to merge sequences in parallel, which is the limit to the number of physical cores my laptop has. In many cases queries are processing 2-3x faster when done in parallel, as would be expected for the given level of parallelism. Even doing serial processing with a single fork-join task is competitive with the existing serial approach, so all merges can be done with the same approach even when there is not capacity available to run the merge in parallel. I will continue to update this proposal as I collect more experiment results.

Operational impact

No forced operational impact, since the feature will be opt-in initially and must be defined in the service configuration to be enabled. I think that this could result in more predictable broker resource utilization, but operators experimenting with this new feature will need to closely monitor broker query performance to ensure that the new feature is producing beneficial results.

Test plan

The test plan includes live cluster testing on some small-ish clusters I have available, as well as running the benchmarks on a large core count machine to simulate larger clusters and round out the benchmarks, to ensure that the approach scales correctly. Additionally, I plan to do 'overloaded' testing to ensure that a busy broker performs no worse, within reason, than the existing merge strategy.

Future work (optional)

Beyond the initial PR, I think the most benefit would be focusing on tuning the level of parallelism, re #8357.
