[SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider

### What changes were proposed in this pull request?
To reduce checkpoint duration and end-to-end latency, we propose changelog-based checkpointing for the RocksDB State Store Provider. The mechanism works as follows.
1. Changelog checkpoint: On each put() or delete() call to the local RocksDB instance, log the operation to a changelog file. During the state change commit, sync the compressed changelog of the current batch to DFS as checkpointDir/{version}.delta.
2. Version reconstruction: For version j, find the latest snapshot i.zip such that i <= j, load snapshot i, and replay i+1.delta through j.delta. This is used both when loading the initial state and when creating the latest version snapshot. Note: If a query is shut down without an exception, no changelog is replayed during query restart because a maintenance task is executed before the state store instance is unloaded.
3. Background snapshot: A maintenance thread in each executor launches maintenance tasks periodically. Inside the maintenance task, sync the latest local RocksDB snapshot to DFS as checkpointDir/{version}.zip. Snapshots enable faster failure recovery and allow old versions to be purged.
4. Garbage collection: Inside the maintenance task, delete snapshot and delta files from DFS for versions that are outside the retention range (the default number of retained versions is 100).
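
The reconstruction and purge rules in steps 2 and 4 can be sketched with a small in-memory model. This is an illustrative sketch, not the code from this patch: `ChangelogReplaySketch`, `Change`, `reconstruct`, and `minVersionToRetain` are hypothetical names, plain maps stand in for the `.zip` and `.delta` files on DFS, and the rule for choosing the oldest snapshot to keep is an assumption about how purging can stay safe for replay.

```scala
// Hypothetical in-memory model of a checkpoint directory.
// None of these names come from the Spark code base.
object ChangelogReplaySketch {
  // One logged operation: value = None models a delete, Some(v) models a put.
  final case class Change(key: String, value: Option[String])

  type State = Map[String, String]

  /** Rebuilds `version` from the newest snapshot i <= version plus deltas i+1..version. */
  def reconstruct(
      snapshots: Map[Long, State],     // stands in for {i}.zip files
      deltas: Map[Long, Seq[Change]],  // stands in for {v}.delta files
      version: Long): State = {
    // Assumption: version 0 is the empty state when no snapshot qualifies.
    val base = snapshots.keys.filter(_ <= version).foldLeft(0L)(math.max)
    val start = snapshots.getOrElse(base, Map.empty[String, String])
    ((base + 1) to version).foldLeft(start) { (state, v) =>
      val changes = deltas.getOrElse(v,
        throw new IllegalStateException(s"missing changelog for version $v"))
      changes.foldLeft(state) {
        case (s, Change(k, Some(value))) => s + (k -> value) // put
        case (s, Change(k, None)) => s - k                   // delete
      }
    }
  }

  /**
   * Oldest version whose files must stay so that the last `retain` versions
   * remain reconstructible. Files for versions below the result can be purged.
   */
  def minVersionToRetain(snapshotVersions: Seq[Long], latest: Long, retain: Int): Long = {
    val oldestWanted = latest - retain + 1
    // Keep the newest snapshot at or before the oldest wanted version;
    // a result of 0 means nothing can be purged yet.
    snapshotVersions.filter(_ <= oldestWanted).foldLeft(0L)(math.max)
  }
}
```

With snapshots at versions 10, 20, and 30, a latest version of 35, and 10 retained versions, the sketch keeps snapshot 20 and everything after it, because version 26 can only be rebuilt from snapshot 20 plus deltas 21 through 26.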

### Why are the changes needed?
We have identified state checkpointing latency as one of the major performance bottlenecks for stateful streaming queries. Currently, the RocksDB state store pauses the RocksDB instance to upload a snapshot to the cloud when committing a batch, which is heavyweight and has unpredictable performance.
With changelog-based checkpointing, the RocksDB instance runs without interruption, which improves RocksDB operation performance. It also dramatically reduces commit time and batch duration because less data is uploaded during state commit. With this change, stateful queries with the RocksDB state store will have lower and more predictable latency.

### How was this patch tested?
Added unit tests for the changelog checkpointing utility.
Added unit tests and integration tests that check backward compatibility with existing checkpoints.
Enabled the RocksDB state store unit tests and stateful streaming query integration tests to run with changelog checkpointing enabled.

Closes #41099 from chaoqin-li1123/changelog.

Authored-by: Chaoqin Li <chaoqin.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
chaoqin-li1123 authored and HeartSaVioR committed Jun 1, 2023
1 parent 10ee643 commit c02c8be
Showing 14 changed files with 1,012 additions and 201 deletions.
18 changes: 18 additions & 0 deletions docs/structured-streaming-programming-guide.md
@@ -2320,6 +2320,11 @@ Here are the configs regarding to RocksDB instance of the state store provider:
<td>Whether we perform a range compaction of RocksDB instance for commit operation</td>
<td>False</td>
</tr>
<tr>
<td>spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled</td>
<td>Whether to upload changelog instead of snapshot during RocksDB StateStore commit</td>
<td>False</td>
</tr>
<tr>
<td>spark.sql.streaming.stateStore.rocksdb.blockSizeKB</td>
<td>Approximate size in KB of user data packed per block for a RocksDB BlockBasedTable, which is a RocksDB's default SST file format.</td>
@@ -2389,6 +2394,19 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo
You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node.
Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings.

##### RocksDB State Store Changelog Checkpointing
In newer versions of Spark, changelog checkpointing is introduced for the RocksDB state store. The traditional checkpointing mechanism for the RocksDB state store is incremental snapshot checkpointing, where the manifest files and newly generated RocksDB SST files of RocksDB instances are uploaded to durable storage.
Instead of uploading the data files of RocksDB instances, changelog checkpointing uploads the changes made to the state since the last checkpoint for durability.
Snapshots are persisted periodically in the background for predictable failure recovery and changelog trimming.
Changelog checkpointing avoids the cost of capturing and uploading snapshots of RocksDB instances and significantly reduces streaming query latency.

Changelog checkpointing is disabled by default. You can enable it for the RocksDB state store by setting the `spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled` config to `true`.
Changelog checkpointing is designed to be backward compatible with the traditional checkpointing mechanism.
The RocksDB state store provider supports transitioning between the two checkpointing mechanisms in both directions. This lets you take advantage of the performance benefits of changelog checkpointing without discarding the old state checkpoint.
In a version of Spark that supports changelog checkpointing, you can migrate streaming queries from older versions of Spark to changelog checkpointing by enabling it in the Spark session.
Conversely, you can safely disable changelog checkpointing in a newer version of Spark; any query that already runs with changelog checkpointing will then switch back to traditional checkpointing.
You need to restart your streaming queries for a change in the checkpointing mechanism to take effect, but you won't observe any performance degradation in the process.
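
As a rough illustration of the setting described above, the following sketch enables changelog checkpointing when the session is built. It is not part of the guide text in this patch: the object name, application name, and `local[2]` master are hypothetical, and it assumes Spark SQL is on the classpath and that the RocksDB provider is selected through `spark.sql.streaming.stateStore.providerClass`.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver program; only the two config keys matter here.
object EnableChangelogCheckpointing {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("changelog-checkpointing-sketch") // hypothetical application name
      .master("local[2]")                        // assumption: local test run
      // Use the RocksDB state store provider for stateful operators.
      .config("spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      // Upload changelogs instead of snapshots at commit time.
      .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
      .getOrCreate()

    // Streaming queries started or restarted from this session pick up the setting.
    println(spark.conf.get(
      "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled"))

    spark.stop()
  }
}
```

Setting the value back to `false` and restarting the query would be the reverse migration path described above.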

##### Performance-aspect considerations

1. You may want to disable tracking of the total number of rows to get better performance from the RocksDB state store.
@@ -56,6 +56,15 @@ class RocksDB(
hadoopConf: Configuration = new Configuration,
loggingId: String = "") extends Logging {

case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
def close(): Unit = {
silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
}
}

@volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
@volatile private var lastSnapshotVersion = 0L

RocksDBLoader.loadLibrary()

// Java wrapper objects linking to native RocksDB objects
@@ -109,13 +118,15 @@ class RocksDB(
private val nativeStats = dbOptions.statistics()

private val workingDir = createTempDir("workingDir")
private val fileManager = new RocksDBFileManager(
dfsRootDir, createTempDir("fileManager"), hadoopConf, loggingId = loggingId)
private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
hadoopConf, conf.compressionCodec, loggingId = loggingId)
private val byteArrayPair = new ByteArrayPair()
private val commitLatencyMs = new mutable.HashMap[String, Long]()
private val acquireLock = new Object

@volatile private var db: NativeRocksDB = _
@volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
@volatile private var loadedVersion = -1L // -1 = nothing valid is loaded
@volatile private var numKeysOnLoadedVersion = 0L
@volatile private var numKeysOnWritingVersion = 0L
@@ -129,17 +140,20 @@ class RocksDB(
* Note that this will copy all the necessary file from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*/
def load(version: Long): RocksDB = {
def load(version: Long, readOnly: Boolean = false): RocksDB = {
assert(version >= 0)
acquire()
logInfo(s"Loading $version")
try {
if (loadedVersion != version) {
closeDB()
val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
loadedVersion = latestSnapshotVersion

openDB()

val numKeys = if (!conf.trackTotalNumberOfRows) {
numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being track
-1L
} else if (metadata.numKeys < 0) {
@@ -149,10 +163,10 @@ class RocksDB(
} else {
metadata.numKeys
}
numKeysOnWritingVersion = numKeys
numKeysOnLoadedVersion = numKeys

loadedVersion = version
if (loadedVersion != version) replayChangelog(version)
// After changelog replay the numKeysOnWritingVersion will be updated to
// the correct number of keys in the loaded version.
numKeysOnLoadedVersion = numKeysOnWritingVersion
fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
}
if (conf.resetStatsOnLoad) {
@@ -164,9 +178,36 @@ class RocksDB(
loadedVersion = -1 // invalidate loaded data
throw t
}
if (enableChangelogCheckpointing && !readOnly) {
// Make sure we don't leak resource.
changelogWriter.foreach(_.abort())
changelogWriter = Some(fileManager.getChangeLogWriter(version + 1))
}
this
}

/**
* Replay change log from the loaded version to the target version.
*/
private def replayChangelog(endVersion: Long): Unit = {
for (v <- loadedVersion + 1 to endVersion) {
var changelogReader: StateStoreChangelogReader = null
try {
changelogReader = fileManager.getChangelogReader(v)
changelogReader.foreach { case (key, value) =>
if (value != null) {
put(key, value)
} else {
remove(key)
}
}
} finally {
if (changelogReader != null) changelogReader.close()
}
}
loadedVersion = endVersion
}

/**
* Get the value for the given key if present, or null.
* @note This will return the last written value even if it was uncommitted.
@@ -187,6 +228,7 @@ class RocksDB(
}
}
db.put(writeOptions, key, value)
changelogWriter.foreach(_.put(key, value))
}

/**
@@ -201,6 +243,7 @@ class RocksDB(
}
}
db.delete(writeOptions, key)
changelogWriter.foreach(_.delete(key))
}

/**
@@ -286,44 +329,66 @@ class RocksDB(
*/
def commit(): Long = {
val newVersion = loadedVersion + 1
val checkpointDir = createTempDir("checkpoint")
var rocksDBBackgroundThreadPaused = false
try {
// Make sure the directory does not exist. Native RocksDB fails if the directory to
// checkpoint exists.
Utils.deleteRecursively(checkpointDir)

logInfo(s"Flushing updates for $newVersion")
val flushTimeMs = timeTakenMs { db.flush(flushOptions) }

val compactTimeMs = if (conf.compactOnCommit) {
logInfo("Compacting")
timeTakenMs { db.compactRange() }
} else 0

logInfo("Pausing background work")
val pauseTimeMs = timeTakenMs {
db.pauseBackgroundWork() // To avoid files being changed while committing
rocksDBBackgroundThreadPaused = true
}

logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
val checkpointTimeMs = timeTakenMs {
val cp = Checkpoint.create(db)
cp.createCheckpoint(checkpointDir.toString)
var compactTimeMs = 0L
var flushTimeMs = 0L
var checkpointTimeMs = 0L
if (shouldCreateSnapshot()) {
// Need to flush the change to disk before creating a checkpoint
// because rocksdb wal is disabled.
logInfo(s"Flushing updates for $newVersion")
flushTimeMs = timeTakenMs { db.flush(flushOptions) }
if (conf.compactOnCommit) {
logInfo("Compacting")
compactTimeMs = timeTakenMs { db.compactRange() }
}
checkpointTimeMs = timeTakenMs {
val checkpointDir = createTempDir("checkpoint")
logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
// Make sure the directory does not exist. Native RocksDB fails if the directory to
// checkpoint exists.
Utils.deleteRecursively(checkpointDir)
// We no longer pause background operation before creating a RocksDB checkpoint because
// it is unnecessary. The captured snapshot will still be consistent with ongoing
// background operations.
val cp = Checkpoint.create(db)
cp.createCheckpoint(checkpointDir.toString)
synchronized {
// if changelog checkpointing is disabled, the snapshot is uploaded synchronously
// inside the uploadSnapshot() called below.
// If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
// during state store maintenance.
latestSnapshot.foreach(_.close())
latestSnapshot = Some(
RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
lastSnapshotVersion = newVersion
}
}
}

logInfo(s"Syncing checkpoint for $newVersion to DFS")
val fileSyncTimeMs = timeTakenMs {
fileManager.saveCheckpointToDfs(checkpointDir, newVersion, numKeysOnWritingVersion)
if (enableChangelogCheckpointing) {
try {
assert(changelogWriter.isDefined)
changelogWriter.foreach(_.commit())
} finally {
changelogWriter = None
}
} else {
assert(changelogWriter.isEmpty)
uploadSnapshot()
}
}

numKeysOnLoadedVersion = numKeysOnWritingVersion
loadedVersion = newVersion
fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
commitLatencyMs ++= Map(
"flush" -> flushTimeMs,
"compact" -> compactTimeMs,
"pause" -> pauseTimeMs,
"checkpoint" -> checkpointTimeMs,
"fileSync" -> fileSyncTimeMs
)
@@ -334,25 +399,60 @@ class RocksDB(
loadedVersion = -1 // invalidate loaded version
throw t
} finally {
if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork()
silentDeleteRecursively(checkpointDir, s"committing $newVersion")
// reset resources as either 1) we already pushed the changes and it has been committed or
// 2) commit has failed and the current version is "invalidated".
release()
}
}

private def shouldCreateSnapshot(): Boolean = {
if (enableChangelogCheckpointing) {
assert(changelogWriter.isDefined)
val newVersion = loadedVersion + 1
newVersion - lastSnapshotVersion >= conf.minDeltasForSnapshot ||
changelogWriter.get.size > 10000
} else true
}

private def uploadSnapshot(): Unit = {
val localCheckpoint = synchronized {
val checkpoint = latestSnapshot
latestSnapshot = None
checkpoint
}
localCheckpoint match {
case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
try {
val uploadTime = timeTakenMs {
fileManager.saveCheckpointToDfs(localDir, version, numKeys)
fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
}
logInfo(s"$loggingId: Upload snapshot of version $version," +
s" time taken: $uploadTime ms")
} finally {
localCheckpoint.foreach(_.close())
}
case _ =>
}
}

/**
* Drop uncommitted changes, and roll back to previous version.
*/
def rollback(): Unit = {
numKeysOnWritingVersion = numKeysOnLoadedVersion
loadedVersion = -1L
changelogWriter.foreach(_.abort())
// Make sure changelogWriter gets recreated next time.
changelogWriter = None
release()
logInfo(s"Rolled back to $loadedVersion")
}

def cleanup(): Unit = {
def doMaintenance(): Unit = {
if (enableChangelogCheckpointing) {
uploadSnapshot()
}
val cleanupTime = timeTakenMs {
fileManager.deleteOldVersions(conf.minVersionsToRetain)
}
@@ -369,6 +469,9 @@ class RocksDB(
flushOptions.close()
dbOptions.close()
dbLogger.close()
synchronized {
latestSnapshot.foreach(_.close())
}
silentDeleteRecursively(localRootDir, "closing RocksDB")
} catch {
case e: Exception =>
@@ -550,7 +653,9 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
*/
case class RocksDBConf(
minVersionsToRetain: Int,
minDeltasForSnapshot: Int,
compactOnCommit: Boolean,
enableChangelogCheckpointing: Boolean,
blockSizeKB: Long,
blockCacheSizeMB: Long,
lockAcquireTimeoutMs: Long,
@@ -563,7 +668,8 @@ case class RocksDBConf(
boundedMemoryUsage: Boolean,
totalMemoryUsageMB: Long,
writeBufferCacheRatio: Double,
highPriorityPoolRatio: Double)
highPriorityPoolRatio: Double,
compressionCodec: String)

object RocksDBConf {
/** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -585,6 +691,8 @@ object RocksDBConf {

// Configuration that specifies whether to compact the RocksDB data every time data is committed
private val COMPACT_ON_COMMIT_CONF = SQLConfEntry("compactOnCommit", "false")
private val ENABLE_CHANGELOG_CHECKPOINTING_CONF = SQLConfEntry(
"changelogCheckpointing.enabled", "false")
private val BLOCK_SIZE_KB_CONF = SQLConfEntry("blockSizeKB", "4")
private val BLOCK_CACHE_SIZE_MB_CONF = SQLConfEntry("blockCacheSizeMB", "8")
// See SPARK-42794 for details.
@@ -705,7 +813,9 @@ object RocksDBConf {

RocksDBConf(
storeConf.minVersionsToRetain,
storeConf.minDeltasForSnapshot,
getBooleanConf(COMPACT_ON_COMMIT_CONF),
getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),
getPositiveLongConf(BLOCK_SIZE_KB_CONF),
getPositiveLongConf(BLOCK_CACHE_SIZE_MB_CONF),
getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF),
@@ -718,7 +828,8 @@ object RocksDBConf {
getBooleanConf(BOUNDED_MEMORY_USAGE_CONF),
getLongConf(MAX_MEMORY_USAGE_MB_CONF),
getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF))
getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
storeConf.compressionCodec)
}

def apply(): RocksDBConf = apply(new StateStoreConf())
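
The snapshot trigger in `shouldCreateSnapshot()` from the diff above can be read as a pure function of a few values. The sketch below restates that condition for illustration only: `SnapshotPolicySketch` and its parameter names are hypothetical, `changelogSize` stands for whatever `changelogWriter.get.size` reports, and 10000 is the literal threshold that appears in the diff.

```scala
// Hypothetical restatement of the snapshot decision; not the patch's own code.
object SnapshotPolicySketch {
  /**
   * Without changelog checkpointing every commit creates a snapshot.
   * With it, a snapshot is created only when enough versions have passed
   * since the last snapshot or the current changelog has grown large.
   */
  def shouldCreateSnapshot(
      changelogEnabled: Boolean,
      newVersion: Long,
      lastSnapshotVersion: Long,
      minDeltasForSnapshot: Int,
      changelogSize: Int): Boolean = {
    if (!changelogEnabled) {
      true
    } else {
      newVersion - lastSnapshotVersion >= minDeltasForSnapshot || changelogSize > 10000
    }
  }
}
```

Under these assumptions, with `minDeltasForSnapshot` set to 10 and the last snapshot at version 0, commits for versions 1 through 9 only upload a changelog unless the changelog size exceeds the threshold, and the commit for version 10 creates a local snapshot that the maintenance task uploads later.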