[SPARK-35436][SS] RocksDBFileManager - save checkpoint to DFS
### What changes were proposed in this pull request?
The implementation for the save operation of RocksDBFileManager.

### Why are the changes needed?
Save all the files in the given local checkpoint directory as a committed version in DFS.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UT added.

Closes #32582 from xuanyuanking/SPARK-35436.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
xuanyuanking authored and HeartSaVioR committed Jun 9, 2021
1 parent 8013f98 commit 9f010a8
Showing 3 changed files with 366 additions and 1 deletion.
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1129,6 +1129,22 @@ private[spark] object Utils extends Logging {
s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms"
}

/**
* Lists files recursively.
*/
def recursiveList(f: File): Array[File] = {
require(f.isDirectory)
val result = f.listFiles.toBuffer
val dirList = result.filter(_.isDirectory)
while (dirList.nonEmpty) {
val curDir = dirList.remove(0)
val files = curDir.listFiles()
result ++= files
dirList ++= files.filter(_.isDirectory)
}
result.toArray
}
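
For context, a minimal sketch of how this new helper can be used; the directory path is hypothetical, and note that `Utils` is `private[spark]`, so such code only compiles inside the `org.apache.spark` package:

```scala
import java.io.File

import org.apache.spark.util.Utils

// Hypothetical local directory; recursiveList requires that it is a directory.
val dir = new File("/tmp/local-checkpoint")
if (dir.isDirectory) {
  // Returns the directory's entire contents, including files in nested subdirectories.
  Utils.recursiveList(dir).foreach { f =>
    println(s"${f.getAbsolutePath} - ${f.length()} bytes")
  }
}
```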

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -17,18 +17,266 @@

package org.apache.spark.sql.execution.streaming.state

import java.io.{File, FileInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.Seq

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.commons.io.{FilenameUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.util.Utils

/**
* Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
* For each version, the checkpoint is saved in a specific directory structure that allows successive
* versions to reuse SST data files and archived log files. This allows each commit to be
* incremental: only new SST files and archived log files generated by RocksDB will be uploaded.
* The directory structures on local disk and in DFS are as follows.
*
* Local checkpoint dir structure
* ------------------------------
* RocksDB generates a bunch of files in the local checkpoint directory. The most important among
* them are the SST files; they are the actual log-structured data files. The rest of the files contain
* the metadata necessary for RocksDB to read the SST files and start from the checkpoint.
* Note that the SST files are hard links to files in RocksDB's working directory, and therefore
* successive checkpoints can share some of the SST files. So these SST files have to be copied to
* a shared directory in DFS so that different committed versions can reuse them.
*
* We consider both SST files and archived log files as immutable files which can be shared between
* different checkpoints.
*
*   localCheckpointDir
*      |
*      +-- OPTIONS-000005
*      +-- MANIFEST-000008
*      +-- CURRENT
*      +-- 00007.sst
*      +-- 00011.sst
*      +-- archive
*      |     +-- 00008.log
*      |     +-- 00013.log
*      ...
*
*
* DFS directory structure after saving to DFS as version 10
* -----------------------------------------------------------
* The SST and archived log files are given unique file names and copied to shared subdirectories.
* Every version maintains a mapping from each local immutable file name to its unique file name in DFS.
* This mapping is saved in a JSON file (named `metadata`), which is zipped along with other
* checkpoint files into a single file `[version].zip`.
*
*   dfsRootDir
*      |
*      +-- SSTs
*      |     +-- 00007-[uuid1].sst
*      |     +-- 00011-[uuid2].sst
*      +-- logs
*      |     +-- 00008-[uuid3].log
*      |     +-- 00013-[uuid4].log
*      +-- 10.zip
*      |     +-- metadata      <--- contains mapping between 00007.sst and [uuid1].sst,
*      |                            and the mapping between 00008.log and [uuid3].log
*      |     +-- OPTIONS-000005
*      |     +-- MANIFEST-000008
*      |     +-- CURRENT
*      |     ...
*      |
*      +-- 9.zip
*      +-- 8.zip
*      ...
*
* Note the following.
* - Each [version].zip is a complete description of all the data and metadata needed to recover
* a RocksDB instance at the corresponding version. The SST files and log files are not included
* in the zip files, as they can be shared across different versions. This is unlike the
* [version].delta files of HDFSBackedStateStore where previous delta files need to be read
* to be recovered.
* - This is safe with respect to speculatively executed tasks running concurrently in different executors
* as each task would upload a different copy of the generated immutable files and
* atomically update the [version].zip.
* - Immutable files are identified uniquely based on their file name and file size.
* - Immutable files can be reused only across adjacent checkpoints/versions.
* - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a
* different thread than the task thread saving files.
*
* @param dfsRootDir Directory where the [version].zip files will be stored
* @param localTempDir Local directory for temporary work
* @param hadoopConf Hadoop configuration for talking to DFS
* @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
*/
class RocksDBFileManager(
dfsRootDir: String,
localTempDir: File,
hadoopConf: Configuration,
loggingId: String = "")
extends Logging {

import RocksDBImmutableFile._

private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)

/** Save all the files in the given local checkpoint directory as a committed version in DFS */
def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
val metadataFile = localMetadataFile(checkpointDir)
metadata.writeToFile(metadataFile)
logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")

if (version <= 1 && numKeys == 0) {
// If we're writing the initial version and there's no data, we have to explicitly initialize
// the root directory. Normally saveImmutableFilesToDfs will do this initialization, but
// when there's no data that method won't write any files, and zipToDfsFile uses the
// CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories.
val path = new Path(dfsRootDir)
if (!fm.exists(path)) fm.mkdirs(path)
}
zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version))
logInfo(s"Saved checkpoint file for version $version")
}
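
As a hedged illustration of what the method above writes, the sketch below builds the same kind of metadata object from two invented immutable files and prints its JSON form (file names, sizes, and key count are made up):

```scala
// Hypothetical immutable files: (local file name, unique DFS file name, size in bytes).
val files = Seq(
  RocksDBImmutableFile("00007.sst", "00007-[uuid1].sst", 1024L),
  RocksDBImmutableFile("00008.log", "00008-[uuid3].log", 512L))

// Same constructor call as in saveCheckpointToDfs above; 100 keys for illustration.
val metadata = RocksDBCheckpointMetadata(files, 100L)

// Prints the mapping from local file names to unique DFS file names plus the key count,
// i.e. the content that gets zipped into [version].zip as the `metadata` entry.
println(metadata.prettyJson)
```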

/** Save immutable files to DFS directory */
private def saveImmutableFilesToDfs(
version: Long,
localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
// Get the immutable files used in previous versions, as some of those uploaded files can be
// reused for this version
logInfo(s"Saving RocksDB files to DFS for $version")
val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f =>
f.localFileName -> f
}.toMap

var bytesCopied = 0L
var filesCopied = 0L
var filesReused = 0L

val immutableFiles = localFiles.map { localFile =>
prevFilesToSizes
.get(localFile.getName)
.filter(_.isSameFile(localFile))
.map { reusable =>
filesReused += 1
reusable
}.getOrElse {
val localFileName = localFile.getName
val dfsFileName = newDFSFileName(localFileName)
val dfsFile = dfsFilePath(dfsFileName)
// Note: The implementation of copyFromLocalFile() closes the output stream when there is
// any exception while copying. So this may generate partial files on DFS. But that is
// okay because until the main [version].zip file is written, those partial files are
// not going to be used at all. Eventually these files should get cleared.
fs.copyFromLocalFile(
new Path(localFile.getAbsoluteFile.toURI), dfsFile)
val localFileSize = localFile.length()
logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
filesCopied += 1
bytesCopied += localFileSize

RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
}
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
s" DFS for version $version. $filesReused files reused without copying.")
versionToRocksDBFiles.put(version, immutableFiles)

immutableFiles
}

/**
* Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
* If any error occurs while writing, the output is cancelled so that no partial zip file is left in DFS.
*/
private def zipToDfsFile(files: Seq[File], dfsZipFile: Path): Unit = {
lazy val filesStr = s"$dfsZipFile\n\t${files.mkString("\n\t")}"
var in: InputStream = null
val out = fm.createAtomic(dfsZipFile, overwriteIfPossible = true)
var totalBytes = 0L
val zout = new ZipOutputStream(out)
try {
files.foreach { file =>
zout.putNextEntry(new ZipEntry(file.getName))
in = new FileInputStream(file)
val bytes = IOUtils.copy(in, zout)
in.close()
zout.closeEntry()
totalBytes += bytes
}
zout.close() // so that any error in closing also cancels the output stream
logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
} catch {
case e: Exception =>
// Cancel the actual output stream first, so that zout.close() does not write the file
out.cancel()
logError(s"Error zipping to $filesStr", e)
throw e
} finally {
// Close everything no matter what happened
IOUtils.closeQuietly(in)
IOUtils.closeQuietly(zout)
}
}

/** Log the files present in a directory. This is useful for debugging. */
private def logFilesInDir(dir: File, msg: String): Unit = {
lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f =>
s"${f.getAbsolutePath} - ${f.length()} bytes"
}
logInfo(s"$msg - ${files.length} files\n\t${files.mkString("\n\t")}")
}

private def newDFSFileName(localFileName: String): String = {
val baseName = FilenameUtils.getBaseName(localFileName)
val extension = FilenameUtils.getExtension(localFileName)
s"$baseName-${UUID.randomUUID}.$extension"
}

private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip")

private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")

private def dfsFilePath(fileName: String): Path = {
if (isSstFile(fileName)) {
new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName)
} else if (isLogFile(fileName)) {
new Path(new Path(dfsRootDir, LOG_FILES_DFS_SUBDIR), fileName)
} else {
new Path(dfsRootDir, fileName)
}
}

/**
* List all the RocksDB files that need to be synced or recovered.
*/
private def listRocksDBFiles(localDir: File): (Seq[File], Seq[File]) = {
val topLevelFiles = localDir.listFiles.filter(!_.isDirectory)
val archivedLogFiles =
Option(new File(localDir, LOG_FILES_LOCAL_SUBDIR).listFiles())
.getOrElse(Array[File]())
// To ignore .log.crc files
.filter(file => isLogFile(file.getName))
val (topLevelSstFiles, topLevelOtherFiles) = topLevelFiles.partition(f => isSstFile(f.getName))
(topLevelSstFiles ++ archivedLogFiles, topLevelOtherFiles)
}
}
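
For illustration, a minimal usage sketch of the file manager defined above; the DFS root, local directories, logging id, version, and key count are all hypothetical:

```scala
import java.io.File

import org.apache.hadoop.conf.Configuration

// Hypothetical locations.
val dfsRootDir = "hdfs://namenode:8020/checkpoints/query1/state/0/0"
val localTempDir = new File("/tmp/rocksdb-file-manager-temp")

val fileManager = new RocksDBFileManager(
  dfsRootDir, localTempDir, new Configuration(), loggingId = "query1-partition0")

// Local directory into which RocksDB wrote its checkpoint for version 10 (hypothetical).
val localCheckpointDir = new File("/tmp/rocksdb-local-checkpoint-10")

// Uploads any new SST/archived log files to the shared SSTs/logs subdirectories in DFS
// and atomically writes 10.zip containing the metadata and the remaining checkpoint files.
fileManager.saveCheckpointToDfs(localCheckpointDir, version = 10L, numKeys = 1000L)
```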

/**
* Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
* changes to this MUST be backward-compatible.