Skip to content

Commit

Permalink
[ADAM-1480] Add switch to disable the fast concat method.
Browse files Browse the repository at this point in the history
Resolves #1478.
  • Loading branch information
fnothaft committed Apr 7, 2017
1 parent 93b32c6 commit 9f4c33b
Show file tree
Hide file tree
Showing 19 changed files with 154 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static CoverageRDD conduit(final CoverageRDD recordRdd,
// make temp directory and save file
Path tempDir = Files.createTempDirectory("javaAC");
String fileName = tempDir.toString() + "/testRdd.coverage.adam";
recordRdd.save(fileName, false);
recordRdd.save(fileName, false, false);

// create a new adam context and load the file
JavaADAMContext jac = new JavaADAMContext(ac);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static FeatureRDD conduit(final FeatureRDD recordRdd,
// make temp directory and save file
Path tempDir = Files.createTempDirectory("javaAC");
String fileName = tempDir.toString() + "/testRdd.feature.adam";
recordRdd.save(fileName, false);
recordRdd.save(fileName, false, false);

// create a new adam context and load the file
JavaADAMContext jac = new JavaADAMContext(ac);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class ADAM2VcfArgs extends Args4jBase with ParquetArgs {
@Args4jOption(required = false, name = "-single", usage = "Save as a single VCF file.")
var single: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat",
usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT")
var stringency: String = "STRICT"
}
Expand Down Expand Up @@ -95,6 +99,7 @@ class ADAM2Vcf(val args: ADAM2VcfArgs) extends BDGSparkCommand[ADAM2VcfArgs] wit

maybeSortedVcs.saveAsVcf(args.outputPath,
asSingleFile = args.single,
stringency)
stringency,
disableFastConcat = args.disableFastConcat)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class Fragments2ReadsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetAr
var sortReads: Boolean = false
@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output")
var deferMerging: Boolean = false
@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false
@Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.")
var sortLexicographically: Boolean = false
@Args4jOption(required = false, name = "-mark_duplicate_reads", usage = "Mark duplicate reads")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class Reads2CoverageArgs extends Args4jBase with ParquetArgs {
var onlyPositiveStrands: Boolean = false
@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false
}

class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {
Expand All @@ -81,6 +83,8 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom
}

finalReads.toCoverage(args.collapse)
.save(args.outputPath, asSingleFile = args.asSingleFile)
.save(args.outputPath,
asSingleFile = args.asSingleFile,
disableFastConcat = args.disableFastConcat)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Reads2FragmentsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetAr
var asSingleFile = false
var sortFastqOutput = false
var deferMerging = false
var disableFastConcat = false
}

class Reads2Fragments(protected val args: Reads2FragmentsArgs) extends BDGSparkCommand[Reads2FragmentsArgs] with Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output")
var deferMerging: Boolean = false
@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false
@Args4jOption(required = false, name = "-paired_fastq", usage = "When converting two (paired) FASTQ files to ADAM, pass the path to the second file here.")
var pairedFastqFile: String = null
@Args4jOption(required = false, name = "-record_group", usage = "Set converted FASTQs' record-group names to this value; if empty-string is passed, use the basename of the input file, minus the extension.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class TransformFeaturesArgs extends Args4jBase with ParquetSaveArgs {
usage = "Save as a single file, for the text formats.")
var single: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat",
usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-cache", usage = "Cache before building the sequence dictionary. Recommended for formats other than IntervalList and Parquet.")
var cache: Boolean = false

Expand All @@ -69,6 +73,6 @@ class TransformFeatures(val args: TransformFeaturesArgs)
optStorageLevel = optStorageLevel,
projection = None,
minPartitions = Option(args.numPartitions)
).save(args.outputPath, args.single)
).save(args.outputPath, args.single, args.disableFastConcat)
}
}
12 changes: 10 additions & 2 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/View.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,18 @@ class ViewArgs extends Args4jBase with ParquetArgs with ADAMSaveAnyArgs {
)
var outputPathArg: String = null

// required by ADAMAnySaveArgs
var sortFastqOutput: Boolean = false
@Args4jOption(required = false, name = "-single",
usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-defer_merging",
usage = "Defers merging single file output")
var deferMerging: Boolean = false
@Args4jOption(required = false, name = "-disable_fast_concat",
usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

// required by ADAMSaveAnyArgs
var sortFastqOutput: Boolean = false
}

