From 2bfacb6f832406c8a4244dee158515413d7cf614 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 11:40:00 -0600 Subject: [PATCH 01/11] fix: make shuffle fallback decisions sticky by checking existing explain tags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit columnarShuffleSupported and nativeShuffleSupported now short-circuit to false if the shuffle already carries a Comet fallback tag from a prior rule pass. This mirrors the established pattern in CometNativeScan.isSupported and preserves the earlier decision instead of re-deriving it from the current plan shape. Background: Comet's shuffle-support checks run at both initial planning and AQE stage-prep. Between those passes, AQE wraps completed child stages in ShuffleQueryStageExec (a LeafExecNode whose children is Seq.empty). A naive re-evaluation can therefore flip the decision — e.g., stageContainsDPPScan uses s.child.exists(...) to find a FileSourceScanExec with a PlanExpression partition filter, but that walk stops at the stage wrapper and the DPP scan becomes invisible. The same shuffle then falls back to Spark at initial planning and gets converted to Comet at stage prep, producing plan-shape inconsistencies across the two passes. 
Adds CometShuffleFallbackStickinessSuite: - direct: tag a synthetic shuffle and assert both support predicates return false - end-to-end: build a DPP query, observe pass 1 falls back and tags the shuffle, then swap the shuffle's child for an opaque leaf (mimicking a materialized stage) and assert pass 2 still falls back --- .../shuffle/CometShuffleExchangeExec.scala | 17 ++- .../CometShuffleFallbackStickinessSuite.scala | 139 ++++++++++++++++++ 2 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index df2dca0331..e09760d4dc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -50,7 +50,7 @@ import com.google.common.base.Objects import org.apache.comet.CometConf import org.apache.comet.CometConf.{COMET_EXEC_SHUFFLE_ENABLED, COMET_SHUFFLE_MODE} -import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleManagerEnabled, withInfo} +import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometShuffleManagerEnabled, withInfo} import org.apache.comet.serde.{Compatible, OperatorOuterClass, QueryPlanSerde, SupportLevel, Unsupported} import org.apache.comet.serde.operator.CometSink import org.apache.comet.shims.ShimCometShuffleExchangeExec @@ -328,6 +328,11 @@ object CometShuffleExchangeExec false } + // Preserve any prior-pass fallback decision (see comment in columnarShuffleSupported). 
+ if (hasExplainInfo(s)) { + return false + } + if (!isCometShuffleEnabledWithInfo(s)) { return false } @@ -450,6 +455,16 @@ object CometShuffleExchangeExec false } + // If this shuffle already carries fallback reasons from a prior rule pass, preserve that + // decision instead of re-deriving it from the current plan shape. Under AQE, a completed + // child stage gets wrapped in a ShuffleQueryStageExec whose `children` is empty, and a + // naive re-evaluation can flip its answer (e.g., `stageContainsDPPScan` stops seeing the + // DPP scan and returns false). Sticking with the original decision avoids those plan-shape + // inconsistencies across planning passes. Mirrors CometNativeScan.isSupported. + if (hasExplainInfo(s)) { + return false + } + if (!isCometShuffleEnabledWithInfo(s)) { return false } diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala new file mode 100644 index 0000000000..643c7b2b32 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.comet + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.{CometConf, CometExplainInfo} +import org.apache.comet.CometSparkSessionExtensions.withInfo + +/** + * Pins the sticky-fallback invariant for Comet shuffle decisions: `nativeShuffleSupported` / + * `columnarShuffleSupported` must return `false` whenever the shuffle already carries a Comet + * fallback tag from a prior rule pass. + * + * Without this behavior, AQE's stage-prep rule re-evaluation can flip the decision — e.g., + * `stageContainsDPPScan` walks the shuffle's child tree with `.exists`, but a materialized child + * stage is wrapped in `ShuffleQueryStageExec` (a `LeafExecNode`) so `.exists` stops at the + * wrapper and the DPP scan becomes invisible. That causes the same shuffle to fall back to Spark + * at initial planning and then convert to Comet at stage prep, producing plan-shape + * inconsistencies across the two passes (suspected mechanism behind #3949). + * + * The fix mirrors `CometNativeScan.isSupported`: if the node already has any fallback reason + * tagged by a prior pass, preserve that decision rather than re-deriving it. + */ +class CometShuffleFallbackStickinessSuite extends CometTestBase { + + test("columnarShuffleSupported returns false when shuffle already carries a fallback tag") { + val shuffle = ShuffleExchangeExec(SinglePartition, SyntheticLeaf(Nil)) + // Simulate an earlier rule pass having tagged this shuffle with a DPP fallback reason. 
+ withInfo(shuffle, "Stage contains a scan with Dynamic Partition Pruning") + assert(shuffle.getTagValue(CometExplainInfo.EXTENSION_INFO).exists(_.nonEmpty)) + + withSQLConf(CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") { + assert( + !CometShuffleExchangeExec.columnarShuffleSupported(shuffle), + "tagged shuffle must preserve its prior-pass fallback decision") + assert( + !CometShuffleExchangeExec.nativeShuffleSupported(shuffle), + "tagged shuffle must preserve its prior-pass fallback decision in native path too") + } + } + + test( + "DPP fallback decision is sticky across two invocations even when the child tree changes") { + withTempDir { dir => + val factPath = s"${dir.getAbsolutePath}/fact.parquet" + val dimPath = s"${dir.getAbsolutePath}/dim.parquet" + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { + val sess = spark + import sess.implicits._ + val oneDay = 24L * 60L * 60000L + val now = System.currentTimeMillis() + (0 until 400) + .map(i => (i, new java.sql.Date(now + (i % 40) * oneDay), i.toString)) + .toDF("fact_id", "fact_date", "fact_str") + .write + .partitionBy("fact_date") + .parquet(factPath) + (0 until 40) + .map(i => (i, new java.sql.Date(now + i * oneDay), i.toString)) + .toDF("dim_id", "dim_date", "dim_str") + .write + .parquet(dimPath) + } + spark.read.parquet(factPath).createOrReplaceTempView("t_sticky_fact") + spark.read.parquet(dimPath).createOrReplaceTempView("t_sticky_dim") + + withSQLConf( + CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + + val df = spark.sql( + "select f.fact_date, count(*) c from t_sticky_fact f " + + "join t_sticky_dim d on f.fact_date = d.dim_date " + + "where d.dim_id > 35 group by f.fact_date") + val initial = df.queryExecution.executedPlan match { + case a: 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec => a.initialPlan + case other => other + } + val shuffle = + initial + .collectFirst { case s: ShuffleExchangeExec => s } + .getOrElse(fail(s"no shuffle found:\n${initial.treeString}")) + + // Pass 1: real DPP subtree visible. Returns false AND tags the shuffle. + val first = CometShuffleExchangeExec.columnarShuffleSupported(shuffle) + assert(!first, "initial pass must fall back (DPP visible)") + assert( + shuffle.getTagValue(CometExplainInfo.EXTENSION_INFO).exists(_.nonEmpty), + "fallback reason must be tagged onto the shuffle") + + // Pass 2 simulates AQE stage-prep: replace the child with an opaque leaf that hides + // the DPP subtree from tree walks. A naive `.exists`-based check would flip to true + // here; the tag-first short-circuit must keep the decision stable. + val reshapedShuffle = + shuffle + .withNewChildren(Seq(SyntheticLeaf(shuffle.child.output))) + .asInstanceOf[ShuffleExchangeExec] + val second = CometShuffleExchangeExec.columnarShuffleSupported(reshapedShuffle) + assert( + !second, + "second pass must still fall back even though the DPP subtree is now hidden") + } + } + } +} + +private case class SyntheticLeaf(output: Seq[Attribute]) extends LeafExecNode { + override protected def doExecute(): RDD[InternalRow] = + throw new UnsupportedOperationException("stub") +} From 79a5676bcd95ebe0d51ba21a1c953d51a74e5e21 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 12:11:25 -0600 Subject: [PATCH 02/11] refactor: introduce CometFallback tag for sticky shuffle fallback Initial version used hasExplainInfo as the short-circuit condition, but that also matches informational reasons from earlier checks (e.g. 'Comet native shuffle not enabled' left behind by nativeShuffleSupported). That caused legitimate columnar shuffle conversions to be blocked, regressing the columnar shuffle suite (e.g. 'columnar shuffle on struct including nulls'). 
Introduce a dedicated CometFallback tag distinct from the explain-info tag: - markForFallback(node, reason) records the decision and also writes the reason to the explain channel for visibility. - isMarkedForFallback(node) is what the shuffle-support predicates check. nativeShuffleSupported and columnarShuffleSupported now short-circuit on isMarkedForFallback, and the DPP branch uses markForFallback instead of withInfo so the decision sticks across AQE stage-prep passes. Updated CometShuffleFallbackStickinessSuite to cover both the positive case (marked node must fall back) and the regression case (informational explain info must NOT force fallback). --- .../org/apache/comet/CometFallback.scala | 67 +++++++++++++++++++ .../shuffle/CometShuffleExchangeExec.scala | 18 ++--- .../CometShuffleFallbackStickinessSuite.scala | 42 +++++++----- 3 files changed, 100 insertions(+), 27 deletions(-) create mode 100644 spark/src/main/scala/org/apache/comet/CometFallback.scala diff --git a/spark/src/main/scala/org/apache/comet/CometFallback.scala b/spark/src/main/scala/org/apache/comet/CometFallback.scala new file mode 100644 index 0000000000..28a4816b66 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/CometFallback.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag} + +import org.apache.comet.CometSparkSessionExtensions.withInfo + +/** + * Sticky fallback marker for shuffle / stage nodes. + * + * Comet's shuffle-support predicates (e.g. `CometShuffleExchangeExec.columnarShuffleSupported`) + * run at both initial planning and AQE stage-prep. Some fallback decisions depend on the + * surrounding plan shape - for example, the presence of a DPP scan below a shuffle. Between the + * two passes AQE can reshape that subtree (a completed child stage becomes a + * `ShuffleQueryStageExec`, a `LeafExecNode` whose `children` is empty), so a naive re-evaluation + * can flip the decision. + * + * When a decision is made on the initial-plan pass, the deciding rule records a sticky tag via + * [[markForFallback]]. On subsequent passes, callers short-circuit via [[isMarkedForFallback]] + * and preserve the earlier decision instead of re-deriving it from the current plan shape. + * + * This tag is kept separate from `CometExplainInfo.EXTENSION_INFO` on purpose: the explain tag + * accumulates informational reasons (including rolled-up child reasons), many of which are not a + * full-fallback signal. Treating any presence of explain info as fallback is too coarse and + * breaks legitimate conversions (e.g. a shuffle tagged "Comet native shuffle not enabled" should + * still be eligible for columnar shuffle). The fallback tag exists only for decisions that should + * remain sticky. + */ +object CometFallback { + + val STAGE_FALLBACK_TAG: TreeNodeTag[Set[String]] = + new TreeNodeTag[Set[String]]("CometStageFallback") + + /** + * Mark a node so that subsequent shuffle-support re-evaluations fall back to Spark without + * re-deriving the decision from the (possibly reshaped) subtree. 
Also records the reason in the + * usual explain channel so it surfaces in extended explain output. + */ + def markForFallback[T <: TreeNode[_]](node: T, reason: String): T = { + val existing = node.getTagValue(STAGE_FALLBACK_TAG).getOrElse(Set.empty[String]) + node.setTagValue(STAGE_FALLBACK_TAG, existing + reason) + withInfo(node, reason) + node + } + + /** True if a prior rule pass marked this node for Spark fallback via [[markForFallback]]. */ + def isMarkedForFallback(node: TreeNode[_]): Boolean = + node.getTagValue(STAGE_FALLBACK_TAG).exists(_.nonEmpty) +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index e09760d4dc..2500a52658 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -50,7 +50,8 @@ import com.google.common.base.Objects import org.apache.comet.CometConf import org.apache.comet.CometConf.{COMET_EXEC_SHUFFLE_ENABLED, COMET_SHUFFLE_MODE} -import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometShuffleManagerEnabled, withInfo} +import org.apache.comet.CometFallback.{isMarkedForFallback, markForFallback} +import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleManagerEnabled, withInfo} import org.apache.comet.serde.{Compatible, OperatorOuterClass, QueryPlanSerde, SupportLevel, Unsupported} import org.apache.comet.serde.operator.CometSink import org.apache.comet.shims.ShimCometShuffleExchangeExec @@ -328,8 +329,8 @@ object CometShuffleExchangeExec false } - // Preserve any prior-pass fallback decision (see comment in columnarShuffleSupported). - if (hasExplainInfo(s)) { + // Preserve any prior-pass fallback decision (see `CometFallback`). 
+ if (isMarkedForFallback(s)) { return false } @@ -455,13 +456,8 @@ object CometShuffleExchangeExec false } - // If this shuffle already carries fallback reasons from a prior rule pass, preserve that - // decision instead of re-deriving it from the current plan shape. Under AQE, a completed - // child stage gets wrapped in a ShuffleQueryStageExec whose `children` is empty, and a - // naive re-evaluation can flip its answer (e.g., `stageContainsDPPScan` stops seeing the - // DPP scan and returns false). Sticking with the original decision avoids those plan-shape - // inconsistencies across planning passes. Mirrors CometNativeScan.isSupported. - if (hasExplainInfo(s)) { + // Preserve any prior-pass fallback decision (see `CometFallback`). + if (isMarkedForFallback(s)) { return false } @@ -470,7 +466,7 @@ object CometShuffleExchangeExec } if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) { - withInfo(s, "Stage contains a scan with Dynamic Partition Pruning") + markForFallback(s, "Stage contains a scan with Dynamic Partition Pruning") return false } diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala index 643c7b2b32..774698e62c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala @@ -29,13 +29,12 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.internal.SQLConf -import org.apache.comet.{CometConf, CometExplainInfo} -import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.{CometConf, CometFallback} /** * Pins the sticky-fallback invariant for Comet shuffle decisions: `nativeShuffleSupported` / - * 
`columnarShuffleSupported` must return `false` whenever the shuffle already carries a Comet - * fallback tag from a prior rule pass. + * `columnarShuffleSupported` must return `false` whenever the shuffle already carries a + * `CometFallback` marker from a prior rule pass. * * Without this behavior, AQE's stage-prep rule re-evaluation can flip the decision — e.g., * `stageContainsDPPScan` walks the shuffle's child tree with `.exists`, but a materialized child @@ -44,27 +43,38 @@ import org.apache.comet.CometSparkSessionExtensions.withInfo * at initial planning and then convert to Comet at stage prep, producing plan-shape * inconsistencies across the two passes (suspected mechanism behind #3949). * - * The fix mirrors `CometNativeScan.isSupported`: if the node already has any fallback reason - * tagged by a prior pass, preserve that decision rather than re-deriving it. + * Fallback decisions that must survive AQE replanning use `CometFallback.markForFallback`. The + * shuffle-support predicates check `isMarkedForFallback` at the top and short-circuit. */ class CometShuffleFallbackStickinessSuite extends CometTestBase { - test("columnarShuffleSupported returns false when shuffle already carries a fallback tag") { + test("both support predicates fall back when the shuffle carries a CometFallback marker") { val shuffle = ShuffleExchangeExec(SinglePartition, SyntheticLeaf(Nil)) - // Simulate an earlier rule pass having tagged this shuffle with a DPP fallback reason. 
- withInfo(shuffle, "Stage contains a scan with Dynamic Partition Pruning") - assert(shuffle.getTagValue(CometExplainInfo.EXTENSION_INFO).exists(_.nonEmpty)) + CometFallback.markForFallback(shuffle, "pretend prior pass decided Spark fallback") withSQLConf(CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") { assert( !CometShuffleExchangeExec.columnarShuffleSupported(shuffle), - "tagged shuffle must preserve its prior-pass fallback decision") + "marked shuffle must preserve its prior-pass fallback decision (columnar path)") assert( !CometShuffleExchangeExec.nativeShuffleSupported(shuffle), - "tagged shuffle must preserve its prior-pass fallback decision in native path too") + "marked shuffle must preserve its prior-pass fallback decision (native path)") } } + test("informational explain-info alone does NOT force fallback") { + // A shuffle can accumulate explain info (e.g. 'Comet native shuffle not enabled') as + // informational output from earlier checks without being a full-fallback signal. That + // info must not cause the columnar path to decline. + val shuffle = ShuffleExchangeExec(SinglePartition, SyntheticLeaf(Nil)) + // Note: withInfo, not markForFallback. + org.apache.comet.CometSparkSessionExtensions + .withInfo(shuffle, "Comet native shuffle not enabled") + assert( + !CometFallback.isMarkedForFallback(shuffle), + "explain info alone must not imply a sticky fallback marker") + } + test( "DPP fallback decision is sticky across two invocations even when the child tree changes") { withTempDir { dir => @@ -110,16 +120,16 @@ class CometShuffleFallbackStickinessSuite extends CometTestBase { .collectFirst { case s: ShuffleExchangeExec => s } .getOrElse(fail(s"no shuffle found:\n${initial.treeString}")) - // Pass 1: real DPP subtree visible. Returns false AND tags the shuffle. + // Pass 1: real DPP subtree visible. Returns false AND marks the shuffle. 
val first = CometShuffleExchangeExec.columnarShuffleSupported(shuffle) assert(!first, "initial pass must fall back (DPP visible)") assert( - shuffle.getTagValue(CometExplainInfo.EXTENSION_INFO).exists(_.nonEmpty), - "fallback reason must be tagged onto the shuffle") + CometFallback.isMarkedForFallback(shuffle), + "fallback marker must be placed on the shuffle") // Pass 2 simulates AQE stage-prep: replace the child with an opaque leaf that hides // the DPP subtree from tree walks. A naive `.exists`-based check would flip to true - // here; the tag-first short-circuit must keep the decision stable. + // here; the sticky marker must keep the decision stable. val reshapedShuffle = shuffle .withNewChildren(Seq(SyntheticLeaf(shuffle.child.output))) From 98b3b796a8fc8efa92899dfce91c23bc9ce1739d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 12:28:23 -0600 Subject: [PATCH 03/11] scalastyle --- .../spark/sql/comet/CometShuffleFallbackStickinessSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala index 774698e62c..7e3ee63502 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec -import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.internal.SQLConf From 
195bf8a1c7decc8b945458b69860f8c000f5eb64 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 12:56:06 -0600 Subject: [PATCH 04/11] test: reproduce #3949 via q14a-style UNION-ALL DPP query under AQE Adds CometDppFallbackRepro3949Suite with two tests: 1. mechanism: builds a DPP plan, observes the initial columnarShuffleSupported decision is "fall back" (DPP visible), then wraps the shuffle's child in an opaque LeafExecNode mirroring how ShuffleQueryStageExec presents to .exists walks, and asserts the decision stays "fall back" because the sticky CometFallback marker carries over via withNewChildren. 2. end-to-end: runs five DPP-flavored queries across three AQE variants and looks for either a collect() failure with the #3949 stack-trace signature (AssertionError through ColumnarToRowExec.<init> during BroadcastExchangeExec.doCanonicalize) or a Comet shuffle in the final plan whose subtree still contains a DPP scan. Verified on main (without the sticky-marker fix): q4 (UNION ALL of three DPP-using subqueries with outer rollup aggregate) crashes under smj+aqe and smj+aqe+coalesce with the exact #3949 stack trace, and the end-to-end test fails. With this branch's fix applied, both tests pass. --- .../CometDppFallbackRepro3949Suite.scala | 430 ++++++++++++++++++ 1 file changed, 430 insertions(+) create mode 100644 spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala new file mode 100644 index 0000000000..71e1f16f75 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import scala.collection.mutable + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{CometTestBase, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, PlanExpression} +import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec +import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.{CometConf, CometExplainInfo} + +/** + * Attempts an end-to-end reproduction of issue #3949. + * + * #3949 reports `[INTERNAL_ERROR]` from `BroadcastExchangeExec.doCanonicalize`, ultimately caused + * by `ColumnarToRowExec.<init>` asserting that its child `supportsColumnar`. 
The suspected root + * cause is that Comet's DPP fallback (`CometShuffleExchangeExec.stageContainsDPPScan`) walks + * `s.child.exists(...)` for a `FileSourceScanExec` with a `PlanExpression` partition filter, and + * that walk is not stable across the two planning passes: + * + * - initial planning: shuffle's child subtree includes the DPP scan -> `.exists` finds it -> + * fall back to Spark. + * - AQE stage prep: the inner stage that contained the scan has materialized and been replaced + * by a `ShuffleQueryStageExec` (a `LeafExecNode` whose `children == Seq.empty`). `.exists` + * can no longer descend into it, the DPP scan becomes invisible, the same shuffle is + * converted to Comet, and the plan shape changes between passes. + * + * This suite has two tests: + * + * 1. `mechanism`: synthetic. Builds a real DPP plan, observes the initial-pass decision is + * "fall back", then swaps the shuffle's child for an opaque `LeafExecNode` (mirroring how + * `ShuffleQueryStageExec` presents to `.exists`) and asserts the decision stays + * "fall back" (the sticky marker carries over via `withNewChildren`). Documents the + * mechanism without depending on AQE actually triggering it. + * + * 2. `endToEnd`: runs DPP-flavored queries with AQE on, sweeps a few seeds/variants, and asks + * whether `df.collect()` ever throws or whether the final executed plan ever contains a Comet + * shuffle whose child subtree (descending through `QueryStageExec.plan`) still contains a DPP + * scan -- i.e. an inconsistency that the bug would produce. + */ +class CometDppFallbackRepro3949Suite extends CometTestBase { + + // ---------------------------------------------------------------------- + // Mechanism (synthetic): proves the fallback decision survives the AQE-style wrap. 
+ // ---------------------------------------------------------------------- + + private def buildDppTables(dir: java.io.File, factPrefix: String): Unit = { + val factPath = s"${dir.getAbsolutePath}/$factPrefix.parquet" + val dimPath = s"${dir.getAbsolutePath}/${factPrefix}_dim.parquet" + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { + val sess = spark + import sess.implicits._ + val oneDay = 24L * 60L * 60000L + val now = System.currentTimeMillis() + (0 until 400) + .map(i => (i, new java.sql.Date(now + (i % 40) * oneDay), i.toString)) + .toDF("fact_id", "fact_date", "fact_str") + .write + .partitionBy("fact_date") + .parquet(factPath) + (0 until 40) + .map(i => (i, new java.sql.Date(now + i * oneDay), i.toString)) + .toDF("dim_id", "dim_date", "dim_str") + .write + .parquet(dimPath) + } + spark.read.parquet(factPath).createOrReplaceTempView(s"${factPrefix}_fact") + spark.read.parquet(dimPath).createOrReplaceTempView(s"${factPrefix}_dim") + } + + private def unwrapAqe(plan: SparkPlan): SparkPlan = plan match { + case a: AdaptiveSparkPlanExec => a.initialPlan + case other => other + } + + private def findFirstShuffle(plan: SparkPlan): Option[ShuffleExchangeExec] = { + var found: Option[ShuffleExchangeExec] = None + plan.foreach { + case s: ShuffleExchangeExec if found.isEmpty => found = Some(s) + case _ => + } + found + } + + test("mechanism: DPP fallback decision is sticky across an AQE-style child wrap") { + withTempDir { dir => + buildDppTables(dir, "mech") + withSQLConf( + CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + + val df = spark.sql( + "select f.fact_date, count(*) c " + + "from mech_fact f join mech_dim d on f.fact_date = d.dim_date " + + "where d.dim_id > 35 group by f.fact_date") + val initialPlan = 
unwrapAqe(df.queryExecution.executedPlan) + val shuffle = findFirstShuffle(initialPlan).getOrElse { + fail(s"No ShuffleExchangeExec found in initial plan:\n${initialPlan.treeString}") + } + + val initialDecision = CometShuffleExchangeExec.columnarShuffleSupported(shuffle) + + val initialDppVisible = shuffle.child.exists { + case scan: FileSourceScanExec => + scan.partitionFilters.exists(_.exists(_.isInstanceOf[PlanExpression[_]])) + case _ => false + } + + // Simulate AQE stage prep: wrap the shuffle's child in an opaque LeafExecNode, + // matching how `ShuffleQueryStageExec` presents to `.exists` walks (its `children` + // is `Seq.empty`). `withNewChildren` preserves tree-node tags, so if the fix is in + // place the sticky CometFallback marker on `shuffle` carries over to + // `postAqeShuffle`, and the decision short-circuits to false. Without the fix, + // the DPP walk re-runs, fails to see the scan, and flips to true. + val hiddenChild = OpaqueStageStub(shuffle.child.output) + val postAqeShuffle = + shuffle.withNewChildren(Seq(hiddenChild)).asInstanceOf[ShuffleExchangeExec] + val postAqeDecision = CometShuffleExchangeExec.columnarShuffleSupported(postAqeShuffle) + + val postAqeDppVisible = postAqeShuffle.child.exists { + case scan: FileSourceScanExec => + scan.partitionFilters.exists(_.exists(_.isInstanceOf[PlanExpression[_]])) + case _ => false + } + + // scalastyle:off println + println(s"=== mechanism check ===") + println(s"initialDppVisible=$initialDppVisible initialDecision=$initialDecision") + println(s"postAqeDppVisible=$postAqeDppVisible postAqeDecision=$postAqeDecision") + // scalastyle:on println + + assert(initialDppVisible, "initial child tree should expose DPP scan") + assert(!postAqeDppVisible, "stage-wrapped child should hide DPP scan") + assert(!initialDecision, s"expected fall back initially, got $initialDecision") + assert( + !postAqeDecision, + s"decision must stay 'fall back' across the AQE-style wrap, got $postAqeDecision") + } + } + } + + 
// ---------------------------------------------------------------------- + // End-to-end: actually run DPP-flavored queries and look for the bug. + // ---------------------------------------------------------------------- + + // Walk the executed plan descending into QueryStageExec.plan, and return any Comet + // shuffle whose subtree still contains a DPP scan -- the cross-pass inconsistency the + // bug would create. + private def findInconsistentCometShuffles(plan: SparkPlan): Seq[SparkPlan] = { + import org.apache.spark.sql.execution.adaptive.QueryStageExec + def containsDppScan(p: SparkPlan): Boolean = p match { + case scan: FileSourceScanExec => + scan.partitionFilters.exists(_.exists(_.isInstanceOf[PlanExpression[_]])) + case stage: QueryStageExec => containsDppScan(stage.plan) + case other => other.children.exists(containsDppScan) + } + val acc = mutable.Buffer.empty[SparkPlan] + def walk(p: SparkPlan): Unit = { + p match { + case s: CometShuffleExchangeExec if containsDppScan(s) => + acc += s + case _ => + } + p match { + case stage: QueryStageExec => walk(stage.plan) + case _ => p.children.foreach(walk) + } + } + walk(plan) + acc.toSeq + } + + // Match the executed plan: any Comet shuffle whose explain-info already says it should have + // fallen back due to DPP. 
That's the smoking-gun pattern from the issue: + // CometColumnarExchange [COMET: Stage contains a scan with Dynamic Partition Pruning] + private def findCometShufflesTaggedAsDppFallback(plan: SparkPlan): Seq[SparkPlan] = { + import org.apache.spark.sql.execution.adaptive.QueryStageExec + val acc = mutable.Buffer.empty[SparkPlan] + def walk(p: SparkPlan): Unit = { + p match { + case s: CometShuffleExchangeExec => + val tags = s.getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String]) + if (tags.exists(_.contains("Dynamic Partition Pruning"))) acc += s + case _ => + } + p match { + case stage: QueryStageExec => walk(stage.plan) + case _ => p.children.foreach(walk) + } + } + walk(plan) + acc.toSeq + } + + private def buildDppTablesShared(dir: java.io.File): Unit = { + val factPath = s"${dir.getAbsolutePath}/e2e_fact.parquet" + val dimPath = s"${dir.getAbsolutePath}/e2e_dim.parquet" + val dim2Path = s"${dir.getAbsolutePath}/e2e_dim2.parquet" + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { + val sess = spark + import sess.implicits._ + val oneDay = 24L * 60L * 60000L + val now = System.currentTimeMillis() + // larger fact so AQE actually has stages to materialize + (0 until 4000) + .map(i => (i, new java.sql.Date(now + (i % 80) * oneDay), i.toString, i % 10)) + .toDF("fact_id", "fact_date", "fact_str", "fact_grp") + .write + .partitionBy("fact_date") + .parquet(factPath) + (0 until 80) + .map(i => (i, new java.sql.Date(now + i * oneDay), i.toString)) + .toDF("dim_id", "dim_date", "dim_str") + .write + .parquet(dimPath) + (0 until 10) + .map(i => (i, s"g$i")) + .toDF("grp_id", "grp_str") + .write + .parquet(dim2Path) + } + spark.read.parquet(factPath).createOrReplaceTempView("e2e_fact") + spark.read.parquet(dimPath).createOrReplaceTempView("e2e_dim") + spark.read.parquet(dim2Path).createOrReplaceTempView("e2e_dim2") + } + + // A handful of DPP-flavored queries in roughly increasing complexity. 
The last few mimic the + // q14a structure (UNION ALL of multiple DPP-using subqueries with HAVING and outer aggregate) + // because the issue specifically hit q14a / q14b / q31 / q47 / q57. + private val queries: Seq[String] = Seq( + // 1. Plain SMJ DPP + """select f.fact_date, count(*) c + |from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + |where d.dim_id > 70 + |group by f.fact_date""".stripMargin, + // 2. Aggregation above DPP join, then second join on the aggregate + """select g.grp_str, sum(t.c) total from e2e_dim2 g join ( + | select f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 60 + | group by f.fact_grp + |) t on g.grp_id = t.fact_grp + |group by g.grp_str""".stripMargin, + // 3. UNION ALL of two DPP-using subqueries, outer aggregate -- q14a-style. + """select channel, fact_grp, sum(c) total from ( + | select 'a' channel, f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 60 + | group by f.fact_grp + | union all + | select 'b' channel, f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 70 + | group by f.fact_grp + |) y group by channel, fact_grp""".stripMargin, + // 4. UNION ALL with rollup -- pushes the bug surface higher up the plan. + """select channel, fact_grp, sum(c) total from ( + | select 'a' channel, f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 60 + | group by f.fact_grp + | union all + | select 'b' channel, f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 70 + | group by f.fact_grp + | union all + | select 'c' channel, f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 50 + | group by f.fact_grp + |) y group by rollup (channel, fact_grp)""".stripMargin, + // 5. 
q14a-style with HAVING + scalar subquery (forces broadcast-side stage materialization + // BEFORE outer planning -- the configuration that historically reproduced #3949). + """with avg_sales as ( + | select avg(c) avg_c from ( + | select count(*) c from e2e_fact f + | join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 50 + | group by f.fact_grp + | ) x + |) + |select 'store' channel, f.fact_grp, count(*) c + |from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + |where d.dim_id > 60 + |group by f.fact_grp + |having count(*) > (select avg_c from avg_sales) + |union all + |select 'web' channel, f.fact_grp, count(*) c + |from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + |where d.dim_id > 70 + |group by f.fact_grp + |having count(*) > (select avg_c from avg_sales)""".stripMargin) + + private val variants: Seq[(String, Map[String, String])] = Seq( + "smj+aqe" -> Map( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"), + "bhj+aqe" -> Map( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"), + "smj+aqe+coalesce" -> Map( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + "spark.sql.adaptive.coalescePartitions.enabled" -> "true", + "spark.sql.adaptive.coalescePartitions.minPartitionSize" -> "1b", + "spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "16")) + + test("end-to-end: collect DPP queries under AQE; look for #3949 symptoms") { + withTempDir { dir => + buildDppTablesShared(dir) + + val failures = mutable.Buffer.empty[(String, Int, String, String)] + val suspicious = mutable.Buffer.empty[(String, Int, String)] + + for ((variantName, variantConf) <- variants; (q, idx) <- queries.zipWithIndex) { + val conf = variantConf ++ Map( + CometConf.COMET_DPP_FALLBACK_ENABLED.key -> 
"true", + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") + try { + withSQLConf(conf.toSeq: _*) { + val df = spark.sql(q) + val rows: Array[Row] = df.collect() + rows.length // touch + + val executedPlan = df.queryExecution.executedPlan + val taggedFallback = findCometShufflesTaggedAsDppFallback(executedPlan) + val inconsistent = findInconsistentCometShuffles(executedPlan) + if (taggedFallback.nonEmpty) { + suspicious += (( + variantName, + idx, + s"Comet shuffle tagged with DPP fallback reason but not fallen back " + + s"(${taggedFallback.size}). Plan:\n${executedPlan.treeString}")) + } + if (inconsistent.nonEmpty) { + suspicious += (( + variantName, + idx, + s"Comet shuffle still has DPP scan in subtree (${inconsistent.size}). " + + s"Plan:\n${executedPlan.treeString}")) + } + } + } catch { + case t: Throwable => + var c: Throwable = t + while (c.getCause != null && c.getCause != c) c = c.getCause + val sw = new java.io.StringWriter + c.printStackTrace(new java.io.PrintWriter(sw)) + val detail = Option(c.getMessage).getOrElse(c.toString) + "\n" + sw.toString + failures += ((variantName, idx, detail, c.getClass.getName)) + } + } + + // scalastyle:off println + println(s"=== end-to-end summary ===") + println(s"failures (collect threw): ${failures.size}") + failures.foreach { case (v, i, msg, cls) => + println(s" $v/q$i: $cls") + println(msg) + } + println(s"suspicious (plan-shape inconsistency): ${suspicious.size}") + suspicious.foreach { case (v, i, note) => + println(s" $v/q$i: ${note.take(800)}") + } + // scalastyle:on println + + // Demonstrate-the-bug assertion: if EITHER an #3949-shaped crash or a plan inconsistency + // was observed, the bug is reproduced. The 3949 signature is an AssertionError whose + // stack goes through ColumnarToRowExec. (Columnar.scala:70) during + // BroadcastExchangeExec.doCanonicalize. 
+ val anyEvidence = failures.exists { case (_, _, msg, _) => + msg.contains("INTERNAL_ERROR") || + msg.contains("supportsColumnar") || + msg.contains("ColumnarToRowExec") || + msg.contains("doCanonicalize") + } || suspicious.nonEmpty + + val summary = new StringBuilder + summary.append(s"#3949 reproduced: ${failures.size} collect failure(s) and ") + summary.append(s"${suspicious.size} plan-shape inconsistencies\n") + summary.append(s"---- failures ----\n") + failures.foreach { case (v, i, msg, cls) => + summary.append(s"$v/q$i ($cls):\n").append(msg.take(4000)).append("\n") + } + summary.append(s"---- suspicious ----\n") + suspicious.foreach { case (v, i, note) => + summary.append(s"$v/q$i: ${note.take(1500)}\n") + } + assert(!anyEvidence, summary.toString) + } + } +} + +/** + * `LeafExecNode` stub mirroring how `ShuffleQueryStageExec` presents to a `.exists` walk: + * `children == Seq.empty`, so descent stops at the wrapper. Used by the mechanism test only. + */ +private case class OpaqueStageStub(output: Seq[Attribute]) extends LeafExecNode { + override protected def doExecute(): RDD[InternalRow] = + throw new UnsupportedOperationException("stub") +} From 4d43405cea2acfb614c115f0ca2483b4e1f56909 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 13:07:11 -0600 Subject: [PATCH 05/11] scalastyle --- .../execution/shuffle/CometShuffleExchangeExec.scala | 4 ++-- .../sql/comet/CometDppFallbackRepro3949Suite.scala | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 2500a52658..849f4d37d0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -50,7 +50,7 @@ import 
com.google.common.base.Objects import org.apache.comet.CometConf import org.apache.comet.CometConf.{COMET_EXEC_SHUFFLE_ENABLED, COMET_SHUFFLE_MODE} -import org.apache.comet.CometFallback.{isMarkedForFallback, markForFallback} +import org.apache.comet.CometFallback.isMarkedForFallback import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleManagerEnabled, withInfo} import org.apache.comet.serde.{Compatible, OperatorOuterClass, QueryPlanSerde, SupportLevel, Unsupported} import org.apache.comet.serde.operator.CometSink @@ -466,7 +466,7 @@ object CometShuffleExchangeExec } if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) { - markForFallback(s, "Stage contains a scan with Dynamic Partition Pruning") + withInfo(s, "Stage contains a scan with Dynamic Partition Pruning") return false } diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala index 71e1f16f75..6264038abf 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala @@ -150,7 +150,7 @@ class CometDppFallbackRepro3949Suite extends CometTestBase { } // scalastyle:off println - println(s"=== mechanism check ===") + println("=== mechanism check ===") println(s"initialDppVisible=$initialDppVisible initialDecision=$initialDecision") println(s"postAqeDppVisible=$postAqeDppVisible postAqeDecision=$postAqeDecision") // scalastyle:on println @@ -358,7 +358,7 @@ class CometDppFallbackRepro3949Suite extends CometTestBase { suspicious += (( variantName, idx, - s"Comet shuffle tagged with DPP fallback reason but not fallen back " + + "Comet shuffle tagged with DPP fallback reason but not fallen back " + s"(${taggedFallback.size}). 
Plan:\n${executedPlan.treeString}")) } if (inconsistent.nonEmpty) { @@ -381,7 +381,7 @@ class CometDppFallbackRepro3949Suite extends CometTestBase { } // scalastyle:off println - println(s"=== end-to-end summary ===") + println("=== end-to-end summary ===") println(s"failures (collect threw): ${failures.size}") failures.foreach { case (v, i, msg, cls) => println(s" $v/q$i: $cls") @@ -407,11 +407,11 @@ class CometDppFallbackRepro3949Suite extends CometTestBase { val summary = new StringBuilder summary.append(s"#3949 reproduced: ${failures.size} collect failure(s) and ") summary.append(s"${suspicious.size} plan-shape inconsistencies\n") - summary.append(s"---- failures ----\n") + summary.append("---- failures ----\n") failures.foreach { case (v, i, msg, cls) => summary.append(s"$v/q$i ($cls):\n").append(msg.take(4000)).append("\n") } - summary.append(s"---- suspicious ----\n") + summary.append("---- suspicious ----\n") suspicious.foreach { case (v, i, note) => summary.append(s"$v/q$i: ${note.take(1500)}\n") } From b5f6a80fe59293158078fbaca7adb2066c8abb67 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 13:24:49 -0600 Subject: [PATCH 06/11] add test to CI --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 030657fc39..ab5cfb1bb6 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -279,6 +279,7 @@ jobs: org.apache.comet.exec.DisableAQECometShuffleSuite org.apache.comet.exec.DisableAQECometAsyncShuffleSuite org.apache.spark.shuffle.sort.SpillSorterSuite + org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.scala - name: "parquet" value: | org.apache.comet.parquet.CometParquetWriterSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 8362a6cfba..4908cdc337 100644 --- 
a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -160,6 +160,7 @@ jobs: org.apache.comet.exec.DisableAQECometShuffleSuite org.apache.comet.exec.DisableAQECometAsyncShuffleSuite org.apache.spark.shuffle.sort.SpillSorterSuite + org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.scala - name: "parquet" value: | org.apache.comet.parquet.CometParquetWriterSuite From b2316ed186bc68e3007feedbba00943e17c59988 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 13:25:09 -0600 Subject: [PATCH 07/11] Revert "add test to CI" This reverts commit b5f6a80fe59293158078fbaca7adb2066c8abb67. --- .github/workflows/pr_build_linux.yml | 1 - .github/workflows/pr_build_macos.yml | 1 - 2 files changed, 2 deletions(-) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index ab5cfb1bb6..030657fc39 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -279,7 +279,6 @@ jobs: org.apache.comet.exec.DisableAQECometShuffleSuite org.apache.comet.exec.DisableAQECometAsyncShuffleSuite org.apache.spark.shuffle.sort.SpillSorterSuite - org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.scala - name: "parquet" value: | org.apache.comet.parquet.CometParquetWriterSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 4908cdc337..8362a6cfba 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -160,7 +160,6 @@ jobs: org.apache.comet.exec.DisableAQECometShuffleSuite org.apache.comet.exec.DisableAQECometAsyncShuffleSuite org.apache.spark.shuffle.sort.SpillSorterSuite - org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.scala - name: "parquet" value: | org.apache.comet.parquet.CometParquetWriterSuite From c94cf1082bbf452650c9bc6b31445ba9f6058b81 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 13:27:21 -0600 Subject: [PATCH 
08/11] ci: add CometDppFallbackRepro3949Suite to Linux and macOS CI workflows Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 030657fc39..9395c9cb6f 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -315,6 +315,7 @@ jobs: org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite org.apache.spark.sql.comet.CometTaskMetricsSuite + org.apache.spark.sql.comet.CometDppFallbackRepro3949Suite org.apache.comet.objectstore.NativeConfigSuite - name: "expressions" value: | diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 8362a6cfba..9f71231378 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -196,6 +196,7 @@ jobs: org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite org.apache.spark.sql.comet.CometTaskMetricsSuite + org.apache.spark.sql.comet.CometDppFallbackRepro3949Suite org.apache.comet.objectstore.NativeConfigSuite - name: "expressions" value: | From 6d84bd645def8c5af66fa231bd35c27af602b208 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 13:47:07 -0600 Subject: [PATCH 09/11] fix --- .../comet/execution/shuffle/CometShuffleExchangeExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 849f4d37d0..2500a52658 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -50,7 +50,7 @@ import com.google.common.base.Objects import org.apache.comet.CometConf import org.apache.comet.CometConf.{COMET_EXEC_SHUFFLE_ENABLED, COMET_SHUFFLE_MODE} -import org.apache.comet.CometFallback.isMarkedForFallback +import org.apache.comet.CometFallback.{isMarkedForFallback, markForFallback} import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleManagerEnabled, withInfo} import org.apache.comet.serde.{Compatible, OperatorOuterClass, QueryPlanSerde, SupportLevel, Unsupported} import org.apache.comet.serde.operator.CometSink @@ -466,7 +466,7 @@ object CometShuffleExchangeExec } if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) { - withInfo(s, "Stage contains a scan with Dynamic Partition Pruning") + markForFallback(s, "Stage contains a scan with Dynamic Partition Pruning") return false } From 673c9a08094c5866a6e4983f8f98db38e2c7d1f5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 16:07:11 -0600 Subject: [PATCH 10/11] ci: add CometShuffleFallbackStickinessSuite to Linux and macOS workflows --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 9395c9cb6f..d93f85c0fc 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -316,6 +316,7 @@ jobs: org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite org.apache.spark.sql.comet.CometTaskMetricsSuite org.apache.spark.sql.comet.CometDppFallbackRepro3949Suite + org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite org.apache.comet.objectstore.NativeConfigSuite - name: "expressions" value: | diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 9f71231378..deaf595604 100644 --- 
a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -197,6 +197,7 @@ jobs: org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite org.apache.spark.sql.comet.CometTaskMetricsSuite org.apache.spark.sql.comet.CometDppFallbackRepro3949Suite + org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite org.apache.comet.objectstore.NativeConfigSuite - name: "expressions" value: | From fbb0c7cf916aa0c69e9b54ded11b9fc4a263ba63 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 17 Apr 2026 16:23:25 -0600 Subject: [PATCH 11/11] remove debug println --- .../CometDppFallbackRepro3949Suite.scala | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala index 6264038abf..5b74b590d2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala @@ -149,12 +149,6 @@ class CometDppFallbackRepro3949Suite extends CometTestBase { case _ => false } - // scalastyle:off println - println("=== mechanism check ===") - println(s"initialDppVisible=$initialDppVisible initialDecision=$initialDecision") - println(s"postAqeDppVisible=$postAqeDppVisible postAqeDecision=$postAqeDecision") - // scalastyle:on println - assert(initialDppVisible, "initial child tree should expose DPP scan") assert(!postAqeDppVisible, "stage-wrapped child should hide DPP scan") assert(!initialDecision, s"expected fall back initially, got $initialDecision") @@ -380,19 +374,6 @@ class CometDppFallbackRepro3949Suite extends CometTestBase { } } - // scalastyle:off println - println("=== end-to-end summary ===") - println(s"failures (collect threw): ${failures.size}") - failures.foreach { case (v, i, msg, cls) => - println(s" $v/q$i: $cls") - println(msg) - } - 
println(s"suspicious (plan-shape inconsistency): ${suspicious.size}") - suspicious.foreach { case (v, i, note) => - println(s" $v/q$i: ${note.take(800)}") - } - // scalastyle:on println - // Demonstrate-the-bug assertion: if EITHER an #3949-shaped crash or a plan inconsistency // was observed, the bug is reproduced. The 3949 signature is an AssertionError whose // stack goes through ColumnarToRowExec. (Columnar.scala:70) during