From ef33b9c50806475f287267c05278aeda3645abac Mon Sep 17 00:00:00 2001
From: Bhuwan Sahni
Date: Wed, 24 Jan 2024 21:35:33 +0900
Subject: [PATCH] [SPARK-46796][SS] Ensure the correct remote files (mentioned in metadata.zip) are used on RocksDB version load

### What changes were proposed in this pull request?

This PR ensures that RocksDB loads do not run into the SST file version ID mismatch issue. RocksDB has added validation to ensure the exact same SST file is used during database load from a snapshot, and the current streaming state implementation has certain edge cases where this condition is violated, resulting in state load failures.

The changes introduced are:

1. Ensure that the local SST file is exactly the same DFS file recorded in metadata.zip. We keep track of the DFS file used to create each local SST file, and re-download the SST file if the DFS file in the metadata zip has a different UUID.
2. Reset lastSnapshotVersion in RocksDB when RocksDB is loaded. Changelog checkpointing relies on this version for future snapshots. Currently, if an older version is reloaded, we do not upload snapshots because lastSnapshotVersion points to a higher snapshot of an already cleaned-up database.

### Why are the changes needed?

We need to ensure that the correct SST files are used on the executor during RocksDB load, as per the mapping in metadata.zip. With the current implementation, it is possible that the executor uses an SST file (with a different UUID) from an older version which is not the exact file mapped in metadata.zip. This can cause version ID mismatch errors while loading RocksDB, leading to streaming query failures. See https://issues.apache.org/jira/browse/SPARK-46796 for failure scenarios.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added exhaustive unit test cases covering the scenarios.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44837 from sahnib/SPARK-46796.

Authored-by: Bhuwan Sahni
Signed-off-by: Jungtaek Lim
(cherry picked from commit f25ebe52b9b84ece9b3c5ae30b83eaaef52ec55b)
Signed-off-by: Jungtaek Lim
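For illustration only, here is a minimal sketch of the bookkeeping that change 1 introduces. The `localFilesToDfsFiles` name mirrors the patch, but `DfsFile`, `LocalFileTracker`, and the method names are hypothetical simplifications rather than Spark APIs: a local SST file may only be reused when it is backed by the exact DFS file that the version's metadata.zip points to; otherwise it has to be deleted and re-downloaded.

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for the patch's RocksDBImmutableFile: only the fields needed here.
case class DfsFile(dfsFileName: String, sizeBytes: Long)

class LocalFileTracker {
  // local SST file name (e.g. "000001.sst") -> exact DFS file it was downloaded from or uploaded as
  private val localFilesToDfsFiles = new ConcurrentHashMap[String, DfsFile]()

  // Remember which DFS file backs a local file, after a download as well as after an upload.
  def record(localName: String, dfsFile: DfsFile): Unit = {
    localFilesToDfsFiles.put(localName, dfsFile)
  }

  // Forget the mapping once the local file is deleted.
  def forget(localName: String): Unit = {
    localFilesToDfsFiles.remove(localName)
  }

  // A local SST file may be kept for a version load only if it came from the exact DFS file
  // (same UUID-bearing name) that the version's metadata.zip requires and the sizes match;
  // otherwise RocksDB can fail the load with a version ID mismatch.
  def canReuse(localName: String, localSize: Long, required: DfsFile): Boolean =
    Option(localFilesToDfsFiles.get(localName)).exists { prev =>
      prev.dfsFileName == required.dfsFileName && localSize == required.sizeBytes
    }
}
```

Change 2 is complementary: because changelog checkpointing decides when to upload the next snapshot by comparing against lastSnapshotVersion, that field has to be reset when RocksDB loads a snapshot; otherwise, after reloading an older version, it would keep pointing at a higher snapshot of an already cleaned-up database and no new snapshot would be uploaded.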
---
 .../execution/streaming/state/RocksDB.scala   |   3 +
 .../streaming/state/RocksDBFileManager.scala  |  92 +++--
 .../streaming/state/RocksDBSuite.scala        | 314 +++++++++++++++++-
 3 files changed, 372 insertions(+), 37 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 2398b7780726a..0c9738a6b0817 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -151,6 +151,8 @@ class RocksDB(
         val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
         loadedVersion = latestSnapshotVersion
+        // reset last snapshot version
+        lastSnapshotVersion = 0L
         openDB()
         numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
@@ -191,6 +193,7 @@ class RocksDB(
    */
   private def replayChangelog(endVersion: Long): Unit = {
     for (v <- loadedVersion + 1 to endVersion) {
+      logInfo(s"replaying changelog from version $loadedVersion -> $endVersion")
       var changelogReader: StateStoreChangelogReader = null
       try {
         changelogReader = fileManager.getChangelogReader(v)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index faf9cd701aeca..300a3b8137b4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -132,6 +132,15 @@ class RocksDBFileManager(
   import RocksDBImmutableFile._

   private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+
+  // Used to keep a mapping of the exact DFS file that was used to create a local SST file.
+  // This is kept as a separate map because versionToRocksDBFiles can map a given local file
+  // name to multiple similar SST files (for example, 1.sst can map to 1-UUID1.sst in v1 and
+  // 1-UUID2.sst in v2). We need to capture the exact file used to ensure version ID
+  // compatibility across SST files and the RocksDB manifest.
+  private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
+
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
   private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
   private val onlyZipFiles = new PathFilter {
@@ -213,6 +222,7 @@ class RocksDBFileManager(
     versionToRocksDBFiles.keySet().removeIf(_ >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
+      localFilesToDfsFiles.clear()
       localDir.mkdirs()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
@@ -449,44 +459,54 @@ class RocksDBFileManager(
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
-    val prevFilesToSizes = versionToRocksDBFiles.asScala.filterKeys(_ < version)
-      .values.flatten.map { f =>
-        f.localFileName -> f
-      }.toMap
     var bytesCopied = 0L
     var filesCopied = 0L
     var filesReused = 0L
     val immutableFiles = localFiles.map { localFile =>
-      prevFilesToSizes
-        .get(localFile.getName)
-        .filter(_.isSameFile(localFile))
-        .map { reusable =>
-          filesReused += 1
-          reusable
-        }.getOrElse {
-          val localFileName = localFile.getName
-          val dfsFileName = newDFSFileName(localFileName)
-          val dfsFile = dfsFilePath(dfsFileName)
-          // Note: The implementation of copyFromLocalFile() closes the output stream when there is
-          // any exception while copying. So this may generate partial files on DFS. But that is
-          // okay because until the main [version].zip file is written, those partial files are
-          // not going to be used at all. Eventually these files should get cleared.
-          fs.copyFromLocalFile(
-            new Path(localFile.getAbsoluteFile.toURI), dfsFile)
-          val localFileSize = localFile.length()
-          logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
-          filesCopied += 1
-          bytesCopied += localFileSize
-
-          RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
-        }
+      val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
+      if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
+        val dfsFile = existingDfsFile.get
+        filesReused += 1
+        logInfo(s"reusing file $dfsFile for $localFile")
+        RocksDBImmutableFile(localFile.getName, dfsFile.dfsFileName, dfsFile.sizeBytes)
+      } else {
+        val localFileName = localFile.getName
+        val dfsFileName = newDFSFileName(localFileName)
+        val dfsFile = dfsFilePath(dfsFileName)
+        // Note: The implementation of copyFromLocalFile() closes the output stream when there is
+        // any exception while copying. So this may generate partial files on DFS. But that is
+        // okay because until the main [version].zip file is written, those partial files are
+        // not going to be used at all. Eventually these files should get cleared.
+        fs.copyFromLocalFile(
+          new Path(localFile.getAbsoluteFile.toURI), dfsFile)
+        val localFileSize = localFile.length()
+        logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
+        filesCopied += 1
+        bytesCopied += localFileSize
+
+        val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
+        localFilesToDfsFiles.put(localFileName, immutableDfsFile)
+
+        immutableDfsFile
+      }
     }
     logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
       s" DFS for version $version. $filesReused files reused without copying.")
     versionToRocksDBFiles.put(version, immutableFiles)
+    // clean up deleted SST files from the localFilesToDfsFiles Map
+    val currentLocalFiles = localFiles.map(_.getName).toSet
+    val mappingsToClean = localFilesToDfsFiles.asScala
+      .keys
+      .filterNot(currentLocalFiles.contains)
+
+    mappingsToClean.foreach { f =>
+      logInfo(s"cleaning $f from the localFilesToDfsFiles map")
+      localFilesToDfsFiles.remove(f)
+    }
+
     saveCheckpointMetrics = RocksDBFileManagerMetrics(
       bytesCopied = bytesCopied,
       filesCopied = filesCopied,
@@ -506,11 +526,22 @@ class RocksDBFileManager(
     // Delete unnecessary local immutable files
     listRocksDBFiles(localDir)._1
       .foreach { existingFile =>
-        val isSameFile =
-          requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+        val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
+        val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
+        val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
+          requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
+            existingFile.length() == requiredFile.get.sizeBytes
+        } else {
+          false
+        }
+
         if (!isSameFile) {
           existingFile.delete()
-          logInfo(s"Deleted local file $existingFile")
+          localFilesToDfsFiles.remove(existingFile.getName)
+          logInfo(s"Deleted local file $existingFile with size ${existingFile.length()} mapped" +
+            s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
+        } else {
+          logInfo(s"reusing $prevDfsFile present at $existingFile for $requiredFile")
         }
       }
@@ -536,6 +567,7 @@ class RocksDBFileManager(
         }
         filesCopied += 1
         bytesCopied += localFileSize
+        localFilesToDfsFiles.put(localFileName, file)
         logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
       } else {
         filesReused += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 91dd858220717..04b11dfe43f0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -24,16 +24,36 @@ import scala.language.implicitConversions

 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.scalactic.source.Position
 import org.scalatest.Tag

 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, FileSystemBasedCheckpointFileManager}
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.util.{ThreadUtils, Utils}

+class NoOverwriteFileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends FileSystemBasedCheckpointFileManager(path, hadoopConf) {
+
+  override def createAtomic(path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
+  }
+
+  override def renameTempFile(srcPath: Path, dstPath: Path,
+      overwriteIfPossible: Boolean): Unit = {
+    if (!fs.exists(dstPath)) {
+      // only write if a file does not exist at this location
+      super.renameTempFile(srcPath, dstPath, overwriteIfPossible)
+    }
+  }
+}
+
 trait RocksDBStateStoreChangelogCheckpointingTestUtil {
   val rocksdbChangelogCheckpointingConfKey: String =
     RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled"
@@ -666,19 +686,19 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     // Save SAME version again with different checkpoint files and load back again to verify
     // whether files were overwritten.
     val cpFiles1_ = Seq(
-      "sst-file1.sst" -> 10, // same SST file as before, but same version, so should get copied
+      "sst-file1.sst" -> 10, // same SST file as before, this should get reused
       "sst-file2.sst" -> 25, // new SST file with same name as before, but different length
       "sst-file3.sst" -> 30, // new SST file
       "other-file1" -> 100, // same non-SST file as before, should not get copied
       "other-file2" -> 210, // new non-SST file with same name as before, but different length
       "other-file3" -> 300, // new non-SST file
-      "archive/00001.log" -> 1000, // same log file as before and version, so should get copied
+      "archive/00001.log" -> 1000, // same log file as before, this should get reused
       "archive/00002.log" -> 2500, // new log file with same name as before, but different length
       "archive/00003.log" -> 3000 // new log file
     )
     saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
-    assert(numRemoteSSTFiles === 5, "shouldn't copy same files again") // 2 old + 3 new SST files
-    assert(numRemoteLogFiles === 5, "shouldn't copy same files again") // 2 old + 3 new log files
+    assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
+    assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
     loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)

     // Save another version and verify
@@ -688,8 +708,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
       "archive/00004.log" -> 4000
     )
     saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
-    assert(numRemoteSSTFiles === 6) // 1 new file over earlier 5 files
-    assert(numRemoteLogFiles === 6) // 1 new file over earlier 5 files
+    assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
+    assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
     loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501)

     // Loading an older version should work
@@ -1152,6 +1172,286 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }

+  test("time travel - validate successful RocksDB load") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 1, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 1) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 2.zip
+      db.doMaintenance()
+      for (version <- Seq(2)) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 3.zip
+      db.doMaintenance()
+      // simulate db in another executor that overrides the zip file
+      withDB(remoteDir, conf = conf) { db1 =>
+        for (version <- 0 to 1) {
+          db1.load(version)
+          db1.put(version.toString, version.toString)
+          db1.commit()
+        }
+        db1.doMaintenance()
+      }
+      db.load(2)
+      for (version <- Seq(2)) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 3.zip
+      db.doMaintenance()
+      // rollback to version 2
+      db.load(2)
+    }
+  }
+
+  test("time travel 2 - validate successful RocksDB load") {
+    Seq(1, 2).map(minDeltasForSnapshot => {
+      val remoteDir = Utils.createTempDir().toString
+      val conf = dbConf.copy(minDeltasForSnapshot = minDeltasForSnapshot,
+        compactOnCommit = false)
+      new File(remoteDir).delete() // to make sure that the directory gets created
+      withDB(remoteDir, conf = conf) { db =>
+        for (version <- 0 to 1) {
+          db.load(version)
+          db.put(version.toString, version.toString)
+          db.commit()
+        }
+        // upload snapshot 2.zip
+        db.doMaintenance()
+        for (version <- 2 to 3) {
+          db.load(version)
+          db.put(version.toString, version.toString)
+          db.commit()
+        }
+        db.load(0)
+        // simulate db in another executor that overrides the zip file
+        withDB(remoteDir, conf = conf) { db1 =>
+          for (version <- 0 to 1) {
+            db1.load(version)
+            db1.put(version.toString, version.toString)
+            db1.commit()
+          }
+          db1.doMaintenance()
+        }
+        for (version <- 2 to 3) {
+          db.load(version)
+          db.put(version.toString, version.toString)
+          db.commit()
+        }
+        // upload snapshot 4.zip
+        db.doMaintenance()
+      }
+      withDB(remoteDir, version = 4, conf = conf) { db =>
+      }
+    })
+  }
+
+  test("time travel 3 - validate successful RocksDB load") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 0, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 2) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 2.zip
+      db.doMaintenance()
+      for (version <- 1 to 3) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 4.zip
+      db.doMaintenance()
+    }
+
+    withDB(remoteDir, version = 4, conf = conf) { db =>
+    }
+  }
+
+  test("validate Rocks DB SST files do not have a VersionIdMismatch" +
+    " when metadata file is not overwritten - scenario 1") {
+    val fmClass = "org.apache.spark.sql.execution.streaming.state." +
+ + "NoOverwriteFileSystemBasedCheckpointFileManager" + withTempDir { dir => + val dbConf = RocksDBConf(StateStoreConf(new SQLConf())) + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass) + + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 => + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 => + // commit version 1 via db1 + db1.load(0) + db1.put("a", "1") + db1.put("b", "1") + + db1.commit() + + // commit version 1 via db2 + db2.load(0) + db2.put("a", "1") + db2.put("b", "1") + + db2.commit() + + // commit version 2 via db2 + db2.load(1) + db2.put("a", "2") + db2.put("b", "2") + + db2.commit() + + // reload version 1, this should succeed + db2.load(1) + db1.load(1) + + // reload version 2, this should succeed + db2.load(2) + db1.load(2) + } + } + } + } + + test("validate Rocks DB SST files do not have a VersionIdMismatch" + + " when metadata file is overwritten - scenario 1") { + withTempDir { dir => + val dbConf = RocksDBConf(StateStoreConf(new SQLConf())) + val hadoopConf = new Configuration() + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 => + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 => + // commit version 1 via db1 + db1.load(0) + db1.put("a", "1") + db1.put("b", "1") + + db1.commit() + + // commit version 1 via db2 + db2.load(0) + db2.put("a", "1") + db2.put("b", "1") + + db2.commit() + + // commit version 2 via db2 + db2.load(1) + db2.put("a", "2") + db2.put("b", "2") + + db2.commit() + + // reload version 1, this should succeed + db2.load(1) + db1.load(1) + + // reload version 2, this should succeed + db2.load(2) + db1.load(2) + } + } + } + } + + test("validate Rocks DB SST files do not have a VersionIdMismatch" + + " when metadata file is not overwritten - scenario 2") { + val fmClass = "org.apache.spark.sql.execution.streaming.state." 
+ + "NoOverwriteFileSystemBasedCheckpointFileManager" + withTempDir { dir => + val dbConf = RocksDBConf(StateStoreConf(new SQLConf())) + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass) + + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 => + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 => + // commit version 1 via db2 + db2.load(0) + db2.put("a", "1") + db2.put("b", "1") + + db2.commit() + + // commit version 1 via db1 + db1.load(0) + db1.put("a", "1") + db1.put("b", "1") + + db1.commit() + + // commit version 2 via db2 + db2.load(1) + db2.put("a", "2") + db2.put("b", "2") + + db2.commit() + + // reload version 1, this should succeed + db2.load(1) + db1.load(1) + + // reload version 2, this should succeed + db2.load(2) + db1.load(2) + } + } + } + } + + test("validate Rocks DB SST files do not have a VersionIdMismatch" + + " when metadata file is overwritten - scenario 2") { + withTempDir { dir => + val dbConf = RocksDBConf(StateStoreConf(new SQLConf())) + val hadoopConf = new Configuration() + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 => + withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 => + // commit version 1 via db2 + db2.load(0) + db2.put("a", "1") + db2.put("b", "1") + + db2.commit() + + // commit version 1 via db1 + db1.load(0) + db1.put("a", "1") + db1.put("b", "1") + + db1.commit() + + // commit version 2 via db2 + db2.load(1) + db2.put("a", "2") + db2.put("b", "2") + + db2.commit() + + // reload version 1, this should succeed + db2.load(1) + db1.load(1) + + // reload version 2, this should succeed + db2.load(2) + db1.load(2) + } + } + } + } + private def sqlConf = SQLConf.get.clone() private def dbConf = RocksDBConf(StateStoreConf(sqlConf))