SPARK-1937: fix issue with task locality #892

Closed
wants to merge 15 commits into from
@@ -210,11 +210,14 @@ private[spark] class TaskSchedulerImpl(
SparkEnv.set(sc.env)

// Mark each slave as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
newExecAvail = true
}
}

@@ -227,6 +230,9 @@ private[spark] class TaskSchedulerImpl(
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
@@ -54,8 +54,15 @@ private[spark] class TaskSetManager(
clock: Clock = SystemClock)
extends Schedulable with Logging
{
// Remember when this TaskSetManager is created
val creationTime = clock.getTime()
val conf = sched.sc.conf

// The period we wait for new executors to come up
// After this period, tasks in pendingTasksWithNoPrefs will be considered as PROCESS_LOCAL
private val WAIT_NEW_EXEC_TIMEOUT = conf.getLong("spark.scheduler.waitNewExecutorTime", 3000L)
Contributor

Is this something the application can just do: if it wants to wait 3 seconds before scheduling anything on non-local executors, just sleep for 3 seconds before trying to launch any jobs? I'm wary of adding more config options to the scheduler.

Author

This waiting period is only intended for pendingTasksWithNoPrefs. Suppose pendingTasksWithNoPrefs contains tasks whose preferred locations are currently unavailable. Within this waiting period, we want to try pendingTasksForExecutor, pendingTasksForHost and pendingTasksForRack first, because tasks in those lists do have some locality. When an executor is added, we remove the tasks that now have locality from pendingTasksWithNoPrefs. After the waiting period, we assume no executor will arrive for the tasks still remaining in pendingTasksWithNoPrefs, so they can be scheduled as PROCESS_LOCAL.
You can see that tasks in pendingTasksForHost can still get scheduled even within the period. We're only holding back pendingTasksWithNoPrefs, which I think is better than holding back the whole application and scheduling nothing.
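
A minimal standalone sketch of the rule being described (names like PendingTask and waitWindowMs are made up for illustration; this is not the patch's code):

object NoPrefGatingSketch {
  case class PendingTask(index: Int, hasPreferredLocations: Boolean)

  // A task held in the no-prefs list becomes schedulable once the wait window has
  // passed, or immediately if it never had any preferred locations at all.
  def schedulableNow(task: PendingTask, creationTime: Long, now: Long, waitWindowMs: Long): Boolean =
    (now - creationTime) > waitWindowMs || !task.hasPreferredLocations

  def main(args: Array[String]): Unit = {
    val tasks = Seq(PendingTask(0, hasPreferredLocations = true),
                    PendingTask(1, hasPreferredLocations = false))
    // Inside the 3s window: only task 1 (no preferences at all) is offered.
    println(tasks.filter(t => schedulableNow(t, 0L, 1000L, 3000L)).map(_.index)) // List(1)
    // After the window: both can be offered as PROCESS_LOCAL.
    println(tasks.filter(t => schedulableNow(t, 0L, 4000L, 3000L)).map(_.index)) // List(0, 1)
  }
}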

Contributor

It doesn't make sense to put this here because it will apply to every TaskSet, no matter how late into the application it was submitted, so you'll get a 3-second latency on every TaskSet that is missing one of its preferred nodes. Can we not add this as part of this patch, and simply make the change to put tasks in the node- and rack-local lists even if no nodes are available in those right now? Then later we can update the code that calls resourceOffer to treat tasks that have preferred locations but are missing executors for them specially.
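
For illustration, a rough standalone sketch of that alternative, with a stand-in rackForHost instead of the real sched.getRackForHost: tasks keep their host/rack entries even when nothing is alive there yet, and the no-prefs list is reserved for tasks with no preferences at all.

import scala.collection.mutable.{ArrayBuffer, HashMap}

object UnconditionalPendingListsSketch {
  val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]

  // Stand-in for sched.getRackForHost; the real scheduler consults its topology info.
  def rackForHost(host: String): Option[String] = Some("rack1")

  def addPendingTask(index: Int, preferredHosts: Seq[String]): Unit = {
    if (preferredHosts.isEmpty) {
      pendingTasksWithNoPrefs += index
    } else {
      for (host <- preferredHosts) {
        // No aliveness check: the task keeps its host/rack preference even if the
        // matching executors have not registered yet.
        pendingTasksForHost.getOrElseUpdate(host, new ArrayBuffer) += index
        for (rack <- rackForHost(host)) {
          pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    addPendingTask(0, Seq("host1")) // host1 has no alive executor yet
    addPendingTask(1, Nil)
    println(pendingTasksForHost)     // Map(host1 -> ArrayBuffer(0))
    println(pendingTasksWithNoPrefs) // ArrayBuffer(1)
  }
}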

private var waitingNewExec = true

/*
* Sometimes if an executor is dead or in an otherwise invalid state, the driver
* does not realize right away leading to repeated task failures. If enabled,
@@ -118,7 +125,7 @@ private[spark] class TaskSetManager(
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

// Set containing pending tasks with no locality preferences.
val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

// Set containing all pending tasks (also used as a stack, as above).
val allPendingTasks = new ArrayBuffer[Int]
@@ -182,15 +189,16 @@ private[spark] class TaskSetManager(
for (loc <- tasks(index).preferredLocations) {
for (execId <- loc.executorId) {
if (sched.isExecutorAlive(execId)) {
addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
hadAliveLocations = true
Contributor

Isn't this check redundant with the one on line 197 now?

Author

This check is used when we add a task to the pending lists. If the task has any preferred location available (executor / host / rack), we won't add it to pendingTasksWithNoPrefs. Do you mean the checks for executor and host are redundant?

Contributor

Yes -- if the executor is alive (so if the if statement on line 191 evaluates to true), then there will certainly be an executor alive on the host (the if-statement on line 196), and hadAliveLocations will be set to true on line 197. So this line is not needed.

Author

I see. Thanks :)
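
A small standalone sketch of the invariant behind this exchange, with a toy executorsByHost map standing in for the scheduler's state (not the patch's code):

object HadAliveLocationsSketch {
  // Toy stand-in for the scheduler's executorsByHost bookkeeping.
  val executorsByHost = Map("host1" -> Set("exec1"))

  def isExecutorAlive(execId: String): Boolean = executorsByHost.values.exists(_.contains(execId))
  def hasExecutorsAliveOnHost(host: String): Boolean = executorsByHost.get(host).exists(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    // Whenever isExecutorAlive holds for an executor on some host,
    // hasExecutorsAliveOnHost holds for that host too, so setting hadAliveLocations
    // in the executor branch adds nothing the host branch would not already set.
    println(isExecutorAlive("exec1"))         // true
    println(hasExecutorsAliveOnHost("host1")) // true -- implied by the line above
    println(hasExecutorsAliveOnHost("host2")) // false
  }
}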

}
addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
}
if (sched.hasExecutorsAliveOnHost(loc.host)) {
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
}
hadAliveLocations = true
}
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
hadAliveLocations = true
Contributor

I guess technically we might have no hosts in this rack, but right now our TaskScheduler doesn't track that. Maybe we should open another JIRA to track it. I can imagine this happening in really large clusters.

Author

Do you mean the TaskScheduler should provide something like "hasHostOnRack", and we have to check that before setting hadAliveLocations to true?

Contributor

Yeah, but we can do it in another JIRA.

Author

Sure :-)
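
A hypothetical sketch of what that follow-up might look like -- hasHostOnRack and hostsByRack are invented names, not part of this patch or the current scheduler:

import scala.collection.mutable.{HashMap, HashSet}

object HostsByRackSketch {
  // Bookkeeping the follow-up JIRA would add alongside executorsByHost.
  val hostsByRack = new HashMap[String, HashSet[String]]

  def hostAdded(host: String, rack: Option[String]): Unit =
    for (r <- rack) hostsByRack.getOrElseUpdate(r, new HashSet[String]) += host

  // The check discussed above: does this rack actually have any known host?
  def hasHostOnRack(rack: String): Boolean = hostsByRack.get(rack).exists(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    hostAdded("host1", Some("rack1"))
    println(hasHostOnRack("rack1")) // true
    println(hasHostOnRack("rack2")) // false -- a rack name alone does not imply alive hosts
  }
}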

}
}
@@ -361,7 +369,8 @@ private[spark] class TaskSetManager(
}

// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)
if (!waitingNewExec || tasks(index).preferredLocations.isEmpty)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}

@@ -391,6 +400,9 @@ private[spark] class TaskSetManager(
if (allowedLocality > maxLocality) {
allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
}
Contributor

Editing previous comment:
Suppose maxLocality == PROCESS_LOCAL while allowedLocality == RACK_LOCAL (by virtue of having waited long enough for a free executor).

Instead of restricting the schedule to at most PROCESS_LOCAL, this will now relax it all the way to RACK_LOCAL - which is incorrect (myLocalityLevels might not have PROCESS_LOCAL but could have NODE_LOCAL, for example).

Author

@mridulm - Thanks for replying. In my opinion, however, relaxing the allowed locality won't change the scheduling order. NODE_LOCAL tasks (if any) still get scheduled before RACK_LOCAL ones. And if we allow RACK_LOCAL but get a NODE_LOCAL task, currentLocalityIndex will be updated so that next time we will use NODE_LOCAL as the constraint.
However, if we restrict up to PROCESS_LOCAL while it's in fact not valid for the TaskSetManager, the NODE_LOCAL and RACK_LOCAL tasks will be skipped and we may end up picking tasks from pendingTasksWithNoPrefs.

Author

@mridulm , I think I got your point. Restricting the allowed locality can help achieve some delay scheduling. Anyway, this change is meant to keep pendingTasksWithNoPrefs from messing up the scheduling. It's better to address this in another JIRA.

Author

Hi @mridulm, another possible change: maxLocality always starts from PROCESS_LOCAL; how about making it start from the highest level of myLocalityLevels? Do you think this makes sense?

Contributor

Back at my desktop, so I can elaborate better.
The issue is that, with a relaxed constraint for the executor, a task might get scheduled on it that would have been better scheduled on some other executor.

A simple scenario extending my earlier example: suppose there is only one task t1 left and two executors become available.
Suppose for exec1 it is RACK_LOCAL while for exec2 it is NODE_LOCAL.

We start with PROCESS_LOCAL as maxLocality - and suppose enough time has elapsed that allowedLocality == RACK_LOCAL or ANY.

In this case, if resourceOffer is called on exec1 first, we get a RACK_LOCAL schedule.
If resourceOffer is called on exec2 first, we get a NODE_LOCAL schedule.

The reason for that if condition was exactly to prevent this. I am actually surprised I did not have a testcase to catch this ...
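
A standalone toy of this scenario (locality modeled as an ordered enumeration, with the exec1/exec2 localities taken from the example above), showing how the offer order decides the outcome once the constraint is relaxed:

object LocalityOrderSketch {
  // Locality levels ordered best (lowest) to worst, mirroring TaskLocality.
  object Locality extends Enumeration { val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value }
  import Locality._

  // Best locality that task t1 could achieve on each executor in the example.
  val bestOn = Map("exec1" -> RACK_LOCAL, "exec2" -> NODE_LOCAL)

  // With a relaxed constraint, the first offer that satisfies it wins, even if a
  // later offer would have been more local.
  def firstAccepted(offerOrder: Seq[String], allowed: Locality.Value): Option[(String, Locality.Value)] =
    offerOrder.iterator.map(e => (e, bestOn(e))).find { case (_, loc) => loc <= allowed }

  def main(args: Array[String]): Unit = {
    println(firstAccepted(Seq("exec1", "exec2"), RACK_LOCAL)) // Some((exec1,RACK_LOCAL))
    println(firstAccepted(Seq("exec2", "exec1"), RACK_LOCAL)) // Some((exec2,NODE_LOCAL))
  }
}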

if (waitingNewExec && curTime - creationTime > WAIT_NEW_EXEC_TIMEOUT) {
waitingNewExec = false
}

findTask(execId, host, allowedLocality) match {
case Some((index, taskLocality)) => {
@@ -738,4 +750,20 @@ private[spark] class TaskSetManager(
logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
levels.toArray
}

// Re-compute pendingTasksWithNoPrefs since new preferred locations may become available
def executorAdded() {
def newLocAvail(index: Int): Boolean = {
for (loc <- tasks(index).preferredLocations) {
if (sched.hasExecutorsAliveOnHost(loc.host) ||
(loc.executorId.isDefined && sched.isExecutorAlive(loc.executorId.get)) ||
Contributor

Similar to above, I think this line is just a more specific version of the previous one -- so is redundant

sched.getRackForHost(loc.host).isDefined) {
Contributor

Same here

return true
}
}
false
}
logInfo("Re-computing pending task lists.")
pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
}
}
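
For reference, a standalone sketch of newLocAvail simplified along the lines of the review comments above (the aliveness and rack lookups are passed in as plain functions standing in for the scheduler's methods; this is an illustration, not the patch):

object NewLocAvailSketch {
  case class Location(host: String, executorId: Option[String])

  // Simplified per the comments: an alive executor implies an alive host, so checking
  // the host (plus the rack, pending the follow-up JIRA) is enough.
  def newLocAvail(prefs: Seq[Location],
                  hasExecutorsAliveOnHost: String => Boolean,
                  rackForHost: String => Option[String]): Boolean =
    prefs.exists(loc => hasExecutorsAliveOnHost(loc.host) || rackForHost(loc.host).isDefined)

  def main(args: Array[String]): Unit = {
    val prefs = Seq(Location("host1", Some("exec1")))
    println(newLocAvail(prefs, _ == "host1", _ => None)) // true: host1 now has an alive executor
    println(newLocAvail(prefs, _ => false, _ => None))   // false: nothing available yet
  }
}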