DRILL-7062: Initial implementation of run-time row-group pruning #1738
public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
public static final String SKIP_RUNTIME_ROWGROUP_PRUNING_KEY = "exec.storage.skip_runtime_rowgroup_pruning";
Since row groups are Parquet-specific, should we put the option under an 'exec.storage.parquet' namespace, or name it something like exec.storage.parquet_skip_runtime_rowgroup_pruning?
The namespace 'exec.storage.parquet' is not yet used for any option, and adding "parquet_" to the option name makes it longer and more typo-prone (unless you are French :-) ). Any user who understands when/why to use this option probably knows that "rowgroups" means Parquet.
Are we going to introduce several other new options under this namespace?
List<SchemaPath> columns = columnsStatistics.keySet().stream().collect(Collectors.toList());
ParquetGroupScan parquetGroupScan = new ParquetGroupScan( context.getQueryUserName(), metadataProvider, fileSelection, columns, readerConfig, filterExpr);
Creating a new instance of the ParquetGroupScan seems too much overhead, since this is happening for each row group. I understand how you may have arrived at this, since the run-time does not have access to the planner objects such as the original ParquetGroupScan. However, the only reason you need this is to create a FilterPredicate, which is accessible through the ParquetGroupScan. Could we just create a single (dummy) instance of ParquetGroupScan using one file (or row group) outside the main for loop, and create the FilterPredicate right at the beginning, before even iterating over all the row groups? The construction of the FilterPredicate in getFilterPredicate() itself does not seem to have any direct dependence on any internal state of the ParquetGroupScan, as far as I can tell.
- The total "per rowgroup" overhead (including the ParquetGroupScan construction) is usually less than a millisecond (on my Mac). (That does not include the "per file/footer" overhead.)
- In case the input list of rowgroups spans multiple files, we'd need to recreate a ParquetGroupScan for each file (could there be changes in the columns or schema between files? There seems to be some dependence on those.)
I am actually less concerned about the elapsed time and more about the number of objects created on the Java heap. The ParquetGroupScan is at least 100 bytes each, and since we are targeting use cases with about a million row groups per minor fragment, it means 100 MB per minor fragment (assuming the garbage collector is not doing the cleanup while the for loop is executing). If there are 24 minor fragments, this would add up to 2.4 GB of heap per node, which would cause quite a bit of GC activity subsequently.
I have made some changes on top of your branch which avoid this by directly calling a static method to create a FilterPredicate. The pruning unit tests are passing with these changes. I will send my changes to you offline and we can discuss.
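The reviewer's suggestion can be sketched as follows. This is a simplified, hypothetical illustration, not Drill's actual code: `RowGroupStats` and the method names are stand-ins. The point is that the filter predicate is built once by a static factory, so no per-row-group scan object is allocated inside the loop.

```java
import java.util.List;
import java.util.function.Predicate;

public class PredicateHoisting {

  // Stand-in for a row group's min/max column statistics from the footer.
  static class RowGroupStats {
    final long min, max;
    RowGroupStats(long min, long max) { this.min = min; this.max = max; }
  }

  // Static factory standing in for a static getFilterPredicate():
  // obtaining the predicate requires no per-row-group allocation.
  static Predicate<RowGroupStats> buildFilterPredicate(long threshold) {
    // Keep a row group only if its value range can contain the threshold.
    return stats -> stats.max >= threshold;
  }

  static int countSurvivors(List<RowGroupStats> rowGroups, long threshold) {
    // Built ONCE, outside the loop over row groups.
    Predicate<RowGroupStats> predicate = buildFilterPredicate(threshold);
    int kept = 0;
    for (RowGroupStats rg : rowGroups) {
      if (predicate.test(rg)) { kept++; }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<RowGroupStats> rgs = List.of(
        new RowGroupStats(0, 10), new RowGroupStats(11, 20), new RowGroupStats(21, 30));
    System.out.println(countSurvivors(rgs, 15)); // two of the three row groups can match
  }
}
```

The same shape applies at a million row groups per minor fragment: the only per-iteration cost is the predicate test, not an object construction.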
Force-pushed fb46a0f to 197494d.
The code has been modified, simplified and passes all the tests. The changes from the first PR: (a) Cancelled item (4) above (new ctor for ParquetGroupScan).
amansinha100 left a comment:
A few minor comments. Overall lgtm.
Force-pushed d83ca7c to e1b83fe.
amansinha100 left a comment:
Updated version LGTM. +1. Before the final merge pls squash commits.
This PR is the initial implementation of Run-Time row-group pruning (RTP); i.e., as opposed to Plan-Time pruning, which usually works serially over the metadata, the run-time work applies the query predicate in parallel, at each minor fragment that scans several Parquet row-groups, using their files' footers. Note that the new RTP implementation is currently independent of the plan-time pruning; e.g., the same predicate applied at plan-time may be applied again at run-time (future work would remove or reduce this predicate).
Overview of the key changes:
(1) The RTP is enabled by passing the relevant filter expression to the runtime via the method setFilterForRuntime (in AbstractGroupScanWithMetadata). A value of null or a TRUE boolean (the default initial value) for the filter in that class means "disable RTP".
The RTP can also be avoided/skipped using a new "skip RTP" option (default false). Due to some issue in the Hive native scanner, the RTP is also currently disabled for that scanner (a Jira will be created later to fix that).
The setFilterForRuntime method is called in doOnMatch (in ParquetPushDownFilter), depending on the results of the plan-time pruning (e.g., a new filter expression may be composed).
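The null-or-TRUE convention in (1) could look roughly like the following sketch. The `FilterExpr` type here is a hypothetical stand-in for Drill's expression classes (the real filter is a LogicalExpression), used only to show the enable/disable check:

```java
public class RuntimeFilterCheck {

  // Minimal stand-in for a filter expression; constantValue is non-null
  // only when the expression is a boolean literal.
  static class FilterExpr {
    final Boolean constantValue;
    FilterExpr(Boolean constantValue) { this.constantValue = constantValue; }
  }

  static boolean runtimePruningEnabled(FilterExpr filterForRuntime) {
    if (filterForRuntime == null) {
      return false; // filter was never set: RTP disabled
    }
    if (Boolean.TRUE.equals(filterForRuntime.constantValue)) {
      return false; // a constant TRUE filter prunes nothing: RTP disabled
    }
    return true;    // a real predicate: apply run-time pruning
  }

  public static void main(String[] args) {
    System.out.println(runtimePruningEnabled(null));                 // false
    System.out.println(runtimePruningEnabled(new FilterExpr(true))); // false
    System.out.println(runtimePruningEnabled(new FilterExpr(null))); // true
  }
}
```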
(2) Changes for the plan-time pruning: Changed the "Threshold" option from a positive Long to any Long, so that plan-time pruning can be disabled with a value like zero. Also rewrote the documentation string for that option.
Also added a special check-and-return for this option (in AbstractParquetGroupScan); otherwise the prior code "over-pruned" in that case.
(3) The selection-root Path field was added to the AbstractParquetRowGroupScan.
(4) A new constructor added for ParquetGroupScan (so it can be used without a format plugin).
(5) Two new query profile Metrics were added to the ParquetReaderStats: NUM_ROWGROUPS (i.e., the number of rowgroups assigned to this fragment, after plan-time pruning), and ROWGROUPS_PRUNED (i.e., how many of those rowgroups were eliminated/pruned at run time). When all the rowgroups are pruned, the two numbers are identical.
(6) A test was added which disables plan-time pruning, selects from a multi-rowgroup file, and then verifies the result metrics.
(7) The "filter=(...)" now shows in the Scan operator in the physical plan. So needed to fix some unit tests that did not expect this part (basically added '.*' in the expected expression).
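As an illustration of this kind of test fix (the plan string and patterns below are invented for the example, not taken from Drill's test suite), adding ".*" lets an expected-plan regex tolerate the new filter attribute in the Scan operator:

```java
import java.util.regex.Pattern;

public class PlanPatternExample {
  public static void main(String[] args) {
    // Invented plan text in the new style, with the filter attribute present.
    String plan = "Scan(table=[dfs.tmp.t], filter=((col > 10)), columns=[`col`])";

    // Old expected pattern: leaves no room for the new filter attribute.
    Pattern oldExpected = Pattern.compile("Scan\\(table=\\[dfs\\.tmp\\.t\\], columns=.*");
    // Fixed pattern: ".*" absorbs whatever appears between the attributes.
    Pattern newExpected = Pattern.compile("Scan\\(table=\\[dfs\\.tmp\\.t\\],.*columns=.*");

    System.out.println(oldExpected.matcher(plan).find()); // false
    System.out.println(newExpected.matcher(plan).find()); // true
  }
}
```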
(8) The actual run-time pruning work takes place in the method getBatch (in AbstractParquetScanBatchCreator). There is a "for" loop iterating over all the assigned rowgroups, creating a reader for each (to be given to the scan batch later). The pruning code was added after the original code that gets the rowgroup footer.
The pruning is done by applying the filter predicate (similar to how the plan-time pruning works) to each rowgroup's footer, and skipping that footer when the predicate fails (i.e., no reader is created for that rowgroup).
In case all the rowgroups are pruned, the first rowgroup is selected anyway so that the schema can be returned. Note: In this case, our initial implementation assigned zero rows to read for that reader, but something broke at the scanner (it also broke when 1 row was set). So currently all the rows of that rowgroup are read in this case. (Future: fix that to avoid the waste.)
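The loop described in (8), together with the two metrics from (5) and the keep-the-first-rowgroup fallback, can be sketched in a self-contained way. All types and names below are simplified stand-ins, not Drill's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class RuntimePruningSketch {

  // Stand-in for a rowgroup's footer column statistics.
  static class RowGroup {
    final long minVal, maxVal;
    RowGroup(long minVal, long maxVal) { this.minVal = minVal; this.maxVal = maxVal; }
  }

  static class PruneResult {
    final List<RowGroup> readers = new ArrayList<>();
    long numRowGroups;    // NUM_ROWGROUPS-style metric: assigned to this fragment
    long rowGroupsPruned; // ROWGROUPS_PRUNED-style metric: eliminated at run time
  }

  static PruneResult prune(List<RowGroup> assigned, Predicate<RowGroup> mayMatch) {
    PruneResult result = new PruneResult();
    result.numRowGroups = assigned.size();
    for (RowGroup rg : assigned) {
      if (mayMatch.test(rg)) {
        result.readers.add(rg);   // a reader would be created here
      } else {
        result.rowGroupsPruned++; // predicate can never match: skip this rowgroup
      }
    }
    // If everything was pruned, keep the first rowgroup anyway so the scan
    // can still report the schema (the PR notes the row count cannot
    // currently be limited to zero in this case).
    if (result.readers.isEmpty() && !assigned.isEmpty()) {
      result.readers.add(assigned.get(0));
    }
    return result;
  }

  public static void main(String[] args) {
    // Predicate stands in for "the filter value 100 may lie in [min, max]".
    Predicate<RowGroup> mayMatch = rg -> rg.minVal <= 100 && 100 <= rg.maxVal;
    PruneResult r = prune(List.of(new RowGroup(0, 50), new RowGroup(60, 200)), mayMatch);
    System.out.println(r.readers.size() + " kept, " + r.rowGroupsPruned + " pruned");
  }
}
```

When all rowgroups are pruned, `readers` still contains one entry while `rowGroupsPruned == numRowGroups`, matching the description that the two metrics are identical in that case.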
In order to use the pruning logic at run-time, some changes were made:
(a) Several methods and definitions in Metadata.java were changed to become public and static.
Note that they refer explicitly to metadata V3 (later to become V4), even though the run-time is independent of the metadata versioning (there was no cleaner abstraction to use).
(b) The prior code (in the for loop) was already reading the footer; to avoid reading the footer a second time, some of the APIs in Metadata.java were changed to pass the footer along and skip the read (when the footer is not null).
Some testing we did shows that the savings (from not reading the footer again) are minor (probably due to file system caching). So if people think that the API changes are not good, this item (b) can be undone.
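A minimal sketch of the footer-reuse pattern in (b), with illustrative names (not Drill's actual Metadata.java signatures): the optional pre-read footer short-circuits the disk read.

```java
public class FooterReuse {

  // Stand-in for a parsed Parquet footer.
  static class Footer {
    final String path;
    Footer(String path) { this.path = path; }
  }

  static int diskReads = 0; // instrumentation for this sketch only

  static Footer readFooterFromDisk(String path) {
    diskReads++;
    return new Footer(path);
  }

  // Changed-style API: callers that already hold the footer pass it in,
  // so it is read from disk only when the argument is null.
  static Footer getFooter(String path, Footer alreadyRead) {
    return (alreadyRead != null) ? alreadyRead : readFooterFromDisk(path);
  }

  public static void main(String[] args) {
    Footer first = getFooter("/data/t.parquet", null);  // reads from "disk"
    Footer again = getFooter("/data/t.parquet", first); // reuses, no new read
    System.out.println(diskReads); // 1
  }
}
```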
(c) The pruning code initializes a few "per file" objects once per file (that is where (3) above is needed), then builds the per-rowgroup filter-predicate and columns-statistics (where (4) is needed), and performs the match.
(d) The rest of the code in the original for loop (which created the reader, etc.) was moved into a new private method, createReaderAndImplicitColumns (that code was moved by IntelliJ, so no need to review every line :-). The reason is that this method needs to be called once outside the for loop (in the case where all rowgroups were pruned).