[SPARK-5259][CORE] don't submit stage until its dependencies map outputs are registered #7699
Conversation
…s true while stage was retry
…hile stageId and partId are same
This work was primarily done by @suyanNone in #4055. I am just trying to address a few remaining issues since this is a really important fix to get in for 1.5.
Test build #38555 has finished for PR 7699 at commit
@kayousterhout @markhamstra (if you do merge, please note I'm not the primary author)
... sorry for not updating sooner... I've been busy offline. It's nice of you to refine the code from #4055.
no problem @suyanNone. Sorry if I'm being a little too eager on this one, but I really want to get it into Spark 1.5, and that deadline is coming up fast. You did great work discovering and fixing this; I feel bad it sat open for so long.
@kayousterhout @markhamstra @pwendell @mateiz another one still waiting. I mostly got buy-in from Mark on #4055, but I'd still appreciate somebody else looking, especially since I was more involved in the code for this one.
Test build #41380 has finished for PR 7699 at commit
Test build #41381 has finished for PR 7699 at commit
BTW I'd rename this JIRA or at least expand the PR description to say "track pending tasks by partition ID instead of Task objects". Otherwise it really doesn't explain how this fixes the reported problem (and it's really meant to be fixing a wider class of problems caused by this).
```scala
// What should happen now is that we submit stage 2. However, we might not see an error
// b/c of DAGScheduler's error handling (it tends to swallow errors and just log them). But
// we can check some conditions.
```
Instead of doing this, why not add a mechanism to grab the errors during tests? Doesn't have to be in this PR but it would make more sense.
good idea, I've created https://issues.apache.org/jira/browse/SPARK-10248
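The idea behind SPARK-10248 can be sketched roughly like this (illustrative names, not Spark's actual classes): a production event loop logs and swallows errors so it can keep processing other jobs' events, while a test subclass records them so assertions can fail the test instead of letting it silently pass.

```scala
// Hypothetical sketch of capturing event-loop errors in tests.
import scala.collection.mutable.ArrayBuffer

class EventLoop {
  def post(event: () => Unit): Unit = {
    try event() catch {
      case t: Throwable => onError(t)
    }
  }
  // Production behavior: log the error and keep processing events.
  protected def onError(t: Throwable): Unit =
    println(s"swallowed: ${t.getMessage}")
}

class TestEventLoop extends EventLoop {
  val errors = ArrayBuffer[Throwable]()
  // Test behavior: record the error so an assertion can surface it.
  override protected def onError(t: Throwable): Unit = errors += t
}
```

A test would then end with something like `assert(loop.errors.isEmpty)`, turning a previously swallowed exception into a visible failure.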
Test build #41554 has finished for PR 7699 at commit
@mateiz I'm a bit confused by:
I'm not sure what the wider class of problems is -- the only thing I know of is this issue: that the downstream stage gets executed before the map outputs are registered for the previous stages. If there are other issues, should we add more tests? I can definitely add more test cases if you can give me a general idea of what else you think this will fix.
@markhamstra @mateiz thanks for taking a look, I think I've addressed your concerns. However, the last round of comments made me realize that there is probably still an issue -- after we register the map output for stage 1 and start executing stage 2, I think we'll still have a pending task set for stage 1 that is non-zombie. You'll probably get pretty confusing behavior if you still see lots of tasks completing for stage 1, and you're very likely to run into SPARK-8029. On the one hand, we can't eliminate this completely, since both attempts can be running the same partition at the same time (so no matter what, SPARK-8029 is a possibility). But I feel like we should at least mark the attempt as zombie to avoid running even more tasks, just to reduce the possibility, make the output a little more understandable, and also avoid wasting resources by running tasks that aren't needed. I think testing that is going to be a little tricky, since it involves interaction between …
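For readers unfamiliar with the "zombie" terminology: a zombie stage attempt stops launching new tasks but still accepts results from tasks already in flight. A rough illustration of that behavior (the names here are made up for the sketch, not the real `TaskSetManager`):

```scala
// Hedged sketch: once a stage attempt is superseded, mark it zombie so no new
// tasks launch, while results from in-flight tasks can still be counted.
import scala.collection.mutable.Queue

class TaskSetAttempt(taskIds: Seq[Int]) {
  private val toLaunch = Queue(taskIds: _*)
  var isZombie = false

  // The scheduler asks for the next task to run; a zombie attempt offers none.
  def nextTask(): Option[Int] =
    if (isZombie || toLaunch.isEmpty) None
    else Some(toLaunch.dequeue())
}
```

This keeps a superseded attempt from wasting executors on work the new attempt will redo anyway.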
Test build #41565 has finished for PR 7699 at commit
Regarding the "wider class of problems", I just meant that the core problem here seems to be that tasks don't get identified correctly. This also seems to affect other code paths, e.g. if a task is Resubmitted. Would it be possible to test for that right now without additional test infrastructure? Otherwise, I think it's fine to merge this as is, but we definitely have to "zombify" the old stage. If you want to work on that, it would be great if you could write a doc that explains the state transitions and, in particular, shows that the stage will always become a zombie if we submit another copy of it. Otherwise it's hard to reason from just a patch.
@mateiz thanks, I think I see now -- I should be able to add a unit test for that without much work. I'm not sure whether or not that was broken before this fix, but I'll check. (Looks like there aren't any existing tests which involve ….) I may be wrong about this "zombie" problem ... but I will keep looking into that.
@mateiz I've added a test case for task resubmission -- that test case passes before these changes, because the same …
Test build #41795 has finished for PR 7699 at commit
btw I've opened https://issues.apache.org/jira/browse/SPARK-10370 to deal with zombifying the running task set after the map output is available.
@squito SPARK-10370, eh... I think I already resolved it a few months ago in my local env... but it's based on Spark 1.3.0...
Thanks, this makes sense. Anyway this PR looks good to me.
…erSuite This is pretty minor, just trying to improve the readability of `DAGSchedulerSuite`, I figure every bit helps. Before whenever I read this test, I never knew what "should work" and "should be ignored" really meant -- this adds some asserts & updates comments to make it more clear. Also some reformatting per a suggestion from markhamstra on #7699 Author: Imran Rashid <irashid@cloudera.com> Closes #8434 from squito/SPARK-10247.
Conflicts: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Test build #42758 has finished for PR 7699 at commit
thanks @mateiz, and for all your work on this @suyanNone! merging to master
…puts are registered Track pending tasks by partition ID instead of Task objects. Before this change, failure & retry could result in a case where a stage got submitted before the map outputs from its dependencies were registered. This was due to an error in the condition for registering map outputs. Author: hushan[胡珊] <hushan@xiaomi.com> Author: Imran Rashid <irashid@cloudera.com> Closes apache#7699 from squito/SPARK-5259.
`DAGSchedulerEventLoop` normally only logs errors (so it can continue to process more events, from other jobs). However, this is not desirable in the tests -- the tests should be able to easily detect any exception, and also shouldn't silently succeed if there is an exception. This was suggested by mateiz on #7699. It may have already turned up an issue in "zero split job". Author: Imran Rashid <irashid@cloudera.com> Closes #8466 from squito/SPARK-10248. (cherry picked from commit 38d9795) Signed-off-by: Andrew Or <andrew@databricks.com>
`DAGSchedulerEventLoop` normally only logs errors (so it can continue to process more events, from other jobs). However, this is not desirable in the tests -- the tests should be able to easily detect any exception, and also shouldn't silently succeed if there is an exception. This was suggested by mateiz on #7699. It may have already turned up an issue in "zero split job". Author: Imran Rashid <irashid@cloudera.com> Closes #8466 from squito/SPARK-10248.
…erSuite This is pretty minor, just trying to improve the readability of `DAGSchedulerSuite`, I figure every bit helps. Before whenever I read this test, I never knew what "should work" and "should be ignored" really meant -- this adds some asserts & updates comments to make it more clear. Also some reformatting per a suggestion from markhamstra on apache/spark#7699 Author: Imran Rashid <irashid@cloudera.com> Closes #8434 from squito/SPARK-10247.
…puts are registered Track pending tasks by partition ID instead of Task objects. Before this change, failure & retry could result in a case where a stage got submitted before the map outputs from its dependencies were registered. This was due to an error in the condition for registering map outputs. Author: hushan[胡珊] <hushan@xiaomi.com> Author: Imran Rashid <irashid@cloudera.com> Closes apache#7699 from squito/SPARK-5259. Conflicts: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Track pending tasks by partition ID instead of Task objects.
Before this change, failure & retry could result in a case where a stage got submitted before the map outputs from its dependencies were registered. This was due to an error in the condition for registering map outputs.
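The shift can be illustrated with a toy sketch (not Spark's actual internals): keying pending work by partition ID lets a success from an earlier stage attempt complete the stage, because partition 3 is partition 3 no matter which Task object ran it.

```scala
// Minimal, hypothetical sketch of tracking pending work by partition ID.
import scala.collection.mutable.HashSet

class StageState(numPartitions: Int) {
  // Pending work keyed by partition ID, not by the Task objects of one attempt.
  val pendingPartitions: HashSet[Int] = HashSet(0 until numPartitions: _*)

  // A success from *any* attempt of this stage clears the partition.
  def taskSucceeded(partitionId: Int): Unit = pendingPartitions -= partitionId

  // Only once every partition's map output is in should downstream stages run.
  def isComplete: Boolean = pendingPartitions.isEmpty
}
```

With this bookkeeping, a stage retry that sees a straggler from the previous attempt finish a partition counts that output, instead of waiting on a duplicate Task object that will never match.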