diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 2398b7780726a..0c9738a6b0817 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -151,6 +151,8 @@ class RocksDB(
       val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
       loadedVersion = latestSnapshotVersion

+      // reset last snapshot version
+      lastSnapshotVersion = 0L
       openDB()

       numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
@@ -191,6 +193,7 @@ class RocksDB(
    */
   private def replayChangelog(endVersion: Long): Unit = {
     for (v <- loadedVersion + 1 to endVersion) {
+      logInfo(s"replaying changelog from version $loadedVersion -> $endVersion")
       var changelogReader: StateStoreChangelogReader = null
       try {
         changelogReader = fileManager.getChangelogReader(v)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index faf9cd701aeca..300a3b8137b4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -132,6 +132,15 @@ class RocksDBFileManager(
   import RocksDBImmutableFile._

   private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+
+
+  // Used to keep a mapping of the exact DFS file that was used to create a local SST file.
+  // This is kept as a separate map because versionToRocksDBFiles can map a given local file
+  // name to multiple DFS files (for example, 1.sst can map to 1-UUID1.sst in v1 and
+  // 1-UUID2.sst in v2). We need to capture the exact file used to ensure version ID compatibility
+  // across the SST files and the RocksDB manifest.
+  private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
+
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
   private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
   private val onlyZipFiles = new PathFilter {
@@ -213,6 +222,7 @@ class RocksDBFileManager(
     versionToRocksDBFiles.keySet().removeIf(_ >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
+      localFilesToDfsFiles.clear()
       localDir.mkdirs()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
@@ -449,44 +459,54 @@ class RocksDBFileManager(
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
-    val prevFilesToSizes = versionToRocksDBFiles.asScala.filterKeys(_ < version)
-      .values.flatten.map { f =>
-        f.localFileName -> f
-      }.toMap

     var bytesCopied = 0L
     var filesCopied = 0L
     var filesReused = 0L

     val immutableFiles = localFiles.map { localFile =>
-      prevFilesToSizes
-        .get(localFile.getName)
-        .filter(_.isSameFile(localFile))
-        .map { reusable =>
-          filesReused += 1
-          reusable
-        }.getOrElse {
-          val localFileName = localFile.getName
-          val dfsFileName = newDFSFileName(localFileName)
-          val dfsFile = dfsFilePath(dfsFileName)
-          // Note: The implementation of copyFromLocalFile() closes the output stream when there is
-          // any exception while copying. So this may generate partial files on DFS. But that is
-          // okay because until the main [version].zip file is written, those partial files are
-          // not going to be used at all. Eventually these files should get cleared.
-          fs.copyFromLocalFile(
-            new Path(localFile.getAbsoluteFile.toURI), dfsFile)
-          val localFileSize = localFile.length()
-          logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
-          filesCopied += 1
-          bytesCopied += localFileSize
-
-          RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
-        }
+      val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
+      if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
+        val dfsFile = existingDfsFile.get
+        filesReused += 1
+        logInfo(s"reusing file $dfsFile for $localFile")
+        RocksDBImmutableFile(localFile.getName, dfsFile.dfsFileName, dfsFile.sizeBytes)
+      } else {
+        val localFileName = localFile.getName
+        val dfsFileName = newDFSFileName(localFileName)
+        val dfsFile = dfsFilePath(dfsFileName)
+        // Note: The implementation of copyFromLocalFile() closes the output stream when there is
+        // any exception while copying. So this may generate partial files on DFS. But that is
+        // okay because until the main [version].zip file is written, those partial files are
+        // not going to be used at all. Eventually these files should get cleared.
+        fs.copyFromLocalFile(
+          new Path(localFile.getAbsoluteFile.toURI), dfsFile)
+        val localFileSize = localFile.length()
+        logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
+        filesCopied += 1
+        bytesCopied += localFileSize
+
+        val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
+        localFilesToDfsFiles.put(localFileName, immutableDfsFile)
+
+        immutableDfsFile
+      }
     }
     logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
       s" DFS for version $version. $filesReused files reused without copying.")
     versionToRocksDBFiles.put(version, immutableFiles)
+    // clean up deleted SST files from the localFilesToDfsFiles Map
+    val currentLocalFiles = localFiles.map(_.getName).toSet
+    val mappingsToClean = localFilesToDfsFiles.asScala
+      .keys
+      .filterNot(currentLocalFiles.contains)
+
+    mappingsToClean.foreach { f =>
+      logInfo(s"cleaning $f from the localFilesToDfsFiles map")
+      localFilesToDfsFiles.remove(f)
+    }
+
     saveCheckpointMetrics = RocksDBFileManagerMetrics(
       bytesCopied = bytesCopied,
       filesCopied = filesCopied,
@@ -506,11 +526,22 @@ class RocksDBFileManager(
     // Delete unnecessary local immutable files
     listRocksDBFiles(localDir)._1
       .foreach { existingFile =>
-        val isSameFile =
-          requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+        val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
+        val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
+        val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
+          requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
+            existingFile.length() == requiredFile.get.sizeBytes
+        } else {
+          false
+        }
+
         if (!isSameFile) {
           existingFile.delete()
-          logInfo(s"Deleted local file $existingFile")
+          localFilesToDfsFiles.remove(existingFile.getName)
+          logInfo(s"Deleted local file $existingFile with size ${existingFile.length()} mapped" +
+            s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
+        } else {
+          logInfo(s"reusing $prevDfsFile present at $existingFile for $requiredFile")
         }
       }

@@ -536,6 +567,7 @@ class RocksDBFileManager(
         }
         filesCopied += 1
         bytesCopied += localFileSize
+        localFilesToDfsFiles.put(localFileName, file)
         logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
       } else {
         filesReused += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 91dd858220717..04b11dfe43f0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -24,16 +24,36 @@ import scala.language.implicitConversions

 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.scalactic.source.Position
 import org.scalatest.Tag

 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, FileSystemBasedCheckpointFileManager}
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.util.{ThreadUtils, Utils}

+class NoOverwriteFileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends FileSystemBasedCheckpointFileManager(path, hadoopConf) {
+
+  override def createAtomic(path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
+  }
+
+  override def renameTempFile(srcPath: Path, dstPath: Path,
+      overwriteIfPossible: Boolean): Unit = {
+    if (!fs.exists(dstPath)) {
+      // only write if a file does not exist at this location
+      super.renameTempFile(srcPath, dstPath, overwriteIfPossible)
+    }
+  }
+}
+
 trait RocksDBStateStoreChangelogCheckpointingTestUtil {
   val rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
     ".changelogCheckpointing.enabled"
@@ -666,19 +686,19 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     // Save SAME version again with different checkpoint files and load back again to verify
     // whether files were overwritten.
     val cpFiles1_ = Seq(
-      "sst-file1.sst" -> 10, // same SST file as before, but same version, so should get copied
+      "sst-file1.sst" -> 10, // same SST file as before, this should get reused
       "sst-file2.sst" -> 25, // new SST file with same name as before, but different length
       "sst-file3.sst" -> 30, // new SST file
       "other-file1" -> 100, // same non-SST file as before, should not get copied
       "other-file2" -> 210, // new non-SST file with same name as before, but different length
       "other-file3" -> 300, // new non-SST file
-      "archive/00001.log" -> 1000, // same log file as before and version, so should get copied
+      "archive/00001.log" -> 1000, // same log file as before, this should get reused
       "archive/00002.log" -> 2500, // new log file with same name as before, but different length
       "archive/00003.log" -> 3000 // new log file
     )
     saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
-    assert(numRemoteSSTFiles === 5, "shouldn't copy same files again") // 2 old + 3 new SST files
-    assert(numRemoteLogFiles === 5, "shouldn't copy same files again") // 2 old + 3 new log files
+    assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
+    assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
     loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)

     // Save another version and verify
@@ -688,8 +708,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
       "archive/00004.log" -> 4000
     )
     saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
-    assert(numRemoteSSTFiles === 6) // 1 new file over earlier 5 files
-    assert(numRemoteLogFiles === 6) // 1 new file over earlier 5 files
+    assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
+    assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
     loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501)

     // Loading an older version should work
@@ -1152,6 +1172,286 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }

+  test("time travel - validate successful RocksDB load") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 1, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 1) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 2.zip
+      db.doMaintenance()
+      for (version <- Seq(2)) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 3.zip
+      db.doMaintenance()
+      // simulate db in another executor that overrides the zip file
+      withDB(remoteDir, conf = conf) { db1 =>
+        for (version <- 0 to 1) {
+          db1.load(version)
+          db1.put(version.toString, version.toString)
+          db1.commit()
+        }
+        db1.doMaintenance()
+      }
+      db.load(2)
+      for (version <- Seq(2)) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 3.zip
+      db.doMaintenance()
+      // rollback to version 2
+      db.load(2)
+    }
+  }
+
+  test("time travel 2 - validate successful RocksDB load") {
+    Seq(1, 2).map(minDeltasForSnapshot => {
+      val remoteDir = Utils.createTempDir().toString
+      val conf = dbConf.copy(minDeltasForSnapshot = minDeltasForSnapshot,
+        compactOnCommit = false)
+      new File(remoteDir).delete() // to make sure that the directory gets created
+      withDB(remoteDir, conf = conf) { db =>
+        for (version <- 0 to 1) {
+          db.load(version)
+          db.put(version.toString, version.toString)
+          db.commit()
+        }
+        // upload snapshot 2.zip
+        db.doMaintenance()
+        for (version <- 2 to 3) {
+          db.load(version)
+          db.put(version.toString, version.toString)
+          db.commit()
+        }
+        db.load(0)
+        // simulate db in another executor that overrides the zip file
+        withDB(remoteDir, conf = conf) { db1 =>
+          for (version <- 0 to 1) {
+            db1.load(version)
+            db1.put(version.toString, version.toString)
+            db1.commit()
+          }
+          db1.doMaintenance()
+        }
+        for (version <- 2 to 3) {
+          db.load(version)
+          db.put(version.toString, version.toString)
+          db.commit()
+        }
+        // upload snapshot 4.zip
+        db.doMaintenance()
+      }
+      withDB(remoteDir, version = 4, conf = conf) { db =>
+      }
+    })
+  }
+
+  test("time travel 3 - validate successful RocksDB load") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 0, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 2) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 2.zip
+      db.doMaintenance()
+      for (version <- 1 to 3) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 4.zip
+      db.doMaintenance()
+    }
+
+    withDB(remoteDir, version = 4, conf = conf) { db =>
+    }
+  }
+
+  test("validate Rocks DB SST files do not have a VersionIdMismatch" +
+    " when metadata file is not overwritten - scenario 1") {
+    val fmClass = "org.apache.spark.sql.execution.streaming.state." +
+      "NoOverwriteFileSystemBasedCheckpointFileManager"
+    withTempDir { dir =>
+      val dbConf = RocksDBConf(StateStoreConf(new SQLConf()))
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass)
+
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 =>
+        withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 =>
+          // commit version 1 via db1
+          db1.load(0)
+          db1.put("a", "1")
+          db1.put("b", "1")
+
+          db1.commit()
+
+          // commit version 1 via db2
+          db2.load(0)
+          db2.put("a", "1")
+          db2.put("b", "1")
+
+          db2.commit()
+
+          // commit version 2 via db2
+          db2.load(1)
+          db2.put("a", "2")
+          db2.put("b", "2")
+
+          db2.commit()
+
+          // reload version 1, this should succeed
+          db2.load(1)
+          db1.load(1)
+
+          // reload version 2, this should succeed
+          db2.load(2)
+          db1.load(2)
+        }
+      }
+    }
+  }
+
+  test("validate Rocks DB SST files do not have a VersionIdMismatch" +
+    " when metadata file is overwritten - scenario 1") {
+    withTempDir { dir =>
+      val dbConf = RocksDBConf(StateStoreConf(new SQLConf()))
+      val hadoopConf = new Configuration()
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 =>
+        withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 =>
+          // commit version 1 via db1
+          db1.load(0)
+          db1.put("a", "1")
+          db1.put("b", "1")
+
+          db1.commit()
+
+          // commit version 1 via db2
+          db2.load(0)
+          db2.put("a", "1")
+          db2.put("b", "1")
+
+          db2.commit()
+
+          // commit version 2 via db2
+          db2.load(1)
+          db2.put("a", "2")
+          db2.put("b", "2")
+
+          db2.commit()
+
+          // reload version 1, this should succeed
+          db2.load(1)
+          db1.load(1)
+
+          // reload version 2, this should succeed
+          db2.load(2)
+          db1.load(2)
+        }
+      }
+    }
+  }
+
+  test("validate Rocks DB SST files do not have a VersionIdMismatch" +
+    " when metadata file is not overwritten - scenario 2") {
+    val fmClass = "org.apache.spark.sql.execution.streaming.state." +
+      "NoOverwriteFileSystemBasedCheckpointFileManager"
+    withTempDir { dir =>
+      val dbConf = RocksDBConf(StateStoreConf(new SQLConf()))
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass)
+
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 =>
+        withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 =>
+          // commit version 1 via db2
+          db2.load(0)
+          db2.put("a", "1")
+          db2.put("b", "1")
+
+          db2.commit()
+
+          // commit version 1 via db1
+          db1.load(0)
+          db1.put("a", "1")
+          db1.put("b", "1")
+
+          db1.commit()
+
+          // commit version 2 via db2
+          db2.load(1)
+          db2.put("a", "2")
+          db2.put("b", "2")
+
+          db2.commit()
+
+          // reload version 1, this should succeed
+          db2.load(1)
+          db1.load(1)
+
+          // reload version 2, this should succeed
+          db2.load(2)
+          db1.load(2)
+        }
+      }
+    }
+  }
+
+  test("validate Rocks DB SST files do not have a VersionIdMismatch" +
+    " when metadata file is overwritten - scenario 2") {
+    withTempDir { dir =>
+      val dbConf = RocksDBConf(StateStoreConf(new SQLConf()))
+      val hadoopConf = new Configuration()
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 =>
+        withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 =>
+          // commit version 1 via db2
+          db2.load(0)
+          db2.put("a", "1")
+          db2.put("b", "1")
+
+          db2.commit()
+
+          // commit version 1 via db1
+          db1.load(0)
+          db1.put("a", "1")
+          db1.put("b", "1")
+
+          db1.commit()
+
+          // commit version 2 via db2
+          db2.load(1)
+          db2.put("a", "2")
+          db2.put("b", "2")
+
+          db2.commit()
+
+          // reload version 1, this should succeed
+          db2.load(1)
+          db1.load(1)
+
+          // reload version 2, this should succeed
+          db2.load(2)
+          db1.load(2)
+        }
+      }
+    }
+  }
+
   private def sqlConf = SQLConf.get.clone()

   private def dbConf = RocksDBConf(StateStoreConf(sqlConf))