-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-33701][SHUFFLE] Adaptive shuffle merge finalization for push-based shuffle #33896
Conversation
Please add corresponding empty commits from authors of the original internal patch and tag that information in the PR description. |
10b7896
to
0bd5f4a
Compare
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #143358 has finished for PR 33896 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #143423 has finished for PR 33896 at commit
|
Also regarding the comment in the JIRA that not to finalize when there are no blocks pushed due to the threshold. I almost implemented that change, but then I thought anyway we are having a timeout ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this @venkata91 !
Took first pass through the PR, yet to go over the test suite.
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Outdated
Show resolved
Hide resolved
task.cancel(false) | ||
// The current task should be coming from handleShufflePushCompleted, thus the | ||
// delay should be 0 and registerMergeResults should be true. | ||
assert(delay == 0 && registerMergeResults) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this out of the if
condition ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this not be the case outside of the if
itself ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah since we are scheduling the finalize only if there is no scheduled task when the stage completes (one that checks if totalShuffleSize < shuffleMergeWaitMinSizeThreshold
. We can move this assertion outside. Will make the change.
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Outdated
Show resolved
Hide resolved
Test build #144974 has finished for PR 33896 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #144975 has finished for PR 33896 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #146216 has finished for PR 33896 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #146261 has finished for PR 33896 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #146301 has finished for PR 33896 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just had a few minor comments, thanks for working on this @venkata91 - looks pretty good to me !
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Outdated
Show resolved
Hide resolved
@Ngone51 I am fine with the pr (barring the minor testcase related comments). |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #146403 has finished for PR 33896 at commit
|
Not sure why this test is failing |
I tried running it locally that succeeded though. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me.
+CC @Ngone51
Thanks for working on this @venkata91 ! |
…ased shuffle As part of SPARK-32920 implemented a simple approach to finalization for push-based shuffle. Shuffle merge finalization is the final operation happens at the end of the stage when all the tasks are completed asking all the external shuffle services to complete the shuffle merge for the stage. Once this request is completed no more shuffle pushes will be accepted. With this approach, `DAGScheduler` waits for a fixed time of 10s (`spark.shuffle.push.finalize.timeout`) to allow some time for the inflight shuffle pushes to complete, but this adds additional overhead to stages with very little shuffles. In this PR, instead of waiting for fixed amount of time before shuffle merge finalization now this is controlled adaptively if min threshold number of map tasks shuffle push (`spark.shuffle.push.minPushRatio`) completed then shuffle merge finalization will be scheduled. Also additionally if the total shuffle generated is lesser than min threshold shuffle size (`spark.shuffle.push.minShuffleSizeToWait`) then immediately shuffle merge finalization is scheduled. This is a performance improvement to the existing functionality Yes additional user facing configs `spark.shuffle.push.minPushRatio` and `spark.shuffle.push.minShuffleSizeToWait` Added unit tests in `DAGSchedulerSuite`, `ShuffleBlockPusherSuite` Lead-authored-by: Min Shen <mshenlinkedin.com> Co-authored-by: Venkata krishnan Sowrirajan <vsowrirajanlinkedin.com> Closes #33896 from venkata91/SPARK-33701. Lead-authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com> Co-authored-by: Min Shen <mshen@linkedin.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
What changes were proposed in this pull request?
As part of SPARK-32920 implemented a simple approach to finalization for push-based shuffle. Shuffle merge finalization is the final operation happens at the end of the stage when all the tasks are completed asking all the external shuffle services to complete the shuffle merge for the stage. Once this request is completed no more shuffle pushes will be accepted. With this approach,
DAGScheduler
waits for a fixed time of 10s (spark.shuffle.push.finalize.timeout
) to allow some time for the inflight shuffle pushes to complete, but this adds additional overhead to stages with very little shuffles.In this PR, instead of waiting for fixed amount of time before shuffle merge finalization now this is controlled adaptively if min threshold number of map tasks shuffle push (
spark.shuffle.push.minPushRatio
) completed then shuffle merge finalization will be scheduled. Also additionally if the total shuffle generated is lesser than min threshold shuffle size (spark.shuffle.push.minShuffleSizeToWait
) then immediately shuffle merge finalization is scheduled.Why are the changes needed?
This is a performance improvement to the existing functionality
Does this PR introduce any user-facing change?
Yes additional user facing configs
spark.shuffle.push.minPushRatio
andspark.shuffle.push.minShuffleSizeToWait
How was this patch tested?
Added unit tests in
DAGSchedulerSuite
,ShuffleBlockPusherSuite
Lead-authored-by: Min Shen mshen@linkedin.com
Co-authored-by: Venkata krishnan Sowrirajan vsowrirajan@linkedin.com