[SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManager following executor loss #15986

Closed
@@ -88,10 +88,12 @@ private[spark] class TaskSchedulerImpl(
// Incrementing task IDs
val nextTaskId = new AtomicLong(0)

// Number of tasks running on each executor
private val executorIdToTaskCount = new HashMap[String, Int]
// IDs of the tasks running on each executor
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
Contributor Author:

To pre-emptively address any concerns about the memory usage implications of this change, note that the hash set sizes should be bounded by the number of cores / task slots on the executor, so I don't think that there's much to be gained by using a more memory-efficient HashMap structure here. The only real optimization that I could think of would be to replace the map with a fixed-size array that we just linearly scan, but that seems like premature optimization and adds a lot of hard-to-reason-about complexity.
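For illustration only, here is a rough sketch (not part of the PR) of the fixed-size-array alternative dismissed above; the class name, method names, and the -1L "empty" sentinel are all invented:

// Hypothetical alternative: one slot per task slot on the executor, with -1L as
// an invented "empty" sentinel (valid task IDs start at 0 via nextTaskId).
class RunningTaskSlots(numSlots: Int) {
  private val slots = Array.fill(numSlots)(-1L)

  // Linear scan for a free slot; returns false if the executor is already full.
  def add(tid: Long): Boolean = {
    val free = slots.indexWhere(_ == -1L)
    if (free >= 0) { slots(free) = tid; true } else false
  }

  // Linear scan for the task ID; returns false if it was not being tracked.
  def remove(tid: Long): Boolean = {
    val i = slots.indexWhere(_ == tid)
    if (i >= 0) { slots(i) = -1L; true } else false
  }

  def runningCount: Int = slots.count(_ != -1L)
}

Compared with a HashSet per executor, this saves little (the set is already bounded by the slot count) while forcing callers to handle "full" and sentinel cases, which is the hard-to-reason-about complexity the comment refers to.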


def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
def runningTasksByExecutors(): Map[String, Int] = {
executorIdToRunningTaskIds.toMap.mapValues(_.size)
Contributor:

Why do you need the toMap here?

Contributor Author:

We need this to convert from a mutable map to an immutable one.

Contributor:

Sorry, I should have been more specific: why does the map need to be immutable? Is it just to maintain the old API?

Contributor Author:

Yes. I think it's also needed for thread-safety: if we return a mutable map to the caller and they then go on to iterate over it, we're in trouble if that map isn't thread-safe.

Contributor:

But it doesn't matter because we're creating a new map with mapValues, right?

Anyway, this doesn't seem especially important to this PR, so it's fine to leave as-is.
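To make the toMap discussion concrete, here is a minimal standalone sketch (assuming pre-2.13 Scala collection semantics, where mapValues returns a lazy wrapper rather than a copy); the object name, executor IDs, and task IDs are invented:

import scala.collection.mutable

object RunningTasksSnapshotExample {
  def main(args: Array[String]): Unit = {
    val executorIdToRunningTaskIds = mutable.HashMap("exec-0" -> mutable.HashSet(1L))

    // .toMap copies the entries into an immutable Map, so the key set handed to the
    // caller is decoupled from later mutations made under the scheduler's lock.
    // mapValues itself only wraps that immutable copy; it does not copy again.
    val snapshot: Map[String, Int] = executorIdToRunningTaskIds.toMap.mapValues(_.size)

    // A new executor registered after the snapshot was taken...
    executorIdToRunningTaskIds += ("exec-1" -> mutable.HashSet(2L))

    // ...does not show up in the snapshot, and iterating the snapshot cannot race
    // with the mutable map the way returning executorIdToRunningTaskIds directly would.
    assert(!snapshot.contains("exec-1"))
    assert(snapshot("exec-0") == 1)
  }
}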

}

// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
@@ -259,7 +261,7 @@ private[spark] class TaskSchedulerImpl(
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToTaskCount(execId) += 1
executorIdToRunningTaskIds(execId).add(tid)
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
@@ -288,7 +290,7 @@ private[spark] class TaskSchedulerImpl(
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
executorIdToRunningTaskIds.getOrElseUpdate(o.executorId, HashSet[Long]())
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
@@ -335,38 +337,34 @@ private[spark] class TaskSchedulerImpl(
var reason: Option[ExecutorLossReason] = None
synchronized {
Contributor Author:

To understand this change, note that taskIdToExecutorId and taskIdToTaskSetManager should always have the same key set, so the case Some(taskSet) if state == TaskState.LOST is equivalent to this if statement.

try {
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)

if (executorIdToTaskCount.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
}
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
if (TaskState.isFinished(state)) {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { execId =>
if (executorIdToTaskCount.contains(execId)) {
executorIdToTaskCount(execId) -= 1
}
if (state == TaskState.LOST) {
Contributor:

So just to check: this line was redundant before, right, which is why it could be removed? It seems like as long as "taskIdToExecutorId.contains(tid)" evaluated to true in the old code, this should have evaluated to true as well.

Contributor Author:

Actually, we might want to keep this in case the Mesos scheduler sends us duplicate task failure events so that we don't try to remove an already-removed executor. I'll put this back in with an explanatory comment in my next commit.

Contributor:

Why doesn't cleanupTaskState(tid) need to be called at the end of this if? It would have happened previously, right (as a result of the if-statement on line 351)?

If it does need to be called here, I think it might be cleaner to put the following block at the end of the Some case:

if (TaskState.isFinished(state)) {
  taskSet.removeRunningTask(tid)
  cleanupTaskState(tid)
}

Contributor Author:

I think this is technically okay, but the rationale is a little tricky:

If we receive a status update for a task with TaskState.LOST and the task corresponds to a task set which exists (i.e. we're taking this if branch), then there are two cases to consider:

  1. the task is running on an executor that we know about (i.e. executorIdToRunningTaskIds.contains(execId) == true), so we mark the executor as failed in the removeExecutor() call, which has the side-effect of calling cleanupTaskState for all tasks running on that executor (including this task).
  2. we don't know about the executor now, but we must have known about it earlier otherwise we wouldn't have scheduled a task there. Therefore, the executor was already removed earlier and that removeExecutor() call should have already called cleanupTaskState(). We shouldn't have to worry about races within TaskSchedulerImpl because all of the methods which interact with these data structures are synchronized.

That said, cleanupTaskState() is idempotent and there's not really any harm in putting in an extra call here just to make it absolutely clear that it's guaranteed to be cleaned up in all cases. I'll look into performing this restructuring.
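A small standalone sketch of the idempotence claim, using plain maps as stand-ins for the scheduler's bookkeeping (the object name, the String value standing in for a TaskSetManager, and the sample IDs are invented; the cleanup body mirrors cleanupTaskState from this PR):

import scala.collection.mutable

object CleanupIdempotenceSketch {
  private val taskIdToTaskSetManager = mutable.HashMap[Long, String](7L -> "taskSet-0")
  private val taskIdToExecutorId = mutable.HashMap[Long, String](7L -> "exec-0")
  private val executorIdToRunningTaskIds = mutable.HashMap("exec-0" -> mutable.HashSet(7L))

  // Same shape as the PR's cleanupTaskState: every step is a remove/get that
  // silently does nothing when the entry is already gone.
  private def cleanupTaskState(tid: Long): Unit = {
    taskIdToTaskSetManager.remove(tid)
    taskIdToExecutorId.remove(tid).foreach { executorId =>
      executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
    }
  }

  def main(args: Array[String]): Unit = {
    cleanupTaskState(7L)
    cleanupTaskState(7L) // the second call is a no-op rather than an error
    assert(taskIdToTaskSetManager.isEmpty && taskIdToExecutorId.isEmpty)
    assert(executorIdToRunningTaskIds("exec-0").isEmpty)
  }
}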

// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
"taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
}
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
if (TaskState.isFinished(state)) {
cleanupTaskState(tid)
Contributor:

ok, last comment: do you think it's more readable to structure this code as:

if (TaskState.isFinished(state)) {
  if (state == TaskState.LOST) {
    taskResultGetter.enqueueFailedTask(...)
  } else if (Set(TaskState.FAILED, TaskState.KILLED).contains(state)) {
    taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
  } else if (state == TaskState.FINISHED) {
    taskResultGetter.enqueueSuccessfulTask(...)
  }

  cleanupTaskState(tid)
  taskSet.removeRunningTask(tid)
}

Contributor Author:

I can do that, but we might want to make sure that taskSet.removeRunningTask is called prior to the taskResultGetter call.

Contributor:

I don't think that's necessary because all access to the TSM is gated on the TaskSchedulerImpl, so even though the TaskResultGetter might do some stuff, the TSM's state won't be accessed until the later handleSuccessfulTask call to the TaskSchedulerImpl

Contributor Author:

Fair enough, although I ended up preserving the ordering just to reduce the set of changes that we need to reason about.

taskSet.removeRunningTask(tid)
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
if (state == TaskState.FINISHED) {
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
}
Contributor Author:

Here, TaskState.LOST will have already been handled by the new case above, so I removed it here because this case will never be hit.

case None =>
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates)")
"likely the result of receiving duplicate task finished status updates) or its " +
"executor has been marked as failed.")
.format(state, tid))
}
} catch {
@@ -477,7 +475,7 @@ private[spark] class TaskSchedulerImpl(
var failedExecutor: Option[String] = None

synchronized {
if (executorIdToTaskCount.contains(executorId)) {
if (executorIdToRunningTaskIds.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logExecutorLoss(executorId, hostPort, reason)
removeExecutor(executorId, reason)
@@ -519,13 +517,31 @@ private[spark] class TaskSchedulerImpl(
logError(s"Lost executor $executorId on $hostPort: $reason")
}

/**
* Cleans up the TaskScheduler's state for tracking the given task.
*/
private def cleanupTaskState(tid: Long): Unit = {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { executorId =>
executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
}
}

/**
* Remove an executor from all our data structures and mark it as lost. If the executor's loss
* reason is not yet known, do not yet remove its association with its host nor update the status
* of any running tasks, since the loss reason defines whether we'll fail those tasks.
Contributor Author:

I'm a bit confused by this comment, since it seems to suggest that we'll do some sort of per-task cleanup at some later time. If anyone knows which cleanup this refers to, then maybe we should not update executorIdToRunningTaskIds here at all and should instead perform the taskIdToExecutorId and taskIdToTaskSetManager updates somewhere else.

On the other hand, the only place where we currently remove entries from taskIdToExecutorId and taskIdToTaskSetManager is in statusUpdate, so my hunch is that the eventual cleanup alluded to here isn't happening in standalone mode.

Contributor Author:

Based on the "tasks are not re-scheduled while executor loss reason is pending" test in TaskSchedulerImplSuite, it looks like the API contract here is that if executorLost is called with LossReasonPending, then it will eventually be called again with some other reason. That will cause it to call rootPool.executorLost(), which, in turn, calls executorLost on every TaskSetManager; the TaskSetManagers use their own internal executorId-to-task-ID mappings to mark tasks as failed and inform the DAGScheduler. The TaskSetManager doesn't call back into the TaskScheduler to access any of the data in these mappings, so I think it's safe to clean them up immediately at the top of removeExecutor rather than putting them behind the reason != LossReasonPending check.

Note that it's also not as simple as putting those updates behind reason != LossReasonPending as a defensive measure, because that would change the contract on when runningTasksByExecutors() is updated: previously, a failed executor's running task count was set to zero as soon as the executor failed, whereas with that check it would only happen once the loss reason was known.

I think that these subtleties / distinctions are only relevant to YARN mode, so I'll loop in @vanzin to comment on them.

Contributor:

This has been a while, so my memory of how all this works is a little hazy. But yes, this only applies in YARN mode. Other backends do not use LossReasonPending.

From a quick look at the code, removeExecutor maintains the host-to-executor-id mapping so that it can differentiate between an executor that has already been removed and one whose exit reason is unknown. This avoids calling Pool.executorLost a second time in the first case. Not sure how much that distinction matters though, maybe it's ok to call it again and this code can be simplified.

Contributor:

Josh, my understanding is consistent with yours -- that if executorLost() is called with LossReasonPending, it will eventually be called again with a real loss reason. To elaborate on what Marcelo said, executorIdToHost is used as the "gatekeeper" for this: on this line, we use the existence of an entry in executorIdToHost to determine whether we need to call removeExecutor (possibly for a 2nd time).

My only concern with the current approach is the following: the task doesn't get marked as finished in the task set manager until rootPool.executorLost gets called (i.e., until the 2nd removeExecutor call). Is it possible for anything to go awry for task updates that arrive between the first removeExecutor call (at which point the TaskSchedulerImpl's state for the task will be removed) and when the task is marked as failed and removed from the TSM? I looked over this and I think things should be fine -- we'll just drop the update -- but can you update the error message we log on 369 (since it can now be hit when an update arrives after an executor has been lost, in addition to when duplicate updates are received)?

Contributor:

Is it possible for anything to go awry for task updates that happen between the first removeExecutor call ... and when the task is marked as failed

If there's something really wrong with the YARN AM, it can take some time, but eventually the second call will come, either with the actual reason or with a generic "executor lost" reason. This is handled in YarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver.

*/
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
executorIdToTaskCount -= executorId
// The tasks on the lost executor may not send any more status updates (because the executor
// has been lost), so they should be cleaned up here.
executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
Contributor:

Can you add a comment here -- something like

The tasks on the lost executor may not send any more status updates (because the executor has been lost), so they should be cleaned up here.

logDebug("Cleaning up TaskScheduler state for tasks " +
s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
// We do not notify the TaskSetManager of the task failures because that will
// happen below in the rootPool.executorLost() call.
taskIds.foreach(cleanupTaskState)
}

val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
@@ -563,11 +579,11 @@ private[spark] class TaskSchedulerImpl(
}

def isExecutorAlive(execId: String): Boolean = synchronized {
executorIdToTaskCount.contains(execId)
executorIdToRunningTaskIds.contains(execId)
}

def isExecutorBusy(execId: String): Boolean = synchronized {
executorIdToTaskCount.getOrElse(execId, -1) > 0
executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
}

// By default, rack is unknown
@@ -433,10 +433,11 @@ class StandaloneDynamicAllocationSuite
assert(executors.size === 2)

// simulate running a task on the executor
val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
val getMap =
PrivateMethod[mutable.HashMap[String, mutable.HashSet[Long]]]('executorIdToRunningTaskIds)
val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
val executorIdToTaskCount = taskScheduler invokePrivate getMap()
executorIdToTaskCount(executors.head) = 1
val executorIdToRunningTaskIds = taskScheduler invokePrivate getMap()
executorIdToRunningTaskIds(executors.head) = mutable.HashSet(1L)
// kill the busy executor without force; this should fail
assert(!killExecutor(sc, executors.head, force = false))
apps = getApplications()
@@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import java.nio.ByteBuffer

import org.apache.spark._
import org.apache.spark.internal.Logging

@@ -274,4 +276,70 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
assert("executor1" === taskDescriptions3(0).executorId)
}

test("if an executor is lost then the state for its running tasks is cleaned up (SPARK-18553)") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}

val e0Offers = Seq(WorkerOffer("executor0", "host0", 1))
val attempt1 = FakeTask.createTaskSet(1)

// submit attempt 1, offer resources, task gets scheduled
taskScheduler.submitTasks(attempt1)
val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
assert(1 === taskDescriptions.length)

// mark executor0 as dead
taskScheduler.executorLost("executor0", SlaveLost())
assert(!taskScheduler.isExecutorAlive("executor0"))
assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)


// Check that state associated with the lost task attempt is cleaned up:
assert(taskScheduler.taskIdToExecutorId.isEmpty)
Contributor Author:

I suppose that we should also strengthen the assertions in the existing tests to check that these maps are updated following task successes, but this may be tricky given that the existing tests aren't exercising the statusUpdate path. Rather, we may have to test this more end-to-end by asserting that these always become empty once all jobs and tasks are done.

Contributor:

Yeah, is it prohibitively difficult to add a 2nd test for when executorLost happens as a result of a status update call (e.g., have two tasks running on a particular executor, fail one of them via statusUpdate, and make sure the other one's state gets cleaned up)? The scheduler code is such a tangled web that it would be nice to test both code paths.

Contributor Author:

I don't think that case can occur, because TaskState.LOST should only be sent in Mesos fine-grained mode, where we'll only ever have a single task per executor (in other words, executor loss can only happen via statusUpdate in situations where there would never be concurrent tasks on an executor).
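A sketch of the end-to-end assertion floated above, written as a hypothetical extra test for this suite (the test name, job size, and partition count are invented, and this is not part of the PR):

test("scheduler bookkeeping is cleaned up once all tasks have finished (sketch)") {
  sc = new SparkContext("local", "TaskSchedulerImplSuite")
  // Run a small job to completion; statusUpdate() should have cleaned up the
  // per-task state before the successful result is handed back to the job.
  assert(sc.parallelize(1 to 100, 4).count() === 100)

  val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
  assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
  assert(taskScheduler.taskIdToExecutorId.isEmpty)
  assert(taskScheduler.runningTasksByExecutors().values.forall(_ == 0))
}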

assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
}

test("if a task finishes with TaskState.LOST its executor is marked as dead") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}

val e0Offers = Seq(WorkerOffer("executor0", "host0", 1))
val attempt1 = FakeTask.createTaskSet(1)

// submit attempt 1, offer resources, task gets scheduled
taskScheduler.submitTasks(attempt1)
val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
assert(1 === taskDescriptions.length)

// Report the task as failed with TaskState.LOST
taskScheduler.statusUpdate(
tid = taskDescriptions.head.taskId,
state = TaskState.LOST,
serializedData = ByteBuffer.allocate(0)
)

// Check that state associated with the lost task attempt is cleaned up:
assert(taskScheduler.taskIdToExecutorId.isEmpty)
assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)

// Check that the executor has been marked as dead
assert(!taskScheduler.isExecutorAlive("executor0"))
assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
}
}