
[SPARK-13931] Resolve stage hanging up problem in a particular case #11760

Closed

Conversation

GavinGavinNo1

What changes were proposed in this pull request?

When executorLost is invoked on a TaskSetManager, it is important to first check whether the variable isZombie is set to true.

This pull request fixes the following hang:

  1. Enable speculation in the application.
  2. Run the app and suppose the last task of shuffleMapStage 1 finishes. To be precise: from the DAGScheduler's point of view the stage is finished, and in the TaskSetManager the variable isZombie is set to true, but runningTasksSet is not empty because of speculative copies.
  3. Executor 3 is then lost. On receiving this signal, the TaskScheduler invokes executorLost on every TaskSetManager in the rootPool, and the DAGScheduler removes all of that executor's outputLocs.
  4. The TaskSetManager adds all of that executor's tasks back to pendingTasks and tells the DAGScheduler they will be resubmitted (note: possibly not promptly).
  5. The DAGScheduler starts to submit a new waiting stage, say shuffleMapStage 2, and finds that shuffleMapStage 1 is a missing parent because some outputLocs were removed due to the executor loss. The DAGScheduler therefore submits shuffleMapStage 1 again.
  6. The DAGScheduler keeps receiving 'Resubmitted' task events from the old TaskSetManager and increments the number of pending tasks of shuffleMapStage 1 each time. However, the old TaskSetManager never launches new tasks, because its isZombie flag is set to true (see the sketch after this list).
  7. As a result, shuffleMapStage 1 never finishes in the DAGScheduler, and neither does any stage that depends on it.
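
The core of the problem is in TaskSetManager.executorLost. The following is a simplified paraphrase of the pre-patch logic (not the exact Spark source; bookkeeping and some argument lists are trimmed for illustration) showing where the Resubmitted signal in step 6 comes from:

```scala
// Simplified paraphrase of the pre-patch TaskSetManager.executorLost; this is NOT the
// exact Spark source (bookkeeping and some arguments are trimmed for illustration).
override def executorLost(execId: String, host: String, reason: ExecutorLossReason) {
  // For a shuffle map stage without an external shuffle service, map output that lived on
  // the lost executor is gone, so previously successful tasks are marked pending again.
  if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
    for ((tid, info) <- taskInfos if info.executorId == execId) {
      val index = info.index
      if (successful(index)) {
        successful(index) = false
        copiesRunning(index) -= 1
        tasksSuccessful -= 1
        addPendingTask(index)
        // The DAGScheduler is told the task was Resubmitted, which increases the stage's
        // pending-task count. A zombie task set never launches new tasks, so that count is
        // never drained and the stage hangs (step 6 above).
        sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, Seq.empty, info)
      }
    }
  }
  // Tasks still running on the lost executor are failed via handleFailedTask, which informs
  // the DAGScheduler / web UI and lets a zombie task set finish once nothing is running.
  for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
    handleFailedTask(tid, TaskState.FAILED,
      ExecutorLostFailure(execId, exitCausedByApp = false, Some(reason.toString)))
  }
}
```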

How was this patch tested?


It's quite difficult to construct test cases.

@GavinGavinNo1 GavinGavinNo1 changed the title Resolve stage hanging up problem in a particular case [SPARK-13931]Resolve stage hanging up problem in a particular case Mar 16, 2016
@GavinGavinNo1 GavinGavinNo1 changed the title [SPARK-13931]Resolve stage hanging up problem in a particular case [SPARK-13931] Resolve stage hanging up problem in a particular case Mar 16, 2016
@srowen
Member

srowen commented Mar 16, 2016

@mccheah @kayousterhout Do you have an opinion on this? You might be familiar with this method.

@GavinGavinNo1
Author

@mccheah @kayousterhout Could you please take a look at this for me? Thank you in advance.

@@ -776,6 +776,7 @@ private[spark] class TaskSetManager(

   /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
   override def executorLost(execId: String, host: String, reason: ExecutorLossReason) {
+    if (isZombie) return
Contributor

I'd add a logging line here at least.
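
For example, something along these lines (just a sketch of where a log line could go; the message wording here is made up):

```scala
if (isZombie) {
  // Hypothetical log message, for illustration only.
  logInfo(s"Ignoring executorLost($execId) because TaskSet ${taskSet.id} is already a zombie")
  return
}
```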

Contributor

Is it safe to just return here if the task is a zombie? IIRC we need to mark all of the tasks in the TaskSetManager as either completed or failed at some point (otherwise I think this TaskSetManager will never get cleaned up).

Contributor

I looked at this a little more and I think the right solution is to add "!zombie" to the if-condition below, on line 784 (the reasoning being that, if a task set is a zombie and some shuffle map output was lost, this will be handled when the reduce tasks try to fetch the output, so it's too complicated to bring the task set back from the dead here). That way, the loop on line 799 will still run, so the DAGScheduler will still get told that any speculated tasks running on the lost executor have failed (so the web UI can be updated correctly etc.). Does this seem reasonable?
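
Concretely, the suggestion amounts to something like the following (a sketch of the condition change only; the surrounding code is elided and line numbers refer to the version under review):

```scala
// Skip the shuffle-map-output resubmission block for zombie task sets, but keep the later
// loop (around line 799) that fails tasks still running on the lost executor.
if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
    && !isZombie) {
  // ... re-add tasks whose map output was lost to pendingTasks and notify the DAGScheduler ...
}
```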

Contributor

Looking a bit more closely, I concur with this idea. We basically don't want to resubmit the task, but we also want to mark the task as failed and update metrics. FWIW the if switch in handleFailedTask() should also prevent this task from erroneously counting against the failed task attempt count that would otherwise possibly cause the stage to fail.

Contributor

Hm... it's slightly tricky, though, because I just noticed handleFailedTask makes calls like sched.dagScheduler.taskEnded and addPendingTask: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L721 - are the effects of these calls what was causing the problem in the first place?

Contributor

My understanding is that the problem is from this line: https://github.com/GavinGavinNo1/spark/blob/resolve-stage-blocked/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L794 which makes the DAGScheduler think the task is still outstanding. handleFailedTask does seem to erroneously call addPendingTask, but then it calls maybeFinishTaskSet, which will see that there are no more running tasks (and that it's a zombie) and then mark it as really finished.
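
For reference, maybeFinishTaskSet is roughly the following (paraphrased, not necessarily the exact source):

```scala
// Once a zombie task set has no running tasks left, it is unregistered from the
// TaskScheduler, so the stray addPendingTask in handleFailedTask does not keep it alive.
private def maybeFinishTaskSet() {
  if (isZombie && runningTasks == 0) {
    sched.taskSetFinished(this)
  }
}
```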

Contributor

SGTM - and I suppose the call to dagScheduler.taskEnded is fine in handleFailedTask, as opposed to the call on L794 which marks the task as Resubmitted.

Author

@kayousterhout I agree with you; it seems reasonable to add "!zombie" to the if-condition below, on line 784.
@mccheah Sorry, I'm not sure I've fully followed your point. Would it be fine to just add "!zombie" to the if-condition on line 784, or is there a better way? Thanks.

@mccheah
Contributor

mccheah commented Mar 24, 2016

Can't we cover this with a unit test in TaskSetManagerSuite?

@kayousterhout
Contributor

+1 on @mccheah's request to write a unit test for this in TaskSetManagerSuite.

Also, can you change the PR description to say something like:

This pull request fixes the following hang:

@GavinGavinNo1
Author

@mccheah @kayousterhout OK, I'll add a logging line. But it's quite difficult to reproduce this case in a test. I could add a test for form's sake, but it wouldn't really exercise the scenario. Do you have any suggestions for me?
It's no problem to change the PR description; I'll update it.

@kayousterhout
Contributor

I think you can write a test that's similar to this one: https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala#L639

You can make a test with a one-task task set, and like the above task set, you can set things up so the task set's task gets speculatively executed. Then, after finishing one copy of the task (so the task set will be marked as a zombie), you can call executorLost() on the TaskSetManager, and then check that the DAGScheduler's executorLost function gets called in the right way. It looks like you'll need to do some better mocking of the DAGScheduler than what's currently done in that test; it may be easier to create a DAGScheduler mock than to use the FakeDAGScheduler / FakeTaskScheduler that already exist in that test.
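
A rough sketch of the shape such a test could take is below. The helpers (FakeTaskScheduler, FakeTask.createTaskSet, ManualClock) come from the Spark test code, but the exact constructor arguments, the speculation setup, and the DAGScheduler mocking are only outlined here and would need to be checked against TaskSetManagerSuite:

```scala
// Sketch only; see the caveats above. The speculation setup and the assertions are
// outlined in comments rather than implemented, and assume this lives in TaskSetManagerSuite.
test("executorLost on a zombie task set should not resubmit tasks to the DAGScheduler") {
  sc = new SparkContext("local", "test")
  val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
  val taskSet = FakeTask.createTaskSet(1)
  val clock = new ManualClock
  val manager = new TaskSetManager(sched, taskSet, 4 /* maxTaskFailures */, clock)

  // 1. Offer exec1 so the single task starts there, then arrange for a speculative copy of
  //    the same task to be launched on exec2 (as in the speculation test linked above).
  // 2. Complete the attempt on exec1 via handleSuccessfulTask; the task set becomes a zombie
  //    while the speculative attempt is still in runningTasksSet.
  // 3. Call manager.executorLost("exec2", "host2", SlaveLost("test")) and assert, against a
  //    mocked DAGScheduler, that the speculative attempt is reported as failed rather than
  //    Resubmitted, and that no task is re-added to the pending list.
}
```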

@GavinGavinNo1
Author

Is the unit test ok? Thanks. @mccheah @kayousterhout

@andrewor14
Contributor

ok to test

@SparkQA

SparkQA commented Mar 29, 2016

Test build #54450 has finished for PR 11760 at commit a1eb0f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@GavinGavinNo1
Author

@kayousterhout Sorry to bother you again. What should I do then?

@kayousterhout
Contributor

Sorry for letting this hang for so long. @GavinGavinNo1 do you have time to work on this, and if so, can you bring it up to date with master? Then I can review it again.

@kayousterhout
Contributor

@GavinGavinNo1 if you don't have time to work on this PR, can you close it?

@GavinGavinNo1
Author

@kayousterhout Sorry, I tried earlier, but the internet connection at my company is poor, and then I forgot. I'll work on it at home today or tomorrow. Thank you for following up on this PR.

@kayousterhout
Contributor

Great, thanks!

@GavinGavinNo1
Author

@kayousterhout I ran into some git conflicts, so I created a new branch and a new pull request; please refer to #16855. I'm closing this pull request for the time being. Thanks!
