simplify the logic in TaskSchedulerImpl
CodingCat committed Aug 4, 2014
1 parent c8c1de4 commit f9a2ad8
Showing 4 changed files with 61 additions and 59 deletions.
@@ -22,7 +22,7 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
object TaskLocality extends Enumeration {
// Process local is expected to be used ONLY within TaskSetManager for now.
val PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_LOCAL, ANY = Value
val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

type TaskLocality = Value

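The enumeration's declaration order is what the scheduler compares against, so renaming NOPREF to NO_PREF while keeping it between NODE_LOCAL and RACK_LOCAL preserves the ordering that locality checks rely on. A minimal, self-contained sketch (not the Spark source; isAllowedSketch is a stand-in for TaskLocality.isAllowed) of how such an ordinal comparison behaves:

object LocalityOrderingSketch extends Enumeration {
  // Same declaration order as above; Enumeration values are Ordered by that position.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  // A candidate locality is acceptable when it is no farther away than the constraint.
  def isAllowedSketch(constraint: Value, candidate: Value): Boolean = candidate <= constraint
}

// e.g. isAllowedSketch(RACK_LOCAL, NO_PREF) is true, while isAllowedSketch(PROCESS_LOCAL, NO_PREF) is false.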
@@ -249,36 +249,29 @@ private[spark] class TaskSchedulerImpl(

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_local, ANY
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; preferredLocality <- taskSet.myLocalityLevels) {
def launchTaskOnLocalityLevel(locality: TaskLocality.Value) {
do {
launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
for (task <- taskSet.resourceOffer(execId, host, locality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
do {
launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
for (task <- taskSet.resourceOffer(execId, host, preferredLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
}
} while (launchedTask)
}
launchTaskOnLocalityLevel(preferredLocality)
// search noPref task after we have launched all node_local and nearer tasks
if (preferredLocality == TaskLocality.NODE_LOCAL) {
launchTaskOnLocalityLevel(TaskLocality.NOPREF)
}
}
} while (launchedTask)
}

if (tasks.size > 0) {
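The net effect of the TaskSchedulerImpl change above: the nested launchTaskOnLocalityLevel helper and the extra NODE_LOCAL-then-NOPREF pass are gone, because NO_PREF is now one of the levels in taskSet.myLocalityLevels and is handled by the ordinary outer loop. A simplified, hypothetical sketch of that loop shape (Offer, SchedulableTaskSet, localityLevels and offerResources are illustrative stand-ins, not Spark's real types; it assumes spark-core on the classpath for the TaskLocality enumeration shown above):

import org.apache.spark.scheduler.TaskLocality

case class Offer(executorId: String, host: String, var freeCores: Int)

trait SchedulableTaskSet {
  def localityLevels: Seq[TaskLocality.TaskLocality]
  // Returns a launched task id if a task can run on this executor at (or below) `locality`.
  def resourceOffer(execId: String, host: String,
                    locality: TaskLocality.TaskLocality): Option[Long]
}

def offerResources(taskSets: Seq[SchedulableTaskSet], offers: Seq[Offer], cpusPerTask: Int): Unit = {
  // Walk task sets in scheduling order; for each, walk its locality levels from most
  // to least local. NO_PREF is now just another entry in localityLevels.
  for (taskSet <- taskSets; locality <- taskSet.localityLevels) {
    var launchedTask = false
    do {
      launchedTask = false
      for (offer <- offers if offer.freeCores >= cpusPerTask) {
        for (_ <- taskSet.resourceOffer(offer.executorId, offer.host, locality)) {
          offer.freeCores -= cpusPerTask // account for the launched task
          launchedTask = true
        }
      }
    } while (launchedTask) // keep offering at this level until nothing more launches
  }
}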
45 changes: 27 additions & 18 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -298,7 +298,7 @@ private[spark] class TaskSetManager(
for (index <- speculatableTasks if canRunOnHost(index)) {
val prefs = tasks(index).preferredLocations
val executors = prefs.flatMap(_.executorId)
if (prefs.size == 0 || executors.contains(execId)) {
if (executors.contains(execId)) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
@@ -315,6 +315,16 @@
}
}

if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val locations = tasks(index).preferredLocations
if (locations.size == 0) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
}
}

// Check for rack-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for (rack <- sched.getRackForHost(host)) {
@@ -359,15 +369,11 @@
}
}

if (TaskLocality.isAllowed(maxLocality, TaskLocality.NOPREF)) {
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL to minimize cross-rack traffic
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
// find a speculative task if all noPref tasks have been scheduled
val specTask = findSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
if (specTask != None) return specTask
}

if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
@@ -385,7 +391,9 @@
}
}

None
// find a speculative task if all other tasks have been scheduled
findSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}
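After this hunk, findTask no longer embeds the speculative fallback inside the NO_PREF branch; it becomes the single last resort for every locality level. A comment-only summary of the resulting dequeue order (the PROCESS_LOCAL, NODE_LOCAL and ANY branches sit in context elided from this diff, so their exact shape is inferred rather than shown):

// Dequeue order for an offer (execId, host) under maxLocality:
//   1. process-local tasks pending on execId
//   2. node-local tasks pending on host                    (if NODE_LOCAL is allowed)
//   3. pendingTasksWithNoPrefs, returned as PROCESS_LOCAL  (if NO_PREF is allowed)
//   4. rack-local tasks for the host's rack                (if RACK_LOCAL is allowed)
//   5. any remaining pending task                          (if ANY is allowed)
//   6. findSpeculativeTask(execId, host, maxLocality), flagged speculative = true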

/**
@@ -397,25 +405,25 @@
*
* @param execId the executor Id of the offered resource
* @param host the host Id of the offered resource
* @param preferredLocality the maximum locality we want to schedule the tasks at
* @param maxLocality the maximum locality we want to schedule the tasks at
*/
def resourceOffer(
execId: String,
host: String,
preferredLocality: TaskLocality.TaskLocality)
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (!isZombie) {
val curTime = clock.getTime()

var allowedLocality = preferredLocality
var allowedLocality = maxLocality

if (preferredLocality != TaskLocality.NOPREF ||
if (maxLocality != TaskLocality.NO_PREF ||
(nodeLocalTasks.contains(host) && nodeLocalTasks(host).size > 0)) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > preferredLocality) {
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = preferredLocality
allowedLocality = maxLocality
}
}

@@ -433,7 +441,7 @@
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
// NOPREF will not affect the variables related to delay scheduling
if (preferredLocality != TaskLocality.NOPREF) {
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
}
@@ -756,19 +764,17 @@ private[spark] class TaskSetManager(
conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
conf.get("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
case _ => 0L
}
}

/**
* Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
* added to queues using addPendingTask.
*
* NOTE: don't need to handle NOPREF here, because NOPREF is scheduled as PROCESS_LOCAL
*/
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
@@ -778,6 +784,9 @@
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
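getLocalityWait above reads the per-level wait from standard Spark configuration keys (defaultWait presumably falling back to spark.locality.wait); since the code calls .toLong on the raw string, the values here are plain milliseconds. An illustrative override with arbitrary example values:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3000")      // default wait, in milliseconds
  .set("spark.locality.wait.node", "3000") // wait before relaxing past NODE_LOCAL
  .set("spark.locality.wait.rack", "6000") // wait before relaxing past RACK_LOCAL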
@@ -147,7 +147,7 @@ class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) {
}

class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
import TaskLocality.{ANY, PROCESS_LOCAL, NOPREF, NODE_LOCAL, RACK_LOCAL}
import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}

private val conf = new SparkConf

@@ -163,7 +163,7 @@

// Offer a host with NOPREF as the constraint,
// we should get a noPref task immediately since that's all we have
var taskOption = manager.resourceOffer("exec1", "host1", NOPREF)
var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)

// Tell it the task has finished
@@ -180,15 +180,15 @@

// First three offers should all find tasks
for (i <- 0 until 3) {
var taskOption = manager.resourceOffer("exec1", "host1", NOPREF)
var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
}
assert(sched.startedTasks.toSet === Set(0, 1, 2))

// Re-offer the host -- now we should get no more tasks
assert(manager.resourceOffer("exec1", "host1", NOPREF) === None)
assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None)

// Finish the first two tasks
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -245,7 +245,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host2, exec3 again, at NODE_LOCAL level: we should get the noPref task
// after failing to find a node-local task
assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None)
assert(manager.resourceOffer("exec3", "host2", NOPREF).get.index == 3)
assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index == 3)
}

test("we do not need to delay scheduling when we only have noPref tasks in the queue") {
@@ -262,7 +262,7 @@
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1)
assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None)
assert(manager.resourceOffer("exec3", "host2", NOPREF).get.index === 2)
assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index === 2)
}

test("delay scheduling with fallback") {
@@ -482,24 +482,24 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(ANY)))
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
// Add a new executor
sched.addExecutor("execD", "host1")
manager.executorAdded()
// Valid locality should contain NODE_LOCAL and ANY
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY)))
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
// Add another executor
sched.addExecutor("execC", "host2")
manager.executorAdded()
// Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
// test if the valid locality is recomputed when the executor is lost
sched.removeExecutor("execC")
manager.executorLost("execC", "host2")
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY)))
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
sched.removeExecutor("execD")
manager.executorLost("execD", "host1")
assert(manager.myLocalityLevels.sameElements(Array(ANY)))
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
}

test("test RACK_LOCAL tasks") {
@@ -572,15 +572,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
assert(manager.resourceOffer("execA", "host1", NOPREF) == None)
assert(manager.resourceOffer("execA", "host1", NO_PREF) == None)
clock.advance(LOCALITY_WAIT)
// schedule a node local task
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 1)
manager.speculatableTasks += 1
// schedule the noPref task
assert(manager.resourceOffer("execA", "host1", NOPREF).get.index === 2)
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
// schedule the speculative task
assert(manager.resourceOffer("execB", "host2", NOPREF).get.index === 1)
assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
clock.advance(LOCALITY_WAIT * 3)
// schedule non-local tasks
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
