From 28b2906d922bfc8631f54d49951eae478a2a6db2 Mon Sep 17 00:00:00 2001
From: "peng.zhang"
Date: Wed, 29 Jun 2016 10:37:55 +0800
Subject: [PATCH 1/5] Add test case for failure in cluster mode

---
 .../spark/deploy/yarn/YarnClusterSuite.scala  | 42 +++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 6b20dea5908a8..2e6eaa37b5cbd 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -120,6 +120,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     finalState should be (SparkAppHandle.State.FAILED)
   }
 
+  test("run Spark in yarn-cluster mode falure after sc initialized") {
+    val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass), Seq("abc"))
+    finalState should be (SparkAppHandle.State.FAILED)
+  }
+
   test("run Python application in yarn-client mode") {
     testPySpark(true)
   }
@@ -259,6 +264,43 @@ private[spark] class SaveExecutorInfo extends SparkListener {
   }
 }
 
+private object YarnClusterDriverWithFailure extends Logging with Matchers {
+
+  val WAIT_TIMEOUT_MILLIS = 10000
+
+  def main(args: Array[String]): Unit = {
+    if (args.length != 1) {
+      // scalastyle:off println
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")}
+        |
+        |Usage: YarnClusterDriver [result file]
+        """.stripMargin)
+      // scalastyle:on println
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf()
+      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
+      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+    val conf = sc.getConf
+    val status = new File(args(0))
+    var result = "failure"
+    try {
+      val data = sc.parallelize(1 to 4, 4).collect().toSet
+      sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+      data should be(Set(1, 2, 3, 4))
+      result = "success"
+    } finally {
+      Files.write(result, status, StandardCharsets.UTF_8)
+      sc.stop()
+    }
+
+    assert(false)
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000

From 92ec2048390fc32cfe5693e518eccbb21e6400d0 Mon Sep 17 00:00:00 2001
From: "peng.zhang"
Date: Wed, 29 Jun 2016 11:10:30 +0800
Subject: [PATCH 2/5] Fix scala style

---
 .../scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 2e6eaa37b5cbd..e666a70dcf6bf 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -121,7 +121,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
   }
 
   test("run Spark in yarn-cluster mode falure after sc initialized") {
-    val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass), Seq("abc"))
+    val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass),
+      appArgs = Seq("abc"))
     finalState should be (SparkAppHandle.State.FAILED)
   }
 
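For context on the fix that follows: runSpark is the BaseYarnClusterSuite helper that submits the test application through SparkLauncher and returns the terminal SparkAppHandle.State. The sketch below is a minimal, hypothetical standalone equivalent of that flow — the jar path, main class, and object name are placeholders, not part of this series. Before the Client change in PATCH 3/5, a yarn-cluster driver that failed after its SparkContext came up could still surface here as FINISHED, because YARN marks the application state FINISHED and records the failure only in the final application status.

  import java.util.concurrent.CountDownLatch

  import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

  object WatchFinalState {
    def main(args: Array[String]): Unit = {
      val done = new CountDownLatch(1)
      val handle = new SparkLauncher()
        .setMaster("yarn")
        .setDeployMode("cluster")
        .setAppResource("/path/to/test-app.jar") // placeholder path
        .setMainClass("com.example.TestDriver")  // placeholder class
        .startApplication(new SparkAppHandle.Listener {
          override def stateChanged(h: SparkAppHandle): Unit = {
            // Wake the caller once the handle reaches a terminal state.
            if (h.getState.isFinal) done.countDown()
          }
          override def infoChanged(h: SparkAppHandle): Unit = ()
        })
      done.await()
      // Before the Client fix below, a driver that failed after SparkContext
      // initialization could still be reported here as FINISHED.
      println(s"final state: ${handle.getState}")
    }
  }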
From db65931df5744a28d87df5c0773c79c586d48641 Mon Sep 17 00:00:00 2001
From: "peng.zhang"
Date: Wed, 29 Jun 2016 12:44:00 +0800
Subject: [PATCH 3/5] Yarn cluster mode should return correct state for SparkLauncher

---
 .../org/apache/spark/deploy/yarn/Client.scala |  7 +++++-
 .../spark/deploy/yarn/YarnClusterSuite.scala  | 22 +++++++++----------
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9bb369549d943..4364619230e7c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1053,7 +1053,12 @@ private[spark] class Client(
         case YarnApplicationState.RUNNING =>
           reportLauncherState(SparkAppHandle.State.RUNNING)
         case YarnApplicationState.FINISHED =>
-          reportLauncherState(SparkAppHandle.State.FINISHED)
+          report.getFinalApplicationStatus match {
+            case FinalApplicationStatus.FAILED =>
+              reportLauncherState(SparkAppHandle.State.FAILED)
+            case _ =>
+              reportLauncherState(SparkAppHandle.State.FINISHED)
+          }
         case YarnApplicationState.FAILED =>
           reportLauncherState(SparkAppHandle.State.FAILED)
         case YarnApplicationState.KILLED =>
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index e666a70dcf6bf..fbf93e49e0a07 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -274,9 +274,9 @@ private object YarnClusterDriverWithFailure extends Logging with Matchers {
       // scalastyle:off println
       System.err.println(
         s"""
-        |Invalid command line: ${args.mkString(" ")}
-        |
-        |Usage: YarnClusterDriver [result file]
+          |Invalid command line: ${args.mkString(" ")}
+          |
+          |Usage: YarnClusterDriver [result file]
         """.stripMargin)
       // scalastyle:on println
       System.exit(1)
@@ -330,19 +330,19 @@ private object YarnClusterDriver extends Logging with Matchers {
       sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
       data should be (Set(1, 2, 3, 4))
       result = "success"
+
+      // Verify that the config archive is correctly placed in the classpath of all containers.
+      val confFile = "/" + Client.SPARK_CONF_FILE
+      assert(getClass().getResource(confFile) != null)
+      val configFromExecutors = sc.parallelize(1 to 4, 4)
+        .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
+        .collect()
+      assert(configFromExecutors.find(_ == null) === None)
     } finally {
       Files.write(result, status, StandardCharsets.UTF_8)
       sc.stop()
     }
 
-    // Verify that the config archive is correctly placed in the classpath of all containers.
-    val confFile = "/" + Client.SPARK_CONF_FILE
-    assert(getClass().getResource(confFile) != null)
-    val configFromExecutors = sc.parallelize(1 to 4, 4)
-      .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
-      .collect()
-    assert(configFromExecutors.find(_ == null) === None)
-
     // verify log urls are present
     val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
     assert(listeners.size === 1)

From 5687d7801c8202187fedb54d0609c92c4e90addf Mon Sep 17 00:00:00 2001
From: "peng.zhang"
Date: Wed, 29 Jun 2016 15:11:17 +0800
Subject: [PATCH 4/5] Fix related test case

---
 .../org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index fbf93e49e0a07..18417826ea9c7 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -373,9 +373,6 @@ private object YarnClusterDriver extends Logging with Matchers {
 }
 
 private object YarnClasspathTest extends Logging {
-
-  var exitCode = 0
-
   def error(m: String, ex: Throwable = null): Unit = {
     logError(m, ex)
     // scalastyle:off println
@@ -404,7 +401,6 @@ private object YarnClasspathTest extends Logging {
     } finally {
       sc.stop()
     }
-    System.exit(exitCode)
   }
 
   private def readResource(resultPath: String): Unit = {
@@ -417,8 +413,6 @@ private object YarnClasspathTest extends Logging {
     } catch {
       case t: Throwable =>
         error(s"loading test.resource to $resultPath", t)
-        // set the exit code if not yet set
-        exitCode = 2
     } finally {
       Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
     }
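PATCH 4/5 follows from the Client change above: now that the launcher surfaces YARN's final application status, a test driver that calls System.exit with a nonzero code would be reported as FAILED, so YarnClasspathTest drops its exitCode bookkeeping and reports only through the result file it writes. Below is a minimal, hypothetical sketch of that pattern — the object name is a placeholder, this is not code from the patch, and it uses java.nio in place of the Guava Files.write call seen in the suite.

  import java.io.File
  import java.nio.charset.StandardCharsets
  import java.nio.file.Files

  object ResultFileDriver {
    def main(args: Array[String]): Unit = {
      val resultFile = new File(args(0))
      var result = "failure"
      try {
        // Test body goes here. The driver exits normally either way and lets
        // the result file carry the verdict; System.exit(nonzero) would now
        // surface to the launcher as a FAILED application.
        result = "success"
      } finally {
        // Record the outcome where the suite can read it back.
        Files.write(resultFile.toPath, result.getBytes(StandardCharsets.UTF_8))
      }
    }
  }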
From d73418672ed895ab5d79adda46adb60064204760 Mon Sep 17 00:00:00 2001
From: "peng.zhang"
Date: Fri, 1 Jul 2016 23:03:09 +0800
Subject: [PATCH 5/5] Simplify YarnClusterDriverWithFailure & handle KILLED status

---
 .../org/apache/spark/deploy/yarn/Client.scala |  2 ++
 .../spark/deploy/yarn/YarnClusterSuite.scala  | 36 +++----------
 2 files changed, 6 insertions(+), 32 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4364619230e7c..01aa12a3c9a71 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1056,6 +1056,8 @@ private[spark] class Client(
           report.getFinalApplicationStatus match {
             case FinalApplicationStatus.FAILED =>
               reportLauncherState(SparkAppHandle.State.FAILED)
+            case FinalApplicationStatus.KILLED =>
+              reportLauncherState(SparkAppHandle.State.KILLED)
             case _ =>
               reportLauncherState(SparkAppHandle.State.FINISHED)
           }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 18417826ea9c7..9085fca1d3cc0 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -120,9 +120,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     finalState should be (SparkAppHandle.State.FAILED)
   }
 
-  test("run Spark in yarn-cluster mode falure after sc initialized") {
-    val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass),
-      appArgs = Seq("abc"))
+  test("run Spark in yarn-cluster mode failure after sc initialized") {
+    val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass))
     finalState should be (SparkAppHandle.State.FAILED)
   }
 
@@ -266,39 +265,12 @@ private[spark] class SaveExecutorInfo extends SparkListener {
 }
 
 private object YarnClusterDriverWithFailure extends Logging with Matchers {
-
-  val WAIT_TIMEOUT_MILLIS = 10000
-
   def main(args: Array[String]): Unit = {
-    if (args.length != 1) {
-      // scalastyle:off println
-      System.err.println(
-        s"""
-          |Invalid command line: ${args.mkString(" ")}
-          |
-          |Usage: YarnClusterDriver [result file]
-        """.stripMargin)
-      // scalastyle:on println
-      System.exit(1)
-    }
-
     val sc = new SparkContext(new SparkConf()
       .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
-      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
-    val conf = sc.getConf
-    val status = new File(args(0))
-    var result = "failure"
-    try {
-      val data = sc.parallelize(1 to 4, 4).collect().toSet
-      sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-      data should be(Set(1, 2, 3, 4))
-      result = "success"
-    } finally {
-      Files.write(result, status, StandardCharsets.UTF_8)
-      sc.stop()
-    }
+      .setAppName("yarn test with failure"))
 
-    assert(false)
+    throw new Exception("exception after sc initialized")
   }
 }
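Taken together, the two Client.scala hunks in PATCH 3/5 and PATCH 5/5 refine what the launcher sees when YARN reports YarnApplicationState.FINISHED. The following standalone sketch restates the resulting mapping for clarity; it is not the Client code itself — the surrounding monitorApplication loop and the non-terminal states are elided, and the catch-all RUNNING arm is a placeholder rather than Spark's actual handling of the earlier states.

  import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState}
  import org.apache.spark.launcher.SparkAppHandle

  object LauncherStateMapping {
    // YARN's FINISHED state is refined by the final application status, so a
    // driver that fails (or is killed) after SparkContext initialization no
    // longer reads as FINISHED on the SparkAppHandle.
    def launcherState(
        state: YarnApplicationState,
        finalStatus: FinalApplicationStatus): SparkAppHandle.State = state match {
      case YarnApplicationState.FINISHED =>
        finalStatus match {
          case FinalApplicationStatus.FAILED => SparkAppHandle.State.FAILED
          case FinalApplicationStatus.KILLED => SparkAppHandle.State.KILLED
          case _ => SparkAppHandle.State.FINISHED
        }
      case YarnApplicationState.FAILED => SparkAppHandle.State.FAILED
      case YarnApplicationState.KILLED => SparkAppHandle.State.KILLED
      // Placeholder for SUBMITTED/RUNNING and other pre-terminal states,
      // which this patch series does not change.
      case _ => SparkAppHandle.State.RUNNING
    }
  }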