[SPARK-4654] Clean up DAGScheduler getMissingParentStages / stageDependsOn methods #3515
Test build #23950 has started for PR 3515 at commit
@@ -401,7 +370,7 @@ class DAGScheduler(
 val s = stages.head
 s.jobIds += jobId
 jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
-val parents: List[Stage] = getParentStages(s.rdd, jobId)
It might look like this changes the behavior of this method, since getParentStages will create any parent stages that are missing. However, I think that this call never ended up taking the "create a missing stage" branch because stage's parent stages should have already been created before it was created, since getParentStages(stage.rdd, jobId) should have been called from the newStage method: https://github.com/JoshRosen/spark/blob/dagscheduler-missingparents-cleanup/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L247
Test build #23950 timed out for PR 3515 at commit
Test FAILed.
@@ -401,7 +370,7 @@ class DAGScheduler(
 val s = stages.head
 s.jobIds += jobId
 jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
-val parents: List[Stage] = getParentStages(s.rdd, jobId)
+val parents: List[Stage] = stage.parents
 val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }
I wouldn't bother binding a local anymore, so just:
val parentsWithoutThisJobId = stage.parents.filter { ! _.jobIds.contains(jobId) }
Test build #23954 has started for PR 3515 at commit
Test build #23954 timed out for PR 3515 at commit
Test FAILed.
The tests have failed twice with the same error, so this might be a legitimate bug. That would be surprising, since it would seem to indicate that there's some latent complexity in the old code that I overlooked. I'll investigate this tomorrow if I have time.
I reproduced the failing test locally and took a look at the log. The failing test case ("awaitTermination with error in task") checks whether a task failure is successfully captured by the system. But it seems that DAGScheduler doesn't fail the job even though its task fails. In my log, I saw "Ignoring failure of Stage 0 because all jobs depending on it are done", which is printed at the end of abortStage() in DAGScheduler. So the job is not aborted and the failure is not captured as expected. The reason is that after the pull request is applied, the DAGScheduler cannot correctly establish the dependency between the failed RDD and the job. I'm digging into the cause of that.
I haven't had a chance to dig into this much more, but perhaps this could be due to streaming checkpointing; if RDDs' dependencies change after checkpointing, then that might mean that we need to re-walk the stage / dependency graph rather than relying on the cached results of the earlier traversal.
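To make the checkpointing hypothesis concrete, here is a toy model of the suspected failure mode. It is only a sketch and does not use Spark's API: `Node`, `CachedView`, and `StaleCacheDemo` are hypothetical names, and replacing `deps` merely imitates how checkpointing truncates an RDD's lineage. Whether this is what actually happens in the failing test is unconfirmed.

```scala
// Hypothetical stand-in for an RDD whose dependencies can be swapped out,
// loosely imitating lineage truncation after a checkpoint.
final class Node(val name: String, var deps: List[Node])

// Captures the dependency names once, at construction time, much like a
// parent list that is cached when a stage is created.
final class CachedView(val node: Node) {
  val cachedDeps: List[String] = node.deps.map(_.name)

  // Re-reads the dependencies on every call, like a fresh graph traversal.
  def freshDeps: List[String] = node.deps.map(_.name)
}

object StaleCacheDemo {
  // Returns (cached view, fresh view) of the dependencies after the swap.
  def run(): (List[String], List[String]) = {
    val source = new Node("source", Nil)
    val mapped = new Node("mapped", List(source))
    val view = new CachedView(mapped)

    // Simulated checkpoint: the original lineage is replaced.
    mapped.deps = List(new Node("checkpoint", Nil))

    (view.cachedDeps, view.freshDeps)
  }
}
```

In this model the cached list still names the old dependency while a fresh walk sees the new one, which is the kind of divergence that would make a cached traversal result unsafe to reuse.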
Looks like this has gone stale so I'd like to close this issue pending an update from @JoshRosen
DAGScheduler has getMissingParentStages() and stageDependsOn() methods which are suspiciously similar to getParentStages().
Both of these methods perform traversals of the RDD / Stage graph to inspect parent stages. We can remove both of these methods, though: the set of parent stages is known when a Stage instance is constructed and is stored in Stage.parents, so we can just check for missing stages by looking for unavailable stages in Stage.parents. Similarly, we can determine whether one stage depends on another by searching Stage.parents rather than performing a graph traversal from scratch.
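The idea can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: the `Stage` class here is a hypothetical stand-in with only an `id`, a `parents` list, and an `isAvailable` flag, and `StageGraph` is an invented helper object. It also assumes that `parents` stays accurate after the stage is constructed, and it searches all ancestors rather than only unavailable ones.

```scala
// Hypothetical, simplified stand-in for a scheduler stage (not Spark's class).
final class Stage(val id: Int, val parents: List[Stage], val isAvailable: Boolean)

object StageGraph {
  // Missing parents are the direct parents whose output is not yet available;
  // no RDD graph traversal is needed because `parents` is already known.
  def missingParentStages(stage: Stage): List[Stage] =
    stage.parents.filterNot(_.isAvailable)

  // A stage depends on `target` if it is `target` itself or if `target` is
  // reachable by following the stored `parents` links.
  def stageDependsOn(stage: Stage, target: Stage): Boolean = {
    @annotation.tailrec
    def loop(toVisit: List[Stage], visited: Set[Int]): Boolean = toVisit match {
      case Nil => false
      case s :: rest =>
        if (s.id == target.id) true
        else if (visited(s.id)) loop(rest, visited)
        else loop(s.parents ::: rest, visited + s.id)
    }
    loop(List(stage), Set.empty)
  }
}
```

For example, with stages `a` (available) and `b` (unavailable) as parents of `c`, `missingParentStages(c)` yields only `b`, and any stage built on top of `c` depends on both `a` and `b`.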