
Alternative groupBy strategy. #2998

Merged 1 commit on Jun 25, 2016

Conversation

@gianm (Contributor) commented May 20, 2016

This patch introduces a GroupByStrategy concept and two strategies: "v1"
is the current groupBy strategy and "v2" is a new one. It also
introduces a merge buffers concept in DruidProcessingModule, to try to better
manage memory used for merging.

Both of these are described in more detail in #2987.

There are two goals of this patch:

  1. Make it possible for historical/realtime nodes to return larger groupBy
    result sets, faster, with better memory management.
  2. Make it possible for brokers to merge streams when there are no order-by
    columns, avoiding materialization.

This patch does not do anything to help with memory management on the broker
when there are order-by columns or when there are nested queries. That could
potentially be done in a future patch.
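The streaming idea in goal 2 can be illustrated generically (this is a sketch of the concept, not Druid's actual merging code, and all names here are hypothetical): when results need no ordering, per-source streams can be chained and consumed lazily, one row at a time, instead of being collected into one in-memory collection before returning.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Lazily chains per-source iterators without materializing them into
// a single collection first. Rows are pulled on demand, so memory use
// stays bounded by one source's cursor at a time.
public class LazyConcat<T> implements Iterator<T>
{
  private final Iterator<Iterator<T>> sources;
  private Iterator<T> current = Collections.emptyIterator();

  public LazyConcat(List<Iterator<T>> sources)
  {
    this.sources = sources.iterator();
  }

  @Override
  public boolean hasNext()
  {
    // Advance to the next non-empty source, if any.
    while (!current.hasNext() && sources.hasNext()) {
      current = sources.next();
    }
    return current.hasNext();
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}
```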

Benchmarks:

master

Benchmark                                       (numSegments)  (rowsPerSegment)  (schemaAndQuery)  Mode  Cnt       Score      Error  Units
GroupByBenchmark.processSingleIncrementalIndex              4            100000           basic.A  avgt   25  109206.640 ± 2110.087  us/op
GroupByBenchmark.processSingleQueryableIndex                4            100000           basic.A  avgt   25   79239.089 ± 1903.127  us/op
GroupByBenchmark.queryMultiQueryableIndex                   4            100000           basic.A  avgt   25  376020.150 ± 4878.307  us/op
GroupByBenchmark.querySingleIncrementalIndex                4            100000           basic.A  avgt   25  253640.355 ± 9018.422  us/op
GroupByBenchmark.querySingleQueryableIndex                  4            100000           basic.A  avgt   25  198558.886 ± 7151.646  us/op

epinephelinae

Benchmark                                       (numSegments)  (rowsPerSegment)  (schemaAndQuery)  Mode  Cnt       Score      Error  Units
GroupByBenchmark.processSingleIncrementalIndex              4            100000           basic.A  avgt   25   59674.994 ± 1996.179  us/op
GroupByBenchmark.processSingleQueryableIndex                4            100000           basic.A  avgt   25   38382.406 ±  848.041  us/op
GroupByBenchmark.queryMultiQueryableIndex                   4            100000           basic.A  avgt   25  189633.397 ± 3665.856  us/op
GroupByBenchmark.querySingleIncrementalIndex                4            100000           basic.A  avgt   25   69098.673 ± 3138.578  us/op
GroupByBenchmark.querySingleQueryableIndex                  4            100000           basic.A  avgt   25   45949.471 ± 1807.845  us/op

@fjy fjy added this to the 0.9.2 milestone May 20, 2016
@@ -56,6 +56,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|`druid.processing.buffer.poolCacheMaxCount`|The processing buffer pool caches buffers for later use; this is the maximum count the cache will grow to. Note that the pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. Not all queries need these buffers. By default, no queries use these buffers, so the default pool size is zero.|0|
Contributor:

what are some reasonable values to set this?

Contributor (author):

However many concurrent groupBy queries you want to be able to run with the strategy from this PR. I guess it depends on your expected query load and how much memory you have.
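For example, a node that should support up to four concurrent groupBy v2 queries might set something like the following (the value 4 is purely illustrative, not a recommendation; both property names are the ones documented above):

```properties
# One merge buffer per concurrent v2 groupBy query you expect to serve.
druid.processing.numMergeBuffers=4
# Each merge buffer is this size, so budget direct memory accordingly.
druid.processing.buffer.sizeBytes=1073741824
```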

Contributor:

can we add some documentation about how to use the new groupBys then? I feel people won't figure out what parameters to set without being told to set them

Contributor:

Maybe in the groupBy query doc page?

@fjy (Contributor) commented May 20, 2016

@gianm this is failing UT

@gianm (Contributor, author) commented May 20, 2016

fixed

@gianm (Contributor, author) commented May 20, 2016

Some more benchmarks, this time on a c4.8xlarge with a 31 thread processing pool, varying the number of segments to see how well the concurrent merging scales. It looks like neither the old nor the new approach scales perfectly, but the new one does scale better.

oldFaithful

Benchmark                                  (numSegments)  (rowsPerSegment)  (schemaAndQuery)  Mode  Cnt        Score       Error  Units
GroupByBenchmark.queryMultiQueryableIndex              1            100000           basic.A  avgt   25   313628.140 ±  4590.621  us/op
GroupByBenchmark.queryMultiQueryableIndex              2            100000           basic.A  avgt   25   349807.711 ±  6322.088  us/op
GroupByBenchmark.queryMultiQueryableIndex              4            100000           basic.A  avgt   25   404873.643 ± 17808.031  us/op
GroupByBenchmark.queryMultiQueryableIndex              8            100000           basic.A  avgt   25   582821.472 ± 40670.849  us/op
GroupByBenchmark.queryMultiQueryableIndex             16            100000           basic.A  avgt   25  1225828.630 ± 49795.881  us/op
GroupByBenchmark.queryMultiQueryableIndex             24            100000           basic.A  avgt   25  1884208.661 ± 56287.292  us/op
GroupByBenchmark.queryMultiQueryableIndex             32            100000           basic.A  avgt   25  2574992.390 ± 57767.378  us/op
GroupByBenchmark.queryMultiQueryableIndex             48            100000           basic.A  avgt   25  3805300.946 ± 89395.445  us/op
GroupByBenchmark.queryMultiQueryableIndex             64            100000           basic.A  avgt   25  4730137.402 ± 66557.773  us/op

epinephelinae

Benchmark                                  (numSegments)  (rowsPerSegment)  (schemaAndQuery)  Mode  Cnt       Score       Error  Units
GroupByBenchmark.queryMultiQueryableIndex              1            100000           basic.A  avgt   25  111067.798 ±  2244.667  us/op
GroupByBenchmark.queryMultiQueryableIndex              2            100000           basic.A  avgt   25  122309.884 ±  2568.025  us/op
GroupByBenchmark.queryMultiQueryableIndex              4            100000           basic.A  avgt   25  133108.612 ±  3692.383  us/op
GroupByBenchmark.queryMultiQueryableIndex              8            100000           basic.A  avgt   25  153410.938 ±  3546.060  us/op
GroupByBenchmark.queryMultiQueryableIndex             16            100000           basic.A  avgt   25  211194.960 ±  3433.743  us/op
GroupByBenchmark.queryMultiQueryableIndex             24            100000           basic.A  avgt   25  297872.072 ±  4930.089  us/op
GroupByBenchmark.queryMultiQueryableIndex             32            100000           basic.A  avgt   25  428084.539 ±  7660.333  us/op
GroupByBenchmark.queryMultiQueryableIndex             48            100000           basic.A  avgt   25  537781.584 ± 12506.191  us/op
GroupByBenchmark.queryMultiQueryableIndex             64            100000           basic.A  avgt   25  714390.172 ± 16653.711  us/op

@fjy (Contributor) commented May 20, 2016

@gianm benchmarks look great!

this is really promising

return new ObjectResourceHolder(objects.take());
}

private class ObjectResourceHolder implements ResourceHolder<T>
Contributor:

Is this pretty much a copy of the StupidPool code?

Contributor (author):

This part is similar, yeah. One difference is that this one adds back to the pool on finalize whereas stupid doesn't.
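The pattern under discussion can be sketched as follows (a simplified, hypothetical illustration, not Druid's actual pool code): holders return the pooled object on `close()`, and `finalize()` acts as a safety net that returns leaked objects to the pool, which is the difference from StupidPool mentioned above.

```java
import java.util.concurrent.BlockingQueue;

// Simplified illustration of a pool whose resource holders give the
// object back to the pool on close(), and also on finalize() as a
// safety net if a caller forgets to close.
public class SimplePool<T>
{
  private final BlockingQueue<T> objects;

  public SimplePool(BlockingQueue<T> objects)
  {
    this.objects = objects;
  }

  public Holder take() throws InterruptedException
  {
    return new Holder(objects.take());
  }

  public int available()
  {
    return objects.size();
  }

  public class Holder implements AutoCloseable
  {
    private T object;

    Holder(T object)
    {
      this.object = object;
    }

    public T get()
    {
      return object;
    }

    @Override
    public synchronized void close()
    {
      if (object != null) {
        objects.offer(object); // return to the pool exactly once
        object = null;
      }
    }

    @Override
    protected void finalize()
    {
      close(); // safety net: return leaked objects to the pool
    }
  }
}
```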

Contributor:

Can you put that in a comment here?

Contributor (author):

OK

@gianm force-pushed the epinephelinae branch 2 times, most recently from 1cf5cdc to 3058de2 (May 26, 2016 19:50)

Preconditions.checkArgument(
keyBuffer.remaining() == keySize,
"key size[%s] != keySize[%s]",
Contributor:

this error msg is really confusing

Contributor (author):

will rewrite to "keyBuffer.remaining[%s] != keySize[%s], buffer was the wrong size?!"

@himanshug (Contributor), Jun 13, 2016:

we could say, keySerde.keySize()[%s] is not same as keySerde.toByteBuffer(key).remaining()[%s]

Contributor (author):

sure

@gianm force-pushed the epinephelinae branch 6 times, most recently from 0a96bdb to 064aac2 (June 22, 2016 20:01)
@gianm (Contributor, author) commented Jun 22, 2016

@fjy added docs

@fjy (Contributor) commented Jun 22, 2016

👍

final ResourceHolder<ByteBuffer> mergeBufferHolder;

try {
mergeBufferHolder = mergeBufferPool.take(timeoutAt > startTime ? timeoutAt - startTime : -1);
Contributor:

why not simply mergeBufferHolder = mergeBufferPool.take(timeout != null && timeout.longValue() > 0 ? timeout.longValue() : -1); without getting current time?

Contributor (author):

good point, switched this, but kept timeoutAt as it is needed for waitForFutureCompletion
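The shape of the change being discussed can be sketched like this (a simplified, hypothetical stand-in for the merge-buffer pool, not Druid's actual implementation): pass the caller's timeout straight to the blocking take instead of recomputing it from the current time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for a merge-buffer pool: take() with a positive
// timeout waits up to that many milliseconds; a non-positive timeout
// blocks indefinitely.
public class TimedPool<T>
{
  private final BlockingQueue<T> objects;

  public TimedPool(BlockingQueue<T> objects)
  {
    this.objects = objects;
  }

  public T take(long timeoutMs) throws InterruptedException
  {
    if (timeoutMs > 0) {
      // May return null if the timeout elapses before an object is free.
      return objects.poll(timeoutMs, TimeUnit.MILLISECONDS);
    } else {
      return objects.take();
    }
  }
}
```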

@himanshug (Contributor):

minor comment #2998 (comment) but 👍 overall

for (int i = 0; i < entry.getKey().getDimensions().length; i++) {
theMap.put(
query.getDimensions().get(i).getOutputName(),
Strings.emptyToNull(entry.getKey().getDimensions()[i])
Member:

IncrementalIndex would have returned this as empty string for nulls,
are you sure we need to convert empty strings back to null here ?

Contributor (author):

The tests fail without emptyToNull here.

Furthermore, I believe treating empties as nulls is consistent with other areas.
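For reference, the conversion in question is Guava's `Strings.emptyToNull`; a plain-Java equivalent of its behavior (shown only to illustrate the mapping being discussed, since IncrementalIndex represents null dimension values as empty strings):

```java
// Plain-Java equivalent of Guava's Strings.emptyToNull: maps the
// empty string back to null and passes everything else through.
public class EmptyToNull
{
  public static String emptyToNull(String s)
  {
    return (s == null || s.isEmpty()) ? null : s;
  }
}
```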

@gianm (Contributor, author) commented Jun 24, 2016

@nishantmonu51 updated PR with changes, thanks for reviewing

@nishantmonu51 (Member):

👍

);

return ByteBuffer.allocateDirect(computationBufferSize);
Contributor:

I'm just wondering why using allocateDirect here?
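Context on the question: `allocateDirect` reserves the buffer outside the Java heap, which fits the off-heap intermediate-computation design described in the `druid.processing.buffer.sizeBytes` docs above. A quick illustration of the difference:

```java
import java.nio.ByteBuffer;

// Direct buffers live outside the Java heap (their contents add no GC
// pressure); heap buffers are backed by an ordinary byte[].
public class BufferKinds
{
  public static void main(String[] args)
  {
    ByteBuffer direct = ByteBuffer.allocateDirect(1024);
    ByteBuffer heap = ByteBuffer.allocate(1024);
    System.out.println(direct.isDirect()); // true
    System.out.println(heap.isDirect());   // false
    System.out.println(heap.hasArray());   // true: backed by byte[]
  }
}
```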

@gianm deleted the epinephelinae branch (September 23, 2022 19:22)