[SPARK-47036][SS][3.5] Cleanup RocksDB file tracking for previously uploaded files if files were deleted from local directory #45206

Closed
RocksDBFileManager.scala
@@ -496,16 +496,12 @@ class RocksDBFileManager(
s" DFS for version $version. $filesReused files reused without copying.")
versionToRocksDBFiles.put(version, immutableFiles)

// clean up deleted SST files from the localFilesToDfsFiles Map
val currentLocalFiles = localFiles.map(_.getName).toSet
val mappingsToClean = localFilesToDfsFiles.asScala
.keys
.filterNot(currentLocalFiles.contains)

mappingsToClean.foreach { f =>
logInfo(s"cleaning $f from the localFilesToDfsFiles map")
localFilesToDfsFiles.remove(f)
}
// Clean up locally deleted files from the localFilesToDfsFiles map
// Locally, SST files can be deleted due to RocksDB compaction. These files need
// to be removed from the localFilesToDfsFiles map to ensure that if an older version
// regenerates them and overwrites the version.zip, SST files from the conflicting
// version (previously committed) are not reused.
removeLocallyDeletedSSTFilesFromDfsMapping(localFiles)

saveCheckpointMetrics = RocksDBFileManagerMetrics(
bytesCopied = bytesCopied,
@@ -523,8 +519,18 @@ class RocksDBFileManager(
private def loadImmutableFilesFromDfs(
immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap

val localImmutableFiles = listRocksDBFiles(localDir)._1

// Clean up locally deleted files from the localFilesToDfsFiles map
// Locally, SST files can be deleted due to RocksDB compaction. These files need
// to be removed from the localFilesToDfsFiles map to ensure that if an older version
// regenerates them and overwrites the version.zip, SST files from the conflicting
// version (previously committed) are not reused.
removeLocallyDeletedSSTFilesFromDfsMapping(localImmutableFiles)

// Delete unnecessary local immutable files
listRocksDBFiles(localDir)._1
localImmutableFiles
.foreach { existingFile =>
val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
@@ -582,6 +588,19 @@ class RocksDBFileManager(
filesReused = filesReused)
}

private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
// clean up deleted SST files from the localFilesToDfsFiles Map
val currentLocalFiles = localFiles.map(_.getName).toSet
val mappingsToClean = localFilesToDfsFiles.asScala
.keys
.filterNot(currentLocalFiles.contains)

mappingsToClean.foreach { f =>
logInfo(s"cleaning $f from the localFilesToDfsFiles map")
localFilesToDfsFiles.remove(f)
}
}

/** Get the SST files required for a version from the version zip file in DFS */
private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = {
Utils.deleteRecursively(localTempDir)
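The helper extracted above (removeLocallyDeletedSSTFilesFromDfsMapping) boils down to one pattern: reconcile the localFilesToDfsFiles map against the files actually present on disk and drop stale entries. Below is a minimal standalone sketch of that pattern; the ConcurrentHashMap with String values is a stand-in for Spark's actual map of local file names to DFS file metadata, and the object and method names are hypothetical:

import java.io.File
import java.util.concurrent.ConcurrentHashMap

object MappingCleanupSketch {
  // stand-in for localFilesToDfsFiles; the real values would be DFS file metadata
  val localFilesToDfsFiles = new ConcurrentHashMap[String, String]()

  def removeLocallyDeletedFiles(localFiles: Seq[File]): Unit = {
    val currentLocalFiles = localFiles.map(_.getName).toSet
    // ConcurrentHashMap iterators are weakly consistent and support remove(),
    // so stale keys can be dropped while iterating, without taking a snapshot
    val it = localFilesToDfsFiles.keySet().iterator()
    while (it.hasNext) {
      if (!currentLocalFiles.contains(it.next())) {
        it.remove()
      }
    }
  }
}

The PR's version instead materializes the stale keys first and removes them afterwards; both forms are safe on a ConcurrentHashMap, whose views never throw ConcurrentModificationException.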
RocksDBSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
import java.io._
import java.nio.charset.Charset

import scala.collection.mutable
import scala.language.implicitConversions

import org.apache.commons.io.FileUtils
@@ -1452,6 +1453,88 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}

test("ensure local files deleted on filesystem" +
" are cleaned from dfs file mapping") {
def getSSTFiles(dir: File): Set[File] = {
val sstFiles = new mutable.HashSet[File]()
dir.listFiles().foreach { f =>
if (f.isDirectory) {
sstFiles ++= getSSTFiles(f)
} else {
if (f.getName.endsWith(".sst")) {
sstFiles.add(f)
}
}
}
sstFiles.toSet
}

def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
dir.listFiles().foreach { f =>
if (f.isDirectory) {
filterAndDeleteSSTFiles(f, filesToKeep)
} else {
if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
logInfo(s"deleting ${f.getAbsolutePath} from local directory")
f.delete()
}
}
}
}

withTempDir { dir =>
withTempDir { localDir =>
val sqlConf = new SQLConf()
val dbConf = RocksDBConf(StateStoreConf(sqlConf))
logInfo(s"config set to ${dbConf.compactOnCommit}")
val hadoopConf = new Configuration()
val remoteDir = dir.getCanonicalPath
withDB(remoteDir = remoteDir,
conf = dbConf,
hadoopConf = hadoopConf,
localDir = localDir) { db =>
db.load(0)
db.put("a", "1")
db.put("b", "1")
db.commit()
db.doMaintenance()

// find all SST files written in version 1
val sstFiles = getSSTFiles(localDir)

// make more commits; these generate more SST files and write
// them to remoteDir
for (version <- 1 to 10) {
db.load(version)
db.put("c", "1")
db.put("d", "1")
db.commit()
db.doMaintenance()
}

// delete the SST files committed after version 1 from the local
// filesystem. This simulates a process like compaction, where
// multiple L0 SST files can be merged into a single L1 file
filterAndDeleteSSTFiles(localDir, sstFiles)

// reload version 2 and overwrite the commit for version 3; this should
// not reuse any locally deleted files, as they should have been removed
// from the mapping
db.load(2)
db.put("e", "1")
db.put("f", "1")
db.commit()
db.doMaintenance()

// clean local state
db.load(0)

// reload version 3, should be successful
db.load(3)
}
}
}
}
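As a side note on the test's getSSTFiles helper above: the same recursive walk can be written with java.nio.file.Files.walk. A sketch of that equivalent formulation, not part of the PR (Scala 2.12-style JavaConverters):

import java.io.File
import java.nio.file.Files
import scala.collection.JavaConverters._

// equivalent to getSSTFiles above, using Files.walk instead of manual recursion
def getSSTFilesNio(dir: File): Set[File] = {
  val stream = Files.walk(dir.toPath)
  try {
    stream.iterator().asScala
      .filter(p => p.toFile.isFile && p.toString.endsWith(".sst"))
      .map(_.toFile)
      .toSet
  } finally {
    stream.close() // Files.walk holds directory handles until closed
  }
}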

private def sqlConf = SQLConf.get.clone()

private def dbConf = RocksDBConf(StateStoreConf(sqlConf))
@@ -1460,12 +1543,16 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
remoteDir: String,
version: Int = 0,
conf: RocksDBConf = dbConf,
hadoopConf: Configuration = new Configuration())(
hadoopConf: Configuration = new Configuration(),
localDir: File = Utils.createTempDir())(
func: RocksDB => T): T = {
var db: RocksDB = null
try {
db = new RocksDB(
remoteDir, conf = conf, hadoopConf = hadoopConf,
remoteDir,
conf = conf,
localRootDir = localDir,
hadoopConf = hadoopConf,
loggingId = s"[Thread-${Thread.currentThread.getId}]")
db.load(version)
func(db)
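For reference, the new localDir parameter is what lets the test above pin RocksDB's working directory and delete SST files out from under the store; previously withDB left RocksDB to pick its own local root directory. A minimal usage sketch reusing the suite's own helpers (withTempDir, dbConf):

withTempDir { remote =>
  withTempDir { local =>
    withDB(
        remoteDir = remote.getCanonicalPath,
        conf = dbConf,
        hadoopConf = new Configuration(),
        localDir = local) { db =>
      db.load(0)
      db.put("k", "v")
      db.commit()
      // the working copy now lives under `local`, so a test can inspect
      // or delete SST files there to simulate compaction
    }
  }
}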