From 20b9887bb9529f2792123778e6eeca6ba0e51c37 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Tue, 9 Dec 2014 16:22:38 -0800 Subject: [PATCH] Deprecate old property --- .../spark/deploy/yarn/ApplicationMaster.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 396d3507ecaa1..dc7a078446324 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -329,23 +329,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private def waitForSparkContextInitialized(): SparkContext = { logInfo("Waiting for spark context initialization") - try { - sparkContextRef.synchronized { - val totalWaitTime = sparkConf.getInt("spark.yarn.am.waitTime", 100000) - val deadline = System.currentTimeMillis() + totalWaitTime + sparkContextRef.synchronized { + val waitTries = sparkConf.getOption("spark.yarn.applicationMaster.waitTries") + .map(_.toLong * 10000L) + if (waitTries.isDefined) { + logWarning( + "spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime") + } + val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", waitTries.getOrElse(100000L)) + val deadline = System.currentTimeMillis() + totalWaitTime - while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) { - logInfo("Waiting for spark context initialization ... ") - sparkContextRef.wait(10000L) - } + while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) { + logInfo("Waiting for spark context initialization ... ") + sparkContextRef.wait(10000L) + } - val sparkContext = sparkContextRef.get() - if (sparkContext == null) { - logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" - + " log output for errors. Failing the application.").format(totalWaitTime)) - } - sparkContext + val sparkContext = sparkContextRef.get() + if (sparkContext == null) { + logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" + + " log output for errors. Failing the application.").format(totalWaitTime)) } + sparkContext } } @@ -357,7 +361,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // Spark driver should already be up since it launched us, but we don't want to // wait forever, so wait 100 seconds max to match the cluster mode setting. - val totalWaitTime = sparkConf.getInt("spark.yarn.am.waitTime", 100000) + val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", 100000L) val deadline = System.currentTimeMillis + totalWaitTime while (!driverUp && !finished && System.currentTimeMillis < deadline) { @@ -370,7 +374,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, case e: Exception => logError("Failed to connect to driver at %s:%s, retrying ...". format(driverHost, driverPort)) - Thread.sleep(100) + Thread.sleep(100L) } }