[SPARK-35282][SQL] Support AQE side shuffled hash join formula #32450
cc @maropu @cloud-fan @maryannxue @c21 do you have any thoughts on this new config?
Kubernetes integration test unable to build dist. exiting with code: 1
s"${PREFER_SORTMERGEJOIN.key} is false.")
.version("3.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64MB")
Curious why we chose this default value. Is it to match spark.sql.adaptive.shuffle.targetPostShuffleInputSize?
The main idea is that the default skew join size is 256MB, and the local map should be 3x smaller than the other side (following the existing formula). So assume the local map size is 64MB and the other side is 192MB.
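One reading of the arithmetic in this reply, as a small Scala sketch: the 256MB skew size is split so that the build side is one part and the other side is three parts. The object and value names are hypothetical and are not taken from the patch.

```scala
object DefaultThresholdSketch {
  // Spark byte strings such as "64MB" are interpreted in units of 1024 * 1024 bytes.
  val MiB: Long = 1024L * 1024L

  // The default skew join size quoted in the reply above.
  val skewedPartitionBytes: Long = 256L * MiB

  // The "3x smaller" ratio from the existing formula mentioned in the reply.
  val sizeRatio: Long = 3L

  // Assumption: build side plus other side together add up to the skew size.
  val localMapBytes: Long = skewedPartitionBytes / (sizeRatio + 1L)
  val otherSideBytes: Long = sizeRatio * localMapBytes

  def main(args: Array[String]): Unit = {
    println(s"local map: ${localMapBytes / MiB} MB, other side: ${otherSideBytes / MiB} MB")
  }
}
```

Under that assumption the build side comes out at 64MB and the other side at 192MB, which matches the numbers given in the reply.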
isRuntime: Boolean = false,
mapOutputStatistics: Option[MapOutputStatistics] = None) {
I feel it's a bit weird that Statistics has a MapOutputStatistics field: MapOutputStatistics is specific to the physical shuffle operator, but Statistics is for all logical operators. Maybe we can have:
RunTimeStatsSpec(
  isRuntime: Boolean,
  sizeInBytesPerPartition: Option[Array[Long]]
)
Statistics(
  runTimeStatsSpec: Option[RunTimeStatsSpec] = None,
  ...
)
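A minimal, compilable sketch of the shape suggested above, for illustration only. The Statistics class here is a stand-in that carries just the new field (the fields elided in the suggestion are left out), and maxPartitionSize is a hypothetical helper that is not part of the suggestion.

```scala
object RuntimeStatsSketch {
  // Runtime-only details, kept separate from the general logical statistics.
  final case class RunTimeStatsSpec(
      isRuntime: Boolean,
      sizeInBytesPerPartition: Option[Array[Long]])

  // Stand-in for Statistics: only the proposed field is modelled here;
  // the other fields elided in the review comment are intentionally omitted.
  final case class Statistics(
      runTimeStatsSpec: Option[RunTimeStatsSpec] = None)

  // Hypothetical helper: largest partition size, if runtime sizes are known and non-empty.
  def maxPartitionSize(stats: Statistics): Option[Long] =
    for {
      spec <- stats.runTimeStatsSpec
      sizes <- spec.sizeInBytesPerPartition
      if sizes.nonEmpty
    } yield sizes.max
}
```

Keeping the per-partition sizes behind an Option means operators without shuffle statistics simply carry None.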
I had a similar thought, and the change you point out seems like a better approach. What do you think, @maropu @cloud-fan?
FYI, we took another approach to support SHJ in AQE. We added a rule in AdaptiveSparkPlanExec to convert SMJ to SHJ according to shuffle stats, which requires no changes in Statistics.scala because the statistics are already available in ShuffleStageInfo.
The SMJ could also be converted to SHJ if applicable even if PREFER_SORTMERGE is set. cc @Liulietong
cc @luuliietong
We added a rule in AdaptiveSparkPlanExec to convert SMJ to SHJ according to shuffle stats
This looks like a better idea. Do you want to open a PR for it?
Test build #138192 has finished for PR 32450 at commit
* Note: this assume that the number of partition is fixed, requires additional work if it's
* dynamic.
* In AQE framework, we use runtime statistics to check if we can build local map. Only if
* all the partition size not large than `ADAPTIVE_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD`,
size not large -> size is not larger
* dynamic.
* In AQE framework, we use runtime statistics to check if we can build local map. Only if
* all the partition size not large than `ADAPTIVE_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD`,
* we allow to build local hash map.
build local -> build a local
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138250 has finished for PR 32450 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138620 has finished for PR 32450 at commit
What changes were proposed in this pull request?
Use runtime statistics to decide whether a join can be converted to a shuffled hash join.
Why are the changes needed?
Use AQE runtime statistics to decide if we can use shuffled hash join instead of sort merge join. Currently, the formula of shuffled hash join selection does not work because the shuffle partition number is dynamic.
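As a rough illustration of the runtime check this PR describes, here is a sketch that works on plain per-partition byte sizes. All names are hypothetical and this is not the code in the patch; it only mirrors the two conditions mentioned in the conversation: every build-side partition must fit under the threshold, and the build side should be at least 3x smaller than the other side.

```scala
object ShuffledHashJoinSketch {
  // True when every shuffle partition of the build side is no larger than the threshold.
  def canBuildLocalHashMap(partitionSizes: Seq[Long], thresholdBytes: Long): Boolean =
    partitionSizes.forall(_ <= thresholdBytes)

  // The "3x smaller" ratio referred to in the review discussion.
  def muchSmaller(buildBytes: Long, otherBytes: Long): Boolean =
    buildBytes * 3 <= otherBytes

  // Combine both conditions using runtime sizes instead of a static estimate.
  def preferShuffledHashJoin(
      buildPartitionSizes: Seq[Long],
      otherPartitionSizes: Seq[Long],
      thresholdBytes: Long): Boolean =
    canBuildLocalHashMap(buildPartitionSizes, thresholdBytes) &&
      muchSmaller(buildPartitionSizes.sum, otherPartitionSizes.sum)
}
```

Because the check looks at the actual size of each partition, it does not depend on a fixed shuffle partition count.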
Add a new config, spark.sql.adaptive.shuffledHashJoinLocalMapThreshold, to decide if join can be converted to shuffled hash join safely.
Does this PR introduce any user-facing change?
Yes, add a new config.
How was this patch tested?
Add new test.