
"java.lang.NullPointerException" occurs when using APPROX_QUANTILE_DS #11544

Open

lsee9 opened this issue Aug 4, 2021 · 20 comments

@lsee9

lsee9 commented Aug 4, 2021

Affected Version

  • 0.21.1

Cluster:

  • 2 overlords, 2 coordinators
  • 2 routers, 2 brokers
  • 3 historicals, 11 middle managers

Description

Hello, I am trying to calculate quantiles using "APPROX_QUANTILE_DS()",
but my query fails with a java.lang.NullPointerException.
The exception is thrown in "org.apache.datasketches.quantiles.DirectUpdateDoublesSketch.growCombinedMemBuffer".

So I suspect this is an out-of-memory problem (there is not enough memory available for the operation).
However, increasing the memory does not solve the problem.

Also, the problem only occurs with certain service codes (e.g. 'top', 'cafe').

What I'm curious about is:

  1. Is it a memory problem? Or do you think there is another cause?
  2. For k=128, how much memory is needed for the quantile operation?
  3. Should I use a different aggregator to compute the quantile?

I don't have any good ideas to solve the problem :(

my query:

SELECT COALESCE("mytable".country, '_') AS country,
  (APPROX_QUANTILE_DS("mytable".quantile_duration, 0.9)) AS quantile
FROM "mytable"
WHERE ("mytable".service_code = 'top')
AND __time >= '2021-06-01' AND __time <= '2021-06-01'
GROUP BY COALESCE("mytable".country, '_')

datasource configuration:

  • __time: 2021-06-01/2021-06-13
  • dimensions: country (cardinality 239), service_code (cardinality 129), etc.
  • metric: quantile_duration (quantilesDoublesSketch, k=128 (default)), etc.
  • 7 segments (1.42 GB total)

stack trace:

at  org.apache.datasketches.quantiles.DirectUpdateDoublesSketch.growCombinedMemBuffer(DirectUpdateDoublesSketch.java:254)
at  org.apache.datasketches.quantiles.DirectUpdateDoublesSketch.growCombinedBuffer(DirectUpdateDoublesSketch.java:238)
at  org.apache.datasketches.quantiles.DoublesMergeImpl.mergeInto(DoublesMergeImpl.java:84)
at  org.apache.datasketches.quantiles.DoublesUnionImpl.updateLogic(DoublesUnionImpl.java:200)
at  org.apache.datasketches.quantiles.DoublesUnionImpl.update(DoublesUnionImpl.java:118)
at  org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchMergeAggregator.updateUnion(DoublesSketchMergeAggregator.java:80)
at  org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchMergeBufferAggregator.aggregate(DoublesSketchMergeBufferAggregator.java:66)
at  org.apache.druid.query.aggregation.AggregatorAdapters.aggregateBuffered(AggregatorAdapters.java:164)
at  org.apache.druid.query.groupby.epinephelinae.AbstractBufferHashGrouper.aggregate(AbstractBufferHashGrouper.java:161)
at  org.apache.druid.query.groupby.epinephelinae.SpillingGrouper.aggregate(SpillingGrouper.java:172)
at  org.apache.druid.query.groupby.epinephelinae.ConcurrentGrouper.aggregate(ConcurrentGrouper.java:269)
at  org.apache.druid.query.groupby.epinephelinae.Grouper.aggregate(Grouper.java:85)
at  org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.lambda$createGrouperAccumulatorPair$2(RowBasedGrouperHelper.java:332)
at  org.apache.druid.java.util.common.guava.MappingAccumulator.accumulate(MappingAccumulator.java:40)
at  org.apache.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:44)
at  org.apache.druid.java.util.common.guava.ConcatSequence.lambda$accumulate$0(ConcatSequence.java:41)
at  org.apache.druid.java.util.common.guava.MappingAccumulator.accumulate(MappingAccumulator.java:40)
at  org.apache.druid.java.util.common.guava.FilteringAccumulator.accumulate(FilteringAccumulator.java:41)
at  org.apache.druid.java.util.common.guava.MappingAccumulator.accumulate(MappingAccumulator.java:40)
at  org.apache.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:44)
at  org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.FilteredSequence.accumulate(FilteredSequence.java:45)
at  org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
at  org.apache.druid.java.util.common.guava.ConcatSequence.accumulate(ConcatSequence.java:41)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.LazySequence.accumulate(LazySequence.java:40)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.LazySequence.accumulate(LazySequence.java:40)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner$1.accumulate(SpecificSegmentQueryRunner.java:87)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner.doNamed(SpecificSegmentQueryRunner.java:171)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner.access$100(SpecificSegmentQueryRunner.java:44)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner$2.wrap(SpecificSegmentQueryRunner.java:153)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.query.CPUTimeMetricQueryRunner$1.wrap(CPUTimeMetricQueryRunner.java:78)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2$1$1$1.call(GroupByMergingQueryRunnerV2.java:247)
at  org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2$1$1$1.call(GroupByMergingQueryRunnerV2.java:234)
at  java.util.concurrent.FutureTask.run(FutureTask.java:266)
at  org.apache.druid.query.PrioritizedListenableFutureTask.run(PrioritizedExecutorService.java:247)
at  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at  java.lang.Thread.run(Thread.java:748)

Any help would be greatly appreciated.

@jihoonson
Contributor

Hi @lsee9, thank you for the report. I would call this a bug of Druid because Druid should have returned a better error than NPE. To answer your questions, I think your assessment is correct about the lack of memory. Please see https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch.html for the memory space required per k. I'm not sure why the query exploded in your case though because data seems pretty small. What was druid.processing.buffer.sizeBytes set to in your test? You could try either with a smaller k or a bigger buffer size.
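For a rough sense of scale, datasketches-java exposes a helper that returns the updatable storage a quantiles sketch needs for a given k and stream length n. A minimal sketch, assuming datasketches-java is on the classpath:

import org.apache.datasketches.quantiles.DoublesSketch;

public class SketchSizePerK {
  public static void main(String[] args) {
    // Storage needed by an updatable quantiles sketch after n updates;
    // it grows only logarithmically with n, but linearly with k.
    long n = 1_000_000_000L; // one billion items
    for (int k : new int[] {32, 64, 128, 256}) {
      System.out.printf("k=%d, n=%d -> %d bytes%n",
          k, n, DoublesSketch.getUpdatableStorageBytes(k, n));
    }
  }
}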

@lsee9
Author

lsee9 commented Aug 5, 2021

Hi @jihoonson, thank you for your reply :)
Here is what I tried.

My Druid spec:

druid.processing.buffer.sizeBytes=1GiB
druid.processing.numMergeBuffers=10
druid.processing.numThreads=19 (20 core machine)
MaxDirectMemorySize=30g
heap size=32g

But for k = 128, the problem still occurs.
Should I increase it even more?

I also tried a smaller k (k = 32, 64); then no error occurs (I'm going to check a wider time range with more data).
But the deviation of the values is too large (e.g. quantile 0 ~ 2000 in some cases),
so I don't know if I can use this.
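
For reference, the accuracy cost of a smaller k can be estimated directly. A minimal sketch, assuming the static DoublesSketch.getNormalizedRankError helper available in recent datasketches-java releases:

import org.apache.datasketches.quantiles.DoublesSketch;

public class RankErrorPerK {
  public static void main(String[] args) {
    // Smaller k means a smaller sketch but a larger rank-error bound,
    // consistent with the large deviation seen at k = 32/64.
    for (int k : new int[] {32, 64, 128}) {
      System.out.printf("k=%d -> ~%.2f%% normalized rank error%n",
          k, 100 * DoublesSketch.getNormalizedRankError(k, false));
    }
  }
}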

If you have any good ideas, please reply!

@jihoonson
Contributor

Hmm, were there lots of values per group-by key by any chance? What does this query return? (BTW, I copied the time filter from your comment, but is that correct? It is identical to __time = '2021-06-01')

SELECT COALESCE("mytable".country, '_') AS country, count(*)
FROM "mytable"
WHERE ("mytable".service_code = 'top')
AND __time >= '2021-06-01' AND __time <= '2021-06-01'
GROUP BY COALESCE("mytable".country, '_')

@lsee9
Author

lsee9 commented Aug 5, 2021

Yes!! The time filter is correct.
And the NPE occurs when I run my query with auto limit turned off.

The result of running the query you suggested (with ORDER BY count DESC added):

{"country":"kr","EXPR$1":490}
{"country":"us","EXPR$1":221}
{"country":"jp","EXPR$1":173}
{"country":"ca","EXPR$1":165}
{"country":"au","EXPR$1":155}
{"country":"de","EXPR$1":147}
{"country":"vn","EXPR$1":138}
{"country":"sg","EXPR$1":130}
{"country":"th","EXPR$1":127}
{"country":"hk","EXPR$1":123}
{"country":"nz","EXPR$1":122}
{"country":"gb","EXPR$1":115}
{"country":"ph","EXPR$1":112}
{"country":"tw","EXPR$1":111}
{"country":"id","EXPR$1":108}
...
{"country":"re","EXPR$1":6}
{"country":"ye","EXPR$1":6}
{"country":"bm","EXPR$1":4}
{"country":"gy","EXPR$1":4}
{"country":"li","EXPR$1":4}
{"country":"mc","EXPR$1":4}
{"country":"tc","EXPR$1":4}
{"country":"kp","EXPR$1":3}
{"country":"ad","EXPR$1":2}
{"country":"so","EXPR$1":2}
{"country":"gw","EXPR$1":1}
{"country":"mq","EXPR$1":1}
{"country":"sy","EXPR$1":1}
total number of countries: 200

Each count is not that large...

@lsee9
Author

lsee9 commented Aug 5, 2021

☝️ The above comment shows the result from the Druid table,
i.e. the values after rollup with quantilesDoublesSketch at ingestion time.

The number of rows in the original (pre-rollup) data is as follows.
query:

SELECT
  country,
  SUM("count") AS total_num_rows_original
FROM "mytable"
WHERE __time >= '2021-06-01' AND __time <= '2021-06-01' AND service_code = 'top'
GROUP BY 1
ORDER BY 2 DESC

query result:

{"country":"kr","total_num_rows_original":1082227280}
{"country":"us","total_num_rows_original":10978845}
{"country":"jp","total_num_rows_original":2896190}
{"country":"ca","total_num_rows_original":2767109}
{"country":"au","total_num_rows_original":1862148}
{"country":"vn","total_num_rows_original":1718031}
{"country":"nz","total_num_rows_original":575751}
{"country":"de","total_num_rows_original":556492}
{"country":"sg","total_num_rows_original":536305}
{"country":"id","total_num_rows_original":425479}
{"country":"hk","total_num_rows_original":373920}
{"country":"ph","total_num_rows_original":364786}
{"country":"","total_num_rows_original":361175}
{"country":"th","total_num_rows_original":360037}
{"country":"my","total_num_rows_original":333746}
{"country":"gb","total_num_rows_original":324027}
{"country":"mx","total_num_rows_original":240169}
{"country":"ae","total_num_rows_original":237182}
...
{"country":"ad","total_num_rows_original":3}
{"country":"gw","total_num_rows_original":3}
{"country":"so","total_num_rows_original":3}
{"country":"mq","total_num_rows_original":1}
{"country":"sy","total_num_rows_original":1}

If aggregation is performed over everything, the number of original rows is about 81 billion,
up to 20 times the largest N in the table (https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch.html).

But 81 billion rows falls between 2^36 and 2^37 items, and the required size only grows by about 1 KB each time N doubles.
Based on this calculation, 30 KB to 32 KB per sketch seems to be sufficient.
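
That estimate can be checked against the library helper; a minimal sketch (it should print 31776, i.e. about 31 KB, consistent with the estimate above):

import org.apache.datasketches.quantiles.DoublesSketch;

public class SizeAt81B {
  public static void main(String[] args) {
    // 81 billion items: each doubling of n beyond 2k items adds one
    // level of k doubles (1 KB for k = 128) to the sketch.
    System.out.println(DoublesSketch.getUpdatableStorageBytes(128, 81_000_000_000L));
  }
}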

@jihoonson
Contributor

I think I see what's going on 🙂. Does your original query work if you add an extra filter of country <> 'kr'?

@lsee9
Author

lsee9 commented Aug 6, 2021

Yes, it does work if I add the extra filter country <> 'kr'!
What do you think the problem is here?
Is it the large number of original rows for that group?

@jihoonson
Contributor

Yes, I think the problem is too many items per country. Druid uses a fixed-size buffer per row to keep the sketch (DoublesSketch). Since the buffer size is fixed but Druid doesn't know the number of items in advance, it sizes the buffer to be large enough to hold one billion items in the sketch. So, when you have fewer than one billion items, the sketch fits in the buffer and everything works well. The interesting part is when you have more than one billion items. In that case, Druid lets the sketch allocate extra heap memory to hold the items that don't fit in the buffer. However, DoublesSketch is not working as we expected and throws an NPE when it tries to allocate more memory. This issue is filed in apache/datasketches-java#358.
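
To make the sizing concrete: as discussed later in this thread, the one-billion assumption is the MAX_STREAM_LENGTH constant in DoublesSketchAggregatorFactory, and the per-slot buffer size follows from it. A minimal sketch of that calculation (an illustration, not Druid's actual code):

import org.apache.datasketches.quantiles.DoublesSketch;

public class SlotSizing {
  // Druid's assumption about the maximum stream length per sketch.
  static final long MAX_STREAM_LENGTH = 1_000_000_000L;

  // Size of one BufferAggregator slot for a given k; sketches that
  // outgrow this must spill to heap, which is where the NPE occurs.
  static int slotSizeBytes(int k) {
    return DoublesSketch.getUpdatableStorageBytes(k, MAX_STREAM_LENGTH);
  }

  public static void main(String[] args) {
    System.out.println(slotSizeBytes(128)); // 24608 bytes for k = 128
  }
}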

As a workaround, you could use other functions to compute approximate quantiles, such as DS_QUANTILES_SKETCH or APPROX_QUANTILE. Note that APPROX_QUANTILE uses the deprecated approximate histogram aggregator and its accuracy might not be great.

@lsee9
Author

lsee9 commented Aug 6, 2021

Yes, I understand!
Thanks for your help.
I'll try the other functions you suggested 😄 .

@AlexanderSaydakov
Contributor

Druid version 0.21.1 uses datasketches-java-1.3.0-incubating and datasketches-memory-1.2.0-incubating.
Would it be possible to try reproducing this with the current code in master, which uses datasketches-java-2.0.0 and datasketches-memory-1.3.0?

@AlexanderSaydakov
Contributor

Could someone point to the code that allocates this memory for BufferAggregator please?

@AlexanderSaydakov
Contributor

If rebuilding Druid is an option, I would suggest increasing this constant: DoublesSketchAggregatorFactory.MAX_STREAM_LENGTH.

It will increase the size of the pre-allocated buffers in the BufferAggregator, but not drastically; sketches grow very slowly at that point. I suggest this as a temporary measure until we figure out how to fix this and go through release cycles.

@jihoonson
Contributor

Hi @AlexanderSaydakov, thank you for taking a look. It does fail in the Druid master branch. You can easily reproduce it by running DoublesSketchAggregatorTest.buildingSketchesAtQueryTime() after setting DoublesSketchAggregatorFactory.MAX_STREAM_LENGTH to something very low, like 10.

"Could someone point to the code that allocates this memory for BufferAggregator please?"

Those buffers are allocated in DruidProcessingModule.

@leerho
Contributor

leerho commented Aug 7, 2021

See comments in apache/datasketches-java#358.

@lsee9
Author

lsee9 commented Aug 9, 2021

Hi @leerho, thank you for your reply.

As suggested in apache/datasketches-java#358,
if you can provide a temporary hacked jar, I'd love to try it!

Please let me know if that is feasible :)

@AlexanderSaydakov
Contributor

As Lee Rhodes said, it might take quite a while to fix the root cause and go through release cycles for datasketches-memory and datasketches-java. Therefore, I would suggest using the workaround I mentioned above, namely increasing the MAX_STREAM_LENGTH constant. It affects the size pre-allocated for each sketch in the BufferAggregator. The assumption was that, due to data fragmentation across multiple dimensions with a power-law distribution, only a small number of sketches would reach that size and move to on-heap memory. Since this mechanism is broken now, let's set a much higher limit until it is fixed. And let's do it quickly, before the 0.22 branch is created. I can open a pull request if we agree on the value.

Here is the size of one slot in the BufferAggregator, in bytes, for the default sketch parameter k=128 and different values of MAX_STREAM_LENGTH:

1B (current): 24608
10B: 28704
100B: 31776
1T: 34848

I suggest setting it to 1T.
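
These per-slot sizes can be reproduced with the library itself; a minimal sketch:

import org.apache.datasketches.quantiles.DoublesSketch;

public class SlotSizeTable {
  public static void main(String[] args) {
    // Prints 24608, 28704, 31776, 34848: the k=128 slot sizes listed above.
    long[] lengths = {1_000_000_000L, 10_000_000_000L, 100_000_000_000L, 1_000_000_000_000L};
    for (long n : lengths) {
      System.out.printf("MAX_STREAM_LENGTH=%d -> %d bytes%n",
          n, DoublesSketch.getUpdatableStorageBytes(128, n));
    }
  }
}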

@jihoonson
Contributor

@leerho @AlexanderSaydakov, do you have a rough schedule for the new releases of datasketches-memory and datasketches-java? If they are going to take long, perhaps we could add a config, one that temporarily lives for a couple of Druid releases, to control the value of MAX_STREAM_LENGTH. We could keep the current value as the default, but users could override it if needed to avoid this error.

@AlexanderSaydakov
Contributor

This can take weeks, if not months. datasketches-memory is being prepared for a major release, which is not quite ready yet, and datasketches-java depends on it, which means a sequential process with a voting stage for each, and so on.
I like your suggestion to make this parameter configurable; it might be useful even after we fix the root cause. So if you know how to do it quickly, please go ahead.

@jihoonson
Contributor

@AlexanderSaydakov thanks, sounds good. I will make a PR soon.

@jihoonson
Contributor

I created #11574.

xvrl pushed a commit that referenced this issue Mar 2, 2022
These changes are to use the latest datasketches-java-3.1.0 and also to restore support for quantile and HLL4 sketches to be able to grow larger than a given buffer in a buffer aggregator and move to heap in rare cases. This was discussed in #11544.

Co-authored-by: AlexanderSaydakov <AlexanderSaydakov@users.noreply.github.com>