diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e3a54e4d84fea..660e0bfca2bca 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1850,7 +1850,7 @@ package object config {
         "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " +
         "used to decide what tasks running on decommission executors to speculate")
       .version("3.1.0")
-      .timeConf(TimeUnit.MILLISECONDS)
+      .timeConf(TimeUnit.SECONDS)
       .createOptional
 
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 668c404dd66b1..883119bdb0a50 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
 
 import scala.collection.immutable.Map
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -102,7 +102,8 @@ private[spark] class TaskSetManager(
     }
     numTasks <= slots
   }
-  val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL)
+  val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(
+    TimeUnit.SECONDS.toMillis)
 
   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index a0774f3bcac46..583b69b7b59ab 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1900,7 +1900,7 @@ class TaskSetManagerSuite
     sc.conf.set(config.SPECULATION_ENABLED, true)
     sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
     sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
-    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
     val clock = new ManualClock()
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
@@ -1917,7 +1917,7 @@ class TaskSetManagerSuite
     assert(taskOption2.isDefined)
     assert(taskOption2.get.executorId === "exec2")
 
-    clock.advance(6) // time = 6ms
+    clock.advance(6*1000) // time = 6s
     // Start TASK 3 on exec2 after some delay
     val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
     assert(taskOption3.isDefined)
@@ -1925,7 +1925,7 @@ class TaskSetManagerSuite
 
     assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
 
-    clock.advance(4) // time = 10ms
+    clock.advance(4*1000) // time = 10s
     // Complete the first 2 tasks and leave the other 2 tasks in running
     for (id <- Set(0, 1)) {
       manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
@@ -1934,7 +1934,7 @@ class TaskSetManagerSuite
 
     // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
     // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
-    // > 15ms for speculation
+    // > 15s for speculation
     assert(!manager.checkSpeculatableTasks(0))
     assert(sched.speculativeTasks.toSet === Set())
 
@@ -1943,12 +1943,12 @@ class TaskSetManagerSuite
     // (TASK2 -> 15, TASK3 -> 15)
     manager.executorDecommission("exec2")
     assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
-    assert(manager.tidToExecutorKillTimeMapping(2) === 15)
-    assert(manager.tidToExecutorKillTimeMapping(3) === 15)
+    assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000)
+    assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000)
 
     assert(manager.checkSpeculatableTasks(0))
-    // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
-    // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+    // Task2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s)
+    // Task3 started at t=6s, so it might not finish before t=15s. So Task 3 should be part
     // of speculativeTasks
     assert(sched.speculativeTasks.toSet === Set(3))
     assert(manager.copiesRunning(3) === 1)
@@ -1966,15 +1966,15 @@ class TaskSetManagerSuite
     assert(speculativeTask3.executorId === "exec3")
     assert(speculativeTask3.attemptNumber === 1)
 
-    clock.advance(1) // time = 11 ms
+    clock.advance(1*1000) // time = 11s
     // Running checkSpeculatableTasks again should return false
     assert(!manager.checkSpeculatableTasks(0))
     assert(manager.copiesRunning(2) === 1)
     assert(manager.copiesRunning(3) === 2)
 
-    clock.advance(5) // time = 16 ms
-    // At t=16 ms, Task 2 has been running for 16 ms. It is more than the
-    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 2 will
+    clock.advance(5*1000) // time = 16s
+    // At t=16s, Task 2 has been running for 16s. It is more than the
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now Task 2 will
     // be selected for speculation. Here we are verifying that regular speculation configs
     // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
     // corresponding executor is decommissioned
@@ -1998,51 +1998,6 @@ class TaskSetManagerSuite
     assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
   }
 
-  test("SPARK-21040: Check speculative tasks are not launched when an executor" +
-    " is decommissioned and the tasks running on it can finish within" +
-    " the EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
-    sc = new SparkContext("local", "test")
-    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
-    val taskSet = FakeTask.createTaskSet(4)
-    sc.conf.set(config.SPECULATION_ENABLED, true)
-    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
-    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
-    // Set high value for EXECUTOR_DECOMMISSION_KILL_INTERVAL
-    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "50ms")
-    val clock = new ManualClock()
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
-    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
-      task.metrics.internalAccums
-    }
-
-    // Offer resources for 4 tasks to start, 2 on each exec
-    Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) =>
-      (0 until 2).foreach { _ =>
-        val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1
-        assert(taskOption.isDefined)
-        assert(taskOption.get.executorId === exec)
-      }
-    }
-    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
-
-    clock.advance(10) // time = 10ms
-    // Complete the first 2 tasks and leave the other 2 tasks in running
-    for (id <- Set(0, 1)) {
-      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
-      assert(sched.endedTasks(id) === Success)
-    }
-    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to
-    // executorDecommissionSpeculationTriggerTimeoutOpt
-    // (TASK2 -> 60, TASK3 -> 60)
-    manager.executorDecommission("exec2")
-
-    // Since the EXECUTOR_DECOMMISSION_KILL_INTERVAL was high, so the already running tasks
-    // on executor 2 still have chance to finish. So they should not be speculated.
-    assert(!manager.checkSpeculatableTasks(0))
-    clock.advance(5) // time = 15ms
-    assert(!manager.checkSpeculatableTasks(0))
-  }
-
   test("SPARK-29976 Regular speculation configs should still take effect even when a " +
     "threshold is provided") {
     val (manager, clock) = testSpeculationDurationSetup(
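
Note on the unit change above: the config value is now parsed in seconds, while TaskSetManager (and the ManualClock used by the suite) keep working in milliseconds, so the value is converted once via TimeUnit.SECONDS.toMillis when it is read. The standalone Scala sketch below uses illustrative names (it is not Spark source code) and mirrors that conversion plus the timing arithmetic the updated test relies on: a "5s" kill interval becomes 5000 ms, decommission at t=10s gives an expected kill time of 15s (the 15*1000 in the asserts), and the speculation threshold is SPECULATION_MULTIPLIER (1.5) times the 10s median runtime, i.e. 15s.

import java.util.concurrent.TimeUnit

// Standalone sketch with illustrative names (not Spark source code).
object KillIntervalSketch {
  // Mirrors the patched line in TaskSetManager:
  //   conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(TimeUnit.SECONDS.toMillis)
  // The optional config value arrives in seconds and is kept internally in millis.
  def killIntervalMs(killIntervalSeconds: Option[Long]): Option[Long] =
    killIntervalSeconds.map(TimeUnit.SECONDS.toMillis)

  def main(args: Array[String]): Unit = {
    // "5s" in the updated test parses to 5 seconds -> 5000 ms internally.
    assert(killIntervalMs(Some(5L)).contains(5000L))
    // An unset config stays unset; no conversion is applied.
    assert(killIntervalMs(None).isEmpty)

    // Tasks 2 and 3 run on the executor that is decommissioned at t = 10s,
    // so their expected kill time is 10s + 5s kill interval = 15s = 15000 ms.
    val decommissionTimeMs = TimeUnit.SECONDS.toMillis(10)
    val expectedKillTimeMs = decommissionTimeMs + killIntervalMs(Some(5L)).get
    assert(expectedKillTimeMs == 15000L)

    // Regular speculation threshold: SPECULATION_MULTIPLIER (1.5) * median
    // runtime (10s) = 15s, which is why a task must run longer than 15s
    // before it becomes a speculation candidate.
    val speculationThresholdMs = (1.5 * TimeUnit.SECONDS.toMillis(10)).toLong
    assert(speculationThresholdMs == 15000L)
  }
}

Keeping the internal bookkeeping in milliseconds leaves the scheduler-side comparisons untouched; only the parsing unit changes, and the test scales its clock advances and expected values by 1000 accordingly.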