[SPARK-13279] Remove unnecessary duplicate check in addPendingTask function
Sital Kedia committed Feb 12, 2016
1 parent b5761d1 commit 26b4370
Showing 1 changed file with 13 additions and 15 deletions.
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala: 28 changes (13 additions, 15 deletions)
@@ -114,9 +114,14 @@ private[spark] class TaskSetManager(
   // treated as stacks, in which new tasks are added to the end of the
   // ArrayBuffer and removed from the end. This makes it faster to detect
   // tasks that repeatedly fail because whenever a task failed, it is put
-  // back at the head of the stack. They are also only cleaned up lazily;
-  // when a task is launched, it remains in all the pending lists except
-  // the one that it was launched from, but gets removed from them later.
+  // back at the head of the stack. These collections may contain duplicates
+  // for two reasons:
+  // (1): Tasks are only removed lazily; when a task is launched, it remains
+  // in all the pending lists except the one that it was launched from.
+  // (2): Tasks may be re-added to these lists multiple times as a result
+  // of failures.
+  // Duplicates are handled in dequeueTaskFromList, which ensures that a
+  // task hasn't already started running before launching it.
   private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
 
   // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
@@ -179,23 +184,16 @@ private[spark] class TaskSetManager(
 
   /** Add a task to all the pending-task lists that it should be on. */
   private def addPendingTask(index: Int) {
-    // Utility method that adds `index` to a list only if it's not already there
-    def addTo(list: ArrayBuffer[Int]) {
-      if (!list.contains(index)) {
-        list += index
-      }
-    }
-
     for (loc <- tasks(index).preferredLocations) {
       loc match {
         case e: ExecutorCacheTaskLocation =>
-          addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
+          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
         case e: HDFSCacheTaskLocation => {
           val exe = sched.getExecutorsAliveOnHost(loc.host)
           exe match {
             case Some(set) => {
               for (e <- set) {
-                addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
+                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
               }
               logInfo(s"Pending task $index has a cached location at ${e.host} " +
                 ", where there are executors " + set.mkString(","))
@@ -206,14 +204,14 @@
         }
         case _ => Unit
       }
-      addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
       for (rack <- sched.getRackForHost(loc.host)) {
-        addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
+        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
       }
     }
 
     if (tasks(index).preferredLocations == Nil) {
-      addTo(pendingTasksWithNoPrefs)
+      pendingTasksWithNoPrefs += index
     }
 
     allPendingTasks += index // No point scanning this whole list to find the old task there
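Note on the change above: with the addTo helper removed, addPendingTask appends task indices unconditionally, and duplicates are only filtered out when a task is dequeued, which is what the new comment says dequeueTaskFromList does. The sketch below illustrates that enqueue/dequeue pattern; it is a minimal, self-contained example with hypothetical names (PendingTasksSketch, running, finished, dequeueTask), not Spark's actual TaskSetManager code.

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

// Minimal sketch of the pattern described in the new comment: enqueue without
// a duplicate check, and discard stale entries lazily at dequeue time.
object PendingTasksSketch {

  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  private val running = new HashSet[Int]
  private val finished = new HashSet[Int]

  // Mirrors the simplified addPendingTask: append unconditionally, so the
  // same index may appear more than once in a list.
  def addPendingTask(executorId: String, index: Int): Unit = {
    pendingTasksForExecutor.getOrElseUpdate(executorId, new ArrayBuffer) += index
  }

  // Plays the role the comment assigns to dequeueTaskFromList: pop from the
  // end (stack order) and skip entries for tasks that have already started
  // or finished, so duplicate entries are dropped here instead.
  def dequeueTask(executorId: String): Option[Int] = {
    val list = pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer.empty[Int])
    while (list.nonEmpty) {
      val index = list.remove(list.size - 1)
      if (!running.contains(index) && !finished.contains(index)) {
        running += index
        return Some(index)
      }
    }
    None
  }

  def main(args: Array[String]): Unit = {
    addPendingTask("exec-1", 0)
    addPendingTask("exec-1", 0)    // duplicate entry is allowed
    println(dequeueTask("exec-1")) // Some(0)
    println(dequeueTask("exec-1")) // None: the duplicate is skipped, not re-launched
  }
}

A likely motivation for calling the check unnecessary, though the commit does not state it explicitly, is that list.contains on an ArrayBuffer is a linear scan, so the old addTo made every enqueue O(n) in the size of the pending list; dropping it makes the append O(1) and shifts the cost of tolerating duplicates to the existing filtering at dequeue time.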