Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
private def calculatePlanOverhead(plan: LogicalPlan): Float = {
val (cached, notCached) = plan.collectLeaves().partition(p => p match {
case _: InMemoryRelation => true
case _: CommandResult => true
case _ => false
})
val scanOverhead = notCached.map(_.stats.sizeInBytes).sum.toFloat
Expand All @@ -195,6 +196,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
m.stats.sizeInBytes.toFloat * 0.2
case m: InMemoryRelation if m.cacheBuilder.storageLevel.useMemory =>
0.0
case _: CommandResult => 0.0
}.sum.toFloat
scanOverhead + cachedOverhead
}
Expand All @@ -206,6 +208,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
  // Decides, for a single node, whether it counts as a selective predicate:
  //  - a CommandResult node always counts;
  //  - a Filter counts when isLikelySelective accepts its condition;
  //  - every other node does not count.
  def countsAsSelective(node: LogicalPlan): Boolean = node match {
    case _: CommandResult => true
    case filter: Filter => isLikelySelective(filter.condition)
    case _ => false
  }

  // True as soon as one node of the plan satisfies the check above.
  plan.exists(countsAsSelective)
}
Expand All @@ -217,7 +220,8 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
* (2) it needs to contain a selective predicate used for filtering
*/
private def hasPartitionPruningFilter(plan: LogicalPlan): Boolean = {
  // Both conditions must hold: the plan is not a streaming plan, and it
  // contains a selective predicate. The check is evaluated exactly once;
  // `&&` short-circuits, so streaming plans are never traversed.
  !plan.isStreaming && hasSelectivePredicate(plan)
}

private def prune(plan: LogicalPlan): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1821,6 +1821,45 @@ class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
checkAnswer(df, Row(1000, 1) :: Row(1010, 2) :: Row(1020, 2) :: Nil)
}
}

test("SPARK-54554: DPP with CommandResult from SHOW PARTITIONS in broadcast join") {
  withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
    SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
    // Scope the temp view to this test so it is dropped afterwards and cannot
    // leak into other tests that share the same session.
    withTempView("max_partition") {
      // Derive the filtering side from SHOW PARTITIONS (a CommandResult leaf):
      // take the max of the "partition" column and keep the text after '='.
      // NOTE(review): max is applied to the partition string, so the ordering is
      // presumably lexicographic rather than numeric - confirm this is intended.
      val maxPartitionDF = sql("SHOW PARTITIONS fact_stats")
        .agg(max("partition").alias("max_partition"))
        .selectExpr("split(max_partition, '=')[1] as max_store_id")

      // Expose the single-row result to SQL under the name used by the join below.
      maxPartitionDF.createOrReplaceTempView("max_partition")

      // Join the partitioned table against the CommandResult-backed view on the
      // store_id column.
      val df = sql(
        """
          |SELECT f.date_id, f.product_id, f.store_id, f.units_sold
          |FROM fact_stats f
          |JOIN max_partition m ON f.store_id = m.max_store_id
        """.stripMargin)

      // NOTE(review): the two flags presumably mean (withSubquery, withBroadcast);
      // confirm against the helper's signature.
      checkPartitionPruningPredicate(df, false, true)

      checkAnswer(df,
        Row(1150, 1, 9, 20) :: Nil
      )

      // Verify DPP predicates exist in the optimized logical plan
      val optimizedPlan = df.queryExecution.optimizedPlan.toString()
      assert(optimizedPlan.contains("DynamicPruningSubquery") ||
        optimizedPlan.contains("dynamicpruning"),
        s"Optimized plan should contain DynamicPruningSubquery:\n$optimizedPlan")

      // Verify the executed plan shows partition pruning happened
      val executedPlan = df.queryExecution.executedPlan.toString()
      assert(executedPlan.contains("SubqueryBroadcast") ||
        executedPlan.contains("dynamicpruning"),
        s"Executed plan should show dynamic pruning:\n$executedPlan")
    }
  }
}
}

abstract class DynamicPartitionPruningV2Suite extends DynamicPartitionPruningDataSourceSuiteBase {
Expand Down