New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-37193][SQL] DynamicJoinSelection.shouldDemoteBroadcastHashJoin should not apply to outer joins #34464
Conversation
Can one of the admins verify this patch? |
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
val demoteBroadcastHash = joinType match { | ||
// doesn't make sense for outer joins since one side is preserved and join is not | ||
// short circuited if the other side is empty | ||
case Inner | LeftSemi | LeftAnti => shouldDemoteBroadcastHashJoin (stage.mapStats.get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also include Existence Joins as well
- LeftExistence pattern to match all LeftSemi / LeftAnti / Existence in one shot
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed it a bit - seems more natural
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
Outdated
Show resolved
Hide resolved
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { | ||
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( | ||
"SELECT * FROM (select * from testData where value = '1') td" + | ||
" right outer join testData2 ON key = a") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[optional] how about adding all outerJoinTypes here and making it more rigid
can be done with minimal changes
Seq("right outer", "left outer", "full outer").foreach { joinType =>
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM (select * from testData where value = '1') td" +
s" $joinType join testData2 ON key = a")
val smj = findTopLevelSortMergeJoin(plan)
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the feedback but changing right outer
here to a left outer
won't have the same semantics w/o changing the sql. You can only broadcast the inner side.
I don't believe full outer
supports BHJ at all
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
Outdated
Show resolved
Hide resolved
dd11a1e
to
08551fb
Compare
@HyukjinKwon do you have any other suggestions? |
Hi |
val demoteBroadcastHash = joinType match { | ||
// doesn't make sense for outer joins since one side is preserved and join is not | ||
// short circuited if the other side is empty | ||
case LeftOuter | RightOuter | FullOuter => false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does BHJ support FullOuter
? what about LeftSemi
and LeftAnti
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BHJ can't support FullOuter - I put it in the list since it seems odd (though not wrong) to add no_broadcast_hash
to a join that doesn't support BHJ anyway. Makes logs harder to read.
LeftSemi & LeftAnti could in principle support it - I don't know if they do today.
ping - is there anything else I can do to move this along? |
The last tests pass is long time ago, @ekoifman can you rebase this PR and run the test again? Sorry for the late review. |
… should not apply to outer joins
8a3f27d
to
050f745
Compare
@cloud-fan done |
thanks, merging to master! |
var newHint = hint | ||
if (!hint.leftHint.exists(_.strategy.isDefined)) { | ||
selectJoinStrategy(left).foreach { strategy => | ||
selectJoinStrategy(left, joinType).foreach { strategy => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For left outer join, and the left side has many empty partitions, shall we still demote?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For LOJ with many empty partitions on the left, the local join can short-circuit whether you broadcast or shuffle. I'm not sure how to determine which strategy will send less data around. Is there another heuristic that can be used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the local join can short-circuit whether you broadcast or shuffle.
I think there is a misunderstanding here. We see many empty partitions on the shuffled left side, it doesn't mean the original left side before shuffle also has many empty partitions. I think we need to demote.
@ekoifman can you open a followup PR to fix this issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. filed SPARK-37753
I'm reverting this to avoid mistakenly releasing a performance regression in Spark 3.3. Please resubmit this PR with SPARK-37753 resolved. |
…ynamicJoinSelection ### What changes were proposed in this pull request? ### Why are the changes needed? In the current implementation of DynamicJoinSelection the logic checks if one side of the join has high ratio of empty partitions and adds a NO_BROADCAST hint on that side since a shuffle join can short-circuit the local joins where one side is empty. This logic is doesn't make sense for all join type. For example, a Left Outer Join cannot short circuit if RHS is empty so we should not inhibit BHJ. On the other hand a LOJ executed as a shuffle join where the LHS has many empty can short circuit the local join so we should inhibit the BHJ because BHJ will use OptimizeShuffleWithLocalRead which will re-assemble LHS partitions as the were before the shuffle and thus may not have many empty ones any more. This supersedes [SPARK-37193](https://issues.apache.org/jira/browse/SPARK-37193) Also see previous discussion in #34464 (comment) ### Does this PR introduce _any_ user-facing change? It may change which joins run as BHJ vs shuffle joins ### How was this patch tested? Unit Tests Closes #35715 from ekoifman/SPARK-37753-enhance-DynamiJoinSelection. Authored-by: Eugene Koifman <eugene.koifman@workday.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
can you tell me ,What does short circuit local join mean,thanks |
@thomasg19930417 |
@ekoifman |
@ekoifman hi, when Inner Join and LHS has many empty partition ,why inner join not demote broadcast
|
…ynamicJoinSelection In the current implementation of DynamicJoinSelection the logic checks if one side of the join has high ratio of empty partitions and adds a NO_BROADCAST hint on that side since a shuffle join can short-circuit the local joins where one side is empty. This logic is doesn't make sense for all join type. For example, a Left Outer Join cannot short circuit if RHS is empty so we should not inhibit BHJ. On the other hand a LOJ executed as a shuffle join where the LHS has many empty can short circuit the local join so we should inhibit the BHJ because BHJ will use OptimizeShuffleWithLocalRead which will re-assemble LHS partitions as the were before the shuffle and thus may not have many empty ones any more. This supersedes [SPARK-37193](https://issues.apache.org/jira/browse/SPARK-37193) Also see previous discussion in #34464 (comment) It may change which joins run as BHJ vs shuffle joins Unit Tests Closes #35715 from ekoifman/SPARK-37753-enhance-DynamiJoinSelection. Authored-by: Eugene Koifman <eugene.koifman@workday.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
DynamicJoinSelection.shouldDemoteBroadcastHashJoin will prevent AQE from converting Sort merge join into a broadcast join because SMJ is faster when the side that would be broadcast has a lot of empty partitions.
This makes sense for inner joins which can short circuit if one side is empty.
For (left,right) outer join, the streaming side still has to be processed so demoting broadcast join doesn't have the same advantage.
Does this PR introduce any user-facing change?
Yes, it may cause AQE to choose BHJ more often than before with better performance
How was this patch tested?
Spark UTs
Also empirical evidence