[SPARK-8372] Do not show applications that haven't recorded their app ID yet. #7097

Closed · wants to merge 4 commits
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

@@ -83,12 +83,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]

// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"

/**
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
@@ -146,7 +140,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
try {
applications.get(appId).flatMap { appInfo =>
appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
val replayBus = new ReplayListenerBus()
val ui = {
val conf = this.conf.clone()
@@ -155,20 +149,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
// Do not call ui.bind() to avoid creating a new server for each application
}

val appListener = new ApplicationEventListener()
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)

ui.setAppName(s"${appInfo.name} ($appId)")

val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so they are properly picked up
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
ui.getSecurityManager.setViewAcls(attempt.sparkUser,
appListener.viewAcls.getOrElse(""))
ui
appInfo.map { info =>
ui.setAppName(s"${info.name} ($appId)")

val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so they are properly picked up
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
ui.getSecurityManager.setViewAcls(attempt.sparkUser,
appListener.viewAcls.getOrElse(""))
ui
}
}
}
} catch {
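
The map → flatMap switch in this hunk is the crux of the change: replay() can now return None, so every step of the lookup must flatMap for a missing app ID to collapse the whole result to None instead of nesting Options. A minimal sketch of the pattern, with hypothetical stand-in types rather than Spark's real classes:

case class Attempt(attemptId: Option[String], appId: Option[String])
case class AppInfo(attempts: List[Attempt])

// Stand-in for replay(): yields nothing when the log never recorded an app ID.
def replaySketch(attempt: Attempt): Option[String] =
  attempt.appId.map(id => s"ui-for-$id")

def getAppUISketch(
    apps: Map[String, AppInfo],
    appId: String,
    attemptId: Option[String]): Option[String] =
  apps.get(appId).flatMap { appInfo =>
    appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
      replaySketch(attempt) // map here would produce Option[Option[String]]
    }
  }
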
@@ -282,8 +276,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
logInfo(s"Application log ${res.logPath} loaded successfully.")
Some(res)
res match {
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
"The application may have not started.")
}
res
} catch {
case e: Exception =>
logError(
@@ -429,9 +427,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/**
* Replays the events in the specified log file and returns information about the associated
* application.
* application. Return `None` if the application ID cannot be located.
*/
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
private def replay(
eventLog: FileStatus,
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
@@ -445,16 +445,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)

// Without an app ID, new logs will render incorrectly in the listing page, so do not list or
// try to show their UI. Some old versions of Spark generate logs without an app ID, so let
// logs generated by those versions go through.
if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
Some(new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted))
} else {
None
}
} finally {
logInput.close()
}
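
The if/else guard above is the behavioral heart of the PR. Restated as a standalone predicate (illustrative names, not the provider's API): an attempt is listed only when it recorded an app ID, or when it came from a legacy log layout that never recorded one.

def shouldList(appId: Option[String], versionRecordsAppIds: Boolean): Boolean =
  appId.isDefined || !versionRecordsAppIds

assert(shouldList(Some("app-20150625"), versionRecordsAppIds = true)) // normal new-style log
assert(!shouldList(None, versionRecordsAppIds = true))                // new log whose app never started
assert(shouldList(None, versionRecordsAppIds = false))                // legacy Spark 1.0/1.1 log
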
@@ -529,10 +537,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

/**
* Returns whether the version of Spark that generated logs records app IDs. App IDs were added
* in Spark 1.1.
*/
private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
if (isLegacyLogDirectory(entry)) {
fs.listStatus(entry.getPath())
.find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
.map { status =>
val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
version != "1.0" && version != "1.1"

Contributor:
why do we check for 1.1 here if app IDs are already added then? Were they missing in some but not all of the 1.1.x releases?

Contributor:
then the java doc a few lines up is wrong. "App IDs were added in Spark 1.1."

Contributor:
Right? If so I'll fix this when I merge

}
.getOrElse(true)
} else {
true
}
}
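
For context on the directory layout this method inspects: a Spark 1.0-style event log is a directory holding EVENT_LOG_* files plus an empty marker file whose name embeds the Spark version, and the version is recovered by stripping the prefix from the file name. A hedged sketch of that parsing, assuming SPARK_VERSION_PREFIX expands to the literal "SPARK_VERSION_":

def versionFromMarker(fileName: String): Option[String] = {
  val prefix = "SPARK_VERSION_" // assumed expansion of SPARK_VERSION_PREFIX
  if (fileName.startsWith(prefix)) Some(fileName.stripPrefix(prefix)) else None
}

versionFromMarker("SPARK_VERSION_1.0") // Some("1.0"): version predates app IDs
versionFromMarker("SPARK_VERSION_1.2") // Some("1.2"): app IDs expected
versionFromMarker("EVENT_LOG_1")       // None: not a version marker
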

}

private object FsHistoryProvider {
private[history] object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"

// Constants used to parse Spark 1.0.0 log directories.
val LOG_PREFIX = "EVENT_LOG_"
val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
}

private class FsApplicationAttemptInfo(
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

@@ -39,6 +39,8 @@ import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}

class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {

import FsHistoryProvider._

private var testDir: File = null

before {
@@ -67,43 +69,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
// Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
None),
SparkListenerApplicationEnd(5L)
)

// Write a new-style application log.
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
1L, "test", None),
SparkListenerApplicationEnd(4L))

// Write an unfinished app, new-style.
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test",
None)
)

// Write an old-style application log.
val oldAppComplete = new File(testDir, "old1")
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
val oldAppComplete = writeOldLog("old1", "1.0", None, true,
SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))

// Check for logs so that we force the older unfinished app to be loaded, to make
// sure unfinished apps are also sorted correctly.
provider.checkForLogs()

// Write an unfinished app, old-style.
val oldAppIncomplete = new File(testDir, "old2")
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
val oldAppIncomplete = writeOldLog("old2", "1.0", None, false,
SparkListenerApplicationStart("old2", None, 2L, "test", None)
)

// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -124,16 +122,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
}

list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
newAppComplete.lastModified(), "test", true))
list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
true))
list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L,
oldAppComplete.lastModified(), "test", true))
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
oldAppIncomplete.lastModified(), "test", false))
list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L,
-1L, oldAppIncomplete.lastModified(), "test", false))
list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
newAppIncomplete.lastModified(), "test", false))

// Make sure the UI can be rendered.
@@ -155,12 +152,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null
val logDir = new File(testDir, codecName)
logDir.mkdir()
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec),
SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName))

val logPath = new Path(logDir.getAbsolutePath())
try {
@@ -180,12 +177,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
test("SPARK-3697: ignore directories that cannot be read.") {
val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
@@ -218,6 +215,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
}
}

test("Parse logs that application is not started") {
val provider = new FsHistoryProvider(createTestConf())

val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
SparkListenerLogStart("1.4")
)
updateAndCheck(provider) { list =>
list.size should be (0)
}
}
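
This test exercises the new None path end to end: the log contains only a LogStart event and never an application start, so ApplicationEventListener.appId stays None, replay() returns None, and the listing stays empty. (Illustratively, the written file holds a single JSON event line shaped like {"Event":"SparkListenerLogStart","Spark Version":"1.4"}; the exact field names are JsonProtocol's.)
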

test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())

@@ -373,6 +382,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
}
}

test("SPARK-8372: new logs with no app ID are ignored") {

Contributor:
thanks for adding the tests here.

val provider = new FsHistoryProvider(createTestConf())

// Write a new log file without an app id, to make sure it's ignored.
val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
SparkListenerLogStart("1.4")
)

// Write a 1.2 log file with no start event (= no app id), it should be ignored.
writeOldLog("v12Log", "1.2", None, false)

// Write 1.0 and 1.1 logs, which don't have app ids.
writeOldLog("v11Log", "1.1", None, true,
SparkListenerApplicationStart("v11Log", None, 2L, "test", None),
SparkListenerApplicationEnd(3L))
writeOldLog("v10Log", "1.0", None, true,
SparkListenerApplicationStart("v10Log", None, 2L, "test", None),
SparkListenerApplicationEnd(4L))

updateAndCheck(provider) { list =>
list.size should be (2)
list(0).id should be ("v10Log")
list(1).id should be ("v11Log")
}
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
@@ -412,4 +448,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
}

private def writeOldLog(
fname: String,
sparkVersion: String,
codec: Option[CompressionCodec],
completed: Boolean,
events: SparkListenerEvent*): File = {
val log = new File(testDir, fname)
log.mkdir()

val oldEventLog = new File(log, LOG_PREFIX + "1")
createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion))
writeFile(new File(log, LOG_PREFIX + "1"), false, codec, events: _*)
if (completed) {
createEmptyFile(new File(log, APPLICATION_COMPLETE))
}

log
}

}