Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADAM-1480] Add switch to disable the fast concat method. #1479

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,7 +34,7 @@ public static CoverageRDD conduit(final CoverageRDD recordRdd,
// make temp directory and save file
Path tempDir = Files.createTempDirectory("javaAC");
String fileName = tempDir.toString() + "/testRdd.coverage.adam";
recordRdd.save(fileName, false);
recordRdd.save(fileName, false, false);

// create a new adam context and load the file
JavaADAMContext jac = new JavaADAMContext(ac);
Expand Down
Expand Up @@ -34,7 +34,7 @@ public static FeatureRDD conduit(final FeatureRDD recordRdd,
// make temp directory and save file
Path tempDir = Files.createTempDirectory("javaAC");
String fileName = tempDir.toString() + "/testRdd.feature.adam";
recordRdd.save(fileName, false);
recordRdd.save(fileName, false, false);

// create a new adam context and load the file
JavaADAMContext jac = new JavaADAMContext(ac);
Expand Down
Expand Up @@ -56,6 +56,10 @@ class ADAM2VcfArgs extends Args4jBase with ParquetArgs {
@Args4jOption(required = false, name = "-single", usage = "Save as a single VCF file.")
var single: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat",
usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of double-negatives; is there a better way to do args4j flags that default to true?


@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to STRICT")
var stringency: String = "STRICT"
}
Expand Down Expand Up @@ -95,6 +99,7 @@ class ADAM2Vcf(val args: ADAM2VcfArgs) extends BDGSparkCommand[ADAM2VcfArgs] wit

maybeSortedVcs.saveAsVcf(args.outputPath,
asSingleFile = args.single,
stringency)
stringency,
disableFastConcat = args.disableFastConcat)
}
}
Expand Up @@ -45,6 +45,8 @@ class Fragments2ReadsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetAr
var sortReads: Boolean = false
@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output")
var deferMerging: Boolean = false
@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false
@Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.")
var sortLexicographically: Boolean = false
@Args4jOption(required = false, name = "-mark_duplicate_reads", usage = "Mark duplicate reads")
Expand Down
Expand Up @@ -56,6 +56,8 @@ class Reads2CoverageArgs extends Args4jBase with ParquetArgs {
var onlyPositiveStrands: Boolean = false
@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false
}

class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {
Expand All @@ -81,6 +83,8 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom
}

finalReads.toCoverage(args.collapse)
.save(args.outputPath, asSingleFile = args.asSingleFile)
.save(args.outputPath,
asSingleFile = args.asSingleFile,
disableFastConcat = args.disableFastConcat)
}
}
Expand Up @@ -43,6 +43,7 @@ class Reads2FragmentsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetAr
var asSingleFile = false
var sortFastqOutput = false
var deferMerging = false
var disableFastConcat = false
}

class Reads2Fragments(protected val args: Reads2FragmentsArgs) extends BDGSparkCommand[Reads2FragmentsArgs] with Logging {
Expand Down
Expand Up @@ -103,6 +103,8 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output")
var deferMerging: Boolean = false
@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false
@Args4jOption(required = false, name = "-paired_fastq", usage = "When converting two (paired) FASTQ files to ADAM, pass the path to the second file here.")
var pairedFastqFile: String = null
@Args4jOption(required = false, name = "-record_group", usage = "Set converted FASTQs' record-group names to this value; if empty-string is passed, use the basename of the input file, minus the extension.")
Expand Down
Expand Up @@ -49,6 +49,10 @@ class TransformFeaturesArgs extends Args4jBase with ParquetSaveArgs {
usage = "Save as a single file, for the text formats.")
var single: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat",
usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-cache", usage = "Cache before building the sequence dictionary. Recommended for formats other than IntervalList and Parquet.")
var cache: Boolean = false

Expand All @@ -69,6 +73,6 @@ class TransformFeatures(val args: TransformFeaturesArgs)
optStorageLevel = optStorageLevel,
optMinPartitions = Option(args.numPartitions),
optProjection = None
).save(args.outputPath, args.single)
).save(args.outputPath, args.single, args.disableFastConcat)
}
}
12 changes: 10 additions & 2 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/View.scala
Expand Up @@ -79,10 +79,18 @@ class ViewArgs extends Args4jBase with ParquetArgs with ADAMSaveAnyArgs {
)
var outputPathArg: String = null

// required by ADAMAnySaveArgs
var sortFastqOutput: Boolean = false
@Args4jOption(required = false, name = "-single",
usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-defer_merging",
usage = "Defers merging single file output")
var deferMerging: Boolean = false
@Args4jOption(required = false, name = "-disable_fast_concat",
usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

// required by ADAMSaveAnyArgs
var sortFastqOutput: Boolean = false
}

