Skip to content

Commit

Permalink
[SPARK-12062][CORE] Change Master to async rebuild UI when application…
Browse files Browse the repository at this point in the history
… completes

This change builds the event history of completed apps asynchronously, so that the RPC thread is not blocked and workers can still register or be removed even when the event log history is very large and takes a long time to rebuild.

Author: Bryan Cutler <bjcutler@us.ibm.com>

Closes #10284 from BryanCutler/async-MasterUI-SPARK-12062.
  • Loading branch information
BryanCutler authored and Andrew Or committed Dec 16, 2015
1 parent 8a215d2 commit c5b6b39
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 29 deletions.
79 changes: 50 additions & 29 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import java.io.FileNotFoundException
import java.net.URLEncoder
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.{ScheduledFuture, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps
import scala.util.Random

Expand Down Expand Up @@ -56,6 +58,10 @@ private[deploy] class Master(
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")

private val rebuildUIThread =
ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")
private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
Expand All @@ -78,7 +84,8 @@ private[deploy] class Master(
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val completedApps = new ArrayBuffer[ApplicationInfo]
private var nextAppNumber = 0
private val appIdToUI = new HashMap[String, SparkUI]
// Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildSparkUI
private val appIdToUI = new ConcurrentHashMap[String, SparkUI]

private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
Expand Down Expand Up @@ -191,6 +198,7 @@ private[deploy] class Master(
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
rebuildUIThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
Expand Down Expand Up @@ -367,6 +375,10 @@ private[deploy] class Master(
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}

case AttachCompletedRebuildUI(appId) =>
// An asyncRebuildSparkUI has completed, so need to attach to master webUi
Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) }
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
Expand Down Expand Up @@ -809,7 +821,7 @@ private[deploy] class Master(
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach( a => {
appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
Expand All @@ -818,7 +830,7 @@ private[deploy] class Master(
waitingApps -= app

// If application events are logged, use them to rebuild the UI
rebuildSparkUI(app)
asyncRebuildSparkUI(app)

for (exec <- app.executors.values) {
killExecutor(exec)
Expand Down Expand Up @@ -923,49 +935,57 @@ private[deploy] class Master(
* Return the UI if successful, else None
*/
private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
  // Synchronous wrapper around asyncRebuildSparkUI: start the rebuild and block the
  // calling thread, with no timeout (Duration.Inf), until the returned future completes.
  Await.result(asyncRebuildSparkUI(app), Duration.Inf)
}

/** Rebuild a new SparkUI asynchronously to not block RPC event loop */
private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
try {
val eventLogDir = app.desc.eventLogDir
.getOrElse {
// Event logging is not enabled for this application
app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
return None
}

val eventLogDir = app.desc.eventLogDir
.getOrElse {
// Event logging is disabled for this application
app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
return Future.successful(None)
}
val futureUI = Future {
val eventLogFilePrefix = EventLoggingListener.getLogPath(
eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
EventLoggingListener.IN_PROGRESS))
EventLoggingListener.IN_PROGRESS))

if (inProgressExists) {
val eventLogFile = if (inProgressExists) {
// Event logging is enabled for this application, but the application is still in progress
logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
}

val (eventLogFile, status) = if (inProgressExists) {
(eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
eventLogFilePrefix + EventLoggingListener.IN_PROGRESS
} else {
(eventLogFilePrefix, " (completed)")
eventLogFilePrefix
}

val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
try {
replayBus.replay(logInput, eventLogFile, maybeTruncated)
replayBus.replay(logInput, eventLogFile, inProgressExists)
} finally {
logInput.close()
}
appIdToUI(app.id) = ui
webUi.attachSparkUI(ui)

Some(ui)
}(rebuildUIContext)

futureUI.onSuccess { case Some(ui) =>
appIdToUI.put(app.id, ui)
self.send(AttachCompletedRebuildUI(app.id))
// Application UI is successfully rebuilt, so link the Master UI to it
// NOTE - app.appUIUrlAtHistoryServer is volatile
app.appUIUrlAtHistoryServer = Some(ui.basePath)
Some(ui)
} catch {
}(ThreadUtils.sameThread)

futureUI.onFailure {
case fnf: FileNotFoundException =>
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
Expand All @@ -974,7 +994,7 @@ private[deploy] class Master(
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title")
None

case e: Exception =>
// Relay exception message to application UI page
val title = s"Application history load error (${app.id})"
Expand All @@ -984,8 +1004,9 @@ private[deploy] class Master(
msg = URLEncoder.encode(msg, "UTF-8")
app.appUIUrlAtHistoryServer =
Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title")
None
}
}(ThreadUtils.sameThread)

futureUI
}

/** Generate a new app ID given an app's submission date */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ private[master] object MasterMessages {
case object BoundPortsRequest

case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])

// Sent by the Master to itself once an asynchronous SparkUI rebuild has succeeded for the
// application with the given ID; the handler looks the UI up by this ID and attaches it
// to the master web UI.
case class AttachCompletedRebuildUI(appId: String)
}

0 comments on commit c5b6b39

Please sign in to comment.