
[SPARK-32330][SQL] Preserve shuffled hash join build side partitioning #29130

Closed

Conversation


@c21 (Contributor) commented Jul 16, 2020

What changes were proposed in this pull request?

Currently, ShuffledHashJoinExec inherits outputPartitioning from HashJoin, which preserves only the stream side's partitioning (HashJoin.scala):

override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning

This loses the build side's partitioning information and causes an extra shuffle if there is another join or group-by on the build side keys after this join.

Example:

withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
    SQLConf.SHUFFLE_PARTITIONS.key -> "2",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
  val df1 = spark.range(10).select($"id".as("k1"))
  val df2 = spark.range(30).select($"id".as("k2"))
  Seq("inner", "cross").foreach(joinType => {
    val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
      .queryExecution.executedPlan
    assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
    // No extra shuffle before the aggregate: the only two exchanges
    // are the ones feeding the join itself
    assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
  })
}

Current physical plan (with an extra shuffle on k1 before the aggregate):

*(4) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- Exchange hashpartitioning(k1#220L, 2), true, [id=#117]
   +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
      +- *(3) Project [k1#220L]
         +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
            :- Exchange hashpartitioning(k1#220L, 2), true, [id=#109]
            :  +- *(1) Project [id#218L AS k1#220L]
            :     +- *(1) Range (0, 10, step=1, splits=2)
            +- Exchange hashpartitioning(k2#224L, 2), true, [id=#111]
               +- *(2) Project [id#222L AS k2#224L]
                  +- *(2) Range (0, 30, step=1, splits=2)

Ideal physical plan (no shuffle on k1 before the aggregate):

*(3) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
   +- *(3) Project [k1#220L]
      +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
         :- Exchange hashpartitioning(k1#220L, 2), true, [id=#107]
         :  +- *(1) Project [id#218L AS k1#220L]
         :     +- *(1) Range (0, 10, step=1, splits=2)
         +- Exchange hashpartitioning(k2#224L, 2), true, [id=#109]
            +- *(2) Project [id#222L AS k2#224L]
               +- *(2) Range (0, 30, step=1, splits=2)

This is fixed by overriding the outputPartitioning method in ShuffledHashJoinExec, similarly to SortMergeJoinExec.
In addition, this PR fixes one typo in HashJoin, as that code path is shared between broadcast hash join and shuffled hash join.
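
For reference, SortMergeJoinExec's existing override looks roughly like the following (paraphrased from the Spark 3.0 sources; treat it as a sketch, not the exact merged code):

override def outputPartitioning: Partitioning = joinType match {
  case _: InnerLike =>
    // Both sides are hash partitioned on their join keys, so either side's
    // partitioning can satisfy a downstream required distribution.
    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
  case LeftOuter => left.outputPartitioning
  case RightOuter => right.outputPartitioning
  case FullOuter =>
    // A full outer join fills nulls on both sides, so neither side's
    // partitioning is preserved.
    UnknownPartitioning(left.outputPartitioning.numPartitions)
  case LeftExistence(_) => left.outputPartitioning
  case x => throw new IllegalArgumentException(
    s"SortMergeJoinExec should not take $x as the JoinType")
}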

Why are the changes needed?

To avoid extra shuffles (for queries with multiple joins or a group-by after a join), saving CPU and IO.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit test in JoinSuite.

@c21 (Contributor, Author) commented Jul 16, 2020

cc @maropu, @cloud-fan, @gatorsmile and @sameeragarwal - could you help take a look? Thanks!

@SparkQA commented Jul 16, 2020

Test build #125951 has finished for PR 29130 at commit dface2a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented:

Is it covered by #28676? cc @imback82

@c21 (Contributor, Author) commented Jul 17, 2020

@cloud-fan and @imback82 - I was not aware of #28676 before making this PR. After checking #28676, TL;DR: I think we are solving different issues, and there is no code conflict between the two.

#28676: preserves the broadcast hash join build side partitioning for inner joins when the stream side is hash partitioned. @imback82 it's a great idea that I had never thought about before. I bet our users in production have hit this issue, but our action was just to ask them to disable broadcast join (using SMJ on the small table instead of broadcasting it; the cost is small, as the table should be small enough to broadcast), so that the partitioning info gets propagated through the query plan and the following shuffle can be saved. #28676 handles this automatically on the Spark side, which should be better.

This PR: preserves the shuffled hash join build side partitioning, which is a much smaller, almost trivial change compared to handling broadcast hash join. Because shuffled hash join has the same required children distribution as sort merge join, its output partitioning should also be the same as sort merge join's (except that it cannot handle the full outer join case). We found this issue when our users ran shuffled hash joins on bucketed tables and then joined or grouped by on the build side keys. We have run this in production for more than a year. No config is needed for this feature, and IMO it can be enabled by default.

What do you think? Thanks.
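
(For context on the required children distribution point above: in the Spark 3.0 sources, both operators declare essentially the following requirement; paraphrased, so treat it as a sketch.)

// Declared in essentially this form by both ShuffledHashJoinExec and
// SortMergeJoinExec, which is why both accept identically partitioned children:
override def requiredChildDistribution: Seq[Distribution] =
  HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil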

@@ -47,6 +47,18 @@ case class ShuffledHashJoinExec(
"buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
"buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))

override def outputPartitioning: Partitioning = joinType match {
Contributor:

This is exactly the same as SMJ. Shall we create a common trait ShuffleJoin to put it in?

@c21 (Contributor, Author) commented Jul 17, 2020

@cloud-fan - there's an extra case in sort merge join to handle full outer join. I am thinking of handling all other join types in a parent trait ShuffleJoin, and overriding outputPartitioning in SortMergeJoinExec to handle the extra FullOuter case. What do you think?

But it feels a bit weird to me for ShuffleJoin not to handle FullOuter, since a shuffled FullOuter join is one kind of shuffle join. On the other hand, if ShuffleJoin handles FullOuter, it also seems weird that ShuffledHashJoinExec extends it.

Wondering what you think? The change itself is easy. Thanks.

Contributor:

Why does shuffle hash join not support FullOuter?

@c21 (Contributor, Author):

> Why does shuffle hash join not support FullOuter?

@cloud-fan sorry if I'm missing anything, but isn't this true now? In the current Spark hash join implementation, the stream side looks up rows in the build side hash map, so it can handle non-matching keys from the stream side (when there's no match in the build side hash map). But it cannot handle non-matching keys from the build side, as no matching info is persisted for the build side.

I feel an interesting follow-up could be to handle full outer join in shuffled hash join: when looking up stream side keys in the build side HashedRelation, mark the matched rows inside the build side HashedRelation; then, after reading all rows from the stream side, output all non-matching rows from the build side based on the marked HashedRelation.
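
A minimal, self-contained sketch of that follow-up idea, using plain Scala collections in place of HashedRelation (every name below is illustrative, not a Spark API):

import scala.collection.mutable

case class TestRow(key: Int, value: String)

// Full outer hash join: probe the build map with stream rows, mark which
// build rows matched, then emit the never-matched build rows at the end.
def fullOuterHashJoin(
    build: Seq[TestRow],
    stream: Seq[TestRow]): Seq[(Option[TestRow], Option[TestRow])] = {
  // Build phase: key -> indices of build rows with that key.
  val map = mutable.Map.empty[Int, mutable.Buffer[Int]]
  build.zipWithIndex.foreach { case (row, i) =>
    map.getOrElseUpdate(row.key, mutable.Buffer.empty) += i
  }
  val matched = new Array[Boolean](build.length) // the per-row "mark"

  val out = mutable.Buffer.empty[(Option[TestRow], Option[TestRow])]
  // Stream phase: emit matches, or stream-only rows with a null build side.
  for (s <- stream) {
    map.get(s.key) match {
      case Some(idxs) => idxs.foreach { i =>
        matched(i) = true
        out += ((Some(s), Some(build(i))))
      }
      case None => out += ((Some(s), None))
    }
  }
  // Final phase: build rows that never matched get a null stream side.
  for (i <- build.indices if !matched(i)) {
    out += ((None, Some(build(i))))
  }
  out.toSeq
}

// e.g. fullOuterHashJoin(Seq(TestRow(1, "a"), TestRow(3, "c")),
//                        Seq(TestRow(1, "x"), TestRow(2, "y")))
// -> (Some(1,x), Some(1,a)), (Some(2,y), None), (None, Some(3,c))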

Contributor:

Can we simply move SortMergeJoinExec.outputPartitioning to the parent trait? It works for ShuffledHashJoinExec as well, as the planner guarantees ShuffledHashJoinExec.joinType won't be FullOuter.

@c21 (Contributor, Author):

@cloud-fan I agree. Updated.

@imback82 (Contributor) left a comment:

+1 as well


test("SPARK-32330: Preserve shuffled hash join build side partitioning") {
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
Contributor:

nit: set it to "-1" to make the intention (turning off broadcast join) clear?

Contributor:
Ah OK. Thanks!

@SparkQA commented Jul 17, 2020

Test build #126058 has finished for PR 29130 at commit f9479b6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait ShuffledJoin extends BaseJoinExec

@c21 (Contributor, Author) commented Jul 18, 2020

retest this please

@SparkQA commented Jul 18, 2020

Test build #126081 has finished for PR 29130 at commit f9479b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait ShuffledJoin extends BaseJoinExec

@cloud-fan (Contributor) commented:
thanks, merging to master!

cloud-fan closed this in fe07521 on Jul 20, 2020
@c21 (Contributor, Author) commented Jul 20, 2020

Thank you @cloud-fan for the review!
