Skip to content

Commit

Permalink
SPARK-1032. If Yarn app fails before registering, app master stays around long after
Browse files Browse the repository at this point in the history

This reopens https://github.com/apache/incubator-spark/pull/648 against the new repo.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #28 from sryza/sandy-spark-1032 and squashes the following commits:

5953f50 [Sandy Ryza] SPARK-1032. If Yarn app fails before registering, app master stays around long after

Conflicts:
	yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
	yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
  • Loading branch information
sryza authored and tgravescs committed Mar 20, 2014
1 parent 748f002 commit c6630d3
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3))

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)

Expand Down Expand Up @@ -114,7 +116,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
waitForSparkContextInitialized()

// Do this after spark master is up and SparkContext is created so that we can register UI Url
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
synchronized {
if (!isFinished) {
registerApplicationMaster()
registered = true
}
}

// Allocate all containers
allocateWorkers()
Expand Down Expand Up @@ -212,7 +219,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
var count = 0
val waitTime = 10000L
val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
while (ApplicationMaster.sparkContextRef.get() == null && count < numTries
&& !isFinished) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
ApplicationMaster.sparkContextRef.wait(waitTime)
Expand Down Expand Up @@ -345,17 +353,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
return
}
isFinished = true

logInfo("finishApplicationMaster with " + status)
if (registered) {
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
finishReq.setDiagnostics(diagnostics)
// Set tracking url to empty since we don't have a history server.
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
}
}

logInfo("finishApplicationMaster with " + status)
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
finishReq.setDiagnostics(diagnostics)
// Set tracking url to empty since we don't have a history server.
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3))

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)

Expand Down Expand Up @@ -103,7 +105,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
waitForSparkContextInitialized()

// Do this after Spark master is up and SparkContext is created so that we can register UI Url.
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
synchronized {
if (!isFinished) {
registerApplicationMaster()
registered = true
}
}

// Allocate all containers
allocateWorkers()
Expand Down Expand Up @@ -184,7 +191,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
var numTries = 0
val waitTime = 10000L
val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries
&& !isFinished) {
logInfo("Waiting for Spark context initialization ... " + numTries)
numTries = numTries + 1
ApplicationMaster.sparkContextRef.wait(waitTime)
Expand Down Expand Up @@ -317,11 +325,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
return
}
isFinished = true
}

logInfo("finishApplicationMaster with " + status)
// Set tracking URL to empty since we don't have a history server.
amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
logInfo("finishApplicationMaster with " + status)
if (registered) {
// Set tracking URL to empty since we don't have a history server.
amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
}
}
}

/**
Expand Down

0 comments on commit c6630d3

Please sign in to comment.