fix: make shuffle fallback decisions sticky across planning passes #3982

Merged

andygrove merged 11 commits into apache:main on Apr 17, 2026
Conversation
…ain tags

columnarShuffleSupported and nativeShuffleSupported now short-circuit to false if the shuffle already carries a Comet fallback tag from a prior rule pass. This mirrors the established pattern in CometNativeScan.isSupported and preserves the earlier decision instead of re-deriving it from the current plan shape.

Background: Comet's shuffle-support checks run at both initial planning and AQE stage prep. Between those passes, AQE wraps completed child stages in ShuffleQueryStageExec (a LeafExecNode whose children is Seq.empty). A naive re-evaluation can therefore flip the decision. For example, stageContainsDPPScan uses s.child.exists(...) to find a FileSourceScanExec with a PlanExpression partition filter, but that walk stops at the stage wrapper and the DPP scan becomes invisible. The same shuffle then falls back to Spark at initial planning and gets converted to Comet at stage prep, producing plan-shape inconsistencies across the two passes.

Adds CometShuffleFallbackStickinessSuite:
- direct: tag a synthetic shuffle and assert both support predicates return false
- end-to-end: build a DPP query, observe pass 1 falls back and tags the shuffle, then swap the shuffle's child for an opaque leaf (mimicking a materialized stage) and assert pass 2 still falls back
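The invisible-scan mechanism described in this commit can be sketched with a toy tree model (all types below are illustrative stand-ins, not Comet or Spark classes):

```scala
// StageWrapper mimics ShuffleQueryStageExec: it holds a materialized plan
// but reports no children, so recursive walks cannot see below it.
sealed trait Node {
  def children: Seq[Node]
  def exists(p: Node => Boolean): Boolean = p(this) || children.exists(_.exists(p))
}
case class Scan(hasDppFilter: Boolean) extends Node { val children: Seq[Node] = Nil }
case class Exchange(child: Node) extends Node { val children: Seq[Node] = Seq(child) }
case class StageWrapper(hidden: Node) extends Node { val children: Seq[Node] = Nil }

// Rough analogue of the stageContainsDPPScan walk
def containsDppScan(root: Node): Boolean =
  root.exists {
    case Scan(true) => true
    case _          => false
  }

val initialPlan = Exchange(Scan(hasDppFilter = true))
val afterAqe    = Exchange(StageWrapper(Scan(hasDppFilter = true)))

containsDppScan(initialPlan) // true: the shuffle falls back to Spark
containsDppScan(afterAqe)    // false: the wrapper hides the DPP scan
```

The two calls returning different answers for the same logical plan is exactly the flip that the sticky tag prevents.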
The initial version used hasExplainInfo as the short-circuit condition, but that also matches informational reasons from earlier checks (e.g. "Comet native shuffle not enabled" left behind by nativeShuffleSupported). That caused legitimate columnar shuffle conversions to be blocked, regressing the columnar shuffle suite (e.g. "columnar shuffle on struct including nulls").

Introduce a dedicated CometFallback tag distinct from the explain-info tag:
- markForFallback(node, reason) records the decision and also writes the reason to the explain channel for visibility.
- isMarkedForFallback(node) is what the shuffle-support predicates check.

nativeShuffleSupported and columnarShuffleSupported now short-circuit on isMarkedForFallback, and the DPP branch uses markForFallback instead of withInfo so the decision sticks across AQE stage-prep passes.

Updated CometShuffleFallbackStickinessSuite to cover both the positive case (a marked node must fall back) and the regression case (informational explain info must NOT force fallback).
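A minimal sketch of the dedicated tag this commit introduces, assuming Spark's `TreeNodeTag` API; the names mirror the commit message but the bodies (and the tag name) are illustrative rather than the merged code:

```scala
import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}

object CometFallback {
  // Deliberately separate from the explain-info tag: presence here always
  // means "this node must fall back", never just an informational note.
  private val FALLBACK_TAG = TreeNodeTag[String]("comet.fallback")

  def markForFallback[T <: TreeNode[_]](node: T, reason: String): T = {
    node.setTagValue(FALLBACK_TAG, reason)
    // The real implementation also records the reason via withInfo so it
    // surfaces in extended explain output.
    node
  }

  def isMarkedForFallback(node: TreeNode[_]): Boolean =
    node.getTagValue(FALLBACK_TAG).isDefined
}
```

A consuming predicate then checks the marker before any shape-based logic, e.g. `if (CometFallback.isMarkedForFallback(s)) return false` at the top of `nativeShuffleSupported` and `columnarShuffleSupported`.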
Adds CometDppFallbackRepro3949Suite with two tests:
1. mechanism: builds a DPP plan, observes the initial columnarShuffleSupported decision is "fall back" (DPP visible), then wraps the shuffle's child in an opaque LeafExecNode mirroring how ShuffleQueryStageExec presents to .exists walks, and asserts the decision stays "fall back" because the sticky CometFallback marker carries over via withNewChildren.
2. end-to-end: runs five DPP-flavored queries across three AQE variants and looks for either a collect() failure with the apache#3949 stack-trace signature (AssertionError through ColumnarToRowExec.<init> during BroadcastExchangeExec.doCanonicalize) or a Comet shuffle in the final plan whose subtree still contains a DPP scan.

Verified on main (without the sticky-marker fix): q4 (a UNION ALL of three DPP-using subqueries with an outer rollup aggregate) crashes under smj+aqe and smj+aqe+coalesce with the exact apache#3949 stack trace, and the end-to-end test fails. With this branch's fix applied, both tests pass.
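The stickiness invariant these tests check can be pictured roughly as follows (ScalaTest style; `buildShuffleOverDppScan` and `opaqueLeaf` are hypothetical helpers, not names from this PR):

```scala
test("fallback decision sticks after the DPP scan is hidden") {
  val shuffle = buildShuffleOverDppScan()       // hypothetical: DPP scan visible below
  assert(!columnarShuffleSupported(shuffle))    // pass 1: falls back and tags the node

  // Mimic AQE materialization: replace the child with an opaque leaf.
  // Per the commit message, withNewChildren carries tree-node tags over.
  val rewrapped = shuffle.withNewChildren(Seq(opaqueLeaf()))
  assert(!columnarShuffleSupported(rewrapped))  // pass 2: marker still forces fallback
}
```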
This reverts commit b5f6a80.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parthchandra approved these changes on Apr 17, 2026

parthchandra (Contributor) left a comment:

lgtm. Some minor comments
```scala
 * re-deriving the decision from the (possibly reshaped) subtree. Also records the reason in the
 * usual explain channel so it surfaces in extended explain output.
 */
def markForFallback[T <: TreeNode[_]](node: T, reason: String): T = {
```
Contributor
This is much better than using explain info
```diff
  if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) {
-   withInfo(s, "Stage contains a scan with Dynamic Partition Pruning")
+   markForFallback(s, "Stage contains a scan with Dynamic Partition Pruning")
```
Contributor
Should we add the reason to explainInfo?
Member (Author)
markForFallback does call withInfo, so we still get the explain info recorded
Member (Author)
Having two approaches seems a bit hacky though. I filed a follow-on issue to clean this up: #3984
```scala
// scalastyle:off println
println("=== mechanism check ===")
```
Contributor
Use log instead of println, or perhaps you meant to remove this?
Member (Author)
I removed the debug println
Member (Author)
Merged. Thanks @parthchandra
Which issue does this PR close?
Closes #3949
Closes #3870
There is a follow-on issue #3984 to clean this up; this was implemented quite hastily.
Rationale for this change
Comet's shuffle-support predicates (`nativeShuffleSupported` and `columnarShuffleSupported` in `CometShuffleExchangeExec`) are called at both initial planning and AQE stage prep. Some fallback decisions depend on the surrounding plan shape, for example the presence of a DPP scan below a shuffle. Between the two passes, AQE wraps already-materialized child stages in `ShuffleQueryStageExec`, a `LeafExecNode` whose `children` is `Seq.empty`, so a naive re-evaluation can flip the decision.

Concrete mechanism (the trigger for #3949):
`stageContainsDPPScan` walks `s.child.exists(...)` looking for a `FileSourceScanExec` with a `PlanExpression` partition filter. When the DPP subtree sits under a materialized `ShuffleQueryStageExec`, `.exists` stops at the wrapper and the DPP scan becomes invisible. The same shuffle that correctly fell back to Spark at initial planning is converted to Comet at stage prep. The resulting plan has a `CometColumnarExchange` above a materialized row-mode stage whose subtree still contains a Spark-fallback DPP scan, a boundary that breaks during `BroadcastExchangeExec.doCanonicalize` with `AssertionError` at `ColumnarToRowExec.<init>` (`Columnar.scala:70`), because `child.supportsColumnar` is false after canonicalization.

What changes are included in this PR?
- New `CometFallback` object with `markForFallback` / `isMarkedForFallback`. It is distinct from `CometExplainInfo.EXTENSION_INFO` on purpose: the explain tag accumulates informational reasons (including rolled-up child reasons), and treating any presence as a fallback signal is too coarse; it breaks legitimate conversions (e.g. a shuffle tagged "Comet native shuffle not enabled" should still be eligible for columnar shuffle). The fallback tag exists only for decisions that must remain sticky.
- `nativeShuffleSupported` and `columnarShuffleSupported` short-circuit on `isMarkedForFallback(s)` at the top.
- `columnarShuffleSupported` now uses `markForFallback` instead of `withInfo`, so the decision persists across AQE replanning.

Design rule (worth noting for future contributors):
`markForFallback` must only be used for decisions that mean the whole stage falls back regardless of shuffle mode (DPP qualifies). Per-mode reasons, e.g. "unsupported data type for native only", must keep using `withInfo`, because the native check runs before the columnar check, and a sticky marker set in the native check would prevent the columnar check from getting a shot.

How are these changes tested?
Two new test suites:
- `CometShuffleFallbackStickinessSuite`, the unit-level invariant: mark a node and assert `nativeShuffleSupported` and `columnarShuffleSupported` return `false`; then swap the shuffle's child for an opaque `LeafExecNode` that hides the DPP subtree (mimicking a materialized stage) and assert the second call still returns `false`.
- `CometDppFallbackRepro3949Suite`, an end-to-end reproduction of the crash:
  - mechanism: builds a real DPP plan and asserts the sticky marker survives an AQE-style child wrap (`withNewChildren` preserves tree-node tags).
  - end-to-end: runs five DPP-flavored queries across three AQE variants; captures any `collect()` failure and any Comet shuffle in the final plan whose subtree still contains a DPP scan. On main, `q4` (a UNION ALL of three DPP-using subqueries with an outer rollup aggregate) crashes under `smj+aqe` and `smj+aqe+coalesce` with the exact #3949 (`[INTERNAL_ERROR] The "collect" action failed`) stack trace: `AssertionError` through `ColumnarToRowExec.<init>` (`Columnar.scala:70`) during `BroadcastExchangeExec.doCanonicalize`.

Existing `DPP fallback` and `DPP fallback avoids inefficient Comet shuffle (#3874)` tests in `CometExecSuite` continue to pass.