From 7d671cf230bdad22f42f336174e8e0a8f7bc267b Mon Sep 17 00:00:00 2001 From: Dale Date: Thu, 25 Sep 2014 20:46:30 +1000 Subject: [PATCH 1/3] [SPARK-3651] Grouped variables under a ExecutorDataObject, and reference them via a map entry as they are all retrieved under the same key --- .../CoarseGrainedSchedulerBackend.scala | 73 +++++++++---------- .../scheduler/cluster/ExecutorData.scala | 28 +++++++ 2 files changed, 63 insertions(+), 38 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9a0cb1c6c6ccd..1473b13b5eed2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -62,15 +62,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A val createTime = System.currentTimeMillis() class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive { - override protected def log = CoarseGrainedSchedulerBackend.this.log - - private val executorActor = new HashMap[String, ActorRef] - private val executorAddress = new HashMap[String, Address] - private val executorHost = new HashMap[String, String] - private val freeCores = new HashMap[String, Int] - private val totalCores = new HashMap[String, Int] private val addressToExecutorId = new HashMap[Address, String] + private val executorData = new HashMap[String, ExecutorData] override def preStart() { // Listen for remote client disconnection events, since they don't go through Akka's watch() @@ -85,16 +79,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A def receiveWithLogging = { case RegisterExecutor(executorId, hostPort, cores) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) - if (executorActor.contains(executorId)) { + if (executorData.contains(executorId)) { sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor - executorActor(executorId) = sender - executorHost(executorId) = Utils.parseHostPort(hostPort)._1 - totalCores(executorId) = cores - freeCores(executorId) = cores - executorAddress(executorId) = sender.path.address + executorData.put(executorId, new ExecutorData( + executorActor = sender, + executorHost = Utils.parseHostPort(hostPort)._1, + totalCores = cores, + freeCores = cores, + executorAddress = sender.path.address)) + addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) @@ -104,13 +100,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { - if (executorActor.contains(executorId)) { - freeCores(executorId) += scheduler.CPUS_PER_TASK - makeOffers(executorId) - } else { - // Ignoring the update since we don't know about the executor. - val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s" - logWarning(msg.format(taskId, state, sender, executorId)) + executorData.get(executorId) match { + case Some(executorInfo) => + executorInfo.freeCores += scheduler.CPUS_PER_TASK + makeOffers(executorId) + case None => + // Ignoring the update since we don't know about the executor. + val msg = "Ignored task status update (%d state %s) " + + "from unknown executor %s with ID %s" + logWarning(msg.format(taskId, state, sender, executorId)) } } @@ -118,7 +116,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A makeOffers() case KillTask(taskId, executorId, interruptThread) => - executorActor(executorId) ! KillTask(taskId, executorId, interruptThread) + executorData(executorId).executorActor ! KillTask(taskId, executorId, interruptThread) case StopDriver => sender ! true @@ -126,8 +124,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A case StopExecutors => logInfo("Asking each executor to shut down") - for (executor <- executorActor.values) { - executor ! StopExecutor + executorData.foreach { case(k,v) => + v.executorActor ! StopExecutor } sender ! true @@ -149,13 +147,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Make fake resource offers on all executors def makeOffers() { launchTasks(scheduler.resourceOffers( - executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})) + executorData.map{ case(k,v) => new WorkerOffer( k, v.executorHost, v.freeCores)}.toSeq)) } // Make fake resource offers on just one executor def makeOffers(executorId: String) { + val executorInfo = executorData(executorId) launchTasks(scheduler.resourceOffers( - Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) + Seq(new WorkerOffer(executorId, executorInfo.executorHost, executorInfo.freeCores)))) } // Launch tasks returned by a set of resource offers @@ -179,25 +178,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } else { - freeCores(task.executorId) -= scheduler.CPUS_PER_TASK - executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask)) + val executorInfo = executorData(task.executorId) + executorInfo.freeCores -= scheduler.CPUS_PER_TASK + executorInfo.executorActor ! LaunchTask(new SerializableBuffer(serializedTask)) } } } // Remove a disconnected slave from the cluster def removeExecutor(executorId: String, reason: String) { - if (executorActor.contains(executorId)) { - logInfo("Executor " + executorId + " disconnected, so removing it") - val numCores = totalCores(executorId) - executorActor -= executorId - executorHost -= executorId - addressToExecutorId -= executorAddress(executorId) - executorAddress -= executorId - totalCores -= executorId - freeCores -= executorId - totalCoreCount.addAndGet(-numCores) - scheduler.executorLost(executorId, SlaveLost(reason)) + executorData.get(executorId) match { + case Some(executorInfo) => + val numCores = executorInfo.totalCores + executorData.-=(executorId) + totalCoreCount.addAndGet(-numCores) + scheduler.executorLost(executorId, SlaveLost(reason)) + case None => logError(s"Asked to remove non existant executor $executorId") } } } @@ -297,6 +293,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } + private[spark] object CoarseGrainedSchedulerBackend { val ACTOR_NAME = "CoarseGrainedScheduler" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala new file mode 100644 index 0000000000000..da216613d77f0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import akka.actor.{Address, ActorRef} + +private[cluster] class ExecutorData( + var executorActor: ActorRef, + var executorAddress: Address, + var executorHost: String , + var freeCores: Int, + var totalCores: Int +) {} From 6890663c9b15914cfb8523b6cf26871c0f1c2727 Mon Sep 17 00:00:00 2001 From: Dale Date: Fri, 26 Sep 2014 09:09:35 +1000 Subject: [PATCH 2/3] [SPARK-3651] implemented suggested changes --- .../CoarseGrainedSchedulerBackend.scala | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 1473b13b5eed2..12ab907bc04f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -64,7 +64,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive { override protected def log = CoarseGrainedSchedulerBackend.this.log private val addressToExecutorId = new HashMap[Address, String] - private val executorData = new HashMap[String, ExecutorData] + private val executorDataMap = new HashMap[String, ExecutorData] override def preStart() { // Listen for remote client disconnection events, since they don't go through Akka's watch() @@ -79,17 +79,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A def receiveWithLogging = { case RegisterExecutor(executorId, hostPort, cores) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) - if (executorData.contains(executorId)) { + if (executorDataMap.contains(executorId)) { sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor - executorData.put(executorId, new ExecutorData( - executorActor = sender, - executorHost = Utils.parseHostPort(hostPort)._1, - totalCores = cores, - freeCores = cores, - executorAddress = sender.path.address)) + executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address, + Utils.parseHostPort(hostPort)._1, cores, cores)) addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) @@ -100,7 +96,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { - executorData.get(executorId) match { + executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK makeOffers(executorId) @@ -116,7 +112,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A makeOffers() case KillTask(taskId, executorId, interruptThread) => - executorData(executorId).executorActor ! KillTask(taskId, executorId, interruptThread) + executorDataMap(executorId).executorActor ! KillTask(taskId, executorId, interruptThread) case StopDriver => sender ! true @@ -124,8 +120,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A case StopExecutors => logInfo("Asking each executor to shut down") - executorData.foreach { case(k,v) => - v.executorActor ! StopExecutor + for ((_,executorData) <- executorDataMap) { + executorData.executorActor ! StopExecutor } sender ! true @@ -136,6 +132,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A case AddWebUIFilter(filterName, filterParams, proxyBase) => addWebUIFilter(filterName, filterParams, proxyBase) sender ! true + case DisassociatedEvent(_, address, _) => addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated")) @@ -147,14 +144,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Make fake resource offers on all executors def makeOffers() { launchTasks(scheduler.resourceOffers( - executorData.map{ case(k,v) => new WorkerOffer( k, v.executorHost, v.freeCores)}.toSeq)) + executorDataMap.map{ case(id, executorData) => + new WorkerOffer( id, executorData.executorHost, executorData.freeCores)}.toSeq)) } // Make fake resource offers on just one executor def makeOffers(executorId: String) { - val executorInfo = executorData(executorId) + val executorData = executorDataMap(executorId) launchTasks(scheduler.resourceOffers( - Seq(new WorkerOffer(executorId, executorInfo.executorHost, executorInfo.freeCores)))) + Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)))) } // Launch tasks returned by a set of resource offers @@ -178,7 +176,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } else { - val executorInfo = executorData(task.executorId) + val executorInfo = executorDataMap(task.executorId) executorInfo.freeCores -= scheduler.CPUS_PER_TASK executorInfo.executorActor ! LaunchTask(new SerializableBuffer(serializedTask)) } @@ -187,10 +185,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Remove a disconnected slave from the cluster def removeExecutor(executorId: String, reason: String) { - executorData.get(executorId) match { + executorDataMap.get(executorId) match { case Some(executorInfo) => val numCores = executorInfo.totalCores - executorData.-=(executorId) + executorDataMap -= executorId totalCoreCount.addAndGet(-numCores) scheduler.executorLost(executorId, SlaveLost(reason)) case None => logError(s"Asked to remove non existant executor $executorId") From d1be0a9f099840fc6fc1201f2072f8d08e1698fc Mon Sep 17 00:00:00 2001 From: Dale Date: Sun, 28 Sep 2014 11:01:18 +1000 Subject: [PATCH 3/3] [SPARK-3651] implemented suggested changes. Changed a reference from executorInfo to executorData to be consistent with other usages --- .../CoarseGrainedSchedulerBackend.scala | 23 ++++++++----------- .../scheduler/cluster/ExecutorData.scala | 20 ++++++++++++---- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 12ab907bc04f6..59e15edc75f5a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -84,7 +84,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } else { logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor - executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address, + executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address, Utils.parseHostPort(hostPort)._1, cores, cores)) addressToExecutorId(sender.path.address) = executorId @@ -102,9 +102,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A makeOffers(executorId) case None => // Ignoring the update since we don't know about the executor. - val msg = "Ignored task status update (%d state %s) " + - "from unknown executor %s with ID %s" - logWarning(msg.format(taskId, state, sender, executorId)) + logWarning(s"Ignored task status update ($taskId state $state) " + + "from unknown executor $sender with ID $executorId") } } @@ -120,7 +119,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A case StopExecutors => logInfo("Asking each executor to shut down") - for ((_,executorData) <- executorDataMap) { + for ((_, executorData) <- executorDataMap) { executorData.executorActor ! StopExecutor } sender ! true @@ -144,8 +143,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Make fake resource offers on all executors def makeOffers() { launchTasks(scheduler.resourceOffers( - executorDataMap.map{ case(id, executorData) => - new WorkerOffer( id, executorData.executorHost, executorData.freeCores)}.toSeq)) + executorDataMap.map {case (id, executorData) => + new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toSeq)) } // Make fake resource offers on just one executor @@ -176,9 +175,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } else { - val executorInfo = executorDataMap(task.executorId) - executorInfo.freeCores -= scheduler.CPUS_PER_TASK - executorInfo.executorActor ! LaunchTask(new SerializableBuffer(serializedTask)) + val executorData = executorDataMap(task.executorId) + executorData.freeCores -= scheduler.CPUS_PER_TASK + executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask)) } } } @@ -187,9 +186,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A def removeExecutor(executorId: String, reason: String) { executorDataMap.get(executorId) match { case Some(executorInfo) => - val numCores = executorInfo.totalCores executorDataMap -= executorId - totalCoreCount.addAndGet(-numCores) + totalCoreCount.addAndGet(-executorInfo.totalCores) scheduler.executorLost(executorId, SlaveLost(reason)) case None => logError(s"Asked to remove non existant executor $executorId") } @@ -291,7 +289,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } - private[spark] object CoarseGrainedSchedulerBackend { val ACTOR_NAME = "CoarseGrainedScheduler" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index da216613d77f0..74a92985b6629 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -19,10 +19,20 @@ package org.apache.spark.scheduler.cluster import akka.actor.{Address, ActorRef} +/** + * Grouping of data that is accessed by a CourseGrainedScheduler. This class + * is stored in a Map keyed by an executorID + * + * @param executorActor The actorRef representing this executor + * @param executorAddress The network address of this executor + * @param executorHost The hostname that this executor is running on + * @param freeCores The current number of cores available for work on the executor + * @param totalCores The total number of cores available to the executor + */ private[cluster] class ExecutorData( - var executorActor: ActorRef, - var executorAddress: Address, - var executorHost: String , + val executorActor: ActorRef, + val executorAddress: Address, + val executorHost: String , var freeCores: Int, - var totalCores: Int -) {} + val totalCores: Int +)