From e79fe20dee37946a188e1b86578fa1f0872b39e6 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 4 Mar 2016 13:44:13 -0800 Subject: [PATCH] SPARK-13688: Add spark.dynamicAllocation.overrideNumInstances. When both the number of executor instances and dynamic allocation are configured, dynamic allocation is ignored and the number of executors is statically set. This introduces a configuration property to change this behavior so that cluster administrators can make dynamic allocation the default when users set --num-executors instead of spark.dynamicAllocation.minExecutors and unintentionally disable dynamic allocation for a job. --- .../spark/ExecutorAllocationManager.scala | 7 ++--- .../scala/org/apache/spark/util/Utils.scala | 29 +++++++++++++++++-- .../org/apache/spark/util/UtilsSuite.scala | 26 +++++++++++++++++ docs/configuration.md | 12 ++++++++ .../deploy/yarn/YarnSparkHadoopUtil.scala | 4 +-- 5 files changed, 70 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 9b8279f43e75c..4468239b7e1d8 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -26,7 +26,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.source.Source import org.apache.spark.scheduler._ -import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} +import org.apache.spark.util.{Utils, Clock, SystemClock, ThreadUtils} /** * An agent that dynamically allocates and removes executors based on the workload. @@ -86,9 +86,8 @@ private[spark] class ExecutorAllocationManager( import ExecutorAllocationManager._ // Lower and upper bounds on the number of executors. 
- private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) - private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", - Integer.MAX_VALUE) + private val minNumExecutors = Utils.getDynamicAllocationMinExecutors(conf) + private val maxNumExecutors = Utils.getDynamicAllocationMaxExecutors(conf) private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 9688cca4f0fcd..e23a16f0e7b14 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2253,13 +2253,38 @@ private[spark] object Utils extends Logging { def isDynamicAllocationEnabled(conf: SparkConf): Boolean = { val numExecutor = conf.getInt("spark.executor.instances", 0) val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false) + val overrideExecutors = conf.getBoolean("spark.dynamicAllocation.overrideNumInstances", false) if (numExecutor != 0 && dynamicAllocationEnabled) { - logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.") + if (overrideExecutors) { + logWarning( + "Dynamic Allocation and num executors both set, spark.executor.instances will override " + + "spark.dynamicAllocation.minExecutors.") + } else { + logWarning( + "Dynamic Allocation and num executors both set, thus dynamic allocation disabled.") + } } - numExecutor == 0 && dynamicAllocationEnabled && + dynamicAllocationEnabled && (numExecutor == 0 || overrideExecutors) && (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false)) } + /** + * Return the minimum number of executors for dynamic allocation. 
+ * + * If spark.executor.instances is set and larger than spark.dynamicAllocation.minExecutors, then + * it will be returned as the dynamic allocation minimum. This assumes that it was overridden by + * spark.dynamicAllocation.overrideNumInstances (or else this method would not be called). + */ + def getDynamicAllocationMinExecutors(conf: SparkConf): Int = { + math.max( + conf.getInt("spark.dynamicAllocation.minExecutors", 0), + conf.getInt("spark.executor.instances", 0)) + } + + def getDynamicAllocationMaxExecutors(conf: SparkConf): Int = { + conf.getInt("spark.dynamicAllocation.maxExecutors", Integer.MAX_VALUE) + } + def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { val resource = createResource try f.apply(resource) finally resource.close() diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 412c0ac9d9be3..d43ece3b5145b 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -732,10 +732,36 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { conf.set("spark.executor.instances", "1")) === false) assert(Utils.isDynamicAllocationEnabled( conf.set("spark.executor.instances", "0")) === true) + assert(Utils.isDynamicAllocationEnabled(conf.set("spark.executor.instances", "1") + .set("spark.dynamicAllocation.overrideNumInstances", "true")) === true) + assert(Utils.isDynamicAllocationEnabled(conf.set("spark.executor.instances", "0") + .set("spark.dynamicAllocation.overrideNumInstances", "true")) === true) + assert(Utils.isDynamicAllocationEnabled(conf.set("spark.executor.instances", "1") + .set("spark.dynamicAllocation.overrideNumInstances", "false")) === false) + assert(Utils.isDynamicAllocationEnabled(conf.set("spark.executor.instances", "0") + .set("spark.dynamicAllocation.overrideNumInstances", "false")) === true) 
assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false) assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true"))) } + test("getDynamicAllocationMinExecutors") { + val conf = new SparkConf() + assert(Utils.getDynamicAllocationMinExecutors(conf) === 0) + assert(Utils.getDynamicAllocationMinExecutors( + conf.set("spark.dynamicAllocation.minExecutors", "3")) === 3) + assert(Utils.getDynamicAllocationMinExecutors( // should use minExecutors + conf.set("spark.executor.instances", "2")) === 3) + assert(Utils.getDynamicAllocationMinExecutors( // should use executor.instances + conf.set("spark.executor.instances", "4")) === 4) + } + + test("getDynamicAllocationMaxExecutors") { + val conf = new SparkConf() + assert(Utils.getDynamicAllocationMaxExecutors(conf) === Integer.MAX_VALUE) + assert(Utils.getDynamicAllocationMaxExecutors( + conf.set("spark.dynamicAllocation.maxExecutors", "500")) === 500) + } + test("encodeFileNameToURIRawPath") { assert(Utils.encodeFileNameToURIRawPath("abc") === "abc") assert(Utils.encodeFileNameToURIRawPath("abc xyz") === "abc%20xyz") diff --git a/docs/configuration.md b/docs/configuration.md index e9b66238bd189..0ce189a591776 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1164,6 +1164,10 @@ Apart from these, the following properties are also available, and may be useful For more detail, see the description here.

+ This property is incompatible with spark.executor.instances, which statically sets + the number of executors. When both properties are set, static resource allocation will be used. + To change the default behavior, use spark.dynamicAllocation.overrideNumInstances. +

This requires spark.shuffle.service.enabled to be set. The following configurations are also relevant: spark.dynamicAllocation.minExecutors, @@ -1171,6 +1175,14 @@ Apart from these, the following properties are also available, and may be useful spark.dynamicAllocation.initialExecutors + + spark.dynamicAllocation.overrideNumInstances + + Whether to use dynamic resource allocation when dynamic allocation is enabled, but a static + number of executor instances, spark.executor.instances, is set. By default, a + statically configured number of executor instances will turn off dynamic resource allocation. + + spark.dynamicAllocation.executorIdleTimeout 60s diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index aef78fdfd4c57..d235382538574 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -509,10 +509,10 @@ object YarnSparkHadoopUtil { conf: SparkConf, numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { - val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) + val minNumExecutors = Utils.getDynamicAllocationMinExecutors(conf) val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors) - val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue) + val maxNumExecutors = Utils.getDynamicAllocationMaxExecutors(conf) require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, s"initial executor number $initialNumExecutors must between min executor number" + s"$minNumExecutors and max executor number $maxNumExecutors")