Permalink
Browse files

simplify the implementation of CoarseGrainedSchedulerBackend

  • Loading branch information...
CodingCat committed Mar 3, 2014
1 parent 55a4f11 commit 0c0e409d49afa954703462b338af04481b74f563
@@ -21,4 +21,6 @@ package org.apache.spark.scheduler
* Represents free resources available on an executor.
*/
private[spark]
-class WorkerOffer(val executorId: String, val host: String, val cores: Int)
+class WorkerOffer(val executorId: String, val host: String, var cores: Int) {
+ @transient val totalcores = cores
+}
@@ -51,9 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
- private val executorAddress = new HashMap[String, Address]
- private val executorHost = new HashMap[String, String]
- private val freeCores = new HashMap[String, Int]
+ private val workerOffers = new HashMap[String, WorkerOffer]
private val addressToExecutorId = new HashMap[Address, String]
override def preStart() {
@@ -75,9 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor(sparkProperties)
executorActor(executorId) = sender
- executorHost(executorId) = Utils.parseHostPort(hostPort)._1
- freeCores(executorId) = cores
- executorAddress(executorId) = sender.path.address
+ workerOffers += (executorId -> new WorkerOffer(executorId, Utils.parseHostPort(hostPort)._1, cores))
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
makeOffers()
@@ -87,7 +83,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
if (executorActor.contains(executorId)) {
- freeCores(executorId) += 1
+ workerOffers(executorId).cores += 1
makeOffers(executorId)
} else {
// Ignoring the update since we don't know about the executor.
@@ -125,20 +121,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Make fake resource offers on all executors
def makeOffers() {
- launchTasks(scheduler.resourceOffers(
- executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+ launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq))
}
// Make fake resource offers on just one executor
def makeOffers(executorId: String) {
- launchTasks(scheduler.resourceOffers(
- Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+ launchTasks(scheduler.resourceOffers(Seq(workerOffers(executorId))))
}
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
- freeCores(task.executorId) -= 1
+ workerOffers(task.executorId).cores -= 1
executorActor(task.executorId) ! LaunchTask(task)
}
}
@@ -147,11 +141,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
def removeExecutor(executorId: String, reason: String) {
if (executorActor.contains(executorId)) {
logInfo("Executor " + executorId + " disconnected, so removing it")
- val numCores = freeCores(executorId)
- addressToExecutorId -= executorAddress(executorId)
+ val numCores = workerOffers(executorId).totalcores
executorActor -= executorId
- executorHost -= executorId
- freeCores -= executorId
+ workerOffers -= executorId
totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason))
}

0 comments on commit 0c0e409

Please sign in to comment.