[SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning #28676

Closed
wants to merge 19 commits into from

Conversation

imback82
Contributor

@imback82 imback82 commented May 30, 2020

What changes were proposed in this pull request?

Currently, BroadcastHashJoinExec's outputPartitioning only uses the streamed side's outputPartitioning. However, if the join type is an inner-like join, the build side's info (the join keys) can also be added to BroadcastHashJoinExec's outputPartitioning.

For example,

import spark.implicits._  // needed for toDF when not running in spark-shell

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")

// join1 is a sort merge join.
val join1 = t1.join(t2, t1("i1") === t2("i2"))

// join2 is a broadcast join where t3 is broadcasted.
val join2 = join1.join(t3, join1("i1") === t3("i3"))

// Join on the column from the broadcasted side (i3).
val join3 = join2.join(t4, join2("i3") === t4("i4"))

join3.explain

You can see that Exchange hashpartitioning(i3#29, 200) is introduced because there is no output partitioning info from the build side.

== Physical Plan ==
*(6) SortMergeJoin [i3#29], [i4#40], Inner
:- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i3#29, 200), true, [id=#55]
:     +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight
:        :- *(3) SortMergeJoin [i1#7], [i2#18], Inner
:        :  :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0
:        :  :  +- Exchange hashpartitioning(i1#7, 200), true, [id=#28]
:        :  :     +- LocalTableScan [i1#7, j1#8]
:        :  +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0
:        :     +- Exchange hashpartitioning(i2#18, 200), true, [id=#29]
:        :        +- LocalTableScan [i2#18, j2#19]
:        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#34]
:           +- LocalTableScan [i3#29, j3#30]
+- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#40, 200), true, [id=#39]
      +- LocalTableScan [i4#40, j4#41]

This PR proposes to introduce output partitioning for the build side of BroadcastHashJoinExec if the streamed side has a HashPartitioning or a collection of HashPartitionings.

There is a new internal config, spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit, which limits the number of partitionings a HashPartitioning can expand to. It can be set to "0" to disable this feature.
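For instance, in the plan above the streamed side of the BroadcastHashJoin is hash-partitioned on i1, and the inner join condition i1 = i3 means the output is also co-partitioned by i3, so the join can report partitionings over both keys and the downstream join on i3 no longer needs a shuffle. A minimal sketch of setting the config (it is internal; "0" disables the expansion):

// "0" disables the build-side output partitioning expansion entirely.
spark.conf.set("spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit", "0")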

Why are the changes needed?

To remove unnecessary shuffles.

Does this PR introduce any user-facing change?

Yes, now the shuffle in the above example can be eliminated:

== Physical Plan ==
*(5) SortMergeJoin [i3#108], [i4#119], Inner
:- *(3) Sort [i3#108 ASC NULLS FIRST], false, 0
:  +- *(3) BroadcastHashJoin [i1#86], [i3#108], Inner, BuildRight
:     :- *(3) SortMergeJoin [i1#86], [i2#97], Inner
:     :  :- *(1) Sort [i1#86 ASC NULLS FIRST], false, 0
:     :  :  +- Exchange hashpartitioning(i1#86, 200), true, [id=#120]
:     :  :     +- LocalTableScan [i1#86, j1#87]
:     :  +- *(2) Sort [i2#97 ASC NULLS FIRST], false, 0
:     :     +- Exchange hashpartitioning(i2#97, 200), true, [id=#121]
:     :        +- LocalTableScan [i2#97, j2#98]
:     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#126]
:        +- LocalTableScan [i3#108, j3#109]
+- *(4) Sort [i4#119 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#119, 200), true, [id=#130]
      +- LocalTableScan [i4#119, j4#120]

How was this patch tested?

Added new tests.
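For reference, a hypothetical sketch of the kind of check such a test could make (not the PR's actual test code), using join3 from the example above: after the change, only the three shuffles feeding the sort merge joins should remain in the physical plan.

import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Collect the shuffle exchanges; the extra shuffle on i3 should be gone,
// leaving only the exchanges on i1, i2 and i4 (see the optimized plan above).
val shuffles = join3.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e
}
assert(shuffles.size == 3)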

@SparkQA

SparkQA commented May 30, 2020

Test build #123310 has finished for PR 28676 at commit 985834b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented May 30, 2020

retest this please

@SparkQA

SparkQA commented May 30, 2020

Test build #123313 has finished for PR 28676 at commit 985834b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82 imback82 changed the title from "[SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning" to "[WIP][SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning" on May 31, 2020
@SparkQA

SparkQA commented May 31, 2020

Test build #123326 has finished for PR 28676 at commit 683a705.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82
Contributor Author

retest this please

@SparkQA

SparkQA commented May 31, 2020

Test build #123334 has finished for PR 28676 at commit 683a705.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 26, 2020

Test build #124528 has finished for PR 28676 at commit 488e051.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Jun 27, 2020

retest this please

@SparkQA

SparkQA commented Jun 27, 2020

Test build #124561 has finished for PR 28676 at commit 488e051.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Jun 27, 2020

retest this please

@SparkQA

SparkQA commented Jun 27, 2020

Test build #124565 has finished for PR 28676 at commit 488e051.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 30, 2020

Test build #124672 has started for PR 28676 at commit febc402.

@imback82 imback82 changed the title from "[WIP][SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning" to "[SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning" on Jul 1, 2020
generateExprCombinations(current.tail, accumulated :+ current.head) ++
  buildKeys.map { bKeys =>
    bKeys.flatMap { bKey =>
      if (currentNumCombinations < maxNumCombinations) {
Contributor

Do we need this if? I think generateExprCombinations will return Nil when it hits the upper bound.

Contributor Author

I wanted to avoid unnecessary recursion (and creating new Seqs, etc.), but I removed the check for simplicity.
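For context, a simplified, hypothetical sketch of the pattern under discussion (not the PR's exact code): the recursion itself returns Nil once the combination limit has been reached, so an extra if guard before the recursive calls is not strictly required.

// Hypothetical stand-ins: streamed keys map to their matching build-side keys.
var currentNumCombinations = 0
val maxNumCombinations = 8  // plays the role of the expand-limit config

def generateExprCombinations(
    current: Seq[String],
    accumulated: Seq[String],
    streamedToBuildKeys: Map[String, Seq[String]]): Seq[Seq[String]] = {
  if (currentNumCombinations >= maxNumCombinations) {
    Nil  // the bound check inside the recursion makes an outer `if` unnecessary
  } else if (current.isEmpty) {
    currentNumCombinations += 1
    Seq(accumulated)
  } else {
    // Try the streamed key itself plus every build-side key it maps to.
    val alternatives = current.head +: streamedToBuildKeys.getOrElse(current.head, Nil)
    alternatives.flatMap(a => generateExprCombinations(current.tail, accumulated :+ a, streamedToBuildKeys))
  }
}

// e.g. a streamed HashPartitioning on i1 expands to combinations over i1 and i3:
// generateExprCombinations(Seq("i1"), Nil, Map("i1" -> Seq("i3"))) == Seq(Seq("i1"), Seq("i3"))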

@SparkQA

SparkQA commented Jul 16, 2020

Test build #125947 has finished for PR 28676 at commit afa5aca.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 17, 2020

Test build #126004 has finished for PR 28676 at commit 51187dc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@viirya viirya left a comment


Can you also mention the config in the description?

@SparkQA

SparkQA commented Jul 17, 2020

Test build #126067 has finished for PR 28676 at commit 80df4dc.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 18, 2020

Test build #126068 has finished for PR 28676 at commit ba19acb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 18, 2020

Test build #126075 has finished for PR 28676 at commit 9caeecd.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 18, 2020

Test build #126087 has finished for PR 28676 at commit 9caeecd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 18, 2020

Test build #126091 has finished for PR 28676 at commit 9caeecd.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 18, 2020

Test build #126097 has finished for PR 28676 at commit 9caeecd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!
