-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-8372] History server shows incorrect information for application not started #6827
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | |
replayBus.addListener(appListener) | ||
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus) | ||
|
||
ui.setAppName(s"${appInfo.name} ($appId)") | ||
appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") } | ||
|
||
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) | ||
ui.getSecurityManager.setAcls(uiAclsEnabled) | ||
|
@@ -282,8 +282,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | |
val newAttempts = logs.flatMap { fileStatus => | ||
try { | ||
val res = replay(fileStatus, bus) | ||
logInfo(s"Application log ${res.logPath} loaded successfully.") | ||
Some(res) | ||
res match { | ||
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.") | ||
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}." + | ||
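There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Need a space after the period; otherwise the two concatenated message strings run together ("...getPath}.The application may have not started."). |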
||
"The application may have not started.") | ||
} | ||
res | ||
} catch { | ||
case e: Exception => | ||
logError( | ||
|
@@ -431,7 +435,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | |
* Replays the events in the specified log file and returns information about the associated | ||
* application. | ||
*/ | ||
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = { | ||
private def replay( | ||
eventLog: FileStatus, | ||
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Need to update the Javadoc to explain when we return `None`. |
||
val logPath = eventLog.getPath() | ||
logInfo(s"Replaying log path: $logPath") | ||
val logInput = | ||
|
@@ -445,16 +451,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | |
val appCompleted = isApplicationCompleted(eventLog) | ||
bus.addListener(appListener) | ||
bus.replay(logInput, logPath.toString, !appCompleted) | ||
new FsApplicationAttemptInfo( | ||
logPath.getName(), | ||
appListener.appName.getOrElse(NOT_STARTED), | ||
appListener.appId.getOrElse(logPath.getName()), | ||
appListener.appAttemptId, | ||
appListener.startTime.getOrElse(-1L), | ||
appListener.endTime.getOrElse(-1L), | ||
getModificationTime(eventLog).get, | ||
appListener.sparkUser.getOrElse(NOT_STARTED), | ||
appCompleted) | ||
appListener.appId.map { appId => | ||
new FsApplicationAttemptInfo( | ||
logPath.getName(), | ||
appListener.appName.getOrElse(NOT_STARTED), | ||
appId, | ||
appListener.appAttemptId, | ||
appListener.startTime.getOrElse(-1L), | ||
appListener.endTime.getOrElse(-1L), | ||
getModificationTime(eventLog).get, | ||
appListener.sparkUser.getOrElse(NOT_STARTED), | ||
appCompleted) | ||
} | ||
} finally { | ||
logInput.close() | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,29 +67,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc | |
// Write a new-style application log. | ||
val newAppComplete = newLogFile("new1", None, inProgress = false) | ||
writeFile(newAppComplete, true, None, | ||
SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None), | ||
SparkListenerApplicationStart( | ||
"new-app-complete", Some("new-app-complete"), 1L, "test", None), | ||
SparkListenerApplicationEnd(5L) | ||
) | ||
|
||
// Write a new-style application log. | ||
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false, | ||
Some("lzf")) | ||
writeFile(newAppCompressedComplete, true, None, | ||
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None), | ||
SparkListenerApplicationStart( | ||
"new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None), | ||
SparkListenerApplicationEnd(4L)) | ||
|
||
// Write an unfinished app, new-style. | ||
val newAppIncomplete = newLogFile("new2", None, inProgress = true) | ||
writeFile(newAppIncomplete, true, None, | ||
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None) | ||
SparkListenerApplicationStart( | ||
"new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None) | ||
) | ||
|
||
// Write an old-style application log. | ||
val oldAppComplete = new File(testDir, "old1") | ||
oldAppComplete.mkdir() | ||
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0")) | ||
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None, | ||
SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None), | ||
SparkListenerApplicationStart( | ||
"old-app-complete", Some("old-app-complete"), 2L, "test", None), | ||
SparkListenerApplicationEnd(3L) | ||
) | ||
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE)) | ||
|
@@ -103,7 +107,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc | |
oldAppIncomplete.mkdir() | ||
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0")) | ||
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None, | ||
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None) | ||
SparkListenerApplicationStart( | ||
"old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None) | ||
) | ||
|
||
// Force a reload of data from the log directory, and check that both logs are loaded. | ||
|
@@ -124,16 +129,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc | |
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed))) | ||
} | ||
|
||
list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L, | ||
list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L, | ||
newAppComplete.lastModified(), "test", true)) | ||
list(1) should be (makeAppInfo(newAppCompressedComplete.getName(), | ||
list(1) should be (makeAppInfo("new-app-compressed-complete", | ||
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", | ||
true)) | ||
list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L, | ||
list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L, | ||
oldAppComplete.lastModified(), "test", true)) | ||
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L, | ||
list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L, | ||
oldAppIncomplete.lastModified(), "test", false)) | ||
list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L, | ||
list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L, | ||
newAppIncomplete.lastModified(), "test", false)) | ||
|
||
// Make sure the UI can be rendered. | ||
|
@@ -157,7 +162,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc | |
logDir.mkdir() | ||
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0")) | ||
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec), | ||
SparkListenerApplicationStart("app2", None, 2L, "test", None), | ||
SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None), | ||
SparkListenerApplicationEnd(3L) | ||
) | ||
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName)) | ||
|
@@ -180,12 +185,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc | |
test("SPARK-3697: ignore directories that cannot be read.") { | ||
val logFile1 = newLogFile("new1", None, inProgress = false) | ||
writeFile(logFile1, true, None, | ||
SparkListenerApplicationStart("app1-1", None, 1L, "test", None), | ||
SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None), | ||
SparkListenerApplicationEnd(2L) | ||
) | ||
val logFile2 = newLogFile("new2", None, inProgress = false) | ||
writeFile(logFile2, true, None, | ||
SparkListenerApplicationStart("app1-2", None, 1L, "test", None), | ||
SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None), | ||
SparkListenerApplicationEnd(2L) | ||
) | ||
logFile2.setReadable(false, false) | ||
|
@@ -218,6 +223,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc | |
} | ||
} | ||
|
||
test("Parse logs that application is not started") { | ||
val provider = new FsHistoryProvider((createTestConf())) | ||
|
||
val logFile1 = newLogFile("app1", None, inProgress = true) | ||
writeFile(logFile1, true, None, | ||
SparkListenerLogStart("1.4") | ||
) | ||
updateAndCheck(provider) { list => | ||
list.size should be (0) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This test did not fail before your changes, did it? This was the expected previous behavior and was actually covered by other tests. Could you write a test that actually triggers the condition you're fixing? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @vanzin, it did fail without my changes: `list.size` was 1. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh, I see. The filtering of active / inactive applications is actually done in HistoryServer, not in FsHistoryProvider. My bad. So basically the new behavior is that if the app doesn't have an app ID, it's not even listed, right? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, that's correct. |
||
} | ||
} | ||
|
||
test("SPARK-5582: empty log directory") { | ||
val provider = new FsHistoryProvider(createTestConf()) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be logged at info level?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh never mind, just saw @vanzin's comment