Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
prakharjain09 committed May 26, 2020
1 parent 7521adf commit 55dc94f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1115,8 +1115,8 @@ private[spark] class TaskSetManager(

def executorDecommission(execId: String): Unit = {
recomputeLocality()
if (executorDecommissionKillInterval.nonEmpty) {
val executorKillTime = clock.getTimeMillis() + executorDecommissionKillInterval.get
executorDecommissionKillInterval.foreach { interval =>
val executorKillTime = clock.getTimeMillis() + interval
runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
tidToExecutorKillTimeMapping(tid) = executorKillTime
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1942,6 +1942,9 @@ class TaskSetManagerSuite
// executorDecommissionSpeculationTriggerTimeoutOpt
// (TASK2 -> 15, TASK3 -> 15)
manager.executorDecommission("exec2")
assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
assert(manager.tidToExecutorKillTimeMapping(2) === 15)
assert(manager.tidToExecutorKillTimeMapping(3) === 15)

assert(manager.checkSpeculatableTasks(0))
// Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
Expand All @@ -1952,7 +1955,7 @@ class TaskSetManagerSuite

// Offer resource to start the speculative attempt for the running task
val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
// Offer more resources. Noting should get scheduled now.
// Offer more resources. Nothing should get scheduled now.
assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
assert(taskOption3New.isDefined)

Expand All @@ -1970,8 +1973,8 @@ class TaskSetManagerSuite
assert(manager.copiesRunning(3) === 2)

clock.advance(5) // time = 16 ms
// At t=16 ms, Task 4 has completed 16 ms. It is more than the
// SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will
// At t=16 ms, Task 2 has been running for 16 ms. It is more than the
// SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 2 will
// be selected for speculation. Here we are verifying that regular speculation configs
// should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
// corresponding executor is decommissioned
Expand All @@ -1993,8 +1996,6 @@ class TaskSetManagerSuite

// Offering additional resources should not lead to any speculative tasks being respawned
assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)
}

test("SPARK-21040: Check speculative tasks are not launched when an executor" +
Expand Down

0 comments on commit 55dc94f

Please sign in to comment.