[SPARK-31134][SQL] optimize skew join after shuffle partitions are coalesced#27893
cloud-fan wants to merge 5 commits into apache:master
Test build #119721 has finished for PR 27893 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
val sizes = mapStats.bytesByPartitionId
val partitions = partitionSpecs.map {
  case spec @ CoalescedPartitionSpec(start, end) =>
    var sum = 0L
nit: sizes.slice(start, end).sum?
slice will create a new array, which is less efficient.
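The trade-off in this exchange can be sketched as follows. This is a minimal illustration, not the PR's code: `RangeSum`, `sumRange`, and `sumRangeWithSlice` are hypothetical names. Both functions return the same total, but `slice` copies the range into a new array before summing, while the `while` loop reads the original array in place.

```scala
object RangeSum {
  // Sums sizes(start until end) without allocating an intermediate array.
  def sumRange(sizes: Array[Long], start: Int, end: Int): Long = {
    var sum = 0L
    var i = start
    while (i < end) {
      sum += sizes(i)
      i += 1
    }
    sum
  }

  // Same result, but `slice` first copies the range into a new array.
  def sumRangeWithSlice(sizes: Array[Long], start: Int, end: Int): Long =
    sizes.slice(start, end).sum
}
```

Whether the extra allocation matters depends on how often the rule runs and how many partitions there are; the loop simply avoids the question.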
val mapStartIndices = getMapStartIndices(left, partitionIndex, leftTargetSize)
if (mapStartIndices.length > 1) {
  val CoalescedPartitionSpec(start, end) = left.partitions(partitionIndex)._1
  assert(start + 1 == end, "coalesced partition should never be skewed.")
First of all, our factor check is not strict enough: it only requires "> 0". What happens here if the factor is set to "1"?
Second, asserts are usually disabled in production, which could lead to errors later in this code.
We should probably make this more robust by putting the condition into isSkew. You can still add such an assertion in the isSkew implementation.
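One way to read this suggestion is sketched below. It is an assumption-laden illustration, not the PR's implementation: `SkewCheck`, `PartitionRange`, and the parameters `factor` and `thresholdBytes` are hypothetical stand-ins. The point is that the "not coalesced" condition becomes part of the boolean result, so it still holds when assertions are turned off.

```scala
object SkewCheck {
  // Hypothetical stand-in for a partition spec covering reducer
  // partitions [startReducerIndex, endReducerIndex).
  final case class PartitionRange(startReducerIndex: Int, endReducerIndex: Int) {
    // A range spanning more than one reducer partition was produced by coalescing.
    def isCoalesced: Boolean = startReducerIndex + 1 < endReducerIndex
  }

  // A partition counts as skewed only if it is much larger than the median,
  // exceeds an absolute size threshold, and was not produced by coalescing.
  def isSkewed(
      size: Long,
      medianSize: Long,
      range: PartitionRange,
      factor: Int,
      thresholdBytes: Long): Boolean = {
    size > medianSize * factor && size > thresholdBytes && !range.isCoalesced
  }
}
```

With this shape, a caller never has to rely on an `assert` to rule out coalesced partitions; the check is an ordinary condition that is always evaluated.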
- val rightSize = rightStats.bytesByPartitionId(partitionIndex)
+ val rightSize = rightSizes(partitionIndex)
  val isRightSkew = isSkewed(rightSize, rightMedSize) && canSplitRight
  if (isLeftSkew || isRightSkew) {
Not related to this PR, but I think we can remove this outer if now.
val rightParts = if (isRightSkew) {
  val mapStartIndices = getMapStartIndices(right, partitionIndex, rightTargetSize)
  if (mapStartIndices.length > 1) {
    val CoalescedPartitionSpec(start, end) = right.partitions(partitionIndex)._1
The code that calculates leftParts and rightParts is almost the same. It would be better to wrap it in a method.
5671ce2 to d7e55e8
Test build #119840 has finished for PR 27893 at commit
Test build #119837 has finished for PR 27893 at commit
Test build #119839 has finished for PR 27893 at commit
retest this please
Test build #119841 has finished for PR 27893 at commit
val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

// Ideally a skewed partition won't get coalesced, but skip it here for safety.
val leftParts = if (isLeftSkew && !isLeftCoalesced) {
@JkSelf I tried to create a common method to handle both sides, but the method takes too many parameters, so I gave up. Besides, there isn't much duplicated code here.
val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

// Ideally a skewed partition won't get coalesced, but skip it here for safety.
nit: A skewed partition should never be coalesced, but skip it here just to be safe.
Test build #119878 has finished for PR 27893 at commit
Test build #119881 has finished for PR 27893 at commit
Test build #119883 has finished for PR 27893 at commit
val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

// A skewed partition should never be coalesced, but skip it here just to be safe.
Say the original map output sizes are 100, 10, and 2000, and the coalesce target is 100. After CoalesceShufflePartitions, we would have CoalescedPartitionSpec(0, 1) and CoalescedPartitionSpec(1, 3). When we then apply OptimizeSkewedJoin, CoalescedPartitionSpec(1, 3) is obviously skewed but would be missed. Right?
I don't think the coalesce rule will coalesce 10 and 2000, can you double check?
Oh yeah, I checked, you're right!
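The scenario in this thread can be checked with a small sketch of greedy coalescing. This is a simplified model under stated assumptions, not Spark's actual `CoalesceShufflePartitions` code: `CoalesceSketch.coalesce` is a hypothetical helper that handles a single shuffle and starts a new `[start, end)` range whenever adding the next partition would push the current group past the target size.

```scala
import scala.collection.mutable.ListBuffer

object CoalesceSketch {
  // Greedily groups adjacent partitions into [start, end) ranges. A new range
  // starts whenever adding the next partition would exceed targetSize.
  def coalesce(sizes: Array[Long], targetSize: Long): List[(Int, Int)] = {
    val specs = ListBuffer.empty[(Int, Int)]
    var start = 0
    var groupSize = 0L
    var i = 0
    while (i < sizes.length) {
      if (i > start && groupSize + sizes(i) > targetSize) {
        // Close the current range before partition i and begin a new one at i.
        specs += ((start, i))
        start = i
        groupSize = sizes(i)
      } else {
        groupSize += sizes(i)
      }
      i += 1
    }
    // Close the final range, if there were any partitions at all.
    if (sizes.nonEmpty) specs += ((start, sizes.length))
    specs.toList
  }
}
```

Under this model, `CoalesceSketch.coalesce(Array(100L, 10L, 2000L), 100L)` yields `List((0, 1), (1, 2), (2, 3))`: the partition of size 10 is not merged with the one of size 2000, because their combined size would exceed the target, so the large partition stays on its own and remains visible to a later skew check.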
LGTM
Thanks! Merged to master/3.0
…alesced

### What changes were proposed in this pull request?
Run the `OptimizeSkewedJoin` rule after the `CoalesceShufflePartitions` rule.

### Why are the changes needed?
Remove duplicated coalescing code in `OptimizeSkewedJoin`.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
existing tests

Closes #27893 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
(cherry picked from commit 30d9535)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
### What changes were proposed in this pull request?
Run the `OptimizeSkewedJoin` rule after the `CoalesceShufflePartitions` rule.

### Why are the changes needed?
Remove duplicated coalescing code in `OptimizeSkewedJoin`.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
existing tests