-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Push down projection expressions into ParquetOpener #19111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| let table_schema = table_schema.into(); | ||
| Self { | ||
| projection: SplitProjection::unprojected(&table_schema), | ||
| projection: ProjectionExprs::from_schema(table_schema.table_schema()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic here of creating this upfront even if it will often be replaced is that the most important thing is that each time we create a ParquetOpener (once per file) we do the minimum amount of compute. Pre-computing here it and storing it in ParquetSource is the easiest way to do that.
This allows it to be Arc'ed, have multiple references, etc. It's a backwards compatible change (aside from producing compiler warnings about unnecessarily mutable variables). This will help with #19111 where we'll want to re-use a simplifier instance for predicate and projection.
This PR does some refactoring of `PhysicalExprAdapter` and `PhysicalExprSimplifier` that I found necessary and/or beneficial while working on #19111. ## Changes made ### Replace `PhysicalExprAdapter::with_partition_values` with `replace_columns_with_literals` This is a nice improvement because it: 1. Makes the `PhysicalExprAdapter` trait that users might need to implement simpler (less boilerplate for users). 2. Decouples these two transformations so that we can replace partition values and then apply a projection without having to also do the schema mapping (it would be from the logical schema to the logical schema, confusing and a waste of compute). I ran into this need in #19111. I think there may be other ways of doing it (e.g. piping in the expected output schema from ParquetSource) but it felt nicer this way and I expect other places may also need the decoupled transformation. 3. I think we can use it in the future to implement #19089 (edit: evidently I was right, see identical function in #19136). 4. It's less lines of code 😄 This will require any users calling `PhysicalExprAdapter` directly to change their code, I can add an entry to the upgrade guide. ### Remove partition pruning logic from `FilePruner` and deprecate now unused `PrunableStatistics` and `CompositePruningStatistics`. Since we replace partition values with literals we no longer need these structures, they get handled like any other literals. This is a good chunk of code / complexity that we can bin off. ### Use `TableSchema` instead of `SchemaRef` + `Vec<FieldRef>` in `ParquetOpener` `TableSchema` is basically `SchemaRef` + `Vec<FieldRef>` already and since `ParquetSource` has a `TableSchema` its less code and less clones to propagate that into `ParquetOpener` --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
d002d19 to
6aba2d1
Compare
| if let Some(expr_adapter_factory) = expr_adapter_factory.as_ref() { | ||
| // After rewriting to the file schema, further simplifications may be possible. | ||
| // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE` | ||
| // and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.). | ||
| // Additionally, if any casts were inserted we can move casts from the column to the literal side: | ||
| // `CAST(col AS INT) = 5` can become `col = CAST(5 AS <col type>)`, which can be evaluated statically. | ||
| let simplifier = PhysicalExprSimplifier::new(&physical_file_schema); | ||
| let rewriter = expr_adapter_factory.create( | ||
| Arc::clone(&logical_file_schema), | ||
| Arc::clone(&physical_file_schema), | ||
| ); | ||
| predicate = predicate | ||
| .map(|p| { | ||
| let expr = expr_adapter_factory | ||
| .create( | ||
| Arc::clone(&logical_file_schema), | ||
| Arc::clone(&physical_file_schema), | ||
| ) | ||
| .rewrite(p)?; | ||
| // After rewriting to the file schema, further simplifications may be possible. | ||
| // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE` | ||
| // and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.). | ||
| PhysicalExprSimplifier::new(&physical_file_schema).simplify(expr) | ||
| }) | ||
| .map(|p| simplifier.simplify(rewriter.rewrite(p)?)) | ||
| .transpose()?; | ||
| predicate_file_schema = Arc::clone(&physical_file_schema); | ||
| // Adapt projections to the physical file schema as well | ||
| projection = projection | ||
| .try_map_exprs(|p| simplifier.simplify(rewriter.rewrite(p)?))?; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is important (though a tech debt item to be sure) |
|
Starting to check this one out |
|
run benchmarks |
|
run benchmark clickbench_pushdown |
|
🤖 |
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @adriangb
I went through this code carefully and it looks really nice. Super well done! It has been quite a road but the result is 🧑🍳 👌
I do think we should work on deprecating SchemaAdapter for this release, to ensure that we have ported over all tests, for example.
| let projection = Arc::clone(&self.projection); | ||
| let logical_file_schema = Arc::clone(self.table_schema.table_schema()); | ||
| // Apply partition column replacement to projection expressions | ||
| let mut projection = self.projection.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found this code now very nice and easy to follow
| projector.project_batch(&b) | ||
| b = projector.project_batch(&b)?; | ||
| if replace_schema { | ||
| // Ensure the output batch has the expected schema. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the comments -- I see now that this is similar, but not the same, as what SchemaAdapter did.
|
I will follow up with deprecating SchemaAdapter. We already have a tracking issue so 👍🏻 edit: #16800 |
|
🤖: Benchmark completed Details
|
|
🤖 |
Well that's unexpected but welcome 😆 |
|
Benchmarks generally look great! |
|
🤖: Benchmark completed Details
|
|
I know we are trying to go fast, but I think for some of these slightly less trivial PRs we should consider leaving them open for a while after approval and before merge to allow additional time to review from people who may not be awake, as described in https://datafusion.apache.org/contributor-guide/index.html#major-and-minor-prs The major/minor distinction is definitely subjective, but I do think it would be good to make sure as many people who are interested get the chance to comment if they want |
Or maybe noise -- I'll see if it reprodces |
|
run benchmark clickbench_partitioned |
Noted, thank you for the reminder. Just very excited to close the loop on this change. I’m happy to address any feedback that comes in the next couple days or revert if there are issues |
|
🤖 |
|
🤖: Benchmark completed Details
|
|
@alamb hmm I'm going to re-run again. That last run seems quite... interesting. |
|
run benchmark clickbench_partitioned |
|
🤖 |
|
🤖: Benchmark completed Details
|
|
I think my conclusion here is that the benchmarks are just too noisy to draw any conclusions but this change (as expected) seems neutral on performance. |

Closes #14993
Once this is merged I think we can say we support projection expression pushdown into scans and it is implemented for Parquet.
Remaining TODOs which I think should be tracked in other issues (I'll find them or create them later):
SchemaAdapterwithPhysicalExprAdapterand decide if we want to actually deprecateSchemaAdapter(Plan to replaceSchemaAdapterwithPhysicalExprAdapter#16800)