-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-13671] [SPARK-13311] [SQL] Use different physical plans for RDD and data sources #11514
Conversation
} | ||
|
||
/** Physical plan node for scanning data from a relation. */ | ||
private[sql] case class PhysicalScan( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataSourceScan
?
will conflict with #11509 |
Test build #52453 has finished for PR 11514 at commit
|
override def simpleString: String = { | ||
s"RDD $nodeName${output.mkString("[", ",", "]")}" | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we override outputPartitioning
and set it to UnknownPartitioning(rdd.partitions.length)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a partitioning is UnknownPartitioning, the number is meaningless, I think.
Test build #52610 has finished for PR 11514 at commit
|
Test build #2618 has finished for PR 11514 at commit
|
Test build #2619 has finished for PR 11514 at commit
|
Test build #52644 has finished for PR 11514 at commit
|
## What changes were proposed in this pull request? It’s possible to have common parts in a query, for example, self join, it will be good to avoid the duplicated part to same CPUs and memory (Broadcast or cache). Exchange will materialize the underlying RDD by shuffle or collect, it’s a great point to check duplicates and reuse them. Duplicated exchanges means they generate exactly the same result inside a query. In order to find out the duplicated exchanges, we should be able to compare SparkPlan to check that they have same results or not. We already have that for LogicalPlan, so we should move that into QueryPlan to make it available for SparkPlan. Once we can find the duplicated exchanges, we should replace all of them with same SparkPlan object (could be wrapped by ReusedExchage for explain), then the plan tree become a DAG. Since all the planner only work with tree, so this rule should be the last one for the entire planning. After the rule, the plan will looks like: ``` WholeStageCodegen : +- Project [id#0L] : +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, None : :- Project [id#0L] : : +- BroadcastHashJoin [id#0L], [id#1L], Inner, BuildRight, None : : :- Range 0, 1, 4, 1024, [id#0L] : : +- INPUT : +- INPUT :- BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L)) : +- WholeStageCodegen : : +- Range 0, 1, 4, 1024, [id#1L] +- ReusedExchange [id#2L], BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L)) ``` ![bjoin](https://cloud.githubusercontent.com/assets/40902/13414787/209e8c5c-df0a-11e5-8a0f-edff69d89e83.png) For three ways SortMergeJoin, ``` == Physical Plan == WholeStageCodegen : +- Project [id#0L] : +- SortMergeJoin [id#0L], [id#4L], None : :- INPUT : +- INPUT :- WholeStageCodegen : : +- Project [id#0L] : : +- SortMergeJoin [id#0L], [id#3L], None : : :- INPUT : : +- INPUT : :- WholeStageCodegen : : : +- Sort [id#0L ASC], false, 0 : : : +- INPUT : : +- Exchange hashpartitioning(id#0L, 200), None : : +- WholeStageCodegen : : : +- Range 0, 1, 4, 33554432, [id#0L] : +- WholeStageCodegen : : +- Sort [id#3L ASC], false, 0 : : +- INPUT : +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200), None +- WholeStageCodegen : +- Sort [id#4L ASC], false, 0 : +- INPUT +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200), None ``` ![sjoin](https://cloud.githubusercontent.com/assets/40902/13414790/27aea61c-df0a-11e5-8cbf-fbc985c31d95.png) If the same ShuffleExchange or BroadcastExchange, execute()/executeBroadcast() will be called by different parents, they should cached the RDD/Broadcast, return the same one for all the parents. ## How was this patch tested? Added some unit tests for this. Had done some manual tests on TPCDS query Q59 and Q64, we can see some exchanges are re-used (this requires a change in PhysicalRDD to for sameResult, is be done in #11514 ). Author: Davies Liu <davies@databricks.com> Closes #11403 from davies/dedup.
Test build #52775 has finished for PR 11514 at commit
|
@@ -93,6 +93,10 @@ private[sql] object SparkPlanGraph { | |||
case "Subquery" if subgraph != null => | |||
// Subquery should not be included in WholeStageCodegen | |||
buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges) | |||
case "ReusedExchange" => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is lost when fix conflicts in last PR (#11403).
Test build #52788 has finished for PR 11514 at commit
|
Test build #52855 has finished for PR 11514 at commit
|
Test build #52865 has finished for PR 11514 at commit
|
lgtm |
Test build #52966 has finished for PR 11514 at commit
|
Merged into master |
…D and data sources ## What changes were proposed in this pull request? This PR split the PhysicalRDD into two classes, PhysicalRDD and PhysicalScan. PhysicalRDD is used for DataFrames that is created from existing RDD. PhysicalScan is used for DataFrame that is created from data sources. This enable use to apply different optimization on both of them. Also fix the problem for sameResult() on two DataSourceScan. Also fix the equality check to toString for `In`. It's better to use Seq there, but we can't break this public API (sad). ## How was this patch tested? Existing tests. Manually tested with TPCDS query Q59 and Q64, all those duplicated exchanges can be re-used now, also saw there are 40+% performance improvement (saving half of the scan). Author: Davies Liu <davies@databricks.com> Closes apache#11514 from davies/existing_rdd.
## What changes were proposed in this pull request? It’s possible to have common parts in a query, for example, self join, it will be good to avoid the duplicated part to same CPUs and memory (Broadcast or cache). Exchange will materialize the underlying RDD by shuffle or collect, it’s a great point to check duplicates and reuse them. Duplicated exchanges means they generate exactly the same result inside a query. In order to find out the duplicated exchanges, we should be able to compare SparkPlan to check that they have same results or not. We already have that for LogicalPlan, so we should move that into QueryPlan to make it available for SparkPlan. Once we can find the duplicated exchanges, we should replace all of them with same SparkPlan object (could be wrapped by ReusedExchage for explain), then the plan tree become a DAG. Since all the planner only work with tree, so this rule should be the last one for the entire planning. After the rule, the plan will looks like: ``` WholeStageCodegen : +- Project [id#0L] : +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, None : :- Project [id#0L] : : +- BroadcastHashJoin [id#0L], [id#1L], Inner, BuildRight, None : : :- Range 0, 1, 4, 1024, [id#0L] : : +- INPUT : +- INPUT :- BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L)) : +- WholeStageCodegen : : +- Range 0, 1, 4, 1024, [id#1L] +- ReusedExchange [id#2L], BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L)) ``` ![bjoin](https://cloud.githubusercontent.com/assets/40902/13414787/209e8c5c-df0a-11e5-8a0f-edff69d89e83.png) For three ways SortMergeJoin, ``` == Physical Plan == WholeStageCodegen : +- Project [id#0L] : +- SortMergeJoin [id#0L], [id#4L], None : :- INPUT : +- INPUT :- WholeStageCodegen : : +- Project [id#0L] : : +- SortMergeJoin [id#0L], [id#3L], None : : :- INPUT : : +- INPUT : :- WholeStageCodegen : : : +- Sort [id#0L ASC], false, 0 : : : +- INPUT : : +- Exchange hashpartitioning(id#0L, 200), None : : +- WholeStageCodegen : : : +- Range 0, 1, 4, 33554432, [id#0L] : +- WholeStageCodegen : : +- Sort [id#3L ASC], false, 0 : : +- INPUT : +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200), None +- WholeStageCodegen : +- Sort [id#4L ASC], false, 0 : +- INPUT +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200), None ``` ![sjoin](https://cloud.githubusercontent.com/assets/40902/13414790/27aea61c-df0a-11e5-8cbf-fbc985c31d95.png) If the same ShuffleExchange or BroadcastExchange, execute()/executeBroadcast() will be called by different parents, they should cached the RDD/Broadcast, return the same one for all the parents. ## How was this patch tested? Added some unit tests for this. Had done some manual tests on TPCDS query Q59 and Q64, we can see some exchanges are re-used (this requires a change in PhysicalRDD to for sameResult, is be done in apache#11514 ). Author: Davies Liu <davies@databricks.com> Closes apache#11403 from davies/dedup.
…D and data sources ## What changes were proposed in this pull request? This PR split the PhysicalRDD into two classes, PhysicalRDD and PhysicalScan. PhysicalRDD is used for DataFrames that is created from existing RDD. PhysicalScan is used for DataFrame that is created from data sources. This enable use to apply different optimization on both of them. Also fix the problem for sameResult() on two DataSourceScan. Also fix the equality check to toString for `In`. It's better to use Seq there, but we can't break this public API (sad). ## How was this patch tested? Existing tests. Manually tested with TPCDS query Q59 and Q64, all those duplicated exchanges can be re-used now, also saw there are 40+% performance improvement (saving half of the scan). Author: Davies Liu <davies@databricks.com> Closes apache#11514 from davies/existing_rdd.
|
||
// Ignore rdd when checking results | ||
override def sameResult(plan: SparkPlan ): Boolean = plan match { | ||
case other: DataSourceScan => relation == other.relation && metadata == other.metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is actually wrong because we cannot ignore the rdd, otherwise scans of different partitions are treated as "sameResult"!
What changes were proposed in this pull request?
This PR split the PhysicalRDD into two classes, PhysicalRDD and PhysicalScan. PhysicalRDD is used for DataFrames that is created from existing RDD. PhysicalScan is used for DataFrame that is created from data sources. This enable use to apply different optimization on both of them.
Also fix the problem for sameResult() on two DataSourceScan.
Also fix the equality check to toString for
In
. It's better to use Seq there, but we can't break this public API (sad).How was this patch tested?
Existing tests. Manually tested with TPCDS query Q59 and Q64, all those duplicated exchanges can be re-used now, also saw there are 40+% performance improvement (saving half of the scan).