
[SPARK-29976][CORE] Trigger speculation for stages with too few tasks #26614

Closed
Wants to merge 13 commits
@@ -1461,6 +1461,15 @@ package object config {
.doubleConf
.createWithDefault(0.75)

private[spark] val SPECULATION_TASK_DURATION_THRESHOLD =
ConfigBuilder("spark.speculation.task.duration.threshold")
.doc("Task duration after which scheduler would try to speculative run the task. If " +
"provided, tasks would be speculatively run if current stage contains less tasks than " +
"the number of slots on a single executor and the task is taking longer time than " +
"the threshold. This config helps speculate stage with very few tasks.")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
Contributor: can we add a simple .doc to explain what this config does?

Contributor: yes please
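
For context, a minimal sketch (editorial, not part of this PR) of how a user might enable the new threshold; the 45min value below is a hypothetical choice:

import org.apache.spark.SparkConf

// spark.speculation must be enabled for any speculation to happen at all; the
// new threshold then lets small stages speculate without waiting for the quantile.
val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.task.duration.threshold", "45min") // hypothetical value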


private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.stringConf
60 changes: 44 additions & 16 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -81,6 +81,13 @@ private[spark] class TaskSetManager(
val speculationQuantile = conf.get(SPECULATION_QUANTILE)
val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
// User-provided threshold for speculation, applied regardless of whether the quantile has
// been reached
val speculationTaskDurationThresOpt = conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
// We only try a time-threshold-based speculative run when the total number of tasks in the
// stage is at most this value. SPARK-29976: We set this value to the number of slots on a
// single executor so that we wouldn't speculate too aggressively but still handle basic cases.
val speculationTaskNumThres = conf.get(EXECUTOR_CORES) / conf.get(CPUS_PER_TASK)
Contributor: use sched.CPUS_PER_TASK instead of conf.get(CPUS_PER_TASK).


// For each task, tracks whether a copy of the task has succeeded. A task will also be
// marked as "succeeded" if it failed with a fetch failure, in which case it should not
@@ -957,15 +964,41 @@ private[spark] class TaskSetManager(
recomputeLocality()
}

/**
* Check if the task associated with the given tid has passed the time threshold and should
* be speculatively run.
*/
private def checkAndSubmitSpeculatableTask(
tid: Long,
currentTimeMillis: Long,
threshold: Double): Boolean = {
val info = taskInfos(tid)
val index = info.index
if (!successful(index) && copiesRunning(index) == 1 &&
info.timeRunning(currentTimeMillis) > threshold && !speculatableTasks.contains(index)) {
addPendingTask(index, speculatable = true)
logInfo(
("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
" than %.0f ms(%d speculatable tasks in this taskset now)")
.format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
speculatableTasks += index
sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
true
} else {
false
}
}

/**
* Check for tasks to be speculated and return true if there are any. This is called periodically
* by the TaskScheduler.
*
*/
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
// Can't speculate if we only have one task, and no need to speculate if the task set is a
// zombie or is from a barrier stage.
if (isZombie || isBarrier || numTasks == 1) {
// No need to speculate if the task set is a zombie or is from a barrier stage. If there is
// only one task we don't speculate, since we don't have metrics to decide whether it's taking
// too long or not, unless a task duration threshold is explicitly provided.
if (isZombie || isBarrier || (numTasks == 1 && !speculationTaskDurationThresOpt.isDefined)) {
return false
}
var foundTasks = false
@@ -983,19 +1016,14 @@
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
val info = taskInfos(tid)
val index = info.index
if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
addPendingTask(index, speculatable = true)
logInfo(
("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
" than %.0f ms(%d speculatable tasks in this taskset now)")
.format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
speculatableTasks += index
sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
foundTasks = true
}
foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
}
} else if (speculationTaskDurationThresOpt.isDefined && numTasks <= speculationTaskNumThres) {
Contributor: we can do the comparison numTasks <= speculationTaskNumThres once when the TaskSetManager is created; numTasks isn't changing in the TaskSet, so do it once at the top, and then we don't even need speculationTaskNumThres.

val time = clock.getTimeMillis()
val threshold = speculationTaskDurationThresOpt.get
logDebug(s"Tasks taking longer than the provided speculation threshold: $threshold")
for (tid <- runningTasksSet) {
foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
}
}
foundTasks
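
To make the slot threshold used above concrete, a small editorial illustration with hypothetical settings (not from the PR):

// With spark.executor.cores = 4 and spark.task.cpus = 2:
val executorCores = 4
val cpusPerTask = 2
val slotsPerExecutor = executorCores / cpusPerTask // = 2 slots per executor
// A stage with at most 2 tasks takes the duration-threshold path above;
// larger stages rely solely on quantile-based speculation.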
@@ -1778,6 +1778,70 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
}

private def testSpeculationDurationThreshold(
speculationThresholdProvided: Boolean,
numTasks: Int,
numSlots: Int): Unit = {
sc = new SparkContext("local", "test")
sc.conf.set(config.SPECULATION_ENABLED, true)
// Set the quantile to be 1.0 so that regular speculation would not be triggered
sc.conf.set(config.SPECULATION_QUANTILE.key, "1.0")
// Set the number of slots per executor
sc.conf.set(config.EXECUTOR_CORES.key, numSlots.toString)
sc.conf.set(config.CPUS_PER_TASK.key, "1")
// Set the threshold to be 60 minutes
if (speculationThresholdProvided) {
sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, "60min")
}
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
// Create a task set with only one task
Contributor: remove the comment, since numTasks is passed in.

val taskSet = FakeTask.createTaskSet(numTasks)
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
manager.isZombie = false

// Offer resources for the task to start
for (i <- 1 to numTasks) {
manager.resourceOffer(s"exec$i", s"host$i", NO_PREF)
}

// If the time threshold has not been exceeded, no speculative run should be triggered
clock.advance(1000 * 60 * 60)
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.size == 0)

// Now the task should have been running for 60 minutes and 1 millisecond
clock.advance(1)
if (speculationThresholdProvided && numSlots >= numTasks) {
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.size == numTasks)
// Should not submit duplicated tasks
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.size == numTasks)
} else {
// If the feature flag is turned off, or the stage contains too few tasks
Contributor: I think you mean too many tasks.

assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.size == 0)
}
}

Seq(1, 2).foreach { numTasks =>
test("SPARK-29976 when a speculation time threshold is provided, should speculative " +
s"run the task even if there are not enough successful runs, total tasks: $numTasks") {
testSpeculationDurationThreshold(true, numTasks, numTasks)
}

test("SPARK-29976: when the speculation time threshold is not provided," +
s"don't speculative run if there are not enough successful runs, total tasks: $numTasks") {
testSpeculationDurationThreshold(false, numTasks, numTasks)
}
}

test("SPARK-29976 when a speculation time threshold is provided, should not speculative " +
"if there are too many tasks in the stage even though time threshold is provided") {
testSpeculationDurationThreshold(true, 2, 1)
}
Contributor: it would be nice to add another test here that tests the interaction of the speculation configs. Meaning, if I have both the threshold set and the speculation quantile is smaller, the threshold can still apply, and vice versa, the quantile can still apply.
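
A rough sketch of what that interaction test might look like; markTaskSuccessful below is a hypothetical helper standing in for the suite's real task-completion plumbing, so treat this as an outline rather than a drop-in test:

test("SPARK-29976: duration threshold and quantile speculation can coexist (sketch)") {
  sc = new SparkContext("local", "test")
  sc.conf.set(config.SPECULATION_ENABLED, true)
  // Quantile low enough that the regular path can fire once half the tasks finish
  sc.conf.set(config.SPECULATION_QUANTILE.key, "0.5")
  sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, "60min")
  sc.conf.set(config.EXECUTOR_CORES.key, "2")
  sc.conf.set(config.CPUS_PER_TASK.key, "1")
  sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
  val clock = new ManualClock()
  val manager = new TaskSetManager(sched, FakeTask.createTaskSet(2), MAX_TASK_FAILURES, clock = clock)
  for (i <- 1 to 2) { manager.resourceOffer(s"exec$i", s"host$i", NO_PREF) }
  clock.advance(1000)
  markTaskSuccessful(manager, tid = 0) // hypothetical helper: completes task 0
  // The remaining task should now become speculatable via the quantile path
  // (median duration is 1000 ms), long before the 60min duration threshold.
  clock.advance(10 * 1000)
  assert(manager.checkSpeculatableTasks(0))
}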


test("TaskOutputFileAlreadyExistException lead to task set abortion") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
10 changes: 10 additions & 0 deletions docs/configuration.md
@@ -2031,6 +2031,16 @@ Apart from these, the following properties are also available, and may be useful
Fraction of tasks which must be complete before speculation is enabled for a particular stage.
</td>
</tr>
<tr>
<td><code>spark.speculation.task.duration.threshold</code></td>
<td>None</td>
<td>
Task duration after which the scheduler will try to speculatively run the task. If provided,
tasks will be speculatively run if the current stage contains no more tasks than the number
of slots on a single executor and the task is taking longer than the threshold. This config
helps speculate stages with very few tasks.
Contributor: it might be nice to add a sentence that the other regular speculation configs may also apply if the executor slots are large enough.

Contributor: it also might be nice to say the default unit is milliseconds if no unit is specified.

</td>
</tr>
<tr>
<td><code>spark.task.cpus</code></td>
<td>1</td>
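
One review point worth illustrating (editorial note, not part of the PR): since the config is declared with timeConf(TimeUnit.MILLISECONDS), a bare number is read as milliseconds, while a suffix selects the unit. A hedged sketch:

import org.apache.spark.SparkConf

// Hypothetical values; the two settings below are equivalent, because a bare
// number is interpreted in the config's default unit (milliseconds).
val conf = new SparkConf()
conf.set("spark.speculation.task.duration.threshold", "60000") // 60000 ms
conf.set("spark.speculation.task.duration.threshold", "60s")   // 60 seconds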