From fb22ffcf0bbe34a16527d274564894fdc2be8807 Mon Sep 17 00:00:00 2001
From: Wenbo Zhao
Date: Sat, 20 May 2017 21:09:54 -0400
Subject: [PATCH] Support manual executor allocation

---
 core/pom.xml                                 |   4 +-
 .../CoarseCookSchedulerBackend.scala         |  93 +++++------
 .../CookSchedulerConfiguration.scala         | 145 ++++++++++++++++++
 3 files changed, 197 insertions(+), 45 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/scheduler/CookSchedulerConfiguration.scala

diff --git a/core/pom.xml b/core/pom.xml
index be8d9137e2a42..c7d1ccd02770a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -35,9 +35,9 @@
   <url>http://spark.apache.org/</url>
   <dependencies>
     <dependency>
-      <groupId>com.twosigma</groupId>
+      <groupId>twosigma</groupId>
       <artifactId>cook-jobclient</artifactId>
-      <version>0.1.0</version>
+      <version>0.1.2-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>

diff --git a/core/src/main/scala/org/apache/spark/scheduler/CoarseCookSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/CoarseCookSchedulerBackend.scala
index b612105f4f3ca..4e6a12ba920e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/CoarseCookSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/CoarseCookSchedulerBackend.scala
@@ -63,8 +63,6 @@ object CoarseCookSchedulerBackend {
   }
 }
 
-
-
 /**
  * A SchedulerBackend that runs tasks using Cook, using "coarse-grained" tasks, where it holds
  * onto Cook instances for the duration of the Spark job instead of relinquishing cores whenever
@@ -79,27 +77,11 @@ class CoarseCookSchedulerBackend(
     cookPort: Int)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with Logging {
 
-  val maxCores = conf.getInt("spark.cores.max", 0)
-  val maxCoresPerJob = conf.getInt("spark.executor.cores", 1)
-  val priority = conf.getInt("spark.cook.priority", 75)
-  val jobNamePrefix = conf.get("spark.cook.job.name.prefix", "sparkjob")
-  val maxFailures = conf.getInt("spark.executor.failures", 5)
-  val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
-
-  if (conf.contains("spark.cores.max") && dynamicAllocationEnabled) {
-    logWarning("spark.cores.max is ignored when dynamic allocation is enabled. Use spark.dynamicAllocation.maxExecutors instead")
-  }
-
-  def currentInstancesToRequest: Int = (executorsToRequest - totalInstancesRequested)
-  var executorsToRequest: Int = if (dynamicAllocationEnabled) {
-    conf.getInt("spark.dynamicAllocation.minExecutors", 0)
-  } else {
-    maxCores / maxCoresPerJob
-  }
-  var totalInstancesRequested = 0
-  var totalFailures = 0
-  val jobIds = mutable.Set[UUID]()
-  val abortedJobIds = mutable.Set[UUID]()
+  private[this] val schedulerConf = CookSchedulerConfiguration.conf(conf)
+  private[this] var executorsRequested = 0
+  private[this] var totalFailures = 0
+  private[this] val jobIds = mutable.Set[UUID]()
+  private[this] val abortedJobIds = mutable.Set[UUID]()
 
   private[this] val jobClient = new JobClient.Builder()
     .setHost(cookHost)
@@ -117,7 +99,7 @@
       val isAborted = abortedJobIds.contains(job.getUUID)
 
       if (isCompleted) {
-        totalInstancesRequested -= 1
+        executorsRequested -= 1
         abortedJobIds -= job.getUUID
         jobIds -= job.getUUID
 
@@ -127,18 +109,21 @@
         if (!job.isSuccess && !isAborted) {
           totalFailures += 1
-          logWarning(s"Job ${job.getUUID} has died. Failure ($totalFailures/$maxFailures)")
+          logWarning(s"Job ${job.getUUID} has died. " +
+            s"Failure ($totalFailures/${schedulerConf.getMaximumExecutorFailures})")
           jobIds -= job.getUUID
-          if (totalFailures >= maxFailures) {
+          if (totalFailures >= schedulerConf.getMaximumExecutorFailures) {
             // TODO should we abort the outstanding tasks now?
- logError(s"We have exceeded our maximum failures ($maxFailures)" + + logError(s"We have exceeded our maximum failures " + + s"($schedulerConf.getMaximumExecutorFailures)" + "and will not relaunch any more tasks") } } } } } - def executorUUIDWriter: UUID => Unit = + + private def executorUUIDWriter: UUID => Unit = conf.getOption("spark.cook.executoruuid.log").fold { _: UUID => () } { _file => def file(ct: Int) = s"${_file}.$ct" def path(ct: Int) = Paths.get(file(ct)) @@ -167,13 +152,13 @@ class CoarseCookSchedulerBackend( } } - val sparkMesosScheduler = + private[this] val sparkMesosScheduler = new CoarseMesosSchedulerBackend(scheduler, sc, "", sc.env.securityManager) override def applicationId(): String = conf.get("spark.cook.applicationId", super.applicationId()) override def applicationAttemptId(): Option[String] = Some(applicationId()) - def createJob(numCores: Double): Job = { + private def createJob(numCores: Double): Job = { import CoarseCookSchedulerBackend.fetchUri val jobId = UUID.randomUUID() @@ -272,11 +257,11 @@ class CoarseCookSchedulerBackend( val builder = new Job.Builder() .setUUID(jobId) - .setName(jobNamePrefix) + .setName(schedulerConf.getPrefixOfCookJobName) .setCommand(cmds.mkString("; ")) .setMemory(sparkMesosScheduler.calculateTotalMemory(sc).toDouble) .setCpus(numCores) - .setPriority(priority) + .setPriority(schedulerConf.getPriorityPerCookJob) val container = conf.get("spark.executor.cook.container", null) if(container != null) { @@ -288,7 +273,8 @@ class CoarseCookSchedulerBackend( builder.build() } - private[this] val minExecutorsNecessary = currentInstancesToRequest * minRegisteredRatio + private[this] val minExecutorsNecessary = + schedulerConf.getExecutorsToRequest(0) * minRegisteredRatio override def sufficientResourcesRegistered(): Boolean = totalRegisteredExecutors.get >= minExecutorsNecessary @@ -307,7 +293,7 @@ class CoarseCookSchedulerBackend( ret } - // In our fake offer mesos adds some autoincrementing ID per job but + // In our fake offer mesos adds some auto-increasing ID per job but // this sticks around in the executorId so we strop it out to get the actual executor ID private def instanceIdFromExecutorId(executorId: String): UUID = { UUID.fromString(executorId.split('/')(0)) @@ -369,8 +355,8 @@ class CoarseCookSchedulerBackend( override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { logInfo(s"Setting total amount of executors to request to $requestedTotal") - executorsToRequest = requestedTotal - requestRemainingInstances() + schedulerConf.setMaximumCores(requestedTotal) + requestExecutorsIfNecessary() true } @@ -384,21 +370,39 @@ class CoarseCookSchedulerBackend( @annotation.tailrec def loop(instancesRemaining: Double, jobs: List[Job]): List[Job] = if (instancesRemaining <= 0) jobs - else loop(instancesRemaining - 1, createJob(maxCoresPerJob) :: jobs) - loop(currentInstancesToRequest, Nil).reverse + else loop(instancesRemaining - 1, createJob(schedulerConf.getCoresPerCookJob) :: jobs) + + loop(schedulerConf.getExecutorsToRequest(executorsRequested), Nil).reverse } /** - * Request cores from Cook via cook jobs. + * Kill the extra executors if necessary. 
    */
-  private[this] def requestRemainingInstances(): Unit = {
+  private[this] def killExecutorsIfNecessary(): Unit = {
+    val executorsToKill = schedulerConf.getExecutorsToKill(executorsRequested)
+    if (executorsToKill > 0) {
+      val jobIdsToKill = jobIds.take(executorsToKill)
+      Try[Unit](jobClient.abort(jobIdsToKill.asJava)) match {
+        case Failure(e) =>
+          logWarning("Failed to abort redundant jobs", e)
+        case Success(_) =>
+          logInfo(s"Successfully aborted $executorsToKill jobs.")
+          jobIdsToKill.foreach(abortedJobIds.add)
+      }
+    }
+  }
+
+  /**
+   * Request more executors from Cook via cook jobs if necessary.
+   */
+  private[this] def requestExecutorsIfNecessary(): Unit = {
     val jobs = createRemainingJobs()
     if (jobs.nonEmpty) {
       Try[Unit](jobClient.submit(jobs.asJava, jobListener)) match {
         case Failure(e) => logWarning("Can't request more instances", e)
         case Success(_) => {
           logInfo(s"Successfully requested ${jobs.size} instances")
-          totalInstancesRequested += jobs.size
+          executorsRequested += jobs.size
           jobs.map(_.getUUID).foreach(jobIds.add)
         }
       }
     }
   }
 
@@ -421,9 +425,12 @@
   override def start(): Unit = {
     super.start()
 
-    requestRemainingInstances()
+    requestExecutorsIfNecessary()
     resourceManagerService.scheduleAtFixedRate(new Runnable() {
-      override def run(): Unit = requestRemainingInstances()
+      override def run(): Unit = {
+        requestExecutorsIfNecessary()
+        killExecutorsIfNecessary()
+      }
     }, 10, 10, TimeUnit.SECONDS)
   }
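
The periodic task registered in start() converges the pool of outstanding Cook jobs toward getMaximumCores / getCoresPerCookJob from both directions: requestExecutorsIfNecessary() tops the pool up when the target grows, and killExecutorsIfNecessary() aborts surplus jobs when it shrinks. The following standalone sketch illustrates that reconciliation arithmetic using only the accessors added by this patch; the ReconcileSketch object and its "outstanding" counter are illustrative stand-ins for the backend's private executorsRequested state, not part of the change:

package org.apache.spark.scheduler

import org.apache.spark.SparkConf

object ReconcileSketch {
  def main(args: Array[String]): Unit = {
    // 16 maximum cores at 4 cores per Cook job => a target pool of 4 executors.
    val cookConf = CookSchedulerConfiguration.conf(
      new SparkConf().set("spark.cores.max", "16").set("spark.executor.cores", "4"))

    var outstanding = 0 // stands in for the backend's executorsRequested

    // One tick of the 10-second reconcile loop.
    def tick(): Unit = {
      outstanding += cookConf.getExecutorsToRequest(outstanding) // jobs to submit
      outstanding -= cookConf.getExecutorsToKill(outstanding)    // jobs to abort
    }

    tick(); assert(outstanding == 4) // grew 0 -> 4
    cookConf.setMaximumCores(8)      // manually shrink the target to 2 executors
    tick(); assert(outstanding == 2) // 2 of the 4 outstanding jobs get aborted
  }
}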
+ */ + +package org.apache.spark.scheduler + +import org.apache.spark.SparkConf +import org.apache.spark.Logging + +/** + * To use this configuration in PySpark, one could do + * {{{ + * >>> cook_conf_obj = sc._jvm.org.apache.spark.scheduler.CookSchedulerConfiguration + * >>> cook_conf = cook_conf_obj.conf() + * >>> cook_conf.setMaximumCores(8) + * }}} + */ +class CookSchedulerConfiguration( + @transient val conf: SparkConf +) extends Logging { + + private[this] val SPARK_MAX_CORES = "spark.cores.max" + private[this] val SPARK_EXECUTOR_CORES = "spark.executor.cores" + private[this] val SPARK_EXECUTOR_FAILURES = "spark.executor.failures" + private[this] val SPARK_DYNAMICALLOCATION_ENABLED = + "spark.dynamicAllocation.enabled" + private[this] val SPARK_DYNAMICALLOCATION_MIN_EXECUTORS = + "spark.dynamicAllocation.minExecutors" + private[this] val SPARK_DYNAMICALLOCATION_MAX_EXECUTORS = + "spark.dynamicAllocation.maxExecutors" + private[this] val SPARK_COOK_PRIORITY = "spark.cook.priority" + private[this] val SPARK_COOK_JOB_NAME_PREFIX = "spark.cook.job.name.prefix" + + private[this] val dynamicAllocationEnabled = + conf.getBoolean(SPARK_DYNAMICALLOCATION_ENABLED, defaultValue = false) + private[this] val coresPerCookJob = conf.getInt(SPARK_EXECUTOR_CORES, 1) + + // ========================================================================== + // Config options + private[this] var maximumCores = if (dynamicAllocationEnabled) { + conf.getInt(SPARK_DYNAMICALLOCATION_MIN_EXECUTORS, 0) * coresPerCookJob + } else { + conf.getInt(SPARK_MAX_CORES, 0) + } + private[this] var maximumExecutorFailures = + conf.getInt(SPARK_EXECUTOR_FAILURES, 5) + private[this] var priorityOfCookJob = conf.getInt(SPARK_COOK_PRIORITY, 75) + private[this] var prefixOfCookJobName = + conf.get(SPARK_COOK_JOB_NAME_PREFIX, "sparkjob") + + // ==========================================================================1 + + if (conf.getOption(SPARK_MAX_CORES).isDefined && dynamicAllocationEnabled) { + logWarning( + s"$SPARK_MAX_CORES is ignored when dynamic allocation is enabled. " + + s"Use $SPARK_DYNAMICALLOCATION_MAX_EXECUTORS instead.") + } + + def getCoresPerCookJob: Int = coresPerCookJob + + def getMaximumCores: Int = maximumCores + + def setMaximumCores(cores: Int): CookSchedulerConfiguration = { + require(cores >= 0, "The maximum number of cores should be non-negative.") + maximumCores = cores + logInfo( + s"The maximum cores of Cook scheduler has been set to $maximumCores") + this + } + + def getPriorityPerCookJob: Int = priorityOfCookJob + + def setPriorityPerCookJob(priority: Int): CookSchedulerConfiguration = { + require( + 0 < priority && priority <= 100, + "The priority of Cook job must be within range of (0, 100]." + ) + priorityOfCookJob = priority + logInfo( + s"The priority of jobs in Cook scheduler has been set to $priorityOfCookJob") + this + } + + def getPrefixOfCookJobName: String = prefixOfCookJobName + + def setPrefixOfCookJobName(prefix: String): CookSchedulerConfiguration = { + prefixOfCookJobName = prefix + logInfo( + s"The name prefix of jobs in Cook scheduler has been set to $prefixOfCookJobName") + this + } + + def getMaximumExecutorFailures: Int = maximumExecutorFailures + + def setMaximumExecutorFailures(maxExecutorFailures: Int): CookSchedulerConfiguration = { + require( + maxExecutorFailures > 0, + "The maximum executor failures must be positive." 
+ ) + maximumExecutorFailures = maxExecutorFailures + this + } + + def getExecutorsToRequest(executorsRequested: Int = 0): Int = + Math.max(maximumCores / coresPerCookJob - executorsRequested, 0) + + def getExecutorsToKil(executorsRequested: Int = 0): Int = + Math.max(executorsRequested - maximumCores / coresPerCookJob, 0) + +} + +object CookSchedulerConfiguration { + + @volatile + private[this] var configuration: CookSchedulerConfiguration = _ + + private[spark] def conf(conf: SparkConf): CookSchedulerConfiguration = { + if (configuration == null) { + this.synchronized { + if (configuration == null) { + configuration = new CookSchedulerConfiguration(conf) + } + } + } + configuration + } + + def conf(): CookSchedulerConfiguration = { + require(configuration != null, "It haven't been initialized yet.") + configuration + } +}