From addcecb8f6026299fc197678869e857031b6617c Mon Sep 17 00:00:00 2001 From: John Zhao Date: Mon, 16 Jun 2014 15:55:49 -0700 Subject: [PATCH 1/2] [SPARK-1516]Throw exception in yarn client instead of run system.exit directly. All the changes is in the package of "org.apache.spark.deploy.yarn": 1) Throw IllegalArgumentException in ClinetArguments instead of exit directly. 2) In Client's main method, if exception is caught, it will exit with code 1, otherwise exit with code 0. 3) In YarnClientSchedulerBackend's start method, if IllegalArgumentException is caught, it will exit with code 1, otherwise throw that exception. 4) Fix some message typo in the Client.scala After the fix, if user integrate the spark yarn client into their applications, when the argument is wrong or the running is finished, the application won't be terminated. --- .../org/apache/spark/deploy/yarn/Client.scala | 33 +++++++++++------ .../spark/deploy/yarn/ClientArguments.scala | 16 ++++----- .../cluster/YarnClientSchedulerBackend.scala | 15 +++++--- .../org/apache/spark/deploy/yarn/Client.scala | 35 ++++++++++++------- 4 files changed, 63 insertions(+), 36 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9e5e2d5ceaca1..6db6bef3c3ca6 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -93,7 +93,6 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) def run() { val appId = runApp() monitorApplication(appId) - System.exit(0) } def validateArgs() = { @@ -109,7 +108,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) ).foreach { case(cond, errStr) => if (cond) { logError(errStr) - args.printUsageAndExit(1) + throw new IllegalArgumentException(args.getUsageMessage()) } } } @@ -135,17 +134,19 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) def verifyClusterResources(app: GetNewApplicationResponse) = { val maxMem = app.getMaximumResourceCapability().getMemory() - logInfo("Max mem capabililty of a single resource in this cluster " + maxMem) + logInfo("Max mem capability of a single resource in this cluster " + maxMem) // If we have requested more then the clusters max for a single resource then exit. if (args.workerMemory > maxMem) { - logError("the worker size is to large to run on this cluster " + args.workerMemory) - System.exit(1) + val errorMessage = s"the worker size is too large to run on this cluster ${args.workerMemory}" + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD if (amMem > maxMem) { - logError("AM size is to large to run on this cluster " + amMem) - System.exit(1) + val errorMessage = s"AM size is too large to run on this cluster $amMem" + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } // We could add checks to make sure the entire cluster has enough resources but that involves @@ -229,8 +230,9 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) val delegTokenRenewer = Master.getMasterPrincipal(conf) if (UserGroupInformation.isSecurityEnabled()) { if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - logError("Can't get Master Kerberos principal for use as renewer") - System.exit(1) + val errorMessage = "Can't get Master Kerberos principal for use as renewer" + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) @@ -475,9 +477,18 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf - val args = new ClientArguments(argStrings, sparkConf) - new Client(args, sparkConf).run + try { + val args = new ClientArguments(argStrings, sparkConf) + new Client(args, sparkConf).run() + } catch { + case e: Exception => { + Console.err.println(e.getMessage) + System.exit(1) + } + } + + System.exit(0) } // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1419f215c78e5..6bbe87d68024d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -109,11 +109,11 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { case Nil => if (userJar == null || userClass == null) { - printUsageAndExit(1) + throw new IllegalArgumentException(getUsageMessage()) } case _ => - printUsageAndExit(1, args) + throw new IllegalArgumentException(getUsageMessage(args)) } } @@ -122,11 +122,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { } - def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { - if (unknownParam != null) { - System.err.println("Unknown/unsupported param " + unknownParam) - } - System.err.println( + def getUsageMessage(unknownParam: Any = null): String = { + val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam \n" else "" + + message + "Usage: org.apache.spark.deploy.yarn.Client [options] \n" + "Options:\n" + " --jar JAR_PATH Path to your application's JAR file (required)\n" + @@ -143,8 +142,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + " --files files Comma separated list of files to be distributed with the job.\n" + " --archives archives Comma separated list of archives to be distributed with the job." - ) - System.exit(exitCode) + } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 22e55e0c60647..5785ddc0cb2dc 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -73,10 +73,17 @@ private[spark] class YarnClientSchedulerBackend( .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) } logDebug("ClientArguments called with: " + argsArrayBuf) - val args = new ClientArguments(argsArrayBuf.toArray, conf) - client = new Client(args, conf) - appId = client.runApp() - waitForApp() + try { + val args = new ClientArguments(argsArrayBuf.toArray, conf) + client = new Client(args, conf) + appId = client.runApp() + waitForApp() + } catch { + case e: IllegalArgumentException => { + Console.err.println(e.getMessage) + System.exit(1) + } + } } def waitForApp() { diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6ff8c6c3b2497..912bd8eae07d6 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -113,7 +113,6 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) def run() { val appId = runApp() monitorApplication(appId) - System.exit(0) } // TODO(harvey): This could just go in ClientArguments. @@ -130,7 +129,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) ).foreach { case(cond, errStr) => if (cond) { logError(errStr) - args.printUsageAndExit(1) + throw new IllegalArgumentException(args.getUsageMessage()) } } } @@ -160,15 +159,18 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) // If we have requested more then the clusters max for a single resource then exit. if (args.workerMemory > maxMem) { - logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.". - format(args.workerMemory, maxMem)) - System.exit(1) + val errorMessage = + "Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster." + .format(args.workerMemory, maxMem) + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD if (amMem > maxMem) { - logError("Required AM memory (%d) is above the max threshold (%d) of this cluster". - format(args.amMemory, maxMem)) - System.exit(1) + val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster" + .format(args.amMemory, maxMem) + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } // We could add checks to make sure the entire cluster has enough resources but that involves @@ -244,8 +246,9 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) val delegTokenRenewer = Master.getMasterPrincipal(conf) if (UserGroupInformation.isSecurityEnabled()) { if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - logError("Can't get Master Kerberos principal for use as renewer") - System.exit(1) + val errorMessage = "Can't get Master Kerberos principal for use as renewer" + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) @@ -489,9 +492,17 @@ object Client { // see Client#setupLaunchEnv(). System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf() - val args = new ClientArguments(argStrings, sparkConf) + try { + val args = new ClientArguments(argStrings, sparkConf) + new Client(args, sparkConf).run() + } catch { + case e: Exception => { + Console.err.println(e.getMessage) + System.exit(1) + } + } - new Client(args, sparkConf).run() + System.exit(0) } // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps From 00144b5421cff8b42f2a6d47181471387e07fe6f Mon Sep 17 00:00:00 2001 From: John Zhao Date: Tue, 17 Jun 2014 15:30:08 -0700 Subject: [PATCH 2/2] use e.printStackTrace() to replace "Console.err.println(e.getMessage)" so that client console can get more useful information when somthing is wrong. --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- .../spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 2 +- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6db6bef3c3ca6..9998210997bee 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -483,7 +483,7 @@ object Client { new Client(args, sparkConf).run() } catch { case e: Exception => { - Console.err.println(e.getMessage) + e.printStackTrace() System.exit(1) } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 5785ddc0cb2dc..ac786afe1ff22 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -80,7 +80,7 @@ private[spark] class YarnClientSchedulerBackend( waitForApp() } catch { case e: IllegalArgumentException => { - Console.err.println(e.getMessage) + e.printStackTrace() System.exit(1) } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 912bd8eae07d6..1bfcaf657c94a 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -497,7 +497,7 @@ object Client { new Client(args, sparkConf).run() } catch { case e: Exception => { - Console.err.println(e.getMessage) + e.printStackTrace() System.exit(1) } }