[SPARK-29043] Improve the concurrent performance of History Server
turboFei committed Nov 6, 2019
1 parent 8353000 commit 66e9dd1
Showing 2 changed files with 67 additions and 40 deletions.
FsHistoryProvider.scala
@@ -25,7 +25,6 @@ import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
- import scala.concurrent.ExecutionException
import scala.io.Source
import scala.util.Try
import scala.xml.Node
@@ -161,6 +160,26 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
new HistoryServerDiskManager(conf, path, listing, clock)
}

+ // Used to store the paths that are currently being processed. This enables the replay log tasks
+ // to execute asynchronously and ensures that checkForLogs does not process a path repeatedly.
+ private val processing = ConcurrentHashMap.newKeySet[String]
+
+ private def isProcessing(path: Path): Boolean = {
+   processing.contains(path.getName)
+ }
+
+ private def isProcessing(info: LogInfo): Boolean = {
+   processing.contains(info.logPath.split("/").last)
+ }
+
+ private def processing(path: Path): Unit = {
+   processing.add(path.getName)
+ }
+
+ private def endProcessing(path: Path): Unit = {
+   processing.remove(path.getName)
+ }
+
private val blacklist = new ConcurrentHashMap[String, Long]

// Visible for testing
@@ -440,6 +459,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
.filter { entry => !isBlacklisted(entry.getPath) }
+ .filter { entry => !isProcessing(entry.getPath) }
.flatMap { entry => EventLogFileReader(fs, entry) }
.filter { reader =>
try {
@@ -512,11 +532,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
}

- val tasks = updated.flatMap { entry =>
+ updated.foreach { entry =>
+   processing(entry.rootPath)
try {
-   val task: Future[Unit] = replayExecutor.submit(
-     () => mergeApplicationListing(entry, newLastScanTime, true))
-   Some(task -> entry.rootPath)
+   val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true)
+   replayExecutor.submit(task)
} catch {
// let the iteration over the updated entries break, since an exception on
// replayExecutor.submit (..) indicates the ExecutorService is unable
@@ -527,31 +547,6 @@
}
}

- pendingReplayTasksCount.addAndGet(tasks.size)
-
- // Wait for all tasks to finish. This makes sure that checkForLogs
- // is not scheduled again while some tasks are already running in
- // the replayExecutor.
- tasks.foreach { case (task, path) =>
-   try {
-     task.get()
-   } catch {
-     case e: InterruptedException =>
-       throw e
-     case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
-       // We don't have read permissions on the log file
-       logWarning(s"Unable to read log $path", e.getCause)
-       blacklist(path)
-       // SPARK-28157 We should remove this blacklisted entry from the KVStore
-       // to handle permission-only changes with the same file sizes later.
-       listing.delete(classOf[LogInfo], path.toString)
-     case e: Exception =>
-       logError("Exception while merging application listings", e)
-   } finally {
-     pendingReplayTasksCount.decrementAndGet()
-   }
- }
-
// Delete all information about applications whose log files disappeared from storage.
// This is done by identifying the event logs which were not touched by the current
// directory scan.
@@ -563,7 +558,7 @@
.last(newLastScanTime - 1)
.asScala
.toList
- stale.foreach { log =>
+ stale.filterNot(isProcessing).foreach { log =>
log.appId.foreach { appId =>
cleanAppData(appId, log.attemptId, log.logPath)
listing.delete(classOf[LogInfo], log.logPath)
@@ -664,10 +659,42 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

+ private def mergeApplicationListing(
+     reader: EventLogFileReader,
+     scanTime: Long,
+     enableOptimizations: Boolean): Unit = {
+   try {
+     pendingReplayTasksCount.incrementAndGet()
+     doMergeApplicationListing(reader, scanTime, enableOptimizations)
+   } catch {
+     case e: InterruptedException =>
+       throw e
+     case e: AccessControlException =>
+       // We don't have read permissions on the log file
+       logWarning(s"Unable to read log ${reader.rootPath}", e)
+       blacklist(reader.rootPath)
+       // SPARK-28157 We should remove this blacklisted entry from the KVStore
+       // to handle permission-only changes with the same file sizes later.
+       listing.delete(classOf[LogInfo], reader.rootPath.toString)
+     case e: Exception =>
+       logError("Exception while merging application listings", e)
+   } finally {
+     endProcessing(reader.rootPath)
+     pendingReplayTasksCount.decrementAndGet()
+
+     val isExpired = scanTime + conf.get(MAX_LOG_AGE_S) * 1000 < clock.getTimeMillis()
+     if (isExpired) {
+       listing.delete(classOf[LogInfo], reader.rootPath.toString)
+       deleteLog(fs, reader.rootPath)
+     }
+   }
+ }
+
/**
* Replay the given log file, saving the application in the listing db.
+ * Visible for testing
*/
- protected def mergeApplicationListing(
+ private[history] def doMergeApplicationListing(
reader: EventLogFileReader,
scanTime: Long,
enableOptimizations: Boolean): Unit = {
@@ -773,7 +800,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// mean the end event is before the configured threshold, so call the method again to
// re-parse the whole log.
logInfo(s"Reparsing $logPath since end event was not found.")
- mergeApplicationListing(reader, scanTime, enableOptimizations = false)
+ doMergeApplicationListing(reader, scanTime, enableOptimizations = false)

case _ =>
// If the app hasn't written down its app ID to the logs, still record the entry in the
@@ -827,7 +854,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.asScala
.filter { l => l.logType == null || l.logType == LogType.EventLogs }
.toList
- stale.foreach { log =>
+ stale.filterNot(isProcessing).foreach { log =>
if (log.appId.isEmpty) {
logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
deleteLog(fs, new Path(log.logPath))
@@ -935,7 +962,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.asScala
.filter { l => l.logType != null && l.logType == LogType.DriverLogs }
.toList
- stale.foreach { log =>
+ stale.filterNot(isProcessing).foreach { log =>
logInfo(s"Deleting invalid driver log ${log.logPath}")
listing.delete(classOf[LogInfo], log.logPath)
deleteLog(driverLogFs, new Path(log.logPath))
FsHistoryProviderSuite.scala
@@ -160,13 +160,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
assume(!Utils.isWindows)

class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
-   var mergeApplicationListingCall = 0
-   override protected def mergeApplicationListing(
+   var doMergeApplicationListingCall = 0
+   override private[history] def doMergeApplicationListing(
reader: EventLogFileReader,
lastSeen: Long,
enableSkipToEnd: Boolean): Unit = {
-     super.mergeApplicationListing(reader, lastSeen, enableSkipToEnd)
-     mergeApplicationListingCall += 1
+     super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd)
+     doMergeApplicationListingCall += 1
}
}
val provider = new TestFsHistoryProvider
@@ -187,7 +187,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
list.size should be (1)
}

- provider.mergeApplicationListingCall should be (1)
+ provider.doMergeApplicationListingCall should be (1)
}

test("history file is renamed from inprogress to completed") {
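
Taken out of the diff, the concurrency pattern this commit introduces can be reduced to a small self-contained sketch. The names below (ProcessingGuardSketch, checkForLogs, replay) are illustrative stand-ins rather than the actual FsHistoryProvider members: a concurrent set records which log paths are currently being replayed, the periodic scan skips any path that is still in flight, and each background task removes its path in a finally block so a failed replay never blocks later scans.

import java.util.concurrent.{ConcurrentHashMap, Executors}

object ProcessingGuardSketch {
  // Set of log paths whose replay is currently in flight on the executor.
  private val processing = ConcurrentHashMap.newKeySet[String]()

  // Stand-in for the History Server's replayExecutor.
  private val replayExecutor = Executors.newFixedThreadPool(4)

  // Periodic scan: submit work only for paths that are not already being processed.
  def checkForLogs(candidates: Seq[String]): Unit = {
    candidates
      .filterNot(path => processing.contains(path)) // skip paths another task still owns
      .foreach { path =>
        processing.add(path) // mark as in flight before handing off to the pool
        replayExecutor.submit(new Runnable {
          override def run(): Unit = {
            try {
              replay(path) // stand-in for doMergeApplicationListing
            } finally {
              processing.remove(path) // always release, even if the replay fails
            }
          }
        })
      }
  }

  private def replay(path: String): Unit = {
    // Placeholder for the real listing merge work.
    println(s"replaying $path")
  }
}

With a guard like this in place, the scan can return immediately after submitting the tasks instead of blocking on task.get(), which is the behavior change in the checkForLogs hunks above; the filterNot(isProcessing) checks on stale entries keep the cleanup paths from deleting listings that are still being written.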
