From 55dc94f622570534a7e70be69ae731cd9bc86766 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Tue, 26 May 2020 20:18:37 +0530 Subject: [PATCH] address review comments --- .../org/apache/spark/scheduler/TaskSetManager.scala | 4 ++-- .../apache/spark/scheduler/TaskSetManagerSuite.scala | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f3237459abbde..668c404dd66b1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1115,8 +1115,8 @@ private[spark] class TaskSetManager( def executorDecommission(execId: String): Unit = { recomputeLocality() - if (executorDecommissionKillInterval.nonEmpty) { - val executorKillTime = clock.getTimeMillis() + executorDecommissionKillInterval.get + executorDecommissionKillInterval.foreach { interval => + val executorKillTime = clock.getTimeMillis() + interval runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid => tidToExecutorKillTimeMapping(tid) = executorKillTime } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 2c4ff533508dc..a0774f3bcac46 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1942,6 +1942,9 @@ class TaskSetManagerSuite // executorDecommissionSpeculationTriggerTimeoutOpt // (TASK2 -> 15, TASK3 -> 15) manager.executorDecommission("exec2") + assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3)) + assert(manager.tidToExecutorKillTimeMapping(2) === 15) + assert(manager.tidToExecutorKillTimeMapping(3) === 15) assert(manager.checkSpeculatableTasks(0)) // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10) @@ -1952,7 +1955,7 @@ class TaskSetManagerSuite // Offer resource to start the speculative attempt for the running task val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1 - // Offer more resources. Noting should get scheduled now. + // Offer more resources. Nothing should get scheduled now. assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty) assert(taskOption3New.isDefined) @@ -1970,8 +1973,8 @@ class TaskSetManagerSuite assert(manager.copiesRunning(3) === 2) clock.advance(5) // time = 16 ms - // At t=16 ms, Task 4 has completed 16 ms. It is more than the - // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will + // At t=16 ms, Task 2 has been running for 16 ms. It is more than the + // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 2 will // be selected for speculation. Here we are verifying that regular speculation configs // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and // corresponding executor is decommissioned @@ -1993,8 +1996,6 @@ class TaskSetManagerSuite // Offering additional resources should not lead to any speculative tasks being respawned assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty) - assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty) - assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty) } test("SPARK-21040: Check speculative tasks are not launched when an executor" +