From 55e66b21cdcd68861db0f1045186048c54b13153 Mon Sep 17 00:00:00 2001
From: sharkdtu
Date: Thu, 7 Jul 2016 19:04:11 +0800
Subject: [PATCH 1/7] Fix bug: "Cannot get user config when calling
 SparkHadoopUtil.get.conf in other places, such as DataSourceStrategy"

---
 .../spark/deploy/yarn/ApplicationMaster.scala | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index b6f45dd63473b..6250ce24efe24 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -50,14 +50,6 @@ private[spark] class ApplicationMaster(
     client: YarnRMClient)
   extends Logging {
 
-  // Load the properties file with the Spark configuration and set entries as system properties,
-  // so that user code run inside the AM also has access to them.
-  if (args.propertiesFile != null) {
-    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
-      sys.props(k) = v
-    }
-  }
-
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
 
@@ -743,6 +735,15 @@ object ApplicationMaster extends Logging {
   def main(args: Array[String]): Unit = {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
+
+    // Load the properties file with the Spark configuration and set entries as system properties,
+    // so that user code run inside the AM also has access to them.
+    if (amArgs.propertiesFile != null) {
+      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
+        sys.props(k) = v
+      }
+    }
+
     SparkHadoopUtil.get.runAsSparkUser { () =>
       master = new ApplicationMaster(amArgs, new YarnRMClient)
       System.exit(master.run())

From 35593867f540dd24c67aeb4d9835e429087ba7b4 Mon Sep 17 00:00:00 2001
From: sharkd
Date: Fri, 8 Jul 2016 06:45:42 +0800
Subject: [PATCH 2/7] fix style

---
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6250ce24efe24..5087d0083a23d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -743,7 +743,6 @@ object ApplicationMaster extends Logging {
         sys.props(k) = v
       }
     }
-
     SparkHadoopUtil.get.runAsSparkUser { () =>
       master = new ApplicationMaster(amArgs, new YarnRMClient)
       System.exit(master.run())

From d5bcab9ff4b7319aa7fd23017de96c7c6e43be58 Mon Sep 17 00:00:00 2001
From: sharkd
Date: Fri, 8 Jul 2016 21:46:18 +0800
Subject: [PATCH 3/7] extend the comment

---
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 5087d0083a23d..c371ad616a47a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -738,6 +738,7 @@ object ApplicationMaster extends Logging {
 
     // Load the properties file with the Spark configuration and set entries as system properties,
     // so that user code run inside the AM also has access to them.
+    // Note: we must do this before SparkHadoopUtil is instantiated.
     if (amArgs.propertiesFile != null) {
      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
         sys.props(k) = v
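
Why the ordering matters: SparkHadoopUtil.get lazily creates a singleton whose
Hadoop Configuration is built from a fresh SparkConf, and a fresh SparkConf
snapshots the JVM's system properties at construction time. The sketch below
paraphrases the relevant Spark 2.0 logic (simplified for illustration, not
verbatim source):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf

    // Simplified model of SparkHadoopUtil's construction-time behavior.
    class SparkHadoopUtilSketch {
      // Built exactly once, from a SparkConf that snapshots sys.props when
      // it is constructed. If ApplicationMaster loads the properties file
      // into sys.props only after this point, every user entry is missing.
      val conf: Configuration = newConfiguration(new SparkConf())

      private def newConfiguration(sparkConf: SparkConf): Configuration = {
        val hadoopConf = new Configuration()
        // spark.hadoop.*-prefixed entries are copied in, prefix stripped.
        sparkConf.getAll.foreach { case (key, value) =>
          if (key.startsWith("spark.hadoop.")) {
            hadoopConf.set(key.substring("spark.hadoop.".length), value)
          }
        }
        hadoopConf
      }
    }

In the old code the properties file was loaded in the ApplicationMaster
constructor, which runs inside SparkHadoopUtil.get.runAsSparkUser, that is,
after the singleton has already taken its snapshot. Patch 1 moves the loading
into main(), ahead of that call, and patch 3 records the constraint in a
comment.
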
From a90fe481fe50a4d4e67320fad6628395493d4a57 Mon Sep 17 00:00:00 2001
From: sharkd
Date: Tue, 12 Jul 2016 00:07:15 +0800
Subject: [PATCH 4/7] add a unit test

---
 .../spark/deploy/yarn/YarnClusterSuite.scala | 45 +++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 9085fca1d3cc0..de878663cd05c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher._
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
@@ -106,6 +107,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     ))
   }
 
+  test("run Spark in yarn-cluster mode using SparkHadoopUtil.conf") {
+    testYarnAppUseSparkHadoopUtilConf()
+  }
+
   test("run Spark in yarn-client mode with additional jar") {
     testWithAddJar(true)
   }
@@ -181,6 +186,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
+  private def testYarnAppUseSparkHadoopUtilConf(): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(false,
+      mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
+      appArgs = Seq("spark.hadoop.key=value", result.getAbsolutePath()),
+      extraConf = Map(("spark.hadoop.key", "value")))
+    checkResult(finalState, result)
+  }
+
   private def testWithAddJar(clientMode: Boolean): Unit = {
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
     val driverResult = File.createTempFile("driver", null, tempDir)
@@ -274,6 +288,37 @@ private object YarnClusterDriverWithFailure extends Logging with Matchers {
   }
 }
 
+private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matchers {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      // scalastyle:off println
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")} 
+        | 
+        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [propertyKey=value] [result file]
+        """.stripMargin)
+      // scalastyle:on println
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf()
+      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
+      .setAppName("yarn test using SparkHadoopUtil's conf"))
+
+    val propertyKeyValue = args(0).split("=")
+    val status = new File(args(1))
+    var result = "failure"
+    try {
+      SparkHadoopUtil.get.conf.get(propertyKeyValue(0)) should be (propertyKeyValue(1))
+      result = "success"
+    } finally {
+      Files.write(result, status, StandardCharsets.UTF_8)
+      sc.stop()
+    }
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000

From 352f8527e436a919e74b107de51c6dee24c63b73 Mon Sep 17 00:00:00 2001
From: sharkd
Date: Tue, 12 Jul 2016 00:49:56 +0800
Subject: [PATCH 5/7] fix style

---
 .../org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index de878663cd05c..93cb00b00bf15 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -191,7 +191,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     val finalState = runSpark(false,
       mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
       appArgs = Seq("spark.hadoop.key=value", result.getAbsolutePath()),
-      extraConf = Map(("spark.hadoop.key", "value")))
+      extraConf = Map("spark.hadoop.key" -> "value"))
     checkResult(finalState, result)
   }
 
@@ -294,8 +294,8 @@ private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matc
       // scalastyle:off println
       System.err.println(
         s"""
-        |Invalid command line: ${args.mkString(" ")} 
-        | 
+        |Invalid command line: ${args.mkString(" ")}
+        |
         |Usage: YarnClusterDriverUseSparkHadoopUtilConf [propertyKey=value] [result file]
         """.stripMargin)
       // scalastyle:on println
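
The new test exercises the passthrough end to end: runSpark submits with
spark.hadoop.key=value in extraConf, and the driver asserts that the entry
surfaces through SparkHadoopUtil. The same check can be run by hand with a
minimal driver along these lines (a sketch; the class name, jar name, and
submit invocation are illustrative, assuming a Spark 2.x build with YARN
support):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.deploy.SparkHadoopUtil

    // Submit with, for example:
    //   spark-submit --master yarn --deploy-mode cluster \
    //     --conf spark.hadoop.key=value --class HadoopConfCheck app.jar
    object HadoopConfCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("hadoop-conf-check"))
        try {
          // spark.hadoop.key should surface as "key", prefix stripped.
          // Before this fix the lookup returned null in yarn-cluster mode.
          assert(SparkHadoopUtil.get.conf.get("key") == "value")
        } finally {
          sc.stop()
        }
      }
    }
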
From bd7cf696c11f4e74de9f96d104e4df6caa305357 Mon Sep 17 00:00:00 2001
From: sharkd
Date: Tue, 12 Jul 2016 01:05:49 +0800
Subject: [PATCH 6/7] fix the unit test

---
 .../scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 93cb00b00bf15..9ba9abf18a1e2 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -310,7 +310,7 @@ private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matc
     val status = new File(args(1))
     var result = "failure"
     try {
-      SparkHadoopUtil.get.conf.get(propertyKeyValue(0)) should be (propertyKeyValue(1))
+      SparkHadoopUtil.get.conf.get(propertyKeyValue(0).drop(13)) should be (propertyKeyValue(1))
       result = "success"
     } finally {
       Files.write(result, status, StandardCharsets.UTF_8)
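
The magic number works because "spark.hadoop." is exactly 13 characters, so
drop(13) turns the submitted key "spark.hadoop.key" into the "key" that
actually lands in the Hadoop Configuration. A self-contained check (plain
Scala, no Spark dependency):

    object PrefixCheck extends App {
      val prefix = "spark.hadoop."
      assert(prefix.length == 13)
      assert("spark.hadoop.key".drop(13) == "key")
      // stripPrefix states the same intent without the magic number:
      assert("spark.hadoop.key".stripPrefix(prefix) == "key")
      println("ok")
    }

Patch 7 below sidesteps the magic number entirely by passing the
already-stripped key (key=value) as the app argument.
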
From d86ac9c532896cd84474f194279e216081970a8c Mon Sep 17 00:00:00 2001
From: sharkd
Date: Tue, 12 Jul 2016 08:48:10 +0800
Subject: [PATCH 7/7] make the unit test clearer

---
 .../org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 9ba9abf18a1e2..874e3045b4058 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -190,7 +190,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(false,
       mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
-      appArgs = Seq("spark.hadoop.key=value", result.getAbsolutePath()),
+      appArgs = Seq("key=value", result.getAbsolutePath()),
       extraConf = Map("spark.hadoop.key" -> "value"))
     checkResult(finalState, result)
   }
@@ -296,7 +296,7 @@ private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matc
         s"""
         |Invalid command line: ${args.mkString(" ")}
         |
-        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [propertyKey=value] [result file]
+        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value] [result file]
         """.stripMargin)
       // scalastyle:on println
       System.exit(1)
@@ -306,11 +306,11 @@ private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matc
       .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
       .setAppName("yarn test using SparkHadoopUtil's conf"))
 
-    val propertyKeyValue = args(0).split("=")
+    val kv = args(0).split("=")
     val status = new File(args(1))
     var result = "failure"
     try {
-      SparkHadoopUtil.get.conf.get(propertyKeyValue(0).drop(13)) should be (propertyKeyValue(1))
+      SparkHadoopUtil.get.conf.get(kv(0)) should be (kv(1))
       result = "success"
     } finally {
       Files.write(result, status, StandardCharsets.UTF_8)
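
Taken together, the series comes down to one rule: populate sys.props before
anything constructs a SparkConf-backed singleton. A toy model of that hazard
(self-contained Scala with illustrative names, no Spark dependency):

    object ConfSnapshot {
      // Like SparkHadoopUtil's Configuration: a lazy, one-shot snapshot of the
      // spark.hadoop.*-prefixed system properties, with the prefix stripped.
      lazy val conf: Map[String, String] = sys.props.toMap.collect {
        case (k, v) if k.startsWith("spark.hadoop.") =>
          k.stripPrefix("spark.hadoop.") -> v
      }
    }

    object OrderingDemo extends App {
      // Right order (what the fixed ApplicationMaster.main does): load user
      // properties into sys.props before anything touches the singleton.
      sys.props("spark.hadoop.key") = "value"
      assert(ConfSnapshot.conf.get("key").contains("value"))
      // Forcing ConfSnapshot.conf before the assignment above, as the old code
      // effectively did via SparkHadoopUtil.get, would freeze an empty snapshot
      // and the lookup would fail.
      println("ok")
    }
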