Skip to content

Commit

Permalink
[SPARK-5582] [history] Ignore empty log directories.
Browse files Browse the repository at this point in the history
Empty log directories are not useful at the moment, but if one ends
up showing in the log root, it breaks the code that checks for log
directories.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #4352 from vanzin/SPARK-5582 and squashes the following commits:

1a6a3d4 [Marcelo Vanzin] [SPARK-5582] Fix exception when looking at empty directories.
  • Loading branch information
Marcelo Vanzin authored and JoshRosen committed Feb 6, 2015
1 parent 25d8044 commit faccdcb
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val logInfos = statusList
.filter { entry =>
try {
val modTime = getModificationTime(entry)
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
modTime >= lastModifiedTime
getModificationTime(entry).map { time =>
newLastModifiedTime = math.max(newLastModifiedTime, time)
time >= lastModifiedTime
}.getOrElse(false)
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
Expand Down Expand Up @@ -251,7 +252,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
isApplicationCompleted(eventLog))
} finally {
Expand Down Expand Up @@ -310,11 +311,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()

private def getModificationTime(fsEntry: FileStatus): Long = {
if (fsEntry.isDir) {
fs.listStatus(fsEntry.getPath).map(_.getModificationTime()).max
/**
* Returns the modification time of the given event log. If the status points at an empty
* directory, `None` is returned, indicating that there isn't an event log at that location.
*/
private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
if (isLegacyLogDirectory(fsEntry)) {
val statusList = fs.listStatus(fsEntry.getPath)
if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
} else {
fsEntry.getModificationTime()
Some(fsEntry.getModificationTime())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,24 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
}

test("SPARK-5582: empty log directory") {
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
val provider = new FsHistoryProvider(conf)

val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
SparkListenerApplicationEnd(2L))

val oldLog = new File(testDir, "old1")
oldLog.mkdir()

provider.checkForLogs()
val appListAfterRename = provider.getListing()
appListAfterRename.size should be (1)
}

private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val out =
Expand Down

0 comments on commit faccdcb

Please sign in to comment.