Skip to content

Commit

Permalink
Remove references to ClusterScheduler (SPARK-1140)
Browse files Browse the repository at this point in the history
ClusterScheduler was renamed to TaskSchedulerImpl; this commit
updates comments and tests accordingly.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes apache#9 from kayousterhout/cluster_scheduler_death and squashes the following commits:

d6fd119 [Kay Ousterhout] Remove references to ClusterScheduler.
  • Loading branch information
kayousterhout authored and pwendell committed Feb 27, 2014
1 parent 2645035 commit 71f69d6
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 46 deletions.
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.scheduler

/**
* A backend interface for scheduling systems that allows plugging in different ones under
* ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
* TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
Expand Down
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.scheduler
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode

/**
* Low-level task scheduler interface, currently implemented exclusively by the ClusterScheduler.
* Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
* This interface allows plugging in different task schedulers. Each TaskScheduler schedulers tasks
* for a single SparkContext. These schedulers get sets of tasks submitted to them from the
* DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
Expand Down
Expand Up @@ -26,13 +26,14 @@ import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min

import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted,
SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.{Clock, SystemClock}

/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
* Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
* each task, retries tasks if they fail (up to a limited number of times), and
* handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
* to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
Expand All @@ -41,7 +42,7 @@ import org.apache.spark.util.{Clock, SystemClock}
* THREADING: This class is designed to only be called from code with a lock on the
* TaskScheduler (e.g. its event handlers). It should not be called from other threads.
*
* @param sched the ClusterScheduler associated with the TaskSetManager
* @param sched the TaskSchedulerImpl associated with the TaskSetManager
* @param taskSet the TaskSet to manage scheduling for
* @param maxTaskFailures if any particular task fails more than this number of times, the entire
* task set will be aborted
Expand Down
Expand Up @@ -203,7 +203,7 @@ private[spark] class MesosSchedulerBackend(
getResource(offer.getResourcesList, "cpus").toInt)
}

// Call into the ClusterScheduler
// Call into the TaskSchedulerImpl
val taskLists = scheduler.resourceOffers(offerableWorkers)

// Build a list of Mesos tasks for each slave
Expand Down
Expand Up @@ -35,7 +35,7 @@ private case class KillTask(taskId: Long)
/**
* Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
* LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend
* and the ClusterScheduler.
* and the TaskSchedulerImpl.
*/
private[spark] class LocalActor(
scheduler: TaskSchedulerImpl,
Expand Down Expand Up @@ -76,7 +76,7 @@ private[spark] class LocalActor(

/**
* LocalBackend is used when running a local version of Spark where the executor, backend, and
* master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
* master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
* on a single Executor (created by the LocalBackend) running locally.
*/
private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
Expand Down
Expand Up @@ -93,10 +93,10 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
// If this test hangs, it's probably because no resource offers were made after the task
// failed.
val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
case clusterScheduler: TaskSchedulerImpl =>
clusterScheduler
case taskScheduler: TaskSchedulerImpl =>
taskScheduler
case _ =>
assert(false, "Expect local cluster to use ClusterScheduler")
assert(false, "Expect local cluster to use TaskSchedulerImpl")
throw new ClassCastException
}
scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
Expand Down
Expand Up @@ -29,9 +29,9 @@ class FakeTaskSetManager(
initPriority: Int,
initStageId: Int,
initNumTasks: Int,
clusterScheduler: TaskSchedulerImpl,
taskScheduler: TaskSchedulerImpl,
taskSet: TaskSet)
extends TaskSetManager(clusterScheduler, taskSet, 0) {
extends TaskSetManager(taskScheduler, taskSet, 0) {

parent = null
weight = 1
Expand Down Expand Up @@ -105,7 +105,7 @@ class FakeTaskSetManager(
}
}

class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {

def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = {
new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
Expand Down Expand Up @@ -133,8 +133,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
}

test("FIFO Scheduler Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new TaskSchedulerImpl(sc)
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
Expand All @@ -144,9 +144,9 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()

val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet)
val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet)
val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
schedulableBuilder.addTaskSetManager(taskSetManager1, null)
schedulableBuilder.addTaskSetManager(taskSetManager2, null)
Expand All @@ -160,8 +160,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
}

test("Fair Scheduler Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new TaskSchedulerImpl(sc)
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
Expand Down Expand Up @@ -189,15 +189,15 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val properties2 = new Properties()
properties2.setProperty("spark.scheduler.pool","2")

val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet)
val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet)
val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)

val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet)
val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)

Expand All @@ -217,8 +217,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
}

test("Nested Pool Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new TaskSchedulerImpl(sc)
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
Expand All @@ -240,23 +240,23 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)

val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet)
val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet)
pool00.addSchedulable(taskSetManager000)
pool00.addSchedulable(taskSetManager001)

val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet)
val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet)
pool01.addSchedulable(taskSetManager010)
pool01.addSchedulable(taskSetManager011)

val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet)
val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet)
pool10.addSchedulable(taskSetManager100)
pool10.addSchedulable(taskSetManager101)

val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet)
val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet)
pool11.addSchedulable(taskSetManager110)
pool11.addSchedulable(taskSetManager111)

Expand Down
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.FakeClock

class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
taskScheduler.startedTasks += taskInfo.index
}
Expand All @@ -51,12 +51,12 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler
}

/**
* A mock ClusterScheduler implementation that just remembers information about tasks started and
* A mock TaskSchedulerImpl implementation that just remembers information about tasks started and
* feedback received from the TaskSetManagers. Note that it's important to initialize this with
* a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
* to work, and these are required for locality in TaskSetManager.
*/
class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
extends TaskSchedulerImpl(sc)
{
val startedTasks = new ArrayBuffer[Long]
Expand Down Expand Up @@ -87,7 +87,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)

Expand All @@ -113,7 +113,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("multiple offers with no preferences") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)

Expand Down Expand Up @@ -144,7 +144,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("basic delay scheduling") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = createTaskSet(4,
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host2", "exec2")),
Expand Down Expand Up @@ -188,7 +188,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("delay scheduling with fallback") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc,
val sched = new FakeTaskScheduler(sc,
("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
val taskSet = createTaskSet(5,
Seq(TaskLocation("host1")),
Expand Down Expand Up @@ -228,7 +228,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("delay scheduling with failed hosts") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = createTaskSet(3,
Seq(TaskLocation("host1")),
Seq(TaskLocation("host2")),
Expand Down Expand Up @@ -260,7 +260,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("task result lost") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
Expand All @@ -277,7 +277,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("repeated failures lead to task set abortion") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
Expand Down

0 comments on commit 71f69d6

Please sign in to comment.