[SPARK-56677][SQL] Propagate filter conditions through Join nodes in PlanMerger#55628
[SPARK-56677][SQL] Propagate filter conditions through Join nodes in PlanMerger#55628peter-toth wants to merge 2 commits intoapache:masterfrom
Join nodes in PlanMerger#55628Conversation
|
I measured the following improvements with the affected queries: |
| .createWithDefault(false) | ||
|
|
||
| val MERGE_SUBPLANS_FILTER_PROPAGATION_THROUGH_JOIN_ENABLED = | ||
| buildConf("spark.sql.optimizer.mergeSubplans.filterPropagationThroughJoin.enabled") |
There was a problem hiding this comment.
It seems that we had better follow the config namespace rule.
There was a problem hiding this comment.
I will rebase this PR after #55633, but I already renamed the new config to be in the .filterPropagation namespace.
2087a88 to
bd5f812
Compare
| "merging, allowing subplans that differ only in their filter conditions and share a " + | ||
| "common join to be merged into a single scan. A filter attribute is only propagated " + | ||
| "through a join when it originates from the non-nullable (preserved) side: the left side " + | ||
| "of LeftOuter/LeftSemi/LeftAnti, the right side of RightOuter, or either side of " + |
There was a problem hiding this comment.
nit. either side -> either side
| * produces a boolean attribute that flows through the join output to the enclosing | ||
| * [[Aggregate]]. Propagation is skipped when both the left and right children simultaneously | ||
| * produce filter attributes, as combining them would require an additional AND alias above | ||
| * the join (not yet supported). |
There was a problem hiding this comment.
According to the filterSafeForJoin, shall we mention NULL-pad cases a little more?
There was a problem hiding this comment.
I rewrote this part entirelly to ellaborate on what is the problem with null padded sides: 4d87515.
|
|
||
| val MERGE_SUBPLANS_FILTER_PROPAGATION_THROUGH_JOIN_ENABLED = | ||
| buildConf( | ||
| "spark.sql.optimizer.mergeSubplans.filterPropagation.filterPropagationThroughJoin.enabled") |
There was a problem hiding this comment.
Shall we rename filterPropagationThroughJoin -> throughJoin because (2) is shorter and better in general.
- spark.sql.optimizer.mergeSubplans.filterPropagation.filterPropagationThroughJoin.enabled
- spark.sql.optimizer.mergeSubplans.filterPropagation.throughJoin.enabled
| // rows are NULL-padded so f=NULL, causing FILTER (WHERE f) to incorrectly | ||
| // exclude rows that should contribute to the aggregate. Right-side | ||
| // attributes are also absent from semi/anti join output. | ||
| (leftNPFilter.isEmpty && leftCPFilter.isEmpty || |
There was a problem hiding this comment.
isEmpty && -> isEmpty && because the Apache Spark community doesn't use vertical alignment.
|
|
||
| comparePlans(Optimize.execute(originalQuery.analyze), originalQuery.analyze) | ||
| } | ||
| } |
There was a problem hiding this comment.
Shall we add Cross (positive) and FullOuter (negative) test coverage?
…PlanMerger
### What changes were proposed in this pull request?
`PlanMerger` now supports filter propagation through `Join` nodes when merging
similar subplans. Previously, when two subplans contained identical `Join` nodes
but differed only in a filter applied to one of the join's children, they could
not be merged.
This PR adds the ability to propagate such filter conditions through a `Join`
and into the parent `Aggregate`'s `FILTER` clause. A new `filterSafeForJoin`
helper checks that the filter originates from the non-nullable (preserved) side
of the join: the left side of `LeftOuter`/`LeftSemi`/`LeftAnti`, the right side
of `RightOuter`, or either side of `Inner`/`Cross`. `FullOuter` joins are not
eligible.
The feature is gated by a new SQL config:
`spark.sql.optimizer.mergeSubplans.filterPropagationThroughJoin.enabled`
(default: `true`).
### Why are the changes needed?
Without this change, scalar subqueries that differ only in a filter on one side
of an identical join cannot be merged, resulting in redundant scans and compute.
For example:
SELECT
(SELECT sum(key) FROM t1 JOIN t2 ON t1.id = t2.id),
(SELECT sum(key) FROM t1 JOIN t2 ON t1.id = t2.id WHERE t2.b > 1)
Both subqueries scan `t1` and `t2` in full even though they share the same base
join. After this change a single merged scan is used and the second subquery's
result is derived from it via an aggregate `FILTER` clause.
### Does this PR introduce _any_ user-facing change?
Yes. The optimizer may now merge scalar subqueries that were previously kept
separate, reducing the number of scan and join operations. The new config
`spark.sql.optimizer.mergeSubplans.filterPropagationThroughJoin.enabled`
(default `true`) can be used to opt out.
### How was this patch tested?
Added unit tests in `MergeSubplansSuite`:
- Merge with filter on left inner join child
- Merge with filter on right inner join child
- No merge when both join children have independent filters
- Merge with filter on the preserved side of a `LeftSemi` join
- No merge when filter is on the non-output side of a `LeftSemi` join
- No merge when filter is on the nullable side of an outer join
- No merge when the feature is disabled via config
Added integration test in `PlanMergeSuite` verifying correctness (`checkAnswer`)
and plan shape (`SubqueryExec`/`ReusedSubqueryExec` counts) for both the enabled
and disabled config cases, with and without AQE.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6
bd5f812 to
4d87515
Compare
dongjoon-hyun
left a comment
There was a problem hiding this comment.
+1, LGTM. Thank you, @peter-toth .
|
Merged to master for Apache Spark 4.2.0. |
What changes were proposed in this pull request?
PlanMergernow supports filter propagation throughJoinnodes when merging similar subplans. Previously, when two subplans contained identicalJoinnodes but differed only in a filter applied to one of the join's children, they could not be merged.This PR adds the ability to propagate such filter conditions through a
Joinand into the parentAggregate'sFILTERclause. A newfilterSafeForJoinhelper checks that the filter originates from the non-nullable (preserved) side of the join: the left side ofLeftOuter/LeftSemi/LeftAnti, the right side ofRightOuter, or either side ofInner/Cross.FullOuterjoins are not eligible.The feature is gated by a new SQL config
spark.sql.optimizer.mergeSubplans.filterPropagation.throughJoin.enabled(default:false).Why are the changes needed?
Without this change, scalar subqueries that differ only in a filter on one side of an identical join cannot be merged, resulting in redundant scans and compute. For example:
SELECT
(SELECT sum(key) FROM t1 JOIN t2 ON t1.id = t2.id),
(SELECT sum(key) FROM t1 JOIN t2 ON t1.id = t2.id WHERE t2.b > 1)
Both subqueries scan
t1andt2in full even though they share the same base join. After this change a single merged scan is used and the second subquery's result is derived from it via an aggregateFILTERclause.Does this PR introduce any user-facing change?
Yes. When
spark.sql.optimizer.mergeSubplans.filterPropagation.filterPropagationThroughJoin.enabledis set totrue, the optimizer may merge scalar subqueries that were previously kept separate, reducing the number of scan and join operations.How was this patch tested?
Added unit tests in
MergeSubplansSuite:LeftSemijoinLeftSemijoinAdded integration test in
PlanMergeSuiteverifying correctness (checkAnswer) and plan shape (SubqueryExec/ReusedSubqueryExeccounts) for both the enabled and disabled config cases, with and without AQE.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6