From 7406e857a5eec5b9486ec7bbc4a65601b5404b6b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 2 Apr 2026 11:01:58 -0600 Subject: [PATCH] fix: skip Comet columnar shuffle for stages with DPP scans (#3879) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a scan uses Dynamic Partition Pruning (DPP) and falls back to Spark, Comet was still wrapping the stage with columnar shuffle, creating inefficient row-to-columnar transitions: CometShuffleWriter → CometRowToColumnar → SparkFilter → SparkColumnarToRow → SparkScan This adds a check in columnarShuffleSupported() that walks the child plan tree to detect FileSourceScanExec nodes with dynamic pruning filters. When found, the shuffle is not converted to Comet, allowing the entire stage to fall back to Spark. --- .../shuffle/CometShuffleExchangeExec.scala | 23 ++++++++++- .../apache/comet/exec/CometExecSuite.scala | 38 ++++++++++++++++++- 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index d65a6b21f4..df2dca0331 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} -import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, PlanExpression, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical._ @@ -454,6 +454,11 @@ object CometShuffleExchangeExec return false } + if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) { + withInfo(s, "Stage contains a scan with Dynamic Partition Pruning") + return false + } + if (!isCometJVMShuffleMode(s.conf)) { withInfo(s, "Comet columnar shuffle not enabled") return false @@ -546,6 +551,22 @@ object CometShuffleExchangeExec } } + /** + * Returns true if the stage (the subtree rooted at this shuffle) contains a scan with Dynamic + * Partition Pruning (DPP). When DPP is present, the scan falls back to Spark, and wrapping the + * stage with Comet shuffle creates inefficient row-to-columnar transitions. + */ + private def stageContainsDPPScan(s: ShuffleExchangeExec): Boolean = { + def isDynamicPruningFilter(e: Expression): Boolean = + e.exists(_.isInstanceOf[PlanExpression[_]]) + + s.child.exists { + case scan: FileSourceScanExec => + scan.partitionFilters.exists(isDynamicPruningFilter) + case _ => false + } + } + def isCometShuffleEnabledWithInfo(op: SparkPlan): Boolean = { if (!COMET_EXEC_SHUFFLE_ENABLED.get(op.conf)) { withInfo( diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 0bf9bbc95b..4e12e0b722 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -139,8 +139,44 @@ class CometExecSuite extends CometTestBase { val (_, cometPlan) = checkSparkAnswer(df) val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) assert(infos.contains("Dynamic Partition Pruning is not supported")) + } + } + } + } + + test("DPP fallback avoids inefficient Comet shuffle (#3874)") { + withTempDir { path => + val factPath = s"${path.getAbsolutePath}/fact.parquet" + val dimPath = s"${path.getAbsolutePath}/dim.parquet" + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { + val one_day = 24 * 60 * 60000 + val fact = Range(0, 100) + .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString)) + .toDF("fact_id", "fact_date", "fact_str") + fact.write.partitionBy("fact_date").parquet(factPath) + val dim = Range(0, 10) + .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString)) + .toDF("dim_id", "dim_date", "dim_str") + dim.write.parquet(dimPath) + } + + // Force sort-merge join to get a shuffle exchange above the DPP scan + Seq("parquet").foreach { v1List => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") { + spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact2") + spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim2") + val df = + spark.sql( + "select * from dpp_fact2 join dpp_dim2 on fact_date = dim_date where dim_id > 7") + val (_, cometPlan) = checkSparkAnswer(df) - assert(infos.contains("Comet accelerated")) + // Verify no CometShuffleExchangeExec wraps the DPP stage + assert( + !cometPlan.toString().contains("CometColumnarShuffle"), + "Should not use Comet columnar shuffle for stages with DPP scans") } } }