Skip to content

Commit

Permalink
remove fine granularity tracking for node-local only tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Aug 4, 2014
1 parent f9a2ad8 commit b3a430b
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private[spark] class TaskSchedulerImpl(

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_LOCAL, ANY
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; preferredLocality <- taskSet.myLocalityLevels) {
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ private[spark] class TaskSetManager(
// but at host level.
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

// this collection is mainly for ensuring that the NODE_LOCAL task is always scheduled
// before NOPREF and it contain all NODE_LOCAL and "not-launched" tasks
private[scheduler] val nodeLocalTasks = new HashMap[String, HashSet[Int]]
//private[scheduler] val nodeLocalTasks = new HashMap[String, HashSet[Int]]
private var hasNodeLocalOnlyTasks = true

// Set of pending tasks for each rack -- similar to the above.
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
Expand Down Expand Up @@ -194,7 +193,7 @@ private[spark] class TaskSetManager(
}
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
if (loc.executorId == None) {
nodeLocalTasks.getOrElseUpdate(loc.host, new HashSet[Int]) += index
hasNodeLocalOnlyTasks = true
}
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
Expand Down Expand Up @@ -401,7 +400,7 @@ private[spark] class TaskSetManager(
*
* NOTE: this function is either called with a real preferredLocality level which
* would be adjusted by delay scheduling algorithm or it will be with a special
* NOPREF locality which will be not modified
* NO_PREF locality which will be not modified
*
* @param execId the executor Id of the offered resource
* @param host the host Id of the offered resource
Expand All @@ -418,8 +417,7 @@ private[spark] class TaskSetManager(

var allowedLocality = maxLocality

if (maxLocality != TaskLocality.NO_PREF ||
(nodeLocalTasks.contains(host) && nodeLocalTasks(host).size > 0)) {
if (maxLocality != TaskLocality.NO_PREF || hasNodeLocalOnlyTasks) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
Expand All @@ -440,7 +438,7 @@ private[spark] class TaskSetManager(
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
// NOPREF will not affect the variables related to delay scheduling
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
Expand Down Expand Up @@ -468,13 +466,13 @@ private[spark] class TaskSetManager(
taskName, taskId, host, taskLocality, serializedTask.limit))

sched.dagScheduler.taskStarted(task, info)
if (taskLocality <= TaskLocality.NODE_LOCAL) {
/*if (taskLocality <= TaskLocality.NODE_LOCAL) {
for (preferedLocality <- tasks(index).preferredLocations) {
if (nodeLocalTasks.contains(preferedLocality.host)) {
nodeLocalTasks(preferedLocality.host) -= index
}
}
}
}*/
return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
}
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

// Offer a host with NOPREF as the constraint,
// Offer a host with NO_PREF as the constraint,
// we should get a nopref task immediately since that's what we only have
var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
Expand Down

0 comments on commit b3a430b

Please sign in to comment.