diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index eea92dffb0488..c17e6910a561a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2664,6 +2664,16 @@ object SQLConf { .doubleConf .createWithDefault(0.3) + val MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE = + buildConf("spark.sql.streaming.stateStore.maxVersionsToDeletePerMaintenance") + .internal() + .doc("The maximum number of versions to delete per maintenance operation. By default, " + + "this value is set to -1, which means no limit. Note that, currently this is only " + + "supported for the RocksDB state store provider.") + .version("4.1.0") + .intConf + .createWithDefault(-1) + val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory") .internal() .doc("The maximum number of batches which will be retained in memory to avoid " + @@ -6693,6 +6703,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN) + def maxVersionsToDeletePerMaintenance: Int = getConf(MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE) + def ratioExtraSpaceAllowedInCheckpoint: Double = getConf(RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT) def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 651617119f45a..774ed23ed55be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -1396,6 +1396,7 @@ class RocksDB( val cleanupTime = timeTakenMs { fileManager.deleteOldVersions( numVersionsToRetain = conf.minVersionsToRetain, + maxVersionsToDeletePerMaintenance = conf.maxVersionsToDeletePerMaintenance, minVersionsToDelete = conf.minVersionsToDelete) } logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, cleanupTime)} ms") @@ -1953,7 +1954,8 @@ case class RocksDBConf( compressionCodec: String, allowFAllocate: Boolean, compression: String, - reportSnapshotUploadLag: Boolean) + reportSnapshotUploadLag: Boolean, + maxVersionsToDeletePerMaintenance: Int) object RocksDBConf { /** Common prefix of all confs in SQLConf that affects RocksDB */ @@ -2144,7 +2146,8 @@ object RocksDBConf { storeConf.compressionCodec, getBooleanConf(ALLOW_FALLOCATE_CONF), getStringConf(COMPRESSION_CONF), - storeConf.reportSnapshotUploadLag) + storeConf.reportSnapshotUploadLag, + storeConf.maxVersionsToDeletePerMaintenance) } def apply(): RocksDBConf = apply(new StateStoreConf()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 2015a5e27a2c0..b99b128791357 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -580,8 +580,22 @@ class RocksDBFileManager( * - Partially written SST files * - SST files that were used in a version, but that version got overwritten with a different * set of SST files. + * + * @param numVersionsToRetain the number of RocksDB versions to keep in object store after the + * deletion. Must be greater than 0, or -1 to retain all versions. + * @param maxVersionsToDeletePerMaintenance the max number of RocksDB versions + * to delete per maintenance operation. + * Must be greater than 0, or -1 to delete all stale versions. + * @param minVersionsToDelete the min number of stale versions required to trigger deletion. + * If its set to <= 0, then we will always perform list operations + * to determine deletion candidates. If set to a positive value, then + * we will skip deletion if the number of stale versions is less than + * this value. */ - def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 0): Unit = { + def deleteOldVersions( + numVersionsToRetain: Int, + maxVersionsToDeletePerMaintenance: Int = -1, + minVersionsToDelete: Long = 0): Unit = { // Check if enough stale version files present if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return @@ -603,14 +617,22 @@ class RocksDBFileManager( // Find the versions to delete val maxSnapshotVersionPresent = sortedSnapshotVersionsAndUniqueIds.last._1 + val minSnapshotVersionPresent = sortedSnapshotVersionsAndUniqueIds.head._1 // In order to reconstruct numVersionsToRetain version, retain the latest snapshot // that satisfies (version <= maxSnapshotVersionPresent - numVersionsToRetain + 1). + // Also require + // minVersionToRetain <= minSnapshotVersionPresent + maxVersionsToDeletePerMaintenance. // If none of the snapshots satisfy the condition, minVersionToRetain will be 0 and // no version gets deleted. val minVersionToRetain = sortedSnapshotVersionsAndUniqueIds .map(_._1) .filter(_ <= maxSnapshotVersionPresent - numVersionsToRetain + 1) + .filter( v => + if (maxVersionsToDeletePerMaintenance != -1) { + v <= minSnapshotVersionPresent + maxVersionsToDeletePerMaintenance + } else true + ) .foldLeft(0L)(math.max) // When snapshotVersionToDelete is non-empty, there are at least 2 snapshot versions. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala index 00bb7de46dc4d..3cf302cd2be89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala @@ -62,6 +62,9 @@ class StateStoreConf( /** Maximum count of versions a State Store implementation should retain in memory */ val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory + /** Maximum number of versions to delete per maintenance operation */ + val maxVersionsToDeletePerMaintenance: Int = sqlConf.maxVersionsToDeletePerMaintenance + /** * Optional fully qualified name of the subclass of [[StateStoreProvider]] * managing state data. That is, the implementation of the State Store to use. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 6f4125bb8b5c5..801e74d288b4a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -783,6 +783,126 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + testWithStateStoreCheckpointIdsAndColumnFamilies( + "RocksDB: purge version files with minVersionsToDelete > 0 " + + "and maxVersionsToDeletePerMaintenance > 0", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { + case (enableStateStoreCheckpointIds, colFamiliesEnabled) => + val remoteDir = Utils.createTempDir().toString + new File(remoteDir).delete() // to make sure that the directory gets created + val conf = dbConf.copy( + minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 3, + maxVersionsToDeletePerMaintenance = 1) + withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db => + // Commit 5 versions + // stale versions: (1, 2) + // keep versions: (3, 4, 5) + for (version <- 0 to 4) { + // Should upload latest snapshot but not delete any files + // since number of stale versions < minVersionsToDelete + db.load(version) + db.commit() + db.doMaintenance() + } + + // Commit 1 more version + // stale versions: (1, 2, 3) + // keep versions: (4, 5, 6) + db.load(5) + db.commit() + + // Checkpoint directory before maintenance + if (isChangelogCheckpointingEnabled) { + assert(snapshotVersionsPresent(remoteDir) == (1 to 5)) + assert(changelogVersionsPresent(remoteDir) == (1 to 6)) + } else { + assert(snapshotVersionsPresent(remoteDir) == (1 to 6)) + } + + // Should delete stale versions for zip files and change log files + // since number of stale versions >= minVersionsToDelete + db.doMaintenance() + + // Checkpoint directory after maintenance + // Verify that only one version is deleted because maxVersionsToDeletePerMaintenance = 1 + assert(snapshotVersionsPresent(remoteDir) == Seq(2, 3, 4, 5, 6)) + if (isChangelogCheckpointingEnabled) { + assert(changelogVersionsPresent(remoteDir) == Seq(2, 3, 4, 5, 6)) + } + + // Commit 1 more version to ensure that minVersionsToDelete constraint is satisfied + db.load(6) + db.commit() + db.doMaintenance() + // Verify that only one version is deleted because maxVersionsToDeletePerMaintenance = 1 + assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5, 6, 7)) + if (isChangelogCheckpointingEnabled) { + assert(changelogVersionsPresent(remoteDir) == Seq(3, 4, 5, 6, 7)) + } + } + } + + testWithStateStoreCheckpointIdsAndColumnFamilies( + "RocksDB: purge version files with minVersionsToDelete < maxVersionsToDeletePerMaintenance", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { + case (enableStateStoreCheckpointIds, colFamiliesEnabled) => + val remoteDir = Utils.createTempDir().toString + new File(remoteDir).delete() // to make sure that the directory gets created + val conf = dbConf.copy( + minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 1, + maxVersionsToDeletePerMaintenance = 2) + withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db => + // Commit 5 versions + // stale versions: (1, 2) + // keep versions: (3, 4, 5) + for (version <- 0 to 4) { + // Should upload latest snapshot but not delete any files + // since number of stale versions < minVersionsToDelete + db.load(version) + db.commit() + db.doMaintenance() + } + + // Commit 1 more version + // stale versions: (1, 2, 3) + // keep versions: (4, 5, 6) + db.load(5) + db.commit() + + // Checkpoint directory before maintenance + // Verify that 2 oldest stale versions are deleted + if (isChangelogCheckpointingEnabled) { + assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5)) + assert(changelogVersionsPresent(remoteDir) == Seq(3, 4, 5, 6)) + } else { + assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5, 6)) + } + + // Should delete stale versions for zip files and change log files + // since number of stale versions >= minVersionsToDelete + db.doMaintenance() + + // Checkpoint directory after maintenance + // Verify that only one version is deleted since thats the only stale version left + assert(snapshotVersionsPresent(remoteDir) == Seq(4, 5, 6)) + if (isChangelogCheckpointingEnabled) { + assert(changelogVersionsPresent(remoteDir) == Seq(4, 5, 6)) + } + + // Commit 1 more version to ensure that minVersionsToDelete constraint is satisfied + db.load(6) + db.commit() + db.doMaintenance() + // Verify that only one version is deleted since thats the only stale version left + assert(snapshotVersionsPresent(remoteDir) == Seq(5, 6, 7)) + if (isChangelogCheckpointingEnabled) { + assert(changelogVersionsPresent(remoteDir) == Seq(5, 6, 7)) + } + } + } + testWithStateStoreCheckpointIdsAndColumnFamilies( "RocksDB: minDeltasForSnapshot", TestWithChangelogCheckpointingEnabled) {