Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.foreach { log =>
log.appId.foreach { appId =>
cleanAppData(appId, log.attemptId, log.logPath)
listing.delete(classOf[LogInfo], log.logPath)
listing.synchronized {
listing.delete(classOf[LogInfo], log.logPath)
}
}
}

Expand Down Expand Up @@ -698,7 +700,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// If the LogInfo read had succeeded, but the ApplicationInafoWrapper
// read failure and throw the exception, we should also cleanup the log
// info from listing db.
listing.delete(classOf[LogInfo], reader.rootPath.toString)
listing.synchronized {
listing.delete(classOf[LogInfo], reader.rootPath.toString)
}
false
} else if (count < conf.get(UPDATE_BATCHSIZE)) {
listing.write(LogInfo(reader.rootPath.toString(), newLastScanTime,
Expand Down Expand Up @@ -1194,7 +1198,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH, log.logPath)}")
val logPath = new Path(log.logPath)
deleteLog(fsForPath(logPath), logPath)
listing.delete(classOf[LogInfo], log.logPath)
listing.synchronized {
listing.delete(classOf[LogInfo], log.logPath)
}
}

log.appId.foreach { appId =>
Expand Down Expand Up @@ -1228,22 +1234,28 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

// Delete log files that don't have a valid application and exceed the configured max age.
val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo])
.index("lastProcessed")
.reverse()
.first(maxTime), Int.MaxValue) { l => l.logType == null || l.logType == LogType.EventLogs }
val stale = listing.synchronized {
KVUtils.viewToSeq(listing.view(classOf[LogInfo])
.index("lastProcessed")
.reverse()
.first(maxTime), Int.MaxValue) { l => l.logType == null || l.logType == LogType.EventLogs }
}
stale.filterNot(isProcessing).foreach { log =>
if (log.appId.isEmpty) {
logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH, log.logPath)}")
val logPath = new Path(log.logPath)
deleteLog(fsForPath(logPath), logPath)
listing.delete(classOf[LogInfo], log.logPath)
listing.synchronized {
listing.delete(classOf[LogInfo], log.logPath)
}
}
}

// If the number of files is bigger than MAX_LOG_NUM,
// clean up all completed attempts per application one by one.
val num = KVUtils.size(listing.view(classOf[LogInfo]).index("lastProcessed"))
Copy link
Copy Markdown
Member Author

@pan3793 pan3793 Mar 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NoSuchElementException is thrown from here, because the LevelDB/RocksDB-based KVStore does not support MVCC, we must use a relatively heavy synchronized to fix the concurrency issue.

val num = listing.synchronized {
KVUtils.size(listing.view(classOf[LogInfo]).index("lastProcessed"))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we wrap with synchronize here too?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sarutak, it seems I can not jump to the line in my browser, could you please provide the line number instead?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it's L1348.

val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo])

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense, updated

}
var count = num - maxNum
if (count > 0) {
logInfo(log"Try to delete ${MDC(NUM_FILES, count)} old event logs" +
Expand Down Expand Up @@ -1324,20 +1336,28 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
if (deleteFile) {
logInfo(log"Deleting expired driver log for: ${MDC(PATH, logFileStr)}")
listing.delete(classOf[LogInfo], logFileStr)
listing.synchronized {
listing.delete(classOf[LogInfo], logFileStr)
}
deleteLog(driverLogFs, f.getPath())
}
}

// Delete driver log file entries that exceed the configured max age and
// may have been deleted on filesystem externally.
val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo])
.index("lastProcessed")
.reverse()
.first(maxTime), Int.MaxValue) { l => l.logType != null && l.logType == LogType.DriverLogs }
val stale = listing.synchronized {
KVUtils.viewToSeq(listing.view(classOf[LogInfo])
.index("lastProcessed")
.reverse()
.first(maxTime), Int.MaxValue) { l =>
l.logType != null && l.logType == LogType.DriverLogs
}
}
stale.filterNot(isProcessing).foreach { log =>
logInfo(log"Deleting invalid driver log ${MDC(PATH, log.logPath)}")
listing.delete(classOf[LogInfo], log.logPath)
listing.synchronized {
listing.delete(classOf[LogInfo], log.logPath)
}
deleteLog(driverLogFs, new Path(log.logPath))
}
}
Expand Down