@@ -27,7 +27,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.io.Source
import scala.util.Try
import scala.util.{Success, Try}
import scala.xml.Node

import com.fasterxml.jackson.annotation.JsonIgnore
@@ -401,6 +401,52 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

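  // Extracts the application id and attempt id from an event log file name,
  // e.g. "application_1520000000000_0042_1.inprogress" (sample name for illustration only).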
  val APP_GRP = "appId"
  val ATTEMPT_GRP = "attemptId"
  val appIdRegex = """(application_\d+_\d+)_(\d+).*""".r(APP_GRP, ATTEMPT_GRP)
  private def updateAppList(statuses: Seq[FileStatus]) = {
    statuses.foreach { fstat =>
      logInfo(s"Processing file status $fstat")
      val logPath = fstat.getPath
      appIdRegex
        .findAllMatchIn(logPath.getName)
        .foreach { m =>
          val appId = m.group(APP_GRP)
          val attemptId = m.group(ATTEMPT_GRP)

          Try(load(appId)).recover {
            case e: NoSuchElementException =>
              logInfo(s"Generating application event for $appId from $fstat")
              val attemptInfo = ApplicationAttemptInfo(
                Some(attemptId),
                // timestamp will be refined during replay
                new Date(fstat.getModificationTime),
                new Date(fstat.getModificationTime),
                new Date(fstat.getModificationTime),
                1000,
                fstat.getOwner,
                false,
                "2.3.0-tbd-by-replay"
              )
              val appInfo = new ApplicationInfoWrapper(
                new ApplicationInfo(appId, "tbd-by-replay-" + logPath.getName,
                  None, None, None, None, Seq(attemptInfo)),
                List(new AttemptInfoWrapper(attemptInfo, logPath.toString, fstat.getLen,
                  None, None, None, None)))

              synchronized {
                activeUIs.get((appId, Some(attemptId))).foreach { ui =>
                  ui.invalidate()
                  ui.ui.store.close()
                }
              }
              listing.write(appInfo)
              Success(appInfo)
          }
        }
    }
  }

  /**
   * Builds the application list based on the current contents of the log directory.
   * Tries to reuse as much of the data already in memory as possible, by not reading
@@ -460,7 +506,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    if (updated.nonEmpty) {
      logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
    }

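    // Create placeholder listing entries for any new or updated logs before the replay tasks run.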
    updateAppList(updated)
    val tasks = updated.map { entry =>
      try {
        replayExecutor.submit(new Runnable {
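
For readers unfamiliar with Scala's named regex groups, the following standalone sketch shows how a pattern shaped like the appIdRegex added above splits an event log file name into an application id and an attempt id. The object name and the sample file name are invented for illustration; real names come from FileStatus.getPath.getName.

    import scala.util.matching.Regex

    object EventLogNameParsing {
      // Same pattern shape as appIdRegex in FsHistoryProvider, with named groups.
      val appIdRegex: Regex = """(application_\d+_\d+)_(\d+).*""".r("appId", "attemptId")

      def main(args: Array[String]): Unit = {
        // Hypothetical event log file name, used only for illustration.
        val logName = "application_1520000000000_0042_1.inprogress"
        appIdRegex.findFirstMatchIn(logName).foreach { m =>
          println(m.group("appId"))     // prints: application_1520000000000_0042
          println(m.group("attemptId")) // prints: 1
        }
      }
    }

The named groups let updateAppList read m.group(APP_GRP) and m.group(ATTEMPT_GRP) instead of relying on positional group indices.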