transformFragments sort_reads --> sort_by*
heuermh committed Jun 28, 2019
1 parent bc1bde4 commit ab03d15
Showing 3 changed files with 99 additions and 31 deletions.
@@ -53,100 +53,148 @@ object TransformAlignments extends BDGCommandCompanion {
}

class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {

@Argument(required = true, metaVar = "INPUT", usage = "The ADAM, BAM or SAM file to apply the transforms to", index = 0)
var inputPath: String = null

@Argument(required = true, metaVar = "OUTPUT", usage = "Location to write the transformed data in ADAM/Parquet format", index = 1)
var outputPath: String = null

@Args4jOption(required = false, name = "-limit_projection", usage = "Only project necessary fields. Only works for Parquet files.")
var limitProjection: Boolean = false

@Args4jOption(required = false, name = "-aligned_read_predicate", usage = "Only load aligned reads. Only works for Parquet files. Exclusive of region predicate.")
var useAlignedReadPredicate: Boolean = false

@Args4jOption(required = false, name = "-region_predicate", usage = "Only load a specific range of regions. Mutually exclusive with aligned read predicate.")
var regionPredicate: String = null

@Args4jOption(required = false, name = "-sort_by_read_name", usage = "Sort alignments by read name.")
var sortByReadName: Boolean = false

@Args4jOption(required = false, name = "-sort_by_reference_position", usage = "Sort alignments by reference position, with references ordered by name.")
var sortByReferencePosition: Boolean = false

@Args4jOption(required = false, name = "-sort_by_reference_position_and_index", usage = "Sort alignments by reference position, with references ordered by index.")
var sortByReferencePositionAndIndex: Boolean = false

@Args4jOption(required = false, name = "-mark_duplicate_reads", usage = "Mark duplicate reads")
var markDuplicates: Boolean = false

@Args4jOption(required = false, name = "-recalibrate_base_qualities", usage = "Recalibrate the base quality scores (ILLUMINA only)")
var recalibrateBaseQualities: Boolean = false

@Args4jOption(required = false, name = "-min_acceptable_quality", usage = "Minimum acceptable quality for recalibrating a base in a read. Default is 5.")
var minAcceptableQuality: Int = 5

@Args4jOption(required = false, name = "-sampling_fraction", usage = "Fraction of reads to sample during recalibration; omitted by default.")
var samplingFraction: Double = 0.0

@Args4jOption(required = false, name = "-sampling_seed", usage = "Seed to use when sampling during recalibration; omitted by default by setting to 0 (thus, 0 is an invalid seed).")
var samplingSeed: Long = 0L

@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to LENIENT")
var stringency: String = "LENIENT"

@Args4jOption(required = false, name = "-known_snps", usage = "Sites-only VCF giving location of known SNPs")
var knownSnpsFile: String = null

@Args4jOption(required = false, name = "-realign_indels", usage = "Locally realign indels present in reads.")
var locallyRealign: Boolean = false

@Args4jOption(required = false, name = "-known_indels", usage = "VCF file including locations of known INDELs. If none is provided, default consensus model will be used.")
var knownIndelsFile: String = null

@Args4jOption(required = false, name = "-max_indel_size", usage = "The maximum length of an INDEL to realign to. Default value is 500.")
var maxIndelSize = 500

@Args4jOption(required = false, name = "-max_consensus_number", usage = "The maximum number of consensus to try realigning a target region to. Default value is 30.")
var maxConsensusNumber = 30

@Args4jOption(required = false, name = "-log_odds_threshold", usage = "The log-odds threshold for accepting a realignment. Default value is 5.0.")
var lodThreshold = 5.0

@Args4jOption(required = false, name = "-unclip_reads", usage = "If true, unclips reads during realignment.")
var unclipReads = false

@Args4jOption(required = false, name = "-max_target_size", usage = "The maximum length of a target region to attempt realigning. Default length is 3000.")
var maxTargetSize = 3000

@Args4jOption(required = false, name = "-max_reads_per_target", usage = "The maximum number of reads attached to a target considered for realignment. Default is 20000.")
var maxReadsPerTarget = 20000

@Args4jOption(required = false, name = "-reference", usage = "Path to a reference file to use for indel realignment.")
var reference: String = null

@Args4jOption(required = false, name = "-repartition", usage = "Set the number of partitions to map data to")
var repartition: Int = -1

@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1

@Args4jOption(required = false, name = "-force_shuffle_coalesce", usage = "Even if the repartitioned RDD has fewer partitions, force a shuffle.")
var forceShuffle: Boolean = false

@Args4jOption(required = false, name = "-sort_fastq_output", usage = "Sets whether to sort the FASTQ output, if saving as FASTQ. False by default. Ignored if not saving as FASTQ.")
var sortFastqOutput: Boolean = false

@Args4jOption(required = false, name = "-force_load_bam", usage = "Forces TransformAlignments to load from BAM/SAM.")
var forceLoadBam: Boolean = false

@Args4jOption(required = false, name = "-force_load_fastq", usage = "Forces TransformAlignments to load from unpaired FASTQ.")
var forceLoadFastq: Boolean = false

@Args4jOption(required = false, name = "-force_load_ifastq", usage = "Forces TransformAlignments to load from interleaved FASTQ.")
var forceLoadIFastq: Boolean = false

@Args4jOption(required = false, name = "-force_load_parquet", usage = "Forces TransformAlignments to load from Parquet.")
var forceLoadParquet: Boolean = false

@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false

@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output")
var deferMerging: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-paired_fastq", usage = "When converting two (paired) FASTQ files to ADAM, pass the path to the second file here.")
var pairedFastqFile: String = null

@Args4jOption(required = false, name = "-read_group", usage = "Set converted FASTQs' read-group identifiers to this value; if empty-string is passed, use the basename of the input file, minus the extension.")
var fastqReadGroup: String = null

@Args4jOption(required = false, name = "-concat", usage = "Concatenate this file with <INPUT> and write the result to <OUTPUT>")
var concatFilename: String = null

@Args4jOption(required = false, name = "-add_md_tags", usage = "Add MD Tags to reads based on the FASTA (or equivalent) file passed to this option.")
var mdTagsReferenceFile: String = null

@Args4jOption(required = false, name = "-md_tag_fragment_size", usage = "When adding MD tags to reads, load the reference in fragments of this size.")
var mdTagsFragmentSize: Long = 1000000L

@Args4jOption(required = false, name = "-md_tag_overwrite", usage = "When adding MD tags to reads, overwrite existing incorrect tags.")
var mdTagsOverwrite: Boolean = false

@Args4jOption(required = false, name = "-bin_quality_scores", usage = "Rewrites quality scores of reads into bins from a string of bin descriptions, e.g. 0,20,10;20,40,30.")
var binQualityScores: String = null

@Args4jOption(required = false, name = "-cache", usage = "Cache data to avoid recomputing between stages.")
var cache: Boolean = false

@Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching.")
var storageLevel: String = "MEMORY_ONLY"

@Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
var disableProcessingStep = false

@Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
var partitionByStartPos: Boolean = false

@Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000) base pairs).")
var partitionedBinSize = 1000000

@Args4jOption(required = false, name = "-max_read_length", usage = "Maximum FASTQ read length, defaults to 10,000 base pairs (bp).")
var maxReadLength: Int = 0

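A note on the -bin_quality_scores format above: the option takes semicolon-separated low,high,score triples, so "0,20,10;20,40,30" rewrites any quality score falling in the first range to 10 and any in the second to 30. The Scala sketch below only illustrates that contract; it is not ADAM's own binning implementation, and treating each bin as a half-open [low, high) interval is an assumption here.

// Illustrative sketch, not ADAM's binning code: parse a description
// such as "0,20,10;20,40,30" into (low, high, score) triples and
// rebin one Phred quality score. Half-open [low, high) intervals are
// an assumption, not something this commit confirms.
def parseBins(description: String): Seq[(Int, Int, Int)] =
  description.split(";").toSeq.map { bin =>
    val Array(low, high, score) = bin.split(",").map(_.trim.toInt)
    require(low < high, s"bin is empty: $bin")
    (low, high, score)
  }

def rebin(quality: Int, bins: Seq[(Int, Int, Int)]): Int =
  bins.collectFirst {
    case (low, high, score) if low <= quality && quality < high => score
  }.getOrElse(quality) // scores outside every bin pass through unchanged

// Example: rebin(15, parseBins("0,20,10;20,40,30")) returns 10.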
@@ -40,26 +40,40 @@ object TransformFragments extends BDGCommandCompanion {
class TransformFragmentsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
@Argument(required = true, metaVar = "INPUT", usage = "The Fragment file to apply the transforms to", index = 0)
var inputPath: String = null

@Argument(required = true, metaVar = "OUTPUT", usage = "Location to write the transformed fragments", index = 1)
var outputPath: String = null
@Args4jOption(required = false, name = "-load_as_reads", usage = "Treats the input data as reads")
var loadAsReads: Boolean = false
@Args4jOption(required = false, name = "-save_as_reads", usage = "Saves the output data as reads")
var saveAsReads: Boolean = false

@Args4jOption(required = false, name = "-load_as_alignments", usage = "Treats the input data as alignments")
var loadAsAlignments: Boolean = false

@Args4jOption(required = false, name = "-save_as_alignments", usage = "Saves the output data as alignments")
var saveAsAlignments: Boolean = false

@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false
@Args4jOption(required = false, name = "-sort_reads", usage = "Sort the reads by referenceId and read position. Only valid if run with -save_as_reads")
var sortReads: Boolean = false

@Args4jOption(required = false, name = "-sort_by_read_name", usage = "Sort alignments by read name. Only valid with -save_as_alignments.")
var sortByReadName: Boolean = false

@Args4jOption(required = false, name = "-sort_by_reference_position", usage = "Sort alignments by reference position, with references ordered by name. Only valid with -save_as_alignments.")
var sortByReferencePosition: Boolean = false

@Args4jOption(required = false, name = "-sort_by_reference_position_and_index", usage = "Sort alignments by reference position, with references ordered by index. Only valid with -save_as_alignments.")
var sortByReferencePositionAndIndex: Boolean = false

@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output")
var deferMerging: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false
@Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.")
var sortLexicographically: Boolean = false

@Args4jOption(required = false, name = "-mark_duplicate_reads", usage = "Mark duplicate reads")
var markDuplicates: Boolean = false

@Args4jOption(required = false, name = "-bin_quality_scores", usage = "Rewrites quality scores of reads into bins from a string of bin descriptions, e.g. 0,20,10;20,40,30.")
var binQualityScores: String = null

@Args4jOption(required = false, name = "-max_read_length", usage = "Maximum FASTQ read length, defaults to 10,000 base pairs (bp).")
var maxReadLength: Int = 0

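Taken together, the renamed flags compose the way the updated suite at the bottom of this commit exercises them. A minimal sketch (the paths are placeholders and a live SparkContext named sc is assumed):

// Placeholder paths; assumes a live SparkContext (sc), mirroring the suite.
TransformFragments(Array("in.sam", "out.sam",
  "-load_as_alignments", "-save_as_alignments",
  "-single",
  "-sort_by_reference_position")).run(sc)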
@@ -98,22 +112,23 @@ class TransformFragments(protected val args: TransformFragmentsArgs) extends BDG
def run(sc: SparkContext) {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)

if (args.loadAsReads && args.saveAsReads) {
warn("If loading and saving as reads, consider using TransformAlignments instead.")
if (args.loadAsAlignments && args.saveAsAlignments) {
warn("If loading and saving as alignments, consider using TransformAlignments instead")
}
-if (args.sortReads) {
-require(args.saveAsReads,
-"-sort_reads is only valid if -save_as_reads is given.")
+if (args.sortByReadName || args.sortByReferencePosition || args.sortByReferencePositionAndIndex) {
+require(args.saveAsAlignments,
+"-sort_by_* flags are only valid if -save_as_alignments is given")
}
-if (args.sortLexicographically) {
-require(args.saveAsReads,
-"-sort_lexicographically is only valid if -save_as_reads is given.")
+if (Seq(args.sortByReadName, args.sortByReferencePosition, args.sortByReferencePositionAndIndex).count(b => b) > 1) {
+throw new IllegalArgumentException(
+"only one of -sort_by_name, -sort_by_reference_position, and -sort_by_reference_position_and_index may be specified"
+)
}
if (args.maxReadLength > 0) {
FastqRecordReader.setMaxReadLength(sc.hadoopConfiguration, args.maxReadLength)
}

-val rdd = if (args.loadAsReads) {
+val rdd = if (args.loadAsAlignments) {
sc.loadAlignments(args.inputPath)
.toFragments
} else {
@@ -126,22 +141,24 @@ class TransformFragments(protected val args: TransformFragmentsArgs) extends BDG
// should we dedupe the reads?
val maybeDedupedReads = maybeDedupe(maybeBinnedReads)

-if (args.saveAsReads) {
-// save rdd as reads
-val readRdd = maybeDedupedReads.toAlignments
+if (args.saveAsAlignments) {
+// save rdd as alignments
+val alignmentRdd = maybeDedupedReads.toAlignments

// prep to save
-val finalRdd = if (args.sortReads) {
-readRdd.sortByReferencePosition()
-} else if (args.sortLexicographically) {
-readRdd.sortByReferencePositionAndIndex()
+val finalRdd = if (args.sortByReadName) {
+alignmentRdd.sortByReadName()
+} else if (args.sortByReferencePosition) {
+alignmentRdd.sortByReferencePosition()
+} else if (args.sortByReferencePositionAndIndex) {
+alignmentRdd.sortByReferencePositionAndIndex()
} else {
-readRdd
+alignmentRdd
}

// save the file
finalRdd.save(args,
-isSorted = args.sortReads || args.sortLexicographically)
+isSorted = args.sortByReadName || args.sortByReferencePosition || args.sortByReferencePositionAndIndex)
} else {
maybeDedupedReads.saveAsParquet(args)
}
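
For callers driving ADAM from Scala rather than the CLI, the branch above reduces to a short chain of dataset calls. A hedged sketch using only method names visible in this diff; the ADAMContext import and the single-argument saveAsParquet overload are assumptions:

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._ // assumed import that provides loadAlignments

// Roughly what -load_as_alignments -save_as_alignments -sort_by_reference_position does.
def transformSorted(sc: SparkContext, input: String, output: String): Unit = {
  val fragments = sc.loadAlignments(input).toFragments // load reads, group by fragment
  val sorted = fragments.toAlignments.sortByReferencePosition() // back to alignments, sorted
  sorted.saveAsParquet(output) // assumed String overload of saveAsParquet
}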
@@ -32,20 +32,23 @@ class TransformFragmentsSuite extends ADAMFunSuite {
sparkTest("cannot sort if not saving as sam") {
val loc = tmpLocation()
intercept[IllegalArgumentException] {
TransformFragments(Array(loc, loc, "-sort_reads")).run(sc)
TransformFragments(Array(loc, loc, "-sort_by_read_name")).run(sc)
}
intercept[IllegalArgumentException] {
TransformFragments(Array(loc, loc, "-sort_lexicographically")).run(sc)
TransformFragments(Array(loc, loc, "-sort_by_reference_position")).run(sc)
}
intercept[IllegalArgumentException] {
TransformFragments(Array(loc, loc, "-sort_by_reference_position_and_index")).run(sc)
}
}

sparkTest("load reads as sam and save them sorted") {
val input = copyResource("unsorted.sam")
val output = tmpLocation(".sam")
TransformFragments(Array(input, output,
"-load_as_reads", "-save_as_reads",
"-load_as_alignments", "-save_as_alignments",
"-single",
"-sort_reads")).run(sc)
"-sort_by_reference_position")).run(sc)
val actualSorted = copyResource("sorted.sam")
checkFiles(actualSorted, output)
}
@@ -54,7 +57,7 @@ class TransformFragmentsSuite extends ADAMFunSuite {
val inputPath = copyResource("bqsr1.sam")
val finalPath = tmpFile("binned.adam")
TransformFragments(Array(inputPath, finalPath,
"-save_as_reads",
"-save_as_alignments",
"-bin_quality_scores", "0,20,10;20,40,30;40,60,50")).run(sc)
val qualityScoreCounts = sc.loadAlignments(finalPath)
.rdd
