Skip to content

Commit

Permalink
[ADAM-1165] Support merging shards across multiple file systems.
Browse files Browse the repository at this point in the history
  • Loading branch information
fnothaft committed Sep 12, 2016
1 parent ccc4372 commit d8fbc5c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
Expand Up @@ -67,10 +67,11 @@ class MergeShards(val args: MergeShardsArgs) extends BDGSparkCommand[MergeShards
val optHeadPath = Option(args.headerPath).map(p => new Path(p))
val tailPath = new Path(args.inputPath)
val outputPath = new Path(args.outputPath)
val fs = tailPath.getFileSystem(conf)
val fsIn = tailPath.getFileSystem(conf)
val fsOut = outputPath.getFileSystem(conf)

// merge the files
FileMerger.mergeFiles(fs,
FileMerger.mergeFilesAcrossFilesystems(fsIn, fsOut,
outputPath, tailPath, optHeadPath,
writeEmptyGzipBlock = args.gzipAtEof,
bufferSize = args.bufferSize)
Expand Down
41 changes: 35 additions & 6 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/FileMerger.scala
Expand Up @@ -38,16 +38,45 @@ private[adam] object FileMerger extends Logging {
* been written.
* @param writeEmptyGzipBlock If true, we write an empty GZIP block at the
* end of the merged file.
*
* @see mergeFilesAcrossFilesystems
*/
def mergeFiles(fs: FileSystem,
               outputPath: Path,
               tailPath: Path,
               optHeaderPath: Option[Path] = None,
               writeEmptyGzipBlock: Boolean = false,
               bufferSize: Int = 4 * 1024 * 1024): Unit = {
  // Single-filesystem convenience entry point: the same file system is passed
  // as both the input side (shards/header) and the output side (merged file),
  // so all of the actual work lives in mergeFilesAcrossFilesystems.
  mergeFilesAcrossFilesystems(fs, fs,
    outputPath, tailPath, optHeaderPath = optHeaderPath,
    writeEmptyGzipBlock = writeEmptyGzipBlock,
    bufferSize = bufferSize)
}

/**
* Merges together sharded files, while preserving partition ordering.
*
* Can read files from a different filesystem than they are written to.
*
* @param fsIn The file system implementation to use for the tail/head paths.
* @param fsOut The file system implementation to use for the output path.
* @param outputPath The location to write the merged file at.
* @param tailPath The location where the sharded files have been written.
* @param optHeaderPath Optionally, the location where a header file has
* been written.
* @param writeEmptyGzipBlock If true, we write an empty GZIP block at the
* end of the merged file.
*/
def mergeFilesAcrossFilesystems(fsIn: FileSystem,
fsOut: FileSystem,
outputPath: Path,
tailPath: Path,
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
bufferSize: Int = 4 * 1024 * 1024) {

// get a list of all of the part files under the tail path
val tailFiles = fs.globStatus(new Path("%s/part-*".format(tailPath)))
val tailFiles = fsIn.globStatus(new Path("%s/part-*".format(tailPath)))
.toSeq
.map(_.getPath)
.sortBy(_.getName)
Expand All @@ -66,7 +95,7 @@ private[adam] object FileMerger extends Logging {
// but! it is correct.

// open our output file
val os = fs.create(outputPath)
val os = fsOut.create(outputPath)

// here is a byte array for copying
val ba = new Array[Byte](bufferSize)
Expand All @@ -91,7 +120,7 @@ private[adam] object FileMerger extends Logging {
log.info("Copying header file (%s)".format(p))

// open our input file
val is = fs.open(p)
val is = fsIn.open(p)

// until we are out of bytes, copy
copy(is, os)
Expand All @@ -112,7 +141,7 @@ private[adam] object FileMerger extends Logging {
numFiles))

// open our input file
val is = fs.open(p)
val is = fsIn.open(p)

// until we are out of bytes, copy
copy(is, os)
Expand All @@ -134,7 +163,7 @@ private[adam] object FileMerger extends Logging {
os.close()

// delete temp files
optHeaderPath.foreach(headPath => fs.delete(headPath, true))
fs.delete(tailPath, true)
optHeaderPath.foreach(headPath => fsIn.delete(headPath, true))
fsIn.delete(tailPath, true)
}
}

0 comments on commit d8fbc5c

Please sign in to comment.