
[SPARK-32830][SQL] Optimize Skewed BroadcastNestedLoopJoin with AQE#29692

Closed
AngersZhuuuu wants to merge 8 commits into apache:master from AngersZhuuuu:SPARK-32830-BROADCASET-NL-SKEW-JOIN-

Conversation

@AngersZhuuuu
Contributor

@AngersZhuuuu AngersZhuuuu commented Sep 9, 2020

What changes were proposed in this pull request?

For BroadcastNestedLoopJoin, Spark broadcasts the build-side child to all executors, and each stream-side partition traverses the broadcast data row by row.

We have encountered cases where the stream side is skewed and all the finished tasks wait for the skewed partition to complete.

Execution time grows in proportion to the amount of data in a partition: if a partition is skewed by 100x, it takes roughly 100x longer to execute than a non-skewed one.

This is a bottleneck. With AQE, we can avoid it by splitting the skewed partition's data to make the workload more balanced.
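The per-partition skew test that AQE applies in OptimizeSkewedJoin can be sketched in plain Scala. This is a minimal illustration, not code from this PR: the helper names are made up, and `factor` and `thresholdBytes` stand in for the `spark.sql.adaptive.skewJoin.skewedPartitionFactor` and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` configs.

```scala
// Median-based skew test: a partition counts as skewed only if it is both
// much larger than the median partition and large in absolute terms.
def medianSize(bytesByPartition: Seq[Long]): Long = {
  val sorted = bytesByPartition.sorted
  math.max(sorted(sorted.length / 2), 1L)
}

def isSkewed(
    partitionBytes: Long,
    median: Long,
    factor: Double = 5.0,                       // illustrative default
    thresholdBytes: Long = 256L * 1024 * 1024): Boolean =
  partitionBytes > median * factor && partitionBytes > thresholdBytes
```

A 500 MB partition next to kilobyte-sized siblings passes both conditions, while a uniformly sized stage never does, so a cheap shuffle stage is not flagged just because its median is tiny.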

Why are the changes needed?

NO

Does this PR introduce any user-facing change?

NO

How was this patch tested?

Added UT

@SparkQA

SparkQA commented Sep 9, 2020

Test build #128445 has finished for PR 29692 at commit 7aba44d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 9, 2020

Test build #128453 has finished for PR 29692 at commit ec19256.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu AngersZhuuuu changed the title [WIP][SPARK-32830][SQL] Optimize Skewed BroadcastNestedLoopJoin with AE [SPARK-32830][SQL] Optimize Skewed BroadcastNestedLoopJoin with AE Sep 10, 2020
@AngersZhuuuu
Contributor Author

ping @cloud-fan @hvanhovell @maryannxue

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128487 has finished for PR 29692 at commit 8b33b7f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu
Contributor Author

retest this please

|Stream side partitions size info:
|${getSizeInfo(streamMedSize, stream.mapStats.bytesByPartitionId)}
""".stripMargin)
val canSplitStream = canSplitLeftSide(joinType)
Member

Is this really correct? How about the case: BuildLeft and right-outer?

Contributor Author

Is this really correct? How about the case: BuildLeft and right-outer?

Sorry, my mistake; we should remove this condition, since for BroadcastNestedLoopJoin we don't need to care about the join type.

}

case bnl @ BroadcastNestedLoopJoinExec(leftChild, rightChild, buildSide, joinType, _, _) =>
def resolveBroadcastNLJoinSkew(
Member

This is just a suggestion; could we share code between smj and bnl cases? Most parts look duplicated.

Contributor Author

This is just a suggestion; could we share code between smj and bnl cases? Most parts look duplicated.

Once the details are settled, I will start on this work. I quite agree with this suggestion.

@maropu maropu changed the title [SPARK-32830][SQL] Optimize Skewed BroadcastNestedLoopJoin with AE [SPARK-32830][SQL] Optimize Skewed BroadcastNestedLoopJoin with AQE Sep 10, 2020
@maropu
Member

maropu commented Sep 10, 2020

Ah, one more comment; could you update the code comment in OptimizeSkewedJoin? It looks most comments assume smj only, e.g., https://github.com/apache/spark/pull/29692/files#diff-2d6bea6eed43ca6f37fe3531cb574069R151

@AngersZhuuuu
Contributor Author

Ah, one more comment; could you update the code comment in OptimizeSkewedJoin? It looks most comments assume smj only, e.g., https://github.com/apache/spark/pull/29692/files#diff-2d6bea6eed43ca6f37fe3531cb574069R151

Updated; no comments refer to SMJ only now.

@cloud-fan
Contributor

What's the high-level idea? We can handle skew SMJ because there is a shuffle and we can split the partition with the granularity of shuffle blocks. Broadcast join doesn't have shuffles.

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128513 has finished for PR 29692 at commit 1e36ca0.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128514 has finished for PR 29692 at commit 827a0b4.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128496 has finished for PR 29692 at commit 8b33b7f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu
Contributor Author

What's the high-level idea? We can handle skew SMJ because there is a shuffle and we can split the partition with the granularity of shuffle blocks. Broadcast join doesn't have shuffles.

Yeah, I thought about this a lot; a normal SQL rewrite can't cover this case.

In our production environment, data skew before a broadcast nested loop join (for example, when the stream side ends with a skewed group-by) always causes long running times.

There are two ways to avoid this:

  1. avoid the nested loop join;
  2. use DISTRIBUTE BY to increase parallelism by dispersing the data.

Method 1 requires changing the SQL logic. Method 2 causes one more shuffle, but nowadays network cost is cheap and usually not a bottleneck.

After experimenting with AQE, I found its model is not suitable for this case, since there is no shuffle before the BNLJ.
In our internal version, in BroadcastNestedLoopJoinExec, after the stream side has executed we collect the row count of each partition and judge whether it is seriously skewed; if it is seriously skewed and the volume is large, we repartition the stream side to even out the stream-side RDD.

It's not very elegant, but the benefits are very clear. I am not sure whether the community will accept this approach.
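The internal-version heuristic described above (collect per-partition row counts, repartition when seriously skewed) could look roughly like the following. This is a hypothetical sketch: `repartitionTarget`, `skewFactor`, and `targetRowsPerPartition` are illustrative assumptions, not names or values from this PR.

```scala
// Decide whether the stream side is skewed enough to justify an extra
// shuffle, and if so, pick a new partition count for the repartition.
def repartitionTarget(
    rowCounts: Seq[Long],
    skewFactor: Double = 10.0,              // assumed "seriously skewed" ratio
    targetRowsPerPartition: Long = 1000000L // assumed volume per partition
): Option[Int] = {
  if (rowCounts.isEmpty) return None
  val max = rowCounts.max
  val avg = rowCounts.sum.toDouble / rowCounts.length
  if (max > avg * skewFactor && max > targetRowsPerPartition) {
    // spread the total volume evenly across the new partitions
    Some(math.max((rowCounts.sum / targetRowsPerPartition).toInt, 1))
  } else {
    None // not skewed enough to pay for an extra shuffle
  }
}
```

Returning `None` in the balanced case matters: the extra shuffle should only be inserted when the skew condition actually holds, which is exactly the "strict and reasonable conf value" point discussed below.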

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128508 has finished for PR 29692 at commit 8e8b1bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128515 has finished for PR 29692 at commit 5c6f895.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

after stream side executed, we will get the raw count of each partition and judge if it's skewed seriously, if skewed seriously and volume is large, repartition stream side to make stream side RDD average.

So you add a shuffle to the stream side to stop it before the join node and get statistics?

@AngersZhuuuu
Contributor Author

after stream side executed, we will get the raw count of each partition and judge if it's skewed seriously, if skewed seriously and volume is large, repartition stream side to make stream side RDD average.

So you add a shuffle to the stream side to stop it before the join node and get statistics?

Yes: stop before the join, get the row count of each stream-side partition, re-shuffle the stream side, then join.

@cloud-fan
Contributor

cloud-fan commented Sep 11, 2020

If the user manually adds a shuffle (DISTRIBUTE BY) in the query before broadcast join, I think we can take care of the skew. Spark query optimizer should not add the extra shuffle by itself, as it's likely to cause perf regression.

But we need to be careful: splitting the skewed partitions breaks the output partitioning and causes an extra shuffle.

@AngersZhuuuu
Contributor Author

Spark query optimizer should not add the extra shuffle by itself, as it's likely to cause perf regression.

Without this rule, we can't handle such data-skew cases automatically. With strict and reasonable configuration values, the cost of the extra shuffle is much less than the overhead of the data skew.

This is especially true for broadcast join / broadcast nested loop join: if the stream side ends with a group-by (there are many such business scenarios), the data is often seriously skewed, and getting business users to tune each job is difficult.
What does the community think about this scenario?

@cloud-fan
Contributor

Then we need some estimation work, as the shuffle/scan node may be far away from the join node. We also need to carefully justify whether the extra shuffle cost is worth the skew-elimination benefits.

val shuffleStages = collectShuffleStages(plan)

if (shuffleStages.length == 2) {
if (shuffleStages.length >= 1) {
Member

Does this mean we will also handle multi-table join (e.g. three-table join) ?

Contributor Author

Does this mean we will also handle multi-table join (e.g. three-table join) ?

Because a broadcast nested loop join can have a shuffle exchange on only one side, while a sort merge join has two.

@AngersZhuuuu
Contributor Author

Then we need some estimation work, as the shuffle/scan node may be far away from the join node. We also need to carefully justify whether the extra shuffle cost is worth the skew-elimination benefits.

Yeah, it is worth re-shuffling the data only when the skew is very serious and the threshold is reached.
I'd appreciate some advice: in the current code, is there any method to estimate shuffle cost?
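For intuition only, the trade-off being asked about could be modeled with a back-of-the-envelope comparison like the one below. Spark's AQE has no such estimator, and every name and throughput number here is a made-up assumption: re-shuffle only if the straggler time saved on the skewed partition exceeds the estimated time to shuffle the whole stream side.

```scala
// Compare estimated straggler savings against estimated shuffle cost.
def worthReshuffling(
    partitionBytes: Seq[Long],
    joinBytesPerSec: Double = 50.0 * 1024 * 1024,    // assumed join throughput
    shuffleBytesPerSec: Double = 200.0 * 1024 * 1024 // assumed shuffle throughput
): Boolean = {
  if (partitionBytes.isEmpty) return false
  val total = partitionBytes.sum.toDouble
  val balanced = total / partitionBytes.length
  // the join's wall-clock time is driven by its largest partition
  val savedJoinTime = (partitionBytes.max - balanced) / joinBytesPerSec
  val shuffleTime = total / shuffleBytesPerSec
  savedJoinTime > shuffleTime
}
```

With already-balanced partitions the saved time is zero, so the extra shuffle is never worth it; the heavier the single largest partition, the more the balance tips toward re-shuffling.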

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Dec 30, 2020
@github-actions github-actions bot closed this Dec 31, 2020
