feat: add ExpressionPlacement enum for optimizer expression placement decisions #20065
base: main
… decisions

This extracts the ExpressionPlacement enum from PR apache#20036 to provide a mechanism for expressions to indicate where they should be placed in the query plan for optimal execution.

Changes:
- Add ExpressionPlacement enum with variants: Literal, Column, PlaceAtLeaves, PlaceAtRoot
- Add placement() method to Expr, ScalarUDF, ScalarUDFImpl traits
- Add placement() method to PhysicalExpr trait and implementations
- Implement placement() for GetFieldFunc to return PlaceAtLeaves when accessing struct fields with literal keys
- Replace is_expr_trivial() checks with placement() in optimizer and physical-plan projection code

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add tests for GetFieldFunc::placement() covering:
- Literal key access (leaf-pushable)
- Column key access (not leaf-pushable)
- PlaceAtRoot base expressions
- Edge cases (empty args, literal base)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
02)--ProjectionExec: expr=[get_field(s@0, value) as __common_expr_1]
03)----FilterExec: id@0 > 2, projection=[s@1]
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 2, required_guarantees=[]
01)ProjectionExec: expr=[get_field(s@0, value) + get_field(s@0, value) as doubled]
This is actually correct / an improvement. We are saying that get_field is very cheap, so there is no need to deduplicate it. I added an extra test above that shows that a more complex expression (id + s['value']) will get deduplicated (as a whole) by the CSE optimizer.
03)----ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
Because the projection is a get_field, it can be pushed under the RepartitionExec. Because this is a MemorySourceConfig data source (which doesn't accept projections, since it would be pointless to do so), it doesn't get pushed into the scan. But this is still correct / a win: we reduce the size of the data very cheaply by pulling out the field we care about and discarding the rest before we slice up the data in the RepartitionExec.
// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
fn is_expr_trivial(expr: &Expr) -> bool {
    matches!(expr, Expr::Column(_) | Expr::Literal(_, _))
}
The point of this PR is to formalize these hidden assumptions about expressions and let expressions like get_field participate in the decision of how the expression is treated.
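To make the difference concrete, here is a minimal sketch of how a placement-based check could replace the column-or-literal check. The enum below mirrors the variant names from this PR, but `is_trivial`, `is_cheap_to_duplicate`, and `can_inline` are hypothetical helpers, and the exact predicate used at the real call sites is an assumption.

```rust
/// Stand-in for the PR's `ExpressionPlacement` enum (variant names from the PR text).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExpressionPlacement {
    Literal,
    Column,
    PlaceAtLeaves,
    PlaceAtRoot,
}

/// The old assumption, restated on placements: only columns and literals are "trivial".
fn is_trivial(placement: ExpressionPlacement) -> bool {
    matches!(
        placement,
        ExpressionPlacement::Column | ExpressionPlacement::Literal
    )
}

/// Assumed replacement predicate: anything that is not pinned to the root
/// is cheap enough to evaluate more than once.
fn is_cheap_to_duplicate(placement: ExpressionPlacement) -> bool {
    !matches!(placement, ExpressionPlacement::PlaceAtRoot)
}

/// Hypothetical call site: may an inner projection expression that is
/// referenced `uses` times be inlined into the outer projection?
fn can_inline(placement: ExpressionPlacement, uses: usize) -> bool {
    uses <= 1 || is_cheap_to_duplicate(placement)
}
```

Under this sketch, an expression classified as `PlaceAtLeaves` (such as `get_field` with a literal key) can be inlined even when it is referenced twice, which the old check would have refused.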
01)ProjectionExec: expr=[count(Int64(1))@0 as count(), count(Int64(1))@0 as count(*)]
02)--ProjectionExec: expr=[2 as count(Int64(1))]
03)----PlaceholderRowExec
This is also an improvement.
Previously we would broadcast the literal 2 to the number of rows twice (2 as count(), 2 as count(*) -> lit(2), lit(2)). Now we first expand it once in an inner projection, then reference that column with aliases twice (i.e. clone the pointer to the array).
In this case it's 1 row so it's not really meaningful, but is nonetheless better. I'll see if I can craft an example that shows this behavior with N rows.
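A rough illustration of the difference with N rows, using `Arc<Vec<i64>>` as a hypothetical stand-in for a reference-counted Arrow array. The function names are made up for this sketch and are not part of DataFusion.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a reference-counted column of values.
type ArrayRef = Arc<Vec<i64>>;

/// Materialize a literal as a full column of `num_rows` values.
fn broadcast_literal(value: i64, num_rows: usize) -> ArrayRef {
    Arc::new(vec![value; num_rows])
}

/// Old plan shape: each aliased output broadcasts the literal on its own,
/// so two separate buffers are allocated.
fn project_twice_by_broadcast(value: i64, num_rows: usize) -> (ArrayRef, ArrayRef) {
    (
        broadcast_literal(value, num_rows),
        broadcast_literal(value, num_rows),
    )
}

/// New plan shape: the inner projection broadcasts once, and the outer
/// projection's two aliases only clone the pointer to that column.
fn project_once_then_alias(value: i64, num_rows: usize) -> (ArrayRef, ArrayRef) {
    let inner = broadcast_literal(value, num_rows);
    (Arc::clone(&inner), inner)
}
```

In the second shape, both outputs point at the same allocation, so the cost of the extra alias is a reference-count increment rather than another N-element buffer.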
@AdamGS I think this should be in a reviewable state 😄
Summary
This PR is part of work towards #19387
Extracts the `ExpressionPlacement` enum from #20036 to provide a mechanism for expressions to indicate where they should be placed in the query plan for optimal execution.

I've opted to go the route of having expressions declare their behavior via a new API on `enum Expr` and `trait PhysicalExpr`:

And:

Where `ExpressionPlacement`:

We arrived at `ExpressionPlacement` after iterating through a version that had:

This terminology came from existing concepts in the codebase that were sprinkled around various places in the logical and physical layers. Some examples:
- datafusion/datafusion/physical-plan/src/projection.rs, lines 282 to 290 in f819061
- datafusion/datafusion/physical-plan/src/projection.rs, lines 1120 to 1125 in f819061
- datafusion/datafusion/optimizer/src/optimize_projections/mod.rs, lines 589 to 592 in f819061
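The shape of the API described in this summary can be sketched in a self-contained way. Only the `ExpressionPlacement` variant names, the `PlaceAtRoot` default, and the idea of a single function-level `placement(args: &[ExpressionPlacement])` method come from this PR; every `Toy*` type below is a hypothetical stand-in rather than the real DataFusion type, and the exact rule in `ToyGetField` (including how empty or unusual arguments are classified) is an assumption.

```rust
/// Variant names taken from the PR description.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpressionPlacement {
    Literal,
    Column,
    PlaceAtLeaves,
    PlaceAtRoot,
}

/// Hypothetical stand-in for `ScalarUDFImpl`: a function classifies itself
/// from the placements of its arguments, without seeing either expression type.
pub trait ToyScalarFn {
    fn placement(&self, _args: &[ExpressionPlacement]) -> ExpressionPlacement {
        ExpressionPlacement::PlaceAtRoot
    }
}

/// Stand-in for `GetFieldFunc`. Assumed rule: a cheap base with a literal key
/// is leaf-pushable; everything else (including empty args) stays at the root.
pub struct ToyGetField;
impl ToyScalarFn for ToyGetField {
    fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
        match args {
            [ExpressionPlacement::Column | ExpressionPlacement::PlaceAtLeaves, ExpressionPlacement::Literal] => {
                ExpressionPlacement::PlaceAtLeaves
            }
            _ => ExpressionPlacement::PlaceAtRoot,
        }
    }
}

/// A function that keeps the default classification.
pub struct ToyUpper;
impl ToyScalarFn for ToyUpper {}

/// Toy logical expression (the real one is `enum Expr`).
pub enum ToyLogicalExpr {
    Column(String),
    Literal(String),
    Call(Box<dyn ToyScalarFn>, Vec<ToyLogicalExpr>),
}

impl ToyLogicalExpr {
    pub fn placement(&self) -> ExpressionPlacement {
        match self {
            ToyLogicalExpr::Column(_) => ExpressionPlacement::Column,
            ToyLogicalExpr::Literal(_) => ExpressionPlacement::Literal,
            ToyLogicalExpr::Call(func, args) => {
                let args: Vec<_> = args.iter().map(|a| a.placement()).collect();
                func.placement(&args)
            }
        }
    }
}

/// Toy physical expression (the real one is `trait PhysicalExpr`).
pub trait ToyPhysicalExpr {
    fn placement(&self) -> ExpressionPlacement {
        ExpressionPlacement::PlaceAtRoot
    }
}

pub struct ToyPhysicalColumn(pub usize);
impl ToyPhysicalExpr for ToyPhysicalColumn {
    fn placement(&self) -> ExpressionPlacement {
        ExpressionPlacement::Column
    }
}

pub struct ToyPhysicalLiteral(pub String);
impl ToyPhysicalExpr for ToyPhysicalLiteral {
    fn placement(&self) -> ExpressionPlacement {
        ExpressionPlacement::Literal
    }
}

pub struct ToyPhysicalCall {
    pub func: Box<dyn ToyScalarFn>,
    pub args: Vec<Box<dyn ToyPhysicalExpr>>,
}

impl ToyPhysicalExpr for ToyPhysicalCall {
    fn placement(&self) -> ExpressionPlacement {
        // Same function-level method as the logical layer; only the way the
        // argument placements are gathered differs.
        let args: Vec<_> = self.args.iter().map(|a| a.placement()).collect();
        self.func.placement(&args)
    }
}
```

The point of the design is visible in the two `Call` paths: the logical and physical layers each compute their children's placements in their own way, then hand the same `&[ExpressionPlacement]` slice to one function-level method, so an implementer writes the classification logic once.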
The new API adds the nuance / distinction of the case of `get_field(col, 'a')`, where the expression is neither a column nor a literal but is still trivial.

It also gives scalar functions the ability to classify themselves. This part was a bit tricky because `ScalarUDFImpl` (the scalar function trait that users implement) lives in `datafusion-expr`, which cannot have references to `datafusion-physical-expr-common` (where `PhysicalExpr` is defined). But once we are in the physical layer, scalar functions are represented as `func: ScalarUDFImpl, args: Vec<Arc<dyn PhysicalExpr>>`. And since we can't have a trait method referencing `PhysicalExpr`, there would be no way to ask a function to classify itself in the physical layer.

Additionally, even if we could refer to `PhysicalExpr` from the `ScalarUDFImpl` trait, we would then need 2 methods with similar but divergent logic (match on the `Expr` enum in one, downcast to various known types in the physical version), which adds boilerplate for implementers.

The `ExpressionPlacement` enum solves this problem: we can have a single method `ScalarUDFImpl::placement(args: &[ExpressionPlacement])`. The parent of `ScalarUDFImpl` will call either `Expr::placement` or `PhysicalExpr::placement` depending on which one it has.

Changes
- Add `ExpressionPlacement` enum in `datafusion-expr-common` with four variants:
  - `Literal` - constant values
  - `Column` - simple column references
  - `PlaceAtLeaves` - cheap expressions (like `get_field`) that can be pushed to leaf nodes
  - `PlaceAtRoot` - expensive expressions that should stay at root
- Add `placement()` method to:
  - `Expr` enum
  - `ScalarUDF` / `ScalarUDFImpl` traits (with default returning `PlaceAtRoot`)
  - `PhysicalExpr` trait (with default returning `PlaceAtRoot`)
  - `Column`, `Literal`, and `ScalarFunctionExpr`
- Implement `placement()` for `GetFieldFunc` that returns `PlaceAtLeaves` when accessing struct fields with literal keys
- Replace `is_expr_trivial()` function checks with `placement()` checks in:
  - `datafusion/optimizer/src/optimize_projections/mod.rs`
  - `datafusion/physical-plan/src/projection.rs`

Test Plan
- `cargo check` passes on all affected packages
- `cargo test -p datafusion-optimizer` passes
- `cargo test -p datafusion-physical-plan` passes (except unrelated zstd feature test)
- `cargo test -p datafusion-functions --lib getfield` passes

🤖 Generated with Claude Code