diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 030657fc39..d93f85c0fc 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -315,6 +315,8 @@ jobs: org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite org.apache.spark.sql.comet.CometTaskMetricsSuite + org.apache.spark.sql.comet.CometDppFallbackRepro3949Suite + org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite org.apache.comet.objectstore.NativeConfigSuite - name: "expressions" value: | diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 8362a6cfba..deaf595604 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -196,6 +196,8 @@ jobs: org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite org.apache.spark.sql.comet.CometTaskMetricsSuite + org.apache.spark.sql.comet.CometDppFallbackRepro3949Suite + org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite org.apache.comet.objectstore.NativeConfigSuite - name: "expressions" value: | diff --git a/spark/src/main/scala/org/apache/comet/CometFallback.scala b/spark/src/main/scala/org/apache/comet/CometFallback.scala new file mode 100644 index 0000000000..28a4816b66 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/CometFallback.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag} + +import org.apache.comet.CometSparkSessionExtensions.withInfo + +/** + * Sticky fallback marker for shuffle / stage nodes. + * + * Comet's shuffle-support predicates (e.g. `CometShuffleExchangeExec.columnarShuffleSupported`) + * run at both initial planning and AQE stage-prep. Some fallback decisions depend on the + * surrounding plan shape - for example, the presence of a DPP scan below a shuffle. Between the + * two passes AQE can reshape that subtree (a completed child stage becomes a + * `ShuffleQueryStageExec`, a `LeafExecNode` whose `children` is empty), so a naive re-evaluation + * can flip the decision. + * + * When a decision is made on the initial-plan pass, the deciding rule records a sticky tag via + * [[markForFallback]]. On subsequent passes, callers short-circuit via [[isMarkedForFallback]] + * and preserve the earlier decision instead of re-deriving it from the current plan shape. + * + * This tag is kept separate from `CometExplainInfo.EXTENSION_INFO` on purpose: the explain tag + * accumulates informational reasons (including rolled-up child reasons), many of which are not a + * full-fallback signal. Treating any presence of explain info as fallback is too coarse and + * breaks legitimate conversions (e.g. a shuffle tagged "Comet native shuffle not enabled" should + * still be eligible for columnar shuffle). The fallback tag exists only for decisions that should + * remain sticky. + */ +object CometFallback { + + val STAGE_FALLBACK_TAG: TreeNodeTag[Set[String]] = + new TreeNodeTag[Set[String]]("CometStageFallback") + + /** + * Mark a node so that subsequent shuffle-support re-evaluations fall back to Spark without + * re-deriving the decision from the (possibly reshaped) subtree. Also records the reason in the + * usual explain channel so it surfaces in extended explain output. + */ + def markForFallback[T <: TreeNode[_]](node: T, reason: String): T = { + val existing = node.getTagValue(STAGE_FALLBACK_TAG).getOrElse(Set.empty[String]) + node.setTagValue(STAGE_FALLBACK_TAG, existing + reason) + withInfo(node, reason) + node + } + + /** True if a prior rule pass marked this node for Spark fallback via [[markForFallback]]. */ + def isMarkedForFallback(node: TreeNode[_]): Boolean = + node.getTagValue(STAGE_FALLBACK_TAG).exists(_.nonEmpty) +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index df2dca0331..2500a52658 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -50,6 +50,7 @@ import com.google.common.base.Objects import org.apache.comet.CometConf import org.apache.comet.CometConf.{COMET_EXEC_SHUFFLE_ENABLED, COMET_SHUFFLE_MODE} +import org.apache.comet.CometFallback.{isMarkedForFallback, markForFallback} import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleManagerEnabled, withInfo} import org.apache.comet.serde.{Compatible, OperatorOuterClass, QueryPlanSerde, SupportLevel, Unsupported} import org.apache.comet.serde.operator.CometSink @@ -328,6 +329,11 @@ object CometShuffleExchangeExec false } + // Preserve any prior-pass fallback decision (see `CometFallback`). + if (isMarkedForFallback(s)) { + return false + } + if (!isCometShuffleEnabledWithInfo(s)) { return false } @@ -450,12 +456,17 @@ object CometShuffleExchangeExec false } + // Preserve any prior-pass fallback decision (see `CometFallback`). + if (isMarkedForFallback(s)) { + return false + } + if (!isCometShuffleEnabledWithInfo(s)) { return false } if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) { - withInfo(s, "Stage contains a scan with Dynamic Partition Pruning") + markForFallback(s, "Stage contains a scan with Dynamic Partition Pruning") return false } diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala new file mode 100644 index 0000000000..5b74b590d2 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import scala.collection.mutable + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{CometTestBase, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, PlanExpression} +import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec +import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.{CometConf, CometExplainInfo} + +/** + * Attempts an end-to-end reproduction of issue #3949. + * + * #3949 reports `[INTERNAL_ERROR]` from `BroadcastExchangeExec.doCanonicalize`, ultimately caused + * by `ColumnarToRowExec.` asserting that its child `supportsColumnar`. The suspected root + * cause is that Comet's DPP fallback (`CometShuffleExchangeExec.stageContainsDPPScan`) walks + * `s.child.exists(...)` for a `FileSourceScanExec` with a `PlanExpression` partition filter, and + * that walk is not stable across the two planning passes: + * + * - initial planning: shuffle's child subtree includes the DPP scan -> `.exists` finds it -> + * fall back to Spark. + * - AQE stage prep: the inner stage that contained the scan has materialized and been replaced + * by a `ShuffleQueryStageExec` (a `LeafExecNode` whose `children == Seq.empty`). `.exists` + * can no longer descend into it, the DPP scan becomes invisible, the same shuffle is + * converted to Comet, and the plan shape changes between passes. + * + * This suite has two tests: + * + * 1. `mechanism`: synthetic. Builds a real DPP plan, observes the initial-pass decision is + * "fall back", then swaps the shuffle's child for an opaque `LeafExecNode` (mirroring how + * `ShuffleQueryStageExec` presents to `.exists`) and asserts the decision flips to + * "convert". Documents the mechanism without depending on AQE actually triggering it. + * + * 2. `endToEnd`: runs DPP-flavored queries with AQE on, sweeps a few seeds/variants, and asks + * whether `df.collect()` ever throws or whether the final executed plan ever contains a Comet + * shuffle whose child subtree (descending through `QueryStageExec.plan`) still contains a DPP + * scan -- i.e. an inconsistency that the bug would produce. + */ +class CometDppFallbackRepro3949Suite extends CometTestBase { + + // ---------------------------------------------------------------------- + // Mechanism (synthetic): proves the AQE wrap flips the fallback decision. + // ---------------------------------------------------------------------- + + private def buildDppTables(dir: java.io.File, factPrefix: String): Unit = { + val factPath = s"${dir.getAbsolutePath}/$factPrefix.parquet" + val dimPath = s"${dir.getAbsolutePath}/${factPrefix}_dim.parquet" + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { + val sess = spark + import sess.implicits._ + val oneDay = 24L * 60L * 60000L + val now = System.currentTimeMillis() + (0 until 400) + .map(i => (i, new java.sql.Date(now + (i % 40) * oneDay), i.toString)) + .toDF("fact_id", "fact_date", "fact_str") + .write + .partitionBy("fact_date") + .parquet(factPath) + (0 until 40) + .map(i => (i, new java.sql.Date(now + i * oneDay), i.toString)) + .toDF("dim_id", "dim_date", "dim_str") + .write + .parquet(dimPath) + } + spark.read.parquet(factPath).createOrReplaceTempView(s"${factPrefix}_fact") + spark.read.parquet(dimPath).createOrReplaceTempView(s"${factPrefix}_dim") + } + + private def unwrapAqe(plan: SparkPlan): SparkPlan = plan match { + case a: AdaptiveSparkPlanExec => a.initialPlan + case other => other + } + + private def findFirstShuffle(plan: SparkPlan): Option[ShuffleExchangeExec] = { + var found: Option[ShuffleExchangeExec] = None + plan.foreach { + case s: ShuffleExchangeExec if found.isEmpty => found = Some(s) + case _ => + } + found + } + + test("mechanism: DPP fallback decision is sticky across an AQE-style child wrap") { + withTempDir { dir => + buildDppTables(dir, "mech") + withSQLConf( + CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + + val df = spark.sql( + "select f.fact_date, count(*) c " + + "from mech_fact f join mech_dim d on f.fact_date = d.dim_date " + + "where d.dim_id > 35 group by f.fact_date") + val initialPlan = unwrapAqe(df.queryExecution.executedPlan) + val shuffle = findFirstShuffle(initialPlan).getOrElse { + fail(s"No ShuffleExchangeExec found in initial plan:\n${initialPlan.treeString}") + } + + val initialDecision = CometShuffleExchangeExec.columnarShuffleSupported(shuffle) + + val initialDppVisible = shuffle.child.exists { + case scan: FileSourceScanExec => + scan.partitionFilters.exists(_.exists(_.isInstanceOf[PlanExpression[_]])) + case _ => false + } + + // Simulate AQE stage prep: wrap the shuffle's child in an opaque LeafExecNode, + // matching how `ShuffleQueryStageExec` presents to `.exists` walks (its `children` + // is `Seq.empty`). `withNewChildren` preserves tree-node tags, so if the fix is in + // place the sticky CometFallback marker on `shuffle` carries over to + // `postAqeShuffle`, and the decision short-circuits to false. Without the fix, + // the DPP walk re-runs, fails to see the scan, and flips to true. + val hiddenChild = OpaqueStageStub(shuffle.child.output) + val postAqeShuffle = + shuffle.withNewChildren(Seq(hiddenChild)).asInstanceOf[ShuffleExchangeExec] + val postAqeDecision = CometShuffleExchangeExec.columnarShuffleSupported(postAqeShuffle) + + val postAqeDppVisible = postAqeShuffle.child.exists { + case scan: FileSourceScanExec => + scan.partitionFilters.exists(_.exists(_.isInstanceOf[PlanExpression[_]])) + case _ => false + } + + assert(initialDppVisible, "initial child tree should expose DPP scan") + assert(!postAqeDppVisible, "stage-wrapped child should hide DPP scan") + assert(!initialDecision, s"expected fall back initially, got $initialDecision") + assert( + !postAqeDecision, + s"decision must stay 'fall back' across the AQE-style wrap, got $postAqeDecision") + } + } + } + + // ---------------------------------------------------------------------- + // End-to-end: actually run DPP-flavored queries and look for the bug. + // ---------------------------------------------------------------------- + + // Walk the executed plan descending into QueryStageExec.plan, and return any Comet + // shuffle whose subtree still contains a DPP scan -- the cross-pass inconsistency the + // bug would create. + private def findInconsistentCometShuffles(plan: SparkPlan): Seq[SparkPlan] = { + import org.apache.spark.sql.execution.adaptive.QueryStageExec + def containsDppScan(p: SparkPlan): Boolean = p match { + case scan: FileSourceScanExec => + scan.partitionFilters.exists(_.exists(_.isInstanceOf[PlanExpression[_]])) + case stage: QueryStageExec => containsDppScan(stage.plan) + case other => other.children.exists(containsDppScan) + } + val acc = mutable.Buffer.empty[SparkPlan] + def walk(p: SparkPlan): Unit = { + p match { + case s: CometShuffleExchangeExec if containsDppScan(s) => + acc += s + case _ => + } + p match { + case stage: QueryStageExec => walk(stage.plan) + case _ => p.children.foreach(walk) + } + } + walk(plan) + acc.toSeq + } + + // Match the executed plan: any Comet shuffle whose explain-info already says it should have + // fallen back due to DPP. That's the smoking-gun pattern from the issue: + // CometColumnarExchange [COMET: Stage contains a scan with Dynamic Partition Pruning] + private def findCometShufflesTaggedAsDppFallback(plan: SparkPlan): Seq[SparkPlan] = { + import org.apache.spark.sql.execution.adaptive.QueryStageExec + val acc = mutable.Buffer.empty[SparkPlan] + def walk(p: SparkPlan): Unit = { + p match { + case s: CometShuffleExchangeExec => + val tags = s.getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse(Set.empty[String]) + if (tags.exists(_.contains("Dynamic Partition Pruning"))) acc += s + case _ => + } + p match { + case stage: QueryStageExec => walk(stage.plan) + case _ => p.children.foreach(walk) + } + } + walk(plan) + acc.toSeq + } + + private def buildDppTablesShared(dir: java.io.File): Unit = { + val factPath = s"${dir.getAbsolutePath}/e2e_fact.parquet" + val dimPath = s"${dir.getAbsolutePath}/e2e_dim.parquet" + val dim2Path = s"${dir.getAbsolutePath}/e2e_dim2.parquet" + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { + val sess = spark + import sess.implicits._ + val oneDay = 24L * 60L * 60000L + val now = System.currentTimeMillis() + // larger fact so AQE actually has stages to materialize + (0 until 4000) + .map(i => (i, new java.sql.Date(now + (i % 80) * oneDay), i.toString, i % 10)) + .toDF("fact_id", "fact_date", "fact_str", "fact_grp") + .write + .partitionBy("fact_date") + .parquet(factPath) + (0 until 80) + .map(i => (i, new java.sql.Date(now + i * oneDay), i.toString)) + .toDF("dim_id", "dim_date", "dim_str") + .write + .parquet(dimPath) + (0 until 10) + .map(i => (i, s"g$i")) + .toDF("grp_id", "grp_str") + .write + .parquet(dim2Path) + } + spark.read.parquet(factPath).createOrReplaceTempView("e2e_fact") + spark.read.parquet(dimPath).createOrReplaceTempView("e2e_dim") + spark.read.parquet(dim2Path).createOrReplaceTempView("e2e_dim2") + } + + // A handful of DPP-flavored queries in roughly increasing complexity. The last few mimic the + // q14a structure (UNION ALL of multiple DPP-using subqueries with HAVING and outer aggregate) + // because the issue specifically hit q14a / q14b / q31 / q47 / q57. + private val queries: Seq[String] = Seq( + // 1. Plain SMJ DPP + """select f.fact_date, count(*) c + |from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + |where d.dim_id > 70 + |group by f.fact_date""".stripMargin, + // 2. Aggregation above DPP join, then second join on the aggregate + """select g.grp_str, sum(t.c) total from e2e_dim2 g join ( + | select f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 60 + | group by f.fact_grp + |) t on g.grp_id = t.fact_grp + |group by g.grp_str""".stripMargin, + // 3. UNION ALL of two DPP-using subqueries, outer aggregate -- q14a-style. + """select channel, fact_grp, sum(c) total from ( + | select 'a' channel, f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 60 + | group by f.fact_grp + | union all + | select 'b' channel, f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 70 + | group by f.fact_grp + |) y group by channel, fact_grp""".stripMargin, + // 4. UNION ALL with rollup -- pushes the bug surface higher up the plan. + """select channel, fact_grp, sum(c) total from ( + | select 'a' channel, f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 60 + | group by f.fact_grp + | union all + | select 'b' channel, f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 70 + | group by f.fact_grp + | union all + | select 'c' channel, f.fact_grp, count(*) c + | from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 50 + | group by f.fact_grp + |) y group by rollup (channel, fact_grp)""".stripMargin, + // 5. q14a-style with HAVING + scalar subquery (forces broadcast-side stage materialization + // BEFORE outer planning -- the configuration that historically reproduced #3949). + """with avg_sales as ( + | select avg(c) avg_c from ( + | select count(*) c from e2e_fact f + | join e2e_dim d on f.fact_date = d.dim_date + | where d.dim_id > 50 + | group by f.fact_grp + | ) x + |) + |select 'store' channel, f.fact_grp, count(*) c + |from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + |where d.dim_id > 60 + |group by f.fact_grp + |having count(*) > (select avg_c from avg_sales) + |union all + |select 'web' channel, f.fact_grp, count(*) c + |from e2e_fact f join e2e_dim d on f.fact_date = d.dim_date + |where d.dim_id > 70 + |group by f.fact_grp + |having count(*) > (select avg_c from avg_sales)""".stripMargin) + + private val variants: Seq[(String, Map[String, String])] = Seq( + "smj+aqe" -> Map( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"), + "bhj+aqe" -> Map( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"), + "smj+aqe+coalesce" -> Map( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + "spark.sql.adaptive.coalescePartitions.enabled" -> "true", + "spark.sql.adaptive.coalescePartitions.minPartitionSize" -> "1b", + "spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "16")) + + test("end-to-end: collect DPP queries under AQE; look for #3949 symptoms") { + withTempDir { dir => + buildDppTablesShared(dir) + + val failures = mutable.Buffer.empty[(String, Int, String, String)] + val suspicious = mutable.Buffer.empty[(String, Int, String)] + + for ((variantName, variantConf) <- variants; (q, idx) <- queries.zipWithIndex) { + val conf = variantConf ++ Map( + CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true", + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") + try { + withSQLConf(conf.toSeq: _*) { + val df = spark.sql(q) + val rows: Array[Row] = df.collect() + rows.length // touch + + val executedPlan = df.queryExecution.executedPlan + val taggedFallback = findCometShufflesTaggedAsDppFallback(executedPlan) + val inconsistent = findInconsistentCometShuffles(executedPlan) + if (taggedFallback.nonEmpty) { + suspicious += (( + variantName, + idx, + "Comet shuffle tagged with DPP fallback reason but not fallen back " + + s"(${taggedFallback.size}). Plan:\n${executedPlan.treeString}")) + } + if (inconsistent.nonEmpty) { + suspicious += (( + variantName, + idx, + s"Comet shuffle still has DPP scan in subtree (${inconsistent.size}). " + + s"Plan:\n${executedPlan.treeString}")) + } + } + } catch { + case t: Throwable => + var c: Throwable = t + while (c.getCause != null && c.getCause != c) c = c.getCause + val sw = new java.io.StringWriter + c.printStackTrace(new java.io.PrintWriter(sw)) + val detail = Option(c.getMessage).getOrElse(c.toString) + "\n" + sw.toString + failures += ((variantName, idx, detail, c.getClass.getName)) + } + } + + // Demonstrate-the-bug assertion: if EITHER an #3949-shaped crash or a plan inconsistency + // was observed, the bug is reproduced. The 3949 signature is an AssertionError whose + // stack goes through ColumnarToRowExec. (Columnar.scala:70) during + // BroadcastExchangeExec.doCanonicalize. + val anyEvidence = failures.exists { case (_, _, msg, _) => + msg.contains("INTERNAL_ERROR") || + msg.contains("supportsColumnar") || + msg.contains("ColumnarToRowExec") || + msg.contains("doCanonicalize") + } || suspicious.nonEmpty + + val summary = new StringBuilder + summary.append(s"#3949 reproduced: ${failures.size} collect failure(s) and ") + summary.append(s"${suspicious.size} plan-shape inconsistencies\n") + summary.append("---- failures ----\n") + failures.foreach { case (v, i, msg, cls) => + summary.append(s"$v/q$i ($cls):\n").append(msg.take(4000)).append("\n") + } + summary.append("---- suspicious ----\n") + suspicious.foreach { case (v, i, note) => + summary.append(s"$v/q$i: ${note.take(1500)}\n") + } + assert(!anyEvidence, summary.toString) + } + } +} + +/** + * `LeafExecNode` stub mirroring how `ShuffleQueryStageExec` presents to a `.exists` walk: + * `children == Seq.empty`, so descent stops at the wrapper. Used by the mechanism test only. + */ +private case class OpaqueStageStub(output: Seq[Attribute]) extends LeafExecNode { + override protected def doExecute(): RDD[InternalRow] = + throw new UnsupportedOperationException("stub") +} diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala new file mode 100644 index 0000000000..7e3ee63502 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.{CometConf, CometFallback} + +/** + * Pins the sticky-fallback invariant for Comet shuffle decisions: `nativeShuffleSupported` / + * `columnarShuffleSupported` must return `false` whenever the shuffle already carries a + * `CometFallback` marker from a prior rule pass. + * + * Without this behavior, AQE's stage-prep rule re-evaluation can flip the decision — e.g., + * `stageContainsDPPScan` walks the shuffle's child tree with `.exists`, but a materialized child + * stage is wrapped in `ShuffleQueryStageExec` (a `LeafExecNode`) so `.exists` stops at the + * wrapper and the DPP scan becomes invisible. That causes the same shuffle to fall back to Spark + * at initial planning and then convert to Comet at stage prep, producing plan-shape + * inconsistencies across the two passes (suspected mechanism behind #3949). + * + * Fallback decisions that must survive AQE replanning use `CometFallback.markForFallback`. The + * shuffle-support predicates check `isMarkedForFallback` at the top and short-circuit. + */ +class CometShuffleFallbackStickinessSuite extends CometTestBase { + + test("both support predicates fall back when the shuffle carries a CometFallback marker") { + val shuffle = ShuffleExchangeExec(SinglePartition, SyntheticLeaf(Nil)) + CometFallback.markForFallback(shuffle, "pretend prior pass decided Spark fallback") + + withSQLConf(CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") { + assert( + !CometShuffleExchangeExec.columnarShuffleSupported(shuffle), + "marked shuffle must preserve its prior-pass fallback decision (columnar path)") + assert( + !CometShuffleExchangeExec.nativeShuffleSupported(shuffle), + "marked shuffle must preserve its prior-pass fallback decision (native path)") + } + } + + test("informational explain-info alone does NOT force fallback") { + // A shuffle can accumulate explain info (e.g. 'Comet native shuffle not enabled') as + // informational output from earlier checks without being a full-fallback signal. That + // info must not cause the columnar path to decline. + val shuffle = ShuffleExchangeExec(SinglePartition, SyntheticLeaf(Nil)) + // Note: withInfo, not markForFallback. + org.apache.comet.CometSparkSessionExtensions + .withInfo(shuffle, "Comet native shuffle not enabled") + assert( + !CometFallback.isMarkedForFallback(shuffle), + "explain info alone must not imply a sticky fallback marker") + } + + test( + "DPP fallback decision is sticky across two invocations even when the child tree changes") { + withTempDir { dir => + val factPath = s"${dir.getAbsolutePath}/fact.parquet" + val dimPath = s"${dir.getAbsolutePath}/dim.parquet" + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { + val sess = spark + import sess.implicits._ + val oneDay = 24L * 60L * 60000L + val now = System.currentTimeMillis() + (0 until 400) + .map(i => (i, new java.sql.Date(now + (i % 40) * oneDay), i.toString)) + .toDF("fact_id", "fact_date", "fact_str") + .write + .partitionBy("fact_date") + .parquet(factPath) + (0 until 40) + .map(i => (i, new java.sql.Date(now + i * oneDay), i.toString)) + .toDF("dim_id", "dim_date", "dim_str") + .write + .parquet(dimPath) + } + spark.read.parquet(factPath).createOrReplaceTempView("t_sticky_fact") + spark.read.parquet(dimPath).createOrReplaceTempView("t_sticky_dim") + + withSQLConf( + CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.PREFER_SORTMERGEJOIN.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + + val df = spark.sql( + "select f.fact_date, count(*) c from t_sticky_fact f " + + "join t_sticky_dim d on f.fact_date = d.dim_date " + + "where d.dim_id > 35 group by f.fact_date") + val initial = df.queryExecution.executedPlan match { + case a: org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec => a.initialPlan + case other => other + } + val shuffle = + initial + .collectFirst { case s: ShuffleExchangeExec => s } + .getOrElse(fail(s"no shuffle found:\n${initial.treeString}")) + + // Pass 1: real DPP subtree visible. Returns false AND marks the shuffle. + val first = CometShuffleExchangeExec.columnarShuffleSupported(shuffle) + assert(!first, "initial pass must fall back (DPP visible)") + assert( + CometFallback.isMarkedForFallback(shuffle), + "fallback marker must be placed on the shuffle") + + // Pass 2 simulates AQE stage-prep: replace the child with an opaque leaf that hides + // the DPP subtree from tree walks. A naive `.exists`-based check would flip to true + // here; the sticky marker must keep the decision stable. + val reshapedShuffle = + shuffle + .withNewChildren(Seq(SyntheticLeaf(shuffle.child.output))) + .asInstanceOf[ShuffleExchangeExec] + val second = CometShuffleExchangeExec.columnarShuffleSupported(reshapedShuffle) + assert( + !second, + "second pass must still fall back even though the DPP subtree is now hidden") + } + } + } +} + +private case class SyntheticLeaf(output: Seq[Attribute]) extends LeafExecNode { + override protected def doExecute(): RDD[InternalRow] = + throw new UnsupportedOperationException("stub") +}