
Move Caching Cluster Client to java streams and allow parallel intermediate merges #5913

Closed
wants to merge 71 commits

Conversation

@drcrallen (Contributor) commented Jun 27, 2018

We are seeing severe broker bottlenecks for certain types of queries. The existing Sequence paradigm does a folding reduction whereby the results from all the historicals and real-time nodes are folded into a single accumulator. This accumulation can be a major performance bottleneck for large queries: TopN queries with results on the order of 10k rows returning from hundreds of nodes can take many seconds (in excess of 40s in some cases) to merge on the broker! A CPU-time flame graph of such a scenario can be seen below:

[Screenshot: CPU-time flame graph of the broker-side merge (2018-06-22)]
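
Conceptually, the bottleneck is that the existing merge is a sequential fold: every row from every data node is combined, one at a time, into a single accumulator on one thread. A plain-Java stand-in (not Druid's Sequence/accumulator API) looks roughly like this:

import java.util.Arrays;
import java.util.List;

// Illustrative only: a sequential fold over per-node result lists, standing in for the
// broker-side merge profiled in the flame graph above.
public class SequentialFold
{
  public static void main(String[] args)
  {
    List<List<Integer>> perNodeResults = Arrays.asList(
        Arrays.asList(1, 2, 3),     // results from historical #1
        Arrays.asList(4, 5, 6),     // results from historical #2
        Arrays.asList(7, 8, 9)      // results from a real-time task
    );
    int accumulator = 0;
    for (List<Integer> nodeResults : perNodeResults) {
      for (int row : nodeResults) {
        accumulator += row;         // all merge work happens on this single thread
      }
    }
    System.out.println(accumulator); // 45
  }
}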

In an attempt to "fix" this bottleneck, I was having a hard time following the logic in CachingClusteredClient, so I rewrote much of that logic with Java 8 streams, attempting to retain as much of the original functionality as possible. I also added many more comments explaining why different sections do what they do. As a killer addition, a new query context parameter, intermediateMergeBatchThreshold, allows parallel intermediate merges.

Intermediate merges

The Sequences produced by the Druid client connections are collected in a Stream. If the query context parameter intermediateMergeBatchThreshold is specified, Spliterators are forked off of the Sequence Stream in batches according to the threshold, if possible; failing that, the prior merge behavior is used. These batched Sequences are fully materialized in a ForkJoinPool, and the results are accumulated (a reduce/fold-left kind of operation) on a first-available basis. The results fetched from cache form another Sequence that is merged in as well.
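
A rough, self-contained sketch of the batching idea (the names here are hypothetical and the PR's MergeWorkTask operates on Druid Sequences rather than a plain Stream with a BinaryOperator): keep splitting the spliterator of per-server results until a chunk is at or below the threshold, fold each chunk on a ForkJoinPool worker, then merge the intermediate results.

import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.stream.Stream;

// Illustrative only; this is not the PR's MergeWorkTask.
class IntermediateMergeTask<T> extends RecursiveTask<T>
{
  private final Spliterator<T> spliterator;
  private final BinaryOperator<T> mergeFn;
  private final T identity;
  private final long batchThreshold;

  IntermediateMergeTask(Spliterator<T> spliterator, BinaryOperator<T> mergeFn, T identity, long batchThreshold)
  {
    this.spliterator = spliterator;
    this.mergeFn = mergeFn;
    this.identity = identity;
    this.batchThreshold = batchThreshold;
  }

  @Override
  protected T compute()
  {
    Spliterator<T> prefix;
    if (spliterator.estimateSize() > batchThreshold && (prefix = spliterator.trySplit()) != null) {
      // Fork one half as its own intermediate merge and keep working on the other half.
      IntermediateMergeTask<T> forked =
          new IntermediateMergeTask<>(prefix, mergeFn, identity, batchThreshold);
      forked.fork();
      T mine = compute();
      return mergeFn.apply(mine, forked.join());
    }
    // Chunk is at or below the threshold: fold it sequentially on this worker thread.
    AtomicReference<T> acc = new AtomicReference<>(identity);
    spliterator.forEachRemaining(t -> acc.set(mergeFn.apply(acc.get(), t)));
    return acc.get();
  }

  public static void main(String[] args)
  {
    ForkJoinPool mergeFjp = new ForkJoinPool();
    int merged = mergeFjp.invoke(new IntermediateMergeTask<>(
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8).spliterator(), Integer::sum, 0, 2));
    System.out.println(merged); // 36
    mergeFjp.shutdown();
  }
}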

Why Fork Join Pool?

The "work" for the intermediate merges is done in a dedicated ForkJoinPool. A fork-join-pool was used instead of an Executor service in order to accommodate dynamic expansion of the worker pool. Basically to use a worker setup slightly better than running all the folding accumulation in the http server thread. The forking and joining semantics are used, but any effects of potentially blocking operations (like QTL) are not attempted to be accounted for. As such, having a specific pool of resources was chosen with the hope that eventually it can be more intelligent about being kind when doing potentially blocking operations. This is no worse than a fixed pool executor service and, when done properly, can handle new threads to circumvent blocked threads in a better manner. Future enhancements can include merge work task prioritization based on query priority, as well as enhancements to account for potentially blocking code.

Additionally, with how the fork and join items are intended to work, a ListenableFuture-style workflow can be attained by joining a task and then performing further work based on its result, as long as the join is executed inside the FJP.
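
A minimal sketch of that pattern, with hypothetical stand-ins for the merge work (this is not code from the PR):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class JoinAsContinuation
{
  // Hypothetical stand-ins for two pieces of intermediate merge work.
  static long partialMergeA() { return 21; }
  static long partialMergeB() { return 21; }

  public static void main(String[] args)
  {
    ForkJoinPool mergeFjp = new ForkJoinPool();
    ForkJoinTask<Long> outerMerge = ForkJoinTask.adapt(() -> {
      // Both joins below run on an FJP worker thread, so the worker can help execute other
      // queued merge tasks while waiting instead of simply blocking like an Executor thread.
      ForkJoinTask<Long> partialA = mergeFjp.submit(() -> {
        return partialMergeA();
      });
      ForkJoinTask<Long> partialB = mergeFjp.submit(() -> {
        return partialMergeB();
      });
      // The ListenableFuture-style "then" step: combine once the joins complete.
      return partialA.join() + partialB.join();
    });
    System.out.println(mergeFjp.invoke(outerMerge)); // 42
    mergeFjp.shutdown();
  }
}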

Other changes

Streaming methods are added to the Cache interface in order to accommodate changes in the caching cluster client.
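
Purely as an illustration of the shape such a streaming method can take (the names and signatures below are hypothetical, not the ones actually added to Druid's Cache interface in this PR):

import java.util.Map;
import java.util.stream.Stream;

// Hypothetical sketch only.
interface StreamingCache
{
  Map<String, byte[]> getBulk(Iterable<String> keys);

  // Streaming variant: keys arrive as a Stream and hits come back as a Stream of
  // key/value entries, so the caching cluster client can keep everything in one pipeline.
  default Stream<Map.Entry<String, byte[]>> getBulkStream(Stream<String> keys)
  {
    return getBulk(keys::iterator).entrySet().stream();
  }
}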

CombiningSequence is moved out of the common module into java-util with the other Sequence classes

Additionally, there are small changes in the way the per-Druid-node Sequences are initiated. The proposed implementation causes the direct Druid client to fire off queries to all the nodes regardless of whether intermediate merges are used. The cached and uncached results now feed through the same "Stream" and are simply resolved as either a merge Sequence (for cached results) or a Sequence from the direct Druid client (for cache misses). This required changing some unit tests to allow timeline lookups where previously there were none.
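
An illustrative sketch of that resolution step, using hypothetical stand-in types rather than the PR's classes:

import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

// Illustrative only: cached entries and cache misses resolve into the same Stream of
// results, so downstream merging does not care where a given sequence came from.
public class ResolveCacheOrQuery
{
  // Simplified stand-in for the PR's per-server bookkeeping: either a cached result is
  // present, or the query still needs to be sent to the server.
  static class ServerEntry
  {
    final List<String> cachedRows; // null on a cache miss
    final String server;

    ServerEntry(List<String> cachedRows, String server)
    {
      this.cachedRows = cachedRows;
      this.server = server;
    }
  }

  static Stream<List<String>> resolve(Stream<ServerEntry> entries)
  {
    return entries.map(entry ->
        entry.cachedRows != null
            ? entry.cachedRows                          // cached result, already materialized
            : runOnDirectDruidClient(entry.server)      // cache miss: fire the remote query
    );
  }

  // Stand-in for a DirectDruidClient call.
  static List<String> runOnDirectDruidClient(String server)
  {
    return Collections.singletonList("row from " + server);
  }
}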

Results

Initial internal results show that query speeds improved by 85% for large queries with intermediateMergeBatchThreshold set. The impact on smaller queries has not been fully investigated, and the impact when intermediateMergeBatchThreshold is not set (just the new Stream workflow) has not been fully exercised.

Work can clearly be seen being done in the ForkJoinPool as per:

[Screenshot: CPU-time flame graph showing merge work running in the ForkJoinPool (2018-07-24)]

The final merge happens in QueryResource.doPost under the Jetty servlet stack.

Not included here

There are other items of interest for larger clusters, such as broker-brokers (or cross-broker merges). Hopefully migrating CachingClusteredClient onto Java streams makes such a possibility easier in the future.

The impact on heap pressure from turning on this feature has not been fully explored. Since this feature is intended to help large clusters with large results, heap pressure could be a major problem if not taken care of.

Handling back-pressure better would be a nice improvement. For example, if the druid-client request returns could be controlled so that an arbitrary number of them cannot return arbitrarily large results all at once, that would help the brokers handle larger result sets.

TODO

  • I'm still missing some unit tests for the cache changes in the hybrid cache

Conclusion

This is a huge performance boon (85%) for one of our core use cases. It would be interesting to see if this helps with large group-by queries as well. Since this is an opt-in feature that is very close to prior behavior when disabled, hopefully the risk of the change is very small, while the handling of the data stream is clearer (or at least better documented).

@drcrallen drcrallen changed the title Move Caching Cluster Client to java streams Move Caching Cluster Client to java streams and allow parallel intermediate merges Jun 27, 2018

@leventov leventov self-requested a review Jun 28, 2018

@himanshug (Contributor) commented Jul 11, 2018

nice PR description :)

@@ -147,4 +151,29 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
}
);
}

private static final AtomicLong fjpWorkerThreadCount = new AtomicLong(0L);

@asdf2014 (Member) commented Jul 11, 2018

Using LongAdder would be better than AtomicLong. There is a source-level analysis on my personal blog; if you are interested, you can take a look.

Tips: https://yuzhouwan.com/posts/31915#LongAdder

// Imports required by both snippets below
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.junit.Test;

// -Xmx512M -Xms512M -Xmn256M -XX:+AlwaysPreTouch -ea
@Test
public void pressureLongAdder() throws Exception {
    final LongAdder longAdder = new LongAdder();
    ExecutorService executorService = Executors.newCachedThreadPool();
    long startTime = System.currentTimeMillis();
    for (int i = 0; i < 100; i++) {
        executorService.submit(new Thread(() -> {
            for (int j = 0; j < 1000_0000; j++) {
                longAdder.increment();
            }
            System.out.print(String.format("%s %s \t", Thread.currentThread().getId(), longAdder.longValue()));
            /*
            14 19607585 	12 36445036 	20 38985288 	38 76821270 	70 117094732 	18 127252576
            22 137043349 	26 153411172 	30 164051380 	34 165971155  	102 192241678 	134 201104979
            158 232657818 	46 279030056 	174 288502545 	94 347965290 	198 348060553 	118 348087414
            36 353092712 	28 357762215 	44 365464475 	126 379518198 	54 379623515 	182 380077075
            142 385263911 	78 389013887 	62 389085727 	110 389122678 	86 389920423 	166 393535019
            150 396382512 	190 403100499 	32 403161217 	208 403197689 	206 406065520 	16 410725026
            24 415347205 	40 415379997 	48 415733397 	104 418507295 	192 423244160 	176 455793362
            168 458311865 	160 463028656 	136 496375440 	72 541243645 	186 561877000 	170 575352229
            162 584152392 	154 604552121 	138 614092854 	64 638151890 	114 668705836 	58 669235250
            188 699213410 	156 729222401 	124 754336889 	100 784326386 	76 813479501 	120 827569944
            66 830236567 	98 832153503 	112 841408676 	204 849520891 	210 852391130 	202 864804732
            172 875603834 	194 877222893 	200 881090909 	88 882809513 	80 882846368 	56 887174571
            178 889682247 	140 901357028 	146 902169049 	184 904540678 	152 915608988 	130 917896629
            116 924616135 	144 927674541 	122 930399321 	128 939791111 	106 942656234 	84 950848174
            96 951904067 	90 954910184 	74 964338213 	196 966487766 	82 968307139 	52 975854400
            180 977385398 	164 978882525 	50 980896807 	148 988292352 	132 989090669 	108 996891232
            92 996921398 	42 996938988 	68 996953941 	60 1000000000
             */
        }));
    }
    executorService.shutdown();
    while (!executorService.isTerminated()) {
        Thread.sleep(1);
    }
    long endTime = System.currentTimeMillis();
    System.out.println("\n" + (endTime - startTime));    // 3275 ms
}
// -Xmx512M -Xms512M -Xmn128M -XX:+AlwaysPreTouch -ea
@Test
public void pressureAtomicLong() throws Exception {
    final AtomicLong atomicLong = new AtomicLong();
    ExecutorService executorService = Executors.newCachedThreadPool();
    long startTime = System.currentTimeMillis();
    for (int i = 0; i < 100; i++) {
        executorService.submit(new Thread(() -> {
            for (int j = 0; j < 1000_0000; j++) {
                atomicLong.getAndIncrement();
            }
            System.out.print(String.format("%s %s \t", Thread.currentThread().getId(), atomicLong.longValue()));
            /*
            12 390000000 	28 390000000 	44 390000000 	20 390000000 	26 390000000 	18 390000000
            80 390000000 	56 390000000 	96 390000000 	24 390000000 	88 390000000 	72 390000000
            22 390000000 	118 390000000 	54 390000000 	142 390000000 	70 390000000 	86 390000000
            182 390000000 	110 390000000 	62 390000000 	78 390000000 	102 390000000 	158 390000000
            150 390000000 	46 390000000 	38 390000000 	126 390000000 	94 390000000 	134 390000000
            14 390000000 	48 390000000 	40 390000000 	32 390000000 	34 390000000 	64 390000000
            42 390000000 	36 390000000 	16 390000000 	180 416396554 	204 419908287 	196 425536497
            92 732203658 	30 733835560 	202 733835559 	210 733873571 	146 733878564 	186 733883527
            170 733888686 	76 733892691 	84 733888815 	148 733901560 	162 733907032 	172 733908079
            52 733913280 	116 733918421 	124 733906868 	164 733920945 	132 733891348 	68 733923672
            108 733924928 	156 733926091 	60 733921998 	140 733927257 	188 733928891 	154 733871822
            194 733830477 	178 733872527 	100 733830322 	106 748251688 	144 1000000000 	98 1000000000
            58 1000000000 	90 1000000000 	130 1000000000 	138 1000000000 	114 1000000000 	104 1000000000
            168 1000000000 	200 1000000000 	184 1000000000 	160 1000000000 	174 1000000000 	112 1000000000
            190 1000000000 	198 1000000000 	82 1000000000 	206 1000000000 	166 1000000000 	176 1000000000
            136 1000000000 	208 1000000000 	74 1000000000 	122 1000000000 	152 1000000000 	192 1000000000
            120 1000000000 	128 1000000000 	66 1000000000 	50 1000000000
             */
        }));
    }
    executorService.shutdown();
    while (!executorService.isTerminated()) {
        Thread.sleep(1);
    }
    long endTime = System.currentTimeMillis();
    System.out.println("\n" + (endTime - startTime));    // 19409 ms
}

@drcrallen (Author) commented Jul 13, 2018

Changed

@drcrallen (Author) commented Jul 31, 2018

Had to change back to AtomicLong because of a race condition; LongAdder was not quite the right solution here.
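
A guess at why LongAdder did not fit here, assuming the counter is used to hand out unique worker-thread names (a hypothetical reading of the comment above, not something stated in the PR):

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// AtomicLong.getAndIncrement() is a single atomic read-modify-write, so every call gets a
// distinct value. LongAdder only offers increment() plus a separate longValue() read, so
// two threads can race and observe the same value.
public class UniqueIdCounters
{
  private static final AtomicLong ATOMIC = new AtomicLong();
  private static final LongAdder ADDER = new LongAdder();

  static long nextIdAtomic()
  {
    return ATOMIC.getAndIncrement();   // unique per call
  }

  static long nextIdAdderRacy()
  {
    ADDER.increment();
    return ADDER.longValue();          // not atomic with the increment: may return duplicates
  }
}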

@asdf2014 (Member) commented Jul 31, 2018

Yes, it is.

@@ -3087,7 +3102,10 @@ public void testIfNoneMatch()
    TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
        .dataSource(DATA_SOURCE)
        .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(interval)))
        .context(ImmutableMap.<String, Object>of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0="))
        .context(ImmutableMap.<String, Object>of(

@asdf2014 (Member) commented Jul 11, 2018

ImmutableMap.<String, Object>of can be ImmutableMap.of

@drcrallen (Author) commented Jul 13, 2018

that one was caught in a source code refactor I think :) I'll fix

@drcrallen (Author) commented Jul 13, 2018

Fixed

drcrallen added 3 commits Jul 13, 2018

@drcrallen (Author) commented Aug 29, 2018

@leventov I'm worried some of your comments might be on commits instead of the PR, and might be lost in the UI for me to discover. I think I responded to all the comments. Can you please call out in a fresh comment in this PR any that are missing responses so I can figure out how they were missed?

return runAndMergeWithTimelineChange(
    query,
    // No change, but Function.identity() doesn't work here for some reason
    identity -> identity

@leventov (Member) commented Aug 30, 2018

UnaryOperator.identity() works
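
One plausible reason for the original "doesn't work" comment, assuming the converter parameter is declared as a UnaryOperator: a Function<T, T> is not a UnaryOperator<T>, so java.util.function.Function.identity() cannot be passed there, while a bare lambda or UnaryOperator.identity() can. A self-contained illustration (hypothetical method, not PR code):

import java.util.function.UnaryOperator;

public class IdentityExample
{
  static <T> T applyTimelineChange(T timeline, UnaryOperator<T> converter)
  {
    return converter.apply(timeline);
  }

  public static void main(String[] args)
  {
    applyTimelineChange("timeline", UnaryOperator.identity());   // compiles
    applyTimelineChange("timeline", t -> t);                     // compiles
    // applyTimelineChange("timeline", Function.identity());     // does not compile:
    // Function<String, String> is not a UnaryOperator<String>
  }
}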

return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline);
}
};
return runAndMergeWithTimelineChange(

@leventov (Member) commented Aug 30, 2018

Unnecessary breakdown

@drcrallen (Author) commented Oct 4, 2018

why unnecessary?

@drcrallen (Author) commented Oct 4, 2018

ah I see now, nevermind, fixing

}

/**
* Run a query. The timelineConverter will be given the "master" timeline and can be used to return a different
* timeline, if desired. This is used by getQueryRunnerForSegments.
*/
private <T> Sequence<T> run(
@VisibleForTesting
<T> Stream<Sequence<T>> run(

@leventov (Member) commented Aug 30, 2018

According to the call order, run() should be placed after runAndMergeWithTimelineChange()

);
return MergeWorkTask.parallelMerge(
    sequences.parallel(),
    sequenceStream ->

@leventov (Member) commented Aug 30, 2018

Please add type to this variable for readability

queryRunnerFactory.mergeRunners(
    mergeFjp,
    sequenceStream.map(
        s -> (QueryRunner<T>) (ignored0, ignored1) -> (Sequence<T>) s

@leventov (Member) commented Aug 30, 2018

Extracting a static factory method QueryRunner.returnConstant() would be more readable

@drcrallen (Author) commented Oct 4, 2018

adding a simple function to get this part of code cleaner
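
A hypothetical sketch of the suggested factory, written against simplified stand-ins rather than Druid's actual QueryRunner and Sequence types:

import java.util.stream.Stream;

public class ReturnConstantSketch
{
  // Simplified stand-ins for Druid's Sequence and QueryRunner types.
  interface Seq<T>
  {
    Stream<T> stream();
  }

  @FunctionalInterface
  interface Runner<T>
  {
    Seq<T> run(Object queryPlus, Object responseContext);

    // The suggested factory: wrap an already-materialized sequence in a runner that
    // ignores its arguments and simply returns that sequence.
    static <T> Runner<T> returnConstant(Seq<T> sequence)
    {
      return (queryPlus, responseContext) -> sequence;
    }
  }
}

With something along these lines, the map call in the diff above could become roughly sequenceStream.map(QueryRunner::returnConstant) instead of the inline cast.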

.filter(Objects::nonNull)
.collect(Collectors.toList());

// We should only ever have cache or queries to run, not both. So if we have no segments, try caches

@leventov (Member) commented Aug 30, 2018

If so, could you replace ServerMaybeSegmentMaybeCache (with its two nullable fields) with two different classes, each having only two fields?

// See io.druid.java.util.common.guava.MergeSequenceTest.testScrewsUpOnOutOfOrder for an example
// With zero results actually being found (no segments no caches) this should essentially return a no-op
// merge sequence
return new MergeSequence<>(query.getResultOrdering(), Sequences.fromStream(

@leventov (Member) commented Aug 30, 2018

Should be

        return new MergeSequence<>(
            query.getResultOrdering(),
            Sequences.fromStream(
                segmentOrResult
                    .stream()
                    .map(ServerMaybeSegmentMaybeCache::getCachedValue)
                    .filter(Objects::nonNull)
                    .map(Collections::singletonList)
                    .map(Sequences::simple)
            )
        );
segmentOrResult
    .stream()
    .map(ServerMaybeSegmentMaybeCache::getCachedValue)
    .filter(Objects::nonNull)

@leventov (Member) commented Aug 30, 2018

Don't nulls here break the assumptions?

.stream()
.map(ServerMaybeSegmentMaybeCache::getCachedValue)
.filter(Objects::nonNull)
.map(Collections::singletonList)

@leventov (Member) commented Aug 30, 2018

.map(result -> Sequences.simple(Collections.singletonList(result))) would be clearer (or add Sequence.singleton())

*
* @return A stream of potentially cached results per server
*/

@leventov (Member) commented Aug 30, 2018

Extra line

@drcrallen (Author) commented Sep 5, 2018

Phew, the package rename is pushed. Fixing unit tests, and then I'll get to Roman's comments.

@drcrallen (Author) commented Sep 21, 2018

Got clobbered by #6313, fixing.

@jihoonson (Contributor) commented Sep 23, 2018

This Travis failure looks legit. Please check.

testTopNOnPostAggMetricCaching[0](org.apache.druid.client.CachingClusteredClientTest)  Time elapsed: 0.095 sec  <<< ERROR!
java.lang.ArithmeticException: / by zero
	at org.apache.druid.client.CachingClusteredClientTest.lambda$getDefaultQueryRunner$4(CachingClusteredClientTest.java:3114)
	at org.apache.druid.client.CachingClusteredClientTest.lambda$testQueryCaching$3(CachingClusteredClientTest.java:2134)
	at org.apache.druid.client.CachingClusteredClientTest.runWithMocks(CachingClusteredClientTest.java:2625)
	at org.apache.druid.client.CachingClusteredClientTest.testQueryCaching(CachingClusteredClientTest.java:2110)
	at org.apache.druid.client.CachingClusteredClientTest.testQueryCaching(CachingClusteredClientTest.java:1971)
	at org.apache.druid.client.CachingClusteredClientTest.testTopNOnPostAggMetricCaching(CachingClusteredClientTest.java:1059)

@jihoonson (Contributor) commented Sep 23, 2018

Also, are there unit tests for this feature?

@drcrallen (Author) commented Oct 4, 2018

I'm digging more. Trying to get this PR back up to the top of my priority list and address Roman's comments.

@jihoonson (Contributor) commented Oct 4, 2018

@drcrallen thanks. Also, please add some JMH benchmarks.

@drcrallen (Author) commented Oct 4, 2018

@jihoonson what kind of info are you wanting for JMH stuff?

@jihoonson (Contributor) commented Oct 5, 2018

@drcrallen basically I want to check how large a performance benefit the new parallel merge algorithm can give us. Even though you mentioned that it shows about an 85% performance improvement in some internal tests, that reflects the overall performance benefit, not the algorithm itself. Of course, the overall performance is more important, but the latter is also important because it gives us insight into exactly what we can expect from this feature, I think.

Also, query performance is affected by many factors, like dataSource size, query filter selectivity, the number of aggregators and their types, cluster size, and so on. JMH would be useful because we can easily replicate the performance benchmark with the same query and the same data, which makes it easy to maintain or improve in the future.

I think the JMH benchmarks should include the below:

  • Simplify the historical part so that only the broker merge algorithm is measured. For example, there would be no HTTP communication between CachingClusteredClient and the actual query runners of the historicals. Also, the query runners of the historicals could just do aggregation for a single segment (or even return some pre-aggregated values).
  • Benchmark with varying intermediateMergeBatchThreshold and varying numbers of streams to be merged.
  • Maybe test against different query types (timeseries, topN, groupBy).
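
A bare-bones skeleton of the kind of benchmark being described (hypothetical names and a trivial stand-in merge; a real benchmark would exercise CachingClusteredClient or MergeWorkTask directly):

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class ParallelMergeBenchmark
{
  @Param({"0", "8", "64"})       // 0 = parallel intermediate merge disabled
  int intermediateMergeBatchThreshold;

  @Param({"16", "128", "1024"})  // number of per-"historical" result streams to merge
  int numStreams;

  int[][] perStreamResults;

  @Setup
  public void setUp()
  {
    perStreamResults = new int[numStreams][];
    for (int i = 0; i < numStreams; i++) {
      perStreamResults[i] = new int[10_000];
      for (int j = 0; j < perStreamResults[i].length; j++) {
        perStreamResults[i][j] = j;  // pre-sorted, standing in for pre-aggregated rows
      }
    }
  }

  @Benchmark
  public long mergeAll()
  {
    // Stand-in for the broker-side merge under test; a real benchmark would invoke the
    // actual merge code with the parameters above instead of summing arrays.
    long sum = 0;
    for (int[] stream : perStreamResults) {
      for (int v : stream) {
        sum += v;
      }
    }
    return sum;
  }
}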

@drcrallen (Author) commented Oct 5, 2018

intermediateMergeBatchThreshold is a purely experimental feature and as such is completely opt-in. Doing full benchmarking on it at this point is overkill unless it is being considered to replace the default behavior.

@jihoonson (Contributor) commented Oct 5, 2018

@drcrallen hmm, maybe there's some misunderstanding. As you said, this is still an experimental feature, and exact numbers are not that important at this point. However, we need to add a benchmark implementation with which we can run performance tests, so that we can better understand the implementation of the parallel merge algorithm in this PR and how to further improve it. Does that make sense?

@drcrallen (Author) commented Feb 20, 2019

For those finding this later: there are problems with GroupBy queries that are not appropriately addressed here. I think handling the CachingClusteredClient changes (which may or may not be needed) and enabling GroupBy result merging to work better in parallel are really two different efforts and need to be addressed in separate PRs. I'm leaving this PR open for now as a reference, but it is unlikely to see any more development, in favor of pursuing a more sustainable (and GroupBy-compatible) approach.

@stale (bot) commented Apr 21, 2019

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Apr 21, 2019

@clintropolis clintropolis removed the stale label Apr 22, 2019

@stale (bot) commented Jun 20, 2019

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Jun 20, 2019

@stale (bot) commented Jun 27, 2019

This pull request/issue has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@stale stale bot closed this Jun 27, 2019
