diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index d65a6b21f4..df2dca0331 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, PlanExpression, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -454,6 +454,11 @@ object CometShuffleExchangeExec
       return false
     }
 
+    if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) {
+      withInfo(s, "Stage contains a scan with Dynamic Partition Pruning")
+      return false
+    }
+
     if (!isCometJVMShuffleMode(s.conf)) {
       withInfo(s, "Comet columnar shuffle not enabled")
       return false
@@ -546,6 +551,22 @@ object CometShuffleExchangeExec
     }
   }
 
+  /**
+   * Returns true if the stage (the subtree rooted at this shuffle) contains a scan with Dynamic
+   * Partition Pruning (DPP). When DPP is present, the scan falls back to Spark, and wrapping the
+   * stage with Comet shuffle creates inefficient row-to-columnar transitions.
+   */
+  private def stageContainsDPPScan(s: ShuffleExchangeExec): Boolean = {
+    def isDynamicPruningFilter(e: Expression): Boolean =
+      e.exists(_.isInstanceOf[PlanExpression[_]])
+
+    s.child.exists {
+      case scan: FileSourceScanExec =>
+        scan.partitionFilters.exists(isDynamicPruningFilter)
+      case _ => false
+    }
+  }
+
   def isCometShuffleEnabledWithInfo(op: SparkPlan): Boolean = {
     if (!COMET_EXEC_SHUFFLE_ENABLED.get(op.conf)) {
       withInfo(
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 0bf9bbc95b..4e12e0b722 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -139,8 +139,44 @@ class CometExecSuite extends CometTestBase {
         val (_, cometPlan) = checkSparkAnswer(df)
         val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
         assert(infos.contains("Dynamic Partition Pruning is not supported"))
-        assert(infos.contains("Comet accelerated"))
       }
     }
   }
+
+  test("DPP fallback avoids inefficient Comet shuffle (#3874)") {
+    withTempDir { path =>
+      val factPath = s"${path.getAbsolutePath}/fact.parquet"
+      val dimPath = s"${path.getAbsolutePath}/dim.parquet"
+      withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
+        val one_day = 24L * 60 * 60000
+        val fact = Range(0, 100)
+          .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString))
+          .toDF("fact_id", "fact_date", "fact_str")
+        fact.write.partitionBy("fact_date").parquet(factPath)
+        val dim = Range(0, 10)
+          .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString))
+          .toDF("dim_id", "dim_date", "dim_str")
+        dim.write.parquet(dimPath)
+      }
+
+      // Force sort-merge join to get a shuffle exchange above the DPP scan
+      Seq("parquet").foreach { v1List =>
+        withSQLConf(
+          SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
+          spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact2")
+          spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim2")
+          val df =
+            spark.sql(
+              "select * from dpp_fact2 join dpp_dim2 on fact_date = dim_date where dim_id > 7")
+          val (_, cometPlan) = checkSparkAnswer(df)
+          // Verify no CometShuffleExchangeExec wraps the DPP stage
+          assert(
+            !cometPlan.toString().contains("CometColumnarShuffle"),
+            "Should not use Comet columnar shuffle for stages with DPP scans")
+        }
+      }
+    }
+  }
 