diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ee437c696b47e..f4724627de6ac 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1842,6 +1842,17 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.killInterval")
+      .doc("Duration after which a decommissioned executor will be killed forcefully. " +
+        "This config is useful for cloud environments where we know in advance when " +
+        "an executor is going to go down after the decommissioning signal, e.g. around 2 mins " +
+        "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " +
+        "used to decide which tasks running on decommissioned executors to speculate.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createOptional
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a302f680a272e..4b31ff0c790da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
 
 import scala.collection.immutable.Map
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -102,6 +102,8 @@ private[spark] class TaskSetManager(
     }
     numTasks <= slots
   }
+  val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(
+    TimeUnit.SECONDS.toMillis)
 
   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
@@ -165,6 +167,7 @@ private[spark] class TaskSetManager(
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
   private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val tidToExecutorKillTimeMapping = new HashMap[Long, Long]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -933,6 +936,7 @@ private[spark] class TaskSetManager(
 
   /** If the given task ID is in the set of running tasks, removes it. */
   def removeRunningTask(tid: Long): Unit = {
+    tidToExecutorKillTimeMapping.remove(tid)
     if (runningTasksSet.remove(tid) && parent != null) {
       parent.decreaseRunningTasks(1)
     }
@@ -1042,7 +1046,19 @@ private[spark] class TaskSetManager(
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
       for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+        if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+          // Check whether this task will finish before the executorKillTime assuming
+          // it will take medianDuration overall. If this task cannot finish within the
+          // executor kill interval, then this task is a candidate for speculation.
+          val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+          val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
+            taskEndTimeBasedOnMedianDuration
+          if (canExceedDeadline) {
+            speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
+          }
+        }
+        foundTasks |= speculated
       }
     } else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
       val time = clock.getTimeMillis()
@@ -1100,8 +1116,12 @@ private[spark] class TaskSetManager(
 
   def executorDecommission(execId: String): Unit = {
     recomputeLocality()
-    // Future consideration: if an executor is decommissioned it may make sense to add the current
-    // tasks to the spec exec queue.
+    executorDecommissionKillInterval.foreach { interval =>
+      val executorKillTime = clock.getTimeMillis() + interval
+      runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
+        tidToExecutorKillTimeMapping(tid) = executorKillTime
+      }
+    }
   }
 
   def recomputeLocality(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index e4aad58d25064..849409aa70ba3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1911,6 +1911,112 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is decommissioned" +
+    " and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, TASK 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6*1000) // time = 6s
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4*1000) // time = 10s
+    // Complete the first 2 tasks and leave the other 2 tasks running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, tasks need to be running for
+    // > 15s to be eligible for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // Decommission exec2. All tasks running on exec2 (i.e. TASK 2,3) will be added to
+    // tidToExecutorKillTimeMapping with a kill time of 15s (10s elapsed + 5s kill interval),
+    // i.e. (TASK 2 -> 15s, TASK 3 -> 15s)
+    manager.executorDecommission("exec2")
+    assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
+    assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000)
+    assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000)
+
+    assert(manager.checkSpeculatableTasks(0))
+    // TASK 2 started at t=0s, so it can still finish before t=15s (median task runtime = 10s).
+    // TASK 3 started at t=6s, so it might not finish before t=15s. So TASK 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Nothing should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1*1000) // time = 11s
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5*1000) // time = 16s
+    // At t=16s, TASK 2 has been running for 16s. This is more than
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10s = 15s, so now TASK 2 will
+    // be selected for speculation. Here we are verifying that the regular speculation configs
+    // still take effect even when an EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+    // the corresponding executor is decommissioned
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+    val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    assert(taskOption2New.isDefined)
+    val speculativeTask2 = taskOption2New.get
+    // Ensure that TASK 2 is re-launched on exec3, host3
+    assert(speculativeTask2.index === 2)
+    assert(speculativeTask2.taskId === 5)
+    assert(speculativeTask2.executorId === "exec3")
+    assert(speculativeTask2.attemptNumber === 1)
+
+    assert(manager.copiesRunning(2) === 2)
+    assert(manager.copiesRunning(3) === 2)
+
+    // Offering additional resources should not lead to any speculative tasks being respawned
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+  }
+
   test("SPARK-29976 Regular speculation configs should still take effect even when a " +
     "threshold is provided") {
     val (manager, clock) = testSpeculationDurationSetup(