-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-36652][SQL] AQE dynamic join selection should not apply to non-equi join #33899
Conversation
@cloud-fan and @ulysses-you could you help take a look when you have time? Thanks. |
withLogAppender(hintAppender, level = Some(Level.WARN)) { | ||
withSQLConf( | ||
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", | ||
SQLConf.ADAPTIVE_MAX_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD.key -> "67108865") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's this magic number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's due to advisoryPartitionSize <= maxShuffledHashJoinLocalMapThreshold
in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala#L48. spark.sql.adaptive.advisoryPartitionSizeInBytes
by default is 64MB, which is 67108864. +1 one here to trigger the bug. This is just an example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can change the number to 64MB
if it's preferred.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea '64MB' seems better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan - sure, updated.
is it only a log issue? |
@cloud-fan - yes I think so. We just logged a warning for these wrongly-applied hint. So it should not be a blocker for release. |
@c21 thank you for the fix, I think this issue only exists in master branch since the hint check #32355 is only merged into master after branch-3.2 cut. The origin idea of |
@@ -169,7 +169,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { | |||
} | |||
|
|||
private def checkHintNonEquiJoin(hint: JoinHint): Unit = { | |||
if (hintToShuffleHashJoin(hint) || hintToSortMergeJoin(hint)) { | |||
if (hintToShuffleHashJoin(hint) || hintToPreferShuffleHashJoin(hint) || | |||
hintToSortMergeJoin(hint)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PreferShuffleHashJoin
is an internal hint so user cann't use it, we don't need do this check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I am aware of this hint is internal only. I am more thinking towards to catch and expose the bug where we apply PREFER_SHUFFLE_HASH
internally by mistake (e.g. for the added unit test query in JoinHintSuite.scala
). I feel it's no harm to add here and help catch more bugs in the future. But if you guys think we should not add it, I can also remove it. cc @cloud-fan .
cc @gengliangwang , too |
then we should update the affected version in jira SPARK-36652 |
Yep updated. Sorry for confusion @gengliangwang. |
Thank you for updating the affected version. :) |
…SHOLD.key to 64MB
Kubernetes integration test starting |
Kubernetes integration test status failure |
@@ -169,7 +169,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { | |||
} | |||
|
|||
private def checkHintNonEquiJoin(hint: JoinHint): Unit = { | |||
if (hintToShuffleHashJoin(hint) || hintToSortMergeJoin(hint)) { | |||
if (hintToShuffleHashJoin(hint) || hintToPreferShuffleHashJoin(hint) || | |||
hintToSortMergeJoin(hint)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. indentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dongjoon-hyun - sure, updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM (with one nit comment)
Test build #142939 has finished for PR 33899 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
Thank you, @c21 , @cloud-fan , @ulysses-you , @gengliangwang . |
Thank you all for review! |
Test build #142948 has finished for PR 33899 at commit
|
Refer to this link for build results (access rights to CI server needed): |
What changes were proposed in this pull request?
Currently
DynamicJoinSelection
has two features: 1.demote broadcast hash join, and 2.promote shuffled hash join. Both are achieved by adding join hint in query plan, and only works for equi join. However the rule is matching withJoin
operator now, so it would add hint for non-equi join by mistake (See added test query inJoinHintSuite.scala
for an example).This PR is to fix
DynamicJoinSelection
to only apply to equi-join, and improvecheckHintNonEquiJoin
to check we should not addPREFER_SHUFFLE_HASH
for non-equi join.Why are the changes needed?
Improve the logic of codebase to be better.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit test in
JoinHintSuite.scala
.