From 9a959a10a70e2c4b4562efa6c9a1c2f72a817341 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Tue, 25 Nov 2014 21:15:26 -0800 Subject: [PATCH 1/3] SPARK-3779. yarn spark.yarn.applicationMaster.waitTries config should be changed to a time period --- docs/running-on-yarn.md | 8 +++++--- .../spark/deploy/yarn/ApplicationMaster.scala | 20 ++++++++++--------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 62b317129b72a..0872f18bda14d 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -22,10 +22,12 @@ Most of the configs are the same for Spark on YARN as for other deployment modes - - + + diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 987b3373fb8ff..45184b207a09f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -333,8 +333,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkContextRef.synchronized { var count = 0 val waitTime = 10000L - val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) - while (sparkContextRef.get() == null && count < numTries && !finished) { + val totalWaitTime = sparkConf.getInt("spark.yarn.applicationMaster.waitTime", 100000) + val deadline = System.currentTimeMillis() + totalWaitTime + + while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) { logInfo("Waiting for spark context initialization ... " + count) count = count + 1 sparkContextRef.wait(waitTime) @@ -343,7 +345,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, val sparkContext = sparkContextRef.get() if (sparkContext == null) { logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" - + " log output for errors. Failing the application.").format(numTries * waitTime)) + + " log output for errors. Failing the application.").format(totalWaitTime)) } sparkContext } @@ -357,13 +359,13 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, val hostport = args.userArgs(0) val (driverHost, driverPort) = Utils.parseHostPort(hostport) - // spark driver should already be up since it launched us, but we don't want to + // Spark driver should already be up since it launched us, but we don't want to // wait forever, so wait 100 seconds max to match the cluster mode setting. - // Leave this config unpublished for now. SPARK-3779 to investigating changing - // this config to be time based. - val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000) + val waitTime = 100 + val totalWaitTime = sparkConf.getInt("spark.yarn.applicationMaster.waitTime", 100000) + val deadline = System.currentTimeMillis + totalWaitTime - while (!driverUp && !finished && count < numTries) { + while (!driverUp && !finished && System.currentTimeMillis < deadline + waitTime) { try { count = count + 1 val socket = new Socket(driverHost, driverPort) @@ -374,7 +376,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, case e: Exception => logError("Failed to connect to driver at %s:%s, retrying ...". format(driverHost, driverPort)) - Thread.sleep(100) + Thread.sleep(waitTime) } } From 42b5df73cd3fd7ab274488e85b36cf8f4fb2cf5a Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 4 Dec 2014 10:04:31 -0800 Subject: [PATCH 2/3] Review feedback --- docs/running-on-yarn.md | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 18 ++++++------------ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 0872f18bda14d..7c74ce7e78b4d 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -22,7 +22,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
Property NameDefaultMeaning
spark.yarn.applicationMaster.waitTries10spark.yarn.applicationMaster.waitTime100000 - Set the number of times the ApplicationMaster waits for the the Spark master and then also the number of tries it waits for the SparkContext to be initialized + In yarn-cluster mode, time in milliseconds for the application master to wait for the + SparkContext to be initialized. In yarn-client mode, time for the application master to wait + for the driver to connect to it.
- +
Property NameDefaultMeaning
spark.yarn.applicationMaster.waitTimespark.yarn.am.waitTime 100000 In yarn-cluster mode, time in milliseconds for the application master to wait for the diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 45184b207a09f..396d3507ecaa1 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -331,15 +331,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, logInfo("Waiting for spark context initialization") try { sparkContextRef.synchronized { - var count = 0 - val waitTime = 10000L - val totalWaitTime = sparkConf.getInt("spark.yarn.applicationMaster.waitTime", 100000) + val totalWaitTime = sparkConf.getInt("spark.yarn.am.waitTime", 100000) val deadline = System.currentTimeMillis() + totalWaitTime while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) { - logInfo("Waiting for spark context initialization ... " + count) - count = count + 1 - sparkContextRef.wait(waitTime) + logInfo("Waiting for spark context initialization ... ") + sparkContextRef.wait(10000L) } val sparkContext = sparkContextRef.get() @@ -355,19 +352,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private def waitForSparkDriver(): ActorRef = { logInfo("Waiting for Spark driver to be reachable.") var driverUp = false - var count = 0 val hostport = args.userArgs(0) val (driverHost, driverPort) = Utils.parseHostPort(hostport) // Spark driver should already be up since it launched us, but we don't want to // wait forever, so wait 100 seconds max to match the cluster mode setting. - val waitTime = 100 - val totalWaitTime = sparkConf.getInt("spark.yarn.applicationMaster.waitTime", 100000) + val totalWaitTime = sparkConf.getInt("spark.yarn.am.waitTime", 100000) val deadline = System.currentTimeMillis + totalWaitTime - while (!driverUp && !finished && System.currentTimeMillis < deadline + waitTime) { + while (!driverUp && !finished && System.currentTimeMillis < deadline) { try { - count = count + 1 val socket = new Socket(driverHost, driverPort) socket.close() logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) @@ -376,7 +370,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, case e: Exception => logError("Failed to connect to driver at %s:%s, retrying ...". format(driverHost, driverPort)) - Thread.sleep(waitTime) + Thread.sleep(100) } } From 20b9887bb9529f2792123778e6eeca6ba0e51c37 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Tue, 9 Dec 2014 16:22:38 -0800 Subject: [PATCH 3/3] Deprecate old property --- .../spark/deploy/yarn/ApplicationMaster.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 396d3507ecaa1..dc7a078446324 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -329,23 +329,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private def waitForSparkContextInitialized(): SparkContext = { logInfo("Waiting for spark context initialization") - try { - sparkContextRef.synchronized { - val totalWaitTime = sparkConf.getInt("spark.yarn.am.waitTime", 100000) - val deadline = System.currentTimeMillis() + totalWaitTime + sparkContextRef.synchronized { + val waitTries = sparkConf.getOption("spark.yarn.applicationMaster.waitTries") + .map(_.toLong * 10000L) + if (waitTries.isDefined) { + logWarning( + "spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime") + } + val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", waitTries.getOrElse(100000L)) + val deadline = System.currentTimeMillis() + totalWaitTime - while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) { - logInfo("Waiting for spark context initialization ... ") - sparkContextRef.wait(10000L) - } + while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) { + logInfo("Waiting for spark context initialization ... ") + sparkContextRef.wait(10000L) + } - val sparkContext = sparkContextRef.get() - if (sparkContext == null) { - logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" - + " log output for errors. Failing the application.").format(totalWaitTime)) - } - sparkContext + val sparkContext = sparkContextRef.get() + if (sparkContext == null) { + logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" + + " log output for errors. Failing the application.").format(totalWaitTime)) } + sparkContext } } @@ -357,7 +361,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // Spark driver should already be up since it launched us, but we don't want to // wait forever, so wait 100 seconds max to match the cluster mode setting. - val totalWaitTime = sparkConf.getInt("spark.yarn.am.waitTime", 100000) + val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", 100000L) val deadline = System.currentTimeMillis + totalWaitTime while (!driverUp && !finished && System.currentTimeMillis < deadline) { @@ -370,7 +374,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, case e: Exception => logError("Failed to connect to driver at %s:%s, retrying ...". format(driverHost, driverPort)) - Thread.sleep(100) + Thread.sleep(100L) } }