From 47cff98f765328c2219160ac61aec09573a85dbf Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 4 Mar 2016 13:44:13 -0800 Subject: [PATCH 1/3] SPARK-13723: Change behavior of --num-executors with dynamic allocation. Previously, using --num-executors or spark.executor.instances with dynamic allocation enabled would disable dynamic allocation. This updates the behavior so these settings change the initial number of executors and do not disable dynamic allocation. --- .../spark/ExecutorAllocationManager.scala | 11 +++++----- .../scala/org/apache/spark/util/Utils.scala | 22 +++++++++++-------- .../org/apache/spark/util/UtilsSuite.scala | 18 ++++++++++++++- docs/configuration.md | 3 +++ .../deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 0926d05414ba7..932ba16812bbb 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -25,9 +25,10 @@ import scala.util.control.ControlThrowable import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS} import org.apache.spark.metrics.source.Source import org.apache.spark.scheduler._ -import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} /** * An agent that dynamically allocates and removes executors based on the workload. @@ -87,11 +88,9 @@ private[spark] class ExecutorAllocationManager( import ExecutorAllocationManager._ // Lower and upper bounds on the number of executors. - private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) - private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", - Integer.MAX_VALUE) - private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors", - minNumExecutors) + private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS) + private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) + private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) // How long there must be backlogged tasks for before an addition is triggered (seconds) private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds( diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f9d05409e1c3d..2a52be1f7791a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -52,6 +52,7 @@ import org.slf4j.Logger import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES} import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} @@ -2309,21 +2310,24 @@ private[spark] object Utils extends Logging { } /** - * Return whether dynamic allocation is enabled in the given conf - * Dynamic allocation and explicitly setting the number of executors are inherently - * incompatible. In environments where dynamic allocation is turned on by default, - * the latter should override the former (SPARK-9092). + * Return whether dynamic allocation is enabled in the given conf. */ def isDynamicAllocationEnabled(conf: SparkConf): Boolean = { - val numExecutor = conf.getInt("spark.executor.instances", 0) val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false) - if (numExecutor != 0 && dynamicAllocationEnabled) { - logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.") - } - numExecutor == 0 && dynamicAllocationEnabled && + dynamicAllocationEnabled && (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false)) } + /** + * Return the initial number of executors for dynamic allocation. + */ + def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = { + Seq( + conf.get(DYN_ALLOCATION_MIN_EXECUTORS), + conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS), + conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max + } + def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { val resource = createResource try f.apply(resource) finally resource.close() diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index a5363f0bfd600..e3a8e83f3eaa5 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -761,13 +761,29 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(Utils.isDynamicAllocationEnabled( conf.set("spark.dynamicAllocation.enabled", "true")) === true) assert(Utils.isDynamicAllocationEnabled( - conf.set("spark.executor.instances", "1")) === false) + conf.set("spark.executor.instances", "1")) === true) assert(Utils.isDynamicAllocationEnabled( conf.set("spark.executor.instances", "0")) === true) assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false) assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true"))) } + test("getDynamicAllocationInitialExecutors") { + val conf = new SparkConf() + assert(Utils.getDynamicAllocationInitialExecutors(conf) === 0) + assert(Utils.getDynamicAllocationInitialExecutors( + conf.set("spark.dynamicAllocation.minExecutors", "3")) === 3) + assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors + conf.set("spark.executor.instances", "2")) === 3) + assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances + conf.set("spark.executor.instances", "4")) === 4) + assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances + conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4) + assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors + conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5) + } + + test("encodeFileNameToURIRawPath") { assert(Utils.encodeFileNameToURIRawPath("abc") === "abc") assert(Utils.encodeFileNameToURIRawPath("abc xyz") === "abc%20xyz") diff --git a/docs/configuration.md b/docs/configuration.md index 32c3a9266078a..c70df5e8ded4f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1235,6 +1235,9 @@ Apart from these, the following properties are also available, and may be useful spark.dynamicAllocation.minExecutors Initial number of executors to run if dynamic allocation is enabled. +

+ If `--num-executors` (or `spark.executor.instances`) is set and larger than this value, it will + be used as the initial number of executors. diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index de6cd946137cb..156a7a30eaa93 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -520,7 +520,7 @@ object YarnSparkHadoopUtil { numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS) - val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) + val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, s"initial executor number $initialNumExecutors must between min executor number " + From bf22b5ab0bc8369949ac33833b078e7e13c7ce35 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 13 Jun 2016 13:32:16 -0700 Subject: [PATCH 2/3] SPARK-13723: Update --num-executors in spark-submit help. --- .../scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 206c130c76373..f1761e7c1ec92 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -550,6 +550,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | (Default: 1). | --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). | --num-executors NUM Number of executors to launch (Default: 2). + | If dynamic allocation is enabled, the initial number of + | executors will be at least NUM. | --archives ARCHIVES Comma separated list of archives to be extracted into the | working directory of each executor. | --principal PRINCIPAL Principal to be used to login to KDC, while running on From 0352883dba5aef4fd56d6aaddba129071b1d22ea Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 23 Jun 2016 09:24:03 -0700 Subject: [PATCH 3/3] SPARK-13723: Update running-on-yarn.md for these changes. --- docs/running-on-yarn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 9833806716aab..dbd46cc48c14b 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -244,7 +244,7 @@ To use a custom metrics.properties for the application master and executors, upd spark.executor.instances 2 - The number of executors. Note that this property is incompatible with spark.dynamicAllocation.enabled. If both spark.dynamicAllocation.enabled and spark.executor.instances are specified, dynamic allocation is turned off and the specified number of spark.executor.instances is used. + The number of executors for static allocation. With spark.dynamicAllocation.enabled, the initial set of executors will be at least this large.