Skip to content

Commit

Permalink
Deprecate old property
Browse files — Browse the repository at this point in the history
  • Loading branch information
sryza committed Dec 10, 2014
1 parent 42b5df7 commit 20b9887
Showing 1 changed file with 20 additions and 16 deletions.
Expand Up @@ -329,23 +329,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,

private def waitForSparkContextInitialized(): SparkContext = {
logInfo("Waiting for spark context initialization")
-    try {
-      sparkContextRef.synchronized {
-        val totalWaitTime = sparkConf.getInt("spark.yarn.am.waitTime", 100000)
-        val deadline = System.currentTimeMillis() + totalWaitTime
+    sparkContextRef.synchronized {
+      val waitTries = sparkConf.getOption("spark.yarn.applicationMaster.waitTries")
+        .map(_.toLong * 10000L)
+      if (waitTries.isDefined) {
+        logWarning(
+          "spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime")
+      }
+      val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", waitTries.getOrElse(100000L))
+      val deadline = System.currentTimeMillis() + totalWaitTime

-        while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {
-          logInfo("Waiting for spark context initialization ... ")
-          sparkContextRef.wait(10000L)
-        }
+      while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {
+        logInfo("Waiting for spark context initialization ... ")
+        sparkContextRef.wait(10000L)
+      }

-        val sparkContext = sparkContextRef.get()
-        if (sparkContext == null) {
-          logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier"
-            + " log output for errors. Failing the application.").format(totalWaitTime))
-        }
-        sparkContext
+      val sparkContext = sparkContextRef.get()
+      if (sparkContext == null) {
+        logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier"
+          + " log output for errors. Failing the application.").format(totalWaitTime))
+      }
+      sparkContext
}
}

Expand All @@ -357,7 +361,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,

// Spark driver should already be up since it launched us, but we don't want to
// wait forever, so wait 100 seconds max to match the cluster mode setting.
-    val totalWaitTime = sparkConf.getInt("spark.yarn.am.waitTime", 100000)
+    val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", 100000L)
val deadline = System.currentTimeMillis + totalWaitTime

while (!driverUp && !finished && System.currentTimeMillis < deadline) {
Expand All @@ -370,7 +374,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
case e: Exception =>
logError("Failed to connect to driver at %s:%s, retrying ...".
format(driverHost, driverPort))
-        Thread.sleep(100)
+        Thread.sleep(100L)
}
}

Expand Down

0 comments on commit 20b9887

Please sign in to comment.