diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 5937d0d7ecb7c..54bc290b3787f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -584,7 +584,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .foreach { log => log.appId.foreach { appId => cleanAppData(appId, log.attemptId, log.logPath) - listing.delete(classOf[LogInfo], log.logPath) + listing.synchronized { + listing.delete(classOf[LogInfo], log.logPath) + } } } @@ -698,7 +700,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // If the LogInfo read had succeeded, but the ApplicationInafoWrapper // read failure and throw the exception, we should also cleanup the log // info from listing db. - listing.delete(classOf[LogInfo], reader.rootPath.toString) + listing.synchronized { + listing.delete(classOf[LogInfo], reader.rootPath.toString) + } false } else if (count < conf.get(UPDATE_BATCHSIZE)) { listing.write(LogInfo(reader.rootPath.toString(), newLastScanTime, @@ -1194,7 +1198,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH, log.logPath)}") val logPath = new Path(log.logPath) deleteLog(fsForPath(logPath), logPath) - listing.delete(classOf[LogInfo], log.logPath) + listing.synchronized { + listing.delete(classOf[LogInfo], log.logPath) + } } log.appId.foreach { appId => @@ -1228,22 +1234,28 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } // Delete log files that don't have a valid application and exceed the configured max age. - val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo]) - .index("lastProcessed") - .reverse() - .first(maxTime), Int.MaxValue) { l => l.logType == null || l.logType == LogType.EventLogs } + val stale = listing.synchronized { + KVUtils.viewToSeq(listing.view(classOf[LogInfo]) + .index("lastProcessed") + .reverse() + .first(maxTime), Int.MaxValue) { l => l.logType == null || l.logType == LogType.EventLogs } + } stale.filterNot(isProcessing).foreach { log => if (log.appId.isEmpty) { logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH, log.logPath)}") val logPath = new Path(log.logPath) deleteLog(fsForPath(logPath), logPath) - listing.delete(classOf[LogInfo], log.logPath) + listing.synchronized { + listing.delete(classOf[LogInfo], log.logPath) + } } } // If the number of files is bigger than MAX_LOG_NUM, // clean up all completed attempts per application one by one. - val num = KVUtils.size(listing.view(classOf[LogInfo]).index("lastProcessed")) + val num = listing.synchronized { + KVUtils.size(listing.view(classOf[LogInfo]).index("lastProcessed")) + } var count = num - maxNum if (count > 0) { logInfo(log"Try to delete ${MDC(NUM_FILES, count)} old event logs" + @@ -1324,20 +1336,28 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } if (deleteFile) { logInfo(log"Deleting expired driver log for: ${MDC(PATH, logFileStr)}") - listing.delete(classOf[LogInfo], logFileStr) + listing.synchronized { + listing.delete(classOf[LogInfo], logFileStr) + } deleteLog(driverLogFs, f.getPath()) } } // Delete driver log file entries that exceed the configured max age and // may have been deleted on filesystem externally. - val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo]) - .index("lastProcessed") - .reverse() - .first(maxTime), Int.MaxValue) { l => l.logType != null && l.logType == LogType.DriverLogs } + val stale = listing.synchronized { + KVUtils.viewToSeq(listing.view(classOf[LogInfo]) + .index("lastProcessed") + .reverse() + .first(maxTime), Int.MaxValue) { l => + l.logType != null && l.logType == LogType.DriverLogs + } + } stale.filterNot(isProcessing).foreach { log => logInfo(log"Deleting invalid driver log ${MDC(PATH, log.logPath)}") - listing.delete(classOf[LogInfo], log.logPath) + listing.synchronized { + listing.delete(classOf[LogInfo], log.logPath) + } deleteLog(driverLogFs, new Path(log.logPath)) } }