From 41e8a394cd74e42f2228eb880442cb0d6902f275 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Tue, 24 Jun 2014 20:19:16 +0000 Subject: [PATCH 1/9] Testing --- core/src/main/scala/org/apache/spark/SparkConf.scala | 8 ++++++++ .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 8ce4b91cae8ae..04b790668b8dd 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -124,6 +124,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { set("spark.home", home) } + /** + * Set the max number of submission retries the Spark client will attempt + * before giving up + */ + def setMaxAppAttempts(max: Int): SparkConf = { + set("spark.maxappattempts", max.toString()) + } + /** Set multiple parameters together */ def setAll(settings: Traversable[(String, String)]) = { this.settings ++= settings diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 82f79d88a3009..a7b8213db6f8c 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -108,6 +108,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) appContext.setApplicationId(appId) appContext.setApplicationName(args.appName) + logInfo("CONSTRUCTING APP CONTEXT") + if(sparkConf.get("spark.maxappattempts", false)) { + appContext.setMaxAppAttempts(sparkConf.get("spark.maxappattempts")) + logInfo("Trying a max of %d times.".format(sparkConf.get("spark.maxappattempts"))) + } appContext } From c2a2b69b623a792bc3e7e1e278a2be2668573632 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Tue, 1 Jul 2014 20:46:35 +0000 Subject: [PATCH 2/9] Preparing for pull --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 7 +++---- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 ++++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a7b8213db6f8c..1861e39c691f5 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -108,10 +108,9 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) appContext.setApplicationId(appId) appContext.setApplicationName(args.appName) - logInfo("CONSTRUCTING APP CONTEXT") - if(sparkConf.get("spark.maxappattempts", false)) { - appContext.setMaxAppAttempts(sparkConf.get("spark.maxappattempts")) - logInfo("Trying a max of %d times.".format(sparkConf.get("spark.maxappattempts"))) + if(!sparkConf.get("spark.maxappattempts", "false").equals("false")) { + appContext.setMaxAppAttempts(sparkConf.getInt("spark.maxappattempts", -1)) + logInfo("Trying a max of %d times.".format(sparkConf.getInt("spark.maxappattempts", -1))) } appContext } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 15f3c4f180ea3..f969761e68f8c 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -81,6 +81,10 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) appContext.setApplicationType("SPARK") + if(!sparkConf.get("spark.maxappattempts", "false").equals("false")) { + appContext.setMaxAppAttempts(sparkConf.getInt("spark.maxappattempts", -1)) + logInfo("Got maxattempts. Trying a max of %d times.".format(sparkConf.getInt("spark.maxappattempts", -1))) + } // Memory for the ApplicationMaster. val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] From 2532b6755ff2876516679b0c90e97fd031a111df Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Tue, 1 Jul 2014 21:05:15 +0000 Subject: [PATCH 3/9] Cleanup --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 1 - .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1861e39c691f5..a9af48673a71f 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -110,7 +110,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext.setApplicationName(args.appName) if(!sparkConf.get("spark.maxappattempts", "false").equals("false")) { appContext.setMaxAppAttempts(sparkConf.getInt("spark.maxappattempts", -1)) - logInfo("Trying a max of %d times.".format(sparkConf.getInt("spark.maxappattempts", -1))) } appContext } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f969761e68f8c..54e8f94dde3bd 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -83,7 +83,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext.setApplicationType("SPARK") if(!sparkConf.get("spark.maxappattempts", "false").equals("false")) { appContext.setMaxAppAttempts(sparkConf.getInt("spark.maxappattempts", -1)) - logInfo("Got maxattempts. Trying a max of %d times.".format(sparkConf.getInt("spark.maxappattempts", -1))) } // Memory for the ApplicationMaster. From 9a54afdb15deaa0665ee98a14686406a56a91a33 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Tue, 1 Jul 2014 22:18:51 +0000 Subject: [PATCH 4/9] Updating to more stable code. --- core/src/main/scala/org/apache/spark/SparkConf.scala | 2 ++ .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 5 +++-- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 5 +++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 04b790668b8dd..2bfbc50a46072 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -175,6 +175,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { getOption(key).map(_.toInt).getOrElse(defaultValue) } + def getIntOption(key: String): Option[Int] = getOption(key).map(_.toInt) + /** Get a parameter as a long, falling back to a default if not set */ def getLong(key: String, defaultValue: Long): Long = { getOption(key).map(_.toLong).getOrElse(defaultValue) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a9af48673a71f..09b83ec610472 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -108,8 +108,9 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) appContext.setApplicationId(appId) appContext.setApplicationName(args.appName) - if(!sparkConf.get("spark.maxappattempts", "false").equals("false")) { - appContext.setMaxAppAttempts(sparkConf.getInt("spark.maxappattempts", -1)) + sparkConf.getIntOption("spark.maxappattempts") match { + case Some(v) => appContext.setMaxAppAttempts(v) + case None => logDebug("Not setting max app attempts.") } appContext } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 54e8f94dde3bd..3ec2057d6b8c1 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -81,8 +81,9 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) appContext.setApplicationType("SPARK") - if(!sparkConf.get("spark.maxappattempts", "false").equals("false")) { - appContext.setMaxAppAttempts(sparkConf.getInt("spark.maxappattempts", -1)) + sparkConf.getIntOption("spark.maxappattempts") match { + case Some(v) => appContext.setMaxAppAttempts(v) + case None => logDebug("Not setting max app attempts.") } // Memory for the ApplicationMaster. From c0faef0e8168ab66dfe506414b36063ae0a56c96 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Fri, 25 Jul 2014 18:57:26 +0000 Subject: [PATCH 5/9] Making changes specified by tgravescs --- .../main/scala/org/apache/spark/SparkConf.scala | 10 ---------- .../org/apache/spark/deploy/yarn/Client.scala | 4 ---- .../org/apache/spark/deploy/yarn/Client.scala | 14 +++++++++----- 3 files changed, 9 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 2bfbc50a46072..8ce4b91cae8ae 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -124,14 +124,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { set("spark.home", home) } - /** - * Set the max number of submission retries the Spark client will attempt - * before giving up - */ - def setMaxAppAttempts(max: Int): SparkConf = { - set("spark.maxappattempts", max.toString()) - } - /** Set multiple parameters together */ def setAll(settings: Traversable[(String, String)]) = { this.settings ++= settings @@ -175,8 +167,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { getOption(key).map(_.toInt).getOrElse(defaultValue) } - def getIntOption(key: String): Option[Int] = getOption(key).map(_.toInt) - /** Get a parameter as a long, falling back to a default if not set */ def getLong(key: String, defaultValue: Long): Long = { getOption(key).map(_.toLong).getOrElse(defaultValue) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 09b83ec610472..82f79d88a3009 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -108,10 +108,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) appContext.setApplicationId(appId) appContext.setApplicationName(args.appName) - sparkConf.getIntOption("spark.maxappattempts") match { - case Some(v) => appContext.setMaxAppAttempts(v) - case None => logDebug("Not setting max app attempts.") - } appContext } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 3ec2057d6b8c1..2f575f3716775 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -81,7 +81,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) appContext.setApplicationType("SPARK") - sparkConf.getIntOption("spark.maxappattempts") match { + sparkConf.getOption("spark.maxappattempts").map(_.toInt) match { + case Some(v) => logInfo("Setting maxAppAttemtps!") + case None => logInfo("*Not* setting maxAppAttempts! WOWOWOWO") + } + sparkConf.getOption("spark.maxappattempts").map(_.toInt) match { case Some(v) => appContext.setMaxAppAttempts(v) case None => logDebug("Not setting max app attempts.") } @@ -118,10 +122,10 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def calculateAMMemory(newApp: GetNewApplicationResponse) :Int = { // TODO: Need a replacement for the following code to fix -Xmx? - // val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory() - // var amMemory = ((args.amMemory / minResMemory) * minResMemory) + - // ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - - // memoryOverhead ) + val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory() + var amMemory = ((args.amMemory / minResMemory) * minResMemory) + + ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - + memoryOverhead ) args.amMemory } From 89b0e584018a1cac33ad31b5c911da06b6dcdbd4 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Fri, 25 Jul 2014 19:02:12 +0000 Subject: [PATCH 6/9] cleanup --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 2f575f3716775..093662720a268 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -81,10 +81,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) appContext.setApplicationType("SPARK") - sparkConf.getOption("spark.maxappattempts").map(_.toInt) match { - case Some(v) => logInfo("Setting maxAppAttemtps!") - case None => logInfo("*Not* setting maxAppAttempts! WOWOWOWO") - } sparkConf.getOption("spark.maxappattempts").map(_.toInt) match { case Some(v) => appContext.setMaxAppAttempts(v) case None => logDebug("Not setting max app attempts.") @@ -122,10 +118,10 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def calculateAMMemory(newApp: GetNewApplicationResponse) :Int = { // TODO: Need a replacement for the following code to fix -Xmx? - val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory() - var amMemory = ((args.amMemory / minResMemory) * minResMemory) + - ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - - memoryOverhead ) + // val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory() + // var amMemory = ((args.amMemory / minResMemory) * minResMemory) + + // ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - + // memoryOverhead ) args.amMemory } From 256d0e9b0e16bccf17ac3189f999e51aa05763db Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Mon, 18 Aug 2014 18:46:41 +0000 Subject: [PATCH 7/9] Made maxappattempts a yarn-specific option and added documentation for it. --- docs/running-on-yarn.md | 8 ++++++++ .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 5d8d603aa3e37..4449daff8d341 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -106,6 +106,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes set this configuration to "hdfs:///some/path". + + spark.yarn.maxappattempts + YARN Default + + The maximum number of attempts that will be made to submit the application. + See this YARN Doc. + + # Launching Spark on YARN diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 093662720a268..4278831f6568d 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -81,7 +81,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) appContext.setApplicationType("SPARK") - sparkConf.getOption("spark.maxappattempts").map(_.toInt) match { + sparkConf.getOption("spark.yarn.maxappattempts").map(_.toInt) match { case Some(v) => appContext.setMaxAppAttempts(v) case None => logDebug("Not setting max app attempts.") } From c5fa23ee1247ef664b6a401e444c0f14d9f9d6d8 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Mon, 18 Aug 2014 18:56:34 +0000 Subject: [PATCH 8/9] Better log message. --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 4278831f6568d..c986ca043aebc 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -83,7 +83,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext.setApplicationType("SPARK") sparkConf.getOption("spark.yarn.maxappattempts").map(_.toInt) match { case Some(v) => appContext.setMaxAppAttempts(v) - case None => logDebug("Not setting max app attempts.") + case None => logDebug("Not setting maxappattemtps. Cluster default will be used.") } // Memory for the ApplicationMaster. From f84879792caba054df5c6804582dcd494998e060 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Mon, 18 Aug 2014 18:58:16 +0000 Subject: [PATCH 9/9] Typo fix --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c986ca043aebc..80255f9ce3acc 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -83,7 +83,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext.setApplicationType("SPARK") sparkConf.getOption("spark.yarn.maxappattempts").map(_.toInt) match { case Some(v) => appContext.setMaxAppAttempts(v) - case None => logDebug("Not setting maxappattemtps. Cluster default will be used.") + case None => logDebug("Not setting spark.yarn.maxappattempts. Cluster default will be used.") } // Memory for the ApplicationMaster.