New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-26543][SQL] Support the coordinator to determine post-shuffle partitions more reasonably #23490
Conversation
@cloud-fan @carsonwang Could you please have a look at this? |
ok to test |
@@ -178,11 +178,17 @@ class ExchangeCoordinator( | |||
// If including the nextShuffleInputSize would exceed the target partition size, then start a | |||
// new partition. | |||
if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) { | |||
partitionStartIndices += i | |||
if (postShuffleInputSize != 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you put comments here about the case you talked about in the PR description?
// filter the last indice which will split the postShuffleInputSize=0 | ||
if (postShuffleInputSize == 0 && i == numPreShufflePartitions - 1) { | ||
partitionStartIndices.remove(partitionStartIndices.size - 1) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move this check outside the while
loop?
@maropu fix that ,thank you. |
Test build #101261 has finished for PR 23490 at commit
|
Test build #101264 has finished for PR 23490 at commit
|
Can one of the admins verify this patch? |
@dongjoon-hyun @maropu This patch merges cleanly before this, but today it indicates that "This branch has conflicts that must be resolved", Could you please have a look at this? |
You need to resolve the conflict by a git-rebase command. |
We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it! |
@southernriver can you check if this problem still exists in the new AQE framework? |
What changes were proposed in this pull request?
For SparkSQL ,when we open AE by 'set spark.sql.adapative.enable=true',the ExchangeCoordinator will introduced to determine the number of post-shuffle partitions. But in some certain conditions,the coordinator performed not very well, there are always some tasks retained and they worked with Shuffle Read Size / Records 0.0B/0 ,We could increase the spark.sql.adaptive.shuffle.targetPostShuffleInputSize to solve this,but this action is unreasonable as targetPostShuffleInputSize Should not be set too large. As follow:
We could reproduce this problem easily with the SQL:
set spark.sql.adaptive.enabled=true;
spark.sql.shuffle.partitions 100;
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 33554432 ;
SELECT a,COUNT(1) FROM TABLE GROUP BY a DISTRIBUTE BY cast(rand()* 10 as bigint)
before fix:
after fix:
How was this patch tested?
manual and unit tests