From b39a10000088692c424d0d044ff8b2f8079a2473 Mon Sep 17 00:00:00 2001
From: WangTao
Date: Tue, 13 Jan 2015 22:17:35 +0800
Subject: [PATCH 1/5] specify # cores for ApplicationMaster

---
 .../org/apache/spark/deploy/SparkSubmit.scala |  1 +
 .../spark/deploy/SparkSubmitArguments.scala   |  3 +++
 docs/running-on-yarn.md                       | 16 ++++++++++++++++
 .../org/apache/spark/deploy/yarn/Client.scala |  1 +
 .../spark/deploy/yarn/ClientArguments.scala   | 16 ++++++++++++++--
 5 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 955cbd6dab96d..1c6f810c6338c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -200,6 +200,7 @@ object SparkSubmit {
       // Yarn cluster only
       OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
+      OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--cores"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
       OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
       OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f14ef4d299383..5eee68a8b3ccb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -108,6 +108,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       .orElse(sparkProperties.get("spark.driver.memory"))
       .orElse(env.get("SPARK_DRIVER_MEMORY"))
       .orNull
+    driverCores = Option(driverCores)
+      .orElse(sparkProperties.get("spark.driver.cores"))
+      .orNull
     executorMemory = Option(executorMemory)
       .orElse(sparkProperties.get("spark.executor.memory"))
       .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4f273098c5db3..567cc471db850 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -29,6 +29,22 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     In cluster mode, use <code>spark.driver.memory</code> instead.
   </td>
 </tr>
+<tr>
+  <td><code>spark.driver.cores</code></td>
+  <td>1</td>
+  <td>
+    Number of cores to use for the YARN Application Master in cluster mode.
+    In client mode, use <code>spark.yarn.am.cores</code> instead.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.am.cores</code></td>
+  <td>1</td>
+  <td>
+    Number of cores to use for the YARN Application Master in client mode.
+    In cluster mode, use <code>spark.driver.cores</code> instead.
+  </td>
+</tr>
 <tr>
   <td><code>spark.yarn.am.waitTime</code></td>
   <td>100000</td>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 032106371cd60..d4eeccf64275f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,6 +127,7 @@ private[spark] class Client(
     }
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(args.amMemory + amMemoryOverhead)
+    capability.setVirtualCores(args.amCores)
     appContext.setResource(capability)
     appContext
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index fdbf9f8eed029..0fb825b5670b1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -36,14 +36,18 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var numExecutors = DEFAULT_NUMBER_EXECUTORS
   var amQueue = sparkConf.get("spark.yarn.queue", "default")
   var amMemory: Int = 512 // MB
+  var amCores: Int = 1
   var appName: String = "Spark"
   var priority = 0

   def isClusterMode: Boolean = userClass != null

   private var driverMemory: Int = 512 // MB
+  private var driverCores: Int = 1
   private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
   private val amMemKey = "spark.yarn.am.memory"
   private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
+  private val driverCoresKey = "spark.driver.cores"
+  private val amCoresKey = "spark.yarn.am.cores"
   private val isDynamicAllocationEnabled =
     sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
@@ -92,19 +96,23 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
         "You must specify at least 1 executor!\n" + getUsageMessage())
     }
     if (isClusterMode) {
-      for (key <- Seq(amMemKey, amMemOverheadKey)) {
+      for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
         if (sparkConf.contains(key)) {
           println(s"$key is set but does not apply in cluster mode.")
         }
       }
       amMemory = driverMemory
+      amCores = driverCores
     } else {
-      if (sparkConf.contains(driverMemOverheadKey)) {
+      if (sparkConf.contains(driverMemOverheadKey, driverCoresKey)) {
         println(s"$driverMemOverheadKey is set but does not apply in client mode.")
       }
       sparkConf.getOption(amMemKey)
         .map(Utils.memoryStringToMb)
         .foreach { mem => amMemory = mem }
+      sparkConf.getOption(amCoresKey)
+        .map(_.toInt)
+        .foreach { cores => amCores = cores }
     }
   }

@@ -140,6 +148,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
         driverMemory = value
         args = tail

+      case ("--cores") :: IntParam(value) :: tail =>
+        driverCores = value
+        args = tail
+
       case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
         if (args(0) == "--num-workers") {
           println("--num-workers is deprecated. Use --num-executors instead.")
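A note on the resolution order the first patch wires into SparkSubmitArguments: an explicit value set on the field wins, then spark.driver.cores from the loaded Spark properties, and otherwise the field stays null. A minimal self-contained sketch of that fallback chain (hypothetical object and method names, not the real SparkSubmitArguments code):

object DriverCoresResolution {
  // Mirrors the Option(...).orElse(...).orNull pattern used above: the value
  // parsed from the command line takes precedence over the properties file.
  def resolve(cliValue: String, sparkProperties: Map[String, String]): String =
    Option(cliValue)
      .orElse(sparkProperties.get("spark.driver.cores"))
      .orNull

  def main(args: Array[String]): Unit = {
    val props = Map("spark.driver.cores" -> "2")
    println(resolve(null, props)) // 2 (falls back to the property)
    println(resolve("4", props))  // 4 (the explicit value wins)
  }
}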
From 43c9392ca05dab54f9a181a5b081a54bb137f469 Mon Sep 17 00:00:00 2001
From: WangTao
Date: Tue, 13 Jan 2015 23:23:48 +0800
Subject: [PATCH 2/5] fix compile error

---
 .../org/apache/spark/deploy/yarn/ClientArguments.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 0fb825b5670b1..58d5aad234c4c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -104,8 +104,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       amMemory = driverMemory
       amCores = driverCores
     } else {
-      if (sparkConf.contains(driverMemOverheadKey, driverCoresKey)) {
-        println(s"$driverMemOverheadKey is set but does not apply in client mode.")
+      for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
+        if (sparkConf.contains(key)) {
+          println(s"$key is set but does not apply in client mode.")
+        }
       }
       sparkConf.getOption(amMemKey)
         .map(Utils.memoryStringToMb)

From d86557c25fd2d63773c6c361346de00b916b6636 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Wed, 14 Jan 2015 10:41:49 +0800
Subject: [PATCH 3/5] some comments amend

---
 .../apache/spark/deploy/ClientArguments.scala |  6 +--
 .../spark/deploy/SparkSubmitArguments.scala   |  2 +
 docs/configuration.md                         | 15 +++++--
 docs/running-on-yarn.md                       |  5 ++-
 .../spark/deploy/yarn/ClientArguments.scala   | 39 ++++++++++---------
 5 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 2e1e52906ceeb..e5873ce724b9f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer

 import org.apache.log4j.Level

-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{IntParam, MemoryParam}

 /**
  * Command-line parser for the driver client.
@@ -51,8 +51,8 @@ private[spark] class ClientArguments(args: Array[String]) {
   parse(args.toList)

   def parse(args: List[String]): Unit = args match {
-    case ("--cores" | "-c") :: value :: tail =>
-      cores = value.toInt
+    case ("--cores" | "-c") :: IntParam(value) :: tail =>
+      cores = value
       parse(tail)

     case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
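The IntParam extractor swapped in above replaces a bare value.toInt, so a malformed number now falls through the pattern match to the error case instead of escaping as a NumberFormatException. A sketch of such an extractor, modeled on (but not copied from) org.apache.spark.util.IntParam:

object IntParam {
  // unapply returns Some(n) only when the string parses as an Int,
  // so "--cores abc" simply fails to match that case.
  def unapply(str: String): Option[Int] =
    try Some(str.toInt) catch { case _: NumberFormatException => None }
}

object ParseDemo {
  def parse(args: List[String]): Unit = args match {
    case ("--cores" | "-c") :: IntParam(value) :: tail =>
      println(s"cores = $value")
      parse(tail)
    case Nil => ()
    case unknown =>
      println(s"Unknown/unsupported param $unknown")
  }

  def main(args: Array[String]): Unit = {
    parse(List("--cores", "4"))   // cores = 4
    parse(List("--cores", "abc")) // Unknown/unsupported param List(--cores, abc)
  }
}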
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 5eee68a8b3ccb..04c472573d2de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -404,6 +404,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
        |  --total-executor-cores NUM  Total cores for all executors.
        |
        | YARN-only:
+       |  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
+       |                              (Default: 1).
        |  --executor-cores NUM        Number of cores per executor (Default: 1).
        |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
        |  --num-executors NUM         Number of executors to launch (Default: 2).
diff --git a/docs/configuration.md b/docs/configuration.md
index f292bfbb7dcd6..3d4347010d015 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -102,11 +102,10 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
-  <td><code>spark.executor.memory</code></td>
-  <td>512m</td>
+  <td><code>spark.driver.cores</code></td>
+  <td>1</td>
   <td>
-    Amount of memory to use per executor process, in the same format as JVM memory strings
-    (e.g. <code>512m</code>, <code>2g</code>).
+    Number of cores to use for the driver process, only in cluster mode.
   </td>
 </tr>
 <tr>
@@ -117,6 +116,14 @@ of the most common options to set are:
     (e.g. <code>512m</code>, <code>2g</code>).
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.memory</code></td>
+  <td>512m</td>
+  <td>
+    Amount of memory to use per executor process, in the same format as JVM memory strings
+    (e.g. <code>512m</code>, <code>2g</code>).
+  </td>
+</tr>
 <tr>
   <td><code>spark.driver.maxResultSize</code></td>
   <td>1g</td>
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 567cc471db850..68ab127bcf087 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -33,8 +33,9 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 <tr>
   <td><code>spark.driver.cores</code></td>
   <td>1</td>
   <td>
-    Number of cores to use for the YARN Application Master in cluster mode.
-    In client mode, use <code>spark.yarn.am.cores</code> instead.
+    Number of cores used by the driver in YARN cluster mode.
+    Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN AM.
+    In client mode, use <code>spark.yarn.am.cores</code> to control the number of cores used by the YARN AM instead.
   </td>
 </tr>
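As a usage sketch for the two properties documented above (hypothetical values, not taken from the PR), each one only has an effect in the deploy mode its description names:

import org.apache.spark.SparkConf

object AmCoresConf {
  // spark.driver.cores sizes the driver, and therefore the AM container,
  // in cluster mode; spark.yarn.am.cores sizes the standalone AM in client
  // mode. Setting the one that does not apply only triggers a warning.
  def confFor(deployMode: String): SparkConf = deployMode match {
    case "cluster" => new SparkConf().set("spark.driver.cores", "2")
    case "client"  => new SparkConf().set("spark.yarn.am.cores", "2")
    case _         => new SparkConf() // neither property applies
  }
}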
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 58d5aad234c4c..41abe4e05b839 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -213,24 +213,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   private def getUsageMessage(unknownParam: List[String] = null): String = {
     val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
     message + """
-      |Usage: org.apache.spark.deploy.yarn.Client [options]
-      |Options:
-      |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
-      |                           mode)
-      |  --class CLASS_NAME       Name of your application's main class (required)
-      |  --arg ARG                Argument to be passed to your application's main class.
-      |                           Multiple invocations are possible, each will be passed in order.
-      |  --num-executors NUM      Number of executors to start (Default: 2)
-      |  --executor-cores NUM     Number of cores for the executors (Default: 1).
-      |  --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
-      |  --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
-      |  --name NAME              The name of your application (Default: Spark)
-      |  --queue QUEUE            The hadoop queue to use for allocation requests (Default:
-      |                           'default')
-      |  --addJars jars           Comma separated list of local jars that want SparkContext.addJar
-      |                           to work with.
-      |  --files files            Comma separated list of files to be distributed with the job.
-      |  --archives archives      Comma separated list of archives to be distributed with the job.
-      """.stripMargin
+        |Usage: org.apache.spark.deploy.yarn.Client [options]
+        |Options:
+        |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
+        |                           mode)
+        |  --class CLASS_NAME       Name of your application's main class (required)
+        |  --arg ARG                Argument to be passed to your application's main class.
+        |                           Multiple invocations are possible, each will be passed in order.
+        |  --num-executors NUM      Number of executors to start (Default: 2)
+        |  --executor-cores NUM     Number of cores for the executors (Default: 1).
+        |  --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
+        |  --cores NUM              Number of cores used by the driver (Default: 1).
+        |  --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+        |  --name NAME              The name of your application (Default: Spark)
+        |  --queue QUEUE            The hadoop queue to use for allocation requests (Default:
+        |                           'default')
+        |  --addJars jars           Comma separated list of local jars that want SparkContext.addJar
+        |                           to work with.
+        |  --files files            Comma separated list of files to be distributed with the job.
+        |  --archives archives      Comma separated list of archives to be distributed with the job.
+        """.stripMargin
   }
 }

From b255795ad487542d177fd9384b10e384feb9e34f Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Wed, 14 Jan 2015 10:46:43 +0800
Subject: [PATCH 4/5] indet thing

---
 .../spark/deploy/yarn/ClientArguments.scala   | 40 +++++++++----------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 41abe4e05b839..633cf8440c0a9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -213,25 +213,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   private def getUsageMessage(unknownParam: List[String] = null): String = {
     val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
     message + """
-        |Usage: org.apache.spark.deploy.yarn.Client [options]
-        |Options:
-        |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
-        |                           mode)
-        |  --class CLASS_NAME       Name of your application's main class (required)
-        |  --arg ARG                Argument to be passed to your application's main class.
-        |                           Multiple invocations are possible, each will be passed in order.
-        |  --num-executors NUM      Number of executors to start (Default: 2)
-        |  --executor-cores NUM     Number of cores for the executors (Default: 1).
-        |  --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
-        |  --cores NUM              Number of cores used by the driver (Default: 1).
-        |  --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
-        |  --name NAME              The name of your application (Default: Spark)
-        |  --queue QUEUE            The hadoop queue to use for allocation requests (Default:
-        |                           'default')
-        |  --addJars jars           Comma separated list of local jars that want SparkContext.addJar
-        |                           to work with.
-        |  --files files            Comma separated list of files to be distributed with the job.
-        |  --archives archives      Comma separated list of archives to be distributed with the job.
-        """.stripMargin
+      |Usage: org.apache.spark.deploy.yarn.Client [options]
+      |Options:
+      |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
+      |                           mode)
+      |  --class CLASS_NAME       Name of your application's main class (required)
+      |  --arg ARG                Argument to be passed to your application's main class.
+      |                           Multiple invocations are possible, each will be passed in order.
+      |  --num-executors NUM      Number of executors to start (Default: 2)
+      |  --executor-cores NUM     Number of cores for the executors (Default: 1).
+      |  --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
+      |  --cores NUM              Number of cores used by the driver (Default: 1).
+      |  --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+      |  --name NAME              The name of your application (Default: Spark)
+      |  --queue QUEUE            The hadoop queue to use for allocation requests (Default:
+      |                           'default')
+      |  --addJars jars           Comma separated list of local jars that want SparkContext.addJar
+      |                           to work with.
+      |  --files files            Comma separated list of files to be distributed with the job.
+      |  --archives archives      Comma separated list of archives to be distributed with the job.
+    """.stripMargin
   }
 }
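Patch 4 only re-aligns the margin of the usage string; it is cosmetic at runtime because stripMargin discards everything up to and including the leading pipe on each line. A small self-contained demonstration:

object StripMarginDemo {
  def main(args: Array[String]): Unit = {
    // The indentation before '|' never reaches the printed output, which is
    // why the block can be re-indented without changing behavior.
    val usage =
      """
        |Usage: org.apache.spark.deploy.yarn.Client [options]
        |Options:
        |  --driver-cores NUM  Number of cores used by the driver (Default: 1).
      """.stripMargin
    print(usage)
  }
}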
From 01419d3b0b4bdf79f27f889af39cdeffb911886d Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Thu, 15 Jan 2015 12:17:46 +0800
Subject: [PATCH 5/5] amend the args name

---
 .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +-
 .../org/apache/spark/deploy/yarn/ClientArguments.scala   | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 1c6f810c6338c..050ba91eb2bc3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -200,7 +200,7 @@ object SparkSubmit {
       // Yarn cluster only
       OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
-      OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--cores"),
+      OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
       OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
       OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 633cf8440c0a9..a6342cf70921c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -150,7 +150,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
         driverMemory = value
         args = tail

-      case ("--cores") :: IntParam(value) :: tail =>
+      case ("--driver-cores") :: IntParam(value) :: tail =>
         driverCores = value
         args = tail

@@ -212,7 +212,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)

   private def getUsageMessage(unknownParam: List[String] = null): String = {
     val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
-    message + """
+    message +
+      """
       |Usage: org.apache.spark.deploy.yarn.Client [options]
       |Options:
       |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
@@ -223,7 +224,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       |  --num-executors NUM      Number of executors to start (Default: 2)
       |  --executor-cores NUM     Number of cores for the executors (Default: 1).
       |  --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
-      |  --cores NUM              Number of cores used by the driver (Default: 1).
+      |  --driver-cores NUM       Number of cores used by the driver (Default: 1).
       |  --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
       |  --name NAME              The name of your application (Default: Spark)
       |  --queue QUEUE            The hadoop queue to use for allocation requests (Default:
@@ -232,6 +233,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       |                           to work with.
       |  --files files            Comma separated list of files to be distributed with the job.
       |  --archives archives      Comma separated list of archives to be distributed with the job.
-    """.stripMargin
+      """.stripMargin
   }
 }
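With the rename in place, a cluster-mode submission such as spark-submit --master yarn-cluster --driver-cores 2 flows from SparkSubmit's OptionAssigner table into the yarn Client's parser under one consistent flag name. A condensed sketch of that contract (hypothetical helper, not the real SparkSubmit internals):

case class OptionAssigner(value: String, clOption: String)

object ChildArgs {
  // The child-process flag emitted here must match the flag that
  // ClientArguments pattern-matches; patch 5 makes both "--driver-cores".
  def build(assigners: Seq[OptionAssigner]): Seq[String] =
    assigners
      .filter(_.value != null)                // only forward options that were set
      .flatMap(a => Seq(a.clOption, a.value)) // e.g. "--driver-cores", "2"

  def main(args: Array[String]): Unit = {
    val opts = Seq(
      OptionAssigner("2", "--driver-cores"),
      OptionAssigner(null, "--queue")         // unset, so not forwarded
    )
    println(build(opts).mkString(" ")) // --driver-cores 2
  }
}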