[SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle #32816
Conversation
@@ -111,6 +115,19 @@ case class AdaptiveSparkPlanExec(
    CollapseCodegenStages()
  )

  // OptimizeSkewedJoin has moved into preparation rules, so we should make
  // finalPreparationStageRules same as finalStageOptimizerRules
  private def finalPreparationStageRules: Seq[Rule[SparkPlan]] = {
Is it mainly to exclude `OptimizeSkewedJoin` in the final stage?
Yes, currently it's only for `OptimizeSkewedJoin`.
My idea is: the re-optimization can produce 2 result plans, one with skew join handling and one without. Then we compare the cost: if the plan with skew join handling has a higher cost and force-apply is not enabled, we pick the other plan. Otherwise, we pick the plan with skew join handling. Then we don't need to change the …
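The two-plan comparison described above can be sketched as follows. This is a simplified, illustrative model, not the actual Spark API: `CandidatePlan`, `cost`, and `choosePlan` are hypothetical names, and the toy cost is just the shuffle count.

```scala
// Simplified model of the idea above: re-optimization produces two
// candidate plans, one with skew join handling and one without, and we
// pick by cost unless force-apply is enabled.
case class CandidatePlan(name: String, numShuffles: Int)

// A toy cost: just the number of shuffles in the plan.
def cost(p: CandidatePlan): Int = p.numShuffles

def choosePlan(
    withSkewHandling: CandidatePlan,
    withoutSkewHandling: CandidatePlan,
    forceApply: Boolean): CandidatePlan = {
  // Keep the skew-optimized plan when forced, or when it is not more
  // expensive than the plan without skew handling.
  if (forceApply || cost(withSkewHandling) <= cost(withoutSkewHandling)) {
    withSkewHandling
  } else {
    withoutSkewHandling
  }
}
```

With force-apply off, a skew-optimized plan that adds a shuffle loses to the plain plan; with force-apply on, it is always kept.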
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
private def ensureDistributionAndOrdering(
    originChildren: Seq[SparkPlan],
nit: originalChildren
Still, I'd prefer using `children` here and `newChildren` below.
@@ -35,15 +36,48 @@ case class SimpleCost(value: Long) extends Cost {
}

/**
 * A simple implementation of [[CostEvaluator]], which counts the number of
 * [[ShuffleExchangeLike]] nodes in the plan.
 * A skew join aware implementation of [[Cost]], which considers the number of shuffles and the number of skew joins
Can we add more description on how the cost is calculated in the presence of skew joins? What happens if there are two or more extra shuffles introduced by a skew join optimization?
Yes, I'll add more comments to show how we pick the cost with skew joins and shuffles.
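One way to sketch the comparison being discussed (illustrative only, not the exact Spark implementation; `PlanCost` and `atLeastAsGood` are hypothetical names): a plan that optimizes more skewed joins wins, and ties are broken by the smaller shuffle count.

```scala
// Toy cost for a physical plan: how many skew joins were optimized and
// how many shuffles the plan contains.
case class PlanCost(numSkewJoins: Int, numShuffles: Int)

// Returns true if `a` is at least as good as `b`: more optimized skew
// joins first, then fewer shuffles as the tie-breaker.
def atLeastAsGood(a: PlanCost, b: PlanCost): Boolean = {
  if (a.numSkewJoins != b.numSkewJoins) a.numSkewJoins > b.numSkewJoins
  else a.numShuffles <= b.numShuffles
}
```

Under this ordering, even a plan with two or more extra shuffles is still preferred when it optimizes more skewed joins; the shuffle count only matters between plans with the same number of optimized skew joins.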
thanks, merging to master!
thank you all!
### What changes were proposed in this pull request?

This is a followup of #32816 to simplify and improve the code:

1. Add a `SkewJoinChildWrapper` to wrap the skew join children, so that the `EnsureRequirements` rule will skip them and save time
2. Remove `SkewJoinAwareCost` and keep using `SimpleCost`. We can put `numSkews` in the first 32 bits.

### Why are the changes needed?

Code simplification and improvement.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #34080 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
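The "first 32 bits" trick mentioned in the follow-up can be sketched like this (the function name `packedCost` is illustrative): encode the negated skew-join count in the upper 32 bits of a single `Long` and the shuffle count in the lower 32 bits, so that one numeric comparison prefers plans with more optimized skew joins and breaks ties on fewer shuffles.

```scala
// Pack the negated skew-join count into the upper 32 bits and the
// shuffle count into the lower 32 bits of one Long. Lower packed value
// means better plan: more optimized skew joins dominate, and the
// shuffle count only decides ties.
def packedCost(numSkewJoins: Int, numShuffles: Int): Long =
  (-numSkewJoins.toLong << 32) | (numShuffles & 0xFFFFFFFFL)
```

For example, a plan with one optimized skew join and three shuffles packs to a smaller (better) value than a plan with no skew joins and two shuffles.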
… with multiple joins

### What changes were proposed in this pull request?

In #32816, we moved the rule `OptimizeSkewedJoin` from `queryStageOptimizerRules` to `queryStagePreparationRules`. This means that the input plan of `OptimizeSkewedJoin` will be the entire query plan, while before, the input plan was a shuffle stage, which is usually a small part of the query plan. However, to simplify the implementation, `OptimizeSkewedJoin` has a check that it will be skipped if the input plan has more than 2 shuffle stages. This is obviously problematic now, as the input plan is the entire query plan and is very likely to have more than 2 shuffles.

This PR proposes to remove the check from `OptimizeSkewedJoin` completely, as it's no longer needed:

1. We can and should optimize all the skewed joins in the query plan.
2. If it introduces extra shuffles, we can return the original input plan, or accept it if the force-apply config is true.

### Why are the changes needed?

Fix a performance regression in the master branch (not released yet).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A new test.

Closes #34974 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?

1. Move `OptimizeSkewedJoin` from the stage optimization phase to the stage preparation phase.
2. Run `EnsureRequirements` one more time after the `OptimizeSkewedJoin` rule in the stage preparation phase.
3. Add `SkewJoinAwareCost` to support estimating the cost of skewed joins.
4. For `OptimizeSkewedJoin`, we generate 2 physical plans, one with skew join optimization and one without. Then we use the cost evaluator w.r.t. the force-skew-join flag and pick the plan with the lower cost.

### Why are the changes needed?
In general, a skewed join has a bigger impact on performance than one more shuffle. It makes sense to force-optimize skewed joins even if that introduces an extra shuffle.
A common case: (query plan screenshot omitted)

and after this PR, the plan looks like: (query plan screenshot omitted)
Note that the newly introduced shuffle can also be optimized by AQE.
### Does this PR introduce _any_ user-facing change?

Yes, a new config.
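A minimal sketch of enabling the new force-apply behavior in a Spark session. The config key `spark.sql.adaptive.forceOptimizeSkewedJoin` is taken from the merged change and should be verified against your Spark version; everything else is standard `SparkSession` setup.

```scala
import org.apache.spark.sql.SparkSession

// Enable AQE and force skew join optimization even when it introduces
// extra shuffles (the behavior added by this PR).
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.forceOptimizeSkewedJoin", "true")
  .getOrCreate()
```

With the flag off (the default), the skew-optimized plan is only kept when it is not more expensive than the plan without skew handling.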
### How was this patch tested?

- SPARK-30524: Do not optimize skew join if introduce additional shuffle
- SPARK-33551: Do not use custom shuffle reader for repartition