[SPARK-13902][SCHEDULER] Make DAGScheduler.getAncestorShuffleDependencies() return in topological order to ensure building ancestor stages first. #11720
Conversation
Test build #53181 has finished for PR 11720 at commit
Test build #53265 has finished for PR 11720 at commit
Test build #53318 has finished for PR 11720 at commit
Test build #53425 has finished for PR 11720 at commit
 * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet,
 * in topological order to ensure building ancestor stages first.
 */
private def getAncestorShuffleDependencies(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] = {
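To make the ordering requirement concrete, here is a minimal, self-contained sketch of the idea (not the DAGScheduler implementation): a post-order depth-first walk emits every ancestor before any node that depends on it. Node, parents, and ancestorsInTopologicalOrder are hypothetical stand-ins for an RDD, its shuffle dependencies, and the method above.

```scala
// Illustrative sketch only, not Spark code.
import scala.collection.mutable

object TopologicalOrderSketch {
  final case class Node(id: Int, parents: Seq[Node] = Nil)

  // Post-order DFS: ancestors are appended before the nodes that depend on
  // them, which is the order in which ancestor stages must be built.
  def ancestorsInTopologicalOrder(root: Node): Seq[Node] = {
    val visited = mutable.HashSet[Int]()
    val ordered = mutable.ArrayBuffer[Node]()

    def visit(node: Node): Unit = {
      if (visited.add(node.id)) {
        node.parents.foreach(visit)           // visit all ancestors first
        if (node.id != root.id) ordered += node
      }
    }

    visit(root)
    ordered.toSeq
  }

  def main(args: Array[String]): Unit = {
    // c depends on b, which depends on a; the ancestors of c come back as (a, b).
    val a = Node(1)
    val b = Node(2, Seq(a))
    val c = Node(3, Seq(b))
    assert(ancestorsInTopologicalOrder(c).map(_.id) == Seq(1, 2))
  }
}
```

The real method additionally skips ShuffleDependencies that are already registered in shuffleToMapStage, but the ordering guarantee is the same.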
ISTM it'd be better to check for illegal overwrites in shuffleToMapStage, e.g. assert(!shuffleToMapStage.get(dep.shuffleId).isDefined), at the caller site: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289
@ueshin Great work. I'm not 100% sure though; one question: do we get wrong answers from this kind of incorrect stage graph? If so, it'd be better to add tests in DAGSchedulerSuite.
@maropu Thank you for your review.
I don't think this kind of incorrect stage graph causes wrong answers for now.
cc: @rxin
Test build #54298 has finished for PR 11720 at commit
Jenkins, retest this please.
@@ -286,6 +286,7 @@ class DAGScheduler(
      case None =>
        // We are going to register ancestor shuffle dependencies
        getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          assert(!shuffleToMapStage.get(dep.shuffleId).isDefined)
Why are you asserting this when you are already within a match case where this must be true? If it is actually possible for the contents of shuffleToMapStage to change between the pattern match and the foreach, then we need to fix the unsynchronized update, not just shut down Spark on a failed assertion.
Wait... sorry, let me rethink this a bit.
The original code may overwrite entries in shuffleToMapStage.
Yes, but it is not immediately obvious that that is inappropriate. I need to spend some time re-familiarizing myself with newOrUsedShuffleStage. In any event, just failing an assertion in the middle of the DAGScheduler is not likely something we want to do. At a bare minimum, we'd want to be logging a more useful error message.
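To illustrate the "more useful error message" point, here is a hypothetical sketch in plain Scala; shuffleIdToStage and registerAncestorStage are made-up names standing in for shuffleToMapStage and the registration code, not Spark internals.

```scala
// Hypothetical sketch: surface the problematic state with enough context to
// debug it, rather than a bare assert. Not Spark code.
import scala.collection.mutable

object RegistrationSketch {
  val shuffleIdToStage = mutable.HashMap[Int, String]()

  def registerAncestorStage(shuffleId: Int, stage: String): Unit = {
    shuffleIdToStage.get(shuffleId) match {
      case Some(existing) =>
        // Report which entry would have been overwritten and by what, instead
        // of assert(!shuffleIdToStage.get(shuffleId).isDefined).
        throw new IllegalStateException(
          s"Shuffle $shuffleId already has map stage $existing registered; " +
            s"refusing to overwrite it with $stage")
      case None =>
        shuffleIdToStage(shuffleId) = stage
    }
  }
}
```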
Overwriting entries in shuffleToMapStage definitely isn't an outright error -- we should still get correct results after the overwrite, so we shouldn't be adding this new assertion to change an evaluation path that was generating correct results into an error condition.
There are other open efforts to clean up this creation of additional Stages that will be skipped instead of just re-using the prior-executed Stage more cleanly. That issue is orthogonal to the generation of a topological ordering of the dependencies, so I'd prefer to handle it outside of this PR.
cc @squito
@markhamstra Thank you for your review.
I'll remove the assertion for now, but note that there is a case where, because of the overwrite, some stages refer to the overwritten stage as their parent, which is undesirable for DAGScheduler.
Yes, I tend to agree that recreating what really should be the same Stage as a new Stage is undesirable in the DAGScheduler, but the more important point is that, even though it is not optimal, it does produce correct results, and we can't sacrifice that for the sake of code that seems more desirable.
That's not to say that some of the efforts that were begun to fix this kind of behavior while still generating correct results shouldn't be driven to completion.
Let me show you another PR to improve DAGScheduler performance based on this PR.
The basic idea is a good one, but in addition to needing to spend some time sorting out the logic around that …
Test build #54319 has finished for PR 11720 at commit
This reverts commit 1636531.
Test build #54407 has finished for PR 11720 at commit
@markhamstra
I'm going to close this in favor of #12060.
What changes were proposed in this pull request?
DAGScheduler sometimes generates an incorrect stage graph. Some stages are generated for the same shuffleId twice or more, and they are referenced by child stages, because the graph is not built in the correct order.
This patch fixes it.
How was this patch tested?
I added a sample RDD graph that produces the illegal stage graph to DAGSchedulerSuite.
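For readers who want to see the shape of the problem, here is a hypothetical lineage with nested shuffle dependencies written against the public RDD API. It is not the exact graph added to DAGSchedulerSuite, only an illustration of a child stage that depends on more than one shuffle ancestor, which is the situation where the build order matters.

```scala
// Hypothetical example only; not the test added in this PR.
import org.apache.spark.{SparkConf, SparkContext}

object Spark13902Example {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("SPARK-13902 example"))

    // First shuffle.
    val byKey = sc.parallelize(1 to 100).map(i => (i % 10, i)).reduceByKey(_ + _)
    // Second shuffle stacked on top of the first.
    val regrouped = byKey.map { case (k, v) => (k % 2, v) }.reduceByKey(_ + _)
    // A third shuffle (the join) whose stage depends on ancestors from both
    // branches; the ancestor stages must be built before this stage.
    val joined = regrouped.join(byKey.map { case (k, v) => (k % 2, v) })

    joined.collect()
    sc.stop()
  }
}
```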