[SPARK-13523] [SQL] Reuse exchanges in a query #11403

Closed
wants to merge 6 commits into from

@davies davies commented Feb 26, 2016

## What changes were proposed in this pull request?

A query can contain common parts, for example in a self join. It would be good to avoid computing the duplicated part more than once, to save CPU and memory (broadcast or cache).

An Exchange materializes the underlying RDD by shuffle or collect, which makes it a good point to check for duplicates and reuse them. Duplicated exchanges are exchanges that generate exactly the same result within a query.

To find duplicated exchanges, we need to be able to compare SparkPlans and check whether they produce the same result. We already have that for LogicalPlan, so we should move it into QueryPlan to make it available for SparkPlan.
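
The idea of comparing plans by result can be illustrated with a toy model. This is a minimal sketch in Python, not Spark's implementation: `Attr`, `Plan`, `canonicalize`, and `same_result` are hypothetical names, and the assumption is that two plans produce the same result when they are structurally equal after the randomly assigned expression ids are renumbered.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class Attr:
    """Output attribute: a column name plus a randomly assigned expression id."""
    name: str
    expr_id: int


@dataclass(frozen=True)
class Plan:
    """Toy plan node (hypothetical, not Spark's SparkPlan)."""
    op: str
    output: Tuple[Attr, ...]
    children: Tuple["Plan", ...] = ()


def canonicalize(plan: Plan) -> Plan:
    """Renumber expression ids in first-seen order, visiting children first."""
    mapping: Dict[int, int] = {}

    def walk(node: Plan) -> Plan:
        children = tuple(walk(child) for child in node.children)
        output = tuple(
            Attr(a.name, mapping.setdefault(a.expr_id, len(mapping)))
            for a in node.output
        )
        return Plan(node.op, output, children)

    return walk(plan)


def same_result(left: Plan, right: Plan) -> bool:
    """Two plans match when their canonical forms are structurally equal."""
    return canonicalize(left) == canonicalize(right)


# Two scans of the same range that differ only in expression ids (id#0 vs id#1).
left = Plan("Range", (Attr("id", 0),))
right = Plan("Range", (Attr("id", 1),))
print(same_result(left, right))  # True
```

Column names are still compared in this sketch, so a renamed column makes two otherwise identical plans differ.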

Once we can find the duplicated exchanges, we should replace all of them with the same SparkPlan object (possibly wrapped in ReusedExchange for explain), so that the plan tree becomes a DAG. Since the planner rules only work with trees, this rule should be the last one in the entire planning.
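
The replacement step can be sketched on a toy plan tree. This is an illustration under stated assumptions rather than the rule added by this PR: `Node`, `Reused`, and `reuse_exchanges` are hypothetical names, and structural equality of the toy nodes stands in for `sameResult`.

```python
from dataclasses import dataclass
from typing import Dict, Tuple, Union


@dataclass(frozen=True)
class Node:
    """Toy physical operator; equality stands in for sameResult."""
    op: str
    args: str = ""
    children: Tuple["PlanNode", ...] = ()


@dataclass(frozen=True)
class Reused:
    """Marker that points at the exchange instance being shared."""
    child: Node


PlanNode = Union[Node, Reused]


def reuse_exchanges(plan: Node) -> PlanNode:
    """Replace every repeated exchange with a marker for the first instance."""
    seen: Dict[Node, Node] = {}  # original exchange -> rewritten first instance

    def rewrite(node: Node) -> PlanNode:
        if node.op == "Exchange" and node in seen:
            return Reused(seen[node])
        rewritten = Node(node.op, node.args, tuple(rewrite(c) for c in node.children))
        if node.op == "Exchange":
            seen[node] = rewritten
        return rewritten

    return rewrite(plan)


def shuffled_scan() -> Node:
    scan = Node("Range", "0..N")
    return Node("Sort", "id", (Node("Exchange", "hashpartitioning(id, 200)", (scan,)),))


# A three-way self join: three identical shuffles of the same scan.
plan = Node("SortMergeJoin", "id", (
    Node("SortMergeJoin", "id", (shuffled_scan(), shuffled_scan())),
    shuffled_scan(),
))
result = reuse_exchanges(plan)
first = result.children[0].children[0].children[0]
second = result.children[0].children[1].children[0]
third = result.children[1].children[0]
print(second.child is first, third.child is first)  # True True
```

Because both markers point at the same object, the rewritten plan is a DAG rather than a tree, which is why such a rewrite has to run after every rule that assumes a tree.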

After the rule is applied, the plan looks like this:

```
WholeStageCodegen
:  +- Project [id#0L]
:     +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, None
:        :- Project [id#0L]
:        :  +- BroadcastHashJoin [id#0L], [id#1L], Inner, BuildRight, None
:        :     :- Range 0, 1, 4, 1024, [id#0L]
:        :     +- INPUT
:        +- INPUT
:- BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L))
:  +- WholeStageCodegen
:     :  +- Range 0, 1, 4, 1024, [id#1L]
+- ReusedExchange [id#2L], BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L))
```

![bjoin](https://cloud.githubusercontent.com/assets/40902/13414787/209e8c5c-df0a-11e5-8a0f-edff69d89e83.png)

For a three-way SortMergeJoin:

```
== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L]
:     +- SortMergeJoin [id#0L], [id#4L], None
:        :- INPUT
:        +- INPUT
:- WholeStageCodegen
:  :  +- Project [id#0L]
:  :     +- SortMergeJoin [id#0L], [id#3L], None
:  :        :- INPUT
:  :        +- INPUT
:  :- WholeStageCodegen
:  :  :  +- Sort [id#0L ASC], false, 0
:  :  :     +- INPUT
:  :  +- Exchange hashpartitioning(id#0L, 200), None
:  :     +- WholeStageCodegen
:  :        :  +- Range 0, 1, 4, 33554432, [id#0L]
:  +- WholeStageCodegen
:     :  +- Sort [id#3L ASC], false, 0
:     :     +- INPUT
:     +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200), None
+- WholeStageCodegen
   :  +- Sort [id#4L ASC], false, 0
   :     +- INPUT
   +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200), None
```

![sjoin](https://cloud.githubusercontent.com/assets/40902/13414790/27aea61c-df0a-11e5-8cbf-fbc985c31d95.png)

When the same ShuffleExchange or BroadcastExchange is shared, execute()/executeBroadcast() will be called by different parents, so the exchange should cache the RDD/Broadcast and return the same one to all parents.
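
The caching requirement can be shown with a small stand-in. This is a hedged sketch, assuming a single-threaded caller: `CachingExchange` and its `materialize` callback are hypothetical and only mimic an exchange whose `execute()` is invoked by more than one parent.

```python
from typing import Callable, List, Optional


class CachingExchange:
    """Toy exchange (hypothetical) that materializes its output at most once."""

    def __init__(self, materialize: Callable[[], List[int]]) -> None:
        self._materialize = materialize
        self._cached: Optional[List[int]] = None
        self.materialize_calls = 0

    def execute(self) -> List[int]:
        # Only the first caller pays for the shuffle or broadcast.
        if self._cached is None:
            self.materialize_calls += 1
            self._cached = self._materialize()
        return self._cached


exchange = CachingExchange(lambda: [row for row in range(4)])
left_parent_input = exchange.execute()
right_parent_input = exchange.execute()
print(left_parent_input is right_parent_input, exchange.materialize_calls)  # True 1
```

Without the cached field, each parent would trigger its own materialization and sharing the node in the plan would save nothing.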

## How was this patch tested?

Added some unit tests for this. Also ran manual tests on TPCDS queries Q59 and Q64, where we can see that some exchanges are reused (this requires a change in PhysicalRDD for sameResult, which is done in #11514).

SparkQA commented Feb 26, 2016

Test build #52081 has finished for PR 11403 at commit 0830cc7.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType]
    • case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport
    • case class WholeStageCodegen(child: CodegenSupport)
    • abstract class Exchange extends UnaryNode
    • case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode

asfgit pushed a commit that referenced this pull request Mar 4, 2016
## What changes were proposed in this pull request?

This PR supports visualization of subqueries in the SQL web UI, and also improves the explain output for subqueries, especially when they are used together with whole-stage codegen.

For example:
```python
>>> sqlContext.range(100).registerTempTable("range")
>>> sqlContext.sql("select id / (select sum(id) from range) from range where id > (select id from range limit 1)").explain(True)
== Parsed Logical Plan ==
'Project [unresolvedalias(('id / subquery#9), None)]
:  +- 'SubqueryAlias subquery#9
:     +- 'Project [unresolvedalias('sum('id), None)]
:        +- 'UnresolvedRelation `range`, None
+- 'Filter ('id > subquery#8)
   :  +- 'SubqueryAlias subquery#8
   :     +- 'GlobalLimit 1
   :        +- 'LocalLimit 1
   :           +- 'Project [unresolvedalias('id, None)]
   :              +- 'UnresolvedRelation `range`, None
   +- 'UnresolvedRelation `range`, None

== Analyzed Logical Plan ==
(id / scalarsubquery()): double
Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:  +- SubqueryAlias subquery#9
:     +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L]
:        +- SubqueryAlias range
:           +- Range 0, 100, 1, 4, [id#0L]
+- Filter (id#0L > subquery#8)
   :  +- SubqueryAlias subquery#8
   :     +- GlobalLimit 1
   :        +- LocalLimit 1
   :           +- Project [id#0L]
   :              +- SubqueryAlias range
   :                 +- Range 0, 100, 1, 4, [id#0L]
   +- SubqueryAlias range
      +- Range 0, 100, 1, 4, [id#0L]

== Optimized Logical Plan ==
Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:  +- SubqueryAlias subquery#9
:     +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L]
:        +- Range 0, 100, 1, 4, [id#0L]
+- Filter (id#0L > subquery#8)
   :  +- SubqueryAlias subquery#8
   :     +- GlobalLimit 1
   :        +- LocalLimit 1
   :           +- Project [id#0L]
   :              +- Range 0, 100, 1, 4, [id#0L]
   +- Range 0, 100, 1, 4, [id#0L]

== Physical Plan ==
WholeStageCodegen
:  +- Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:     :  +- Subquery subquery#9
:     :     +- WholeStageCodegen
:     :        :  +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Final,isDistinct=false)], output=[sum(id)#10L])
:     :        :     +- INPUT
:     :        +- Exchange SinglePartition, None
:     :           +- WholeStageCodegen
:     :              :  +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Partial,isDistinct=false)], output=[sum#14L])
:     :              :     +- Range 0, 1, 4, 100, [id#0L]
:     +- Filter (id#0L > subquery#8)
:        :  +- Subquery subquery#8
:        :     +- CollectLimit 1
:        :        +- WholeStageCodegen
:        :           :  +- Project [id#0L]
:        :           :     +- Range 0, 1, 4, 100, [id#0L]
:        +- Range 0, 1, 4, 100, [id#0L]
```

The web UI looks like:

![subquery](https://cloud.githubusercontent.com/assets/40902/13377963/932bcbae-dda7-11e5-82f7-03c9be85d77c.png)

This PR also changes the tree structure of WholeStageCodegen to make it consistent with other operators. Before this change, both WholeStageCodegen and InputAdapter held references to the same plans, which could be updated in one without notifying the other, causing problems. This was discovered by #11403.

## How was this patch tested?

Existing tests, plus manual tests with the example query, checking the explain output and the web UI.

Author: Davies Liu <davies@databricks.com>

Closes #11417 from davies/viz_subquery.

SparkQA commented Mar 4, 2016

Test build #52451 has finished for PR 11403 at commit 42096c8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType]
    • abstract class Exchange extends UnaryNode
    • case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode

SparkQA commented Mar 4, 2016

Test build #52452 has finished for PR 11403 at commit e2b9987.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType]
    • abstract class Exchange extends UnaryNode
    • case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode

SparkQA commented Mar 4, 2016

Test build #52477 has finished for PR 11403 at commit fa386a7.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType]
    • abstract class Exchange extends UnaryNode
    • case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode

SparkQA commented Mar 4, 2016

Test build #52480 has finished for PR 11403 at commit 57866c2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType]
    • abstract class Exchange extends UnaryNode
    • case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode

@davies davies changed the title [SPARK-13523] [SQL] WIP: reuse exchanges in a query [SPARK-13523] [SQL] Reuse exchanges in a query Mar 4, 2016

davies commented Mar 4, 2016

@nongli @JoshRosen This PR is ready for review.

SparkQA commented Mar 4, 2016

Test build #52481 has finished for PR 11403 at commit a8c1961.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 4, 2016

Test build #52483 has finished for PR 11403 at commit f6a7f5c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -237,4 +237,65 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
}

override def innerChildren: Seq[PlanType] = subqueries

/**
* Cleaned copy of this query plan.
Contributor

what does cleaned mean? It doesn't help me understand this

Contributor

+1; this is confusing

Contributor

Does "cleaned" actually mean "canonicalized" or something similar?

Contributor Author

Yes, I will change it to "canonicalized".

SparkQA commented Mar 5, 2016

Test build #52492 has finished for PR 11403 at commit 3cc23f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

davies commented Mar 8, 2016

@JoshRosen Do you have time to review this?

SparkQA commented Mar 8, 2016

Test build #52643 has finished for PR 11403 at commit 679d669.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen

Taking a look at this now.

return plan
}
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
Contributor

Won't StructType's equals() and hashCode() methods be affected by field names? What if the two exchanges produce logically equivalent output but assign different names to the output columns? In this case, would that lead to false-negatives when searching for exchanges that have the sameResult?

Contributor Author

Good question; this could produce false negatives.

But usually, if two plans have the same result, they have the same inputs as well as the same plan and expressions, so they should generate the same names (not counting the random ExprId).

Contributor

I suppose we can always follow up on this later if it turns out to be a problem in practice.
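
The trade-off discussed in this thread can be sketched with a toy version of the schema-keyed map. This is an illustration only: `find_reuse`, the `Schema` alias, and the string plans are hypothetical, and the `same_result` callback stands in for the real comparison. Bucketing by schema means only exchanges with equal schemas are ever compared, which avoids the O(N*N) comparisons but misses a match whose column names differ.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Schema = Tuple[Tuple[str, str], ...]  # ((field name, type), ...)


def find_reuse(
    exchanges: List[Tuple[Schema, str]],
    same_result: Callable[[str, str], bool],
) -> Dict[int, int]:
    """Map the index of each duplicate exchange to the index it can reuse."""
    buckets: Dict[Schema, List[int]] = defaultdict(list)
    reuse: Dict[int, int] = {}
    for i, (schema, plan) in enumerate(exchanges):
        candidates = buckets[schema]
        match = next((j for j in candidates if same_result(exchanges[j][1], plan)), None)
        if match is None:
            candidates.append(i)
        else:
            reuse[i] = match
    return reuse


exchanges = [
    ((("id", "long"),), "Range 0..1024"),
    ((("id", "long"),), "Range 0..1024"),   # same schema, same plan: reused
    ((("key", "long"),), "Range 0..1024"),  # same data, renamed column: missed
]
print(find_reuse(exchanges, lambda a, b: a == b))  # {1: 0}
```

The third exchange lands in its own bucket and is never compared, which is the false negative raised in the review.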

@markhamstra

FYI, I re-organized the JIRA relationships a bit under SPARK-13756.

@@ -237,4 +237,65 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
}

override def innerChildren: Seq[PlanType] = subqueries

/**
* Canonicalized copy of this query plan.
Contributor

@nongli, is "canonicalized" sufficiently unambiguous here or do we need to explain what this means?

SparkQA commented Mar 9, 2016

Test build #52703 has finished for PR 11403 at commit 7df43ca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 9, 2016

Test build #52709 has finished for PR 11403 at commit 7cd6844.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

davies commented Mar 9, 2016

@JoshRosen @nongli Does this look good to you now?

@JoshRosen

LGTM

davies commented Mar 9, 2016

Merging this into master, thanks!

@asfgit asfgit closed this in 3dc9ae2 Mar 9, 2016
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016

Author: Davies Liu <davies@databricks.com>

Closes apache#11403 from davies/dedup.

viirya commented Sep 16, 2016

@davies I have a question about this. Maybe you have the answer for it? Thanks.

For a shuffle, ShuffleExchange returns a cached ShuffledRowRDD so that ReusedExchangeExec can reuse it. However, ShuffledRowRDD still needs to retrieve the remote blocks again, because the previously retrieved blocks are not stored locally. So I am wondering: if we explicitly call cache on the ShuffledRowRDD that is prepared for reuse, could we skip the next round of remote retrieval? Would that improve shuffle exchange reuse?

gcz2022 commented Oct 29, 2017

@davies Hi, what do you mean by "Since all the planner only work with tree, so this rule should be the last one for the entire planning."?
Thanks if you have time.
