[SPARK-4705] Handle multiple app attempts event logs, history server. #5432

Closed
wants to merge 28 commits into from
Changes from all commits (28 commits)
0eb7722
SPARK-4705: Doing cherry-pick of fix into master
twinkle-g Feb 2, 2015
4c1fc26
SPARK-4705 Incorporating the review comments regarding formatting, wi…
Feb 25, 2015
6b2e521
SPARK-4705 Incorporating the review comments regarding formatting, wi…
Feb 25, 2015
318525a
SPARK-4705: 1) moved from directory structure to single file, as per …
twinkle-g Mar 1, 2015
5fd5c6f
Fix my broken rebase.
Apr 7, 2015
3245aa2
Make app attempts part of the history server model.
Apr 8, 2015
88b1de8
Add a test for apps with multiple attempts.
Apr 8, 2015
cbe8bba
Attempt ID in listener event should be an option.
Apr 8, 2015
ce5ee5d
Misc UI, test, style fixes.
Apr 8, 2015
c3e0a82
Move app name to app info, more UI fixes.
Apr 9, 2015
657ec18
Fix yarn history URL, app links.
Apr 9, 2015
3a14503
Argh scalastyle.
Apr 9, 2015
9092af5
Fix HistoryServer test.
Apr 9, 2015
07446c6
Disable striping for app id / name when multiple attempts exist.
Apr 10, 2015
86de638
Merge branch 'master' into SPARK-4705
Apr 13, 2015
9092d39
Merge branch 'master' into SPARK-4705
Apr 17, 2015
c14ec19
Merge branch 'master' into SPARK-4705
Apr 20, 2015
f1cb9b3
Merge branch 'master' into SPARK-4705
Apr 23, 2015
ba34b69
Use Option[String] for attempt id.
Apr 23, 2015
d5a9c37
Update JsonProtocol test, make property name consistent.
Apr 23, 2015
9d59d92
Scalastyle...
Apr 23, 2015
2ad77e7
Missed a reference to the old property name.
Apr 23, 2015
1aa309d
Improve sorting of app attempts.
Apr 23, 2015
7c381ec
Merge branch 'master' into SPARK-4705
Apr 24, 2015
76a3651
Fix log cleaner, add test.
Apr 24, 2015
bc885b7
Review feedback.
Apr 28, 2015
f66dcc5
Merge branch 'master' into SPARK-4705
Apr 28, 2015
7e289fa
Review feedback.
Apr 30, 2015
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -217,6 +217,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _heartbeatReceiver: RpcEndpointRef = _
   @volatile private var _dagScheduler: DAGScheduler = _
   private var _applicationId: String = _
+  private var _applicationAttemptId: Option[String] = None
   private var _eventLogger: Option[EventLoggingListener] = None
   private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
   private var _cleaner: Option[ContextCleaner] = None
@@ -313,6 +314,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }

   def applicationId: String = _applicationId
+  def applicationAttemptId: Option[String] = _applicationAttemptId

   def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
@@ -470,6 +472,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _taskScheduler.start()

     _applicationId = _taskScheduler.applicationId()
+    _applicationAttemptId = taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
     _env.blockManager.initialize(_applicationId)
@@ -482,7 +485,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _eventLogger =
       if (isEventLogEnabled) {
         val logger =
-          new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
+            _conf, _hadoopConfiguration)
         logger.start()
         listenerBus.addListener(logger)
         Some(logger)
@@ -1855,7 +1859,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-      startTime, sparkUser))
+      startTime, sparkUser, applicationAttemptId))
   }

   /** Post the application end event */
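For context, here is a minimal sketch of how a listener would observe the new field. AttemptLoggingListener is an illustrative name, not part of this patch; appAttemptId on SparkListenerApplicationStart is None for cluster managers that do not support attempts.

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Illustrative listener (not part of this patch): logs the attempt ID that
// SparkContext now includes in the application-start event.
class AttemptLoggingListener extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    val attempt = event.appAttemptId.getOrElse("<no attempt id>")
    println(s"Application ${event.appId.getOrElse("?")} started (attempt: $attempt)")
  }
}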

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,15 +19,19 @@ package org.apache.spark.deploy.history

 import org.apache.spark.ui.SparkUI

-private[history] case class ApplicationHistoryInfo(
-    id: String,
-    name: String,
+private[history] case class ApplicationAttemptInfo(
+    attemptId: Option[String],
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
     completed: Boolean = false)

+private[history] case class ApplicationHistoryInfo(
+    id: String,
+    name: String,
+    attempts: List[ApplicationAttemptInfo])
+
 private[history] abstract class ApplicationHistoryProvider {

   /**
@@ -41,9 +45,10 @@ private[history] abstract class ApplicationHistoryProvider {
    * Returns the Spark UI for a specific application.
    *
    * @param appId The application ID.
+   * @param attemptId The application attempt ID (or None if there is no attempt ID).
    * @return The application's UI, or None if application is not found.
    */
-  def getAppUI(appId: String): Option[SparkUI]
+  def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]

   /**
    * Called when the server is shutting down.
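To illustrate the new model, a minimal sketch of one application with a finished first attempt and a running retry; IDs and times are invented for the example.

// Sketch only: values are invented. `completed` defaults to false, so the
// still-running second attempt omits it.
val first = ApplicationAttemptInfo(Some("1"), startTime = 1000L, endTime = 5000L,
  lastUpdated = 5000L, sparkUser = "alice", completed = true)
val retry = ApplicationAttemptInfo(Some("2"), startTime = 6000L, endTime = -1L,
  lastUpdated = 6500L, sparkUser = "alice")
val app = ApplicationHistoryInfo("application_1422981759269_0001", "SparkPi",
  attempts = List(retry, first)) // running attempts sort before completed ones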

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,16 +32,20 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}

 /**
  * A class that provides application history from event logs stored in the file system.
  * This provider checks for new finished applications in the background periodically and
  * renders the history application UI by parsing the associated event logs.
  */
-private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
-  with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+  extends ApplicationHistoryProvider with Logging {
+
+  def this(conf: SparkConf) = {
+    this(conf, new SystemClock())
+  }

   import FsHistoryProvider._
@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()

-  // List of applications to be deleted by event log cleaner.
-  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+  // List of application logs to be deleted by event log cleaner.
+  private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]

   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
@@ -138,31 +142,33 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

   override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values

-  override def getAppUI(appId: String): Option[SparkUI] = {
+  override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
     try {
-      applications.get(appId).map { info =>
-        val replayBus = new ReplayListenerBus()
-        val ui = {
-          val conf = this.conf.clone()
-          val appSecManager = new SecurityManager(conf)
-          SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
-            s"${HistoryServer.UI_PATH_PREFIX}/$appId")
-          // Do not call ui.bind() to avoid creating a new server for each application
-        }
-
-        val appListener = new ApplicationEventListener()
-        replayBus.addListener(appListener)
-        val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)
-
-        ui.setAppName(s"${appInfo.name} ($appId)")
-
-        val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-        ui.getSecurityManager.setAcls(uiAclsEnabled)
-        // make sure to set admin acls before view acls so they are properly picked up
-        ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
-        ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
-          appListener.viewAcls.getOrElse(""))
-        ui
+      applications.get(appId).flatMap { appInfo =>
+        appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+          val replayBus = new ReplayListenerBus()
+          val ui = {
+            val conf = this.conf.clone()
+            val appSecManager = new SecurityManager(conf)
+            SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
+              HistoryServer.getAttemptURI(appId, attempt.attemptId))
+            // Do not call ui.bind() to avoid creating a new server for each application
+          }
+
+          val appListener = new ApplicationEventListener()
+          replayBus.addListener(appListener)
+          val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
+
+          ui.setAppName(s"${appInfo.name} ($appId)")
+
+          val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+          ui.getSecurityManager.setAcls(uiAclsEnabled)
+          // make sure to set admin acls before view acls so they are properly picked up
+          ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+          ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+            appListener.viewAcls.getOrElse(""))
+          ui
+        }
       }
     } catch {
       case e: FileNotFoundException => None
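Usage sketch: the history server now resolves one UI per attempt, and a None attempt ID still matches logs written without one. `provider` stands for an FsHistoryProvider instance and the IDs are invented.

// Look up a specific attempt's UI, falling back to an attempt-less entry
// for logs written by older Spark versions.
val ui: Option[SparkUI] =
  provider.getAppUI("application_1422981759269_0001", Some("2"))
    .orElse(provider.getAppUI("application_1422981759269_0001", None))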
@@ -220,7 +226,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
     val bus = new ReplayListenerBus()
-    val newApps = logs.flatMap { fileStatus =>
+    val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
         logInfo(s"Application log ${res.logPath} loaded successfully.")
@@ -232,76 +238,104 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
             e)
           None
       }
-    }.toSeq.sortWith(compareAppInfo)
-
-    // When there are new logs, merge the new list with the existing one, maintaining
-    // the expected ordering (descending end time). Maintaining the order is important
-    // to avoid having to sort the list every time there is a request for the log list.
-    if (newApps.nonEmpty) {
-      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
-        if (!mergedApps.contains(info.id) ||
-            mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
-            !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
-          mergedApps += (info.id -> info)
-        }
-      }
-
-      val newIterator = newApps.iterator.buffered
-      val oldIterator = applications.values.iterator.buffered
-      while (newIterator.hasNext && oldIterator.hasNext) {
-        if (compareAppInfo(newIterator.head, oldIterator.head)) {
-          addIfAbsent(newIterator.next())
-        } else {
-          addIfAbsent(oldIterator.next())
-        }
-      }
-      newIterator.foreach(addIfAbsent)
-      oldIterator.foreach(addIfAbsent)
-
-      applications = mergedApps
-    }
+    }
+
+    if (newAttempts.isEmpty) {
+      return
+    }
+
+    // Build a map containing all apps that contain new attempts. The app information in this map
+    // contains both the new app attempt, and those that were already loaded in the existing apps
+    // map. If an attempt has been updated, it replaces the old attempt in the list.
+    val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
+    newAttempts.foreach { attempt =>
+      val appInfo = newAppMap.get(attempt.appId)
+        .orElse(applications.get(attempt.appId))
+        .map { app =>
+          val attempts =
+            app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
+          new FsApplicationHistoryInfo(attempt.appId, attempt.name,
+            attempts.sortWith(compareAttemptInfo))
+        }
+        .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
+      newAppMap(attempt.appId) = appInfo
+    }
+
+    // Merge the new app list with the existing one, maintaining the expected ordering (descending
+    // end time). Maintaining the order is important to avoid having to sort the list every time
+    // there is a request for the log list.
+    val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
+    val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+    def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+      if (!mergedApps.contains(info.id)) {
+        mergedApps += (info.id -> info)
+      }
+    }
+
+    val newIterator = newApps.iterator.buffered
+    val oldIterator = applications.values.iterator.buffered
+    while (newIterator.hasNext && oldIterator.hasNext) {
+      if (newAppMap.contains(oldIterator.head.id)) {
+        oldIterator.next()
+      } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
+        addIfAbsent(newIterator.next())
+      } else {
+        addIfAbsent(oldIterator.next())
+      }
+    }
+    newIterator.foreach(addIfAbsent)
+    oldIterator.foreach(addIfAbsent)
+
+    applications = mergedApps
   }
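The per-app merge above boils down to replace-then-resort. A simplified standalone sketch of that rule, with types invented for illustration:

// Simplified model of mergeApplicationListing's per-app rule: an incoming
// attempt replaces any stored attempt with the same ID, then the attempts
// are re-sorted (here by descending end time only, for brevity).
case class Attempt(attemptId: Option[String], endTime: Long)
case class App(id: String, attempts: List[Attempt])

def mergeAttempt(app: App, incoming: Attempt): App = {
  val kept = app.attempts.filter(_.attemptId != incoming.attemptId)
  App(app.id, (kept :+ incoming).sortBy(a => -a.endTime))
}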

   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private def cleanLogs(): Unit = {
+  private[history] def cleanLogs(): Unit = {
     try {
       val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000

-      val now = System.currentTimeMillis()
+      val now = clock.getTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()

+      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
+        now - attempt.lastUpdated > maxAge && attempt.completed
+      }

       // Scan all logs from the log directory.
       // Only completed applications older than the specified max age will be deleted.
-      applications.values.foreach { info =>
-        if (now - info.lastUpdated <= maxAge || !info.completed) {
-          appsToRetain += (info.id -> info)
-        } else {
-          appsToClean += info
+      applications.values.foreach { app =>
+        val (toClean, toRetain) = app.attempts.partition(shouldClean)
+        attemptsToClean ++= toClean
+
+        if (toClean.isEmpty) {
+          appsToRetain += (app.id -> app)
+        } else if (toRetain.nonEmpty) {
+          appsToRetain += (app.id ->
+            new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
         }
       }

       applications = appsToRetain

-      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
-      appsToClean.foreach { info =>
+      val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+      attemptsToClean.foreach { attempt =>
         try {
-          val path = new Path(logDir, info.logPath)
+          val path = new Path(logDir, attempt.logPath)
           if (fs.exists(path)) {
             fs.delete(path, true)
           }
         } catch {
           case e: AccessControlException =>
-            logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+            logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
           case t: IOException =>
-            logError(s"IOException in cleaning logs of ${info.logPath}", t)
-            leftToClean += info
+            logError(s"IOException in cleaning ${attempt.logPath}", t)
+            leftToClean += attempt
         }
       }

-      appsToClean = leftToClean
+      attemptsToClean = leftToClean
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }
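The retention rule above reduces to a small predicate. A standalone restatement with simplified types and invented values:

// Restatement of shouldClean: an attempt is deleted only when it is complete
// AND older than the configured max age; a still-running attempt is never
// cleaned, so its log survives for inspection.
def shouldCleanSimplified(completed: Boolean, lastUpdated: Long,
    now: Long, maxAgeMs: Long): Boolean =
  completed && now - lastUpdated > maxAgeMs

assert(shouldCleanSimplified(completed = true, lastUpdated = 0L,
  now = 8L * 86400 * 1000, maxAgeMs = 7L * 86400 * 1000))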
@@ -315,14 +349,36 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private def compareAppInfo(
       i1: FsApplicationHistoryInfo,
       i2: FsApplicationHistoryInfo): Boolean = {
-    if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
+    val a1 = i1.attempts.head
+    val a2 = i2.attempts.head
+    if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
   }

+  /**
+   * Comparison function that defines the sort order for application attempts within the same
+   * application. Order is: running attempts before complete attempts, running attempts sorted
+   * by start time, completed attempts sorted by end time.
+   *
+   * Normally applications should have a single running attempt; but failure to call sc.stop()
+   * may cause multiple running attempts to show up.
+   *
+   * @return Whether `a1` should precede `a2`.
+   */
+  private def compareAttemptInfo(
+      a1: FsApplicationAttemptInfo,
+      a2: FsApplicationAttemptInfo): Boolean = {
+    if (a1.completed == a2.completed) {
+      if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+    } else {
+      !a1.completed
+    }
+  }
+
   /**
    * Replays the events in the specified log file and returns information about the associated
    * application.
    */
-  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -336,10 +392,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     val appCompleted = isApplicationCompleted(eventLog)
     bus.addListener(appListener)
     bus.replay(logInput, logPath.toString, !appCompleted)
-    new FsApplicationHistoryInfo(
+    new FsApplicationAttemptInfo(
       logPath.getName(),
-      appListener.appId.getOrElse(logPath.getName()),
       appListener.appName.getOrElse(NOT_STARTED),
+      appListener.appId.getOrElse(logPath.getName()),
+      appListener.appAttemptId,
       appListener.startTime.getOrElse(-1L),
       appListener.endTime.getOrElse(-1L),
       getModificationTime(eventLog).get,
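A worked example of the compareAttemptInfo ordering defined above, using a simplified stand-in type (the real comparator takes FsApplicationAttemptInfo):

// Stand-in type for the example; only the fields the comparator reads.
case class A(id: String, completed: Boolean, startTime: Long, endTime: Long)

def precedes(a1: A, a2: A): Boolean =
  if (a1.completed == a2.completed) {
    if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
  } else {
    !a1.completed // running attempts sort before completed ones
  }

val sorted = List(
  A("1", completed = true, startTime = 0L, endTime = 100L),
  A("3", completed = false, startTime = 300L, endTime = -1L),
  A("2", completed = true, startTime = 50L, endTime = 250L)
).sortWith(precedes)
// sorted.map(_.id) == List("3", "2", "1"): the running attempt first,
// then completed attempts by descending end time.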
@@ -425,13 +482,21 @@ private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
 }

-private class FsApplicationHistoryInfo(
+private class FsApplicationAttemptInfo(
     val logPath: String,
-    id: String,
-    name: String,
+    val name: String,
+    val appId: String,
+    attemptId: Option[String],
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
     completed: Boolean = true)
-  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
+  extends ApplicationAttemptInfo(
+    attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+
+private class FsApplicationHistoryInfo(
+    id: String,
+    override val name: String,
+    override val attempts: List[FsApplicationAttemptInfo])
+  extends ApplicationHistoryInfo(id, name, attempts)
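A construction sketch for the new provider-side types; all field values are invented, and logPath is simply whatever file name EventLoggingListener produced for the attempt.

// Sketch: one replayed log file becomes one FsApplicationAttemptInfo;
// grouping by appId then yields the FsApplicationHistoryInfo entries
// that getListing() serves.
val attempt = new FsApplicationAttemptInfo(
  logPath = "application_1422981759269_0001_2",
  name = "SparkPi",
  appId = "application_1422981759269_0001",
  attemptId = Some("2"),
  startTime = 6000L,
  endTime = 9000L,
  lastUpdated = 9000L,
  sparkUser = "alice")
val history = new FsApplicationHistoryInfo(
  "application_1422981759269_0001", "SparkPi", List(attempt))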