Skip to content

Commit

Permalink
[ADAM-1160] Allow users to set merging buffer size through Hadoop configuration.
Browse files Browse the repository at this point in the history

Resolves bigdatagenomics#1160.
  • Loading branch information
fnothaft committed Sep 13, 2016
1 parent 3d03109 commit bb20b2b
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 29 deletions.
Expand Up @@ -34,8 +34,10 @@ class MergeShardsArgs extends Args4jBase {
var outputPath: String = null
@Args4jOption(required = false, name = "-headerPath", usage = "Optional path to a header")
var headerPath: String = null
@Args4jOption(required = false, name = "-bufferSize", usage = "Buffer size for merging single file output. Default is 4MB.")
var bufferSize: Int = 4 * 1024 * 1024
@Args4jOption(required = false,
name = "-bufferSize",
usage = "Buffer size for merging single file output. If provided, overrides configured buffer size.")
var bufferSize: Int = _
@Args4jOption(required = false, name = "-writeEmptyGZIPAtEof", usage = "If provided, writes an empty GZIP block at EOF")
var gzipAtEof: Boolean = false
}
Expand Down Expand Up @@ -71,9 +73,10 @@ class MergeShards(val args: MergeShardsArgs) extends BDGSparkCommand[MergeShards
val fsOut = outputPath.getFileSystem(conf)

// merge the files
FileMerger.mergeFilesAcrossFilesystems(fsIn, fsOut,
FileMerger.mergeFilesAcrossFilesystems(conf,
fsIn, fsOut,
outputPath, tailPath, optHeadPath,
writeEmptyGzipBlock = args.gzipAtEof,
bufferSize = args.bufferSize)
optBufferSize = Option(args.bufferSize).filter(_ > 0))
}
}
60 changes: 39 additions & 21 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/FileMerger.scala
Expand Up @@ -21,14 +21,20 @@ import htsjdk.samtools.util.BlockCompressedStreamConstants
import htsjdk.samtools.cram.build.CramIO
import htsjdk.samtools.cram.common.CramVersions
import java.io.{ InputStream, OutputStream }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.bdgenomics.utils.misc.Logging
import scala.annotation.tailrec

/**
* Helper object to merge sharded files together.
*/
private[adam] object FileMerger extends Logging {
object FileMerger extends Logging {

/**
* The config entry for the buffer size in bytes.
*/
val BUFFER_SIZE_CONF = "org.bdgenomics.adam.rdd.FileMerger.bufferSize"

/**
* Merges together sharded files, while preserving partition ordering.
Expand All @@ -41,22 +47,26 @@ private[adam] object FileMerger extends Logging {
* @param writeEmptyGzipBlock If true, we write an empty GZIP block at the
* end of the merged file.
* @param writeCramEOF If true, we write CRAM's EOF signifier.
* @param bufferSize The size in bytes of the buffer used for copying.
*
* @param optBufferSize The size in bytes of the buffer used for copying. If
* not set, we check the config for this value. If that is not set, we
* default to 4MB.
*
* @see mergeFilesAcrossFilesystems
*/
def mergeFiles(fs: FileSystem,
outputPath: Path,
tailPath: Path,
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
writeCramEOF: Boolean = false,
bufferSize: Int = 4 * 1024 * 1024) {
mergeFilesAcrossFilesystems(fs, fs,
private[adam] def mergeFiles(conf: Configuration,
fs: FileSystem,
outputPath: Path,
tailPath: Path,
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
writeCramEOF: Boolean = false,
optBufferSize: Option[Int] = None) {
mergeFilesAcrossFilesystems(conf,
fs, fs,
outputPath, tailPath, optHeaderPath = optHeaderPath,
writeEmptyGzipBlock = writeEmptyGzipBlock,
writeCramEOF = writeCramEOF,
bufferSize = bufferSize)
optBufferSize = optBufferSize)
}

/**
Expand All @@ -73,16 +83,24 @@ private[adam] object FileMerger extends Logging {
* @param writeEmptyGzipBlock If true, we write an empty GZIP block at the
* end of the merged file.
* @param writeCramEOF If true, we write CRAM's EOF signifier.
* @param bufferSize The size in bytes of the buffer used for copying.
* @param optBufferSize The size in bytes of the buffer used for copying. If
* not set, we check the config for this value. If that is not set, we
* default to 4MB.
*/
def mergeFilesAcrossFilesystems(fsIn: FileSystem,
fsOut: FileSystem,
outputPath: Path,
tailPath: Path,
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
writeCramEOF: Boolean = false,
bufferSize: Int = 4 * 1024 * 1024) {
private[adam] def mergeFilesAcrossFilesystems(conf: Configuration,
fsIn: FileSystem,
fsOut: FileSystem,
outputPath: Path,
tailPath: Path,
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
writeCramEOF: Boolean = false,
optBufferSize: Option[Int] = None) {

// Check for the buffer size in the option; if it is not set, check the
// Hadoop config; if that is not set either, fall back to 4 MB.
val bufferSize = optBufferSize.getOrElse(conf.getInt(BUFFER_SIZE_CONF,
4 * 1024 * 1024))

require(bufferSize > 0,
"Cannot have buffer size < 1. %d was provided.".format(bufferSize))
Expand Down
Expand Up @@ -294,7 +294,8 @@ case class FeatureRDD(rdd: RDD[Feature],
val fs = FileSystem.get(rdd.context.hadoopConfiguration)

// and then merge
FileMerger.mergeFiles(fs,
FileMerger.mergeFiles(rdd.context.hadoopConfiguration,
fs,
new Path(outputPath),
new Path(tailPath))
} else {
Expand Down Expand Up @@ -366,7 +367,11 @@ case class FeatureRDD(rdd: RDD[Feature],
intervalEntities.saveAsTextFile(tailPath.toString)

// merge
FileMerger.mergeFiles(fs, new Path(fileName), tailPath, Some(headPath))
FileMerger.mergeFiles(rdd.context.hadoopConfiguration,
fs,
new Path(fileName),
tailPath,
Some(headPath))
} else {
intervalEntities.saveAsTextFile(fileName)
}
Expand Down
Expand Up @@ -452,7 +452,8 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
)

if (!deferMerging) {
FileMerger.mergeFiles(fs,
FileMerger.mergeFiles(rdd.context.hadoopConfiguration,
fs,
outputPath,
tailPath,
optHeaderPath = Some(headPath),
Expand Down
Expand Up @@ -190,7 +190,8 @@ case class VariantContextRDD(rdd: RDD[VariantContext],
)

// merge shards
FileMerger.mergeFiles(fs,
FileMerger.mergeFiles(rdd.context.hadoopConfiguration,
fs,
new Path(filePath),
new Path(tailPath),
Some(headPath))
Expand Down

0 comments on commit bb20b2b

Please sign in to comment.