[SPARK-21571][Scheduler] Spark history server leaves incomplete or unrea… #18791

Closed
FsHistoryProvider.scala

@@ -107,6 +107,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
"; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
"; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)

private val AGGRESSIVE_CLEANUP = conf.getBoolean("spark.history.fs.cleaner.aggressive", false)

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private val fs = new Path(logDir).getFileSystem(hadoopConf)
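
As an aside for reviewers: the new flag is read directly from SparkConf, so it composes with the existing cleaner settings. A minimal configuration sketch (spark.history.fs.cleaner.enabled and spark.history.fs.cleaner.maxAge are existing Spark settings; the aggressive key is the one this PR introduces):

import org.apache.spark.SparkConf

// Hypothetical history-server configuration: enable the periodic cleaner,
// retain logs for 14 days, and opt in to the aggressive cleanup from this PR.
val conf = new SparkConf()
  .set("spark.history.fs.cleaner.enabled", "true")
  .set("spark.history.fs.cleaner.maxAge", "14d")
  .set("spark.history.fs.cleaner.aggressive", "true")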

@@ -494,7 +496,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

- private def getNewLastScanTime(): Long = {
+ protected def getNewLastScanTime(): Long = {
val fileName = "." + UUID.randomUUID().toString
val path = new Path(logDir, fileName)
val fos = fs.create(path)
@@ -587,17 +589,19 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

addListing(app)
}
- listing.write(new LogInfo(logPath.toString(), fileStatus.getLen()))
+ listing.write(new LogInfo(logPath.toString(), fileStatus.getLen(),
+   fileStatus.getModificationTime()))
}

/**
* Delete event logs from the log directory according to the clean policy defined by the user.
*/
private[history] def cleanLogs(): Unit = {
+ val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000

var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None
try {
- val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000

// Iterate descending over all applications whose oldest attempt happened before maxTime.
iterator = Some(listing.view(classOf[ApplicationInfoWrapper])
.index("oldestAttempt")
@@ -616,23 +620,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.write(newApp)
}

- toDelete.foreach { attempt =>
-   val logPath = new Path(logDir, attempt.logPath)
-   try {
-     listing.delete(classOf[LogInfo], logPath.toString())
-   } catch {
-     case _: NoSuchElementException =>
-       logDebug(s"Log info entry for $logPath not found.")
-   }
-   try {
-     fs.delete(logPath, true)
-   } catch {
-     case e: AccessControlException =>
-       logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-     case t: IOException =>
-       logError(s"IOException in cleaning ${attempt.logPath}", t)
-   }
- }
+ toDelete
+   .map(attempt => new Path(logDir, attempt.logPath))
+   .foreach(logPath => deleteLogInfo(logPath))

if (remaining.isEmpty) {
listing.delete(app.getClass(), app.id)
@@ -643,6 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} finally {
iterator.foreach(_.close())
}

// Clean corrupt or empty files that may have accumulated.
if (AGGRESSIVE_CLEANUP) {
var untracked: Option[KVStoreIterator[LogInfo]] = None
try {
untracked = Some(listing.view(classOf[LogInfo])
.index("lastModifiedTime")
.reverse()
.first(maxTime)
.closeableIterator())
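// (Iterating the lastModifiedTime index in reverse, starting at maxTime,
// yields every tracked log last modified at or before the cutoff, i.e.
// entries that the attempt-based pass above could not age out.)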

untracked.get.asScala
.map(logInfo => new Path(logInfo.logPath))
.foreach(logPath => deleteLogInfo(logPath))
} catch {
case t: Exception => logError("Exception while cleaning logs", t)
} finally {
untracked.foreach(_.close())
}
}
}

private def deleteLogInfo(logPath: Path) = {
try {
listing.delete(classOf[LogInfo], logPath.toString)
} catch {
case _: NoSuchElementException =>
logDebug(s"Log info entry for $logPath not found.")
}

try {
fs.delete(logPath, true)
} catch {
case e: AccessControlException =>
logInfo(s"No permission to delete ${logPath}, ignoring.")
case t: IOException =>
logError(s"IOException in cleaning ${logPath}", t)
}
}
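
// Note: deleteLogInfo is deliberately forgiving. A missing listing entry is
// only logged at debug level, and permission errors are logged and skipped,
// so a single bad file cannot abort an entire cleanup pass.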

/**
@@ -710,13 +738,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/**
* Return the last known size of the given event log, recorded the last time the file
- * system scanner detected a change in the file.
+ * system scanner detected a change in the file. If the file has never been scanned we
+ * return -1 for file size so it starts being tracked.
*/
private def recordedFileSize(log: Path): Long = {
try {
listing.read(classOf[LogInfo], log.toString()).fileSize
} catch {
- case _: NoSuchElementException => 0L
+ case _: NoSuchElementException => -1L
}
}
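
A note on the 0L to -1L change above: the scanner only reprocesses a log whose current length exceeds its recorded size, so with a default of 0 a zero-byte file was never picked up and therefore never entered the listing that cleanLogs() iterates. A simplified sketch of that filter, assuming the provider compares on-disk length against recordedFileSize (names modeled on this class, not the verbatim code):

// Sketch: with -1 as the "never seen" size, an empty log (length 0) now
// satisfies 0 > -1, is processed once, gains a LogInfo entry carrying its
// modification time, and so becomes visible to the aggressive cleaner.
val logsToProcess = statusList.filter { entry =>
  entry.getLen() > recordedFileSize(entry.getPath())
}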

@@ -795,7 +824,8 @@ private[history] case class FsHistoryProviderMetadata(

private[history] case class LogInfo(
@KVIndexParam logPath: String,
- fileSize: Long)
+ fileSize: Long,
+ @JsonIgnore @KVIndexParam("lastModifiedTime") lastModifiedTime: Long)

private[history] class AttemptInfoWrapper(
val info: v1.ApplicationAttemptInfo,
FsHistoryProviderSuite.scala

@@ -142,7 +142,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// setReadable(...) does not work on Windows. Please refer JDK-6728842.
assume(!Utils.isWindows)

- class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
+ class TestFsHistoryProvider extends FsHistoryProvider(
+   createTestConf().set("spark.testing", "true")) {

Member:

What is your reasoning for adding this? It doesn't seem related to the other tests you added


Author:

The test suite refused to pass on my machine, so I decided to fix it.

As mentioned in the commit comments:

Fixed a race condition in a test ("SPARK-3697: ignore files that cannot be
read.") where the number of mergeApplicationListing calls could be more than 1,
since FsHistoryProvider would spin up an executor that also calls
checkForLogs in parallel with the test unless spark.testing=true is configured.
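
For readers wondering what spark.testing changes here: the provider normally starts a background executor that periodically invokes checkForLogs(), which raced with the test's own explicit calls. A rough, self-contained sketch of that gating, under the assumption that scheduling is simply skipped when the flag is present (not the verbatim Spark code):

import java.util.concurrent.{Executors, TimeUnit}

// Rough sketch: when spark.testing is set, no background executor is started,
// so only the test's explicit calls to checkForLogs() ever run.
def startPolling(conf: Map[String, String], intervalSeconds: Long)(checkForLogs: () => Unit): Unit = {
  if (!conf.contains("spark.testing")) {
    val pool = Executors.newSingleThreadScheduledExecutor()
    pool.scheduleWithFixedDelay(
      new Runnable { override def run(): Unit = checkForLogs() },
      0, intervalSeconds, TimeUnit.SECONDS)
  }
}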

var mergeApplicationListingCall = 0
override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
super.mergeApplicationListing(fileStatus)
@@ -658,6 +659,103 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
freshUI.get.ui.store.job(0)
}

/**
* Validate that aggressive cleanup removes incomplete or corrupt history files that would
* otherwise be missed during cleanup, and that behavior is unchanged when aggressive
* cleanup is disabled.
*/
test("SPARK-21571: aggressive clean up removes incomplete history files") {
createCleanAndCheckIncompleteLogFiles(14, 21, true, true, true, true, true)
createCleanAndCheckIncompleteLogFiles(14, 21, false, false, false, true, false)
createCleanAndCheckIncompleteLogFiles(14, 7, true, false, false, false, false)
createCleanAndCheckIncompleteLogFiles(14, 7, false, false, false, false, false)
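// Reading the scenarios above, the tuple is (maxAgeDays, lastModifiedDaysAgo,
// aggressiveCleanup) followed by the four removal expectations. With aggressive
// cleanup enabled and 21-day-old files against a 14-day window, all four files
// should be removed; with it disabled, only the non-empty in-progress file is
// removed (it parses to an attempt, so the existing attempt-based cleanup
// catches it); 7-day-old files are inside the window and survive in both modes.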
}

/**
* Create four incomplete/corrupt test history files, then run one check-and-clean cycle
* inside the retention window and a second one after the max-age retention window has
* passed, asserting after each that the expected history files remain.
* @param maxAgeDays maximum retention in days, used to simulate current time
* @param lastModifiedDaysAgo last modified date for test files relative to current time
* @param aggressiveCleanup aggressive clean up is enabled or not
* @param expectEmptyInprogressRemoved expect an empty inprogress file to be removed
* @param expectEmptyCorruptRemoved expect an empty corrupt complete file to be removed
* @param expectNonEmptyInprogressRemoved expect a non-empty inprogress file to be removed
* @param expectNonEmptyCorruptRemoved expect a non-empty corrupt complete file to be removed
*/
private def createCleanAndCheckIncompleteLogFiles(
maxAgeDays: Long,
lastModifiedDaysAgo: Long,
aggressiveCleanup: Boolean,
expectEmptyInprogressRemoved: Boolean,
expectEmptyCorruptRemoved: Boolean,
expectNonEmptyInprogressRemoved: Boolean,
expectNonEmptyCorruptRemoved: Boolean) = {
// Set current time as 2 * maximum retention period to allow for expired history files.
val currentTimeMillis = MILLISECONDS.convert(maxAgeDays * 2, TimeUnit.DAYS)
val clock = new ManualClock(currentTimeMillis)

val lastModifiedTime = currentTimeMillis -
MILLISECONDS.convert(lastModifiedDaysAgo, TimeUnit.DAYS)

val provider = new FsHistoryProvider(
createTestConf()
.set("spark.history.fs.cleaner.aggressive", s"${aggressiveCleanup}")
.set("spark.history.fs.cleaner.maxAge", s"${maxAgeDays}d")
.set("spark.testing", "true"),
clock) {
override def getNewLastScanTime(): Long = clock.getTimeMillis
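  // (This override is why the main diff relaxes getNewLastScanTime from
  // private to protected: the ManualClock lets the test control scan
  // timestamps deterministically.)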
}

// Create history files
// 1. zero-byte in-progress and corrupt complete files
// 2. non-empty (>0 byte) in-progress and corrupt complete files

try {
val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress = true)
logfile1.createNewFile
logfile1.setLastModified(lastModifiedTime)

val logfile2 = newLogFile("emptyCorruptLogFile", None, inProgress = false)
logfile2.createNewFile
logfile2.setLastModified(lastModifiedTime)

// Create an in-progress log file that has only a start record.
val logfile3 = newLogFile("nonEmptyInprogressLogFile", None, inProgress = true)
writeFile(logfile3, true, None, SparkListenerApplicationStart(
"inProgress1", Some("inProgress1"), 3L, "test", Some("attempt1"))
)
logfile3.setLastModified(lastModifiedTime)

// Create a corrupt complete log file that has an end record but no start record.
val logfile4 = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false)
writeFile(logfile4, true, None, SparkListenerApplicationEnd(0))
logfile4.setLastModified(lastModifiedTime)

// Simulate checking logs 1 day after initial creation. This is necessary because the log
// checker will sometimes use the current time in place of the last modified time the first
// time it encounters an in-progress file, to work around certain file system inconsistencies.
// No history files should be cleaned up in this first check-and-clean pass.
clock.setTime(lastModifiedTime + MILLISECONDS.convert(1, TimeUnit.DAYS))
provider.checkForLogs
provider.cleanLogs
assert(new File(testDir.toURI).listFiles().size == 4)

// Simulate checking logs at current time
clock.setTime(currentTimeMillis)
provider.checkForLogs
provider.cleanLogs

val remainingFiles = new File(testDir.toURI).listFiles.map(_.getName)
assert(remainingFiles.contains(logfile1.getName) == !expectEmptyInprogressRemoved)
assert(remainingFiles.contains(logfile2.getName) == !expectEmptyCorruptRemoved)
assert(remainingFiles.contains(logfile3.getName) == !expectNonEmptyInprogressRemoved)
assert(remainingFiles.contains(logfile4.getName) == !expectNonEmptyCorruptRemoved)
} finally {
new File(testDir.toURI).listFiles().map(_.delete())
}
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example: