Speculate tasks which are running on decommissioned executors based on executor kill interval
prakharjain09 committed May 23, 2020
1 parent 5d67331 commit 775cacb
Showing 3 changed files with 131 additions and 3 deletions.
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1842,6 +1842,17 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
ConfigBuilder("spark.executor.decommission.killInterval")
.doc("Duration after which a decommissioned executor will be killed forcefully." +
"This config is useful for cloud environments where we know in advance when " +
"an executor is going to go down after decommissioning signal Ex- around 2 mins " +
"in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " +
"used to decide what tasks running on decommission executors to speculate")
.version("3.1.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
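For context, the sketch below shows one way this config might be set together with speculation. It is only an illustration: the 120s value is a made-up guess at a cloud provider's termination notice, not something taken from this commit.

  import org.apache.spark.SparkConf

  // Illustrative only: enable speculation and declare that decommissioned executors
  // are expected to be killed roughly 120 seconds after the decommission signal.
  val sparkConf = new SparkConf()
    .set("spark.speculation", "true")
    .set("spark.executor.decommission.killInterval", "120s")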

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.version("2.0.0")
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -102,6 +102,7 @@ private[spark] class TaskSetManager(
}
numTasks <= slots
}
val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL)

// For each task, tracks whether a copy of the task has succeeded. A task will also be
// marked as "succeeded" if it failed with a fetch failure, in which case it should not
@@ -165,6 +166,7 @@ private[spark] class TaskSetManager(

// Task index, start and finish time for each task attempt (indexed by task ID)
private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
// For each running task, the time at which its decommissioned executor is expected to be killed
private[scheduler] val tidToExecutorKillTimeMapping = new HashMap[Long, Long]

// Use a MedianHeap to record durations of successful tasks so we know when to launch
// speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -933,6 +935,7 @@ private[spark] class TaskSetManager(

/** If the given task ID is in the set of running tasks, removes it. */
def removeRunningTask(tid: Long): Unit = {
tidToExecutorKillTimeMapping.remove(tid)
if (runningTasksSet.remove(tid) && parent != null) {
parent.decreaseRunningTasks(1)
}
@@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager(
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
// Check whether this task will finish before the executorKillTime, assuming it takes
// medianDuration overall. If it cannot finish before its executor is killed, it is a
// candidate for speculation.
val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
taskEndTimeBasedOnMedianDuration
if (canExceedDeadline) {
speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
}
}
foundTasks |= speculated
}
} else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
val time = clock.getTimeMillis()
@@ -1100,8 +1115,12 @@ private[spark] class TaskSetManager(

def executorDecommission(execId: String): Unit = {
recomputeLocality()
// If a kill interval is configured, record when the tasks running on this executor are
// expected to be killed so that checkSpeculatableTasks() can speculate them early.
if (executorDecommissionKillInterval.nonEmpty) {
val executorKillTime = clock.getTimeMillis() + executorDecommissionKillInterval.get
runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
tidToExecutorKillTimeMapping(tid) = executorKillTime
}
}
}

def recomputeLocality(): Unit = {
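To make the interaction between the executorDecommission and checkSpeculatableTasks changes above concrete, here is a small standalone sketch of the deadline arithmetic with made-up numbers (chosen to match the test below). It is only an illustration of the logic, not code from this commit.

  // An executor is decommissioned at t = 10 ms with killInterval = 5 ms,
  // so its tasks are expected to die at t = 15 ms.
  val executorKillTime = 10L + 5L
  // Median runtime of the tasks that have already succeeded is 10 ms.
  val medianDuration = 10L
  // A task launched at t = 6 ms is estimated to finish at t = 16 ms if it
  // behaves like the median task.
  val taskEndTimeBasedOnMedianDuration = 6L + medianDuration
  // 15 < 16: the task cannot finish before its executor is killed, so it is
  // speculated immediately (a threshold of 0 is passed to checkAndSubmitSpeculatableTask).
  val canExceedDeadline = executorKillTime < taskEndTimeBasedOnMedianDuration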
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1892,6 +1892,104 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}

test("Check speculative tasks are launched when an executor is decommissioned" +
" and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
val taskSet = FakeTask.createTaskSet(4)
sc.conf.set(config.SPECULATION_ENABLED, true)
sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}

// Offer TASK 0,1 to exec1, Task 2 to exec2
(0 until 2).foreach { _ =>
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption.isDefined)
assert(taskOption.get.executorId === "exec1")
}
val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
assert(taskOption2.isDefined)
assert(taskOption2.get.executorId === "exec2")

clock.advance(6) // time = 6ms
// Offer TASK 3 to exec2 after some delay
val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
assert(taskOption3.isDefined)
assert(taskOption3.get.executorId === "exec2")

assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))

clock.advance(4) // time = 10ms
// Complete the first 2 tasks and leave the other 2 tasks in running
for (id <- Set(0, 1)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
assert(sched.endedTasks(id) === Success)
}

// checkSpeculatableTasks checks that the task runtime is greater than the threshold for
// speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, tasks need to have been
// running for more than 15ms before they become eligible for speculation
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set())

// Decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to
// tidToExecutorKillTimeMapping with the expected executor kill time
// (TASK 2 -> 15, TASK 3 -> 15)
manager.executorDecommission("exec2")

assert(manager.checkSpeculatableTasks(0))
// Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
// Task3 started at t=6, so it might not finish before t=15
assert(sched.speculativeTasks.toSet === Set(3))
assert(manager.copiesRunning(3) === 1)

// Offer resource to start the speculative attempt for the running task. We offer more
// resources, and ensure that speculative tasks get scheduled appropriately -- only one extra
// copy per speculatable task
val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
assert(taskOption3New.isDefined)
val speculativeTask3 = taskOption3New.get
assert(speculativeTask3.index === 3)
assert(speculativeTask3.taskId === 4)
assert(speculativeTask3.executorId === "exec3")
assert(speculativeTask3.attemptNumber === 1)

clock.advance(1) // time = 11 ms
// Running checkSpeculatableTasks again should return false
assert(!manager.checkSpeculatableTasks(0))
assert(manager.copiesRunning(2) === 1)
assert(manager.copiesRunning(3) === 2)

clock.advance(5) // time = 16 ms
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(2, 3))
assert(manager.copiesRunning(2) === 1)
assert(manager.copiesRunning(3) === 2)
val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
assert(taskOption2New.isDefined)
val speculativeTask2 = taskOption2New.get
// Ensure that task index 2 is launched on exec3, host3
assert(speculativeTask2.index === 2)
assert(speculativeTask2.taskId === 5)
assert(speculativeTask2.executorId === "exec3")
assert(speculativeTask2.attemptNumber === 1)

assert(manager.copiesRunning(2) === 2)
assert(manager.copiesRunning(3) === 2)

// Offering additional resources should not lead to any speculative tasks being respawned
assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)
}
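As a usage note rather than part of the commit: assuming a local Spark checkout, this new test can typically be run on its own with sbt's testOnly and ScalaTest's -z name filter, for example build/sbt "core/testOnly org.apache.spark.scheduler.TaskSetManagerSuite -- -z decommissioned".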

test("SPARK-29976 Regular speculation configs should still take effect even when a " +
"threshold is provided") {
val (manager, clock) = testSpeculationDurationSetup(
