[SPARK-19565] Improve DAGScheduler tests. #16901
Conversation
@kayousterhout @squito @markhamstra
}

test("After fetching failed, success of old attempt of stage should be taken as valid.") {
  val rddA = new MyRDD(sc, 2, Nil)
can you add a brief comment here with something like:
/// Create 3 RDDs with shuffle dependencies on each other: A <--- B <---- C
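For context, the setup that comment would describe might look roughly like this (a sketch in the style of DAGSchedulerSuite's MyRDD/ShuffleDependency helpers; the exact arguments are assumptions, not the PR's verbatim code):

```scala
// Create 3 RDDs with shuffle dependencies on each other: A <--- B <---- C
val rddA = new MyRDD(sc, 2, Nil)
val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
```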
submit(rddC, Array(0, 1))
assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)

complete(taskSets(0), Seq(
add a comment saying something like "Complete both tasks in rddA"
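Annotated, that step might read roughly as follows (a sketch; host names and reduce counts are assumptions):

```scala
// Complete both tasks in rddA, both running on hostA.
complete(taskSets(0), Seq(
  (Success, makeMapStatus("hostA", 2)),
  (Success, makeMapStatus("hostA", 2))))
```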
| "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null), | ||
| (Success, makeMapStatus("hostB", 2)))) | ||
|
|
||
| scheduler.resubmitFailedStages() |
add a comment here saying something like "Both original tasks in rddA should be marked as failed, because they ran on the failed hostA, so both should be resubmitted. Complete them successfully."
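With the suggested comment, the resubmission step might look roughly like this (a sketch; the task-set index and host names are assumptions):

```scala
scheduler.resubmitFailedStages()

// Both original tasks in rddA should be marked as failed, because they ran on
// the failed hostA, so both should be resubmitted. Complete them successfully
// on hostB this time.
assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
complete(taskSets(2), Seq(
  (Success, makeMapStatus("hostB", 2)),
  (Success, makeMapStatus("hostB", 2))))
```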
  (Success, makeMapStatus("hostB", 2)),
  (Success, makeMapStatus("hostB", 2))))

assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
should the second condition in the assert be checking taskSets(3) (not taskSets(2) again?)
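If so, the corrected assertion would presumably check both conditions against taskSets(3):

```scala
assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1)
```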
runEvent(makeCompletionEvent(
  taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

// Thanks to the success from old attempt of stage(stageId=1), there's no pending
It looks like the success above is from the newer attempt of the stage (since you're taking the task from taskSets(3), not taskSets(1)), which is inconsistent with the comment. I think perhaps the intention here was to not finish one of the tasks from taskSets(1) the first time around (i.e., eliminate the Success on line 2185) and then move that success here (instead of completing the task from the more recent task set)?
Yes, the success should be moved. Sorry for this and I'll rectify.
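A sketch of the suggested fix (illustrative only; the task indices are assumptions): leave one of taskSets(1)'s tasks unfinished when stage 1 first runs, and complete that old-attempt task here instead of the one from the resubmitted taskSets(3):

```scala
// Complete the remaining task from the *old* attempt of stage 1 (taskSets(1)).
// Its success should still count toward the stage's pending partitions, even
// though a newer attempt (taskSets(3)) is now active.
runEvent(makeCompletionEvent(
  taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
```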
Force-pushed: da6b4d3 to 26e8ab4
Force-pushed: 26e8ab4 to f8cf4fc
@kayousterhout
squito left a comment
lgtm
I left a comment about another issue, but that is unrelated to just adding this test. I was surprised there wasn't already a good test for this, but I don't see one that really addresses it. Thanks for suggesting this addition.
  }
}

test("After fetching failed, success of old attempt of stage should be taken as valid.") {
can you rename to "After a fetch failure, success ..."
really minor, but I had to read this twice
// though there's still a running task(stageId=1, stageAttempt=1, partitionId=1)
// in the active stage attempt.
assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
complete(taskSets(4), Seq(
I was going to suggest adding a check here to make sure that all prior TaskSetManagers are marked as zombies. But (a) you can't check that, since the DAGScheduler doesn't have a handle on the TaskSetManagers, and (b) more importantly, the prior TSMs actually are not marked as zombies, so they may continue to submit tasks even though they're not necessary.
I will file a separate bug for that -- it's a performance issue, not a correctness issue, so not critical. But this isn't the same as the old "kill running tasks when marking a TSM as a zombie" issue -- in this case, the problem is that the TSM may continue to launch new tasks.
After looking at my other test cleanup PR, I realized the "map stage submission with executor failure late map task completions" test already tests this functionality, but only for map stages that are submitted in isolation (without a following reduce stage). @squito do you think we still need this one, for the same reason? If so, do you think it's worth doing some cleanup to (a) move this test next to that one and (b) make it consistent with that one?
Jenkins this is OK to test
@kayousterhout sorry for responding late to this, but your analysis sounds fine.
What changes were proposed in this pull request?
This is related to #16620.
When a fetch failure occurs, the stage is resubmitted, so there can be running tasks from both the old and the new stage attempts. This PR adds a test to check that a success from a task in the old stage attempt is taken as valid and that its partitionId is removed from the stage's pendingPartitions accordingly. Once pendingPartitions is empty, the downstream stage can be scheduled, even though there are still running tasks in the active (new) stage attempt.
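In outline, the scenario the new test exercises could be sketched like this (using DAGSchedulerSuite-style helpers; concrete hosts, counts, and task-set indices are assumptions rather than the PR's exact code):

```scala
// A <-- shuffle -- B <-- shuffle -- C
submit(rddC, Array(0, 1))

// Stage for rddA (attempt 0) completes on hostA.
complete(taskSets(0), Seq(
  (Success, makeMapStatus("hostA", 2)),
  (Success, makeMapStatus("hostA", 2))))

// The first task of rddB's stage hits a fetch failure on hostA; rddA's stage
// is resubmitted and completes again, this time on hostB.
complete(taskSets(1), Seq(
  (FetchFailed(makeBlockManagerId("hostA"), shuffleDepA.shuffleId, 0, 0, "ignored"), null)))
scheduler.resubmitFailedStages()
complete(taskSets(2), Seq(
  (Success, makeMapStatus("hostB", 2)),
  (Success, makeMapStatus("hostB", 2))))

// rddB's stage is resubmitted as attempt 1 (taskSets(3)), but a late success
// from the old attempt (taskSets(1)) should still clear that partition from
// pendingPartitions.
runEvent(makeCompletionEvent(
  taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
runEvent(makeCompletionEvent(
  taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

// With no pending partitions left, the result stage for rddC is scheduled even
// though the other task of the new attempt may still be running.
assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
```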