[SPARK-21040][CORE] Speculate tasks which are running on decommission executors #28619

11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1842,6 +1842,17 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
ConfigBuilder("spark.executor.decommission.killInterval")
.doc("Duration after which a decommissioned executor will be killed forcefully." +
"This config is useful for cloud environments where we know in advance when " +
"an executor is going to go down after decommissioning signal Ex- around 2 mins " +
Contributor

Nit: does the "Ex" in "go down after decommissioning signal Ex- around 2 mins" stand for "example"?
Sorry, I have not seen this abbreviation used before and I'm still not sure it exists. What about "i.e.", which is even used in the Spark documentation several times?

Contributor Author

changed to "i.e.".

"in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " +
Contributor

So the timeout is decided by the cloud vendors? What does this config specify?

Contributor Author

@cloud-fan This config can be set by users based on their setups. If they are using AWS spot nodes, the timeout can be set to somewhere around 120 seconds; if they are using fixed-duration 6-hour spot blocks (say they decommission executors at 5:45), the timeout can be set to 15 minutes, and so on.

If the user doesn't set this timeout, things remain as they were, and tasks running on decommissioned executors don't get any special treatment with respect to speculation.
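For illustration, a minimal sketch of what such a setup might look like for an application running on spot instances with roughly two minutes of notice. The config key comes from this PR; the 120s value and the app name are assumptions about one particular environment, not recommendations:

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration for AWS spot nodes that give ~2 minutes of notice
// after the decommission signal. The 120s value is an assumption about that
// environment, not a default suggested by this PR.
val conf = new SparkConf()
  .setAppName("decommission-aware-speculation") // illustrative name
  // Speculation must be enabled for the kill-interval logic to apply.
  .set("spark.speculation", "true")
  // Config introduced in this PR: tasks that cannot finish before the
  // decommissioned executor is expected to be killed become speculation candidates.
  .set("spark.executor.decommission.killInterval", "120s")
```

For fixed-duration spot blocks, the same key would simply be set to whatever notice period the environment actually guarantees.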

Contributor

Is it possible for Spark to get this timeout value from the cluster manager, so that users don't need to set it manually? cc @holdenk

Contributor Author

@cloud-fan As per my understanding, worker decommissioning is currently triggered by the SIGPWR signal (and not via a message from the YARN/Kubernetes cluster manager), so getting this timeout from the cluster manager might not be possible. We may be able to do this in the future, once Spark's worker decommissioning logic starts being triggered via communication from YARN etc. cc @holdenk
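For readers unfamiliar with the signal-based path, here is a small, self-contained sketch (not Spark's actual Worker code) of what handling SIGPWR in a JVM process can look like, using the JDK's sun.misc.Signal API. The object name and handler body are purely illustrative; SIGPWR is Linux-specific:

```scala
import sun.misc.{Signal, SignalHandler}

object SigPwrSketch {
  def main(args: Array[String]): Unit = {
    // Register a handler for SIGPWR. Spark wires this up through its own
    // signal-handling utilities; this standalone sketch only logs.
    Signal.handle(new Signal("PWR"), new SignalHandler {
      override def handle(sig: Signal): Unit = {
        // In a real worker, this is where decommissioning of the executors
        // running on this node would be initiated.
        println(s"Received SIG${sig.getName}, starting decommission")
      }
    })
    // Keep the JVM alive so the signal can be delivered, e.g. `kill -PWR <pid>`.
    Thread.sleep(60 * 1000)
  }
}
```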

Contributor

I believe there are some situations where we can know the length of time from the cluster manager or from Spark itself, but not all. I think having a configurable default for folks who know their cloud provider environment makes sense.

"used to decide what tasks running on decommission executors to speculate")
Contributor

Nit: missing dot at the end of the sentence.

Contributor Author

fixed.

.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createOptional

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.version("2.0.0")
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}

import scala.collection.immutable.Map
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -102,6 +102,8 @@ private[spark] class TaskSetManager(
}
numTasks <= slots
}
val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(
TimeUnit.SECONDS.toMillis)

// For each task, tracks whether a copy of the task has succeeded. A task will also be
// marked as "succeeded" if it failed with a fetch failure, in which case it should not
@@ -165,6 +167,7 @@ private[spark] class TaskSetManager(

// Task index, start and finish time for each task attempt (indexed by task ID)
private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
private[scheduler] val tidToExecutorKillTimeMapping = new HashMap[Long, Long]

// Use a MedianHeap to record durations of successful tasks so we know when to launch
// speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -933,6 +936,7 @@ private[spark] class TaskSetManager(

/** If the given task ID is in the set of running tasks, removes it. */
def removeRunningTask(tid: Long): Unit = {
tidToExecutorKillTimeMapping.remove(tid)
if (runningTasksSet.remove(tid) && parent != null) {
parent.decreaseRunningTasks(1)
}
@@ -1042,7 +1046,19 @@ private[spark] class TaskSetManager(
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
// Check whether this task will finish before the executorKillTime assuming
// it will take medianDuration overall. If this task cannot finish within
// executorKillInterval, then this task is a candidate for speculation.
val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
taskEndTimeBasedOnMedianDuration
if (canExceedDeadline) {
speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
}
}
foundTasks |= speculated
}
} else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
val time = clock.getTimeMillis()
@@ -1100,8 +1116,12 @@ private[spark] class TaskSetManager(

def executorDecommission(execId: String): Unit = {
recomputeLocality()
// Future consideration: if an executor is decommissioned it may make sense to add the current
// tasks to the spec exec queue.
executorDecommissionKillInterval.foreach { interval =>
val executorKillTime = clock.getTimeMillis() + interval
runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
tidToExecutorKillTimeMapping(tid) = executorKillTime
}
}
}

def recomputeLocality(): Unit = {
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1892,6 +1892,112 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}

test("SPARK-21040: Check speculative tasks are launched when an executor is decommissioned" +
" and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
val taskSet = FakeTask.createTaskSet(4)
sc.conf.set(config.SPECULATION_ENABLED, true)
sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}

// Start TASK 0,1 on exec1, Task 2 on exec2
(0 until 2).foreach { _ =>
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption.isDefined)
assert(taskOption.get.executorId === "exec1")
}
val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
assert(taskOption2.isDefined)
assert(taskOption2.get.executorId === "exec2")

clock.advance(6*1000) // time = 6s
// Start TASK 3 on exec2 after some delay
val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
assert(taskOption3.isDefined)
assert(taskOption3.get.executorId === "exec2")

assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))

clock.advance(4*1000) // time = 10s
// Complete the first 2 tasks and leave the other 2 tasks in running
for (id <- Set(0, 1)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
assert(sched.endedTasks(id) === Success)
}

// checkSpeculatableTasks checks that the task runtime is greater than the threshold for
// speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, tasks need to be running for
// > 15s to be eligible for speculation.
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set())

// decommission exec-2. All tasks running on exec-2 (i.e. TASK 2, 3) will be added to
// tidToExecutorKillTimeMapping with an executor kill time of t=15s
// (TASK 2 -> 15, TASK 3 -> 15)
manager.executorDecommission("exec2")
assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000)
assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000)

assert(manager.checkSpeculatableTasks(0))
// Task2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s)
// Task3 started at t=6s, so it might not finish before t=15s. So Task 3 should be part
// of speculativeTasks
assert(sched.speculativeTasks.toSet === Set(3))
assert(manager.copiesRunning(3) === 1)

// Offer resource to start the speculative attempt for the running task
val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
// Offer more resources. Nothing should get scheduled now.
assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
assert(taskOption3New.isDefined)

// Assert info about the newly launched speculative task
val speculativeTask3 = taskOption3New.get
assert(speculativeTask3.index === 3)
assert(speculativeTask3.taskId === 4)
assert(speculativeTask3.executorId === "exec3")
assert(speculativeTask3.attemptNumber === 1)

clock.advance(1*1000) // time = 11s
// Running checkSpeculatableTasks again should return false
assert(!manager.checkSpeculatableTasks(0))
assert(manager.copiesRunning(2) === 1)
assert(manager.copiesRunning(3) === 2)

clock.advance(5*1000) // time = 16s
// At t=16s, Task 2 has been running for 16s, which is more than
// SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s, so Task 2 will now
// be selected for speculation. Here we are verifying that the regular speculation configs
// still take effect even when an EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
// the corresponding executor is decommissioned.
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(2, 3))
assert(manager.copiesRunning(2) === 1)
assert(manager.copiesRunning(3) === 2)
val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
assert(taskOption2New.isDefined)
val speculativeTask2 = taskOption2New.get
// Ensure that task index 2 is launched on exec3, host3
assert(speculativeTask2.index === 2)
assert(speculativeTask2.taskId === 5)
assert(speculativeTask2.executorId === "exec3")
assert(speculativeTask2.attemptNumber === 1)

assert(manager.copiesRunning(2) === 2)
assert(manager.copiesRunning(3) === 2)

// Offering additional resources should not lead to any speculative tasks being respawned
assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
}

test("SPARK-29976 Regular speculation configs should still take effect even when a " +
"threshold is provided") {
val (manager, clock) = testSpeculationDurationSetup(