# [SPARK-28753][SQL] Dynamically reuse subqueries in AQE
### What changes were proposed in this pull request?
This PR changes subquery reuse in Adaptive Query Execution from compile-time static reuse to execution-time dynamic reuse. It adds a `ReuseAdaptiveSubquery` rule that is applied to each query stage after the stage is created and before it is executed. The new dynamic reuse enables subqueries to be reused across all subquery levels.
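
To illustrate the mechanism, below is a minimal, self-contained sketch of the dynamic reuse pattern (the `Plan` type and `reuseOrRegister` helper are hypothetical stand-ins for canonicalized physical plans and the cache lookup; the actual implementation is the `ReuseAdaptiveSubquery` rule in the diff below):

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for a physical plan: two plans are interchangeable
// when their canonical forms compare equal.
final case class Plan(id: Int, canonicalized: String)

object DynamicReuseSketch {
  // One cache shared across all subquery levels, like `subqueryCache` in this PR.
  private val cache = TrieMap.empty[String, Plan]

  // Mirrors the rule's core step: the first plan to register a canonical form
  // wins; any later, semantically equal plan is swapped for the cached one.
  def reuseOrRegister(plan: Plan): Plan =
    cache.getOrElseUpdate(plan.canonicalized, plan)

  def main(args: Array[String]): Unit = {
    val first = Plan(1, "Aggregate(max(x), Scan(t))")
    val second = Plan(2, "Aggregate(max(x), Scan(t))") // same semantics, new instance
    assert(reuseOrRegister(first) eq first)  // first occurrence registers itself
    assert(reuseOrRegister(second) eq first) // later occurrence is reused
  }
}
```

Because the cache is consulted each time a query stage is optimized, rather than once at planning time, a subquery planned at one nesting level can be picked up for reuse by a stage at any other level.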

### Why are the changes needed?
This is an improvement to the current subquery reuse in Adaptive Query Execution: it allows subquery reuse to happen lazily at execution time as well as across different subquery levels.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Passed existing tests.
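
As a quick manual sanity check (hypothetical, not part of this patch's test suite), a query that evaluates the same scalar subquery twice should show a `ReusedSubqueryExec` node in the adaptive plan once it has run:

```scala
// Hypothetical spark-shell session; the view name and query are illustrative only.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.range(100).createOrReplaceTempView("t")

val df = spark.sql(
  """SELECT * FROM t
    |WHERE id = (SELECT max(id) FROM t)
    |   OR id = (SELECT max(id) FROM t) - 1""".stripMargin)
df.collect()

// With dynamic reuse, the second max(id) subquery should be wrapped in
// ReusedSubqueryExec rather than executed again.
println(df.queryExecution.executedPlan)
```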

Closes #25471 from maryannxue/aqe-dynamic-sub-reuse.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
maryannxue authored and cloud-fan committed Aug 20, 2019
1 parent d045221 commit 39c1127
Showing 4 changed files with 89 additions and 52 deletions.
**sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala**

```diff
@@ -60,7 +60,8 @@ import org.apache.spark.util.ThreadUtils
 case class AdaptiveSparkPlanExec(
     initialPlan: SparkPlan,
     @transient session: SparkSession,
-    @transient subqueryMap: Map[Long, ExecSubqueryExpression],
+    @transient preprocessingRules: Seq[Rule[SparkPlan]],
+    @transient subqueryCache: TrieMap[SparkPlan, BaseSubqueryExec],
     @transient stageCache: TrieMap[SparkPlan, QueryStageExec],
     @transient queryExecution: QueryExecution)
   extends LeafExecNode {
@@ -73,24 +74,27 @@ case class AdaptiveSparkPlanExec(
     override protected def batches: Seq[Batch] = Seq()
   }
 
+  @transient private val ensureRequirements = EnsureRequirements(conf)
+
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
-  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    PlanAdaptiveSubqueries(subqueryMap),
-    EnsureRequirements(conf)
+  private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
+    ensureRequirements
   )
 
   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+    ReuseAdaptiveSubquery(conf, subqueryCache),
     ReduceNumShufflePartitions(conf),
     ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf,
       session.sessionState.columnarRules),
     CollapseCodegenStages(conf)
   )
 
-  @volatile private var currentPhysicalPlan = initialPlan
+  @volatile private var currentPhysicalPlan =
+    applyPhysicalRules(initialPlan, queryStagePreparationRules)
 
   private var isFinalPlan = false
 
@@ -205,6 +209,16 @@ case class AdaptiveSparkPlanExec(
       depth + 1, lastChildren :+ true, append, verbose, "", addSuffix = false, maxFields)
   }
 
+  override def hashCode(): Int = initialPlan.hashCode()
+
+  override def equals(obj: Any): Boolean = {
+    if (!obj.isInstanceOf[AdaptiveSparkPlanExec]) {
+      return false
+    }
+
+    this.initialPlan == obj.asInstanceOf[AdaptiveSparkPlanExec].initialPlan
+  }
+
   /**
    * This method is called recursively to traverse the plan tree bottom-up and create a new query
    * stage or try reusing an existing stage if the current node is an [[Exchange]] node and all of
@@ -356,7 +370,7 @@ case class AdaptiveSparkPlanExec(
     val optimized = optimizer.execute(logicalPlan)
     SparkSession.setActiveSession(session)
     val sparkPlan = session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
-    val newPlan = applyPhysicalRules(sparkPlan, queryStagePreparationRules)
+    val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
     (newPlan, optimized)
   }
@@ -403,17 +417,6 @@ object AdaptiveSparkPlanExec {
   private val executionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("QueryStageCreator", 16))
 
-  /**
-   * Creates the list of physical plan rules to be applied before creation of query stages.
-   */
-  def createQueryStagePreparationRules(
-      conf: SQLConf,
-      subqueryMap: Map[Long, ExecSubqueryExpression]): Seq[Rule[SparkPlan]] = {
-    Seq(
-      PlanAdaptiveSubqueries(subqueryMap),
-      EnsureRequirements(conf))
-  }
-
   /**
    * Apply a list of physical operator rules on a [[SparkPlan]].
    */
```
**sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala**

```diff
@@ -41,6 +41,9 @@ case class InsertAdaptiveSparkPlan(
 
   private val conf = session.sessionState.conf
 
+  // Subquery-reuse is shared across the entire query.
+  private val subqueryCache = new TrieMap[SparkPlan, BaseSubqueryExec]()
+
   // Exchange-reuse is shared across the entire query, including sub-queries.
   private val stageCache = new TrieMap[SparkPlan, QueryStageExec]()
 
@@ -53,12 +56,13 @@ case class InsertAdaptiveSparkPlan(
         // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. Fall
         // back to non-adaptive mode if adaptive execution is supported in any of the sub-queries.
         val subqueryMap = buildSubqueryMap(plan)
-        // Run preparation rules.
-        val preparations = AdaptiveSparkPlanExec.createQueryStagePreparationRules(
-          session.sessionState.conf, subqueryMap)
-        val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preparations)
+        val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
+        val preprocessingRules = Seq(
+          planSubqueriesRule)
+        // Run pre-processing rules.
+        val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
         logDebug(s"Adaptive execution enabled for plan: $plan")
-        AdaptiveSparkPlanExec(newPlan, session, subqueryMap, stageCache, qe)
+        AdaptiveSparkPlanExec(newPlan, session, preprocessingRules, subqueryCache, stageCache, qe)
       } catch {
         case SubqueryAdaptiveNotSupportedException(subquery) =>
           logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
@@ -87,48 +91,33 @@ case class InsertAdaptiveSparkPlan(
    * For each sub-query, generate the adaptive execution plan for each sub-query by applying this
    * rule, or reuse the execution plan from another sub-query of the same semantics if possible.
    */
-  private def buildSubqueryMap(plan: SparkPlan): Map[Long, ExecSubqueryExpression] = {
-    val subqueryMapBuilder = mutable.HashMap.empty[Long, ExecSubqueryExpression]
+  private def buildSubqueryMap(plan: SparkPlan): mutable.HashMap[Long, ExecSubqueryExpression] = {
+    val subqueryMap = mutable.HashMap.empty[Long, ExecSubqueryExpression]
     plan.foreach(_.expressions.foreach(_.foreach {
       case expressions.ScalarSubquery(p, _, exprId)
-          if !subqueryMapBuilder.contains(exprId.id) =>
-        val executedPlan = getExecutedPlan(p)
+          if !subqueryMap.contains(exprId.id) =>
+        val executedPlan = compileSubquery(p)
+        verifyAdaptivePlan(executedPlan, p)
         val scalarSubquery = execution.ScalarSubquery(
           SubqueryExec(s"subquery${exprId.id}", executedPlan), exprId)
-        subqueryMapBuilder.put(exprId.id, scalarSubquery)
+        subqueryMap.put(exprId.id, scalarSubquery)
       case _ =>
     }))
 
-    // Reuse subqueries
-    if (session.sessionState.conf.subqueryReuseEnabled) {
-      // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
-      val reuseMap = mutable.HashMap[StructType, mutable.ArrayBuffer[BaseSubqueryExec]]()
-      subqueryMapBuilder.keySet.foreach { exprId =>
-        val sub = subqueryMapBuilder(exprId)
-        val sameSchema =
-          reuseMap.getOrElseUpdate(sub.plan.schema, mutable.ArrayBuffer.empty)
-        val sameResult = sameSchema.find(_.sameResult(sub.plan))
-        if (sameResult.isDefined) {
-          val newExpr = sub.withNewPlan(ReusedSubqueryExec(sameResult.get))
-          subqueryMapBuilder.update(exprId, newExpr)
-        } else {
-          sameSchema += sub.plan
-        }
-      }
-    }
-
-    subqueryMapBuilder.toMap
+    subqueryMap
   }
 
-  private def getExecutedPlan(plan: LogicalPlan): SparkPlan = {
+  def compileSubquery(plan: LogicalPlan): SparkPlan = {
     val queryExec = new QueryExecution(session, plan)
     // Apply the same instance of this rule to sub-queries so that sub-queries all share the
     // same `stageCache` for Exchange reuse.
-    val adaptivePlan = this.applyInternal(queryExec.sparkPlan, queryExec)
-    if (!adaptivePlan.isInstanceOf[AdaptiveSparkPlanExec]) {
-      throw SubqueryAdaptiveNotSupportedException(plan)
+    this.applyInternal(queryExec.sparkPlan, queryExec)
+  }
+
+  private def verifyAdaptivePlan(plan: SparkPlan, logicalPlan: LogicalPlan): Unit = {
+    if (!plan.isInstanceOf[AdaptiveSparkPlanExec]) {
+      throw SubqueryAdaptiveNotSupportedException(logicalPlan)
     }
-    adaptivePlan
   }
 }
```
**sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala**

```diff
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{ExecSubqueryExpression, SparkPlan}
 
 case class PlanAdaptiveSubqueries(
-    subqueryMap: Map[Long, ExecSubqueryExpression]) extends Rule[SparkPlan] {
+    subqueryMap: scala.collection.Map[Long, ExecSubqueryExpression]) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
     plan.transformAllExpressions {
```
**sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala** (new file)

```diff
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
+import org.apache.spark.sql.internal.SQLConf
+
+case class ReuseAdaptiveSubquery(
+    conf: SQLConf,
+    reuseMap: TrieMap[SparkPlan, BaseSubqueryExec]) extends Rule[SparkPlan] {
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.subqueryReuseEnabled) {
+      return plan
+    }
+
+    plan.transformAllExpressions {
+      case sub: ExecSubqueryExpression =>
+        val newPlan = reuseMap.getOrElseUpdate(sub.plan.canonicalized, sub.plan)
+        if (newPlan.ne(sub.plan)) {
+          sub.withNewPlan(ReusedSubqueryExec(newPlan))
+        } else {
+          sub
+        }
+    }
+  }
+}
```
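
A note on the design (an observation about the code above, not text from the patch): `TrieMap` is a lock-free concurrent map, and query stages are optimized on the `QueryStageCreator` thread pool shown earlier, so `getOrElseUpdate` lets concurrently optimized stages race safely on the cache; whichever subquery registers its canonicalized plan first becomes the reuse target. The reference check `newPlan.ne(sub.plan)` then leaves that first subquery unchanged and wraps every later, semantically equal one in `ReusedSubqueryExec`.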
