From 556fe01b3d15fca76b9f86142c3f1fef9b130f24 Mon Sep 17 00:00:00 2001
From: xiaowen147
Date: Wed, 30 Sep 2015 10:40:31 +0800
Subject: [PATCH] Support a priority option for Spark on YARN

---
 .../main/scala/org/apache/spark/deploy/SparkSubmit.scala   | 2 ++
 .../org/apache/spark/deploy/SparkSubmitArguments.scala     | 6 ++++++
 .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala   | 4 ++++
 .../org/apache/spark/launcher/SparkSubmitOptionParser.java | 2 ++
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala   | 1 +
 .../org/apache/spark/deploy/yarn/ClientArguments.scala     | 7 ++++++-
 .../scheduler/cluster/YarnClientSchedulerBackend.scala     | 1 +
 .../scala/org/apache/spark/deploy/yarn/ClientSuite.scala   | 4 +++-
 8 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index ad92f5635af3..39632891346c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -422,6 +422,7 @@ object SparkSubmit {
 
       // Yarn client only
       OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
+      OptionAssigner(args.priority, YARN, CLIENT, sysProp = "spark.yarn.priority"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.instances"),
       OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
@@ -434,6 +435,7 @@ object SparkSubmit {
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
       OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
+      OptionAssigner(args.priority, YARN, CLUSTER, clOption = "--priority"),
       OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
       OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
       OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 18a1c52ae53f..a7d6237ba363 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -48,6 +48,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var driverExtraLibraryPath: String = null
   var driverExtraJavaOptions: String = null
   var queue: String = null
+  var priority: String = null
   var numExecutors: String = null
   var files: String = null
   var archives: String = null
@@ -293,6 +294,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     |  driverExtraJavaOptions  $driverExtraJavaOptions
     |  supervise               $supervise
     |  queue                   $queue
+    |  priority                $priority
     |  numExecutors            $numExecutors
     |  files                   $files
     |  pyFiles                 $pyFiles
@@ -381,6 +383,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case QUEUE =>
         queue = value
 
+      case PRIORITY =>
+        priority = value
+
       case FILES =>
         files = Utils.resolveURIs(value)
 
@@ -538,6 +543,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        | --driver-cores NUM          Number of cores used by the driver, only in cluster mode
        |                             (Default: 1).
        | --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
+       | --priority PRIORITY         The priority of your YARN application (Default: 0).
       | --num-executors NUM         Number of executors to launch (Default: 2).
       | --archives ARCHIVES         Comma separated list of archives to be extracted into the
       |                             working directory of each executor.
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1fd470cd3b01..118026a77ee0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -143,6 +143,7 @@ class SparkSubmitSuite
       "--jars", "one.jar,two.jar,three.jar",
       "--driver-memory", "4g",
       "--queue", "thequeue",
+      "--priority", "10",
       "--files", "file1.txt,file2.txt",
       "--archives", "archive1.txt,archive2.txt",
       "--num-executors", "6",
@@ -159,6 +160,7 @@ class SparkSubmitSuite
     childArgsStr should include ("--executor-cores 5")
     childArgsStr should include ("--arg arg1 --arg arg2")
     childArgsStr should include ("--queue thequeue")
+    childArgsStr should include ("--priority 10")
     childArgsStr should include regex ("--jar .*thejar.jar")
     childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
     childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")
@@ -181,6 +183,7 @@ class SparkSubmitSuite
       "--jars", "one.jar,two.jar,three.jar",
       "--driver-memory", "4g",
       "--queue", "thequeue",
+      "--priority", "10",
       "--files", "file1.txt,file2.txt",
       "--archives", "archive1.txt,archive2.txt",
       "--num-executors", "6",
@@ -201,6 +204,7 @@ class SparkSubmitSuite
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.executor.cores") should be ("5")
     sysProps("spark.yarn.queue") should be ("thequeue")
+    sysProps("spark.yarn.priority") should be ("10")
     sysProps("spark.executor.instances") should be ("6")
     sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
     sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
index 6767cc507964..89ea075eb0a6 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -75,6 +75,7 @@ class SparkSubmitOptionParser {
   protected final String NUM_EXECUTORS = "--num-executors";
   protected final String PRINCIPAL = "--principal";
   protected final String QUEUE = "--queue";
+  protected final String PRIORITY = "--priority";
 
   /**
    * This is the canonical list of spark-submit options. Each entry in the array contains the
@@ -112,6 +113,7 @@ class SparkSubmitOptionParser {
     { PROXY_USER },
     { PY_FILES },
     { QUEUE },
+    { PRIORITY },
     { REPOSITORIES },
     { STATUS },
     { TOTAL_EXECUTOR_CORES },
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8c53c24a79c4..e379c2f2cc93 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -165,6 +165,7 @@ private[spark] class Client(
     val appContext = newApp.getApplicationSubmissionContext
     appContext.setApplicationName(args.appName)
     appContext.setQueue(args.amQueue)
+    appContext.setPriority(Priority.newInstance(args.priority))
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
     sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1165061db21e..8363bfc95058 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -41,7 +41,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var amMemory: Int = 512 // MB
   var amCores: Int = 1
   var appName: String = "Spark"
-  var priority = 0
+  var priority = sparkConf.getInt("spark.yarn.priority", 0)
   var principal: String = null
   var keytab: String = null
   def isClusterMode: Boolean = userClass != null
@@ -201,6 +201,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           amQueue = value
           args = tail
 
+        case ("--priority") :: IntParam(value) :: tail =>
+          priority = value
+          args = tail
+
         case ("--name") :: value :: tail =>
           appName = value
           args = tail
@@ -265,6 +269,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
      |  --name NAME                The name of your application (Default: Spark)
      |  --queue QUEUE              The hadoop queue to use for allocation requests (Default:
      |                             'default')
+     |  --priority PRIORITY        The priority of your YARN application (Default: 0)
      |  --addJars jars             Comma separated list of local jars that want SparkContext.addJar
      |                             to work with.
      |  --py-files PY_FILES        Comma-separated list of .zip, .egg, or .py files to
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d06d95140438..317c264f7530 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -86,6 +86,7 @@ private[spark] class YarnClientSchedulerBackend(
       ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
       ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
       ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+      ("--priority", "SPARK_YARN_PRIORITY", "spark.yarn.priority"),
       ("--py-files", null, "spark.submit.pyFiles")
     )
     // Warn against the following deprecated environment variables: env var -> suggestion
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index e7f2501e7899..78a479211308 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -183,7 +183,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
       .set("spark.yarn.maxAppAttempts", "42")
     val args = new ClientArguments(Array(
       "--name", "foo-test-app",
-      "--queue", "staging-queue"), sparkConf)
+      "--queue", "staging-queue",
+      "--priority", "10"), sparkConf)
     val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
     val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
 
@@ -196,6 +197,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
 
     appContext.getApplicationName should be ("foo-test-app")
     appContext.getQueue should be ("staging-queue")
+    appContext.getPriority.getPriority should be (10)
     appContext.getAMContainerSpec should be (containerLaunchContext)
     appContext.getApplicationType should be ("SPARK")
     appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method =>
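
Usage note (reviewer sketch, not part of the patch): on the command line the
new flag is passed as "spark-submit --master yarn-cluster --priority 10 ...".
In yarn-client mode the value can also be supplied programmatically, since
ClientArguments now falls back to "spark.yarn.priority" from SparkConf. A
minimal Scala sketch; the master, app name, and priority value below are
illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    // spark-submit forwards --priority as the system property
    // "spark.yarn.priority"; setting it directly on SparkConf has the
    // same effect in client mode. Client then calls
    // Priority.newInstance(10) on the ApplicationSubmissionContext.
    val conf = new SparkConf()
      .setMaster("yarn-client")
      .setAppName("priority-demo")
      .set("spark.yarn.priority", "10")
    val sc = new SparkContext(conf)

Whether the priority actually affects scheduling depends on the YARN
scheduler configuration; the patch only wires the value through to the
application submission context.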