Skip to content

Commit

Permalink
add test case
Browse files Browse the repository at this point in the history
  • Loading branch information
prakharjain09 committed May 25, 2020
1 parent 775cacb commit 7521adf
Showing 1 changed file with 59 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1892,7 +1892,7 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}

test("Check speculative tasks are launched when an executor is decommissioned" +
test("SPARK-21040: Check speculative tasks are launched when an executor is decommissioned" +
" and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
Expand All @@ -1907,7 +1907,7 @@ class TaskSetManagerSuite
task.metrics.internalAccums
}

// Offer TASK 0,1 to exec1, Task 2 to exec2
// Start TASK 0,1 on exec1, Task 2 on exec2
(0 until 2).foreach { _ =>
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption.isDefined)
Expand All @@ -1918,7 +1918,7 @@ class TaskSetManagerSuite
assert(taskOption2.get.executorId === "exec2")

clock.advance(6) // time = 6ms
// Offer TASK 3 to exec2 after some delay
// Start TASK 3 on exec2 after some delay
val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
assert(taskOption3.isDefined)
assert(taskOption3.get.executorId === "exec2")
Expand All @@ -1945,16 +1945,18 @@ class TaskSetManagerSuite

assert(manager.checkSpeculatableTasks(0))
// Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
// Task3 started at t=6, so it might not finish before t=15
// Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
// of speculativeTasks
assert(sched.speculativeTasks.toSet === Set(3))
assert(manager.copiesRunning(3) === 1)

// Offer resource to start the speculative attempt for the running task. We offer more
// resources, and ensure that speculative tasks get scheduled appropriately -- only one extra
// copy per speculatable task
// Offer resource to start the speculative attempt for the running task
val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
// Offer more resources. Nothing should get scheduled now.
assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
assert(taskOption3New.isDefined)

// Assert info about the newly launched speculative task
val speculativeTask3 = taskOption3New.get
assert(speculativeTask3.index === 3)
assert(speculativeTask3.taskId === 4)
Expand All @@ -1968,6 +1970,11 @@ class TaskSetManagerSuite
assert(manager.copiesRunning(3) === 2)

clock.advance(5) // time = 16 ms
// At t=16 ms, Task 4 has completed 16 ms. It is more than the
// SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will
// be selected for speculation. Here we are verifying that regular speculation configs
// should still take effect even when an EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
// corresponding executor is decommissioned
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(2, 3))
assert(manager.copiesRunning(2) === 1)
Expand All @@ -1990,6 +1997,51 @@ class TaskSetManagerSuite
assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)
}

test("SPARK-21040: Check speculative tasks are not launched when an executor" +
  " is decommissioned and the tasks running on it can finish within" +
  " the EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
  sc = new SparkContext("local", "test")
  sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
  val taskSet = FakeTask.createTaskSet(4)
  sc.conf.set(config.SPECULATION_ENABLED, true)
  sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
  sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
  // Use a generous kill interval so decommissioned executors have ample time
  // to finish their in-flight tasks before being killed.
  sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "50ms")
  val clock = new ManualClock()
  val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
  val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
    taskSet.tasks.map(_.metrics.internalAccums)

  // Launch all 4 tasks: two on each executor.
  for {
    (exec, host) <- Seq(("exec1", "host1"), ("exec2", "host2"))
    _ <- 0 until 2
  } {
    val offered = manager.resourceOffer(exec, host, NO_PREF)._1
    assert(offered.isDefined)
    assert(offered.get.executorId === exec)
  }
  assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))

  clock.advance(10) // time = 10ms
  // Finish tasks 0 and 1; tasks 2 and 3 stay running (both on exec2).
  Seq(0, 1).foreach { id =>
    manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
    assert(sched.endedTasks(id) === Success)
  }
  // Decommission exec2. Its running tasks (TASK 2 and TASK 3) get recorded in
  // executorDecommissionSpeculationTriggerTimeoutOpt with a trigger time of
  // now + kill interval, i.e. t = 60ms for each.
  manager.executorDecommission("exec2")

  // Because the EXECUTOR_DECOMMISSION_KILL_INTERVAL is large, the tasks still
  // running on exec2 have a chance to finish, so decommissioning alone must
  // not cause them to be speculated.
  assert(!manager.checkSpeculatableTasks(0))
  clock.advance(5) // time = 15ms
  assert(!manager.checkSpeculatableTasks(0))
}

test("SPARK-29976 Regular speculation configs should still take effect even when a " +
"threshold is provided") {
val (manager, clock) = testSpeculationDurationSetup(
Expand Down

0 comments on commit 7521adf

Please sign in to comment.