
Commit dae9cfe
remove unnecessary test
prakharjain09 committed May 27, 2020
1 parent 55dc94f commit dae9cfe
Showing 3 changed files with 16 additions and 60 deletions.
@@ -1850,7 +1850,7 @@ package object config {
         "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " +
         "used to decide what tasks running on decommission executors to speculate")
       .version("3.1.0")
-      .timeConf(TimeUnit.MILLISECONDS)
+      .timeConf(TimeUnit.SECONDS)
       .createOptional

   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
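Note on the unit change above: with .timeConf(TimeUnit.SECONDS), reading this entry now yields an Option[Long] measured in seconds (a setting of "5s" resolves to 5), so any caller that compares against a millisecond clock must convert. A minimal standalone sketch of that conversion in plain Scala, with a placeholder value rather than Spark's ConfigEntry machinery:

import java.util.concurrent.TimeUnit

// Stand-in for conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL) after this change:
// an optional duration in seconds, as if "5s" were configured.
val intervalSeconds: Option[Long] = Some(5L)

// Callers needing milliseconds convert explicitly; TimeUnit does the arithmetic.
val intervalMillis: Option[Long] = intervalSeconds.map(TimeUnit.SECONDS.toMillis)
// intervalMillis == Some(5000L)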
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler

 import java.io.NotSerializableException
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}

 import scala.collection.immutable.Map
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -102,7 +102,8 @@ private[spark] class TaskSetManager(
     }
     numTasks <= slots
   }
-  val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL)
+  val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(
+    TimeUnit.SECONDS.toMillis)

   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
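The .map(TimeUnit.SECONDS.toMillis) above matters because TaskSetManager's clock runs in milliseconds. A worked sketch of the arithmetic the updated suite below asserts (hypothetical variable names; in the test, exec2 is decommissioned at t = 10s with a "5s" interval):

import java.util.concurrent.TimeUnit

// Decommission happens at clock time 10s; the configured kill interval is "5s".
val nowMillis: Long = 10 * 1000L                              // clock.getTimeMillis()
val killIntervalMillis: Long = TimeUnit.SECONDS.toMillis(5L)  // converted conf value
// Expected kill time recorded for each task still running on the decommissioned executor:
val expectedKillTimeMillis: Long = nowMillis + killIntervalMillis
// 15000L, matching the tidToExecutorKillTimeMapping(...) === 15*1000 asserts below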
@@ -1900,7 +1900,7 @@ class TaskSetManagerSuite
     sc.conf.set(config.SPECULATION_ENABLED, true)
     sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
     sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
-    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
     val clock = new ManualClock()
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
@@ -1917,15 +1917,15 @@
     assert(taskOption2.isDefined)
     assert(taskOption2.get.executorId === "exec2")

-    clock.advance(6) // time = 6ms
+    clock.advance(6*1000) // time = 6s
     // Start TASK 3 on exec2 after some delay
     val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
     assert(taskOption3.isDefined)
     assert(taskOption3.get.executorId === "exec2")

     assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))

-    clock.advance(4) // time = 10ms
+    clock.advance(4*1000) // time = 10s
     // Complete the first 2 tasks and leave the other 2 tasks in running
     for (id <- Set(0, 1)) {
       manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
@@ -1934,7 +1934,7 @@

     // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
     // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, tasks need to be running for
-    // > 15ms for speculation
+    // > 15s for speculation
     assert(!manager.checkSpeculatableTasks(0))
     assert(sched.speculativeTasks.toSet === Set())
@@ -1943,12 +1943,12 @@
     // (TASK2 -> 15, TASK3 -> 15)
     manager.executorDecommission("exec2")
     assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
-    assert(manager.tidToExecutorKillTimeMapping(2) === 15)
-    assert(manager.tidToExecutorKillTimeMapping(3) === 15)
+    assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000)
+    assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000)

     assert(manager.checkSpeculatableTasks(0))
-    // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
-    // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+    // Task2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s)
+    // Task3 started at t=6s, so it might not finish before t=15s. So Task 3 should be part
     // of speculativeTasks
     assert(sched.speculativeTasks.toSet === Set(3))
     assert(manager.copiesRunning(3) === 1)
@@ -1966,15 +1966,15 @@
     assert(speculativeTask3.executorId === "exec3")
     assert(speculativeTask3.attemptNumber === 1)

-    clock.advance(1) // time = 11 ms
+    clock.advance(1*1000) // time = 11s
     // Running checkSpeculatableTasks again should return false
     assert(!manager.checkSpeculatableTasks(0))
     assert(manager.copiesRunning(2) === 1)
     assert(manager.copiesRunning(3) === 2)

-    clock.advance(5) // time = 16 ms
-    // At t=16 ms, Task 2 has been running for 16 ms. It is more than the
-    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 2 will
+    clock.advance(5*1000) // time = 16s
+    // At t=16s, Task 2 has been running for 16s. It is more than the
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now Task 2 will
     // be selected for speculation. Here we are verifying that regular speculation configs
     // should still take effect even when an EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
     // the corresponding executor is decommissioned
@@ -1998,51 +1998,6 @@ class TaskSetManagerSuite
     assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
   }

test("SPARK-21040: Check speculative tasks are not launched when an executor" +
" is decommissioned and the tasks running on it can finish within" +
" the EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(4)
sc.conf.set(config.SPECULATION_ENABLED, true)
sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
// Set high value for EXECUTOR_DECOMMISSION_KILL_INTERVAL
sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "50ms")
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}

// Offer resources for 4 tasks to start, 2 on each exec
Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) =>
(0 until 2).foreach { _ =>
val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1
assert(taskOption.isDefined)
assert(taskOption.get.executorId === exec)
}
}
assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))

clock.advance(10) // time = 10ms
// Complete the first 2 tasks and leave the other 2 tasks in running
for (id <- Set(0, 1)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
assert(sched.endedTasks(id) === Success)
}
// decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to
// executorDecommissionSpeculationTriggerTimeoutOpt
// (TASK2 -> 60, TASK3 -> 60)
manager.executorDecommission("exec2")

// Since the EXECUTOR_DECOMMISSION_KILL_INTERVAL was high, so the already running tasks
// on executor 2 still have chance to finish. So they should not be speculated.
assert(!manager.checkSpeculatableTasks(0))
clock.advance(5) // time = 15ms
assert(!manager.checkSpeculatableTasks(0))
}

test("SPARK-29976 Regular speculation configs should still take effect even when a " +
"threshold is provided") {
val (manager, clock) = testSpeculationDurationSetup(
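For reference, the timing comments throughout the suite reduce to two checks. A worked sketch with the test's numbers (semantics paraphrased from the comments above, not the exact TaskSetManager code; tasks 0 and 1 finish at t = 10s, so the median runtime is 10s):

// Regular speculation: runtime must exceed SPECULATION_MULTIPLIER * median runtime.
val medianRuntimeMillis = 10 * 1000L
val speculationThresholdMillis = 1.5 * medianRuntimeMillis       // 15000.0
assert(11 * 1000 < speculationThresholdMillis)                   // t = 11s: Task 2 not yet speculated
assert(16 * 1000 > speculationThresholdMillis)                   // t = 16s: Task 2 becomes speculatable

// Decommission-triggered speculation: speculate a task only if its estimated finish
// time (start + median runtime) falls past the recorded kill time.
val killTimeMillis = 15 * 1000L
val task2StartMillis = 0L
val task3StartMillis = 6 * 1000L
assert(task2StartMillis + medianRuntimeMillis <= killTimeMillis) // Task 2 can finish in time: skip
assert(task3StartMillis + medianRuntimeMillis > killTimeMillis)  // Task 3 cannot: speculate it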
