Skip to content

Commit

Permalink
check and delete expired log
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Nov 7, 2019
1 parent 66e9dd1 commit d5ff72c
Showing 1 changed file with 40 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package org.apache.spark.deploy.history

import java.io.{File, FileNotFoundException, IOException}
import java.nio.file.Files
import java.util.{Date, ServiceLoader}
import java.util.{Date, NoSuchElementException, ServiceLoader}
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.io.Source
import scala.util.Try
import scala.util.{Failure, Success, Try}
import scala.xml.Node

import com.fasterxml.jackson.annotation.JsonIgnore
Expand Down Expand Up @@ -681,12 +681,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} finally {
endProcessing(reader.rootPath)
pendingReplayTasksCount.decrementAndGet()

val isExpired = scanTime + conf.get(MAX_LOG_AGE_S) * 1000 < clock.getTimeMillis()
if (isExpired) {
listing.delete(classOf[LogInfo], reader.rootPath.toString)
deleteLog(fs, reader.rootPath)
}
checkAndCleanLog(reader.rootPath.toString)
}
}

Expand Down Expand Up @@ -825,6 +820,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

/**
* Check and delete specified event log according to the max log age defined by the user.
*/
private def checkAndCleanLog(logPath: String): Unit = Utils.tryLog {
val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
val expiredLog = Try {
val log = listing.read(classOf[LogInfo], logPath)
if (log.lastProcessed < maxTime) Some(log) else None
} match {
case Success(log) => log
case Failure(_: NoSuchElementException) => None
case Failure(e) => throw e
}

expiredLog.foreach { log =>
log.appId.foreach { appId =>
listing.view(classOf[ApplicationInfoWrapper])
.index("oldestAttempt")
.reverse()
.first(maxTime)
.asScala
.filter(_.info.id == appId)
.foreach { app =>
val (remaining, toDelete) = app.attempts.partition { attempt =>
attempt.info.lastUpdated.getTime() >= maxTime
}
deleteAttemptLogs(app, remaining, toDelete)
}
}
if (log.appId.isEmpty) {
logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
deleteLog(fs, new Path(log.logPath))
listing.delete(classOf[LogInfo], log.logPath)
}
}
}

/**
* Delete event logs from the log directory according to the clean policy defined by the user.
*/
Expand Down

0 comments on commit d5ff72c

Please sign in to comment.