[SPARK-33832][SQL] Add an option in AQE to mitigate skew even if it c…#30829

Closed
ekoifman wants to merge 4 commits into apache:master from ekoifman:SPARK-33832-skew-mitigation-even-if-shuffle

Conversation

@ekoifman
Contributor

…auses a new shuffle

What changes were proposed in this pull request?

This PR adds a spark.sql.adaptive.force.if.shuffle config option which, when set to true, causes OptimizeSkewedJoin to perform skew mitigation even if that requires a new shuffle.
For example, the current behavior is to not perform skew mitigation in the following query:

Project
|-Group By key1
  |-SMJ on key1 = key2

With spark.sql.adaptive.force.if.shuffle=true, the SMJ will become skew=true and a new shuffle will be added for the Aggregate. The new shuffle itself will be processed by AQE and coalesced, etc., if needed.
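As a concrete illustration, here is a minimal Scala sketch of how a user might enable the proposed behavior for a query of this shape. The config name comes from this PR (it is not part of any released Spark version), and the table/column names t1, t2, key1, key2 are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("skew-mitigation-even-if-shuffle")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      // Option proposed by this PR: allow OptimizeSkewedJoin to introduce a new shuffle.
      .config("spark.sql.adaptive.force.if.shuffle", "true")
      .getOrCreate()
    import spark.implicits._

    // Same shape as the plan above: an SMJ on key1 = key2 followed by an aggregate on key1.
    val joined = spark.table("t1").join(spark.table("t2"), $"key1" === $"key2")
    joined.groupBy($"key1").count().show()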

Why are the changes needed?

It enables AQE to apply in cases where the cost of extra shuffle is less than the cost of unmitigated skew.

Does this PR introduce any user-facing change?

No. With spark.sql.adaptive.force.if.shuffle=false, which is the default, behavior is unchanged.

How was this patch tested?

New UTs + enhancements of existing ones

@github-actions github-actions bot added the SQL label Dec 18, 2020
Member

This should be 3.2.0, @ekoifman .

Contributor Author

fixed

@ekoifman
Contributor Author

This failed with [error] (Javaunidoc / doc) sbt.inc.Doc$JavadocGenerationFailed, which other unrelated PRs have also failed with.
Could someone restart this please? cc @dongjoon-hyun

@ekoifman
Contributor Author

ekoifman commented Jan 2, 2021

Jenkins, retest this please

@ekoifman ekoifman force-pushed the SPARK-33832-skew-mitigation-even-if-shuffle branch from eed4f64 to ff8e8e8 Compare January 2, 2021 02:46
@ekoifman ekoifman force-pushed the SPARK-33832-skew-mitigation-even-if-shuffle branch from ff8e8e8 to c9c236b Compare January 2, 2021 22:45
@ekoifman
Contributor Author

ekoifman commented Jan 8, 2021

Hi @maryannxue, @cloud-fan, @HyukjinKwon, could one of you take a look please? Or perhaps suggest someone else who could comment.

@cloud-fan
Contributor

ok to test

@cloud-fan
Contributor

It's hard to calculate the cost of a shuffle and compare it with the benefit of skew join handling. We need some way to tune it manually. But I don't understand why this patch is so complicated. Isn't it simply a matter of skipping the shuffle count at the end of OptimizeSkewedJoin?
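(For readers following the discussion: a rough, self-contained Scala sketch of the guard being referred to at the end of OptimizeSkewedJoin, paraphrased with generic stand-in types rather than the actual Spark source.)

    // Current behavior: give up when skew handling would introduce extra shuffles.
    // The option proposed in this PR would keep the optimized plan anyway.
    def applySkewJoinWithGuard[Plan](
        plan: Plan,
        optimizeSkewJoin: Plan => Plan,
        countExtraShuffles: (Plan, Plan) => Int,  // shuffles added relative to the original plan
        forceApplyEvenIfExtraShuffle: Boolean): Plan = {
      val optimized = optimizeSkewJoin(plan)
      if (countExtraShuffles(plan, optimized) > 0 && !forceApplyEvenIfExtraShuffle) {
        plan       // revert: skew stays unmitigated
      } else {
        optimized  // accept the extra shuffle in exchange for skew mitigation
      }
    }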

@SparkQA

SparkQA commented Jan 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38500/

@SparkQA

SparkQA commented Jan 11, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38500/

@SparkQA

SparkQA commented Jan 11, 2021

Test build #133913 has finished for PR 30829 at commit c9c236b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38520/

@SparkQA

SparkQA commented Jan 11, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38520/

@ekoifman
Contributor Author

I agree that automatically computing the cost of the new shuffle is hard. That is why this first step uses an option to enable this behavior - based on previous runs of a query or a report, for example. I think the mechanics of adding a new shuffle have to be built first before it can become a cost-based decision.

I'm not sure what you mean by "skip counting"

OptimizeSkewedJoin runs on the part of the plan bounded by an Exchange above (call it E) and completed QueryStageExec leaves below.

If OptimizeSkewedJoin makes a change that needs a new shuffle, it happens in the Nth recursive invocation of createQueryStages. It figures out where the new Exchange should be and throws ShuffleAddedException to pop back out to the top, add the new Exchange in the right place, and start createQueryStages again from the top. Now that a new Exchange (call it E') is in the plan below E, with the same QueryStageExec leaves, newQueryStage(E') is run.

This seems like the easiest way to preserve the flow of the existing algorithm.

The

        var continue = true
        while (continue) {

in getFinalPhysicalPlan is there because, in a complex plan, several new stages may be created during one iteration of while (!result.allChildStagesMaterialized) { and each may potentially introduce a new shuffle.

The processing of the final stage (before any of my changes) is slightly different - it uses finalStageOptimizerRules to preserve properties of the output, so handleFinalStageShuffle does the same.
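(To make the control flow described above concrete, here is a simplified, self-contained Scala sketch; Plan, createQueryStages, and insertShuffle are stand-ins, not the actual AdaptiveSparkPlanExec code.)

    // The "request to add a shuffle" carried back to the main loop.
    case class ShuffleAddedException(position: Int) extends Exception

    def planStages[Plan](
        plan: Plan,
        createQueryStages: Plan => Plan,        // may throw ShuffleAddedException
        insertShuffle: (Int, Plan) => Plan): Plan = {
      var current = plan
      var continue = true
      while (continue) {
        continue = false
        try {
          current = createQueryStages(current)
        } catch {
          case ShuffleAddedException(position) =>
            // Pop back out to the top, insert the requested exchange so it can
            // become the root of its own query stage, and retry from scratch.
            current = insertShuffle(position, current)
            continue = true
        }
      }
      current
    }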

@cloud-fan
Contributor

@ekoifman ah I get your point. By design, queryStageOptimizerRules should not add/remove shuffles. We probably need to move the rule OptimizeSkewedJoin to queryStagePreparationRules.

@ekoifman
Contributor Author

@cloud-fan I don't think you can move OptimizeSkewJoin to queryStagePreparationRules, for 2 reasons:

  1. OptimizeSkewJoin logically has to run after CoalesceShufflePartitions. (Also, comments on queryStageOptimizerRules state the same)
  2. queryStagePreparationRules may run before all child QueryStageExec of the stage have materialized (called from reOptimize()) but OptimizeSkewJoin has to know the input from both sides.

@cloud-fan
Contributor

OK, then can you briefly explain your idea about how to allow queryStageOptimizerRules to add shuffles?

@ekoifman
Contributor Author

A Rule in queryStageOptimizerRules adds a shuffle indirectly.

  1. OptimizeSkewJoin runs and adds a new shuffle below e in a newQueryStage(e: Exchange) call
  2. We can't keep this optimizedPlan because the new shuffle should become the new root of a stage. Instead, it remembers the location in the plan where the new shuffle needs to be and throws ShuffleAddedException with that location back to the main loop in getFinalPhysicalPlan(), where val z = insertShuffle(e.sp, currentPhysicalPlan) properly inserts the new shuffle into currentPhysicalPlan. Call the new shuffle e'.
  3. Now it restarts createQueryStages(currentPhysicalPlan) from the top, but with the new shuffle added. When newQueryStage finds e', it becomes the root of the stage with the same leaves as e. Now when OptimizeSkewJoin runs on this stage, it can do skew mitigation but no longer needs to add a new shuffle to do so, since e' "protects" e.

@cloud-fan
Contributor

This seems like a big change to the AQE framework. If it's rare to force apply the skew join handling, how about we start with a sub-optimal solution: we just add the extra shuffle and do not create a query stage for this shuffle. Under the hood, ShuffleQueryStageExec submits a map stage, which can contain shuffle as well.

@ekoifman
Contributor Author

@cloud-fan I think introducing an "unhandled" shuffle is conceptually a bigger change. Currently all the code assumes (and tests assert) that there are no "unhandled" shuffles. That is the reason I made this PR such that when a queryStageOptimizerRules rule wants to add a shuffle, the algorithm pops back out to the top to restart createQueryStages(currentPhysicalPlan) from the top. This preserves the main logic of finding and creating new QueryStages as close as possible to what it was before my change.

I think the mechanics of adding a shuffle in the middle of stage creation would be just as complicated. Currently, queryStagePreparationRules, which includes EnsureRequirements, runs on the whole plan (i.e. currentPhysicalPlan, not a specific stage). If ShuffleQueryStageExec is the place where a new shuffle is added, should it just run EnsureRequirements on the current stage, or all of queryStagePreparationRules? Should it be done before or after the other rules in queryStageOptimizerRules run? As additional rules are added to queryStagePreparationRules or queryStageOptimizerRules, would OptimizeSkewJoin's ability to add shuffles make them more complicated?

I'm raising these questions to illustrate why I don't think adding an "unhandled" shuffle makes it easier. Please let me know if you think they have easy answers.

@cloud-fan
Contributor

OptimizeSkewJoin already runs EnsureRequirements inside it; the only change we need is to not give up the optimization even if extra shuffles are added.

A query stage is quite self-contained. queryStagePreparationRules can't see the query plan of query stages. During re-optimization, a query stage becomes a leaf node, LogicalQueryStage.

One problem is that queryStageOptimizerRules can't assume a query stage doesn't have shuffles in the middle. We need to revisit these rules.

@cloud-fan
Contributor

cloud-fan commented Jan 22, 2021

On second thought, I think adding an "unhandled" shuffle is a bit risky, but allowing the stage optimization phase to add new shuffles is too complicated.

I'd like to revisit the idea of putting the skew join optimization rule in the stage preparation phase. For the two points you gave:

  1. I think it's not true now. The comment is stale. If you look at the classdoc of OptimizeSkewedJoin, it says when this rule is enabled, it also coalesces non-skewed partitions like CoalesceShufflePartitions does. So I don't think OptimizeSkewedJoin needs to be run after CoalesceShufflePartitions.
  2. We can add some checks and only trigger OptimizeSkewedJoin if the dependent shuffle stages are all materialized.

@ekoifman
Contributor Author

RE 1, I see this comment on OptimizeSkewedJoin but I don't see the code actually doing any coalescing. optimizeSkewJoin() uses ShuffleStage.unapply to either take CoalescedPartitionSpec from a CustomShuffleReaderExec child node or create CoalescedPartitionSpec if the child is a ShuffleQueryStageExec. In the latter case, all the specs created are CoalescedPartitionSpec(i, i + 1). As far as I can tell, the rest of optimizeSkewJoin() either handles a skewed partition or uses CoalescedPartitionSpec from ShuffleStageInfo. Is there some other place where OptimizeSkewedJoin performs coalescing?
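(A small self-contained illustration of the identity specs mentioned above, using a local stand-in case class rather than importing Spark's CoalescedPartitionSpec.)

    // With a bare ShuffleQueryStageExec child, the specs map each reducer
    // partition to itself, so nothing is actually merged.
    case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)

    def identitySpecs(numReducers: Int): Seq[CoalescedPartitionSpec] =
      (0 until numReducers).map(i => CoalescedPartitionSpec(i, i + 1))

    // identitySpecs(3) == Seq(CoalescedPartitionSpec(0, 1),
    //                         CoalescedPartitionSpec(1, 2),
    //                         CoalescedPartitionSpec(2, 3))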

Also, I'd like to clarify one thing about the current PR. Stage optimization doesn't actually add the shuffle. OptimizeSkewedJoin in the stage optimization runs and returns the modified plan with the new shuffle in newQueryStage(e), but this plan is not used. Once this happens, the recursive call stack of createQueryStages() stops and control goes back to the top of currentPhysicalPlan in the main while (!result.allChildStagesMaterialized) { of getFinalPhysicalPlan. This is done via ShuffleAddedException, which serves as a "request" to add a shuffle. So with respect to createQueryStages(), the insertion of the shuffle is no different than if the new shuffle was caused by something in reOptimize(logicalPlan).

So in fact, the new shuffle is added during query stage preparation as you suggest, but it looks different in the code because OptimizeSkewedJoin decides where the new shuffle should be added and asks the higher-level logic to add it. In some sense this can be thought of as search-space exploration with backtracking: query stage optimization (specifically OptimizeSkewedJoin) says "I can't do what you want unless you give me a plan with a new shuffle in it" and causes query stage creation to backtrack, add the shuffle, and restart query stage creation/optimization on the new currentPhysicalPlan with the new shuffle.

@cloud-fan
Contributor

Ah sorry for my bad memory, so the comment in OptimizeSkewedJoin is actually the stale one...

I get your point that it's like backtracking, but it does make the control flow more complicated. And using exceptions for control flow is an anti-pattern (see here); we need more effort to refactor the code of the AQE loop.

So I'd like to avoid changing the control flow if possible. I don't see any blockers to run OptimizeSkewedJoin in the stage preparation. We can update CoalesceShufflePartitions to make it work even if there are skewed partition specs, or do the coalesce in OptimizeSkewedJoin.

@ekoifman
Contributor Author

ok, let me think about this - I'll be back

@ekoifman
Contributor Author

ekoifman commented Feb 1, 2021

The main loop in AdaptiveSparkPlanExec.getFinalPhysicalPlan() has this check

        if (newCost < origCost ||
            (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {

Why does it check for new shuffles? Under what circumstances can reOptimize(logicalPlan) add a new shuffle (w/o changes proposed in this PR)?

If we move OptimizeSkewedJoin to queryStagePreparationRules and it adds a new shuffle, there is no good way that I see to know which new shuffles are "good" and which are "bad".

@cloud-fan
Contributor

I think it's just a very conservative check. We can skip this check if the config to force-apply skew join optimization is turned on.

@maryannxue
Contributor

@ekoifman This check is to guard against re-planning that turns an already shuffle-materialized SMJ into a BHJ while causing extra shuffles downstream.

One way around this might be to cost a skew join differently so it counters the increased cost of the extra shuffle.
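(A hypothetical, self-contained sketch of that suggestion, not anything that exists in Spark: if plan cost is roughly the number of shuffles, a credit per skew-mitigated join lets the mitigation pay for one extra shuffle.)

    def adjustedCost(numShuffles: Int, numSkewMitigatedJoins: Int): Int =
      numShuffles - numSkewMitigatedJoins  // each mitigated join offsets one shuffle

    // Example: original plan has 2 shuffles, 0 mitigated joins -> cost 2;
    // the re-planned version has 3 shuffles, 1 mitigated join -> cost 2,
    // so the cost check quoted earlier would no longer reject it.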

@ekoifman
Contributor Author

Hi @cloud-fan, @maryannxue
sorry it took me longer than I expected to get back to this. I opened a new PR #31653 for the proposed changes to make it easier to compare.

@ekoifman
Contributor Author

ekoifman commented Mar 7, 2021

@cloud-fan do you have any more thoughts about this or #31653?

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jun 16, 2021
@github-actions github-actions bot closed this Jun 17, 2021