object View extends BDGCommandCompanion {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ trait ADAMSaveAnyArgs extends SaveArgs {
* @see asSingleFile
*/
var deferMerging: Boolean

/**
* If asSingleFile is true and deferMerging is false, disables the use of the
* fast file concatenation engine.
*/
var disableFastConcat: Boolean
}

private[rdd] abstract class ADAMRDDFunctions[T <% IndexedRecord: Manifest] extends Serializable with Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ private[adam] object FileMerger extends Logging {
* @param optBufferSize The size in bytes of the buffer used for copying. If
* not set, we check the config for this value. If that is not set, we
* default to 4MB.
* @param disableFastConcat If true, disables the parallel file merger. By
* default, the fast file merger is invoked when running on HDFS. However,
* the fast file merger can fail if the underlying file system is encrypted,
* or any number of undocumented invariants are not met. In that case, we
* provide this switch to disable fast merging.
*
* @see mergeFilesAcrossFilesystems
*/
Expand All @@ -66,10 +71,11 @@ private[adam] object FileMerger extends Logging {
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
writeCramEOF: Boolean = false,
optBufferSize: Option[Int] = None) {
optBufferSize: Option[Int] = None,
disableFastConcat: Boolean = false) {

// if our file system is an hdfs mount, we can use the parallel merger
if (fs.getScheme == "hdfs") {
if (!disableFastConcat && (fs.getScheme == "hdfs")) {
ParallelFileMerger.mergeFiles(sc,
outputPath,
tailPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ private[rdd] class JavaSaveArgs(var outputPath: String,
var pageSize: Int = 1 * 1024 * 1024,
var compressionCodec: CompressionCodecName = CompressionCodecName.GZIP,
var disableDictionaryEncoding: Boolean = false,
var asSingleFile: Boolean = false) extends ADAMSaveAnyArgs {
var asSingleFile: Boolean = false,
var disableFastConcat: Boolean = false) extends ADAMSaveAnyArgs {
var sortFastqOutput = false
var deferMerging = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,18 @@ case class CoverageRDD(rdd: RDD[Coverage],
* val coverage = feature.getScore
*
* @param filePath The location to write the output.
* @param asSingleFile If false, writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* fast file concatenation engine.
*/
def save(filePath: java.lang.String, asSingleFile: java.lang.Boolean) = {
this.toFeatureRDD.save(filePath, asSingleFile = asSingleFile)
def save(filePath: java.lang.String,
asSingleFile: java.lang.Boolean,
disableFastConcat: java.lang.Boolean) = {
toFeatureRDD.save(filePath,
asSingleFile,
disableFastConcat)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,20 +262,34 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile If false, writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* fast file concatenation engine.
*/
def save(filePath: java.lang.String, asSingleFile: java.lang.Boolean) {
def save(filePath: java.lang.String,
asSingleFile: java.lang.Boolean,
disableFastConcat: java.lang.Boolean) {
if (filePath.endsWith(".bed")) {
saveAsBed(filePath, asSingleFile = asSingleFile)
saveAsBed(filePath,
asSingleFile = asSingleFile,
disableFastConcat = disableFastConcat)
} else if (filePath.endsWith(".gtf") ||
filePath.endsWith(".gff")) {
saveAsGtf(filePath, asSingleFile = asSingleFile)
saveAsGtf(filePath,
asSingleFile = asSingleFile,
disableFastConcat = disableFastConcat)
} else if (filePath.endsWith(".gff3")) {
saveAsGff3(filePath, asSingleFile = asSingleFile)
saveAsGff3(filePath,
asSingleFile = asSingleFile,
disableFastConcat = disableFastConcat)
} else if (filePath.endsWith(".narrowPeak") ||
filePath.endsWith(".narrowpeak")) {
saveAsNarrowPeak(filePath, asSingleFile = asSingleFile)
saveAsNarrowPeak(filePath,
asSingleFile = asSingleFile,
disableFastConcat = disableFastConcat)
} else if (filePath.endsWith(".interval_list")) {
saveAsIntervalList(filePath, asSingleFile = asSingleFile)
saveAsIntervalList(filePath,
asSingleFile = asSingleFile,
disableFastConcat = disableFastConcat)
} else {
if (asSingleFile) {
log.warn("asSingleFile = true ignored when saving as Parquet.")
Expand Down Expand Up @@ -317,10 +331,13 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param rdd RDD to save.
* @param outputPath Output path to save text files to.
* @param asSingleFile If true, combines all partition shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
private def writeTextRdd[T](rdd: RDD[T],
outputPath: String,
asSingleFile: Boolean) {
asSingleFile: Boolean,
disableFastConcat: Boolean) {
if (asSingleFile) {

// write rdd to disk
Expand All @@ -334,7 +351,8 @@ case class FeatureRDD(rdd: RDD[Feature],
FileMerger.mergeFiles(rdd.context,
fs,
new Path(outputPath),
new Path(tailPath))
new Path(tailPath),
disableFastConcat = disableFastConcat)
} else {
rdd.saveAsTextFile(outputPath)
}
Expand All @@ -347,9 +365,16 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsGtf(fileName: String, asSingleFile: Boolean = false) = {
writeTextRdd(rdd.map(FeatureRDD.toGtf), fileName, asSingleFile)
def saveAsGtf(fileName: String,
asSingleFile: Boolean = false,
disableFastConcat: Boolean = false) = {
writeTextRdd(rdd.map(FeatureRDD.toGtf),
fileName,
asSingleFile,
disableFastConcat)
}

/**
Expand All @@ -359,9 +384,16 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsGff3(fileName: String, asSingleFile: Boolean = false) = {
writeTextRdd(rdd.map(FeatureRDD.toGff3), fileName, asSingleFile)
def saveAsGff3(fileName: String,
asSingleFile: Boolean = false,
disableFastConcat: Boolean = false) = {
writeTextRdd(rdd.map(FeatureRDD.toGff3),
fileName,
asSingleFile,
disableFastConcat)
}

/**
Expand All @@ -371,9 +403,16 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsBed(fileName: String, asSingleFile: Boolean = false) = {
writeTextRdd(rdd.map(FeatureRDD.toBed), fileName, asSingleFile)
def saveAsBed(fileName: String,
asSingleFile: Boolean = false,
disableFastConcat: Boolean = false) = {
writeTextRdd(rdd.map(FeatureRDD.toBed),
fileName,
asSingleFile,
disableFastConcat)
}

/**
Expand All @@ -383,8 +422,12 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsIntervalList(fileName: String, asSingleFile: Boolean = false) = {
def saveAsIntervalList(fileName: String,
asSingleFile: Boolean = false,
disableFastConcat: Boolean = false) = {
val intervalEntities = rdd.map(FeatureRDD.toInterval)

if (asSingleFile) {
Expand All @@ -407,7 +450,8 @@ case class FeatureRDD(rdd: RDD[Feature],
fs,
new Path(fileName),
tailPath,
Some(headPath))
optHeaderPath = Some(headPath),
disableFastConcat = disableFastConcat)
} else {
intervalEntities.saveAsTextFile(fileName)
}
Expand All @@ -420,9 +464,16 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsNarrowPeak(fileName: String, asSingleFile: Boolean = false) {
writeTextRdd(rdd.map(FeatureRDD.toNarrowPeak), fileName, asSingleFile)
def saveAsNarrowPeak(fileName: String,
asSingleFile: Boolean = false,
disableFastConcat: Boolean = false) {
writeTextRdd(rdd.map(FeatureRDD.toNarrowPeak),
fileName,
asSingleFile,
disableFastConcat)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ case class AlignmentRecordRDD(
args.outputPath,
isSorted = isSorted,
asSingleFile = args.asSingleFile,
deferMerging = args.deferMerging
deferMerging = args.deferMerging,
disableFastConcat = args.disableFastConcat
)
true
} else {
Expand Down Expand Up @@ -382,13 +383,16 @@ case class AlignmentRecordRDD(
* @param isSorted If the output is sorted, this will modify the header.
* @param deferMerging If true and asSingleFile is true, we will save the
* output shards as a headerless file, but we will not merge the shards.
* @param disableFastConcat If asSingleFile is true and deferMerging is false,
* disables the use of the parallel file merging engine.
*/
def saveAsSam(
filePath: String,
asType: Option[SAMFormat] = None,
asSingleFile: Boolean = false,
isSorted: Boolean = false,
deferMerging: Boolean = false): Unit = SAMSave.time {
deferMerging: Boolean = false,
disableFastConcat: Boolean = false): Unit = SAMSave.time {

val fileType = asType.getOrElse(SAMFormat.inferFromFilePath(filePath))

Expand Down Expand Up @@ -531,7 +535,8 @@ case class AlignmentRecordRDD(
tailPath,
optHeaderPath = Some(headPath),
writeEmptyGzipBlock = (fileType == SAMFormat.BAM),
writeCramEOF = (fileType == SAMFormat.CRAM))
writeCramEOF = (fileType == SAMFormat.CRAM),
disableFastConcat = disableFastConcat)
}
}
}
Expand Down
Loading

0 comments on commit 9f4c33b

Please sign in to comment.