-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-15927] Eliminate redundant DAGScheduler code. #13646
Conversation
I think that @cloud-fan proposed a similar change in #4134, so I think we might be able to resolve SPARK-5374 as a duplicate of this JIRA. |
toVisit.dependencies.foreach { | ||
case shuffleDep: ShuffleDependency[_, _, _] => | ||
parents += shuffleDep | ||
case dependency: Any => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can remove : Any
LGTM |
Test build #60419 has finished for PR 13646 at commit
|
Test build #60432 has finished for PR 13646 at commit
|
} | ||
} | ||
} | ||
parents |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while you're touching this, would be nice to rename parents
to ancestors
here.
lgtm. I realized there isn't really any test for going through a mix of narrow and shuffle dependencies -- might be nice to add this (it passes w/ this change). You would need to change the visibility of /**
* Ensure we get all parent shuffle stages. Walk back all narrow dependencies, but don't go
* past any shuffle dependencies (parent stages only, not ancestor stages).
*/
test("getShuffleDependencies") {
val rddA = new MyRDD(sc, 2, Nil)
val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
val rddB = new MyRDD(sc, 2, Nil)
val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))
val rddC = new MyRDD(sc, 1, List(shuffleDepB))
val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))
val rddD = new MyRDD(sc, 1, List(shuffleDepC))
val narrowDepD = new OneToOneDependency(rddD)
val finalRdd = new MyRDD(sc, 1, List(shuffleDepA, narrowDepD), tracker = mapOutputTracker)
assert(scheduler.getShuffleDependencies(rddA) === Set())
assert(scheduler.getShuffleDependencies(rddB) === Set())
assert(scheduler.getShuffleDependencies(rddC) === Set(shuffleDepB))
assert(scheduler.getShuffleDependencies(rddD) === Set(shuffleDepC))
assert(scheduler.getShuffleDependencies(finalRdd) === Set(shuffleDepA, shuffleDepC))
} |
LGTM, but I agree with Imran's renaming suggestion, and his new test looks good. |
Thanks for the comments / test @squito! Updated the naming and the test. Once this is merged, I have another DAGScheduler PR that does some more renaming / commenting / re-organizing to try to make some of the code easier to read / understand. |
Test build #60509 has finished for PR 13646 at commit
|
Jenkins, retest this please |
(@squito this looks like another timeout in the basic scheduler integration suite? I tried it locally and it passed) |
lgtm! not sure about the test timeout. I will take a look, but in any case I'm confident its not from this change. I've tried running that test in a loop but haven't triggered a failure in over 10k trials so far :( |
Test build #60518 has finished for PR 13646 at commit
|
Merged into master (not 2.0). |
@kayousterhout btw I tracked down the test issue. It wasn't safe to check the map output status inside the backend in those tests -- I addressed that here: 91ea3df. Also the real problem was somewhat hidden b/c the assertion was failing in another thread, which I fixed here: e35a7d3 I ran the tests over 15k times on my laptop and still never triggered it, though. wish I had a better way to find these issues. |
It looks like a bunch of the tests in Is there any way that we can hotfix / merge in the small fix for this sooner rather than later, perhaps by separating it from the blacklist test PR into its own PR? |
yikes, thanks for pointing this out @JoshRosen . Sorry I was wrong about the failure being unrelated -- it didn't create a new bug, but took an old rare problem and made it far more common. I have opened #13688. |
Ah shoot I see why this would make the stage ID numbering potentially different. Sorry about that!! |
What changes were proposed in this pull request?
To try to eliminate redundant code to traverse the RDD dependency graph, this PR creates a new function getShuffleDependencies that returns shuffle dependencies that are immediate parents of a given RDD. This new function is used by getParentStages and getAncestorShuffleDependencies.
cc @squito @markhamstra
FYI @rxin