object View extends BDGCommandCompanion {
Expand Down
Expand Up @@ -64,6 +64,12 @@ trait ADAMSaveAnyArgs extends SaveArgs {
* @see asSingleFile
*/
var deferMerging: Boolean

/**
* If asSingleFile is true and deferMerging is false, disables the use of the
* fast file concatenation engine.
*/
var disableFastConcat: Boolean
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per this, #1295, and #1438, perhaps we should brainstorm on how to clean up the save methods and ensure consistency. One or more enums, better SaveArgs classes, a context map, etc.

}

private[rdd] abstract class ADAMRDDFunctions[T <% IndexedRecord: Manifest] extends Serializable with Logging {
Expand Down
Expand Up @@ -56,6 +56,11 @@ private[adam] object FileMerger extends Logging {
* @param optBufferSize The size in bytes of the buffer used for copying. If
* not set, we check the config for this value. If that is not set, we
* default to 4MB.
* @param disableFastConcat If true, disables the parallel file merger. By
* default, the fast file merger is invoked when running on HDFS. However,
* the fast file merger can fail if the underlying file system is encrypted,
* or any number of undocumented invariants are not met. In that case, we
* provide this switch to disable fast merging.
*
* @see mergeFilesAcrossFilesystems
*/
Expand All @@ -66,10 +71,11 @@ private[adam] object FileMerger extends Logging {
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
writeCramEOF: Boolean = false,
optBufferSize: Option[Int] = None) {
optBufferSize: Option[Int] = None,
disableFastConcat: Boolean = false) {

// if our file system is an hdfs mount, we can use the parallel merger
if (fs.getScheme == "hdfs") {
if (!disableFastConcat && (fs.getScheme == "hdfs")) {
ParallelFileMerger.mergeFiles(sc,
outputPath,
tailPath,
Expand Down
Expand Up @@ -43,7 +43,8 @@ private[rdd] class JavaSaveArgs(var outputPath: String,
var pageSize: Int = 1 * 1024 * 1024,
var compressionCodec: CompressionCodecName = CompressionCodecName.GZIP,
var disableDictionaryEncoding: Boolean = false,
var asSingleFile: Boolean = false) extends ADAMSaveAnyArgs {
var asSingleFile: Boolean = false,
var disableFastConcat: Boolean = false) extends ADAMSaveAnyArgs {
var sortFastqOutput = false
var deferMerging = false
}
Expand Down
Expand Up @@ -88,9 +88,18 @@ case class CoverageRDD(rdd: RDD[Coverage],
* val coverage = feature.getScore
*
* @param filePath The location to write the output.
* @param asSingleFile If false, writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* fast file concatenation engine.
*/
def save(filePath: java.lang.String, asSingleFile: java.lang.Boolean) = {
this.toFeatureRDD.save(filePath, asSingleFile = asSingleFile)
def save(filePath: java.lang.String,
         asSingleFile: java.lang.Boolean,
         disableFastConcat: java.lang.Boolean) = {
  // Coverage is written by converting to a FeatureRDD and delegating to its
  // extension-dispatching save; all flags are forwarded unchanged.
  this.toFeatureRDD.save(filePath, asSingleFile, disableFastConcat)
}

/**
Expand Down
Expand Up @@ -262,20 +262,34 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile If false, writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* fast file concatenation engine.
*/
def save(filePath: java.lang.String, asSingleFile: java.lang.Boolean) {
def save(filePath: java.lang.String,
asSingleFile: java.lang.Boolean,
disableFastConcat: java.lang.Boolean) {
if (filePath.endsWith(".bed")) {
saveAsBed(filePath, asSingleFile = asSingleFile)
saveAsBed(filePath,
asSingleFile = asSingleFile,
disableFastConcat = disableFastConcat)
} else if (filePath.endsWith(".gtf") ||
filePath.endsWith(".gff")) {
saveAsGtf(filePath, asSingleFile = asSingleFile)
saveAsGtf(filePath,
asSingleFile = asSingleFile,
disableFastConcat = disableFastConcat)
} else if (filePath.endsWith(".gff3")) {
saveAsGff3(filePath, asSingleFile = asSingleFile)
saveAsGff3(filePath,
asSingleFile = asSingleFile,
disableFastConcat = disableFastConcat)
} else if (filePath.endsWith(".narrowPeak") ||
filePath.endsWith(".narrowpeak")) {
saveAsNarrowPeak(filePath, asSingleFile = asSingleFile)
saveAsNarrowPeak(filePath,
asSingleFile = asSingleFile,
disableFastConcat = disableFastConcat)
} else if (filePath.endsWith(".interval_list")) {
saveAsIntervalList(filePath, asSingleFile = asSingleFile)
saveAsIntervalList(filePath,
asSingleFile = asSingleFile,
disableFastConcat = disableFastConcat)
} else {
if (asSingleFile) {
log.warn("asSingleFile = true ignored when saving as Parquet.")
Expand Down Expand Up @@ -317,10 +331,13 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param rdd RDD to save.
* @param outputPath Output path to save text files to.
* @param asSingleFile If true, combines all partition shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
private def writeTextRdd[T](rdd: RDD[T],
outputPath: String,
asSingleFile: Boolean) {
asSingleFile: Boolean,
disableFastConcat: Boolean) {
if (asSingleFile) {

// write rdd to disk
Expand All @@ -334,7 +351,8 @@ case class FeatureRDD(rdd: RDD[Feature],
FileMerger.mergeFiles(rdd.context,
fs,
new Path(outputPath),
new Path(tailPath))
new Path(tailPath),
disableFastConcat = disableFastConcat)
} else {
rdd.saveAsTextFile(outputPath)
}
Expand All @@ -347,9 +365,16 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsGtf(fileName: String, asSingleFile: Boolean = false) = {
writeTextRdd(rdd.map(FeatureRDD.toGtf), fileName, asSingleFile)
def saveAsGtf(fileName: String,
              asSingleFile: Boolean = false,
              disableFastConcat: Boolean = false) = {
  // Render each feature as a GTF record, then hand the lines to the
  // shared text writer (which handles optional single-file merging).
  val gtfRecords = rdd.map(FeatureRDD.toGtf)
  writeTextRdd(gtfRecords, fileName, asSingleFile, disableFastConcat)
}

/**
Expand All @@ -359,9 +384,16 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsGff3(fileName: String, asSingleFile: Boolean = false) = {
writeTextRdd(rdd.map(FeatureRDD.toGff3), fileName, asSingleFile)
def saveAsGff3(fileName: String,
               asSingleFile: Boolean = false,
               disableFastConcat: Boolean = false) = {
  // Render each feature as a GFF3 record, then hand the lines to the
  // shared text writer (which handles optional single-file merging).
  val gff3Records = rdd.map(FeatureRDD.toGff3)
  writeTextRdd(gff3Records, fileName, asSingleFile, disableFastConcat)
}

/**
Expand All @@ -371,9 +403,16 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsBed(fileName: String, asSingleFile: Boolean = false) = {
writeTextRdd(rdd.map(FeatureRDD.toBed), fileName, asSingleFile)
def saveAsBed(fileName: String,
              asSingleFile: Boolean = false,
              disableFastConcat: Boolean = false) = {
  // Render each feature as a BED record, then hand the lines to the
  // shared text writer (which handles optional single-file merging).
  val bedRecords = rdd.map(FeatureRDD.toBed)
  writeTextRdd(bedRecords, fileName, asSingleFile, disableFastConcat)
}

/**
Expand All @@ -383,8 +422,12 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsIntervalList(fileName: String, asSingleFile: Boolean = false) = {
def saveAsIntervalList(fileName: String,
asSingleFile: Boolean = false,
disableFastConcat: Boolean = false) = {
val intervalEntities = rdd.map(FeatureRDD.toInterval)

if (asSingleFile) {
Expand All @@ -407,7 +450,8 @@ case class FeatureRDD(rdd: RDD[Feature],
fs,
new Path(fileName),
tailPath,
Some(headPath))
optHeaderPath = Some(headPath),
disableFastConcat = disableFastConcat)
} else {
intervalEntities.saveAsTextFile(fileName)
}
Expand All @@ -420,9 +464,16 @@ case class FeatureRDD(rdd: RDD[Feature],
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsNarrowPeak(fileName: String, asSingleFile: Boolean = false) {
writeTextRdd(rdd.map(FeatureRDD.toNarrowPeak), fileName, asSingleFile)
def saveAsNarrowPeak(fileName: String,
                     asSingleFile: Boolean = false,
                     disableFastConcat: Boolean = false): Unit = {
  // Render each feature as a NarrowPeak record, then hand the lines to the
  // shared text writer (which handles optional single-file merging).
  val narrowPeakRecords = rdd.map(FeatureRDD.toNarrowPeak)
  writeTextRdd(narrowPeakRecords, fileName, asSingleFile, disableFastConcat)
}

/**
Expand Down
Expand Up @@ -234,7 +234,8 @@ case class AlignmentRecordRDD(
args.outputPath,
isSorted = isSorted,
asSingleFile = args.asSingleFile,
deferMerging = args.deferMerging
deferMerging = args.deferMerging,
disableFastConcat = args.disableFastConcat
)
true
} else {
Expand Down Expand Up @@ -382,13 +383,16 @@ case class AlignmentRecordRDD(
* @param isSorted If the output is sorted, this will modify the header.
* @param deferMerging If true and asSingleFile is true, we will save the
* output shards as a headerless file, but we will not merge the shards.
* @param disableFastConcat If asSingleFile is true and deferMerging is false,
* disables the use of the parallel file merging engine.
*/
def saveAsSam(
filePath: String,
asType: Option[SAMFormat] = None,
asSingleFile: Boolean = false,
isSorted: Boolean = false,
deferMerging: Boolean = false): Unit = SAMSave.time {
deferMerging: Boolean = false,
disableFastConcat: Boolean = false): Unit = SAMSave.time {

val fileType = asType.getOrElse(SAMFormat.inferFromFilePath(filePath))

Expand Down Expand Up @@ -531,7 +535,8 @@ case class AlignmentRecordRDD(
tailPath,
optHeaderPath = Some(headPath),
writeEmptyGzipBlock = (fileType == SAMFormat.BAM),
writeCramEOF = (fileType == SAMFormat.CRAM))
writeCramEOF = (fileType == SAMFormat.CRAM),
disableFastConcat = disableFastConcat)
}
}
}
Expand Down