From 9cc2e0b6d3e249a5f9eb32b954aa29900555fa69 Mon Sep 17 00:00:00 2001
From: Niranjan Padmanabhan
Date: Fri, 24 Jul 2015 10:51:57 -0700
Subject: [PATCH 1/9] [SPARK-9092] Fixed incompatibility when both
 num-executors and dynamic allocation are set.

Now, dynamic allocation is set to false when num-executors is explicitly
specified as an argument. Consequently, the executorAllocationManager is not
initialized in the SparkContext.
---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 3 ++-
 .../test/scala/org/apache/spark/SparkContextSuite.scala | 7 +++++++
 .../org/apache/spark/deploy/yarn/ClientArguments.scala  | 3 ++-
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9ced44131b0d9..7dab4121cf47a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -528,7 +528,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   // Optionally scale number of executors dynamically based on workload. Exposed for testing.
-  val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
+  val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
+    (_conf.getInt("spark.executor.instances", 0) == 0)
   _executorAllocationManager =
     if (dynamicAllocationEnabled) {
       Some(new ExecutorAllocationManager(this, listenerBus, _conf))
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 5c57940fa5f77..2ff7c29930219 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -285,4 +285,11 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("No exception when both num-executors and dynamic allocation set") {
+    noException should be thrownBy {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+        .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
+      assert(sc.executorAllocationManager === None)
+    }
+  }
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 20d63d40cf605..05a6159c91f66 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -54,7 +54,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   private val driverCoresKey = "spark.driver.cores"
   private val amCoresKey = "spark.yarn.am.cores"
   private val isDynamicAllocationEnabled =
-    sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
+    sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) &&
+    (sparkConf.getInt("spark.executor.instances", 0) == 0)
 
   parseArgs(args.toList)
   loadEnvironmentArgs()
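To see what this first patch changes in practice, here is a minimal, self-contained sketch of the gating condition it introduces (the object and method names are illustrative only, not part of the patch):

```scala
import org.apache.spark.SparkConf

object DynamicAllocationGateSketch {
  // Mirrors the condition added in SparkContext above: dynamic allocation
  // stays enabled only when no explicit executor count is configured.
  def allocationEnabled(conf: SparkConf): Boolean =
    conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
      conf.getInt("spark.executor.instances", 0) == 0

  def main(args: Array[String]): Unit = {
    val bothSet = new SparkConf(false)
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "6")
    // Previously this combination threw an IllegalArgumentException on YARN;
    // with this patch the explicit executor count simply wins.
    println(allocationEnabled(bothSet)) // prints: false
  }
}
```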
From e34e728c47fca824f094941a5ce32dd5292dea28 Mon Sep 17 00:00:00 2001
From: Niranjan Padmanabhan
Date: Thu, 30 Jul 2015 11:47:10 -0700
Subject: [PATCH 2/9] [SPARK-9092] Fix incompatibility when both num-executors
 and dynamic allocation are set

---
 .../main/scala/org/apache/spark/SparkContext.scala    |  7 +++++--
 .../scala/org/apache/spark/deploy/SparkSubmit.scala   |  4 ++--
 core/src/main/scala/org/apache/spark/util/Utils.scala | 11 +++++++++++
 .../scala/org/apache/spark/SparkContextSuite.scala    |  5 +++--
 .../org/apache/spark/deploy/SparkSubmitSuite.scala    |  1 -
 docs/running-on-yarn.md                               |  2 +-
 .../apache/spark/deploy/yarn/ApplicationMaster.scala  |  1 -
 .../deploy/yarn/ApplicationMasterArguments.scala      |  4 ----
 .../scala/org/apache/spark/deploy/yarn/Client.scala   |  3 ++-
 .../apache/spark/deploy/yarn/ClientArguments.scala    |  9 +--------
 10 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7dab4121cf47a..d489d8a7657e9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -528,8 +528,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   // Optionally scale number of executors dynamically based on workload. Exposed for testing.
-  val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
-    (_conf.getInt("spark.executor.instances", 0) == 0)
+  val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
+  if (!dynamicAllocationEnabled) {
+    logInfo(s"Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+  }
+
   _executorAllocationManager =
     if (dynamicAllocationEnabled) {
       Some(new ExecutorAllocationManager(this, listenerBus, _conf))
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7ac6cbce4cd1d..02fa3088eded0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -422,7 +422,8 @@ object SparkSubmit {
 
       // Yarn client only
       OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
-      OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
+      OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
+        sysProp = "spark.executor.instances"),
       OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
       OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
       OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
@@ -433,7 +434,6 @@
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
       OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
-      OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
       OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
      OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
      OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c4012d0e83f7d..ca5bd0898d1df 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2286,6 +2286,17 @@ private[spark] object Utils extends Logging {
     isInDirectory(parent, child.getParentFile)
   }
 
+  /**
+   * Return whether dynamic allocation is enabled in the system conf
+   * Dynamic allocation and explicitly setting the number of executors are inherently
+   * incompatible. In environments where dynamic allocation is turned on by default,
+   * the latter should override the former (SPARK-9092).
+   */
+  def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
+    return conf.contains("spark.dynamicAllocation.enabled") &&
+      !conf.contains("spark.executor.instances")
+  }
+
 }
 
 private [util] class SparkShutdownHookManager {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 2ff7c29930219..cae56b9cc6648 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -285,11 +285,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("No exception when both num-executors and dynamic allocation set") {
+  test("No exception when both num-executors and dynamic allocation set.") {
     noException should be thrownBy {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
         .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
-      assert(sc.executorAllocationManager === None)
+      assert(sc.executorAllocationManager.isEmpty)
+      assert(sc.getConf.get("spark.executor.instances").toInt == 6)
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 757e0ce3d278b..2456c5d0d49b0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -159,7 +159,6 @@ class SparkSubmitSuite
     childArgsStr should include ("--executor-cores 5")
     childArgsStr should include ("--arg arg1 --arg arg2")
     childArgsStr should include ("--queue thequeue")
-    childArgsStr should include ("--num-executors 6")
     childArgsStr should include regex ("--jar .*thejar.jar")
     childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
     childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index cac08a91b97d9..61b3c39d058fa 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -199,7 +199,7 @@ If you need a reference to the proper location to put log files in the YARN so t
   spark.executor.instances
   2
-  The number of executors. Note that this property is incompatible with spark.dynamicAllocation.enabled.
+  The number of executors. Note that this property is incompatible with spark.dynamicAllocation.enabled. If both spark.dynamicAllocation.enabled and spark.executor.instances are specified, dynamicAllocation is turned off and the specified number of spark.executor.instances is used.
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1d67b3ebb51b7..3380dd7399374 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -493,7 +493,6 @@ private[spark] class ApplicationMaster(
    */
   private def startUserApplication(): Thread = {
     logInfo("Starting the user application in a separate Thread")
-    System.setProperty("spark.executor.instances", args.numExecutors.toString)
 
     val classpath = Client.getUserClasspath(sparkConf)
     val urls = classpath.map { entry =>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 37f793763367e..c6fb93ec5c587 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -63,10 +63,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
           userArgsBuffer += value
           args = tail
 
-        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
-          numExecutors = value
-          args = tail
-
         case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
           executorMemory = value
           args = tail
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b4ba3f0221600..28d04b4be0d30 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -751,7 +751,6 @@ private[spark] class Client(
       userArgs ++ Seq(
         "--executor-memory", args.executorMemory.toString + "m",
         "--executor-cores", args.executorCores.toString,
-        "--num-executors ", args.numExecutors.toString,
         "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
           LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
 
@@ -960,6 +959,8 @@ object Client extends Logging {
     val sparkConf = new SparkConf
 
     val args = new ClientArguments(argStrings, sparkConf)
+    // to maintan backwards-compatibility
+    sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
     new Client(args, sparkConf).run()
   }
Use --num-executors instead.") } - // Dynamic allocation is not compatible with this option - if (isDynamicAllocationEnabled) { - throw new IllegalArgumentException("Explicitly setting the number " + - "of executors is not compatible with spark.dynamicAllocation.enabled!") - } numExecutors = value args = tail From 66d083c04ed80a9aad7ef5d7c2c3aeb51b743a6b Mon Sep 17 00:00:00 2001 From: Niranjan Padmanabhan Date: Tue, 4 Aug 2015 17:37:52 -0700 Subject: [PATCH 3/9] [SPARK-9092] Make --num-executors compatible with dynamic Allocation. Modified dependencies of this change deeper down in the code base. --- .../main/scala/org/apache/spark/SparkConf.scala | 17 +++++++++++++++++ .../org/apache/spark/SparkContextSuite.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 3 ++- .../yarn/ApplicationMasterArguments.scala | 1 - .../spark/deploy/yarn/YarnAllocator.scala | 2 +- .../cluster/YarnClientSchedulerBackend.scala | 3 --- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 5 +++-- 7 files changed, 24 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 8ff154fb5e334..dc9f3aa3c2d44 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -476,6 +476,23 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { } } } + + sys.env.get("SPARK_WORKER_INSTANCES").foreach { value => + val warning = + s""" + |SPARK_WORKER_INSTANCES was detected (set to '$value'). + |This is deprecated in Spark 1.0+. + | + |Please instead use: + | - ./spark-submit with --num-executors to specify the number of executors + | - Or set SPARK_EXECUTOR_INSTANCES + | - spark.executor.instances to configure the number of instances in the spark config. 
+ """.stripMargin + logWarning(warning) + + set("spark.executor.instances", value.toString()) + + } } /** diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index cae56b9cc6648..231dcc0d28e2f 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -290,7 +290,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local") .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6")) assert(sc.executorAllocationManager.isEmpty) - assert(sc.getConf.get("spark.executor.instances").toInt == 6) + assert(sc.getConf.get("spark.executor.instances").toInt === 6) } } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 3380dd7399374..3ec2246010575 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -64,7 +64,8 @@ private[spark] class ApplicationMaster( // Default to numExecutors * 2, with minimum of 3 private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", - sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) + sparkConf.getInt("spark.yarn.max.worker.failures", + math.max(sparkConf.get("spark.executor.instances").toInt * 2, 3))) @volatile private var exitCode = 0 @volatile private var unregistered = false diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index c6fb93ec5c587..b08412414aa1c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -29,7 +29,6 @@ class ApplicationMasterArguments(val args: Array[String]) { var userArgs: Seq[String] = Nil var executorMemory = 1024 var executorCores = 1 - var numExecutors = DEFAULT_NUMBER_EXECUTORS var propertiesFile: String = null parseArgs(args.toList) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 59caa787b6e20..adce543fe533a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -86,7 +86,7 @@ private[yarn] class YarnAllocator( private var executorIdCounter = 0 @volatile private var numExecutorsFailed = 0 - @volatile private var targetNumExecutors = args.numExecutors + @volatile private var targetNumExecutors = sparkConf.get("spark.executor.instances").toInt // Keep track of which container is running which executor to remove the executors later // Visible for testing. 
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d225061fcd1b4..d06d95140438c 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -81,8 +81,6 @@ private[spark] class YarnClientSchedulerBackend(
 
     // List of (target Client argument, environment variable, Spark property)
     val optionTuples = List(
-      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
-      ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
       ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
       ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
       ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
@@ -92,7 +90,6 @@ private[spark] class YarnClientSchedulerBackend(
     )
     // Warn against the following deprecated environment variables: env var -> suggestion
     val deprecatedEnvVars = Map(
-      "SPARK_WORKER_INSTANCES" -> "SPARK_WORKER_INSTANCES or --num-executors through spark-submit",
       "SPARK_WORKER_MEMORY" -> "SPARK_EXECUTOR_MEMORY or --executor-memory through spark-submit",
       "SPARK_WORKER_CORES" -> "SPARK_EXECUTOR_CORES or --executor-cores through spark-submit")
     optionTuples.foreach { case (optionName, envVar, sparkProp) =>
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 58318bf9bcc08..5d05f514adde3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -87,16 +87,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
     val args = Array(
-      "--num-executors", s"$maxExecutors",
       "--executor-cores", "5",
       "--executor-memory", "2048",
       "--jar", "somejar.jar",
       "--class", "SomeClass")
+    val sparkConfClone = sparkConf.clone()
+    sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
     new YarnAllocator(
       "not used",
       mock(classOf[RpcEndpointRef]),
       conf,
-      sparkConf,
+      sparkConfClone,
       rmClient,
       appAttemptId,
       new ApplicationMasterArguments(args),
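The SparkConf hunk above folds the legacy SPARK_WORKER_INSTANCES handling into configuration loading. A standalone sketch of that fallback (the function and the injected env map are hypothetical, used so the example does not depend on real environment variables):

```scala
import org.apache.spark.SparkConf

object LegacyWorkerInstancesSketch {
  // Same idea as the block added to SparkConf above: if the deprecated
  // variable is present, warn and translate it to spark.executor.instances.
  def applyLegacyEnv(conf: SparkConf, env: Map[String, String]): SparkConf = {
    env.get("SPARK_WORKER_INSTANCES").foreach { value =>
      Console.err.println(
        s"SPARK_WORKER_INSTANCES ('$value') is deprecated; use --num-executors, " +
          "SPARK_EXECUTOR_INSTANCES, or spark.executor.instances instead.")
      conf.set("spark.executor.instances", value)
    }
    conf
  }

  def main(args: Array[String]): Unit = {
    val conf = applyLegacyEnv(new SparkConf(false), Map("SPARK_WORKER_INSTANCES" -> "4"))
    println(conf.get("spark.executor.instances")) // prints: 4
  }
}
```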
"spark.executor.instances" // Used by Yarn in 1.1 and before sys.props.get("spark.driver.libraryPath").foreach { value => @@ -477,21 +478,22 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { } } - sys.env.get("SPARK_WORKER_INSTANCES").foreach { value => - val warning = - s""" - |SPARK_WORKER_INSTANCES was detected (set to '$value'). - |This is deprecated in Spark 1.0+. - | - |Please instead use: - | - ./spark-submit with --num-executors to specify the number of executors - | - Or set SPARK_EXECUTOR_INSTANCES - | - spark.executor.instances to configure the number of instances in the spark config. + if (getOption(sparkExecutorInstances).isEmpty) { + sys.env.get("SPARK_WORKER_INSTANCES").foreach { value => + val warning = + s""" + |SPARK_WORKER_INSTANCES was detected (set to '$value'). + |This is deprecated in Spark 1.0+. + | + |Please instead use: + | - ./spark-submit with --num-executors to specify the number of executors + | - Or set SPARK_EXECUTOR_INSTANCES + | - spark.executor.instances to configure the number of instances in the spark config. """.stripMargin - logWarning(warning) - - set("spark.executor.instances", value.toString()) + logWarning(warning) + set("spark.executor.instances", value.toString()) + } } } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 231dcc0d28e2f..d4f2ea87650a9 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -290,7 +290,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local") .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6")) assert(sc.executorAllocationManager.isEmpty) - assert(sc.getConf.get("spark.executor.instances").toInt === 6) + assert(sc.getConf.getInt("spark.executor.instances", 0) === 6) } } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 3ec2246010575..e19940d8d6642 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -65,7 +65,7 @@ private[spark] class ApplicationMaster( // Default to numExecutors * 2, with minimum of 3 private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", sparkConf.getInt("spark.yarn.max.worker.failures", - math.max(sparkConf.get("spark.executor.instances").toInt * 2, 3))) + math.max(sparkConf.getInt("spark.executor.instances", 0) * 2, 3))) @volatile private var exitCode = 0 @volatile private var unregistered = false diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index adce543fe533a..ad7ca44b977b7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -86,7 +86,7 @@ private[yarn] class YarnAllocator( private var executorIdCounter = 0 @volatile private var numExecutorsFailed = 0 - @volatile private var targetNumExecutors = sparkConf.get("spark.executor.instances").toInt + @volatile private var targetNumExecutors = sparkConf.getInt("spark.executor.instances", 2) // Keep track of which container is running which 
From a3fea570656fc95cf2bfffc6b83e4457b7320b79 Mon Sep 17 00:00:00 2001
From: Niranjan Padmanabhan
Date: Thu, 6 Aug 2015 10:43:43 -0700
Subject: [PATCH 5/9] Style, documentation, and syntax changes in the series of
 commits for SPARK-9092

---
 core/src/main/scala/org/apache/spark/SparkConf.scala  | 2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++--
 docs/running-on-yarn.md                               | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 67ce00ead64fb..7ee2781e703fe 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -478,7 +478,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
       }
     }
 
-    if (getOption(sparkExecutorInstances).isEmpty) {
+    if (!contains(sparkExecutorInstances)) {
       sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>
         val warning =
           s"""
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ca5bd0898d1df..f829900a5bd64 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2287,13 +2287,13 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Return whether dynamic allocation is enabled in the system conf
+   * Return whether dynamic allocation is enabled in the given conf
    * Dynamic allocation and explicitly setting the number of executors are inherently
    * incompatible. In environments where dynamic allocation is turned on by default,
    * the latter should override the former (SPARK-9092).
    */
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    return conf.contains("spark.dynamicAllocation.enabled") &&
+    conf.contains("spark.dynamicAllocation.enabled") &&
       !conf.contains("spark.executor.instances")
   }
 
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 61b3c39d058fa..ec32c419b7c51 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -199,7 +199,7 @@ If you need a reference to the proper location to put log files in the YARN so t
   spark.executor.instances
   2
-  The number of executors. Note that this property is incompatible with spark.dynamicAllocation.enabled. If both spark.dynamicAllocation.enabled and spark.executor.instances are specified, dynamicAllocation is turned off and the specified number of spark.executor.instances is used.
+  The number of executors. Note that this property is incompatible with spark.dynamicAllocation.enabled. If both spark.dynamicAllocation.enabled and spark.executor.instances are specified, dynamic allocation is turned off and the specified number of spark.executor.instances is used.
From 09f5d0f6b5d2fd0d7dd40284dc115c42706c51b7 Mon Sep 17 00:00:00 2001
From: Niranjan Padmanabhan
Date: Mon, 10 Aug 2015 10:13:27 -0700
Subject: [PATCH 6/9] Check dynamic allocation before setting executor
 instances

---
 core/src/main/scala/org/apache/spark/util/Utils.scala    | 2 +-
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 ++++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f829900a5bd64..be340523d1a60 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2294,7 +2294,7 @@ private[spark] object Utils extends Logging {
    */
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
     conf.contains("spark.dynamicAllocation.enabled") &&
-      !conf.contains("spark.executor.instances")
+      (conf.getInt("spark.executor.instances", 0) == 0)
   }
 
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 28d04b4be0d30..ac51084892f57 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -959,8 +959,10 @@ object Client extends Logging {
     val sparkConf = new SparkConf
 
     val args = new ClientArguments(argStrings, sparkConf)
-    // to maintan backwards-compatibility
-    sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
+    // to maintain backwards-compatibility
+    if(!Utils.isDynamicAllocationEnabled(sparkConf)){
+      sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
+    }
     new Client(args, sparkConf).run()
   }
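With this patch the helper reaches its getInt-based shape, and the YARN client falls back to --num-executors only when dynamic allocation is off. A sketch of why that guard matters (illustrative names; the executor-count default of 2 stands in for ClientArguments.numExecutors):

```scala
import org.apache.spark.SparkConf

object BackwardsCompatSketch {
  // The predicate from Utils as of this patch.
  def isDynamicAllocationEnabled(conf: SparkConf): Boolean =
    conf.contains("spark.dynamicAllocation.enabled") &&
      conf.getInt("spark.executor.instances", 0) == 0

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set("spark.dynamicAllocation.enabled", "true")
    val numExecutorsArg = 2 // stands in for the default --num-executors value

    // Mirrors the guarded setIfMissing in Client.main above: without the
    // guard, the default executor count would be written into the conf and
    // silently disable dynamic allocation for every YARN submission.
    if (!isDynamicAllocationEnabled(conf)) {
      conf.setIfMissing("spark.executor.instances", numExecutorsArg.toString)
    }
    println(conf.contains("spark.executor.instances")) // prints: false
  }
}
```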
From 682626e592272e32d4bda99102c02aa52e2b776b Mon Sep 17 00:00:00 2001
From: Niranjan Padmanabhan
Date: Mon, 10 Aug 2015 13:04:57 -0700
Subject: [PATCH 7/9] [SPARK-9092] Address additional reviewer comments

---
 core/src/main/scala/org/apache/spark/SparkConf.scala           | 2 +-
 core/src/main/scala/org/apache/spark/SparkContext.scala        | 2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala          | 2 +-
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala  | 2 +-
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala           | 2 +-
 5 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 7ee2781e703fe..b344b5e173d67 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -492,7 +492,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
         """.stripMargin
         logWarning(warning)
 
-        set("spark.executor.instances", value.toString())
+        set("spark.executor.instances", value)
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d489d8a7657e9..d437997adebc3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -529,7 +529,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   // Optionally scale number of executors dynamically based on workload. Exposed for testing.
   val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
-  if (!dynamicAllocationEnabled) {
+  if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
     logInfo(s"Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index be340523d1a60..a90d8541366f4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2294,7 +2294,7 @@ private[spark] object Utils extends Logging {
    */
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
     conf.contains("spark.dynamicAllocation.enabled") &&
-      (conf.getInt("spark.executor.instances", 0) == 0)
+      conf.getInt("spark.executor.instances", 0) == 0
   }
 
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ac51084892f57..6d63ddaf15852 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -960,7 +960,7 @@ object Client extends Logging {
     val args = new ClientArguments(argStrings, sparkConf)
 
     // to maintain backwards-compatibility
-    if(!Utils.isDynamicAllocationEnabled(sparkConf)){
+    if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
       sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
     }
     new Client(args, sparkConf).run()
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ad7ca44b977b7..272545c733ab1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -86,7 +86,7 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter = 0
   @volatile private var numExecutorsFailed = 0
 
-  @volatile private var targetNumExecutors = sparkConf.getInt("spark.executor.instances", 2)
+  @volatile private var targetNumExecutors = YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS
 
   // Keep track of which container is running which executor to remove the executors later
   // Visible for testing.
From 6da06c4865220b39a1d9522280478483d887f2a3 Mon Sep 17 00:00:00 2001
From: Niranjan Padmanabhan
Date: Mon, 10 Aug 2015 19:13:57 -0700
Subject: [PATCH 8/9] Conditionally set targetNumExecutors in YarnAllocator

---
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 272545c733ab1..ccf753e69f4b6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,6 +21,8 @@ import java.util.Collections
 import java.util.concurrent._
 import java.util.regex.Pattern
 
+import org.apache.spark.util.Utils
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
@@ -86,7 +88,12 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter = 0
   @volatile private var numExecutorsFailed = 0
 
-  @volatile private var targetNumExecutors = YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS
+  @volatile private var targetNumExecutors =
+    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+      sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
+    } else {
+      sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
+    }
 
   // Keep track of which container is running which executor to remove the executors later
   // Visible for testing.

From 7f3e1ffcc2a5a3db839ccb96143b6d82d160e1e5 Mon Sep 17 00:00:00 2001
From: Niranjan Padmanabhan
Date: Wed, 12 Aug 2015 13:30:39 -0700
Subject: [PATCH 9/9] Remove unnecessary s interpolator in logInfo

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d437997adebc3..844d9450fefcf 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -530,7 +530,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // Optionally scale number of executors dynamically based on workload. Exposed for testing.
   val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
   if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
-    logInfo(s"Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+    logInfo("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
  }

  _executorAllocationManager =
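Taken together, the series leaves YarnAllocator choosing its initial target as sketched below (a minimal standalone rendering; the constant 2 stands in for YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS and the object name is illustrative):

```scala
import org.apache.spark.SparkConf

object TargetNumExecutorsSketch {
  val DefaultNumberExecutors = 2 // stands in for YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS

  // Final form of the predicate from Utils.
  def isDynamicAllocationEnabled(conf: SparkConf): Boolean =
    conf.contains("spark.dynamicAllocation.enabled") &&
      conf.getInt("spark.executor.instances", 0) == 0

  // Mirrors the conditional initialization added in patch 8.
  def initialTarget(conf: SparkConf): Int =
    if (isDynamicAllocationEnabled(conf)) {
      conf.getInt("spark.dynamicAllocation.initialExecutors", 0)
    } else {
      conf.getInt("spark.executor.instances", DefaultNumberExecutors)
    }

  def main(args: Array[String]): Unit = {
    val dynamic = new SparkConf(false)
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.initialExecutors", "5")
    val static = new SparkConf(false).set("spark.executor.instances", "6")
    println(initialTarget(dynamic))              // 5: dynamic allocation's initial count
    println(initialTarget(static))               // 6: the explicit count wins
    println(initialTarget(new SparkConf(false))) // 2: the YARN default
  }
}
```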