[SPARK-30462][SS] Streamline the logic on file stream source and sink metadata log to avoid memory issue

### What changes were proposed in this pull request?

Many operations on CompactibleFileStreamLog read a metadata log file and materialize all entries into memory. Due to the nature of the compact operation, CompactibleFileStreamLog may end up with a huge compact log file containing a large number of entries, and since the entries only ever accumulate, the amount of memory needed to materialize them also grows over time. This puts pressure on GC.

This patch proposes to streamline the logic on the file stream source and sink metadata logs wherever possible to avoid the memory issue. To make this possible we have to break the existing behavior of excluding entries: currently the `compactLogs` method is called with all entries, which forces us to materialize all entries into memory. This should have no effect on end users, because only the file stream sink has a condition to exclude entries, and that condition has never been true. (DELETE_ACTION has never been set.)
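
To illustrate the idea outside of the Spark classes (everything below is a made-up sketch; `Entry`, `parse`, and `render` stand in for `SinkFileStatus` and its JSON codec), compare materializing every entry before writing the compact batch with streaming entries through a retention predicate:

```scala
import java.io.{BufferedWriter, FileWriter}
import scala.io.Source

object CompactSketch {
  // Hypothetical entry type and text codec, standing in for SinkFileStatus and its JSON form.
  case class Entry(path: String, action: String)
  def parse(line: String): Entry = { val cols = line.split(','); Entry(cols(0), cols(1)) }
  def render(e: Entry): String = s"${e.path},${e.action}"

  // Old shape: every entry of every batch file is materialized into one collection,
  // so heap usage grows with the total size of the compact log.
  def compactMaterialized(batchFiles: Seq[String], out: String,
                          retain: Entry => Boolean): Unit = {
    val all = batchFiles.flatMap(f => Source.fromFile(f).getLines().map(parse).toList)
    val writer = new BufferedWriter(new FileWriter(out))
    try all.filter(retain).foreach { e => writer.write(render(e)); writer.newLine() }
    finally writer.close()
  }

  // New shape: entries are read, filtered, and written one at a time,
  // so heap usage stays bounded by a single entry regardless of log size.
  def compactStreamed(batchFiles: Seq[String], out: String,
                      retain: Entry => Boolean): Unit = {
    val writer = new BufferedWriter(new FileWriter(out))
    try batchFiles.foreach { f =>
      val src = Source.fromFile(f)
      try src.getLines().map(parse).filter(retain)
        .foreach { e => writer.write(render(e)); writer.newLine() }
      finally src.close()
    } finally writer.close()
  }
}
```

In the actual diff below, this pattern shows up as `foreachInBatch`/`applyFnToBatchByStream` for reading and `addNewBatchByStream` for writing, with `shouldRetain` as the per-entry predicate replacing `compactLogs`.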

Based on this observation, this patch also slightly changes the existing UT which simulates the situation where file "A" is added and a later batch marks file "A" as deleted. This situation simply doesn't work with the change, but as mentioned earlier, it has never been exercised. (I'm not sure the UT reflects an actual run; I guess not.)

### Why are the changes needed?

The memory issue (OOME) has been reported in both the JIRA issue and the user mailing list.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

* Existing UTs
* Manual test done

The manual test leverages a simple app which continuously writes to the file stream sink metadata log.

HeartSaVioR/spark-delegation-token-experiment@bea7680

The test is configured so that each batch metadata log file is about 1.9 MB (10,000 entries), while the other Spark configurations are left at their defaults (compact interval = 10). The app runs as the driver, and the driver heap is set to 3g.
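
The linked app above is the authoritative reference; purely as an illustration of its shape, a driver-side app along these lines keeps creating many small files per micro-batch so the sink metadata log grows quickly (the paths, rate, and partition count here are assumptions, not the actual settings):

```scala
import org.apache.spark.sql.SparkSession

object FileSinkMetadataStress {
  def main(args: Array[String]): Unit = {
    // Submit with e.g. `spark-submit --driver-memory 3g ...`; the compact interval is
    // left at its default of 10, matching the configuration described above.
    val spark = SparkSession.builder()
      .appName("file-sink-metadata-stress")
      .getOrCreate()

    // The rate source emits a steady stream of (timestamp, value) rows; repartitioning
    // before the sink produces many output files per micro-batch, i.e. many entries
    // in the file stream sink metadata log.
    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10000")
      .load()
      .repartition(200)
      .writeStream
      .format("parquet")
      .option("path", "/tmp/file-sink-stress/data")                 // hypothetical path
      .option("checkpointLocation", "/tmp/file-sink-stress/ckpt")   // hypothetical path
      .start()

    query.awaitTermination()
  }
}
```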

> before the patch

<img width="1094" alt="Screen Shot 2020-06-23 at 3 37 44 PM" src="https://user-images.githubusercontent.com/1317309/85375841-d94f3480-b571-11ea-817b-c6b48b34888a.png">

It ran for only 40 minutes, with the latest compact batch file reaching 1.3 GB. The process struggled with GC and eventually threw an OOME.

> after the patch

<img width="1094" alt="Screen Shot 2020-06-23 at 3 53 29 PM" src="https://user-images.githubusercontent.com/1317309/85375901-eff58b80-b571-11ea-837e-30d107f677f9.png">

It sustained a 2-hour run (manually stopped, as it was expected to run longer), with the latest compact batch file reaching 2.2 GB. Actual memory usage never even reached 1.2 GB and was cleaned up promptly without notable GC activity.

Closes #28904 from HeartSaVioR/SPARK-30462.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
HeartSaVioR authored and dongjoon-hyun committed Aug 20, 2020
1 parent f793977 commit e6795cd
Showing 7 changed files with 129 additions and 87 deletions.
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.streaming

import java.io.{InputStream, IOException, OutputStream}
import java.io.{FileNotFoundException, InputStream, IOException, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import scala.io.{Source => IOSource}
@@ -28,7 +28,7 @@ import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.Utils

/**
* An abstract class for compactible metadata logs. It will write one log file for each batch.
@@ -107,9 +107,12 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
}

/**
* Filter out the obsolete logs.
* Determine whether the log should be retained or not.
*
* Default implementation retains all log entries. Implementations should override the method
* to change the behavior.
*/
def compactLogs(logs: Seq[T]): Seq[T]
def shouldRetain(log: T): Boolean = true

override def batchIdToPath(batchId: Long): Path = {
if (isCompactionBatch(batchId, compactInterval)) {
@@ -132,12 +135,18 @@ }
}
}

private def serializeEntry(entry: T, out: OutputStream): Unit = {
out.write(Serialization.write(entry).getBytes(UTF_8))
}

private def deserializeEntry(line: String): T = Serialization.read[T](line)

override def serialize(logData: Array[T], out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(("v" + metadataLogVersion).getBytes(UTF_8))
logData.foreach { data =>
out.write('\n')
out.write(Serialization.write(data).getBytes(UTF_8))
serializeEntry(data, out)
}
}

@@ -147,7 +156,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
throw new IllegalStateException("Incomplete log file")
}
validateVersion(lines.next(), metadataLogVersion)
lines.map(Serialization.read[T]).toArray
lines.map(deserializeEntry).toArray
}

override def add(batchId: Long, logs: Array[T]): Boolean = {
@@ -173,37 +182,64 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
override def purge(thresholdBatchId: Long): Unit = throw new UnsupportedOperationException(
s"Cannot purge as it might break internal state.")

/**
* Apply function on all entries in the specific batch. The method will throw
* FileNotFoundException if the metadata log file doesn't exist.
*
* NOTE: This doesn't fail early on corruption. The caller should handle the exception
* properly and make sure the logic is not affected by failing in the middle.
*/
def foreachInBatch(batchId: Long)(fn: T => Unit): Unit = applyFnInBatch(batchId)(_.foreach(fn))

/**
* Apply filter on all entries in the specific batch.
*/
def filterInBatch(batchId: Long)(predicate: T => Boolean): Option[Array[T]] = {
try {
Some(applyFnInBatch(batchId)(_.filter(predicate).toArray))
} catch {
case _: FileNotFoundException => None
}
}

private def applyFnInBatch[RET](batchId: Long)(fn: Iterator[T] => RET): RET = {
applyFnToBatchByStream(batchId) { input =>
val lines = IOSource.fromInputStream(input, UTF_8.name()).getLines()
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
}
validateVersion(lines.next(), metadataLogVersion)
fn(lines.map(deserializeEntry))
}
}

/**
* Compacts all logs before `batchId` plus the provided `logs`, and writes them into the
* corresponding `batchId` file. It will delete expired files as well if enabled.
*/
private def compact(batchId: Long, logs: Array[T]): Boolean = {
val (allLogs, loadElapsedMs) = Utils.timeTakenMs {
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
validBatches.flatMap { id =>
super.get(id).getOrElse {
throw new IllegalStateException(
s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
s"(compactInterval: $compactInterval)")
}
} ++ logs
def writeEntry(entry: T, output: OutputStream): Unit = {
if (shouldRetain(entry)) {
output.write('\n')
serializeEntry(entry, output)
}
}
val compactedLogs = compactLogs(allLogs)

// Return false as there is another writer.
val (writeSucceed, writeElapsedMs) = Utils.timeTakenMs {
super.add(batchId, compactedLogs.toArray)
val (writeSucceed, elapsedMs) = Utils.timeTakenMs {
addNewBatchByStream(batchId) { output =>
output.write(("v" + metadataLogVersion).getBytes(UTF_8))
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
validBatches.foreach { id =>
foreachInBatch(id) { entry => writeEntry(entry, output) }
}
logs.foreach { entry => writeEntry(entry, output) }
}
}

val elapsedMs = loadElapsedMs + writeElapsedMs
if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) {
logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
s" write: $writeElapsedMs ms) for compact batch $batchId")
logWarning(s"Loaded ${allLogs.size} entries (estimated ${SizeEstimator.estimate(allLogs)} " +
s"bytes in memory), and wrote ${compactedLogs.size} entries for compact batch $batchId")
logWarning(s"Compacting took $elapsedMs ms for compact batch $batchId")
} else {
logDebug(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
s" write: $writeElapsedMs ms) for compact batch $batchId")
logDebug(s"Compacting took $elapsedMs ms for compact batch $batchId")
}

writeSucceed
@@ -222,21 +258,22 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
try {
val logs =
getAllValidBatches(latestId, compactInterval).flatMap { id =>
super.get(id).getOrElse {
filterInBatch(id)(shouldRetain).getOrElse {
throw new IllegalStateException(
s"${batchIdToPath(id)} doesn't exist " +
s"(latestId: $latestId, compactInterval: $compactInterval)")
}
}
return compactLogs(logs).toArray
return logs.toArray
} catch {
case e: IOException =>
// Another process using `CompactibleFileStreamLog` may delete the batch files when
// `StreamFileIndex` are reading. However, it only happens when a compaction is
// deleting old files. If so, let's try the next compaction batch and we should find it.
// Otherwise, this is a real IO issue and we should throw it.
latestId = nextCompactionBatchId(latestId, compactInterval)
super.get(latestId).getOrElse {
val expectedMinLatestId = nextCompactionBatchId(latestId, compactInterval)
latestId = super.getLatestBatchId().getOrElse(-1)
if (latestId < expectedMinLatestId) {
throw e
}
}
@@ -96,19 +96,11 @@ class FileStreamSinkLog(
require(defaultCompactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
"to a positive value.")

override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
if (deletedFiles.isEmpty) {
logs
} else {
logs.filter(f => !deletedFiles.contains(f.path))
}
}
}

object FileStreamSinkLog {
val VERSION = 1
// TODO: SPARK-32648 This action hasn't been used from the introduction, better to remove this.
val DELETE_ACTION = "delete"
val ADD_ACTION = "add"
}
@@ -61,10 +61,6 @@ class FileStreamSourceLog(
}
}

def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
logs
}

override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
if (super.add(batchId, logs)) {
if (isCompactionBatch(batchId, compactInterval)) {
@@ -84,7 +80,7 @@ class FileStreamSourceLog(
if (isCompactionBatch(id, compactInterval) && fileEntryCache.containsKey(id)) {
(id, Some(fileEntryCache.get(id)))
} else {
val logs = super.get(id).map(_.filter(_.batchId == id))
val logs = filterInBatch(id)(_.batchId == id)
(id, logs)
}
}.partition(_._2.isDefined)
@@ -115,42 +115,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
*/
override def add(batchId: Long, metadata: T): Boolean = {
require(metadata != null, "'null' metadata cannot written to a metadata log")
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written
writeBatchToFile(metadata, batchIdToPath(batchId))
true
}
addNewBatchByStream(batchId) { output => serialize(metadata, output) }
}

/** Write a batch to a temp file then rename it to the batch file.
*
* There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
* valid behavior, we still need to prevent it from destroying the files.
*/
private def writeBatchToFile(metadata: T, path: Path): Unit = {
val output = fileManager.createAtomic(path, overwriteIfPossible = false)
override def get(batchId: Long): Option[T] = {
try {
serialize(metadata, output)
output.close()
applyFnToBatchByStream(batchId) { input => Some(deserialize(input)) }
} catch {
case e: FileAlreadyExistsException =>
output.cancel()
// If next batch file already exists, then another concurrently running query has
// written it.
throw new ConcurrentModificationException(
s"Multiple streaming queries are concurrently using $path", e)
case e: Throwable =>
output.cancel()
throw e
case fne: FileNotFoundException =>
logDebug(fne.getMessage)
None
}
}

override def get(batchId: Long): Option[T] = {
/**
* Apply provided function to each entry in the specific batch metadata log.
*
* Unlike get which will materialize all entries into memory, this method streamlines the process
* via READ-AND-PROCESS. This helps to avoid the memory issue on huge metadata log file.
*
* NOTE: This no longer fails early on corruption. The caller should handle the exception
* properly and make sure the logic is not affected by failing in the middle.
*/
def applyFnToBatchByStream[RET](batchId: Long)(fn: InputStream => RET): RET = {
val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
val input = fileManager.open(batchMetadataFile)
try {
Some(deserialize(input))
fn(input)
} catch {
case ise: IllegalStateException =>
// re-throw the exception with the log file path added
@@ -160,8 +152,42 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
IOUtils.closeQuietly(input)
}
} else {
logDebug(s"Unable to find batch $batchMetadataFile")
None
throw new FileNotFoundException(s"Unable to find batch $batchMetadataFile")
}
}

/**
* Store the metadata for the specified batchId and return `true` if successful. This method
* fills the content of metadata via executing function. If the function throws an exception,
* writing will be automatically cancelled and this method will propagate the exception.
*
* If the batchId's metadata has already been stored, this method will return `false`.
*
* Writing the metadata is done by writing a batch to a temp file then rename it to the batch
* file.
*
* There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
* valid behavior, we still need to prevent it from destroying the files.
*/
def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = {
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written
val output = fileManager.createAtomic(batchIdToPath(batchId), overwriteIfPossible = false)
try {
fn(output)
output.close()
} catch {
case e: FileAlreadyExistsException =>
output.cancel()
// If next batch file already exists, then another concurrently running query has
// written it.
throw new ConcurrentModificationException(
s"Multiple streaming queries are concurrently using $path", e)
case e: Throwable =>
output.cancel()
throw e
}
true
}
}

@@ -1,3 +1,2 @@
v1
{"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"}
@@ -297,6 +297,4 @@ class FakeCompactibleFileStreamLog(
override protected def defaultCompactInterval: Int = _defaultCompactInterval

override protected val minBatchesToRetain: Int = _defaultMinBatchesToRetain

override def compactLogs(logs: Seq[String]): Seq[String] = logs
}
@@ -37,19 +37,13 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
import CompactibleFileStreamLog._
import FileStreamSinkLog._

test("compactLogs") {
test("shouldRetain") {
withFileStreamSinkLog { sinkLog =>
val logs = Seq(
newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION),
newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION),
newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION))
assert(logs === sinkLog.compactLogs(logs))

val logs2 = Seq(
newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION),
newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION),
newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION))
assert(logs.dropRight(1) ++ logs2.dropRight(1) === sinkLog.compactLogs(logs ++ logs2))
val log = newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION)
val log2 = newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION)

assert(sinkLog.shouldRetain(log))
assert(sinkLog.shouldRetain(log2))
}
}

@@ -235,7 +229,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {

test("read Spark 2.1.0 log format") {
assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
// SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
SinkFileStatus("/a/b/0", 1, false, 1, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),