Skip to content

Commit

Permalink
[SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Implemented a new SparkPlan that executes the query adaptively. It splits the query plan into independent stages and executes them in order according to their dependencies. The query stage materializes its output at the end. When one stage completes, the data statistics of the materialized output will be used to optimize the remainder of the query.

The adaptive mode is off by default, when turned on, user can see "AdaptiveSparkPlan" as the top node of a query or sub-query. The inner plan of "AdaptiveSparkPlan" is subject to change during query execution but becomes final once the execution is complete. Whether the inner plan is final is included in the EXPLAIN string. Below is an example of the EXPLAIN plan before and after execution:

Query:
```
SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'
```

Before execution:
```
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5)
   :     +- Filter (isnotnull(value#14) AND (value#14 = 1))
   :        +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :           +- Scan[obj#12]
   +- Sort [a#23 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#23, 5)
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]
```

After execution:
```
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=true)
+- *(1) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft
   :- BroadcastQueryStage 2
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :     +- ShuffleQueryStage 0
   :        +- Exchange hashpartitioning(key#13, 5)
   :           +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
   :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :                 +- Scan[obj#12]
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(a#23, 5)
         +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]
```

Credit also goes to carsonwang and cloud-fan

## How was this patch tested?

Added new UT.

Closes apache#24706 from maryannxue/aqe.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: herman <herman@databricks.com>
  • Loading branch information
maryannxue authored and emanuelebardelli committed Jun 15, 2019
1 parent 76e821f commit 4de0ae1
Show file tree
Hide file tree
Showing 21 changed files with 1,430 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(64 * 1024 * 1024)

val RUNTIME_REOPTIMIZATION_ENABLED =
buildConf("spark.sql.runtime.reoptimization.enabled")
.doc("When true, enable runtime query re-optimization.")
.booleanConf
.createWithDefault(false)

val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
.doc("When true, enable adaptive query execution.")
.booleanConf
Expand Down Expand Up @@ -1889,7 +1895,10 @@ class SQLConf extends Serializable with Logging {
def targetPostShuffleInputSize: Long =
getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
def runtimeReoptimizationEnabled: Boolean = getConf(RUNTIME_REOPTIMIZATION_ENABLED)

def adaptiveExecutionEnabled: Boolean =
getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)

def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -74,9 +75,15 @@ class QueryExecution(

lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
SparkSession.setActiveSession(sparkSession)
// Runtime re-optimization requires a unique instance of every node in the logical plan.
val logicalPlan = if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
optimizedPlan.clone()
} else {
optimizedPlan
}
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(optimizedPlan)).next()
planner.plan(ReturnAnswer(logicalPlan)).next()
}

// executedPlan should not be used to initialize any SparkPlan. It should be
Expand Down Expand Up @@ -107,6 +114,9 @@ class QueryExecution(

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
// `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
InsertAdaptiveSparkPlan(sparkSession),
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
CollapseCodegenStages(sparkSession.sessionState.conf),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.DataType

object SparkPlan {
// a TreeNode tag in SparkPlan, to carry its original logical plan. The planner will add this tag
// when converting a logical plan to a physical plan.
/** The original [[LogicalPlan]] from which this [[SparkPlan]] is converted. */
val LOGICAL_PLAN_TAG = TreeNodeTag[LogicalPlan]("logical_plan")

/** The [[LogicalPlan]] inherited from its ancestor. */
val LOGICAL_PLAN_INHERITED_TAG = TreeNodeTag[LogicalPlan]("logical_plan_inherited")
}

/**
Expand Down Expand Up @@ -79,6 +81,35 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
super.makeCopy(newArgs)
}

/**
* @return The logical plan this plan is linked to.
*/
def logicalLink: Option[LogicalPlan] =
getTagValue(SparkPlan.LOGICAL_PLAN_TAG)
.orElse(getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG))

/**
* Set logical plan link recursively if unset.
*/
def setLogicalLink(logicalPlan: LogicalPlan): Unit = {
setLogicalLink(logicalPlan, false)
}

private def setLogicalLink(logicalPlan: LogicalPlan, inherited: Boolean = false): Unit = {
// Stop at a descendant which is the root of a sub-tree transformed from another logical node.
if (inherited && getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isDefined) {
return
}

val tag = if (inherited) {
SparkPlan.LOGICAL_PLAN_INHERITED_TAG
} else {
SparkPlan.LOGICAL_PLAN_TAG
}
setTagValue(tag, logicalPlan)
children.foreach(_.setLogicalLink(logicalPlan, true))
}

