Skip to content

Commit

Permalink
SPARK-2150: Provide direct link to finished application UI in yarn re…
Browse files Browse the repository at this point in the history
…sou...

...rce manager UI

Use the event logger directory to provide a direct link to finished
application UI in yarn resourcemanager UI.

Author: Rahul Singhal <rahul.singhal@guavus.com>

Closes #1094 from rahulsinghaliitd/SPARK-2150 and squashes the following commits:

95f230c [Rahul Singhal] SPARK-2150: Provide direct link to finished application UI in yarn resource manager UI
  • Loading branch information
Rahul Singhal authored and tgravescs committed Jul 24, 2014
1 parent 42dfab7 commit 46e224a
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 13 deletions.
Expand Up @@ -169,7 +169,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val ui: SparkUI = if (renderUI) {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
new SparkUI(conf, appSecManager, replayBus, appId,
HistoryServer.UI_PATH_PREFIX + s"/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
} else {
null
Expand Down
Expand Up @@ -75,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Last Updated")

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = "/history/" + info.id
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
val endTime = UIUtils.formatDate(info.endTime)
val duration = UIUtils.formatDuration(info.endTime - info.startTime)
Expand Down
Expand Up @@ -114,7 +114,7 @@ class HistoryServer(
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

val contextHandler = new ServletContextHandler
contextHandler.setContextPath("/history")
contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
attachHandler(contextHandler)
}
Expand Down Expand Up @@ -172,6 +172,8 @@ class HistoryServer(
object HistoryServer extends Logging {
private val conf = new SparkConf

val UI_PATH_PREFIX = "/history"

def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Expand Up @@ -35,6 +35,7 @@ import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
Expand Down Expand Up @@ -664,9 +665,10 @@ private[spark] class Master(
*/
def rebuildSparkUI(app: ApplicationInfo): Boolean = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
val eventLogDir = app.desc.eventLogDir.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = "/history/not-found"
app.desc.appUiUrl = notFoundBasePath
return false
}
val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
Expand All @@ -681,13 +683,14 @@ private[spark] class Master(
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = s"/history/not-found?msg=$msg&title=$title"
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
return false
}

try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
replayBus.replay()
appIdToUI(app.id) = ui
webUi.attachSparkUI(ui)
Expand All @@ -702,7 +705,7 @@ private[spark] class Master(
var msg = s"Exception in replaying log for application $appName!"
logError(msg, e)
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = s"/history/not-found?msg=$msg&exception=$exception&title=$title"
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
false
}
}
Expand Down
Expand Up @@ -63,6 +63,13 @@ private[spark] class EventLoggingListener(
// For testing. Keep track of all JSON serialized events that have been logged.
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

/**
* Return only the unique application directory without the base directory.
*/
def getApplicationLogDir(): String = {
name
}

/**
* Begin logging events.
* If compression is used, log a file that indicates which compression library is used.
Expand Down
Expand Up @@ -60,6 +60,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var yarnAllocator: YarnAllocationHandler = _
private var isFinished: Boolean = false
private var uiAddress: String = _
private var uiHistoryAddress: String = _
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
Expand Down Expand Up @@ -237,6 +238,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

if (null != sparkContext) {
uiAddress = sparkContext.ui.appUIHostPort
uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
resourceManager,
Expand Down Expand Up @@ -360,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
finishReq.setDiagnostics(diagnostics)
finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
finishReq.setTrackingUrl(uiHistoryAddress)
resourceManager.finishApplicationMaster(finishReq)
}
}
Expand Down
Expand Up @@ -289,7 +289,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", ""))
resourceManager.finishApplicationMaster(finishReq)
}

Expand Down
Expand Up @@ -30,6 +30,9 @@ import org.apache.hadoop.util.StringInterner
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.SparkHadoopUtil

/**
Expand Down Expand Up @@ -132,4 +135,17 @@ object YarnSparkHadoopUtil {
}
}

def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = {
val eventLogDir = sc.eventLogger match {
case Some(logger) => logger.getApplicationLogDir()
case None => ""
}
val historyServerAddress = conf.get("spark.yarn.historyServer.address", "")
if (historyServerAddress != "" && eventLogDir != "") {
historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir"
} else {
""
}
}

}
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl

import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -54,6 +54,7 @@ private[spark] class YarnClientSchedulerBackend(
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))

val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
Expand Down
Expand Up @@ -59,6 +59,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var yarnAllocator: YarnAllocationHandler = _
private var isFinished: Boolean = false
private var uiAddress: String = _
private var uiHistoryAddress: String = _
private val maxAppAttempts: Int = conf.getInt(
YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
private var isLastAMRetry: Boolean = true
Expand Down Expand Up @@ -216,6 +217,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

if (sparkContext != null) {
uiAddress = sparkContext.ui.appUIHostPort
uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
amClient,
Expand Down Expand Up @@ -312,8 +314,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

logInfo("Unregistering ApplicationMaster with " + status)
if (registered) {
val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl)
amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
}
}
}
Expand Down
Expand Up @@ -250,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp

def finishApplicationMaster(status: FinalApplicationStatus) {
logInfo("Unregistering ApplicationMaster with " + status)
val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "")
amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
}

Expand Down

0 comments on commit 46e224a

Please sign in to comment.