[SPARK-56570][SQL] PlanMerger correctness fix and code cleanup #55482
peter-toth wants to merge 3 commits into
### What changes were proposed in this pull request?

Remove `cachedPlanMapping` / `cpMapping` from `PlanMerger` by preserving ExprIds when wrapping cached-plan expressions in `mergeNamedExpressions`.

Previously, wrapping a cached expression in `Alias(If(filter, expr, null))` generated a fresh `ExprId`, making parent nodes that referenced the original attribute by ExprId stale. A `cachedPlanMapping` field was threaded through `TryMergeResult` and `mergeNamedExpressions` to remap those references at every level of `tryMergePlans`.

This PR eliminates the need for that mapping by passing `exprId = ce.toAttribute.exprId` to the wrapping `Alias`, preserving the original `ExprId`. Since the mapping is now always identity, `TryMergeResult.cachedPlanMapping` and the `cachedPlanMapping` parameter of `mergeNamedExpressions` are removed entirely, along with all `cpMapping` threading throughout `tryMergePlans`.

Additional cleanups:
- `mappedCPCondition` locals replaced with direct `cp.condition` references (no remapping needed after cpMapping removal).
- `mappedCPGroupingExpression` assigned directly from `cp.groupingExpressions`.
- Cached-expression wrapping loop limited to `cachedPlanExpressions.size` entries to avoid accidentally wrapping newly-appended new-plan expressions.
- Cp-expression wrapping simplified to a single `case Alias(child, _) if !child.isInstanceOf[Attribute]` match.

### Why are the changes needed?

`cachedPlanMapping` was purely mechanical bookkeeping to compensate for `Alias(...)()` generating fresh ExprIds. Preserving the original ExprId makes the mapping a no-op everywhere, so it can be removed. This simplifies `TryMergeResult` from 5 fields to 4, changes `mergeNamedExpressions` to return a pair instead of a triple, and removes cpMapping threading from all branches of `tryMergePlans`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing `MergeSubplansSuite` unit tests.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6
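The ExprId argument above can be illustrated with a toy model. This is only a sketch: `ExprId`, `Alias`, `AttributeRef`, `freshId`, `wrapFresh` and `wrapPreserving` below are simplified, hypothetical stand-ins written for this example, not Spark's Catalyst classes or the code in `PlanMerger`. It assumes that a parent node resolves its reference purely by id, which is the behaviour the description relies on.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
object ExprIdSketch {
  final case class ExprId(id: Long)

  // Hypothetical id generator: every call hands out a new, unused id.
  private var counter = 0L
  def freshId(): ExprId = { counter += 1; ExprId(counter) }

  // A named output column: its name, the expression it computes (as text), and its identity.
  final case class Alias(name: String, expr: String, exprId: ExprId)

  // A parent's reference to a child column; it matches by ExprId, never by name.
  final case class AttributeRef(exprId: ExprId)

  def resolves(ref: AttributeRef, output: Seq[Alias]): Boolean =
    output.exists(_.exprId == ref.exprId)

  // Wrapping that allocates a fresh id: existing parent references stop matching,
  // so an old-id -> new-id mapping would be needed to repair them.
  def wrapFresh(a: Alias, filter: String): Alias =
    Alias(a.name, s"If($filter, ${a.expr}, null)", freshId())

  // Wrapping that keeps the original id: existing parent references still match.
  def wrapPreserving(a: Alias, filter: String): Alias =
    a.copy(expr = s"If($filter, ${a.expr}, null)")

  val cached: Alias = Alias("sum_a", "sum(a)", freshId())
  val parentRef: AttributeRef = AttributeRef(cached.exprId)

  // True: the parent reference no longer resolves after a fresh-id wrap.
  val staleAfterFresh: Boolean = !resolves(parentRef, Seq(wrapFresh(cached, "pf_0")))
  // True: the parent reference still resolves after an id-preserving wrap.
  val validAfterPreserving: Boolean = resolves(parentRef, Seq(wrapPreserving(cached, "pf_0")))
}
```

In this model the id-preserving wrap makes any cached-side remapping an identity map, which is the property the PR uses to justify deleting `cachedPlanMapping`.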
e0d7814 to 11d7f3a
cc @dongjoon-hyun, this is a minor code simplification after #55298.
…inal cached range

Fix `PlanMerger.mergeNamedExpressions` to wrap only the original cached expressions with the cached plan's filter. The loop previously iterated over all of `mergedExpressions`, including new-plan entries that were appended earlier in the same call and already wrapped with the new plan's filter; re-wrapping them with the cached plan's filter produced double-wrapped `If(cpFilter, If(npFilter, expr, null), null)` expressions, stale `newNPMapping` targets, and analysis failures (missing attribute).

Also tighten the `(np: Filter, cp)` and `(np, cp: Filter)` cases in `tryMergePlans` to match only the structurally reachable child results (`cpFilter`/`npFilter` always `None` because the recursion keeps the non-Filter side unchanged), and drop the associated dead-code appends.

Co-authored-by: Isaac
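The double-wrapping described in this commit can be reproduced with a small string-based model. This is a sketch under assumptions: `guard` and `merge` are hypothetical helpers invented for the example, expressions are plain strings, and the `boundToCached` flag exists only to contrast the two loop bounds; the real `mergeNamedExpressions` operates on Catalyst expressions.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: expressions are strings and all names here are hypothetical.
object WrapRangeSketch {
  // Wrap an expression so it yields null when the filter does not hold.
  def guard(filter: String, expr: String): String = s"If($filter, $expr, null)"

  def merge(
      cachedExprs: Seq[String],
      newExprs: Seq[String],
      cpFilter: String,
      npFilter: String,
      boundToCached: Boolean): Seq[String] = {
    val merged = ArrayBuffer.empty[String]
    merged ++= cachedExprs
    // New-plan entries are appended already wrapped with the new plan's filter.
    newExprs.foreach(e => merged += guard(npFilter, e))
    // Loop bound: either only the original cached range, or everything in the buffer.
    val limit = if (boundToCached) cachedExprs.size else merged.size
    for (i <- 0 until limit) merged(i) = guard(cpFilter, merged(i))
    merged.toSeq
  }
}
```

With `boundToCached = false` the appended entry becomes `If(cpF, If(npF, max(b), null), null)`, the double-wrapped shape named in the commit message; with `boundToCached = true` it stays `If(npF, max(b), null)`.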
@cloud-fan, I've cherry picked the test from #55500 in 31621fe.
It would be great to fix this before Apache Spark 4.2.0. BTW, the PR title looks a little misleading because this is a bug fix as shown by the test case.
+1, LGTM, @peter-toth .
cloud-fan left a comment
LGTM overall — the ExprId-preservation idea is clean, and dropping cpMapping falls out of it consistently. Two small readability nits inline.
(Not re-raising the "title says cleanup but this is also a bug fix" point — @dongjoon-hyun already covered that in his approval.)
      val mappedCPGroupingExpression = cp.groupingExpressions
      // Order of grouping expression does matter as merging different grouping orders can
      // introduce "extra" shuffles/sorts that might not present in all of the original
      // subqueries.
      if (mappedNPGroupingExpression.map(_.canonicalized) ==
          mappedCPGroupingExpression.map(_.canonicalized)) {
    -   val (mergedAggregateExpressions, newNPMapping, newCPMapping) =
    -     mergeNamedExpressions(np.aggregateExpressions, cp.aggregateExpressions, npMapping,
    -       cpMapping)
    +   val (mergedAggregateExpressions, newNPMapping) =
    +     mergeNamedExpressions(np.aggregateExpressions, cp.aggregateExpressions, npMapping)
        val mergedPlan =
          Aggregate(mappedCPGroupingExpression, mergedAggregateExpressions, mergedChild)
After dropping `cpMapping`, the `mapped` prefix no longer reflects what this local holds; it's just `cp.groupingExpressions`. Same for the reference used to build the merged `Aggregate`.

Suggested change:

    val cpGroupingExpressions = cp.groupingExpressions
    // Order of grouping expression does matter as merging different grouping orders can
    // introduce "extra" shuffles/sorts that might not present in all of the original
    // subqueries.
    if (mappedNPGroupingExpression.map(_.canonicalized) ==
        cpGroupingExpressions.map(_.canonicalized)) {
      val (mergedAggregateExpressions, newNPMapping) =
        mergeNamedExpressions(np.aggregateExpressions, cp.aggregateExpressions, npMapping)
      val mergedPlan =
        Aggregate(cpGroupingExpressions, mergedAggregateExpressions, mergedChild)
    mergedExpressions(i) =
      Alias(If(f, child, Literal(null, child.dataType)), ce.name)(
        exprId = ce.toAttribute.exprId)

The `exprId = ce.toAttribute.exprId` argument is the linchpin of this whole refactor (it's what makes `cachedPlanMapping` redundant), but there's nothing in the surrounding code or comments that flags it. A one-liner like `// Preserve the original ExprId so parent references to this cached attribute stay valid without a cp-side remapping. (The new-plan wrapping above uses a fresh ExprId because those aliases are appended rather than replacing an existing entry.)` would make the invariant, and the cp/np asymmetry, discoverable for future readers.
Title changed from "PlanMerger code cleanup" to "PlanMerger fix and code cleanup"
Title changed from "PlanMerger fix and code cleanup" to "PlanMerger correctness fix and code cleanup"
Thank you @dongjoon-hyun and @cloud-fan. Initially I thought this is just a cosmetic issue (double |
Thanks for the review again! Merged to |
…eanup followup

### What changes were proposed in this pull request?

This is a follow-up to #55482 and contains four bug fixes and two small cleanups in `PlanMerger`:

Bug fixes in `PlanMerger`:

1. Tagged `(Filter, Filter)` reuse preserves `mergedChild`'s appended columns: When the reuse check finds an existing `propagatedFilter` alias, the branch now rebuilds the Filter over `mergedChild` (via `cp.withNewChildren(Seq(mergedChild))`) instead of returning `cp` unchanged. If the recursion extended `cp.child`'s output with new columns (e.g. a computed `d = a + b` from a user Project below the Filter), returning `cp` would drop those columns while `npMapping` still pointed into them, leaving the enclosing `Aggregate` with unresolved references.
2. `(np: Filter, cp)` does not duplicate a `cpFilter` already present in `mergedChild`: `cpFilter`, when set, was produced by a deeper `(np, cp: Filter)` (or `(Join, Join)` pass-through) and is already part of `mergedChild`'s output. Appending it a second time via `++ cpFilter.toSeq` duplicated the attribute in the outer Project's projectList.
3. `(np, cp: Filter)` does not duplicate an `npFilter` already present in `mergedChild`: Symmetric to 2. on the `np` side.
4. `(np, cp: Filter)` with a `MERGED_FILTER_TAG`-tagged `cp` drops the tagged Filter: `cp`'s condition is `OR(pf_0, pf_1, ...)` and `cp`'s aggregate expressions already carry individual `FILTER (WHERE pf_i)` clauses. Synthesising a new `propagatedFilter_X = OR(pf_0, pf_1, ...)` would just add `FILTER AND(OR(...), pf_i)` wrapping upstream (simplifying to `FILTER pf_i`) plus a redundant alias in the Project. The branch now drops `cp`'s Filter and returns `cpFilter = None` so `cp`'s aggregates are left untouched.

Cleanups in `PlanMerger.merge()`:

- Unify the local variable name to `newMergedPlan` across all three branches (was `newMergedPlan` in one and `newMergePlan` in the other two) -- matches the `MergedPlan` case class name.
- Replace `cache(i).merged` with `mp.merged`; `mp` and `cache(i)` are the same object inside the `collectFirst` pattern.

### Why are the changes needed?

Fix 1. is a correctness bug. Fixes 2-4. are plan-shape bugs that produce duplicated attributes or redundant `OR`-of-propagated-filter aliases in the merged plan. The cleanups are minor readability improvements.

### Does this PR introduce _any_ user-facing change?

No. All changes are internal to the optimizer; they produce cleaner merged plans for queries that `MergeSubplans` already handled.

### How was this patch tested?

Four new tests in `MergeSubplansSuite`, one per fix:

- `(np: Filter, cp)` does not duplicate a `cpFilter` already present in mergedChild -- exercises 2. via a `Join` with a `Filter` on the right child, routing a `cpFilter` up through `(Join, Join)` so that `mergedChild.output` already contains the attribute the branch used to re-append.
- `(np, cp: Filter)` does not duplicate an `npFilter` already present in mergedChild -- exercises 3., mirror shape on the `np` side.
- tagged `(Filter, Filter)` reuse must keep mergedChild's appended columns -- exercises 1. with three subqueries (sq1/sq2 create the tagged structure; sq3's Filter sits above a user Project introducing `d = a + b`, so the `(Filter, Filter)` tagged recursion extends `mergedChild` with `d`).
- `(np, cp: Filter)` drops a tagged `cp` Filter without synthesising a redundant alias -- exercises 4. with three subqueries (sq1/sq2 create the tagged structure; sq3 has no filter).

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

Closes #55659 from peter-toth/SPARK-56570-planmerger-code-cleanup-followup.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Peter Toth <peter.toth@gmail.com>
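Two of the claims in this follow-up message can be checked with a toy model. The sketch below is an illustration under assumptions, not the `PlanMerger` code: attributes are plain strings, and `oldProjectList` and `guardedProjectList` are hypothetical helpers. Note that the commit message describes the real fix as no longer appending the filter attribute at all; the guarded variant here is just one way to show why the unconditional append produced a duplicate. `absorptionHolds` checks, for two propagated filters, the boolean identity behind fix 4: `AND(OR(pf_0, pf_1), pf_i)` equals `pf_i`.

```scala
// Toy model only: attributes are strings and all helper names are hypothetical.
object FollowupSketch {
  // Shape of the old behaviour in fixes 2 and 3: always append the filter attribute.
  def oldProjectList(childOutput: Seq[String], filterAttr: Option[String]): Seq[String] =
    childOutput ++ filterAttr.toSeq

  // Illustrative guard: append only when the child output does not already carry it.
  def guardedProjectList(childOutput: Seq[String], filterAttr: Option[String]): Seq[String] =
    childOutput ++ filterAttr.filterNot(a => childOutput.contains(a)).toSeq

  // Fix 4: AND(OR(pf_0, pf_1), pf_i) == pf_i for every truth assignment,
  // so the extra OR-based alias adds nothing to the aggregate FILTER clauses.
  val absorptionHolds: Boolean = {
    val bools = Seq(true, false)
    bools.forall { pf0 =>
      bools.forall { pf1 =>
        val or = pf0 || pf1
        ((or && pf0) == pf0) && ((or && pf1) == pf1)
      }
    }
  }
}
```

For a child output that already contains `pf_0`, `oldProjectList` yields the attribute twice while the guarded variant leaves the output unchanged.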
…eanup followup

(cherry picked from commit 3ae7da7)
Signed-off-by: Peter Toth <peter.toth@gmail.com>
### What changes were proposed in this pull request?

Remove `cachedPlanMapping` / `cpMapping` from `PlanMerger` by preserving `ExprId`s when wrapping cached-plan expressions in `mergeNamedExpressions`.

Previously, wrapping a cached expression in `Alias(If(filter, expr, null))` generated a fresh `ExprId`, making parent nodes that referenced the original attribute by `ExprId` stale. A `cachedPlanMapping` field was threaded through `TryMergeResult` and `mergeNamedExpressions` to remap those references at every level of `tryMergePlans`.

This PR eliminates the need for that mapping by preserving the original `ExprId`. Since the mapping is now always identity, `TryMergeResult.cachedPlanMapping` and the `cachedPlanMapping` parameter of `mergeNamedExpressions` are removed entirely, along with all `cpMapping` threading throughout `tryMergePlans`.

Additionally, this PR fixes a correctness issue: cached-expression wrapping loop is limited to `cachedPlanExpressions.size` entries to avoid accidentally wrapping newly-appended new-plan expressions.

### Why are the changes needed?

`cachedPlanMapping` was purely mechanical bookkeeping to compensate for `Alias(...)()` generating fresh `ExprId`s. Preserving the original `ExprId` makes the mapping a no-op everywhere, so it can be removed. This simplifies `TryMergeResult` from 5 fields to 4, changes `mergeNamedExpressions` to return a pair instead of a triple, and removes `cpMapping` threading from all branches of `tryMergePlans`.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing `MergeSubplansSuite` unit tests.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6