[SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This PR adds functionality to consider the running tasks on decommissioned executors for speculation, based on a new config.
In Spark-on-cloud deployments, we sometimes already know that an executor won't be alive for more than a fixed amount of time. For example, with AWS Spot nodes, once we get the interruption notification, we know that the node will be gone in 120 seconds.
So if the running tasks on the decommissioning executors may run beyond currentTime + 120 seconds, they are candidates for speculation.

Currently, when an executor is decommissioned, we stop scheduling new tasks on it, but the already running tasks keep running on it. Depending on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers give different timeouts before they take away the nodes. For example, in the case of AWS Spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments and make better decisions about speculating the already running tasks on decommissioned executors, as sketched below.
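
A rough sketch of the decision rule (the helper name and parameters below are illustrative, not the exact TaskSetManager internals): a running task becomes a speculation candidate when its estimated finish time, taken as its launch time plus the median duration of successful tasks, falls after the time at which its decommissioned executor is expected to be killed.

// Illustrative sketch only -- the real logic lives in TaskSetManager.checkSpeculatableTasks.
// A task on a decommissioned executor is speculated when it is not expected to finish
// before the executor is taken away.
def shouldSpeculate(
    launchTimeMs: Long,       // when this task attempt started
    medianDurationMs: Long,   // median duration of successful tasks in the task set
    executorKillTimeMs: Long  // decommission time + configured kill interval
  ): Boolean = {
  val estimatedEndTimeMs = launchTimeMs + medianDurationMs
  executorKillTimeMs < estimatedEndTimeMs
}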

Yes. This PR adds a new config, "spark.executor.decommission.killInterval", which users can explicitly set based on the cloud environment where they are running. An example is shown below.
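
For example (a minimal, illustrative setup; the interval value depends on the cloud provider), a user on AWS Spot nodes might set the new config alongside the existing speculation settings:

import org.apache.spark.SparkConf

// Illustrative values: speculation must be enabled for the kill interval to matter,
// and "120s" matches the roughly 2 minute warning given for AWS Spot nodes.
val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.executor.decommission.killInterval", "120s")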

Added UT.

Closes apache#28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
prakharjain09 authored and dongjoon-hyun committed Sep 2, 2020
1 parent 41d65a2 commit 2945044
Showing 3 changed files with 151 additions and 6 deletions.
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1830,6 +1830,17 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
ConfigBuilder("spark.executor.decommission.killInterval")
.doc("Duration after which a decommissioned executor will be killed forcefully." +
"This config is useful for cloud environments where we know in advance when " +
"an executor is going to go down after decommissioning signal i.e. around 2 mins " +
"in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " +
"used to decide what tasks running on decommission executors to speculate.")
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createOptional

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.version("2.0.0")
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}

import scala.collection.immutable.Map
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -92,8 +92,18 @@ private[spark] class TaskSetManager(
// the worker. Therefore, CPUS_PER_TASK is okay to be greater than 1 without setting #cores.
// To handle this case, we assume the minimum number of slots is 1.
// TODO: use the actual number of slots for standalone mode.
val speculationTasksLessEqToSlots =
numTasks <= Math.max(conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK, 1)
val speculationTasksLessEqToSlots = {
val rpId = taskSet.resourceProfileId
val resourceProfile = sched.sc.resourceProfileManager.resourceProfileFromId(rpId)
val slots = if (!resourceProfile.isCoresLimitKnown) {
1
} else {
resourceProfile.maxTasksPerExecutor(conf)
}
numTasks <= slots
}
val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(
TimeUnit.SECONDS.toMillis)

// For each task, tracks whether a copy of the task has succeeded. A task will also be
// marked as "succeeded" if it failed with a fetch failure, in which case it should not
@@ -157,6 +167,7 @@ private[spark] class TaskSetManager(

// Task index, start and finish time for each task attempt (indexed by task ID)
private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
private[scheduler] val tidToExecutorKillTimeMapping = new HashMap[Long, Long]

// Use a MedianHeap to record durations of successful tasks so we know when to launch
// speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -918,6 +929,7 @@ private[spark] class TaskSetManager(

/** If the given task ID is in the set of running tasks, removes it. */
def removeRunningTask(tid: Long): Unit = {
tidToExecutorKillTimeMapping.remove(tid)
if (runningTasksSet.remove(tid) && parent != null) {
parent.decreaseRunningTasks(1)
}
@@ -1027,7 +1039,19 @@ private[spark] class TaskSetManager(
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
// Check whether this task will finish before the executorKillTime assuming
// it will take medianDuration overall. If this task cannot finish within
// executorKillInterval, then this task is a candidate for speculation.
val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
taskEndTimeBasedOnMedianDuration
if (canExceedDeadline) {
speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
}
}
foundTasks |= speculated
}
} else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
val time = clock.getTimeMillis()
@@ -1085,8 +1109,12 @@ private[spark] class TaskSetManager(

def executorDecommission(execId: String): Unit = {
recomputeLocality()
// Future consideration: if an executor is decommissioned it may make sense to add the current
// tasks to the spec exec queue.
executorDecommissionKillInterval.foreach { interval =>
val executorKillTime = clock.getTimeMillis() + interval
runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
tidToExecutorKillTimeMapping(tid) = executorKillTime
}
}
}

def recomputeLocality(): Unit = {
@@ -1867,6 +1867,112 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}

test("SPARK-21040: Check speculative tasks are launched when an executor is decommissioned" +
" and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
val taskSet = FakeTask.createTaskSet(4)
sc.conf.set(config.SPECULATION_ENABLED, true)
sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}

// Start TASK 0,1 on exec1, TASK 2 on exec2
(0 until 2).foreach { _ =>
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption.isDefined)
assert(taskOption.get.executorId === "exec1")
}
val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
assert(taskOption2.isDefined)
assert(taskOption2.get.executorId === "exec2")

clock.advance(6*1000) // time = 6s
// Start TASK 3 on exec2 after some delay
val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
assert(taskOption3.isDefined)
assert(taskOption3.get.executorId === "exec2")

assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))

clock.advance(4*1000) // time = 10s
// Complete the first 2 tasks and leave the other 2 tasks in running
for (id <- Set(0, 1)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
assert(sched.endedTasks(id) === Success)
}

// checkSpeculatableTasks checks that the task runtime is greater than the threshold for
// speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, tasks need to be running for
// > 15s before they are eligible for speculation
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set())

// Decommission exec-2. All tasks running on exec-2 (i.e. TASK 2, 3) will be added to
// tidToExecutorKillTimeMapping with kill time = 10s + 5s = 15s
// (TASK 2 -> 15s, TASK 3 -> 15s)
manager.executorDecommission("exec2")
assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000)
assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000)

assert(manager.checkSpeculatableTasks(0))
// TASK 2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s)
// TASK 3 started at t=6s, so it might not finish before t=15s. So TASK 3 should be part
// of speculativeTasks
assert(sched.speculativeTasks.toSet === Set(3))
assert(manager.copiesRunning(3) === 1)

// Offer resource to start the speculative attempt for the running task
val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
// Offer more resources. Nothing should get scheduled now.
assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
assert(taskOption3New.isDefined)

// Assert info about the newly launched speculative task
val speculativeTask3 = taskOption3New.get
assert(speculativeTask3.index === 3)
assert(speculativeTask3.taskId === 4)
assert(speculativeTask3.executorId === "exec3")
assert(speculativeTask3.attemptNumber === 1)

clock.advance(1*1000) // time = 11s
// Running checkSpeculatableTasks again should return false
assert(!manager.checkSpeculatableTasks(0))
assert(manager.copiesRunning(2) === 1)
assert(manager.copiesRunning(3) === 2)

clock.advance(5*1000) // time = 16s
// At t=16s, TASK 2 has been running for 16s. It is more than the
// SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now TASK 2 will
// be selected for speculation. Here we are verifying that regular speculation configs
// should still take effect even when an EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
// the corresponding executor is decommissioned
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(2, 3))
assert(manager.copiesRunning(2) === 1)
assert(manager.copiesRunning(3) === 2)
val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
assert(taskOption2New.isDefined)
val speculativeTask2 = taskOption2New.get
// Ensure that TASK 2 is re-launched on exec3, host3
assert(speculativeTask2.index === 2)
assert(speculativeTask2.taskId === 5)
assert(speculativeTask2.executorId === "exec3")
assert(speculativeTask2.attemptNumber === 1)

assert(manager.copiesRunning(2) === 2)
assert(manager.copiesRunning(3) === 2)

// Offering additional resources should not lead to any speculative tasks being respawned
assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
}

test("SPARK-29976 Regular speculation configs should still take effect even when a " +
"threshold is provided") {
val (manager, clock) = testSpeculationDurationSetup(
