[SPARK-7336][HistoryServer] Fix bug that applications status uncorrect on JobHistory UI. #5886

Closed
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history

 import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.UUID
Contributor

nit: out of order

 import java.util.zip.{ZipEntry, ZipOutputStream}

 import scala.collection.mutable
@@ -73,7 +74,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // The modification time of the newest log detected during the last scan. This is used
   // to ignore logs that are older during subsequent scans, to avoid processing data that
   // is already known.
-  private var lastModifiedTime = -1L
+  private var lastScanTime = -1L

   // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
   // into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -179,15 +180,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private[history] def checkForLogs(): Unit = {
     try {
+      val newLastScanTime = getNewLastScanTime()
Member

So, doesn't this have the opposite 'problem': in some cases a log file will get scanned twice even if it hasn't changed?

If lastScanTime is 90, and newLastScanTime is 100, and a file is modified (once) at 101 (just after newLastScanTime is established) then it will be read twice. I'm just double-checking that this is fine if it happens only once in a while.

Touching a file seems a little icky but I understand the logic. I can't think of something better that doesn't involve listing the dir again, processing files again frequently, or taking arbitrary guesses about how long the listing takes.

Is this worth it because the cost of missing an update in this very rare case is high, i.e. a correctness issue where you'd miss a bit of history forever?
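
The scenario above can be replayed with a small, self-contained sketch. `LogFile` and `selectForScan` are hypothetical names made up for illustration and are not part of `FsHistoryProvider`; the filter only mirrors the `time >= lastScanTime` check in this diff, and the sketch assumes the directory listing of the first scan happens after the write at 101.

```scala
object DoubleScanDemo {
  // Hypothetical stand-in for a FileStatus: a name and a last-modified timestamp.
  final case class LogFile(name: String, modTime: Long)

  // Mirrors the filter in the diff: keep entries modified at or after the previous scan time.
  def selectForScan(files: Seq[LogFile], lastScanTime: Long): Seq[LogFile] =
    files.filter(_.modTime >= lastScanTime)

  def main(args: Array[String]): Unit = {
    // The file is modified exactly once, at 101.
    val listing = Seq(LogFile("app-1", modTime = 101L))

    // Scan 1: lastScanTime = 90, newLastScanTime = 100. 101 >= 90, so the file is selected.
    val firstScan = selectForScan(listing, lastScanTime = 90L)

    // Scan 2: lastScanTime is now 100. The file is unchanged, but 101 >= 100 selects it again.
    val secondScan = selectForScan(listing, lastScanTime = 100L)

    // Scan 3: assume the scan time has advanced to 110. 101 < 110, so the file is skipped.
    val thirdScan = selectForScan(listing, lastScanTime = 110L)

    println(s"scan 1 selected ${firstScan.size} file(s)")
    println(s"scan 2 selected ${secondScan.size} file(s)")
    println(s"scan 3 selected ${thirdScan.size} file(s)")
  }
}
```

The unchanged file is picked up by two consecutive scans and then drops out, which is the "read twice" cost being asked about.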

Contributor

That's what I meant when I said that my suggestion would make SPARK-7189 worse. There are ways to fix it, such as those discussed in the bug, that don't require too much extra memory in the HS: basically, keep track of all log files that have lastModified > newLastScanTime and, if they haven't changed by the next poll, don't re-parse them. That's a lot less state to keep track of than the previous approach in this PR; in fact, most of the time there will be nothing to keep track of.
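
A rough sketch of the bookkeeping described in this comment, under stated assumptions. `LogFile`, `ScanState`, and `poll` are hypothetical names and none of this is code from the PR or from the bug discussion. The sketch uses `>=` rather than `>` when deciding what to remember, so that it lines up with the `time >= lastScanTime` filter in this diff, and it assumes scan times never go backwards.

```scala
object PendingLogTracker {
  // Hypothetical stand-in for a FileStatus: a path and a last-modified timestamp.
  final case class LogFile(path: String, modTime: Long)

  // State carried between polls: the last scan time, plus the modification times of
  // files that were already seen but would pass the time filter again on the next poll.
  final case class ScanState(lastScanTime: Long, pending: Map[String, Long])

  // Returns the files to parse in this poll and the state for the next poll.
  def poll(
      state: ScanState,
      newLastScanTime: Long,
      listing: Seq[LogFile]): (Seq[LogFile], ScanState) = {
    val toParse = listing.filter { f =>
      // Time filter as in the diff, minus files that are unchanged since they were last seen.
      f.modTime >= state.lastScanTime &&
        !state.pending.get(f.path).contains(f.modTime)
    }
    // Remember only the files that the next poll's time filter would select again.
    val pending = listing.collect {
      case f if f.modTime >= newLastScanTime => f.path -> f.modTime
    }.toMap
    (toParse, ScanState(newLastScanTime, pending))
  }

  def main(args: Array[String]): Unit = {
    val listing = Seq(LogFile("app-1", modTime = 101L))
    val (first, afterFirst) = poll(ScanState(90L, Map.empty), 100L, listing)
    val (second, afterSecond) = poll(afterFirst, 110L, listing)
    println(s"poll 1 parses ${first.size} file(s), tracks ${afterFirst.pending.size}")
    println(s"poll 2 parses ${second.size} file(s), tracks ${afterSecond.pending.size}")
  }
}
```

With the timestamps from the earlier comment (90, 100, 101), the file is parsed in the first poll, skipped in the second because its modification time is unchanged, and then dropped from the tracked set. In the common case where nothing is written during a scan, `pending` stays empty.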

       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
         .getOrElse(Seq[FileStatus]())
-      var newLastModifiedTime = lastModifiedTime
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
           try {
             getModificationTime(entry).map { time =>
-              newLastModifiedTime = math.max(newLastModifiedTime, time)
-              time >= lastModifiedTime
+              time >= lastScanTime
             }.getOrElse(false)
           } catch {
             case e: AccessControlException =>
@@ -224,12 +224,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
       }

-      lastModifiedTime = newLastModifiedTime
+      lastScanTime = newLastScanTime
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
   }

+  private def getNewLastScanTime(): Long = {
+    val fileName = "." + UUID.randomUUID().toString
+    val path = new Path(logDir, fileName)
+    val fos = fs.create(path)
+
+    try {
+      fos.close()
+      fs.getFileStatus(path).getModificationTime
+    } catch {
+      case e: Exception =>
+        logError("Exception encountered when attempting to update last scan time", e)
+        lastScanTime
+    } finally {
+      fs.delete(path)
+    }
+  }
+
   override def writeEventLogs(
       appId: String,
       attemptId: Option[String],