Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, PlanExpression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical._
Expand Down Expand Up @@ -454,6 +454,11 @@ object CometShuffleExchangeExec
return false
}

if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) {
withInfo(s, "Stage contains a scan with Dynamic Partition Pruning")
return false
}

if (!isCometJVMShuffleMode(s.conf)) {
withInfo(s, "Comet columnar shuffle not enabled")
return false
Expand Down Expand Up @@ -546,6 +551,22 @@ object CometShuffleExchangeExec
}
}

/**
 * Detects whether the subtree below this shuffle includes a file scan whose partition filters
 * carry Dynamic Partition Pruning (DPP) subqueries. Such scans fall back to Spark, so wrapping
 * the stage in a Comet shuffle would only add costly row-to-columnar transitions.
 *
 * @param s
 *   the shuffle exchange whose child subtree is inspected
 * @return
 *   true if any [[FileSourceScanExec]] in the subtree has a DPP partition filter
 */
private def stageContainsDPPScan(s: ShuffleExchangeExec): Boolean =
  s.child.exists {
    case scan: FileSourceScanExec =>
      // A DPP filter is recognizable as a partition filter embedding a PlanExpression
      // (i.e. a subquery produced by dynamic partition pruning).
      scan.partitionFilters.exists(_.exists(_.isInstanceOf[PlanExpression[_]]))
    case _ => false
  }

def isCometShuffleEnabledWithInfo(op: SparkPlan): Boolean = {
if (!COMET_EXEC_SHUFFLE_ENABLED.get(op.conf)) {
withInfo(
Expand Down
38 changes: 37 additions & 1 deletion spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,44 @@ class CometExecSuite extends CometTestBase {
val (_, cometPlan) = checkSparkAnswer(df)
val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
assert(infos.contains("Dynamic Partition Pruning is not supported"))
}
}
}
}

test("DPP fallback avoids inefficient Comet shuffle (#3874)") {
  withTempDir { path =>
    val factPath = s"${path.getAbsolutePath}/fact.parquet"
    val dimPath = s"${path.getAbsolutePath}/dim.parquet"
    // Write the fixture tables with Comet exec disabled so the writes are plain Spark.
    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
      // Long literal: 86,400,000 ms/day * i overflows Int for i >= 25, which would
      // produce wrapped (negative-offset) dates for most of the 100 fact rows.
      val oneDayMs = 24L * 60 * 60000
      val fact = Range(0, 100)
        .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * oneDayMs), i.toString))
        .toDF("fact_id", "fact_date", "fact_str")
      fact.write.partitionBy("fact_date").parquet(factPath)
      val dim = Range(0, 10)
        .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * oneDayMs), i.toString))
        .toDF("dim_id", "dim_date", "dim_str")
      dim.write.parquet(dimPath)
    }

    // Force sort-merge join to get a shuffle exchange above the DPP scan
    Seq("parquet").foreach { v1List =>
      withSQLConf(
        SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
        CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
        spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact2")
        spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim2")
        val df =
          spark.sql(
            "select * from dpp_fact2 join dpp_dim2 on fact_date = dim_date where dim_id > 7")
        val (_, cometPlan) = checkSparkAnswer(df)

        // `infos` was previously referenced without being defined in this test (it belonged
        // to a sibling test); derive it from the Comet plan's extended explain output.
        val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
        assert(infos.contains("Comet accelerated"))
        // Verify no CometShuffleExchangeExec wraps the DPP stage
        assert(
          !cometPlan.toString().contains("CometColumnarShuffle"),
          "Should not use Comet columnar shuffle for stages with DPP scans")
      }
    }
  }
Expand Down
Loading