[SPARK-29043][Core] Improve the concurrent performance of History Server #25797

Closed · wants to merge 7 commits
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -20,13 +20,12 @@ package org.apache.spark.deploy.history
 import java.io.{File, FileNotFoundException, IOException}
 import java.lang.{Long => JLong}
 import java.nio.file.Files
-import java.util.{Date, ServiceLoader}
+import java.util.{Date, NoSuchElementException, ServiceLoader}
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
 import java.util.zip.ZipOutputStream
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.ExecutionException
 import scala.io.Source
 import scala.xml.Node
@@ -160,6 +159,26 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  // Used to store the paths that are currently being processed. This enables the replay log
+  // tasks to run asynchronously and ensures checkForLogs does not process a path repeatedly.
+  private val processing = ConcurrentHashMap.newKeySet[String]
+
+  private def isProcessing(path: Path): Boolean = {
+    processing.contains(path.getName)
+  }
+
+  private def isProcessing(info: LogInfo): Boolean = {
+    processing.contains(info.logPath.split("/").last)
+  }
+
+  private def processing(path: Path): Unit = {
+    processing.add(path.getName)
+  }
+
+  private def endProcessing(path: Path): Unit = {
+    processing.remove(path.getName)
+  }
+
   private val blacklist = new ConcurrentHashMap[String, Long]
 
   // Visible for testing
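The `processing` set above is the core of the concurrency fix: `checkForLogs` marks a path before handing it to the replay executor and skips any path that is still marked on a later scan, so a slow replay can never be submitted twice. A minimal, self-contained sketch of that guard pattern, assuming a single scanning thread as in the provider (the names `ProcessingGuardSketch`, `submit`, and `work` are illustrative, not part of this PR):

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object ProcessingGuardSketch {
  // Paths currently being replayed; a concurrent set, since the scanning
  // thread and the worker threads touch it from different threads.
  private val processing = ConcurrentHashMap.newKeySet[String]()
  private val pool = Executors.newFixedThreadPool(2)

  // Mark the path on the scanning thread before submitting; the worker
  // unmarks it when the work finishes, even if the work throws.
  def submit(path: String)(work: String => Unit): Unit = {
    processing.add(path)
    pool.submit(new Runnable {
      override def run(): Unit =
        try work(path) finally processing.remove(path)
    })
  }

  def main(args: Array[String]): Unit = {
    // "app-1" appears twice, as if two consecutive scans saw the same file;
    // the second occurrence is skipped because the first is still in flight.
    Seq("app-1", "app-2", "app-1").foreach { p =>
      if (!processing.contains(p)) {
        submit(p) { name => Thread.sleep(100); println(s"replayed $name") }
      }
    }
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

The check-then-mark step is race-free here only because one thread performs it, which matches the provider: a single scheduled `checkForLogs` pass does both the filtering and the marking.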
@@ -440,6 +459,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
     val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
       .filter { entry => !isBlacklisted(entry.getPath) }
+      .filter { entry => !isProcessing(entry.getPath) }
       .flatMap { entry => EventLogFileReader(fs, entry) }
       .filter { reader =>
         try {
@@ -512,43 +532,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
     }
 
-    val tasks = updated.flatMap { entry =>
+    updated.foreach { entry =>
+      processing(entry.rootPath)
       try {
-        val task: Future[Unit] = replayExecutor.submit(
-          () => mergeApplicationListing(entry, newLastScanTime, true))
-        Some(task -> entry.rootPath)
+        val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true)
+        replayExecutor.submit(task)
       } catch {
         // let the iteration over the updated entries break, since an exception on
         // replayExecutor.submit (..) indicates the ExecutorService is unable
         // to take any more submissions at this time
         case e: Exception =>
           logError(s"Exception while submitting event log for replay", e)
-          None
       }
     }
-
-    pendingReplayTasksCount.addAndGet(tasks.size)
-
-    // Wait for all tasks to finish. This makes sure that checkForLogs
-    // is not scheduled again while some tasks are already running in
-    // the replayExecutor.
-    tasks.foreach { case (task, path) =>
-      try {
-        task.get()
-      } catch {
-        case e: InterruptedException =>
-          throw e
-        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
-          // We don't have read permissions on the log file
-          logWarning(s"Unable to read log $path", e.getCause)
-          blacklist(path)
-          // SPARK-28157 We should remove this blacklisted entry from the KVStore
-          // to handle permission-only changes with the same file sizes later.
-          listing.delete(classOf[LogInfo], path.toString)
-        case e: Exception =>
-          logError("Exception while merging application listings", e)
-      } finally {
-        pendingReplayTasksCount.decrementAndGet()
-      }
-    }
 
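With the `task.get()` loop removed, `checkForLogs` now returns as soon as the replay tasks are queued instead of blocking a scheduler thread until every log is parsed; re-submission is prevented by the `processing` filter rather than by waiting. A rough sketch of this fire-and-forget shape, assuming the counter is maintained inside the task as `mergeApplicationListing` does below (the names `AsyncSubmitSketch`, `submitReplay`, and `parse` are hypothetical):

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, TimeUnit}

object AsyncSubmitSketch {
  private val pool = Executors.newFixedThreadPool(4)
  // Stand-in for pendingReplayTasksCount: the task increments it when it
  // starts and decrements it when it finishes, so the submitting thread
  // never has to hold a Future and block on get().
  private val pending = new AtomicInteger(0)

  def submitReplay(path: String)(parse: String => Unit): Unit = {
    pool.submit(new Runnable {
      override def run(): Unit = {
        pending.incrementAndGet()
        try parse(path)
        finally pending.decrementAndGet()
      }
    })
  }

  def main(args: Array[String]): Unit = {
    (1 to 8).foreach(i => submitReplay(s"app-$i")(p => println(s"parsed $p")))
    // Reached immediately; the pool drains the queue in the background.
    println(s"submitted 8 replays, currently running: ${pending.get()}")
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```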
@@ -563,7 +557,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         .last(newLastScanTime - 1)
         .asScala
         .toList
-      stale.foreach { log =>
+      stale.filterNot(isProcessing).foreach { log =>
         log.appId.foreach { appId =>
           cleanAppData(appId, log.attemptId, log.logPath)
           listing.delete(classOf[LogInfo], log.logPath)
@@ -664,10 +658,39 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  private def mergeApplicationListing(
+      reader: EventLogFileReader,
+      scanTime: Long,
+      enableOptimizations: Boolean): Unit = {
+    try {
+      pendingReplayTasksCount.incrementAndGet()
+      doMergeApplicationListing(reader, scanTime, enableOptimizations)
+      if (conf.get(CLEANER_ENABLED)) {
+        checkAndCleanLog(reader.rootPath.toString)
+      }
+    } catch {
+      case e: InterruptedException =>
+        throw e
+      case e: AccessControlException =>
+        // We don't have read permissions on the log file
+        logWarning(s"Unable to read log ${reader.rootPath}", e)
+        blacklist(reader.rootPath)
+        // SPARK-28157 We should remove this blacklisted entry from the KVStore
+        // to handle permission-only changes with the same file sizes later.
+        listing.delete(classOf[LogInfo], reader.rootPath.toString)
+      case e: Exception =>
+        logError("Exception while merging application listings", e)
+    } finally {
+      endProcessing(reader.rootPath)
+      pendingReplayTasksCount.decrementAndGet()
+    }
+  }
+
   /**
    * Replay the given log file, saving the application in the listing db.
+   * Visible for testing
    */
-  protected def mergeApplicationListing(
+  private[history] def doMergeApplicationListing(
       reader: EventLogFileReader,
       scanTime: Long,
       enableOptimizations: Boolean): Unit = {
@@ -773,7 +796,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         // mean the end event is before the configured threshold, so call the method again to
         // re-parse the whole log.
         logInfo(s"Reparsing $logPath since end event was not found.")
-        mergeApplicationListing(reader, scanTime, enableOptimizations = false)
+        doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
 
       case _ =>
         // If the app hasn't written down its app ID to the logs, still record the entry in the
@@ -798,6 +821,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  /**
+   * Check and delete the specified event log according to the max log age defined by the user.
+   */
+  private[history] def checkAndCleanLog(logPath: String): Unit = Utils.tryLog {
+    val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
+    val log = listing.read(classOf[LogInfo], logPath)
+
+    if (log.lastProcessed <= maxTime && log.appId.isEmpty) {
+      logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
+      deleteLog(fs, new Path(log.logPath))
+      listing.delete(classOf[LogInfo], log.logPath)
+    }
+
+    log.appId.foreach { appId =>
+      val app = listing.read(classOf[ApplicationInfoWrapper], appId)
+      if (app.oldestAttempt() <= maxTime) {

[turboFei (Member, Author), on the line above: Here the logic is consistent with cleanLogs().
But I think there is an overlap between `app.oldestAttempt() <= maxTime` and
`attempt.info.lastUpdated.getTime() >= maxTime`, even though it does not matter.]

+        val (remaining, toDelete) = app.attempts.partition { attempt =>
+          attempt.info.lastUpdated.getTime() >= maxTime
+        }
+        deleteAttemptLogs(app, remaining, toDelete)
+      }
+    }
+  }
+
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
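The age check in `checkAndCleanLog` deserves a worked example, since the guard and the partition test the same boundary (the overlap turboFei notes above). A small self-contained illustration, using a hypothetical `Attempt` case class in place of `AttemptInfoWrapper` and epoch-millisecond timestamps:

```scala
object AgePartitionSketch {
  final case class Attempt(id: String, lastUpdated: Long)

  def main(args: Array[String]): Unit = {
    val maxTime = 1000L // attempts last updated at or before this are expired
    val attempts = Seq(Attempt("attempt-1", 900L), Attempt("attempt-2", 1500L))

    // Guard, like app.oldestAttempt() <= maxTime: proceed only if at least
    // one attempt is old enough to clean.
    if (attempts.map(_.lastUpdated).min <= maxTime) {
      // Same predicate as the PR: keep still-fresh attempts, delete the rest.
      val (remaining, toDelete) = attempts.partition(_.lastUpdated >= maxTime)
      println(s"keep:   ${remaining.map(_.id)}") // keep:   List(attempt-2)
      println(s"delete: ${toDelete.map(_.id)}")  // delete: List(attempt-1)
    }
  }
}
```

An attempt with `lastUpdated == maxTime` trips the guard yet survives the partition, which is the overlap in question; it only means the partition occasionally runs and deletes nothing, so correctness is unaffected.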
@@ -827,7 +874,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         .asScala
         .filter { l => l.logType == null || l.logType == LogType.EventLogs }
         .toList
-      stale.foreach { log =>
+      stale.filterNot(isProcessing).foreach { log =>
         if (log.appId.isEmpty) {
           logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
           deleteLog(fs, new Path(log.logPath))
@@ -935,7 +982,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .asScala
       .filter { l => l.logType != null && l.logType == LogType.DriverLogs }
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       logInfo(s"Deleting invalid driver log ${log.logPath}")
       listing.delete(classOf[LogInfo], log.logPath)
       deleteLog(driverLogFs, new Path(log.logPath))
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -160,13 +160,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     assume(!Utils.isWindows)
 
     class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
-      var mergeApplicationListingCall = 0
-      override protected def mergeApplicationListing(
+      var doMergeApplicationListingCall = 0
+      override private[history] def doMergeApplicationListing(
          reader: EventLogFileReader,
          lastSeen: Long,
          enableSkipToEnd: Boolean): Unit = {
-        super.mergeApplicationListing(reader, lastSeen, enableSkipToEnd)
-        mergeApplicationListingCall += 1
+        super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd)
+        doMergeApplicationListingCall += 1
       }
     }
     val provider = new TestFsHistoryProvider
@@ -187,7 +187,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
       list.size should be (1)
     }
 
-    provider.mergeApplicationListingCall should be (1)
+    provider.doMergeApplicationListingCall should be (1)
   }
 
   test("history file is renamed from inprogress to completed") {
@@ -1321,6 +1321,35 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     assertSerDe(serializer, attemptInfoWithIndex)
   }
 
+  test("SPARK-29043: clean up specified event log") {
+    val clock = new ManualClock()
+    val conf = createTestConf().set(MAX_LOG_AGE_S.key, "0").set(CLEANER_ENABLED.key, "true")

[vanzin (Contributor), Dec 16, 2019, on the line above: No need to use `.key` here.
(I'll fix this during merge.)]

+    val provider = new FsHistoryProvider(conf, clock)
+
+    // create an invalid application log file
+    val inValidLogFile = newLogFile("inValidLogFile", None, inProgress = true)
+    inValidLogFile.createNewFile()
+    writeFile(inValidLogFile, None,
+      SparkListenerApplicationStart(inValidLogFile.getName, None, 1L, "test", None))
+    inValidLogFile.setLastModified(clock.getTimeMillis())
+
+    // create a valid application log file
+    val validLogFile = newLogFile("validLogFile", None, inProgress = true)
+    validLogFile.createNewFile()
+    writeFile(validLogFile, None,
+      SparkListenerApplicationStart(validLogFile.getName, Some("local_123"), 1L, "test", None))
+    validLogFile.setLastModified(clock.getTimeMillis())
+
+    provider.checkForLogs()
+    // The invalid application log file would be cleaned by checkAndCleanLog().
+    assert(new File(testDir.toURI).listFiles().size === 1)
+
+    clock.advance(1)
+    // cleanLogs() would clean the valid application log file.
+    provider.cleanLogs()
+    assert(new File(testDir.toURI).listFiles().size === 0)
+  }
+
   private def assertOptionAfterSerde(opt: Option[Long], expected: Option[Long]): Unit = {
     if (expected.isEmpty) {
       assert(opt.isEmpty)