/**
* @return All metrics containing metrics of this SparkPlan.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -53,6 +54,8 @@ private[execution] object SparkPlanInfo {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case ReusedSubqueryExec(child) => child :: Nil
case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
case stage: QueryStageExec => stage.plan :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.adaptive.LogicalQueryStageStrategy
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -36,6 +37,7 @@ class SparkPlanner(
override def strategies: Seq[Strategy] =
experimentalMethods.extraStrategies ++
extraPlanningStrategies ++ (
LogicalQueryStageStrategy ::
PythonEvals ::
DataSourceV2Strategy ::
FileSourceStrategy ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
Expand Down Expand Up @@ -58,6 +59,8 @@ case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
protected override def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException()
}

override def setLogicalLink(logicalPlan: LogicalPlan): Unit = {}
}

abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
Expand All @@ -69,7 +72,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ReturnAnswer(rootPlan) => rootPlan
case _ => plan
}
p.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, logicalPlan)
p.setLogicalLink(logicalPlan)
p
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import java.io.Writer
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
import scala.util.control.NonFatal
Expand Down Expand Up @@ -551,56 +552,6 @@ object WholeStageCodegenExec {
}
}

object WholeStageCodegenId {
// codegenStageId: ID for codegen stages within a query plan.
// It does not affect equality, nor does it participate in destructuring pattern matching
// of WholeStageCodegenExec.
//
// This ID is used to help differentiate between codegen stages. It is included as a part
// of the explain output for physical plans, e.g.
//
// == Physical Plan ==
// *(5) SortMergeJoin [x#3L], [y#9L], Inner
// :- *(2) Sort [x#3L ASC NULLS FIRST], false, 0
// : +- Exchange hashpartitioning(x#3L, 200)
// : +- *(1) Project [(id#0L % 2) AS x#3L]
// : +- *(1) Filter isnotnull((id#0L % 2))
// : +- *(1) Range (0, 5, step=1, splits=8)
// +- *(4) Sort [y#9L ASC NULLS FIRST], false, 0
// +- Exchange hashpartitioning(y#9L, 200)
// +- *(3) Project [(id#6L % 2) AS y#9L]
// +- *(3) Filter isnotnull((id#6L % 2))
// +- *(3) Range (0, 5, step=1, splits=8)
//
// where the ID makes it obvious that not all adjacent codegen'd plan operators are of the
// same codegen stage.
//
// The codegen stage ID is also optionally included in the name of the generated classes as
// a suffix, so that it's easier to associate a generated class back to the physical operator.
// This is controlled by SQLConf: spark.sql.codegen.useIdInClassName
//
// The ID is also included in various log messages.
//
// Within a query, a codegen stage in a plan starts counting from 1, in "insertion order".
// WholeStageCodegenExec operators are inserted into a plan in depth-first post-order.
// See CollapseCodegenStages.insertWholeStageCodegen for the definition of insertion order.
//
// 0 is reserved as a special ID value to indicate a temporary WholeStageCodegenExec object
// is created, e.g. for special fallback handling when an existing WholeStageCodegenExec
// failed to generate/compile code.

private val codegenStageCounter: ThreadLocal[Integer] = ThreadLocal.withInitial(() => 1)

def resetPerQuery(): Unit = codegenStageCounter.set(1)

def getNextStageId(): Int = {
val counter = codegenStageCounter
val id = counter.get()
counter.set(id + 1)
id
}
}

/**
* WholeStageCodegen compiles a subtree of plans that support codegen together into single Java
* function.
Expand Down Expand Up @@ -824,8 +775,48 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)

/**
* Find the chained plans that support codegen, collapse them together as WholeStageCodegen.
*
* The `codegenStageCounter` generates ID for codegen stages within a query plan.
* It does not affect equality, nor does it participate in destructuring pattern matching
* of WholeStageCodegenExec.
*
* This ID is used to help differentiate between codegen stages. It is included as a part
* of the explain output for physical plans, e.g.
*
* == Physical Plan ==
* *(5) SortMergeJoin [x#3L], [y#9L], Inner
* :- *(2) Sort [x#3L ASC NULLS FIRST], false, 0
* : +- Exchange hashpartitioning(x#3L, 200)
* : +- *(1) Project [(id#0L % 2) AS x#3L]
* : +- *(1) Filter isnotnull((id#0L % 2))
* : +- *(1) Range (0, 5, step=1, splits=8)
* +- *(4) Sort [y#9L ASC NULLS FIRST], false, 0
* +- Exchange hashpartitioning(y#9L, 200)
* +- *(3) Project [(id#6L % 2) AS y#9L]
* +- *(3) Filter isnotnull((id#6L % 2))
* +- *(3) Range (0, 5, step=1, splits=8)
*
* where the ID makes it obvious that not all adjacent codegen'd plan operators are of the
* same codegen stage.
*
* The codegen stage ID is also optionally included in the name of the generated classes as
* a suffix, so that it's easier to associate a generated class back to the physical operator.
* This is controlled by SQLConf: spark.sql.codegen.useIdInClassName
*
* The ID is also included in various log messages.
*
* Within a query, a codegen stage in a plan starts counting from 1, in "insertion order".
* WholeStageCodegenExec operators are inserted into a plan in depth-first post-order.
* See CollapseCodegenStages.insertWholeStageCodegen for the definition of insertion order.
*
* 0 is reserved as a special ID value to indicate a temporary WholeStageCodegenExec object
* is created, e.g. for special fallback handling when an existing WholeStageCodegenExec
* failed to generate/compile code.
*/
case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case class CollapseCodegenStages(
conf: SQLConf,
codegenStageCounter: AtomicInteger = new AtomicInteger(0))
extends Rule[SparkPlan] {

private def supportCodegen(e: Expression): Boolean = e match {
case e: LeafExpression => true
Expand Down Expand Up @@ -869,14 +860,13 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
case plan: CodegenSupport if supportCodegen(plan) =>
WholeStageCodegenExec(insertInputAdapter(plan))(WholeStageCodegenId.getNextStageId())
WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
case other =>
other.withNewChildren(other.children.map(insertWholeStageCodegen))
}

def apply(plan: SparkPlan): SparkPlan = {
if (conf.wholeStageEnabled) {
WholeStageCodegenId.resetPerQuery()
insertWholeStageCodegen(plan)
} else {
plan
Expand Down
Loading

0 comments on commit 4de0ae1

Please sign in to comment.