Skip to content

Commit

Permalink
address matei's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Aug 5, 2014
1 parent 18cae02 commit 89f9bc0
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,14 @@ private[spark] class TaskSchedulerImpl(
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; preferredLocality <- taskSet.myLocalityLevels) {
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
for (task <- taskSet.resourceOffer(execId, host, preferredLocality)) {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,13 @@ private[spark] class TaskSetManager(
}
}

var hadAliveLocations = false
for (loc <- tasks(index).preferredLocations) {
for (execId <- loc.executorId) {
addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
}
if (sched.hasExecutorsAliveOnHost(loc.host)) {
hadAliveLocations = true
}
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
if(sched.hasHostAliveOnRack(rack)){
hadAliveLocations = true
}
}
}

Expand Down Expand Up @@ -286,7 +279,7 @@ private[spark] class TaskSetManager(
!hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)

if (!speculatableTasks.isEmpty) {
// Check for process-local or preference-less tasks; note that tasks can be process-local
// Check for process-local tasks; note that tasks can be process-local
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
for (index <- speculatableTasks if canRunOnHost(index)) {
val prefs = tasks(index).preferredLocations
Expand All @@ -308,6 +301,7 @@ private[spark] class TaskSetManager(
}
}

// Check for no-preference tasks
if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val locations = tasks(index).preferredLocations
Expand Down Expand Up @@ -350,7 +344,7 @@ private[spark] class TaskSetManager(
* @return An option containing (task index within the task set, locality, is speculative?)
*/
private def findTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
Expand Down Expand Up @@ -392,7 +386,7 @@ private[spark] class TaskSetManager(
/**
* Respond to an offer of a single executor from the scheduler by finding a task
*
* NOTE: this function is either called with a real preferredLocality level which
* NOTE: this function is either called with a maxLocality which
* would be adjusted by delay scheduling algorithm or it will be with a special
* NO_PREF locality which will be not modified
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class FakeTaskSetManager(
override def resourceOffer(
execId: String,
host: String,
preferredLocality: TaskLocality.TaskLocality)
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (tasksSuccessful + numRunningTasks < numTasks) {
Expand Down

0 comments on commit 89f9bc0

Please sign in to comment.