Skip to content

Commit

Permalink
Update monitoring.md, won't create memoryManager if hybrid store is d…
Browse files Browse the repository at this point in the history
…isabled, seperate logic of create leveldb store.
  • Loading branch information
Baohe Zhang committed Jun 15, 2020
1 parent 76dbd18 commit 025b13c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private val hybridStoreEnabled = conf.get(History.HYBRID_STORE_ENABLED)

private val memoryManager = new HistoryServerMemoryManager(conf)

// Visible for testing.
private[history] val listing: KVStore = storePath.map { path =>
val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath()).toFile()
Expand Down Expand Up @@ -162,6 +160,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
new HistoryServerDiskManager(conf, path, listing, clock)
}

private var memoryManager: HistoryServerMemoryManager = null
if (hybridStoreEnabled) {
memoryManager = new HistoryServerMemoryManager(conf)
}

private val fileCompactor = new EventLogFileCompactor(conf, hadoopConf, fs,
conf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN), conf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD))

Expand Down Expand Up @@ -266,7 +269,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private def startPolling(): Unit = {
diskManager.foreach(_.initialize())
memoryManager.initialize()
if (memoryManager != null) {
memoryManager.initialize()
}

// Validate the log directory.
val path = new Path(logDir)
Expand Down Expand Up @@ -1172,7 +1177,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// At this point the disk data either does not exist or was deleted because it failed to
// load, so the event log needs to be replayed.

// If hybrid store is enabled, try it first.
// If the hybrid store is enabled, try it first and fail back to leveldb store.
if (hybridStoreEnabled) {
try {
return createHybridStore(dm, appId, attempt, metadata)
Expand All @@ -1183,34 +1188,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

var retried = false
var newStorePath: File = null
while (newStorePath == null) {
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
attempt.lastIndex)
val isCompressed = reader.compressionCodec.isDefined
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
val lease = dm.lease(reader.totalSize, isCompressed)
try {
Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store =>
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
}
newStorePath = lease.commit(appId, attempt.info.attemptId)
} catch {
case _: IOException if !retried =>
// compaction may touch the file(s) which app rebuild wants to read
// compaction wouldn't run in short interval, so try again...
logWarning(s"Exception occurred while rebuilding app $appId - trying again...")
lease.rollback()
retried = true

case e: Exception =>
lease.rollback()
throw e
}
}

KVUtils.open(newStorePath, metadata)
createLevelDBStore(dm, appId, attempt, metadata)
}

private def createHybridStore(
Expand Down Expand Up @@ -1283,6 +1261,41 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
hybridStore
}

private def createLevelDBStore(
dm: HistoryServerDiskManager,
appId: String,
attempt: AttemptInfoWrapper,
metadata: AppStatusStoreMetadata): KVStore = {
var retried = false
var newStorePath: File = null
while (newStorePath == null) {
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
attempt.lastIndex)
val isCompressed = reader.compressionCodec.isDefined
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
val lease = dm.lease(reader.totalSize, isCompressed)
try {
Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store =>
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
}
newStorePath = lease.commit(appId, attempt.info.attemptId)
} catch {
case _: IOException if !retried =>
// compaction may touch the file(s) which app rebuild wants to read
// compaction wouldn't run in short interval, so try again...
logWarning(s"Exception occurred while rebuilding app $appId - trying again...")
lease.rollback()
retried = true

case e: Exception =>
lease.rollback()
throw e
}
}

KVUtils.open(newStorePath, metadata)
}

private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
var retried = false
var store: KVStore = null
Expand Down
19 changes: 19 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,25 @@ Security options for the Spark History Server are covered more detail in the
</td>
<td>3.0.0</td>
</tr>
<tr>
<td>spark.history.store.hybridStore.enabled</td>
<td>false</td>
<td>
Whether to use HybridStore as the store when parsing event logs. HybridStore will first write data
to an in-memory store and having a background thread that dumps data to a disk store after the writing
to in-memory store is completed.
</td>
<td>3.1.0</td>
</tr>
<tr>
<td>spark.history.store.hybridStore.maxMemoryUsage</td>
<td>2g</td>
<td>
Maximum memory space that can be used to create HybridStore. The HybridStore co-uses the heap memory,
so the heap memory should be increased through the memory option for SHS if the HybridStore is enabled.
</td>
<td>3.1.0</td>
</tr>
</table>

Note that in all of these UIs, the tables are sortable by clicking their headers,
Expand Down

0 comments on commit 025b13c

Please sign in to comment.