avoid more than one active task set manager for a stage
cloud-fan committed Mar 1, 2019
1 parent bc7592b commit 424a3c8
Showing 2 changed files with 15 additions and 9 deletions.
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -212,14 +212,20 @@ private[spark] class TaskSchedulerImpl(
       val stage = taskSet.stageId
       val stageTaskSets =
         taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
-      stageTaskSets(taskSet.stageAttemptId) = manager
-      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
-        ts.taskSet != taskSet && !ts.isZombie
-      }
-      if (conflictingTaskSet) {
-        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
-          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
-      }
+
+      // Mark all the existing TaskSetManagers of this stage as zombie, as we are adding a new one.
+      // This is necessary to handle a corner case. Let's say a stage has 10 partitions and has 2
+      // TaskSetManagers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10
+      // and it completes. TSM2 finishes tasks for partitions 1-9, and thinks it is still active
+      // because partition 10 is not completed yet. However, DAGScheduler gets task completion
+      // events for all the 10 partitions and thinks the stage is finished. If it's a shuffle stage
+      // and somehow it has missing map outputs, then DAGScheduler will resubmit it and create a
+      // TSM3 for it. As a stage can't have more than one active task set manager, we must mark
+      // TSM2 as zombie (it actually is).
+      stageTaskSets.foreach { case (_, ts) =>
+        ts.isZombie = true
+      }
+      stageTaskSets(taskSet.stageAttemptId) = manager
       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
 
       if (!isLocal && !hasReceivedTask) {
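To make the corner case described in the comment concrete, below is a minimal, self-contained Scala sketch of the invariant this commit enforces: registering a new task set manager for a stage first marks every existing manager for that stage as zombie, so at most one manager per stage is ever active. It uses plain Scala collections, not Spark's actual scheduler API; names such as ZombieInvariantSketch, Manager, and register are illustrative only.

import scala.collection.mutable

object ZombieInvariantSketch {
  // Simplified stand-in for a TaskSetManager: just an attempt id and a zombie flag.
  final class Manager(val stageAttemptId: Int) {
    var isZombie: Boolean = false
  }

  // stageId -> (stageAttemptId -> Manager), mirroring taskSetsByStageIdAndAttempt.
  private val managersByStage = mutable.HashMap.empty[Int, mutable.HashMap[Int, Manager]]

  // Mirrors the new submitTasks logic: zombie out all existing managers, then register the new one.
  def register(stageId: Int, attemptId: Int): Manager = {
    val byAttempt = managersByStage.getOrElseUpdate(stageId, mutable.HashMap.empty)
    byAttempt.values.foreach(_.isZombie = true) // earlier attempts can no longer be active
    val manager = new Manager(attemptId)
    byAttempt(attemptId) = manager
    manager
  }

  // Number of non-zombie managers currently registered for the stage.
  def activeCount(stageId: Int): Int =
    managersByStage.get(stageId).map(_.values.count(!_.isZombie)).getOrElse(0)

  def main(args: Array[String]): Unit = {
    register(stageId = 0, attemptId = 0) // TSM1
    register(stageId = 0, attemptId = 1) // TSM2: TSM1 becomes zombie
    register(stageId = 0, attemptId = 2) // TSM3: TSM1 and TSM2 are both zombie
    assert(activeCount(0) == 1, "a stage must never have more than one active manager")
    println(s"active managers for stage 0: ${activeCount(0)}")
  }
}

In Spark itself, the zombie-marking step corresponds to the stageTaskSets.foreach { ... ts.isZombie = true } block added to TaskSchedulerImpl.submitTasks in the hunk above.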
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -123,7 +123,7 @@ private[spark] class TaskSetManager(
   // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
   // state in order to continue to track and account for the running tasks.
   // TODO: We should kill any running task attempts when the task set manager becomes a zombie.
-  private[scheduler] var isZombie = false
+  @transient private[scheduler] var isZombie = false
 
   // Whether the taskSet run tasks from a barrier stage. Spark must launch all the tasks at the
   // same time for a barrier stage.
