[SPARK-34637][SQL] Support DPP + AQE when the broadcast exchange can be reused

### What changes were proposed in this pull request?
In [SPARK-34168](https://issues.apache.org/jira/browse/SPARK-34168) we added support for DPP in AQE when the join is a broadcast hash join that is planned before the AQE rules are applied, but that support has a limitation: DPP is applied only when the small table side is executed first, so that the big table side can then reuse the small table side's broadcast exchange. This PR addresses that limitation and applies DPP whenever the broadcast exchange can be reused. An illustrative configuration sketch follows.
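For illustration, a minimal sketch (not part of this commit) of the switches that gate this path; the three config keys are real Spark SQL configs, while the query is a hypothetical example assuming a `fact_stats` fact table partitioned by `store_id` and a small `dim_store` dimension table, mirroring the tables in the test suite:

```scala
// Illustrative only, not from this commit.
spark.conf.set("spark.sql.adaptive.enabled", "true")                           // AQE
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")  // DPP
spark.conf.set("spark.sql.exchange.reuse", "true")                             // exchange reuse

// When the dimension side is broadcast, the DPP filter on `fact_stats` can
// now reuse that broadcast regardless of which side AQE plans first.
val df = spark.sql(
  """SELECT f.units_sold
    |FROM fact_stats f JOIN dim_store d ON f.store_id = d.store_id
    |WHERE d.country = 'US'""".stripMargin)
df.collect()
```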

### Why are the changes needed?
Resolve the above limitation when DPP and AQE are both enabled.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a new unit test.

Closes #31756 from JkSelf/supportDPP2.

Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
JkSelf authored and cloud-fan committed May 13, 2021
1 parent d1b8bd7 commit b6d57b6
Showing 4 changed files with 69 additions and 13 deletions.
SubqueryBroadcastExec.scala

@@ -36,7 +36,8 @@ import org.apache.spark.util.ThreadUtils
  *
  * @param index the index of the join key in the list of keys from the build side
  * @param buildKeys the join keys from the build side of the join used
- * @param child the BroadcastExchange from the build side of the join
+ * @param child the BroadcastExchange or the AdaptiveSparkPlan with BroadcastQueryStageExec
+ *              from the build side of the join
  */
 case class SubqueryBroadcastExec(
     name: String,
AdaptiveSparkPlanExec.scala

@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.concurrent.ExecutionContext
 import scala.util.control.NonFatal

-import org.apache.spark.SparkException
+import org.apache.spark.{broadcast, SparkException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -94,7 +94,7 @@ case class AdaptiveSparkPlanExec(
   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
-    PlanAdaptiveDynamicPruningFilters(context.stageCache),
+    PlanAdaptiveDynamicPruningFilters(this),
     ReuseAdaptiveSubquery(context.subqueryCache),
     CoalesceShufflePartitions(context.session),
     // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
@@ -310,6 +310,10 @@ case class AdaptiveSparkPlanExec(
     rdd
   }

+  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    getFinalPhysicalPlan().doExecuteBroadcast()
+  }
+
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")

   override def generateTreeString(
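Note on the `doExecuteBroadcast` override above: `SubqueryBroadcastExec` executes its child in broadcast mode, and with this PR that child can be an `AdaptiveSparkPlanExec` (see the rule change below). A minimal sketch of the relevant call path, paraphrased rather than taken from the diff:

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.execution.SparkPlan

// executeBroadcast() is the public entry point that dispatches to
// doExecuteBroadcast(). Without the override above, an AdaptiveSparkPlanExec
// child would hit SparkPlan's default doExecuteBroadcast, which throws;
// with it, the call is delegated to the final (re-optimized) physical plan.
def collectBroadcast[T](child: SparkPlan): Broadcast[T] =
  child.executeBroadcast[T]()
```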
PlanAdaptiveDynamicPruningFilters.scala

@@ -17,19 +17,18 @@

 package org.apache.spark.sql.execution.adaptive

-import scala.collection.concurrent.TrieMap
-
 import org.apache.spark.sql.catalyst.expressions.{BindReferences, DynamicPruningExpression, Literal}
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
-import org.apache.spark.sql.execution.joins.{HashedRelationBroadcastMode, HashJoin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode, HashJoin}

 /**
  * A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
  */
 case class PlanAdaptiveDynamicPruningFilters(
-    stageCache: TrieMap[SparkPlan, QueryStageExec]) extends Rule[SparkPlan] {
+    rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
   def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.dynamicPartitionPruningEnabled) {
       return plan
@@ -44,12 +43,22 @@ case class PlanAdaptiveDynamicPruningFilters(
         val mode = HashedRelationBroadcastMode(packedKeys)
         // plan a broadcast exchange of the build side of the join
         val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
-        val existingStage = stageCache.get(exchange.canonicalized)
-        if (existingStage.nonEmpty && conf.exchangeReuseEnabled) {
-          val name = s"dynamicpruning#${exprId.id}"
-          val reuseQueryStage = existingStage.get.newReuseInstance(0, exchange.output)
-          val broadcastValues =
-            SubqueryBroadcastExec(name, index, buildKeys, reuseQueryStage)
+
+        val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
+          find(rootPlan) {
+            case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
+              left.sameResult(exchange)
+            case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
+              right.sameResult(exchange)
+            case _ => false
+          }.isDefined
+
+        if (canReuseExchange) {
+          exchange.setLogicalLink(adaptivePlan.executedPlan.logicalLink.get)
+          val newAdaptivePlan = adaptivePlan.copy(inputPlan = exchange)
+
+          val broadcastValues = SubqueryBroadcastExec(
+            name, index, buildKeys, newAdaptivePlan)
           DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
         } else {
           DynamicPruningExpression(Literal.TrueLiteral)
DynamicPartitionPruningSuite.scala

@@ -197,6 +197,17 @@ abstract class DynamicPartitionPruningSuiteBase
             case _ => false
           }.isDefined
           assert(hasReuse, s"$s\nshould have been reused in\n$plan")
+        case a: AdaptiveSparkPlanExec =>
+          val broadcastQueryStage = collectFirst(a) {
+            case b: BroadcastQueryStageExec => b
+          }
+          val broadcastPlan = broadcastQueryStage.get.broadcast
+          val hasReuse = find(plan) {
+            case ReusedExchangeExec(_, e) => e eq broadcastPlan
+            case b: BroadcastExchangeLike => b eq broadcastPlan
+            case _ => false
+          }.isDefined
+          assert(hasReuse, s"$s\nshould have been reused in\n$plan")
         case _ =>
           fail(s"Invalid child node found in\n$s")
       }
@@ -1463,6 +1474,37 @@ abstract class DynamicPartitionPruningSuiteBase
       }
     }
   }
+
+  test("SPARK-34637: DPP side broadcast query stage is created firstly") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+      val df = sql(
+        """ WITH v as (
+          | SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70 group by f.store_id
+          | )
+          |
+          | SELECT * FROM v v1 join v v2 WHERE v1.store_id = v2.store_id
+        """.stripMargin)
+
+      // A possible resulting query plan:
+      // BroadcastHashJoin
+      // +- HashAggregate
+      //    +- ShuffleQueryStage
+      //       +- Exchange
+      //          +- HashAggregate
+      //             +- Filter
+      //                +- FileScan [PartitionFilters: dynamicpruning#3385]
+      //                     +- SubqueryBroadcast dynamicpruning#3385
+      //                        +- AdaptiveSparkPlan
+      //                           +- BroadcastQueryStage
+      //                              +- BroadcastExchange
+      //
+      // +- BroadcastQueryStage
+      //    +- ReusedExchange
+
+      checkPartitionPruningPredicate(df, false, true)
+      checkAnswer(df, Row(15, 15) :: Nil)
+    }
+  }
 }

 class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase
