Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2664,6 +2664,16 @@ object SQLConf {
.doubleConf
.createWithDefault(0.3)

// Cap on how many state store versions a single maintenance operation may delete.
// Only -1 is treated as "no limit" by the deletion logic, so 0 and values below -1 are
// rejected when the conf is set instead of being silently misinterpreted later.
val MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE =
  buildConf("spark.sql.streaming.stateStore.maxVersionsToDeletePerMaintenance")
    .internal()
    .doc("The maximum number of versions to delete per maintenance operation. By default, " +
      "this value is set to -1, which means no limit. Note that, currently this is only " +
      "supported for the RocksDB state store provider.")
    .version("4.1.0")
    .intConf
    .checkValue(v => v == -1 || v > 0,
      "The value must be greater than 0, or -1 to disable the limit.")
    .createWithDefault(-1)

val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
.internal()
.doc("The maximum number of batches which will be retained in memory to avoid " +
Expand Down Expand Up @@ -6693,6 +6703,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

/** Current value of the [[MIN_BATCHES_TO_RETAIN]] conf entry. */
def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

/**
 * Current value of the [[MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE]] conf entry: the maximum
 * number of versions to delete per maintenance operation, where -1 (the default) means
 * no limit.
 */
def maxVersionsToDeletePerMaintenance: Int = getConf(MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE)

/** Current value of the [[RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT]] conf entry. */
def ratioExtraSpaceAllowedInCheckpoint: Double = getConf(RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT)

/**
 * Current value of the [[MAX_BATCHES_TO_RETAIN_IN_MEMORY]] conf entry, i.e. the maximum
 * number of batches which will be retained in memory.
 */
def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,7 @@ class RocksDB(
val cleanupTime = timeTakenMs {
fileManager.deleteOldVersions(
numVersionsToRetain = conf.minVersionsToRetain,
maxVersionsToDeletePerMaintenance = conf.maxVersionsToDeletePerMaintenance,
minVersionsToDelete = conf.minVersionsToDelete)
}
logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, cleanupTime)} ms")
Expand Down Expand Up @@ -1953,7 +1954,8 @@ case class RocksDBConf(
compressionCodec: String,
allowFAllocate: Boolean,
compression: String,
reportSnapshotUploadLag: Boolean)
reportSnapshotUploadLag: Boolean,
maxVersionsToDeletePerMaintenance: Int)

object RocksDBConf {
/** Common prefix of all confs in SQLConf that affects RocksDB */
Expand Down Expand Up @@ -2144,7 +2146,8 @@ object RocksDBConf {
storeConf.compressionCodec,
getBooleanConf(ALLOW_FALLOCATE_CONF),
getStringConf(COMPRESSION_CONF),
storeConf.reportSnapshotUploadLag)
storeConf.reportSnapshotUploadLag,
storeConf.maxVersionsToDeletePerMaintenance)
}

/** Builds a [[RocksDBConf]] from a default-constructed [[StateStoreConf]]. */
def apply(): RocksDBConf = apply(new StateStoreConf())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,22 @@ class RocksDBFileManager(
* - Partially written SST files
* - SST files that were used in a version, but that version got overwritten with a different
* set of SST files.
*
* @param numVersionsToRetain the number of RocksDB versions to keep in object store after the
* deletion. Must be greater than 0, or -1 to retain all versions.
* @param maxVersionsToDeletePerMaintenance the max number of RocksDB versions
* to delete per maintenance operation.
* Must be greater than 0, or -1 to delete all stale versions.
* @param minVersionsToDelete the min number of stale versions required to trigger deletion.
* If it's set to <= 0, then we will always perform list operations
* to determine deletion candidates. If set to a positive value, then
* we will skip deletion if the number of stale versions is less than
* this value.
*/
def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 0): Unit = {
def deleteOldVersions(
numVersionsToRetain: Int,
maxVersionsToDeletePerMaintenance: Int = -1,
minVersionsToDelete: Long = 0): Unit = {
// Check if enough stale version files present
if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return

Expand All @@ -603,14 +617,22 @@ class RocksDBFileManager(

// Find the versions to delete
val maxSnapshotVersionPresent = sortedSnapshotVersionsAndUniqueIds.last._1
val minSnapshotVersionPresent = sortedSnapshotVersionsAndUniqueIds.head._1

// In order to reconstruct numVersionsToRetain versions, retain the latest snapshot
// that satisfies (version <= maxSnapshotVersionPresent - numVersionsToRetain + 1).
// When maxVersionsToDeletePerMaintenance is not -1, additionally require
// minVersionToRetain <= minSnapshotVersionPresent + maxVersionsToDeletePerMaintenance.
// If none of the snapshots satisfy these conditions, minVersionToRetain will be 0 and
// no version gets deleted.
val minVersionToRetain = {
  // Newest snapshot version that still leaves numVersionsToRetain versions reconstructible.
  val retentionCeiling = maxSnapshotVersionPresent - numVersionsToRetain + 1
  // -1 is the sentinel for "no per-maintenance cap".
  val deletionIsCapped = maxVersionsToDeletePerMaintenance != -1

  // A snapshot version qualifies when it is within the retention ceiling and, if a cap is
  // configured, no further than the cap from the oldest snapshot present.
  // NOTE(review): if the two oldest snapshots are further apart than the cap, only the
  // oldest snapshot can pass the cap check - confirm this cannot stall cleanup.
  def qualifies(version: Long): Boolean = {
    val withinRetention = version <= retentionCeiling
    val withinCap =
      !deletionIsCapped ||
        version <= minSnapshotVersionPresent + maxVersionsToDeletePerMaintenance
    withinRetention && withinCap
  }

  // Largest qualifying snapshot version; stays 0 when no snapshot qualifies.
  sortedSnapshotVersionsAndUniqueIds
    .map(_._1)
    .filter(v => qualifies(v))
    .foldLeft(0L)((best, v) => math.max(best, v))
}

// When snapshotVersionToDelete is non-empty, there are at least 2 snapshot versions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class StateStoreConf(
/**
 * Maximum count of versions a State Store implementation should retain in memory.
 * Taken from `sqlConf.maxBatchesToRetainInMemory`.
 */
val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory

/**
 * Maximum number of versions to delete per maintenance operation.
 * Taken from `sqlConf.maxVersionsToDeletePerMaintenance`; -1 means no limit.
 */
val maxVersionsToDeletePerMaintenance: Int = sqlConf.maxVersionsToDeletePerMaintenance

/**
* Optional fully qualified name of the subclass of [[StateStoreProvider]]
* managing state data. That is, the implementation of the State Store to use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,126 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
}

// Verifies that, once the minVersionsToDelete threshold (3) is reached, each maintenance
// operation removes at most maxVersionsToDeletePerMaintenance (1) stale version.
testWithStateStoreCheckpointIdsAndColumnFamilies(
"RocksDB: purge version files with minVersionsToDelete > 0 " +
"and maxVersionsToDeletePerMaintenance > 0",
TestWithBothChangelogCheckpointingEnabledAndDisabled) {
case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created
val conf = dbConf.copy(
minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 3,
maxVersionsToDeletePerMaintenance = 1)
withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
// Commit 5 versions
// stale versions: (1, 2)
// keep versions: (3, 4, 5)
for (version <- 0 to 4) {
// Should upload latest snapshot but not delete any files
// since number of stale versions < minVersionsToDelete
db.load(version)
db.commit()
db.doMaintenance()
}

// Commit 1 more version (maintenance is not run yet)
// stale versions: (1, 2, 3)
// keep versions: (4, 5, 6)
db.load(5)
db.commit()

// Checkpoint directory before maintenance: nothing has been deleted so far
if (isChangelogCheckpointingEnabled) {
assert(snapshotVersionsPresent(remoteDir) == (1 to 5))
assert(changelogVersionsPresent(remoteDir) == (1 to 6))
} else {
assert(snapshotVersionsPresent(remoteDir) == (1 to 6))
}

// Should delete stale versions for zip files and change log files
// since number of stale versions >= minVersionsToDelete,
// but no more than maxVersionsToDeletePerMaintenance of them
db.doMaintenance()

// Checkpoint directory after maintenance
// Verify that only one version is deleted because maxVersionsToDeletePerMaintenance = 1
assert(snapshotVersionsPresent(remoteDir) == Seq(2, 3, 4, 5, 6))
if (isChangelogCheckpointingEnabled) {
assert(changelogVersionsPresent(remoteDir) == Seq(2, 3, 4, 5, 6))
}

// Commit 1 more version to ensure that minVersionsToDelete constraint is satisfied
// stale versions: (2, 3, 4)
// keep versions: (5, 6, 7)
db.load(6)
db.commit()
db.doMaintenance()
// Verify that only one version is deleted because maxVersionsToDeletePerMaintenance = 1
assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5, 6, 7))
if (isChangelogCheckpointingEnabled) {
assert(changelogVersionsPresent(remoteDir) == Seq(3, 4, 5, 6, 7))
}
}
}

// Verifies that when fewer stale versions exist than maxVersionsToDeletePerMaintenance (2)
// allows, a maintenance operation deletes just the stale versions that are present.
testWithStateStoreCheckpointIdsAndColumnFamilies(
"RocksDB: purge version files with minVersionsToDelete < maxVersionsToDeletePerMaintenance",
TestWithBothChangelogCheckpointingEnabledAndDisabled) {
case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created
val conf = dbConf.copy(
minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 1,
maxVersionsToDeletePerMaintenance = 2)
withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
// Commit 5 versions, running maintenance after every commit
// stale versions: (1, 2)
// keep versions: (3, 4, 5)
for (version <- 0 to 4) {
// Uploads the latest snapshot. Because minVersionsToDelete = 1, stale versions are
// purged along the way: versions 1 and 2 are gone by the end of this loop
// (see the assertions below).
db.load(version)
db.commit()
db.doMaintenance()
}

// Commit 1 more version (maintenance is not run yet)
// remaining stale versions: (3)
// keep versions: (4, 5, 6)
db.load(5)
db.commit()

// Checkpoint directory before maintenance
// Verify that 2 oldest stale versions are deleted
if (isChangelogCheckpointingEnabled) {
assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5))
assert(changelogVersionsPresent(remoteDir) == Seq(3, 4, 5, 6))
} else {
assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5, 6))
}

// Should delete stale versions for zip files and change log files
// since number of stale versions >= minVersionsToDelete
db.doMaintenance()

// Checkpoint directory after maintenance
// Verify that only one version is deleted since that's the only stale version left
assert(snapshotVersionsPresent(remoteDir) == Seq(4, 5, 6))
if (isChangelogCheckpointingEnabled) {
assert(changelogVersionsPresent(remoteDir) == Seq(4, 5, 6))
}

// Commit 1 more version to ensure that minVersionsToDelete constraint is satisfied
// stale versions: (4)
// keep versions: (5, 6, 7)
db.load(6)
db.commit()
db.doMaintenance()
// Verify that only one version is deleted since that's the only stale version left
assert(snapshotVersionsPresent(remoteDir) == Seq(5, 6, 7))
if (isChangelogCheckpointingEnabled) {
assert(changelogVersionsPresent(remoteDir) == Seq(5, 6, 7))
}
}
}

testWithStateStoreCheckpointIdsAndColumnFamilies(
"RocksDB: minDeltasForSnapshot",
TestWithChangelogCheckpointingEnabled) {
Expand Down