
[SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL #24706

Closed
wants to merge 30 commits into apache:master from maryannxue:aqe

Conversation

@maryannxue (Contributor) commented May 24, 2019:

What changes were proposed in this pull request?

Implemented a new SparkPlan that executes the query adaptively. It splits the query plan into independent stages and executes them in order according to their dependencies. The query stage materializes its output at the end. When one stage completes, the data statistics of the materialized output will be used to optimize the remainder of the query.

The adaptive mode is off by default. When it is turned on, users can see "AdaptiveSparkPlan" as the top node of a query or sub-query. The inner plan of "AdaptiveSparkPlan" is subject to change during query execution but becomes final once the execution is complete. Whether the inner plan is final is included in the EXPLAIN string. Below is an example of the EXPLAIN plan before and after execution:
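For example, a minimal sketch of turning the mode on and watching the EXPLAIN output change, assuming the withSQLConf test helper and the RUNTIME_REOPTIMIZATION_ENABLED key used in later comments on this PR:

// Sketch only; the exact config key is still under discussion in this thread.
withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true") {
  val df = spark.sql("SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'")
  df.explain()   // top node: AdaptiveSparkPlan(isFinalPlan=false)
  df.collect()   // executes the stages; their statistics drive re-optimization
  df.explain()   // top node: AdaptiveSparkPlan(isFinalPlan=true), showing the final plan
}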

Query:

SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'

Before execution:

== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5)
   :     +- Filter (isnotnull(value#14) AND (value#14 = 1))
   :        +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :           +- Scan[obj#12]
   +- Sort [a#23 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#23, 5)
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]

After execution:

== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=true)
+- *(1) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft
   :- BroadcastQueryStage 2
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :     +- ShuffleQueryStage 0
   :        +- Exchange hashpartitioning(key#13, 5)
   :           +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
   :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :                 +- Scan[obj#12]
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(a#23, 5)
         +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]

Credit also goes to @carsonwang and @cloud-fan

How was this patch tested?

Added new UT.

@gatorsmile requested review from cloud-fan and hvanhovell on May 24, 2019

@SparkQA commented May 25, 2019:

Test build #105776 has finished for PR 24706 at commit 07ea123.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AdaptiveSparkPlanExec(
  • case class StageSuccess(stage: QueryStageExec) extends StageMaterializationEvent
  • case class StageFailure(stage: QueryStageExec, error: Throwable) extends StageMaterializationEvent
  • case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan]
  • case class LogicalQueryStage(
  • case class PlanAdaptiveSubqueries(
  • abstract class QueryStageExec extends LeafExecNode
  • case class ShuffleQueryStageExec(
  • case class BroadcastQueryStageExec(
  • case class ReusedQueryStageExec(
  • case class SparkListenerSQLAdaptiveExecutionUpdate(
@viirya (Contributor) left a comment:

Awesome!


/**
* A root node to execute the query plan adaptively. It splits the query plan into independent
* stages and executes them in order according to their dependencies. The query stage

@viirya (Contributor) commented May 25, 2019:

It sounds a little weird that the stages are called independent while they are executed in order according to their dependencies.

@maryannxue (Author) commented May 28, 2019:

Are you saying the wording is no good? Any suggestions?

@viirya (Contributor) commented May 30, 2019:

Just remove "independent"? Since they are run according to their dependencies, it sounds like they're not really independent.

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala Outdated

override def output: Seq[Attribute] = initialPlan.output

override def doCanonicalize(): SparkPlan = initialPlan.canonicalized

@viirya (Contributor) commented May 25, 2019:

Why not currentPhysicalPlan.canonicalized but initialPlan.canonicalized?

@hvanhovell (Contributor) commented May 28, 2019:

currentPhysicalPlan can change after each iteration, which means we cannot use it as a stable identifier.

@maryannxue (Author) commented May 28, 2019:

+1 to @hvanhovell's comment. Moreover, one important use of the "canonicalized" plan is adaptive sub-query re-use, for which we definitely want to compare the initial plans, and the initial plans only.

Resolved (outdated): ...cala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
.doc("When true, enable runtime query re-optimization.")
.booleanConf
.createWithDefault(false)

val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
.doc("When true, enable adaptive query execution.")
.booleanConf

@viirya (Contributor) commented May 25, 2019:

We probably need to update the doc for this config. It isn't enabled when runtime query re-optimization is true.

@maryannxue (Author) commented May 28, 2019:

Let's leave it for now and see whether we should use the existing config spark.sql.adaptive.enabled instead.
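For illustration only, the two separate flags as currently defined could be set like this; the open question above is whether the existing spark.sql.adaptive.enabled should simply be reused instead:

// Sketch only: two distinct flags as defined in the SQLConf excerpt above.
spark.conf.set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "true")  // the new framework in this PR
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")      // the pre-existing spark.sql.adaptive.enabled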

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala Outdated
assert(logicalNodeOpt.isDefined)
val logicalNode = logicalNodeOpt.get
val physicalNode = currentPhysicalPlan.collectFirst {
case p if p.eq(newNode) || p.logicalLink.exists(logicalNode.eq) => p

@viirya (Contributor) commented May 25, 2019:

If the physical plan has been updated with the newly created stages, doesn't this new node (a newly created stage) always match one p?

@maryannxue (Author) commented May 28, 2019:

It's the most complicated part here:
Normally we only need to replace an Exchange with a logical "Stage": simple, straightforward logic. But when several physical nodes are generated from a single logical node and an Exchange sits in the middle of those nodes, we need to apply a little "trick": we replace the top node (e.g., the final agg) with a logical "Stage" that wraps the whole sub-tree starting from that top node, which of course contains the exchange stage.

The code comment for this method explains this logic. Alternatively, we could have logical aggregates that represent the "final" and "local" aggregates so we would not need to go this route. Let's have a follow-up on that, though.

@cloud-fan (Contributor) commented May 30, 2019:

Since this is hacky, how about we explicitly match the physical aggregates (hash and sort) here? Except for aggregates, we should always just look for the condition if p.eq(newNode).
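A sketch of what that explicit matching might look like (HashAggregateExec/SortAggregateExec are assumed here; this is not the final code):

val physicalNode = currentPhysicalPlan.collectFirst {
  // special-case the physical aggregates, which can span an exchange
  case agg: HashAggregateExec if agg.logicalLink.exists(logicalNode.eq) => agg
  case agg: SortAggregateExec if agg.logicalLink.exists(logicalNode.eq) => agg
  // everything else must be the exact node we just created
  case p if p.eq(newNode) => p
}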

SparkPlan.LOGICAL_PLAN_TAG
}
setTagValue(tag, logicalPlan)
children.foreach(_.setLogicalLink(logicalPlan, true))

@viirya (Contributor) commented May 25, 2019:

Once we set the inherited logical plan on the children, we can't set a logical plan on them anymore? So only the top SparkPlan has its own logical plan, and all its children just have the inherited logical plan?

@maryannxue (Author) commented May 28, 2019:

Yes. That's true. And you could always "force set" this logical link if need be.
It's not necessary to draw a line between "logical plan" and "inherited logical plan" for the purposes of adaptive execution, so this is simply to make sure any future use of it can tell a top node from the rest.

* |
* child
* The updated plan node will be:
* LogicalQueryStage(HashAgg - Stage1)

@viirya (Contributor) commented May 25, 2019:

I think the plan node is updated by:

val newLogicalPlan = currentLogicalPlan.transformDown {
  case p if p.eq(logicalNode) => newLogicalNode
}

The logicalNode should be the logicalLink of oldNode, and oldNode is the Xchg. Why isn't just child transformed, rather than the whole Agg-child sub-tree?

@maryannxue (Author) commented May 28, 2019:

The reason is simple: which logical node does the Xchg correspond to? None. And in the case of an Aggregate, you can't even find a whole sub-tree in the logical plan that corresponds to the Xchg.

Please also refer to #24706 (comment)
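Roughly, the replacement described above looks like this (logicalAgg and hashAggWithStage are illustrative names):

// The logical Aggregate, not the Exchange (which has no logical counterpart), is swapped
// for a LogicalQueryStage wrapping the whole physical sub-tree HashAgg -> Stage1.
val newLogicalNode = LogicalQueryStage(logicalAgg, hashAggWithStage)
val newLogicalPlan = currentLogicalPlan.transformDown {
  case p if p.eq(logicalAgg) => newLogicalNode
}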

@maryannxue force-pushed the maryannxue:aqe branch to 52f0222 on May 28, 2019

@hvanhovell (Contributor) left a comment:

I did an initial round. Will try to do a second one later this week.

Resolved (outdated): ...la/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala Outdated
val queryExec = new QueryExecution(session, plan)
val adaptivePlan = this.apply(queryExec.sparkPlan)
if (!adaptivePlan.isInstanceOf[AdaptiveSparkPlanExec]) {
throw new SubqueryAdaptiveNotSupportedException(plan)

@hvanhovell (Contributor) commented May 28, 2019:

Can you explain when this happens?

@maryannxue (Author) commented May 28, 2019:

When a non-correlated sub-query itself fails the sanity check for being converted into an AdaptiveSparkPlanExec.

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala Outdated
private val conf = session.sessionState.conf

// Exchange-reuse is shared across the entire query, including sub-queries.
private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, QueryStageExec)]]()

@hvanhovell (Contributor) commented May 28, 2019:

For my curiosity, why a TrieMap?

@maryannxue (Author) commented May 28, 2019:

We need a concurrent map that supports getOrElseUpdate.

I thought we should change this "schema-to-a-list-of-plans" map to a "canonicalized-plan-to-plan" map, so we can use a simple concurrent map and do not need to put a lock on the list. If we fix it, we should fix it together with the compile-time reuse maps.
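For reference, the pattern in question is roughly the following (a sketch; newQueryStage is a hypothetical helper):

import scala.collection.concurrent.TrieMap

// A concurrent map whose getOrElseUpdate lets the main query and sub-queries agree on a
// single stage per exchange without an explicit lock.
val stageCache = new TrieMap[SparkPlan, QueryStageExec]()

def reuseOrCreateStage(exchange: Exchange): QueryStageExec =
  stageCache.getOrElseUpdate(exchange.canonicalized, newQueryStage(exchange))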

Resolved: ...la/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
Resolved (outdated): ...la/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
Resolved (outdated): ...la/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
Resolved (outdated): ...la/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
// Run preparation rules.
val preparations = AdaptiveSparkPlanExec.createQueryStagePreparationRules(
session.sessionState.conf, subqueryMap)
val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preparations)

@hvanhovell (Contributor) commented May 28, 2019:

Why do we need to do this? This already seems to be done in AdaptiveSparkPlanExec when we submit the stage.

@maryannxue (Author) commented May 28, 2019:

> This already seems to be done in AdaptiveSparkPlanExec when we submit the stage.

No.

The physical transformations we used in QueryExecution.preparations have now been split into two groups in adaptive execution here (also noted in the code comment):

  1. Rules that add or remove exchanges.
  2. Rules that are independent within each exchange, or say, stage.

InsertAdaptiveSparkPlan is now the first rule in QueryExecution.preparations, which means neither of these two groups has been applied yet. It is done this way so that we do not need to modify the rule application order in QueryExecution.preparations for AQE.
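Roughly, the split looks like this (the rule names are illustrative examples, not the exact lists in this PR):

// Group 1: run once before stage creation; these rules may add or remove exchanges.
val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
  PlanSubqueries(session),   // plan sub-queries up front
  EnsureRequirements(conf)   // inserts the exchanges that become stage boundaries
)
// Group 2: applied independently within each stage once its boundaries are fixed.
val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
  CollapseCodegenStages(conf)
)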

@hvanhovell (Contributor) commented Jun 14, 2019:

To me it makes more sense to do this in the AdaptiveSparkPlanExec. The AdaptiveSparkPlanExec now expects a plan that can only be produced by this rule, not just any physical plan.

@maryannxue (Author) commented Jun 14, 2019:

In one of the commits I had tried refactoring this into AdaptiveSparkPlanExec, but later found out that this would cause a problem in serializing/deserializing initialPlan in AdaptiveSparkPlanExec, because the initialPlan before applying the sub-query planning rule contains instances of expression.ScalarSubquery.

* This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which executes the query plan
* and re-optimize the plan during execution based on runtime data statistics.
*/
case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] {

@hvanhovell (Contributor) commented May 28, 2019:

A couple of general remarks:

  • As far as I understand this code, subqueries for a given stage are now executed before the stage. It used to be that all the subqueries for a query were executed before the main query.
  • We may need to consider moving this into the AdaptiveSparkPlanExec to put most state in one place and make this stateless again. You could turn this into a mix-in if this adds too much LOC.

@maryannxue (Author) commented May 28, 2019:

> As far as I understand this code, subqueries for a given stage are now executed before the stage. It used to be that all the subqueries for a query were executed before the main query.

Agreed. But this has nothing to do with what this rule does; it's just that we don't call execute higher up in the tree before the stages below finish. So we may need to refactor the original SparkPlan.executeQuery logic to make this "wait for all sub-queries to finish" behavior more explicit.

> We may need to consider moving this into the AdaptiveSparkPlanExec to put most state in one place and make this stateless again. You could turn this into a mix-in if this adds too much LOC.

Agreed. Not the prettiest solution to put a stateful object into a rule.

Resolved (outdated): ...cala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@SparkQA commented May 28, 2019:

Test build #105869 has finished for PR 24706 at commit 751d117.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented May 28, 2019:

Test build #105868 has finished for PR 24706 at commit 046fbd6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented May 28, 2019:

Test build #105870 has finished for PR 24706 at commit 52f0222.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AdaptiveSparkPlanExec(
  • case class StageSuccess(stage: QueryStageExec) extends StageMaterializationEvent
  • case class StageFailure(stage: QueryStageExec, error: Throwable) extends StageMaterializationEvent
  • case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan]
  • case class LogicalQueryStage(
  • case class PlanAdaptiveSubqueries(
  • abstract class QueryStageExec extends LeafExecNode
  • case class ShuffleQueryStageExec(
  • case class BroadcastQueryStageExec(
  • case class ReusedQueryStageExec(
  • case class SparkListenerSQLAdaptiveExecutionUpdate(
@SparkQA commented May 29, 2019:

Test build #105880 has finished for PR 24706 at commit 1c665d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class StageSuccess(stage: QueryStageExec, result: Any) extends StageMaterializationEvent
// Wait on the next completed stage, which indicates new stats are available and probably
// new stages can be created. There might be other stages that finish at around the same
// time, so we process those stages too in order to reduce re-planning.
val nextMsg = events.take()

@hvanhovell (Contributor) commented May 29, 2019:

I think it is kind of weird that we are defining a nextMsg and that we are not really using it. How about:

val newStageEvents = new util.ArrayList[StageMaterializationEvent]
newStageEvents.add(events.take())
events.drainTo(newStageEvents)
newStageEvents.asScala.foreach {
  ...
}

@maryannxue (Author) commented May 29, 2019:

That'll work too. But nextMsg is used in the current impl, otherwise the whole thing wouldn't work:
https://github.com/apache/spark/pull/24706/files/1c665d2e616a7c6ffc79c8be8307c9f3ff503b23#diff-6954dd8020a9ca298f1fb9602c0e831cR98.
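For context, the pattern being discussed is roughly the following (simplified from the PR code; resultOption is an assumption about the stage API):

import scala.collection.JavaConverters._

val nextMsg = events.take()                        // blocks until at least one stage completes
val others = new java.util.ArrayList[StageMaterializationEvent]()
events.drainTo(others)                             // also pick up stages that finished around the same time
(nextMsg +: others.asScala.toSeq).foreach {
  case StageSuccess(stage, result) =>
    stage.resultOption = Some(result)              // new stats become available for re-planning
  case StageFailure(stage, error) =>
    errors.append(new SparkException(s"Failed to materialize query stage: $stage", error))
}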

var result = createQueryStages(currentPhysicalPlan)
currentPhysicalPlan.synchronized {
val events = new LinkedBlockingQueue[StageMaterializationEvent]()
val errors = new mutable.ArrayBuffer[SparkException]()

@hvanhovell (Contributor) commented May 29, 2019:

You could move this into the loop.

@maryannxue (Author) commented May 29, 2019:

I'd argue it would have to be instantiated on every loop iteration, yet it is used at most once (when something goes wrong).

@SparkQA commented May 29, 2019:

Test build #105924 has finished for PR 24706 at commit a9b4209.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented May 30, 2019:

Test build #105932 has finished for PR 24706 at commit cbfbc4e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
private val conf = session.sessionState.conf

// Exchange-reuse is shared across the entire query, including sub-queries.
private val stageCache = new TrieMap[SparkPlan, QueryStageExec]()

@cloud-fan (Contributor) commented May 30, 2019:

This stage cache is passed to the created AdaptiveSparkPlanExec directly. Can we create the stage cache in AdaptiveSparkPlanExec?

@maryannxue (Author) commented May 30, 2019:

The problem is that we want all the AdaptiveSparkPlanExecs of the main query and the sub-queries to share the same stageCache.
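In other words, the rule builds the cache once and hands the same instance to every AdaptiveSparkPlanExec it inserts; the constructor shape below is a simplification, not the actual signature:

// Sketch: one cache per top-level query, shared by the main plan and all sub-query plans
// so that exchanges can be reused across them.
private val stageCache = new TrieMap[SparkPlan, QueryStageExec]()

private def toAdaptivePlan(plan: SparkPlan): AdaptiveSparkPlanExec =
  AdaptiveSparkPlanExec(plan, session, subqueryMap, stageCache)  // hypothetical parameter list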


private var currentStageId = 0

@volatile private var currentPhysicalPlan =

@cloud-fan (Contributor) commented May 30, 2019:

nit: can we move these variable definitions before the doExecute method? To make the code a little easier to read.

@maryannxue (Author) commented May 30, 2019:

I had moved currentLogicalPlan into the doExecute method, but it doesn't look like we can do that with any of the other variables. We need to provide access to currentPhysicalPlan (through the method executedPlan). Plus, if the plan ever gets executed again, we'll already have currentPhysicalPlan as the final plan, and the stages will not be created or run again.

@cloud-fan (Contributor) commented May 31, 2019:

move these variables before doExecute, not inside doExecute...

@maryannxue (Author) commented May 31, 2019:

Oh, got you.

@SparkQA commented Jun 13, 2019:

Test build #106454 has finished for PR 24706 at commit 5688cb4.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.
@gatorsmile (Member) commented Jun 13, 2019:

Try this?

    withSQLConf(
        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       spark.sql(
        "EXPLAIN CODEGEN SELECT * FROM testData join testData2 ON key = a where value = '1'").show(false)
    }
@gatorsmile (Member) commented Jun 13, 2019:

I just ran the following script

    withSQLConf(
        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
      spark.sql(
        "SELECT * FROM testData join testData2 ON key = a where value = '1'").show(false)
      Thread.sleep(100000000)
    }

In the SQL tab details, the plan has changed to a broadcast join, but the value of isFinalPlan is still false and the codegen id is wrong.

[Screenshot of the SQL tab taken 2019-06-13, showing the updated plan]

@maryannxue (Author) commented Jun 13, 2019:

@gatorsmile Thank you for trying this out! There are two things here:

  1. The current plan is not updated on the AdaptiveSparkPlan node but rather on the inner plan node, so I need to change that.
  2. The WSCG rule is run for each stage, so the counting restarts within each stage. I'll work out a way to change this, too.
@SparkQA commented Jun 13, 2019:

Test build #106473 has finished for PR 24706 at commit a40b771.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented Jun 14, 2019:

Test build #106490 has finished for PR 24706 at commit 37905f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CollapseCodegenStages(
@SparkQA commented Jun 14, 2019:

Test build #106502 has finished for PR 24706 at commit eb8fe75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@hvanhovell (Contributor) left a comment:

A couple of smallish comments. This is almost good to go!

Resolved (outdated): .../main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Resolved (outdated): .../org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala
}

override def cancel(): Unit = {
mapOutputStatisticsFuture match {

@hvanhovell (Contributor) commented Jun 14, 2019:

This forces materialization, right? It would be better if we could check whether it is already running.
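Something along these lines is presumably what is meant; a sketch, with the exact guard being an assumption:

// Only cancel a shuffle job that has actually been submitted and is still running, so that
// cancel() itself never triggers materialization.
override def cancel(): Unit = mapOutputStatisticsFuture match {
  case action: FutureAction[MapOutputStatistics] if !action.isCompleted => action.cancel()
  case _ => // not started yet, or already finished: nothing to do
}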

@maryannxue (Author) commented Jun 14, 2019:

At this point, materialize has already been called on all the existing QueryStageExec nodes in the plan, so it should not be a concern.

}

override def cancel(): Unit = {
if (!plan.relationFuture.isDone) {

@hvanhovell (Contributor) commented Jun 14, 2019:

This also forces materialization right?

Resolved (outdated): ...cala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
case other => other
}
def mapChild(child: Any): Any = child match {
case arg: TreeNode[_] if applyToAll || containsChild(arg) =>

@hvanhovell (Contributor) commented Jun 14, 2019:

It is not super intuitive to use the same flag both to allow transformations on all elements and to force a copy. Can we maybe split them for documentation purposes?

I am also not entirely convinced that we need to transform all elements. This change now also transforms expressions in a plan node.

@maryannxue (Author) commented Jun 14, 2019:

I'm not sure about the expressions; I think it totally depends on the usage, and right now we don't need to copy expressions for AQE, I believe.
On the other hand, there are "fake leaf nodes" that derive from LeafNode but do have child nodes not declared as children, e.g., ReusedExchange. Again, right now for AQE usage we only care about the logical plans, so we are probably OK? (I can't think of any logical-plan fake leaf nodes so far.)

@maryannxue (Author) commented Jun 14, 2019:

I double checked and there's no "fake LeafNode" in the logical plan space. So I removed the "applyToAll" from the condition for transforming the elements and renamed it to "forceCopy". I've also changed the method name back to "mapChildren" since it's only for children nodes.

}
case other => other
}
def mapChild(child: Any): Any = child match {

@hvanhovell (Contributor) commented Jun 14, 2019:

mapElement?

* on children nodes. Also, when this is true, a copy of this node will be
* returned even if no elements have been changed.
*/
private def mapProductElements(

@hvanhovell (Contributor) commented Jun 14, 2019:

If we can, we should spin this off into a separate PR.


Resolved (outdated): ...cala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

@dongjoon-hyun added the SQL label on Jun 14, 2019

@maryannxue (Author) commented Jun 14, 2019:

@hvanhovell Just submitted #24876 for the TreeNode changes. Please take a look.

@SparkQA commented Jun 14, 2019:

Test build #106524 has finished for PR 24706 at commit 8570ec0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented Jun 14, 2019:

Test build #106526 has finished for PR 24706 at commit 237c067.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented Jun 15, 2019:

Test build #106536 has finished for PR 24706 at commit e265104.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@hvanhovell (Contributor) left a comment:

LGTM

@hvanhovell (Contributor) commented Jun 15, 2019:

Merging to master. @maryannxue @carsonwang thanks for all the hard work!

emanuelebardelli added a commit to emanuelebardelli/spark that referenced this pull request Jun 15, 2019

[SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

Closes apache#24706 from maryannxue/aqe.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: herman <herman@databricks.com>
- case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
+ case class CollapseCodegenStages(
+     conf: SQLConf,
+     codegenStageCounter: AtomicInteger = new AtomicInteger(0))

@cloud-fan (Contributor) commented Jun 17, 2019:

where do we not use the default value?

@hvanhovell (Contributor) commented Jun 17, 2019:

Yeah, we could have made that part of the class body. OTOH this also works :)...
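For context, the injected counter is presumably shared so that codegen stage ids keep increasing across query stages instead of restarting at 1 within each stage (see the earlier comment about the wrong codegen id). A sketch:

import java.util.concurrent.atomic.AtomicInteger

// One counter shared by every per-stage application of the rule, so the *(N) prefixes in
// EXPLAIN stay unique across the whole adaptive plan.
val codegenStageCounter = new AtomicInteger(0)
val perStageRules = Seq(CollapseCodegenStages(conf, codegenStageCounter))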

dongjoon-hyun pushed a commit to dongjoon-hyun/spark that referenced this pull request Jul 4, 2019

[SPARK-28177][SQL] Adjust post shuffle partition number in adaptive execution

## What changes were proposed in this pull request?
This is to implement a ReduceNumShufflePartitions rule in the new adaptive execution framework introduced in apache#24706. This rule is used to adjust the post shuffle partitions based on the map output statistics.

## How was this patch tested?
Added ReduceNumShufflePartitionsSuite

Closes apache#24978 from carsonwang/reduceNumShufflePartitions.

Authored-by: Carson Wang <carson.wang@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>