diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
index ef22c0ab44e4..7f3752fdade4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
@@ -184,6 +184,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
   private def calculatePlanOverhead(plan: LogicalPlan): Float = {
     val (cached, notCached) = plan.collectLeaves().partition(p => p match {
       case _: InMemoryRelation => true
+      case _: CommandResult => true
       case _ => false
     })
     val scanOverhead = notCached.map(_.stats.sizeInBytes).sum.toFloat
@@ -195,6 +196,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
         m.stats.sizeInBytes.toFloat * 0.2
       case m: InMemoryRelation if m.cacheBuilder.storageLevel.useMemory =>
         0.0
+      case _: CommandResult => 0.0
     }.sum.toFloat
     scanOverhead + cachedOverhead
   }
@@ -206,6 +208,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
   private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
     plan.exists {
      case f: Filter => isLikelySelective(f.condition)
+      case _: CommandResult => true
       case _ => false
     }
   }
@@ -217,7 +220,8 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
    * (2) it needs to contain a selective predicate used for filtering
    */
   private def hasPartitionPruningFilter(plan: LogicalPlan): Boolean = {
-    !plan.isStreaming && hasSelectivePredicate(plan)
+    !plan.isStreaming &&
+      hasSelectivePredicate(plan)
   }
 
   private def prune(plan: LogicalPlan): LogicalPlan = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index e1a2fd33c7c9..34cdb3f82d6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1821,6 +1821,45 @@ class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
       checkAnswer(df, Row(1000, 1) :: Row(1010, 2) :: Row(1020, 2) :: Nil)
     }
   }
+
+  test("SPARK-54554: DPP with CommandResult from SHOW PARTITIONS in broadcast join") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+      // Get max partition from SHOW PARTITIONS (CommandResult)
+      val maxPartitionDF = sql("SHOW PARTITIONS fact_stats")
+        .agg(max("partition").alias("max_partition"))
+        .selectExpr("split(max_partition, '=')[1] as max_store_id")
+
+      // Register as temp view
+      maxPartitionDF.createOrReplaceTempView("max_partition")
+
+      // Join partitioned table with CommandResult
+      val df = sql(
+        """
+          |SELECT f.date_id, f.product_id, f.store_id, f.units_sold
+          |FROM fact_stats f
+          |JOIN max_partition m ON f.store_id = m.max_store_id
+        """.stripMargin)
+
+      checkPartitionPruningPredicate(df, false, true)
+
+      checkAnswer(df,
+        Row(1150, 1, 9, 20) :: Nil
+      )
+
+      // Verify DPP predicates exist in the optimized logical plan
+      val optimizedPlan = df.queryExecution.optimizedPlan.toString()
+      assert(optimizedPlan.contains("DynamicPruningSubquery") ||
+        optimizedPlan.contains("dynamicpruning"),
+        s"Optimized plan should contain DynamicPruningSubquery:\n$optimizedPlan")
+
+      // Verify the executed plan shows partition pruning happened
+      val executedPlan = df.queryExecution.executedPlan.toString()
+      assert(executedPlan.contains("SubqueryBroadcast") ||
+        executedPlan.contains("dynamicpruning"),
+        s"Executed plan should show dynamic pruning:\n$executedPlan")
+    }
+  }
 }
 
 abstract class DynamicPartitionPruningV2Suite extends DynamicPartitionPruningDataSourceSuiteBase {