[SPARK-2080] Yarn: report HS URL in client mode, correct user in cluster mode. #1002

Closed
wants to merge 2 commits
core/src/main/scala/org/apache/spark/SparkContext.scala (2 changes: 1 addition & 1 deletion)
@@ -297,7 +297,7 @@ class SparkContext(config: SparkConf) extends Logging {

// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
-    Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
+    Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
@tgravescs (Contributor) commented:
@jerryshao I think you had originally added this logic. Is there a use case where this order is required? It always seemed odd to me, since I expect user.name to always be set.

@jerryshao (Contributor) replied:
Hi @tgravescs, originally I put SPARK_USER before user.name when I submitted the PR, and someone suggested I change the order to keep it consistent with other Spark parameters, so I changed it. I assume it's OK in standalone and Mesos mode, but I didn't test it in Yarn mode.

It looks good to me to change this order if needed. :)

}.getOrElse {
SparkContext.SPARK_UNKNOWN_USER
}
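For context, here is a minimal, self-contained sketch contrasting the two lookup orders; `SparkUserResolution`, `before`, and `after` are illustrative names, not the PR's code:

```scala
object SparkUserResolution {
  val SPARK_UNKNOWN_USER = "<unknown>"

  // Pre-PR order: the JVM's user.name wins, so in YARN cluster mode the
  // container user (often "yarn") gets reported instead of the submitter.
  def before: String = Option {
    Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
  }.getOrElse(SPARK_UNKNOWN_USER)

  // Post-PR order: an explicitly exported SPARK_USER takes precedence,
  // falling back to user.name, then to the unknown-user placeholder.
  def after: String = Option {
    Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
  }.getOrElse(SPARK_UNKNOWN_USER)

  def main(args: Array[String]): Unit =
    println(s"before=$before, after=$after")
}
```

The remaining hunks touch the YARN ExecutorLauncher, first on the alpha API (FinishApplicationMasterRequest), then on the stable AMRMClient API.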
@@ -271,6 +271,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+    finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
resourceManager.finishApplicationMaster(finishReq)
}
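For readers unfamiliar with the YARN alpha API: unregistration goes through an AMRMProtocol with a FinishApplicationMasterRequest record, and the tracking URL set there is what the ResourceManager UI links to after the application finishes. A hedged, self-contained sketch of that pattern (all parameters supplied by the caller; this is an illustration, not the PR's full class):

```scala
import org.apache.hadoop.yarn.api.AMRMProtocol
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, FinalApplicationStatus}
import org.apache.hadoop.yarn.util.Records

object AmShutdownSketch {
  // Illustrative helper, not the PR's code: unregister an AM and hand the
  // ResourceManager a final tracking URL (e.g. the history server address).
  def finishWithHistoryUrl(
      resourceManager: AMRMProtocol,
      appAttemptId: ApplicationAttemptId,
      status: FinalApplicationStatus,
      historyAddress: String): Unit = {
    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
    finishReq.setAppAttemptId(appAttemptId)
    finishReq.setFinishApplicationStatus(status)
    // An empty string leaves the RM's tracking URL unset, the old behavior.
    finishReq.setTrackingUrl(historyAddress)
    resourceManager.finishApplicationMaster(finishReq)
  }
}
```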

@@ -115,7 +115,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val interval = math.min(timeoutInterval / 2, schedulerInterval)

reporterThread = launchReporterThread(interval)


// Wait for the reporter thread to finish.
reporterThread.join()
@@ -134,12 +134,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.orElse(Option(System.getenv("LOCAL_DIRS")))

localDirs match {
case None => throw new Exception("Yarn Local dirs can't be empty")
case Some(l) => l
}
}
}

private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
@@ -247,7 +247,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp

def finishApplicationMaster(status: FinalApplicationStatus) {
logInfo("finish ApplicationMaster with " + status)
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+    val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
}

}
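On the user side, the new behavior only kicks in when spark.yarn.historyServer.address is set. A hypothetical client-side snippet (the host and port are placeholders):

```scala
import org.apache.spark.SparkConf

// With this set, the ResourceManager's application link points at the Spark
// history server once the app finishes, instead of going dead.
val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.yarn.historyServer.address", "historyserver.example.com:18080")
```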