
Time Ordering On Scans #7133

Merged

Conversation

@justinborromeo (Contributor) commented Feb 23, 2019

This PR addresses points 2 and 3 of issue #6088, is based on proposal #7036, and stems from a refactor of the spaghetti in PR #7024.

This PR introduces two strategies for time-ordering results from scan queries:

  1. Priority Queue: Each segment on a Historical is opened sequentially. Every row is added to a bounded priority queue ordered by timestamp. Once the queue grows past the result-set limit, the row with the earliest (if descending) or latest (if ascending) timestamp is dequeued. After every row has been processed, the sorted contents of the priority queue are streamed back to the Broker(s) in batches. (See the sketch after this list.)

  2. K-Way/N-Way Merge: Each segment on a Historical is opened in parallel. Since each segment's rows are already time-ordered, a k-way merge can be performed on the results from each segment. Unlike the Priority Queue strategy, this approach doesn't hold the entire result set in memory; it streams batches back as they are returned from the merge function.
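For illustration, here is a minimal, self-contained sketch of the bounded priority-queue idea behind strategy 1. The class and method names are hypothetical (this is not the actual ScanQueryRunnerFactory code), and rows are modeled as simple (timestamp, payload) pairs:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the bounded priority-queue strategy, not Druid's actual code.
class BoundedTimeOrderedQueueSketch
{
  static List<long[]> timeOrderedLimit(Iterable<long[]> rows, int limit, boolean descending)
  {
    Comparator<long[]> byTime = Comparator.comparingLong(r -> r[0]);
    // Head of the queue is the row that gets evicted first:
    //   descending result -> evict the earliest timestamp
    //   ascending result  -> evict the latest timestamp
    Comparator<long[]> evictionOrder = descending ? byTime : byTime.reversed();

    PriorityQueue<long[]> queue = new PriorityQueue<>(evictionOrder);
    for (long[] row : rows) {
      queue.offer(row);
      if (queue.size() > limit) {
        queue.poll(); // drop the row furthest from the requested ordering
      }
    }

    // Drain into the requested order before batching results back to the Broker.
    List<long[]> result = new ArrayList<>(queue);
    result.sort(descending ? byTime.reversed() : byTime);
    return result;
  }
}
```

The key property is that at most `limit` rows are resident at any time, which is what makes the row cap described below meaningful.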

Each strategy has a corresponding configurable limit to protect historicals from OOMEs:

  1. The druid.query.scan.maxRowsQueuedForOrdering property protects from OOMEs by limiting the number of rows in the query result set when time ordering is used (see the example settings after this list).

  2. The druid.query.scan.maxSegmentPartitionsOrderedInMemory limit protects from creating too many decompression/decoding buffers by capping the number of segments opened per historical when time ordering is used.
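For reference, a hypothetical Historical runtime.properties excerpt; the values here are illustrative, not necessarily the shipped defaults:

```properties
# Cap on rows held for ordering when a time-ordered scan is requested (illustrative value).
druid.query.scan.maxRowsQueuedForOrdering=100000
# Cap on segment partitions opened per Historical for the n-way merge strategy (illustrative value).
druid.query.scan.maxSegmentPartitionsOrderedInMemory=50
```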

Future Work

The configurable limits should be overridable through new query parameters. My thinking is that, since memory usage is proportional to the number of dimensions being queried, it might make sense to let users tune these limits for the specific query they issue (as opposed to setting them in the config at startup).

Also, this feature should probably be exposed through Druid SQL.

Add a better "unset" value for limit; instead of Long.MAX_VALUE, it should probably be -1 or a similar sentinel.

@justinborromeo (Contributor, Author) commented:

For the unordered case, there is no performance regression as demonstrated by the results of the ScanBenchmark before and after the PR below. The benchmark was run on an EC2 instance.

java -Xmx8g -server -jar target/benchmarks.jar ScanBenchmark -f 1 -i 50 -wi 25 -p schemaAndQuery=basic.A -p ordering=NONE -p limit=0,1000,99999

After PR:
# Run complete. Total time: 00:33:03

Benchmark                                  (limit)  (numProcessingThreads)  (numSegments)  (rowsPerSegment)  (schemaAndQuery)  Mode  Cnt        Score        Error  Units
ScanBenchmark.queryMultiQueryableIndex           0                       2              2            200000           basic.A  avgt   50  1047299.585 ± 100832.876  us/op
ScanBenchmark.queryMultiQueryableIndex           0                       2              4            200000           basic.A  avgt   50  3327877.446 ± 340896.932  us/op
ScanBenchmark.queryMultiQueryableIndex        1000                       2              2            200000           basic.A  avgt   50     2871.893 ±      1.312  us/op
ScanBenchmark.queryMultiQueryableIndex        1000                       2              4            200000           basic.A  avgt   50     2932.073 ±      3.538  us/op
ScanBenchmark.queryMultiQueryableIndex       99999                       2              2            200000           basic.A  avgt   50   230683.292 ±   1364.525  us/op
ScanBenchmark.queryMultiQueryableIndex       99999                       2              4            200000           basic.A  avgt   50   231715.616 ±   1572.446  us/op

ScanBenchmark.querySingleIncrementalIndex        0                       2              2            200000           basic.A  avgt   50   349069.471 ±   4376.445  us/op
ScanBenchmark.querySingleIncrementalIndex        0                       2              4            200000           basic.A  avgt   50   340061.203 ±   3059.043  us/op
ScanBenchmark.querySingleIncrementalIndex     1000                       2              2            200000           basic.A  avgt   50     1258.837 ±      0.716  us/op
ScanBenchmark.querySingleIncrementalIndex     1000                       2              4            200000           basic.A  avgt   50     1320.102 ±      1.458  us/op
ScanBenchmark.querySingleIncrementalIndex    99999                       2              2            200000           basic.A  avgt   50   155196.629 ±   1357.245  us/op
ScanBenchmark.querySingleIncrementalIndex    99999                       2              4            200000           basic.A  avgt   50   162660.413 ±   1560.482  us/op

ScanBenchmark.querySingleQueryableIndex          0                       2              2            200000           basic.A  avgt   50   492261.267 ±   5470.700  us/op
ScanBenchmark.querySingleQueryableIndex          0                       2              4            200000           basic.A  avgt   50   480649.890 ±   5540.968  us/op
ScanBenchmark.querySingleQueryableIndex       1000                       2              2            200000           basic.A  avgt   50     2795.713 ±      1.473  us/op
ScanBenchmark.querySingleQueryableIndex       1000                       2              4            200000           basic.A  avgt   50     2864.783 ±      2.959  us/op
ScanBenchmark.querySingleQueryableIndex      99999                       2              2            200000           basic.A  avgt   50   240433.943 ±   1845.204  us/op
ScanBenchmark.querySingleQueryableIndex      99999                       2              4            200000           basic.A  avgt   50   234122.746 ±   1507.052  us/op


Before PR:
# Run complete. Total time: 00:33:02

Benchmark                                  (limit)  (numProcessingThreads)  (numSegments)  (rowsPerSegment)  (schemaAndQuery)  Mode  Cnt        Score        Error  Units
ScanBenchmark.queryMultiQueryableIndex           0                       2              2            200000           basic.A  avgt   50  1172830.904 ± 201573.754  us/op
ScanBenchmark.queryMultiQueryableIndex           0                       2              4            200000           basic.A  avgt   50  3267726.979 ± 319137.068  us/op
ScanBenchmark.queryMultiQueryableIndex        1000                       2              2            200000           basic.A  avgt   50     2824.916 ±      3.079  us/op
ScanBenchmark.queryMultiQueryableIndex        1000                       2              4            200000           basic.A  avgt   50     2777.685 ±      3.450  us/op
ScanBenchmark.queryMultiQueryableIndex       99999                       2              2            200000           basic.A  avgt   50   231104.252 ±   1376.867  us/op
ScanBenchmark.queryMultiQueryableIndex       99999                       2              4            200000           basic.A  avgt   50   231335.680 ±   1355.400  us/op

ScanBenchmark.querySingleIncrementalIndex        0                       2              2            200000           basic.A  avgt   50   342828.419 ±   4551.446  us/op
ScanBenchmark.querySingleIncrementalIndex        0                       2              4            200000           basic.A  avgt   50   333952.670 ±   3096.050  us/op
ScanBenchmark.querySingleIncrementalIndex     1000                       2              2            200000           basic.A  avgt   50     1307.223 ±      0.686  us/op
ScanBenchmark.querySingleIncrementalIndex     1000                       2              4            200000           basic.A  avgt   50     1288.650 ±      1.255  us/op
ScanBenchmark.querySingleIncrementalIndex    99999                       2              2            200000           basic.A  avgt   50   158636.920 ±    781.898  us/op
ScanBenchmark.querySingleIncrementalIndex    99999                       2              4            200000           basic.A  avgt   50   162173.353 ±    961.087  us/op

ScanBenchmark.querySingleQueryableIndex          0                       2              2            200000           basic.A  avgt   50   488685.621 ±   5538.093  us/op
ScanBenchmark.querySingleQueryableIndex          0                       2              4            200000           basic.A  avgt   50   486952.922 ±   5868.651  us/op
ScanBenchmark.querySingleQueryableIndex       1000                       2              2            200000           basic.A  avgt   50     2879.226 ±      1.755  us/op
ScanBenchmark.querySingleQueryableIndex       1000                       2              4            200000           basic.A  avgt   50     2823.509 ±     21.211  us/op
ScanBenchmark.querySingleQueryableIndex      99999                       2              2            200000           basic.A  avgt   50   238340.781 ±   1514.616  us/op
ScanBenchmark.querySingleQueryableIndex      99999                       2              4            200000           basic.A  avgt   50   235502.141 ±   1402.612  us/op

)
)
).limit(
Math.toIntExact(((ScanQuery) (queryPlus.getQuery())).getLimit())
Member:

If I do not specify a limit parameter but do specify an order, I end up in here with limit as Long.MAX_VALUE and hit a

2019-03-27T23:39:05,477 ERROR [qtp904351240-69[scan_[wikiticker]_e44c2050-ecd4-45b7-ba3e-28913a46581e]] org.apache.druid.server.QueryResource - Exception handling request: {class=org.apache.druid.server.QueryResource, exceptionType=class java.lang.ArithmeticException, exceptionMessage=integer overflow, exception=java.lang.ArithmeticException: integer overflow, query=ScanQuery{dataSource='wikiticker', querySegmentSpec=MultipleSpecificSegmentSpec{descriptors=[SegmentDescriptor{interval=2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z, version='2019-03-08T11:26:16.569Z', partitionNumber=0}]}, virtualColumns=[], resultFormat='list', batchSize=20480, limit=9223372036854775807, dimFilter=null, columns=[], legacy=false}, peer=127.0.0.1}
java.lang.ArithmeticException: integer overflow
	at java.lang.Math.toIntExact(Math.java:1011) ~[?:1.8.0_192]
	at org.apache.druid.query.scan.ScanQueryRunnerFactory.nWayMergeAndLimit(ScanQueryRunnerFactory.java:306) ~[classes/:?]
...

I haven't quite figured out what the correct behavior in that condition is yet, but this error isn't very friendly.
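For context, a minimal standalone reproduction of that overflow, assuming the query's "no limit" default is Long.MAX_VALUE as shown in the log above:

```java
// Minimal reproduction: narrowing the "no limit" sentinel to an int overflows.
public class ToIntExactOverflow
{
  public static void main(String[] args)
  {
    long limit = Long.MAX_VALUE;            // default when no limit is specified on the query
    int intLimit = Math.toIntExact(limit);  // throws java.lang.ArithmeticException: integer overflow
    System.out.println(intLimit);
  }
}
```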

Contributor Author:

I'll change the Sequence#limit() function to accept longs. There shouldn't be any adverse consequences.
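Roughly, the idea is for the limit to be tracked as a long counter rather than an int. A hedged sketch of what that looks like in plain Iterator terms (the names here are hypothetical and this is not the actual Sequence/LimitedSequence implementation):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical illustration of a long-based limit; Druid's actual Sequence#limit differs.
final class LongLimitIterators
{
  static <T> Iterator<T> limit(Iterator<T> delegate, long limit)
  {
    return new Iterator<T>()
    {
      private long remaining = limit;

      @Override
      public boolean hasNext()
      {
        return remaining > 0 && delegate.hasNext();
      }

      @Override
      public T next()
      {
        if (remaining <= 0) {
          throw new NoSuchElementException();
        }
        remaining--;
        return delegate.next();
      }
    };
  }
}
```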

Member:

I think that should be ok too. Should it skip the sequence limit entirely if limit is the default Long.MAX_VALUE?

@justinborromeo (Contributor, Author) commented Mar 28, 2019:

I don't think so because there's not really a way of distinguishing whether someone wants a limit of Long.MAX_VALUE or no limit at all. Also, in practice, I don't think anyone will complain that they can't query more than a quintillion rows out of Druid.

However, it might make more sense to do so if the default limit value, when no limit is set, were some invalid sentinel like -1.

}
return new ScanResultValue(null, columns, eventsToAdd);
Member:

It might be interesting to encode the interval of events contained in this batch as an iso interval as the "segmentId", but it also feels a bit magical and maybe unexpected. It does seem more useful than null though, and would allow seeking to chunks for specific time ranges out of the full result set without reading every individual event. Thoughts anyone?

Contributor Author:

I feel as if that might be nonintuitive (especially for new users) since the interval string isn't actually the segmentId and the only way to know that it wasn't would be to read the docs or the code.

Member:

I think the only way to know what segmentId is at all is to read the docs; null isn't particularly intuitive here either. But you're right, I don't think using segmentId for this is the correct thing. Rather, it probably makes sense to add a new interval property to the JSON and ScanResultValue that holds the interval of the min and max event timestamps, which seems useful for both ordered and unordered scan queries (if it's not too painful for unordered). I don't think this needs to be done in this PR, but it may be a nice follow-up to consider.
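To make the suggestion concrete, a hypothetical ScanResultValue (list resultFormat) with such an interval property might look like this; the interval field and the example columns/values are purely illustrative and not part of this PR:

```json
{
  "segmentId": null,
  "interval": "2015-09-12T00:00:00.000Z/2015-09-12T01:00:00.000Z",
  "columns": ["__time", "channel", "page"],
  "events": [
    { "__time": 1442016000000, "channel": "#en.wikipedia", "page": "Example" }
  ]
}
```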

return returnedRows;
}
} else {
if (!(query.getQuerySegmentSpec() instanceof MultipleSpecificSegmentSpec)) {
Member:

Could you add an in-code comment on why this is the case?

+ "of type MultipleSpecificSegmentSpec");
}
List<SegmentDescriptor> descriptorsOrdered =
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
Member:

nit: this and the following line have nearly the same end-of-line comments; suggest consolidating them and putting the comment on its own line before them, specifying that ascending order is the default.

The Select query _will_ retain the rows in memory, causing memory pressure if too many rows are returned.
The Scan query can return all the rows without issuing another pagination query.

In addition to straightforward usage where a Scan query is issued to the Broker, the Scan query can also be issued
Member:

I think I noticed that the code that hits the historical might explode if it isn't issued an interval with a MultipleSpecificSegmentSpec, but there is an example query, immediately after the mention that historicals can be queried, that doesn't have this segment spec, which is probably confusing. Querying historicals directly seems like an advanced use case and should maybe be moved to its own section near the end of this document, to make it clear without necessarily encouraging it.

Contributor Author:

In the time-ordering section, there's this line: "Also, time ordering is not support for queries issued directly to historicals unless a list of segments is specified." (don't mind the typo, I'll fix that right now). Is that good enough?

Member:

I think it's probably OK; it just flows strangely for me when I read the section, because the two things aren't really related but don't really feel separated either.

For future work it might be nice to overhaul the scan query documentation: maybe drop mention of the legacy mode and the valueVector result format, and rearrange some things. I still feel querying a historical directly is less important than describing the scan query itself, and should be pushed to the end of the docs into an 'advanced' section that describes using the scan query as a means of parallel export into another query system or whatever, but all of this can be done in a future PR.

@clintropolis (Member) left a review comment:

lgtm 🤘

We should consider opening issues for suggested follow-up after this gets merged.

@jon-wei merged commit ad7862c into apache:master Mar 28, 2019
@justinborromeo deleted the 6088-Time-Ordering-On-Scans-N-Way-Merge branch March 28, 2019 21:55
justinborromeo added a commit to justinborromeo/incubator-druid that referenced this pull request Apr 1, 2019
* Moved Scan Builder to Druids class and started on Scan Benchmark setup

* Need to form queries

* It runs.

* Stuff for time-ordered scan query

* Move ScanResultValue timestamp comparator to a separate class for testing

* Licensing stuff

* Change benchmark

* Remove todos

* Added TimestampComparator tests

* Change number of benchmark iterations

* Added time ordering to the scan benchmark

* Changed benchmark params

* More param changes

* Benchmark param change

* Made Jon's changes and removed TODOs

* Broke some long lines into two lines

* nit

* Decrease segment size for less memory usage

* Wrote tests for heapsort scan result values and fixed bug where iterator
wasn't returning elements in correct order

* Wrote more tests for scan result value sort

* Committing a param change to kick teamcity

* Fixed codestyle and forbidden API errors

* .

* Improved conciseness

* nit

* Created an error message for when someone tries to time order a result
set > threshold limit

* Set to spaces over tabs

* Fixing tests WIP

* Fixed failing calcite tests

* Kicking travis with change to benchmark param

* added all query types to scan benchmark

* Fixed benchmark queries

* Renamed sort function

* Added javadoc on ScanResultValueTimestampComparator

* Unused import

* Added more javadoc

* improved doc

* Removed unused import to satisfy PMD check

* Small changes

* Changes based on Gian's comments

* Fixed failing test due to null resultFormat

* Added config and get # of segments

* Set up time ordering strategy decision tree

* Refactor and pQueue works

* Cleanup

* Ordering is correct on n-way merge -> still need to batch events into
ScanResultValues

* WIP

* Sequence stuff is so dirty :(

* Fixed bug introduced by replacing deque with list

* Wrote docs

* Multi-historical setup works

* WIP

* Change so batching only occurs on broker for time-ordered scans

Restricted batching to broker for time-ordered queries and adjusted
tests

Formatting

Cleanup

* Fixed mistakes in merge

* Fixed failing tests

* Reset config

* Wrote tests and added Javadoc

* Nit-change on javadoc

* Checkstyle fix

* Improved test and appeased TeamCity

* Sorry, checkstyle

* Applied Jon's recommended changes

* Checkstyle fix

* Optimization

* Fixed tests

* Updated error message

* Added error message for UOE

* Renaming

* Finish rename

* Smarter limiting for pQueue method

* Optimized n-way merge strategy

* Rename segment limit -> segment partitions limit

* Added a bit of docs

* More comments

* Fix checkstyle and test

* Nit comment

* Fixed failing tests -> allow usage of all types of segment spec

* Fixed failing tests -> allow usage of all types of segment spec

* Revert "Fixed failing tests -> allow usage of all types of segment spec"

This reverts commit ec47028.

* Revert "Merge branch '6088-Time-Ordering-On-Scans-N-Way-Merge' of github.com:justinborromeo/incubator-druid into 6088-Time-Ordering-On-Scans-N-Way-Merge"

This reverts commit 57033f3, reversing
changes made to 8f01d8d.

* Check type of segment spec before using for time ordering

* Fix bug in numRowsScanned

* Fix bug messing up count of rows

* Fix docs and flipped boolean in ScanQueryLimitRowIterator

* Refactor n-way merge

* Added test for n-way merge

* Refixed regression

* Checkstyle and doc update

* Modified sequence limit to accept longs and added test for long limits

* doc fix

* Implemented Clint's recommendations
@clintropolis mentioned this pull request Oct 25, 2019