-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-14269][SCHEDULER] Eliminate unnecessary submitStage() call. #12060
Conversation
…cal order to ensure building ancestor stages first.
This reverts commit 1636531.
This PR is based on #11720, so please check it first. |
Test build #54509 has finished for PR 12060 at commit
|
Seems that this optimization must need correct stage graphs fixed in #11720, is this correct? |
Yes, that's right. |
@@ -1247,7 +1252,7 @@ class DAGScheduler( | |||
} | |||
} | |||
|
|||
// Note: newly runnable stages will be submitted below when we submit waiting stages | |||
submitWaitingChildStages(shuffleStage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be done when !shuffleStage.isAvailable and we have resubmitted the shuffleStage, or only within the else branch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@markhamstra Thank you for your review.
Definitely we can move this into else branch.
I'll modify it.
…ge becomes available.
Test build #54582 has finished for PR 12060 at commit
|
Jenkins, retest this please. |
Test build #54595 has finished for PR 12060 at commit
|
for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) { | ||
val childStages = waitingStages.filter(_.parents.contains(parent)).toArray | ||
waitingStages --= childStages | ||
for (stage <- childStages.sortBy(_.firstJobId)) { | ||
submitStage(stage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems submitWaitingChildStages
is called to submit child stages when the given parent
stage is available. From this observation, do we have to re-check missing parents inside submitStage
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, and the re-check is done in the submitStage()
.
If there are some missing parent stages, the child will go to waitingStages
again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahah, I see.
This pr seems great in terms of spark-core performance, so could you assign qualified guys to review this? cc: @rxin |
Test build #58529 has finished for PR 12060 at commit
|
Good PR, will it plan to be merged into branch-1.6? |
@zzcclp Not likely. This PR shouldn't produce any different results, but rather produces the same results faster. We're typically very conservative with patch-level releases, so the optimization work for this PR will almost certainly only appear in the Spark 2.x series. That's not too far off. |
@markhamstra , thanks for your explaintion. |
@@ -1357,7 +1345,6 @@ class DAGScheduler( | |||
logDebug("Additional executor lost message for " + execId + | |||
"(epoch " + currentEpoch + ")") | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to submit some newly-waiting stages here (e.g., if shuffle output was lost for a map stage, so now that map stage needs to be re-run)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This appears to be a non-issue, because we handle lost shuffle output separately, when we get a FetchFailure from a task that tries to fetch the output.
This LGTM with the small comment changes I suggested. @markhamstra any objections to this? Mark / @rxin thoughts on merging it into the 2.0 branch? |
@kayousterhout Thank you for your review. |
Test build #58659 has finished for PR 12060 at commit
|
Test build #59133 has finished for PR 12060 at commit
|
Jenkins, retest this please. |
Test build #59153 has finished for PR 12060 at commit
|
I merged this into master (not 2.0, since it's a performance problem rather than a correctness problem, and this isn't a regression). Thanks @ueshin! |
Thanks a lot for merging this! |
Currently a method `submitStage()` for waiting stages is called on every iteration of the event loop in `DAGScheduler` to submit all waiting stages, but most of them are not necessary because they are not related to Stage status. The case we should try to submit waiting stages is only when their parent stages are successfully completed. This elimination can improve `DAGScheduler` performance. Added some checks and other existing tests, and our projects. We have a project bottle-necked by `DAGScheduler`, having about 2000 stages. Before this patch the almost all execution time in `Driver` process was spent to process `submitStage()` of `dag-scheduler-event-loop` thread but after this patch the performance was improved as follows: | | total execution time | `dag-scheduler-event-loop` thread time | `submitStage()` | |--------|---------------------:|---------------------------------------:|----------------:| | Before | 760 sec | 710 sec | 667 sec | | After | 440 sec | 14 sec | 10 sec | Author: Takuya UESHIN <ueshin@happy-camper.st> Closes apache#12060 from ueshin/issues/SPARK-14269.
Currently a method `submitStage()` for waiting stages is called on every iteration of the event loop in `DAGScheduler` to submit all waiting stages, but most of them are not necessary because they are not related to Stage status. The case we should try to submit waiting stages is only when their parent stages are successfully completed. This elimination can improve `DAGScheduler` performance. Added some checks and other existing tests, and our projects. We have a project bottle-necked by `DAGScheduler`, having about 2000 stages. Before this patch the almost all execution time in `Driver` process was spent to process `submitStage()` of `dag-scheduler-event-loop` thread but after this patch the performance was improved as follows: | | total execution time | `dag-scheduler-event-loop` thread time | `submitStage()` | |--------|---------------------:|---------------------------------------:|----------------:| | Before | 760 sec | 710 sec | 667 sec | | After | 440 sec | 14 sec | 10 sec | Author: Takuya UESHIN <ueshin@happy-camper.st> Closes apache#12060 from ueshin/issues/SPARK-14269.
## What changes were proposed in this pull request? Currently a method `submitStage()` for waiting stages is called on every iteration of the event loop in `DAGScheduler` to submit all waiting stages, but most of them are not necessary because they are not related to Stage status. The case we should try to submit waiting stages is only when their parent stages are successfully completed. This elimination can improve `DAGScheduler` performance. ## How was this patch tested? Added some checks and other existing tests, and our projects. We have a project bottle-necked by `DAGScheduler`, having about 2000 stages. Before this patch the almost all execution time in `Driver` process was spent to process `submitStage()` of `dag-scheduler-event-loop` thread but after this patch the performance was improved as follows: | | total execution time | `dag-scheduler-event-loop` thread time | `submitStage()` | |--------|---------------------:|---------------------------------------:|----------------:| | Before | 760 sec | 710 sec | 667 sec | | After | 440 sec | 14 sec | 10 sec | Author: Takuya UESHIN <ueshin@happy-camper.st> Closes apache#12060 from ueshin/issues/SPARK-14269.
## What changes were proposed in this pull request? Currently a method `submitStage()` for waiting stages is called on every iteration of the event loop in `DAGScheduler` to submit all waiting stages, but most of them are not necessary because they are not related to Stage status. The case we should try to submit waiting stages is only when their parent stages are successfully completed. This elimination can improve `DAGScheduler` performance. ## How was this patch tested? Added some checks and other existing tests, and our projects. We have a project bottle-necked by `DAGScheduler`, having about 2000 stages. Before this patch the almost all execution time in `Driver` process was spent to process `submitStage()` of `dag-scheduler-event-loop` thread but after this patch the performance was improved as follows: | | total execution time | `dag-scheduler-event-loop` thread time | `submitStage()` | |--------|---------------------:|---------------------------------------:|----------------:| | Before | 760 sec | 710 sec | 667 sec | | After | 440 sec | 14 sec | 10 sec | Author: Takuya UESHIN <ueshin@happy-camper.st> Closes apache#12060 from ueshin/issues/SPARK-14269.
## What changes were proposed in this pull request? Currently a method `submitStage()` for waiting stages is called on every iteration of the event loop in `DAGScheduler` to submit all waiting stages, but most of them are not necessary because they are not related to Stage status. The case we should try to submit waiting stages is only when their parent stages are successfully completed. This elimination can improve `DAGScheduler` performance. ## How was this patch tested? Added some checks and other existing tests, and our projects. We have a project bottle-necked by `DAGScheduler`, having about 2000 stages. Before this patch the almost all execution time in `Driver` process was spent to process `submitStage()` of `dag-scheduler-event-loop` thread but after this patch the performance was improved as follows: | | total execution time | `dag-scheduler-event-loop` thread time | `submitStage()` | |--------|---------------------:|---------------------------------------:|----------------:| | Before | 760 sec | 710 sec | 667 sec | | After | 440 sec | 14 sec | 10 sec | Author: Takuya UESHIN <ueshin@happy-camper.st> Closes apache#12060 from ueshin/issues/SPARK-14269.
What changes were proposed in this pull request?
Currently a method
submitStage()
for waiting stages is called on every iteration of the event loop inDAGScheduler
to submit all waiting stages, but most of them are not necessary because they are not related to Stage status.The case we should try to submit waiting stages is only when their parent stages are successfully completed.
This elimination can improve
DAGScheduler
performance.How was this patch tested?
Added some checks and other existing tests, and our projects.
We have a project bottle-necked by
DAGScheduler
, having about 2000 stages.Before this patch the almost all execution time in
Driver
process was spent to processsubmitStage()
ofdag-scheduler-event-loop
thread but after this patch the performance was improved as follows:dag-scheduler-event-loop
thread timesubmitStage()