From 529f4df4e6a5b84bc87d7d5d1a4d1c9409753ac4 Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Tue, 5 Aug 2014 22:58:11 +0800 Subject: [PATCH] yarn-cluster check exit code --- .../spark/deploy/yarn/ApplicationMaster.scala | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 035356d390c80..c707be0028c05 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -70,6 +70,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) private var registered = false + private var exitStatus = 0 def run() { // Setup the directories so things go to YARN approved directories rather @@ -127,7 +128,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // Wait for the user class to finish userThread.join() - System.exit(0) + System.exit(exitStatus) } // add the yarn amIpFilter that Yarn requires for properly securing the UI @@ -151,7 +152,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) .orElse(Option(System.getenv("LOCAL_DIRS"))) - + localDirs match { case None => throw new Exception("Yarn local dirs can't be empty") case Some(l) => l @@ -166,24 +167,37 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") System.setProperty("spark.executor.instances", args.numExecutors.toString) - val mainMethod = Class.forName( - args.userClass, - false, + var stopped = false + case class NoExitsException(exitCode: Int) extends SecurityException + var succeeded = false + val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val t = new Thread { override def run() { - var succeeded = false + System.setSecurityManager(new java.lang.SecurityManager() { + override def checkExit(paramInt: Int) { + if (!stopped) { + throw NoExitsException(paramInt) + } + } + }) try { - // Copy val mainArgs = new Array[String](args.userArgs.size) args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) mainMethod.invoke(null, mainArgs) - // Some apps have "System.exit(0)" at the end. The user thread will stop here unless - // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED. succeeded = true - } finally { + } + catch { + case NoExitsException(exitCode) => + exitStatus = exitCode + if (exitStatus == 0) { + succeeded = true + } + } + finally { logDebug("Finishing main") isLastAMRetry = true + stopped = true if (succeeded) { ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } else {