From 80047aa17047e30359bddda7dcce29e282b2ec14 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 20 Jul 2015 00:57:49 -0700 Subject: [PATCH 01/12] First working implementation This adds two messages between the AppClient on the driver and the standalone Master: request and kill. The scheduling on the Master side handles both applications that explicitly set `spark.executor.cores` and those that did not. TODO: clean up shuffle files on application exit and unit tests. --- .../org/apache/spark/HeartbeatReceiver.scala | 16 ++- .../scala/org/apache/spark/SparkContext.scala | 19 --- .../apache/spark/deploy/DeployMessage.scala | 4 + .../spark/deploy/client/AppClient.scala | 45 +++++++ .../spark/deploy/master/ApplicationInfo.scala | 15 ++- .../apache/spark/deploy/master/Master.scala | 117 +++++++++++++++++- .../CoarseGrainedSchedulerBackend.scala | 2 +- .../cluster/SparkDeploySchedulerBackend.scala | 28 +++++ 8 files changed, 211 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 43dd4a170731d..ee60d697d8799 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -177,16 +177,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + s"timed out after ${now - lastSeenMs} ms")) - if (sc.supportDynamicAllocation) { // Asynchronously kill the executor to avoid blocking the current thread - killExecutorThread.submit(new Runnable { - override def run(): Unit = Utils.tryLogNonFatalError { - // Note: we want to get an executor back after expiring this one, - // so do not simply call `sc.killExecutor` here (SPARK-8119) - sc.killAndReplaceExecutor(executorId) - } - }) - } + killExecutorThread.submit(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + // Note: we want to get an executor back after expiring this one, + // so do not simply call `sc.killExecutor` here (SPARK-8119) + sc.killAndReplaceExecutor(executorId) + } + }) executorLastSeen.remove(executorId) } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d00c012d80560..21204eb3e04c5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -531,8 +531,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false) _executorAllocationManager = if (dynamicAllocationEnabled) { - assert(supportDynamicAllocation, - "Dynamic allocation of executors is currently only supported in YARN and Mesos mode") Some(new ExecutorAllocationManager(this, listenerBus, _conf)) } else { None @@ -1361,17 +1359,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli postEnvironmentUpdate() } - /** - * Return whether dynamically adjusting the amount of resources allocated to - * this application is supported. This is currently only available for YARN - * and Mesos coarse-grained mode. 
- */ - private[spark] def supportDynamicAllocation: Boolean = { - (master.contains("yarn") - || master.contains("mesos") - || _conf.getBoolean("spark.dynamicAllocation.testing", false)) - } - /** * :: DeveloperApi :: * Register a listener to receive up-calls from events that happen during execution. @@ -1387,8 +1374,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * This is currently only supported in YARN mode. Return whether the request is received. */ private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = { - assert(supportDynamicAllocation, - "Requesting executors is currently only supported in YARN and Mesos modes") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestTotalExecutors(numExecutors) @@ -1405,8 +1390,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { - assert(supportDynamicAllocation, - "Requesting executors is currently only supported in YARN and Mesos modes") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) @@ -1429,8 +1412,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { - assert(supportDynamicAllocation, - "Killing executors is currently only supported in YARN and Mesos modes") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 12727de9b4cf3..d8084a57658ad 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -107,6 +107,10 @@ private[deploy] object DeployMessages { case class MasterChangeAcknowledged(appId: String) + case class RequestExecutors(appId: String, requestedTotal: Int) + + case class KillExecutors(appId: String, executorIds: Seq[String]) + // Master to AppClient case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 79b251e7e62fe..5ebc658cd3d71 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -197,6 +197,22 @@ private[spark] class AppClient( sendToMaster(UnregisterApplication(appId)) context.reply(true) stop() + + case r: RequestExecutors => + master match { + case Some(m) => context.reply(m.askWithRetry[Boolean](r)) + case None => + logWarning("Attempted to request executors before registering with Master.") + context.reply(false) + } + + case k: KillExecutors => + master match { + case Some(m) => context.reply(m.askWithRetry[Boolean](k)) + case None => + logWarning("Attempted to kill executors before registering with Master.") + context.reply(false) + } } override def onDisconnected(address: RpcAddress): Unit = { @@ -256,4 +272,33 @@ private[spark] class AppClient( endpoint = null } } + + /** + * Request executors from the Master by specifying the total number desired, + * including existing pending and running executors. + * + * @return whether the request is acknowledged. 
+ */ + def requestTotalExecutors(requestedTotal: Int): Boolean = { + if (endpoint != null && appId != null) { + endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal)) + } else { + logWarning("Attempted to request executors before driver fully initialized.") + false + } + } + + /** + * Kill the given list of executors through the Master. + * @return whether the kill request is acknowledged. + */ + def killExecutors(executorIds: Seq[String]): Boolean = { + if (endpoint != null && appId != null) { + endpoint.askWithRetry[Boolean](KillExecutors(appId, executorIds)) + } else { + logWarning("Attempted to kill executors before driver fully initialized.") + false + } + } + } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index aa54ed9360f36..b9edff182f6e3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -22,7 +22,6 @@ import java.util.Date import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.deploy.ApplicationDescription import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Utils @@ -43,6 +42,18 @@ private[spark] class ApplicationInfo( @transient var endTime: Long = _ @transient var appSource: ApplicationSource = _ + // A cap on the number of executors this application can have at any given time. + // By default, this is infinite. Only after the first allocation request is issued + // by the application will this be set to a finite value. + @transient var executorLimit: Int = _ + + // A set of workers on which this application cannot launch executors. + // This is used to handle kill requests when `spark.executor.cores` is NOT set. In this mode, + // at most one executor from this application can be run on each worker. When an executor is + // killed, its worker is added to the blacklist to avoid having the master immediately schedule + // a new executor on the worker. + @transient var blacklistedWorkers: mutable.HashSet[String] = _ + @transient private var nextExecutorId: Int = _ init() @@ -60,6 +71,8 @@ private[spark] class ApplicationInfo( appSource = new ApplicationSource(this) nextExecutorId = 0 removedExecutors = new ArrayBuffer[ExecutorDesc] + executorLimit = Integer.MAX_VALUE + blacklistedWorkers = new mutable.HashSet[String] } private def newExecutorId(useID: Option[Int] = None): Int = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 4615febf17d24..f9bcc7b6ec89a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -468,6 +468,25 @@ private[master] class Master( case BoundPortsRequest => { context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort)) } + + case RequestExecutors(appId, requestedTotal) => + context.reply(handleRequestExecutors(appId, requestedTotal)) + + case KillExecutors(appId, executorIdsString) => + // All executors IDs should be integers since we launched these executors. + // However, the kill interface on the driver side accepts strings, so we need to handle + // non-integer executor IDs just to be safe since the user can pass in whatever s/he wants. 
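      // Illustrative example (not part of the patch): a KillExecutors(appId, Seq("0", "1", "x"))
      // request resolves to executors 0 and 1 below, while "x" fails the integer conversion,
      // is logged as an error, and is dropped from the kill list.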
+ val executorIds = executorIdsString.flatMap { executorId => + try { + Some(executorId.toInt) + } catch { + case e: NumberFormatException => + logError(s"Application $appId requested to kill an " + + s"executor with a non-integer ID: $executorId. Ignoring.") + None + } + } + context.reply(handleKillExecutors(appId, executorIds)) } override def onDisconnected(address: RpcAddress): Unit = { @@ -559,12 +578,22 @@ private[master] class Master( // Try to spread out each app among all the workers, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && - worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1)) + .filter { worker => + worker.memoryFree >= app.desc.memoryPerExecutorMB && + worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1) && + !app.blacklistedWorkers.contains(worker.id) + } .sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) + // If all executors in this application have the same number of cores, + // use the executor limit to bound the number of cores to assign. + app.desc.coresPerExecutor.foreach { coresPerExecutor => + val maxNewExecutors = app.executorLimit - app.executors.size + val maxNewCores = math.min(maxNewExecutors.toLong * coresPerExecutor, Integer.MAX_VALUE) + toAssign = math.min(toAssign, maxNewCores.toInt) + } var pos = 0 while (toAssign > 0) { if (usableWorkers(pos).coresFree - assigned(pos) > 0) { @@ -743,9 +772,7 @@ private[master] class Master( rebuildSparkUI(app) for (exec <- app.executors.values) { - exec.worker.removeExecutor(exec) - exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id)) - exec.state = ExecutorState.KILLED + killExecutor(exec) } app.markFinished(state) if (state != ApplicationState.FINISHED) { @@ -761,6 +788,86 @@ private[master] class Master( } } + /** + * Handle a request from an application to set the executors limit for this application. + * @return whether the application has previously registered with this Master. + */ + private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = { + idToApp.get(appId) match { + case Some(appInfo) => + logInfo(s"Application $appId requested to set total executors to $requestedTotal.") + appInfo.executorLimit = requestedTotal + + // We may have previously added workers to the blacklist. Now that the application + // explicitly requests more executors, we can fulfill the request by removing workers + // from the blacklist, if any. + if (appInfo.desc.coresPerExecutor.isEmpty) { + val numMissingExecutors = appInfo.executorLimit - appInfo.executors.size + if (numMissingExecutors > 0) { + appInfo.blacklistedWorkers.take(numMissingExecutors).foreach { workerId => + appInfo.blacklistedWorkers.remove(workerId) + } + } + } + schedule() + true + case None => + logWarning(s"Unknown application $appId requested $requestedTotal total executors.") + false + } + } + + /** + * Handle a request from an application to kill the specified list of executors. + * @return whether the application has previously registered with this Master. 
+ */ + private def handleKillExecutors(appId: String, executorIds: Seq[Int]): Boolean = { + idToApp.get(appId) match { + case Some(appInfo) => + val (known, unknown) = executorIds.partition(appInfo.executors.contains) + if (known.nonEmpty) { + logInfo(s"Application $appId attempted to kill executors: " + known.mkString(", ")) + val executors = known.map { executorId => appInfo.executors(executorId) } + executors.foreach { desc => killExecutor(desc) } + + // If cores per executor is not set, then each worker runs at most one executor. + // In this case, after killing an executor we need to blacklist its worker since + // we don't want the worker to immediately launch a new executor. + if (appInfo.desc.coresPerExecutor.isEmpty) { + // There may be executors waiting to be scheduled once space frees up. + // If so, keep around a few non-blacklisted workers to launch these executors. + // Note that this assumes the executor limit has already been adjusted downwards + // through a separate request message. + val numExecutorsAfterKill = appInfo.executors.size - known.size + val numWaitingExecutors = math.max(0, appInfo.executorLimit - numExecutorsAfterKill) + val workersToBlacklist = executors.drop(numWaitingExecutors).map(_.worker.id) + workersToBlacklist.foreach { workerId => appInfo.blacklistedWorkers += workerId } + } + } + + // Warn against executor IDs we don't know about + if (unknown.nonEmpty) { + logWarning(s"Application $appId attempted to kill non-existent executors: " + + unknown.mkString(", ")) + } + + schedule() + true + case None => + logWarning(s"Unregistered application $appId requested us to kill executors!") + false + } + } + + /** + * Ask the Worker on which the specified executor is launched to kill the executor. + */ + private def killExecutor(exec: ExecutorDesc): Unit = { + exec.worker.removeExecutor(exec) + exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id)) + exec.state = ExecutorState.KILLED + } + /** * Rebuild a new SparkUI from the given application's event logs. * Return the UI if successful, else None diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f14c603ac6891..be144f418b522 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -404,7 +404,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp /** * Kill the given list of executors through the cluster manager. - * Return whether the kill request is acknowledged. + * @return whether the kill request is acknowledged. */ protected def doKillExecutors(executorIds: Seq[String]): Boolean = false diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 687ae9620460f..bbe51b4a09a22 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -152,6 +152,34 @@ private[spark] class SparkDeploySchedulerBackend( super.applicationId } + /** + * Request executors from the Master by specifying the total number desired, + * including existing pending and running executors. 
+ * + * @return whether the request is acknowledged. + */ + protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { + Option(client) match { + case Some(c) => c.requestTotalExecutors(requestedTotal) + case None => + logWarning("Attempted to request executors before driver fully initialized.") + false + } + } + + /** + * Kill the given list of executors through the Master. + * @return whether the kill request is acknowledged. + */ + protected override def doKillExecutors(executorIds: Seq[String]): Boolean = { + Option(client) match { + case Some(c) => c.killExecutors(executorIds) + case None => + logWarning("Attempted to kill executors before driver fully initialized.") + false + } + } + private def waitForRegistration() = { registrationBarrier.acquire() } From 49702d133af9f049638a540bd4979b0c1a515b7d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 20 Jul 2015 01:04:52 -0700 Subject: [PATCH 02/12] Clean up shuffle files after application exits --- .../org/apache/spark/deploy/ExternalShuffleService.scala | 5 +++++ .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 1 + 2 files changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index 09973a0a2c998..08bf0d46232ba 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -70,6 +70,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana server = transportContext.createServer(port, bootstraps) } + /** Clean up all shuffle files associated with an application that has exited. */ + def applicationRemoved(appId: String): Unit = { + blockHandler.applicationRemoved(appId, /* cleanupLocalDirs */ true) + } + def stop() { if (server != null) { server.close() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 82e9578bbcba5..b518db5fe4c53 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -582,6 +582,7 @@ private[worker] class Worker( Utils.deleteRecursively(new File(dir)) } } + shuffleService.applicationRemoved(id) } } From 42ac215da68bc0b777983a7872c239f7874e1dfb Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 20 Jul 2015 13:45:24 -0700 Subject: [PATCH 03/12] Clean up comments and rewrite code for readability The blacklisting logic is fairly intricate, but there was not really high level documentation describing why the blacklist is needed. This commit addresses that. --- .../spark/deploy/master/ApplicationInfo.scala | 15 +++++ .../apache/spark/deploy/master/Master.scala | 65 ++++++++++++------- 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index b9edff182f6e3..94b6d7c0df8bc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -109,6 +109,21 @@ private[spark] class ApplicationInfo( private[master] def coresLeft: Int = requestedCores - coresGranted + /** + * Return the number of executors waiting to be scheduled once space frees up. 
+ * + * This is only defined if the application explicitly set the executor limit. For instance, + * if an application asks for 8 executors but there is only space for 5, then there will be + * 3 waiting executors. + */ + private[master] def numWaitingExecutors: Int = { + if (executorLimit != Integer.MAX_VALUE) { + math.max(0, executorLimit - executors.size) + } else { + 0 + } + } + private var _retryCount = 0 private[master] def retryCount = _retryCount diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f9bcc7b6ec89a..767ba8a209f29 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -789,7 +789,7 @@ private[master] class Master( } /** - * Handle a request from an application to set the executors limit for this application. + * Handle a request to set the target number of executors for this application. * @return whether the application has previously registered with this Master. */ private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = { @@ -798,16 +798,13 @@ private[master] class Master( logInfo(s"Application $appId requested to set total executors to $requestedTotal.") appInfo.executorLimit = requestedTotal - // We may have previously added workers to the blacklist. Now that the application - // explicitly requests more executors, we can fulfill the request by removing workers - // from the blacklist, if any. + // If the application raises the executor limit, then we can launch new executors. + // If there are previously blacklisted workers, then we can launch these new executors + // by unblacklisting a subset of these workers. For more detail, see `handleKillExecutors`. if (appInfo.desc.coresPerExecutor.isEmpty) { - val numMissingExecutors = appInfo.executorLimit - appInfo.executors.size - if (numMissingExecutors > 0) { - appInfo.blacklistedWorkers.take(numMissingExecutors).foreach { workerId => - appInfo.blacklistedWorkers.remove(workerId) - } - } + appInfo.blacklistedWorkers + .take(appInfo.numWaitingExecutors) + .foreach { workerId => appInfo.blacklistedWorkers.remove(workerId) } } schedule() true @@ -818,7 +815,28 @@ private[master] class Master( } /** - * Handle a request from an application to kill the specified list of executors. + * Handle a kill request from the given application. + * + * There are two distinct ways of handling kill requests. For applications that explicitly + * set `spark.executor.cores`, all executors have exactly N cores. In this mode, we can simply + * multiply the application's executor limit by N to determine a cap on the number of cores + * to assign to this application. + * + * The kill mechanism for applications that did not set `spark.executor.cores` is more complex. + * In this mode, each executor grabs all the available cores on the worker, so we cannot simply + * rely on the executor limit as the executors may not be uniform in the number of cores. + * Instead, we use a blacklisting mechanism to enforce kills. When an executor is killed, we + * blacklist its worker so that we do not immediately launch a new executor on the worker. + * A worker is removed from the blacklist only when a request to add executors is serviced. + * + * Note that in this case, we may not always want to blacklist the worker. 
For instance, if we + * previously requested 10 new executors but there is only room for 3, then we have 7 executors + * waiting to be scheduled once space frees up. In this case, after we kill an executor we do + * NOT add its worker to the blacklist because there is a prior request that we need to service. + * + * Note: this method assumes the executor limit has already been adjusted downwards through + * a separate [[RequestExecutors]] message. + * * @return whether the application has previously registered with this Master. */ private def handleKillExecutors(appId: String, executorIds: Seq[Int]): Boolean = { @@ -827,21 +845,22 @@ private[master] class Master( val (known, unknown) = executorIds.partition(appInfo.executors.contains) if (known.nonEmpty) { logInfo(s"Application $appId attempted to kill executors: " + known.mkString(", ")) - val executors = known.map { executorId => appInfo.executors(executorId) } - executors.foreach { desc => killExecutor(desc) } + val executorsToKill = known.map { executorId => appInfo.executors(executorId) } - // If cores per executor is not set, then each worker runs at most one executor. - // In this case, after killing an executor we need to blacklist its worker since - // we don't want the worker to immediately launch a new executor. + // Ask the worker to kill the executor and remove state about it + executorsToKill.foreach { desc => + killExecutor(desc) + appInfo.executors.remove(desc.id) + } + + // If cores per executor is not set, then we need to use the blacklist mechanism in + // addition to the executor limit. For more detail, see the java doc of this method. if (appInfo.desc.coresPerExecutor.isEmpty) { // There may be executors waiting to be scheduled once space frees up. - // If so, keep around a few non-blacklisted workers to launch these executors. - // Note that this assumes the executor limit has already been adjusted downwards - // through a separate request message. - val numExecutorsAfterKill = appInfo.executors.size - known.size - val numWaitingExecutors = math.max(0, appInfo.executorLimit - numExecutorsAfterKill) - val workersToBlacklist = executors.drop(numWaitingExecutors).map(_.worker.id) - workersToBlacklist.foreach { workerId => appInfo.blacklistedWorkers += workerId } + // If so, leave a few workers unblacklisted to launch these executors. + executorsToKill.drop(appInfo.numWaitingExecutors).foreach { desc => + appInfo.blacklistedWorkers += desc.worker.id + } } } From 58cb06f41d25ee7af3056814b6a13fddfe35f64a Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 20 Jul 2015 13:56:46 -0700 Subject: [PATCH 04/12] Privatize worker blacklist for cleanliness --- .../spark/deploy/master/ApplicationInfo.scala | 28 ++++++++++++++++++- .../apache/spark/deploy/master/Master.scala | 8 ++---- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 94b6d7c0df8bc..5a0a207ddaaaa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -52,7 +52,7 @@ private[spark] class ApplicationInfo( // at most one executor from this application can be run on each worker. When an executor is // killed, its worker is added to the blacklist to avoid having the master immediately schedule // a new executor on the worker. 
- @transient var blacklistedWorkers: mutable.HashSet[String] = _ + @transient private var blacklistedWorkers: mutable.HashSet[String] = _ @transient private var nextExecutorId: Int = _ @@ -124,6 +124,32 @@ private[spark] class ApplicationInfo( } } + /** + * Add a worker to the blacklist, called when the executor running on the worker is killed. + * This is used only if cores per executor is not set. + */ + private[master] def blacklistWorker(workerId: String): Unit = { + blacklistedWorkers += workerId + } + + /** + * Remove workers from the blacklist, called when the application requests new executors. + * This is used only if cores per executor is not set. + */ + private[master] def removeFromBlacklist(numWorkers: Int): Unit = { + blacklistedWorkers.take(numWorkers).foreach { workerId => + blacklistedWorkers.remove(workerId) + } + } + + /** + * Return whether the specified worker is blacklisted. + * This is used only if cores per executor is not set. + */ + private[master] def isBlacklisted(workerId: String): Boolean = { + blacklistedWorkers.contains(workerId) + } + private var _retryCount = 0 private[master] def retryCount = _retryCount diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 767ba8a209f29..497864388f16a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -581,7 +581,7 @@ private[master] class Master( .filter { worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1) && - !app.blacklistedWorkers.contains(worker.id) + !app.isBlacklisted(worker.id) } .sortBy(_.coresFree).reverse val numUsable = usableWorkers.length @@ -802,9 +802,7 @@ private[master] class Master( // If there are previously blacklisted workers, then we can launch these new executors // by unblacklisting a subset of these workers. For more detail, see `handleKillExecutors`. if (appInfo.desc.coresPerExecutor.isEmpty) { - appInfo.blacklistedWorkers - .take(appInfo.numWaitingExecutors) - .foreach { workerId => appInfo.blacklistedWorkers.remove(workerId) } + appInfo.removeFromBlacklist(appInfo.numWaitingExecutors) } schedule() true @@ -859,7 +857,7 @@ private[master] class Master( // There may be executors waiting to be scheduled once space frees up. // If so, leave a few workers unblacklisted to launch these executors. 
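          // Worked example (illustrative, not part of the patch): suppose the driver first
          // lowered the limit to 3 via RequestExecutors, the app had 5 executors, and 2 are
          // killed here. After removal 3 remain, so numWaitingExecutors = max(0, 3 - 3) = 0
          // and both freed workers are blacklisted. Had the limit stayed at 5, there would be
          // 2 waiting executors and neither worker would be blacklisted, leaving room for
          // replacements to be scheduled.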
executorsToKill.drop(appInfo.numWaitingExecutors).foreach { desc => - appInfo.blacklistedWorkers += desc.worker.id + appInfo.blacklistWorker(desc.worker.id) } } } From 32abe44ccac3e493538dda486fc64e3739f7b295 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 20 Jul 2015 14:26:02 -0700 Subject: [PATCH 05/12] Fix style There's a weird scalastyle bug that thinks there is no space after the comma in the following: ``` someMethod(name, /* clean up */ true) ``` --- .../scala/org/apache/spark/deploy/ExternalShuffleService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index 08bf0d46232ba..4089c3e771fa8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -72,7 +72,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana /** Clean up all shuffle files associated with an application that has exited. */ def applicationRemoved(appId: String): Unit = { - blockHandler.applicationRemoved(appId, /* cleanupLocalDirs */ true) + blockHandler.applicationRemoved(appId, true /* cleanupLocalDirs */) } def stop() { From 1334e9ad38ca5f66e8c0b739e273219bf561778f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 20 Jul 2015 14:47:30 -0700 Subject: [PATCH 06/12] Fix MiMa --- project/MimaExcludes.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index a2595ff6c22f4..4d781bb2d42a4 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -104,6 +104,10 @@ object MimaExcludes { // SPARK-7422 add argmax for sparse vectors ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.mllib.linalg.Vector.argmax") + ) ++ Seq( + // SPARK-4751 Dynamic allocation for standalone mode + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.SparkContext.supportDynamicAllocation") ) case v if v.startsWith("1.4") => From 0a8be7944c38ecf76ffbf2eb2e3303058fa4968a Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sun, 26 Jul 2015 13:07:04 -0700 Subject: [PATCH 07/12] Simplify logic by removing the worker blacklist This commit significantly simplifies the logic by using the executor limit as a cap. Instead of a completely separate code path that added and removed workers from blacklist, reusing the executor limit allows us to reason about the behavior of dynamic allocation more straightforwardly. --- .../spark/deploy/master/ApplicationInfo.scala | 49 +------- .../apache/spark/deploy/master/Master.scala | 119 ++++++++---------- 2 files changed, 54 insertions(+), 114 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 5a0a207ddaaaa..18525482f6314 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -47,13 +47,6 @@ private[spark] class ApplicationInfo( // by the application will this be set to a finite value. @transient var executorLimit: Int = _ - // A set of workers on which this application cannot launch executors. - // This is used to handle kill requests when `spark.executor.cores` is NOT set. In this mode, - // at most one executor from this application can be run on each worker. 
When an executor is - // killed, its worker is added to the blacklist to avoid having the master immediately schedule - // a new executor on the worker. - @transient private var blacklistedWorkers: mutable.HashSet[String] = _ - @transient private var nextExecutorId: Int = _ init() @@ -72,7 +65,6 @@ private[spark] class ApplicationInfo( nextExecutorId = 0 removedExecutors = new ArrayBuffer[ExecutorDesc] executorLimit = Integer.MAX_VALUE - blacklistedWorkers = new mutable.HashSet[String] } private def newExecutorId(useID: Option[Int] = None): Int = { @@ -110,45 +102,12 @@ private[spark] class ApplicationInfo( private[master] def coresLeft: Int = requestedCores - coresGranted /** - * Return the number of executors waiting to be scheduled once space frees up. + * Return whether this application should launch at most one executor per worker. * - * This is only defined if the application explicitly set the executor limit. For instance, - * if an application asks for 8 executors but there is only space for 5, then there will be - * 3 waiting executors. + * This is true if cores per executor is not defined, in which case the executor should + * grab all the available cores on the worker instead. */ - private[master] def numWaitingExecutors: Int = { - if (executorLimit != Integer.MAX_VALUE) { - math.max(0, executorLimit - executors.size) - } else { - 0 - } - } - - /** - * Add a worker to the blacklist, called when the executor running on the worker is killed. - * This is used only if cores per executor is not set. - */ - private[master] def blacklistWorker(workerId: String): Unit = { - blacklistedWorkers += workerId - } - - /** - * Remove workers from the blacklist, called when the application requests new executors. - * This is used only if cores per executor is not set. - */ - private[master] def removeFromBlacklist(numWorkers: Int): Unit = { - blacklistedWorkers.take(numWorkers).foreach { workerId => - blacklistedWorkers.remove(workerId) - } - } - - /** - * Return whether the specified worker is blacklisted. - * This is used only if cores per executor is not set. - */ - private[master] def isBlacklisted(workerId: String): Boolean = { - blacklistedWorkers.contains(workerId) - } + private[master] def oneExecutorPerWorker(): Boolean = desc.coresPerExecutor.isDefined private var _retryCount = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 657b1a17916dd..c1ec4880cd90e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -472,21 +472,9 @@ private[master] class Master( case RequestExecutors(appId, requestedTotal) => context.reply(handleRequestExecutors(appId, requestedTotal)) - case KillExecutors(appId, executorIdsString) => - // All executors IDs should be integers since we launched these executors. - // However, the kill interface on the driver side accepts strings, so we need to handle - // non-integer executor IDs just to be safe since the user can pass in whatever s/he wants. - val executorIds = executorIdsString.flatMap { executorId => - try { - Some(executorId.toInt) - } catch { - case e: NumberFormatException => - logError(s"Application $appId requested to kill an " + - s"executor with a non-integer ID: $executorId. 
Ignoring.") - None - } - } - context.reply(handleKillExecutors(appId, executorIds)) + case KillExecutors(appId, executorIds) => + val formattedExecutorIds = formatExecutorIds(executorIds) + context.reply(handleKillExecutors(appId, formattedExecutorIds)) } override def onDisconnected(address: RpcAddress): Unit = { @@ -582,41 +570,40 @@ private[master] class Master( app: ApplicationInfo, usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = { - // If the number of cores per executor is not specified, then we can just schedule - // 1 core at a time since we expect a single executor to be launched on each worker - val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) + val coresPerExecutor = app.desc.coresPerExecutor + val minCoresPerExecutor = coresPerExecutor.getOrElse(1) val memoryPerExecutor = app.desc.memoryPerExecutorMB val numUsable = usableWorkers.length val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker - val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker + val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) - var freeWorkers = (0 until numUsable).toIndexedSeq - - // If all executors in this application have the same number of cores, - // use the executor limit to bound the number of cores to assign. - app.desc.coresPerExecutor.foreach { coresPerExecutor => - val maxNewExecutors = app.executorLimit - app.executors.size - val maxNewCores = math.min(maxNewExecutors.toLong * coresPerExecutor, Integer.MAX_VALUE) - toAssign = math.min(toAssign, maxNewCores.toInt) - } + /** Return whether the specified worker can launch an executor for this app. */ def canLaunchExecutor(pos: Int): Boolean = { - usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor && - usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor && - !app.isBlacklisted(usableWorkers(pos).id) + val assignedMemory = assignedExecutors(pos) * memoryPerExecutor + usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor && + usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor && + coresToAssign >= minCoresPerExecutor && + assignedExecutors.sum + app.executors.size < app.executorLimit } - while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) { - freeWorkers = freeWorkers.filter(canLaunchExecutor) + // Keep launching executors until no more workers can accommodate + // any more, or if we have reached this application's limits + var freeWorkers = (0 until numUsable).filter(canLaunchExecutor) + while (freeWorkers.nonEmpty) { freeWorkers.foreach { pos => var keepScheduling = true - while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) { - coresToAssign -= coresPerExecutor - assignedCores(pos) += coresPerExecutor - // If cores per executor is not set, we are assigning 1 core at a time - // without actually meaning to launch 1 executor for each core assigned - if (app.desc.coresPerExecutor.isDefined) { - assignedMemory(pos) += memoryPerExecutor + while (keepScheduling && canLaunchExecutor(pos)) { + coresToAssign -= minCoresPerExecutor + assignedCores(pos) += minCoresPerExecutor + + // If cores per executor is set, every iteration of this loop schedules one executor. + // Otherwise, we are simply assigning 1 core at a time, and each worker should have + // at most 1 executor. 
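            // Worked example (illustrative, not part of the patch): a worker with 10 free cores
            // and 4096 MB free memory, with memoryPerExecutor = 1024 and spark.executor.cores = 2,
            // leaves this loop with 8 assigned cores and 4 new executors: the memory check in
            // canLaunchExecutor rejects a fifth executor even though 2 cores remain free. This is
            // the "memory limits" case exercised in MasterSuite.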
+ if (app.oneExecutorPerWorker()) { + assignedExecutors(pos) += 1 + } else { + assignedExecutors(pos) = 1 } // Spreading out an application means spreading out its executors across as @@ -628,6 +615,7 @@ private[master] class Master( } } } + freeWorkers = freeWorkers.filter(canLaunchExecutor) } assignedCores } @@ -838,13 +826,6 @@ private[master] class Master( case Some(appInfo) => logInfo(s"Application $appId requested to set total executors to $requestedTotal.") appInfo.executorLimit = requestedTotal - - // If the application raises the executor limit, then we can launch new executors. - // If there are previously blacklisted workers, then we can launch these new executors - // by unblacklisting a subset of these workers. For more detail, see `handleKillExecutors`. - if (appInfo.desc.coresPerExecutor.isEmpty) { - appInfo.removeFromBlacklist(appInfo.numWaitingExecutors) - } schedule() true case None => @@ -881,34 +862,15 @@ private[master] class Master( private def handleKillExecutors(appId: String, executorIds: Seq[Int]): Boolean = { idToApp.get(appId) match { case Some(appInfo) => + logInfo(s"Application $appId requests to kill executors: " + executorIds.mkString(", ")) val (known, unknown) = executorIds.partition(appInfo.executors.contains) - if (known.nonEmpty) { - logInfo(s"Application $appId attempted to kill executors: " + known.mkString(", ")) - val executorsToKill = known.map { executorId => appInfo.executors(executorId) } - - // Ask the worker to kill the executor and remove state about it - executorsToKill.foreach { desc => - killExecutor(desc) - appInfo.executors.remove(desc.id) - } - - // If cores per executor is not set, then we need to use the blacklist mechanism in - // addition to the executor limit. For more detail, see the java doc of this method. - if (appInfo.desc.coresPerExecutor.isEmpty) { - // There may be executors waiting to be scheduled once space frees up. - // If so, leave a few workers unblacklisted to launch these executors. - executorsToKill.drop(appInfo.numWaitingExecutors).foreach { desc => - appInfo.blacklistWorker(desc.worker.id) - } - } + known.foreach { executorId => + appInfo.executors.remove(executorId).foreach(killExecutor) } - - // Warn against executor IDs we don't know about if (unknown.nonEmpty) { logWarning(s"Application $appId attempted to kill non-existent executors: " + unknown.mkString(", ")) } - schedule() true case None => @@ -917,6 +879,25 @@ private[master] class Master( } } + /** + * Cast the given executor IDs to integers and filter out the ones that are not. + * + * All executors IDs should be integers since we launched these executors. However, the + * kill interface on the driver side accepts strings, so we need to handle non-integer + * executor IDs just to be safe since the user can pass in whatever s/he wants. + */ + private def formatExecutorIds(executorIds: Seq[String]): Seq[Int] = { + executorIds.flatMap { executorId => + try { + Some(executorId.toInt) + } catch { + case e: NumberFormatException => + logError(s"Encountered executor with a non-integer ID: $executorId. Ignoring") + None + } + } + } + /** * Ask the Worker on which the specified executor is launched to kill the executor. 
*/ From a82e907ea24cf68fc17a3204bf3ea0b4632ab804 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sun, 26 Jul 2015 21:47:40 -0700 Subject: [PATCH 08/12] Fix comments --- .../scala/org/apache/spark/SparkContext.scala | 10 ++--- .../spark/deploy/master/ApplicationInfo.scala | 6 +-- .../apache/spark/deploy/master/Master.scala | 45 +++++++------------ 3 files changed, 25 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4205c13397f61..a92b413e80312 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1371,7 +1371,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * Express a preference to the cluster manager for a given total number of executors. * This can result in canceling pending requests or filing additional requests. - * This is currently only supported in YARN mode. Return whether the request is received. + * @return whether the request is received. */ private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = { schedulerBackend match { @@ -1386,7 +1386,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: * Request an additional number of executors from the cluster manager. - * This is currently only supported in YARN mode. Return whether the request is received. + * @return whether the request is received. */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { @@ -1408,7 +1408,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * through this method with new ones, it should follow up explicitly with a call to * {{SparkContext#requestExecutors}}. * - * This is currently only supported in YARN mode. Return whether the request is received. + * @return whether the request is received. */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { @@ -1430,7 +1430,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * through this method with a new one, it should follow up explicitly with a call to * {{SparkContext#requestExecutors}}. * - * This is currently only supported in YARN mode. Return whether the request is received. + * @return whether the request is received. */ @DeveloperApi override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId) @@ -1447,7 +1447,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * can steal the window of opportunity and acquire this application's resources in the * mean time. * - * This is currently only supported in YARN mode. Return whether the request is received. + * @return whether the request is received. */ private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { schedulerBackend match { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 18525482f6314..8c53b26595975 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -43,9 +43,9 @@ private[spark] class ApplicationInfo( @transient var appSource: ApplicationSource = _ // A cap on the number of executors this application can have at any given time. 
- // By default, this is infinite. Only after the first allocation request is issued - // by the application will this be set to a finite value. - @transient var executorLimit: Int = _ + // By default, this is infinite. Only after the first allocation request is issued by the + // application will this be set to a finite value. This is used for dynamic allocation. + @transient private[master] var executorLimit: Int = _ @transient private var nextExecutorId: Int = _ diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c1ec4880cd90e..643caa0f43954 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -587,8 +587,8 @@ private[master] class Master( assignedExecutors.sum + app.executors.size < app.executorLimit } - // Keep launching executors until no more workers can accommodate - // any more, or if we have reached this application's limits + // Keep launching executors until no more workers can accommodate any + // more executors, or if we have reached this application's limits var freeWorkers = (0 until numUsable).filter(canLaunchExecutor) while (freeWorkers.nonEmpty) { freeWorkers.foreach { pos => @@ -598,8 +598,8 @@ private[master] class Master( assignedCores(pos) += minCoresPerExecutor // If cores per executor is set, every iteration of this loop schedules one executor. - // Otherwise, we are simply assigning 1 core at a time, and each worker should have - // at most 1 executor. + // Otherwise, we are simply assigning 1 core at a time, and all of these cores belong + // to a single executor (one per worker). if (app.oneExecutorPerWorker()) { assignedExecutors(pos) += 1 } else { @@ -819,6 +819,11 @@ private[master] class Master( /** * Handle a request to set the target number of executors for this application. + * + * If the executor limit is adjusted upwards, new executors will be launched provided + * that there are workers with sufficient resources. If it is adjusted downwards, however, + * we do not kill existing executors until we explicitly receive a kill request. + * * @return whether the application has previously registered with this Master. */ private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = { @@ -837,25 +842,9 @@ private[master] class Master( /** * Handle a kill request from the given application. * - * There are two distinct ways of handling kill requests. For applications that explicitly - * set `spark.executor.cores`, all executors have exactly N cores. In this mode, we can simply - * multiply the application's executor limit by N to determine a cap on the number of cores - * to assign to this application. - * - * The kill mechanism for applications that did not set `spark.executor.cores` is more complex. - * In this mode, each executor grabs all the available cores on the worker, so we cannot simply - * rely on the executor limit as the executors may not be uniform in the number of cores. - * Instead, we use a blacklisting mechanism to enforce kills. When an executor is killed, we - * blacklist its worker so that we do not immediately launch a new executor on the worker. - * A worker is removed from the blacklist only when a request to add executors is serviced. - * - * Note that in this case, we may not always want to blacklist the worker. 
For instance, if we - * previously requested 10 new executors but there is only room for 3, then we have 7 executors - * waiting to be scheduled once space frees up. In this case, after we kill an executor we do - * NOT add its worker to the blacklist because there is a prior request that we need to service. - * - * Note: this method assumes the executor limit has already been adjusted downwards through - * a separate [[RequestExecutors]] message. + * This method assumes the executor limit has already been adjusted downwards through + * a separate [[RequestExecutors]] message, such that we do not immediately launch new + * executors immediately after the old ones are removed. * * @return whether the application has previously registered with this Master. */ @@ -880,11 +869,11 @@ private[master] class Master( } /** - * Cast the given executor IDs to integers and filter out the ones that are not. + * Cast the given executor IDs to integers and filter out the ones that fail. * - * All executors IDs should be integers since we launched these executors. However, the - * kill interface on the driver side accepts strings, so we need to handle non-integer - * executor IDs just to be safe since the user can pass in whatever s/he wants. + * All executors IDs should be integers since we launched these executors. However, + * the kill interface on the driver side accepts arbitrary strings, so we need to + * handle non-integer executor IDs just to be safe. */ private def formatExecutorIds(executorIds: Seq[String]): Seq[Int] = { executorIds.flatMap { executorId => @@ -899,7 +888,7 @@ private[master] class Master( } /** - * Ask the Worker on which the specified executor is launched to kill the executor. + * Ask the worker on which the specified executor is launched to kill the executor. */ private def killExecutor(exec: ExecutorDesc): Unit = { exec.worker.removeExecutor(exec) From 6832bd7b8f0cf596ba4e76b47c6919d0a5a0ef4e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sun, 26 Jul 2015 23:25:10 -0700 Subject: [PATCH 09/12] Add tests for scheduling with executor limit This commit also uncovers a bug where the limit is not applied correctly for cases where cores per executor is not defined. The symptom is that every executor launched under this limit has exactly 1 core. There is a regression test for this case now. --- .../spark/deploy/master/ApplicationInfo.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 20 +- .../spark/deploy/master/MasterSuite.scala | 256 ++++++++++-------- 3 files changed, 161 insertions(+), 117 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 8c53b26595975..f9367934e6e17 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -107,7 +107,7 @@ private[spark] class ApplicationInfo( * This is true if cores per executor is not defined, in which case the executor should * grab all the available cores on the worker instead. 
*/ - private[master] def oneExecutorPerWorker(): Boolean = desc.coresPerExecutor.isDefined + private[master] def oneExecutorPerWorker(): Boolean = desc.coresPerExecutor.isEmpty private var _retryCount = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 643caa0f43954..a1c3a088969bb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -581,10 +581,19 @@ private[master] class Master( /** Return whether the specified worker can launch an executor for this app. */ def canLaunchExecutor(pos: Int): Boolean = { val assignedMemory = assignedExecutors(pos) * memoryPerExecutor + val underLimit = + if (app.oneExecutorPerWorker() && assignedExecutors(pos) == 1) { + // We only have one executor per worker and have already started to assign cores to it, + // so assigning more to it does not change the number of executors we'll end up with + true + } else { + // Otherwise, we should launch a new executor only if we do not exceed the limit + assignedExecutors.sum + app.executors.size < app.executorLimit + } usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor && usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor && coresToAssign >= minCoresPerExecutor && - assignedExecutors.sum + app.executors.size < app.executorLimit + underLimit } // Keep launching executors until no more workers can accommodate any @@ -597,13 +606,12 @@ private[master] class Master( coresToAssign -= minCoresPerExecutor assignedCores(pos) += minCoresPerExecutor - // If cores per executor is set, every iteration of this loop schedules one executor. - // Otherwise, we are simply assigning 1 core at a time, and all of these cores belong - // to a single executor (one per worker). + // If we are launching one executor per worker, then every iteration assigns 1 core + // to the executor. Otherwise, every iteration assigns cores to a new executor. 
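            // Worked example (illustrative, not part of the patch): with spark.executor.cores
            // unset and executorLimit = 1, the first worker to pass canLaunchExecutor keeps
            // receiving cores through the `underLimit` special case above, since piling cores
            // onto its single executor does not increase the executor count, while every other
            // worker is rejected by the limit check. Without that special case the loop stopped
            // after one core, which is the one-core-per-executor symptom this commit fixes.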
if (app.oneExecutorPerWorker()) { - assignedExecutors(pos) += 1 - } else { assignedExecutors(pos) = 1 + } else { + assignedExecutors(pos) += 1 } // Spreading out an application means spreading out its executors across as diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 4d7016d1e594b..30780a0da7f8d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -120,7 +120,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts } - test("Master & worker web ui available") { + test("master/worker web ui available") { implicit val formats = org.json4s.DefaultFormats val conf = new SparkConf() val localCluster = new LocalSparkCluster(2, 2, 512, conf) @@ -144,174 +144,202 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva } test("basic scheduling - spread out") { - testBasicScheduling(spreadOut = true) + basicScheduling(spreadOut = true) } test("basic scheduling - no spread out") { - testBasicScheduling(spreadOut = false) + basicScheduling(spreadOut = false) } test("scheduling with max cores - spread out") { - testSchedulingWithMaxCores(spreadOut = true) + schedulingWithMaxCores(spreadOut = true) } test("scheduling with max cores - no spread out") { - testSchedulingWithMaxCores(spreadOut = false) + schedulingWithMaxCores(spreadOut = false) } test("scheduling with cores per executor - spread out") { - testSchedulingWithCoresPerExecutor(spreadOut = true) + schedulingWithCoresPerExecutor(spreadOut = true) } test("scheduling with cores per executor - no spread out") { - testSchedulingWithCoresPerExecutor(spreadOut = false) + schedulingWithCoresPerExecutor(spreadOut = false) } test("scheduling with cores per executor AND max cores - spread out") { - testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true) + schedulingWithCoresPerExecutorAndMaxCores(spreadOut = true) } test("scheduling with cores per executor AND max cores - no spread out") { - testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false) + schedulingWithCoresPerExecutorAndMaxCores(spreadOut = false) } - private def testBasicScheduling(spreadOut: Boolean): Unit = { + test("scheduling with executor limit - spread out") { + schedulingWithExecutorLimit(spreadOut = true) + } + + test("scheduling with executor limit - no spread out") { + schedulingWithExecutorLimit(spreadOut = false) + } + + test("scheduling with executor limit AND max cores - spread out") { + schedulingWithExecutorLimitAndMaxCores(spreadOut = true) + } + + test("scheduling with executor limit AND max cores - no spread out") { + schedulingWithExecutorLimitAndMaxCores(spreadOut = false) + } + + test("scheduling with executor limit AND cores per executor - spread out") { + schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut = true) + } + + test("scheduling with executor limit AND cores per executor - no spread out") { + schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut = false) + } + + test("scheduling with executor limit AND cores per executor AND max cores - spread out") { + schedulingWithEverything(spreadOut = true) + } + + test("scheduling with executor limit AND cores per executor AND max cores - no spread out") { + schedulingWithEverything(spreadOut = false) + } + + private def 
basicScheduling(spreadOut: Boolean): Unit = { val master = makeMaster() val appInfo = makeAppInfo(1024) - val workerInfo = makeWorkerInfo(4096, 10) - val workerInfos = Array(workerInfo, workerInfo, workerInfo) - val scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - assert(scheduledCores(0) === 10) - assert(scheduledCores(1) === 10) - assert(scheduledCores(2) === 10) + val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + assert(scheduledCores === Array(10, 10, 10)) } - private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = { + private def schedulingWithMaxCores(spreadOut: Boolean): Unit = { val master = makeMaster() val appInfo1 = makeAppInfo(1024, maxCores = Some(8)) val appInfo2 = makeAppInfo(1024, maxCores = Some(16)) - val workerInfo = makeWorkerInfo(4096, 10) - val workerInfos = Array(workerInfo, workerInfo, workerInfo) - var scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - // With spreading out, each worker should be assigned a few cores - if (spreadOut) { - assert(scheduledCores(0) === 3) - assert(scheduledCores(1) === 3) - assert(scheduledCores(2) === 2) - } else { - // Without spreading out, the cores should be concentrated on the first worker - assert(scheduledCores(0) === 8) - assert(scheduledCores(1) === 0) - assert(scheduledCores(2) === 0) - } - // Now test the same thing with max cores > cores per worker - scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut) + val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut) if (spreadOut) { - assert(scheduledCores(0) === 6) - assert(scheduledCores(1) === 5) - assert(scheduledCores(2) === 5) + assert(scheduledCores1 === Array(3, 3, 2)) + assert(scheduledCores2 === Array(6, 5, 5)) } else { - // Without spreading out, the first worker should be fully booked, - // and the leftover cores should spill over to the second worker only. 
- assert(scheduledCores(0) === 10) - assert(scheduledCores(1) === 6) - assert(scheduledCores(2) === 0) + assert(scheduledCores1 === Array(8, 0, 0)) + assert(scheduledCores2 === Array(10, 6, 0)) } } - private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = { + private def schedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = { val master = makeMaster() val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2)) val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2)) val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3)) - val workerInfo = makeWorkerInfo(4096, 10) - val workerInfos = Array(workerInfo, workerInfo, workerInfo) - // Each worker should end up with 4 executors with 2 cores each - // This should be 4 because of the memory restriction on each worker - var scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - assert(scheduledCores(0) === 8) - assert(scheduledCores(1) === 8) - assert(scheduledCores(2) === 8) - // Now test the same thing without running into the worker memory limit - // Each worker should now end up with 5 executors with 2 cores each - scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - assert(scheduledCores(0) === 10) - assert(scheduledCores(1) === 10) - assert(scheduledCores(2) === 10) - // Now test the same thing with a cores per executor that 10 is not divisible by - scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - assert(scheduledCores(0) === 9) - assert(scheduledCores(1) === 9) - assert(scheduledCores(2) === 9) + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut) + val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut) + val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut) + assert(scheduledCores1 === Array(8, 8, 8)) // 4 * 2 because of memory limits + assert(scheduledCores2 === Array(10, 10, 10)) // 5 * 2 + assert(scheduledCores3 === Array(9, 9, 9)) // 3 * 3 } // Sorry for the long method name! 
- private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = { + private def schedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = { val master = makeMaster() val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4)) val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20)) val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20)) - val workerInfo = makeWorkerInfo(4096, 10) - val workerInfos = Array(workerInfo, workerInfo, workerInfo) - // We should only launch two executors, each with exactly 2 cores - var scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut) + val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut) + val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut) + if (spreadOut) { + assert(scheduledCores1 === Array(2, 2, 0)) + assert(scheduledCores2 === Array(8, 6, 6)) + assert(scheduledCores3 === Array(6, 6, 6)) + } else { + assert(scheduledCores1 === Array(4, 0, 0)) + assert(scheduledCores2 === Array(10, 10, 0)) + assert(scheduledCores3 === Array(9, 9, 0)) + } + } + + private def schedulingWithExecutorLimit(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo = makeAppInfo(256) + appInfo.executorLimit = 0 + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + appInfo.executorLimit = 2 + val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + appInfo.executorLimit = 5 + val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + assert(scheduledCores1 === Array(0, 0, 0)) + assert(scheduledCores2 === Array(10, 10, 0)) + assert(scheduledCores3 === Array(10, 10, 10)) + } + + private def schedulingWithExecutorLimitAndMaxCores(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo = makeAppInfo(256, maxCores = Some(16)) + appInfo.executorLimit = 0 + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + appInfo.executorLimit = 2 + val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + appInfo.executorLimit = 5 + val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + assert(scheduledCores1 === Array(0, 0, 0)) if (spreadOut) { - assert(scheduledCores(0) === 2) - assert(scheduledCores(1) === 2) - assert(scheduledCores(2) === 0) + assert(scheduledCores2 === Array(8, 8, 0)) + assert(scheduledCores3 === Array(6, 5, 5)) } else { - assert(scheduledCores(0) === 4) - assert(scheduledCores(1) === 0) - assert(scheduledCores(2) === 0) + assert(scheduledCores2 === Array(10, 6, 0)) + assert(scheduledCores3 === Array(10, 6, 0)) } - // Test max cores > number of cores per worker - scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) + } + + private def schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo = makeAppInfo(256, coresPerExecutor = Some(4)) + appInfo.executorLimit = 0 + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + appInfo.executorLimit = 2 + val scheduledCores2 = 
scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + appInfo.executorLimit = 5 + val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + assert(scheduledCores1 === Array(0, 0, 0)) if (spreadOut) { - assert(scheduledCores(0) === 8) - assert(scheduledCores(1) === 6) - assert(scheduledCores(2) === 6) + assert(scheduledCores2 === Array(4, 4, 0)) } else { - assert(scheduledCores(0) === 10) - assert(scheduledCores(1) === 10) - assert(scheduledCores(2) === 0) + assert(scheduledCores2 === Array(8, 0, 0)) } - // Test max cores > number of cores per worker AND - // a cores per executor that is 10 is not divisible by - scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) + assert(scheduledCores3 === Array(8, 8, 4)) + } + + // Everything being: executor limit + cores per executor + max cores + private def schedulingWithEverything(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo = makeAppInfo(256, coresPerExecutor = Some(4), maxCores = Some(18)) + appInfo.executorLimit = 0 + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + appInfo.executorLimit = 2 + val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + appInfo.executorLimit = 5 + val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut) + assert(scheduledCores1 === Array(0, 0, 0)) if (spreadOut) { - assert(scheduledCores(0) === 6) - assert(scheduledCores(1) === 6) - assert(scheduledCores(2) === 6) + assert(scheduledCores2 === Array(4, 4, 0)) + assert(scheduledCores3 === Array(8, 4, 4)) } else { - assert(scheduledCores(0) === 9) - assert(scheduledCores(1) === 9) - assert(scheduledCores(2) === 0) + assert(scheduledCores2 === Array(8, 0, 0)) + assert(scheduledCores3 === Array(8, 8, 0)) } } - // =============================== - // | Utility methods for testing | - // =============================== + // ========================================== + // | Utility methods and fields for testing | + // ========================================== private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers) + private val workerInfo = makeWorkerInfo(4096, 10) + private val workerInfos = Array(workerInfo, workerInfo, workerInfo) private def makeMaster(conf: SparkConf = new SparkConf): Master = { val securityMgr = new SecurityManager(conf) @@ -335,4 +363,12 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address") } + private def scheduleExecutorsOnWorkers( + master: Master, + appInfo: ApplicationInfo, + workerInfos: Array[WorkerInfo], + spreadOut: Boolean): Array[Int] = { + master.invokePrivate(_scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut)) + } + } From c0a2c02b3f9a98fd15efe10787358ff2fcaade1e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 27 Jul 2015 18:28:51 -0700 Subject: [PATCH 10/12] Fix build after merge conflict --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bce69b35fed4a..5186fa128646b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1387,8 +1387,6 @@ class 
SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli localityAwareTasks: Int, hostToLocalTaskCount: scala.collection.immutable.Map[String, Int] ): Boolean = { - assert(supportDynamicAllocation, - "Requesting executors is currently only supported in YARN and Mesos modes") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount) From accc8f6f2129272626a276139a8452973fddba0f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 31 Jul 2015 14:24:20 -0700 Subject: [PATCH 11/12] Address comments --- .../spark/deploy/master/ApplicationInfo.scala | 8 ------- .../apache/spark/deploy/master/Master.scala | 21 ++++++++++--------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index f9367934e6e17..04f80be25e40a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -101,14 +101,6 @@ private[spark] class ApplicationInfo( private[master] def coresLeft: Int = requestedCores - coresGranted - /** - * Return whether this application should launch at most one executor per worker. - * - * This is true if cores per executor is not defined, in which case the executor should - * grab all the available cores on the worker instead. - */ - private[master] def oneExecutorPerWorker(): Boolean = desc.coresPerExecutor.isEmpty - private var _retryCount = 0 private[master] def retryCount = _retryCount diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index a1c3a088969bb..1d6ee83363ff9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -572,6 +572,7 @@ private[master] class Master( spreadOutApps: Boolean): Array[Int] = { val coresPerExecutor = app.desc.coresPerExecutor val minCoresPerExecutor = coresPerExecutor.getOrElse(1) + val oneExecutorPerWorker = coresPerExecutor.isEmpty val memoryPerExecutor = app.desc.memoryPerExecutorMB val numUsable = usableWorkers.length val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker @@ -580,16 +581,16 @@ private[master] class Master( /** Return whether the specified worker can launch an executor for this app. */ def canLaunchExecutor(pos: Int): Boolean = { - val assignedMemory = assignedExecutors(pos) * memoryPerExecutor + // If we allow multiple executors per worker, then we can always launch new executors. + // Otherwise, we may have already started assigning cores to the executor on this worker. 
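+ // (In the one-executor-per-worker case, assignedExecutors(pos) == 0 means this pass would
+ // create that worker's first and only executor, which still counts against the limit.)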
+ val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
 val underLimit =
- if (app.oneExecutorPerWorker() && assignedExecutors(pos) == 1) {
- // We only have one executor per worker and have already started to assign cores to it,
- // so assigning more to it does not change the number of executors we'll end up with
- true
- } else {
- // Otherwise, we should launch a new executor only if we do not exceed the limit
+ if (launchingNewExecutor) {
 assignedExecutors.sum + app.executors.size < app.executorLimit
+ } else {
+ true
 }
+ val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
 usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor &&
 usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor &&
 coresToAssign >= minCoresPerExecutor &&
@@ -608,7 +609,7 @@ private[master] class Master(
 // If we are launching one executor per worker, then every iteration assigns 1 core
 // to the executor. Otherwise, every iteration assigns cores to a new executor.
- if (app.oneExecutorPerWorker()) {
+ if (oneExecutorPerWorker) {
 assignedExecutors(pos) = 1
 } else {
 assignedExecutors(pos) += 1
@@ -851,8 +852,8 @@ private[master] class Master(
 * Handle a kill request from the given application.
 *
 * This method assumes the executor limit has already been adjusted downwards through
- * a separate [[RequestExecutors]] message, such that we do not immediately launch new
- * executors immediately after the old ones are removed.
+ * a separate [[RequestExecutors]] message, such that we do not launch new executors
+ * immediately after the old ones are removed.
 *
 * @return whether the application has previously registered with this Master.
 */

From 879e928c1f7b01f4d542b1ebf7f990eee01d8fe7 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 1 Aug 2015 00:27:34 -0700
Subject: [PATCH 12/12] Add end-to-end tests for standalone dynamic allocation

This commit was surprisingly difficult to write because of the complex
interactions between various components in the cluster. Some modifications
to the code were necessary to ensure a strict ordering of events that the
test can assume.

This test uncovered one race condition! If we send an add request
immediately after a kill request, we may not get an executor back because
we never released the cores in the ApplicationInfo. Only after the executor
is actually killed by the worker do we release these cores. This is now
fixed with a regression test.
---
 .../spark/deploy/master/ApplicationInfo.scala | 6 +
 .../apache/spark/deploy/master/Master.scala | 6 +-
 .../apache/spark/deploy/worker/Worker.scala | 12 +-
 .../CoarseGrainedSchedulerBackend.scala | 3 +-
 .../StandaloneDynamicAllocationSuite.scala | 363 ++++++++++++++++++
 5 files changed, 382 insertions(+), 8 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 04f80be25e40a..b40d20f9f7868 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -121,6 +121,12 @@ private[spark] class ApplicationInfo(
 state != ApplicationState.WAITING && state != ApplicationState.RUNNING
 }
+ /**
+ * Return the limit on the number of executors this application can have.
+ * For testing only.
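+ * The limit is adjusted at runtime through [[RequestExecutors]] messages sent by the application.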
+ */ + private[deploy] def getExecutorLimit: Int = executorLimit + def duration: Long = { if (endTime != -1) { endTime - startTime diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 1d6ee83363ff9..e38e437fe1c5a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -45,7 +45,7 @@ import org.apache.spark.serializer.{JavaSerializer, Serializer} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} -private[master] class Master( +private[deploy] class Master( override val rpcEnv: RpcEnv, address: RpcAddress, webUiPort: Int, @@ -863,7 +863,9 @@ private[master] class Master( logInfo(s"Application $appId requests to kill executors: " + executorIds.mkString(", ")) val (known, unknown) = executorIds.partition(appInfo.executors.contains) known.foreach { executorId => - appInfo.executors.remove(executorId).foreach(killExecutor) + val desc = appInfo.executors(executorId) + appInfo.removeExecutor(desc) + killExecutor(desc) } if (unknown.nonEmpty) { logWarning(s"Application $appId attempted to kill non-existent executors: " diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index b9555ef28dad0..c82a7ccab54dc 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -40,7 +40,7 @@ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc._ import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} -private[worker] class Worker( +private[deploy] class Worker( override val rpcEnv: RpcEnv, webUiPort: Int, cores: Int, @@ -661,6 +661,9 @@ private[worker] class Worker( } private[deploy] object Worker extends Logging { + val SYSTEM_NAME = "sparkWorker" + val ENDPOINT_NAME = "Worker" + def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf @@ -682,13 +685,12 @@ private[deploy] object Worker extends Logging { conf: SparkConf = new SparkConf): RpcEnv = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems - val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") - val actorName = "Worker" + val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("") val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr) val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_)) - rpcEnv.setupEndpoint(actorName, new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses, - systemName, actorName, workDir, conf, securityMgr)) + rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, + masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr)) rpcEnv } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 44e8b299e8ddd..6acf8a9a5e9b4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -134,7 +134,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp 
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) } else { logInfo("Registered executor: " + executorRef + " with ID " + executorId) - context.reply(RegisteredExecutor) addressToExecutorId(executorRef.address) = executorId totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) @@ -149,6 +148,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") } } + // Note: some tests expect the reply to come after we put the executor in the map + context.reply(RegisteredExecutor) listenerBus.post( SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) makeOffers() diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala new file mode 100644 index 0000000000000..08c41a897a861 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import org.mockito.Mockito.{mock, when} +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark._ +import org.apache.spark.deploy.master.Master +import org.apache.spark.deploy.worker.Worker +import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} +import org.apache.spark.scheduler.cluster._ +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor + +/** + * End-to-end tests for dynamic allocation in standalone mode. + */ +class StandaloneDynamicAllocationSuite + extends SparkFunSuite + with LocalSparkContext + with BeforeAndAfterAll { + + private val numWorkers = 2 + private val conf = new SparkConf() + private val securityManager = new SecurityManager(conf) + + private var masterRpcEnv: RpcEnv = null + private var workerRpcEnvs: Seq[RpcEnv] = null + private var master: Master = null + private var workers: Seq[Worker] = null + + /** + * Start the local cluster. + * Note: local-cluster mode is insufficient because we want a reference to the Master. 
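+ * Instead, the Master and Workers are created directly in this JVM, each on its own RpcEnv.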
+ */ + override def beforeAll(): Unit = { + super.beforeAll() + masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager) + workerRpcEnvs = (0 until numWorkers).map { i => + RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager) + } + master = makeMaster() + workers = makeWorkers(10, 2048) + } + + override def afterAll(): Unit = { + masterRpcEnv.shutdown() + workerRpcEnvs.foreach(_.shutdown()) + master.stop() + workers.foreach(_.stop()) + masterRpcEnv = null + workerRpcEnvs = null + master = null + workers = null + super.afterAll() + } + + test("dynamic allocation default behavior") { + sc = new SparkContext(appConf) + val appId = sc.applicationId + assert(master.apps.size === 1) + assert(master.apps.head.id === appId) + assert(master.apps.head.executors.size === 2) + assert(master.apps.head.getExecutorLimit === Int.MaxValue) + // kill all executors + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 0) + assert(master.apps.head.getExecutorLimit === 0) + // request 1 + assert(sc.requestExecutors(1)) + assert(master.apps.head.executors.size === 1) + assert(master.apps.head.getExecutorLimit === 1) + // request 1 more + assert(sc.requestExecutors(1)) + assert(master.apps.head.executors.size === 2) + assert(master.apps.head.getExecutorLimit === 2) + // request 1 more; this one won't go through + assert(sc.requestExecutors(1)) + assert(master.apps.head.executors.size === 2) + assert(master.apps.head.getExecutorLimit === 3) + // kill all existing executors; we should end up with 3 - 2 = 1 executor + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 1) + assert(master.apps.head.getExecutorLimit === 1) + // kill all executors again; this time we'll have 1 - 1 = 0 executors left + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 0) + assert(master.apps.head.getExecutorLimit === 0) + // request many more; this increases the limit well beyond the cluster capacity + assert(sc.requestExecutors(1000)) + assert(master.apps.head.executors.size === 2) + assert(master.apps.head.getExecutorLimit === 1000) + } + + test("dynamic allocation with max cores <= cores per worker") { + sc = new SparkContext(appConf.set("spark.cores.max", "8")) + val appId = sc.applicationId + assert(master.apps.size === 1) + assert(master.apps.head.id === appId) + assert(master.apps.head.executors.size === 2) + assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4)) + assert(master.apps.head.getExecutorLimit === Int.MaxValue) + // kill all executors + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 0) + assert(master.apps.head.getExecutorLimit === 0) + // request 1 + assert(sc.requestExecutors(1)) + assert(master.apps.head.executors.size === 1) + assert(master.apps.head.executors.values.head.cores === 8) + assert(master.apps.head.getExecutorLimit === 1) + // request 1 more; this one won't go through because we're already at max cores. + // This highlights a limitation of using dynamic allocation with max cores WITHOUT + // setting cores per executor: once an application scales down and then scales back + // up, its executors may not be spread out anymore! 
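+ // (At this point the one remaining executor already holds all 8 cores allowed by
+ // spark.cores.max, so no second executor can be scheduled on the other worker.)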
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.getExecutorLimit === 2)
+ // request 1 more; this one also won't go through for the same reason
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.getExecutorLimit === 3)
+ // kill all existing executors; we should end up with 3 - 1 = 2 executors
+ // Note: we scheduled these executors together, so their cores should be evenly distributed
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+ assert(master.apps.head.getExecutorLimit === 2)
+ // kill all executors again; this time we'll have 2 - 2 = 0 executors left
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request many more; this increases the limit well beyond the cluster capacity
+ assert(sc.requestExecutors(1000))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+ assert(master.apps.head.getExecutorLimit === 1000)
+ }
+
+ test("dynamic allocation with max cores > cores per worker") {
+ sc = new SparkContext(appConf.set("spark.cores.max", "16"))
+ val appId = sc.applicationId
+ assert(master.apps.size === 1)
+ assert(master.apps.head.id === appId)
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+ assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ // kill all executors
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request 1
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.executors.values.head.cores === 10)
+ assert(master.apps.head.getExecutorLimit === 1)
+ // request 1 more
+ // Note: the cores are not evenly distributed because we scheduled these executors 1 by 1
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
+ assert(master.apps.head.getExecutorLimit === 2)
+ // request 1 more; this one won't go through
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.getExecutorLimit === 3)
+ // kill all existing executors; we should end up with 3 - 2 = 1 executor
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.executors.values.head.cores === 10)
+ assert(master.apps.head.getExecutorLimit === 1)
+ // kill all executors again; this time we'll have 1 - 1 = 0 executors left
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request many more; this increases the limit well beyond the cluster capacity
+ assert(sc.requestExecutors(1000))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+ assert(master.apps.head.getExecutorLimit === 1000)
+ }
+
+ test("dynamic allocation with cores per executor") {
+ sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
+ val appId = sc.applicationId
+ assert(master.apps.size === 1)
+ assert(master.apps.head.id === appId)
+ assert(master.apps.head.executors.size ===
10) // 20 cores total + assert(master.apps.head.getExecutorLimit === Int.MaxValue) + // kill all executors + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 0) + assert(master.apps.head.getExecutorLimit === 0) + // request 1 + assert(sc.requestExecutors(1)) + assert(master.apps.head.executors.size === 1) + assert(master.apps.head.getExecutorLimit === 1) + // request 3 more + assert(sc.requestExecutors(3)) + assert(master.apps.head.executors.size === 4) + assert(master.apps.head.getExecutorLimit === 4) + // request 10 more; only 6 will go through + assert(sc.requestExecutors(10)) + assert(master.apps.head.executors.size === 10) + assert(master.apps.head.getExecutorLimit === 14) + // kill 2 executors; we should get 2 back immediately + assert(killNExecutors(sc, 2)) + assert(master.apps.head.executors.size === 10) + assert(master.apps.head.getExecutorLimit === 12) + // kill 4 executors; we should end up with 12 - 4 = 8 executors + assert(killNExecutors(sc, 4)) + assert(master.apps.head.executors.size === 8) + assert(master.apps.head.getExecutorLimit === 8) + // kill all executors; this time we'll have 8 - 8 = 0 executors left + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 0) + assert(master.apps.head.getExecutorLimit === 0) + // request many more; this increases the limit well beyond the cluster capacity + assert(sc.requestExecutors(1000)) + assert(master.apps.head.executors.size === 10) + assert(master.apps.head.getExecutorLimit === 1000) + } + + test("dynamic allocation with cores per executor AND max cores") { + sc = new SparkContext(appConf + .set("spark.executor.cores", "2") + .set("spark.cores.max", "8")) + val appId = sc.applicationId + assert(master.apps.size === 1) + assert(master.apps.head.id === appId) + assert(master.apps.head.executors.size === 4) // 8 cores total + assert(master.apps.head.getExecutorLimit === Int.MaxValue) + // kill all executors + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 0) + assert(master.apps.head.getExecutorLimit === 0) + // request 1 + assert(sc.requestExecutors(1)) + assert(master.apps.head.executors.size === 1) + assert(master.apps.head.getExecutorLimit === 1) + // request 3 more + assert(sc.requestExecutors(3)) + assert(master.apps.head.executors.size === 4) + assert(master.apps.head.getExecutorLimit === 4) + // request 10 more; none will go through + assert(sc.requestExecutors(10)) + assert(master.apps.head.executors.size === 4) + assert(master.apps.head.getExecutorLimit === 14) + // kill all executors; 4 executors will be launched immediately + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 4) + assert(master.apps.head.getExecutorLimit === 10) + // ... and again + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 4) + assert(master.apps.head.getExecutorLimit === 6) + // ... and again; now we end up with 6 - 4 = 2 executors left + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 2) + assert(master.apps.head.getExecutorLimit === 2) + // ... 
and again; this time we have 2 - 2 = 0 executors left + assert(killAllExecutors(sc)) + assert(master.apps.head.executors.size === 0) + assert(master.apps.head.getExecutorLimit === 0) + // request many more; this increases the limit well beyond the cluster capacity + assert(sc.requestExecutors(1000)) + assert(master.apps.head.executors.size === 4) + assert(master.apps.head.getExecutorLimit === 1000) + } + + // =============================== + // | Utility methods for testing | + // =============================== + + /** Return a SparkConf for applications that want to talk to our Master. */ + private def appConf: SparkConf = { + new SparkConf() + .setMaster(masterRpcEnv.address.toSparkURL) + .setAppName("test") + .set("spark.executor.memory", "256m") + } + + /** Make a master to which our application will send executor requests. */ + private def makeMaster(): Master = { + val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf) + masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master) + master + } + + /** Make a few workers that talk to our master. */ + private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = { + (0 until numWorkers).map { i => + val rpcEnv = workerRpcEnvs(i) + val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), + Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager) + rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker) + worker + } + } + + /** Kill all executors belonging to this application. */ + private def killAllExecutors(sc: SparkContext): Boolean = { + killNExecutors(sc, Int.MaxValue) + } + + /** Kill N executors belonging to this application. */ + private def killNExecutors(sc: SparkContext, n: Int): Boolean = { + syncExecutors(sc) + sc.killExecutors(getExecutorIds(sc).take(n)) + } + + /** + * Return a list of executor IDs belonging to this application. + * + * Note that we must use the executor IDs according to the Master, which has the most + * updated view. We cannot rely on the executor IDs according to the driver because we + * don't wait for executors to register. Otherwise the tests will take much longer to run. + */ + private def getExecutorIds(sc: SparkContext): Seq[String] = { + assert(master.idToApp.contains(sc.applicationId)) + master.idToApp(sc.applicationId).executors.keys.map(_.toString).toSeq + } + + /** + * Sync executor IDs between the driver and the Master. + * + * This allows us to avoid waiting for new executors to register with the driver before + * we submit a request to kill them. This must be called before each kill request. + */ + private def syncExecutors(sc: SparkContext): Unit = { + val driverExecutors = sc.getExecutorStorageStatus + .map(_.blockManagerId.executorId) + .filter { _ != SparkContext.DRIVER_IDENTIFIER} + val masterExecutors = getExecutorIds(sc) + val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted + missingExecutors.foreach { id => + // Fake an executor registration so the driver knows about us + val port = System.currentTimeMillis % 65536 + val endpointRef = mock(classOf[RpcEndpointRef]) + val mockAddress = mock(classOf[RpcAddress]) + when(endpointRef.address).thenReturn(mockAddress) + val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty) + val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend] + backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message) + } + } + +}