Correctly handle maximum task failures introduced stage abortion scenario
jerryshao committed Oct 26, 2017
1 parent bc1e766 commit 59f9c15
Showing 2 changed files with 71 additions and 20 deletions.
@@ -603,10 +603,12 @@ private[spark] class ExecutorAllocationManager(
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
-    private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
-    // Number of tasks currently running on the cluster including speculative tasks.
-    // Should be 0 when no stages are active.
-    private var numRunningTasks: Int = _
+
+    // Tracks, for each executor, the number of running tasks per stage. This is used to
+    // 1) compute the total number of running tasks, and 2) detect whether any tasks are
+    // still running on a given executor; if not, the executor is marked as idle.
+    private val executorIdToStageAndNumTasks =
+      new mutable.HashMap[String, mutable.HashMap[Int, Int]]
 
     // Number of speculative tasks to be scheduled in each stage
    private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
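Read on its own, the bookkeeping the new nested map enables can be sketched outside Spark roughly as follows; `TaskTracker` and its method names are illustrative, not part of the patch, and only the map shape and update pattern mirror it:

import scala.collection.mutable

// Minimal standalone sketch (hypothetical class, not Spark code) of the
// per-executor, per-stage task accounting introduced above.
class TaskTracker {
  // executor id -> (stage id -> number of tasks currently running there)
  private val counts = mutable.HashMap.empty[String, mutable.HashMap[Int, Int]]

  def taskStarted(executorId: String, stageId: Int): Unit = {
    val stageToNumTasks = counts.getOrElseUpdate(executorId, new mutable.HashMap[Int, Int])
    stageToNumTasks(stageId) = stageToNumTasks.getOrElse(stageId, 0) + 1
  }

  // The running-task total is derived from the map instead of being kept in a
  // separate counter that can drift when some events never arrive.
  def totalRunningTasks: Int = counts.values.map(_.values.sum).sum
}

Deriving the total from the map is the design point of the commit: a lost TaskEnd event can no longer leave a standalone counter permanently wrong.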
@@ -656,29 +658,33 @@ private[spark] class ExecutorAllocationManager(
         stageIdToSpeculativeTaskIndices -= stageId
         stageIdToExecutorPlacementHints -= stageId
 
+        // Update the per-executor task count statistics
+        executorIdToStageAndNumTasks.foreach { case (_, stageToNumTasks) =>
+          stageToNumTasks -= stageId
+        }
+        val idleExecutorIds = executorIdToStageAndNumTasks.filter(_._2.isEmpty).keySet
+        idleExecutorIds.foreach { id =>
+          executorIdToStageAndNumTasks.remove(id)
+          allocationManager.onExecutorIdle(id)
+        }
+
         // Update the executor placement hints
         updateExecutorPlacementHints()
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty.
         // This is needed in case the stage is aborted for any reason.
         if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
-          if (numRunningTasks != 0) {
-            logWarning("No stages are running, but numRunningTasks != 0")
-            numRunningTasks = 0
-          }
         }
       }
     }
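This cleanup is the core of the fix named in the commit title: when a stage completes, including an abort after repeated task failures, its counts are dropped from every executor up front, since the matching TaskEnd events may never be delivered. A minimal standalone sketch of the same idea, using the hypothetical name dropStage:

import scala.collection.mutable

object StageCleanup {
  // Hypothetical mirror of the cleanup above: drop a finished or aborted
  // stage from every executor's counts, remove executors whose per-stage
  // maps became empty, and return their ids so the caller can mark them idle.
  def dropStage(
      counts: mutable.HashMap[String, mutable.HashMap[Int, Int]],
      stageId: Int): Seq[String] = {
    counts.values.foreach(_ -= stageId)
    val idle = counts.filter(_._2.isEmpty).keys.toSeq
    idle.foreach(counts.remove)
    idle
  }
}

Returning the idle ids keeps the sketch self-contained; the patch instead calls allocationManager.onExecutorIdle(id) directly.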

     override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
       val stageId = taskStart.stageId
-      val taskId = taskStart.taskInfo.taskId
       val taskIndex = taskStart.taskInfo.index
       val executorId = taskStart.taskInfo.executorId
 
       allocationManager.synchronized {
-        numRunningTasks += 1
         // This guards against the race condition in which the `SparkListenerTaskStart`
         // event is posted before the `SparkListenerBlockManagerAdded` event, which is
         // possible because these events are posted in different threads. (see SPARK-4951)
@@ -698,24 +704,30 @@ private[spark] class ExecutorAllocationManager(
         }
 
         // Mark the executor on which this task is scheduled as busy
-        executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
+        val stageToNumTasks = executorIdToStageAndNumTasks.getOrElseUpdate(
+          executorId, new mutable.HashMap[Int, Int])
+        stageToNumTasks(stageId) = stageToNumTasks.getOrElse(stageId, 0) + 1
         allocationManager.onExecutorBusy(executorId)
       }
     }

     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
       val executorId = taskEnd.taskInfo.executorId
-      val taskId = taskEnd.taskInfo.taskId
       val taskIndex = taskEnd.taskInfo.index
+      val stageId = taskEnd.stageId
       allocationManager.synchronized {
-        numRunningTasks -= 1
         // If the executor is no longer running any scheduled tasks, mark it as idle
-        if (executorIdToTaskIds.contains(executorId)) {
-          executorIdToTaskIds(executorId) -= taskId
-          if (executorIdToTaskIds(executorId).isEmpty) {
-            executorIdToTaskIds -= executorId
-            allocationManager.onExecutorIdle(executorId)
-          }
+        if (executorIdToStageAndNumTasks.contains(executorId) &&
+            executorIdToStageAndNumTasks(executorId).contains(stageId)) {
+          val taskNum = executorIdToStageAndNumTasks(executorId)(stageId) - 1
+          if (taskNum <= 0) {
+            executorIdToStageAndNumTasks(executorId) -= stageId
+            if (executorIdToStageAndNumTasks(executorId).isEmpty) {
+              executorIdToStageAndNumTasks -= executorId
+              allocationManager.onExecutorIdle(executorId)
+            }
+          } else {
+            executorIdToStageAndNumTasks(executorId)(stageId) = taskNum
+          }
         }
 
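The double contains guard above is the other half of the fix: under the old scheme, a TaskEnd arriving after onStageCompleted had already reset numRunningTasks would drive the counter out of sync; under the new scheme, a late TaskEnd for a stage that was already cleaned up finds no entry and is simply a no-op. A standalone sketch of that guarded decrement, using the hypothetical name taskEnded:

import scala.collection.mutable

object TaskEndHandling {
  // Hypothetical mirror of the guarded decrement above. A TaskEnd for a
  // stage already removed by the stage-completion cleanup finds no entry and
  // does nothing. Returns true when the executor has no running tasks left.
  def taskEnded(
      counts: mutable.HashMap[String, mutable.HashMap[Int, Int]],
      executorId: String,
      stageId: Int): Boolean = {
    var becameIdle = false
    for (stageToNumTasks <- counts.get(executorId);
         numTasks <- stageToNumTasks.get(stageId)) {
      if (numTasks <= 1) {
        stageToNumTasks -= stageId
        if (stageToNumTasks.isEmpty) {
          counts.remove(executorId)
          becameIdle = true
        }
      } else {
        stageToNumTasks(stageId) = numTasks - 1
      }
    }
    becameIdle
  }
}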
@@ -787,15 +799,17 @@ private[spark] class ExecutorAllocationManager(
     /**
      * The number of tasks currently running across all stages.
      */
-    def totalRunningTasks(): Int = numRunningTasks
+    def totalRunningTasks(): Int = {
+      executorIdToStageAndNumTasks.values.map(_.values.sum).sum
+    }
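As a quick worked example of the new derivation (REPL-style snippet with hypothetical contents): if executor-1 runs two tasks of stage 0 and one task of stage 1, while executor-2 runs one task of stage 0, the nested sums yield 4:

import scala.collection.mutable

val counts = mutable.HashMap(
  "executor-1" -> mutable.HashMap(0 -> 2, 1 -> 1),
  "executor-2" -> mutable.HashMap(0 -> 1))
assert(counts.values.map(_.values.sum).sum == 4)  // (2 + 1) + 1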

     /**
      * Return true if an executor is not currently running a task, and false otherwise.
      *
      * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
      */
     def isExecutorIdle(executorId: String): Boolean = {
-      !executorIdToTaskIds.contains(executorId)
+      !executorIdToStageAndNumTasks.contains(executorId)
     }

     /**
@@ -1029,6 +1029,43 @@ class ExecutorAllocationManagerSuite
     assert(removeTimes(manager) === Map.empty)
   }

test("correctly handle task failure introduces stage abortion scenario") {
sc = createSparkContext()
val manager = sc.executorAllocationManager.get
assert(maxNumExecutorsNeeded(manager) === 0)

val stageInfo = createStageInfo(0, 3)
post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo))
assert(maxNumExecutorsNeeded(manager) === 3)

val taskInfos = Seq(
createTaskInfo(1, 1, "executor-1"),
createTaskInfo(2, 2, "executor-1"),
createTaskInfo(3, 3, "executor-1"))

taskInfos.foreach(task => post(sc.listenerBus, SparkListenerTaskStart(0, 0, task)))
assert(maxNumExecutorsNeeded(manager) === 3)

// Simulate task 1 failed due to exception more than 4 times
post(sc.listenerBus,
SparkListenerTaskEnd(
0, 0, null, ExceptionFailure(null, null, null, null, None), taskInfos(0), null))
// 1 pending + 2 running
assert(maxNumExecutorsNeeded(manager) === 3)

// Stage will be aborted when task is failed more than 4 times
post(sc.listenerBus, SparkListenerStageCompleted(stageInfo))
// When stage is completed, all the related tasks should be finished
assert(maxNumExecutorsNeeded(manager) === 0)

// Simulate task 2 is killed intentionally because of stage abortion
post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, TaskKilled(""), taskInfos(1), null))
assert(maxNumExecutorsNeeded(manager) === 0)

// TaskEnd event may never be delivered, still we should guarantee executor-1 can be removed.
assert(removeTimes(manager).keySet === Set("executor-1"))
}

   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
