Commit 9d7e471

Merge 1759d81 into aa33b06
heuermh committed Jun 28, 2019
2 parents aa33b06 + 1759d81 commit 9d7e471
Showing 24 changed files with 293 additions and 184 deletions.
@@ -41,7 +41,7 @@ object ADAMMain {
TransformSlices,
TransformVariants,
MergeShards,
- Reads2Coverage
+ Coverage
)
),
CommandGroup(
@@ -29,44 +29,52 @@ import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

/**
- * Reads2Coverage (accessible as the command 'reads2coverage' through the CLI) takes two arguments,
+ * Coverage (accessible as the command 'coverage' through the CLI) takes two arguments,
* an INPUT and OUTPUT, and calculates the number of reads from INPUT at every location in
* the file. Optional arguments are only_negative_strands, only_positive_strands and collapse.
* only_negative_strands and only_positive_strands save coverage computed from only negative and positive strands,
* respectively. Collapse specifies whether saved coverage should merge neighboring coverage with the same counts
* to one record.
*/
- object Reads2Coverage extends BDGCommandCompanion {
- val commandName: String = "reads2coverage"
+ object Coverage extends BDGCommandCompanion {
+ val commandName: String = "coverage"
val commandDescription: String = "Calculate the coverage from a given ADAM file"

def apply(cmdLine: Array[String]): BDGCommand = {
- new Reads2Coverage(Args4j[Reads2CoverageArgs](cmdLine))
+ new Coverage(Args4j[CoverageArgs](cmdLine))
}
}

- class Reads2CoverageArgs extends Args4jBase with ParquetArgs {
- @Argument(required = true, metaVar = "INPUT", usage = "The reads file to use to calculate depths", index = 0)
+ class CoverageArgs extends Args4jBase with ParquetArgs {
+
+ @Argument(required = true, metaVar = "INPUT", usage = "The alignments file to use to calculate depths", index = 0)
var inputPath: String = null

@Argument(required = true, metaVar = "OUTPUT", usage = "Location to write the coverage data to", index = 1)
var outputPath: String = null

@Args4jOption(required = false, name = "-collapse", usage = "Collapses neighboring coverage records " +
"of equal counts into the same record")
var collapse: Boolean = false

@Args4jOption(required = false, name = "-only_negative_strands", usage = "Compute coverage for negative strands")
var onlyNegativeStrands: Boolean = false

@Args4jOption(required = false, name = "-only_positive_strands", usage = "Compute coverage for positive strands")
var onlyPositiveStrands: Boolean = false

@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.")
var sortLexicographically: Boolean = false
}
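
Editor's note: for orientation, a minimal sketch of how the flags above are parsed, using the Args4j wrapper already imported in this file; the command-line values are illustrative.

```scala
import org.bdgenomics.utils.cli.Args4j

// Parse a hypothetical command line into CoverageArgs; Args4j binds the
// positional @Argument fields and the named @Args4jOption flags.
val parsed = Args4j[CoverageArgs](Array(
  "alignments.adam",        // INPUT
  "coverage.adam",          // OUTPUT
  "-collapse",              // merge neighboring records with equal counts
  "-only_negative_strands", // compute coverage from negative strands only
  "-single"))               // save OUTPUT as a single file
assert(parsed.collapse && parsed.onlyNegativeStrands && parsed.asSingleFile)
```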

- class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {
- val companion: BDGCommandCompanion = Reads2Coverage
+ class Coverage(protected val args: CoverageArgs) extends BDGSparkCommand[CoverageArgs] {
+ val companion: BDGCommandCompanion = Coverage

def run(sc: SparkContext): Unit = {
checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
@@ -80,18 +88,18 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom
require(!(args.onlyNegativeStrands && args.onlyPositiveStrands),
"Cannot compute coverage for both negative and positive strands separately")

- // load reads
- val readsRdd: AlignmentRecordDataset = sc.loadAlignments(args.inputPath)
+ // load alignments
+ val alignmentsRdd: AlignmentRecordDataset = sc.loadAlignments(args.inputPath)

- val finalReads = if (args.onlyNegativeStrands && !args.onlyPositiveStrands) {
- readsRdd.transform((rdd: RDD[AlignmentRecord]) => rdd.filter(_.getReadNegativeStrand))
+ val finalAlignments = if (args.onlyNegativeStrands && !args.onlyPositiveStrands) {
+ alignmentsRdd.transform((rdd: RDD[AlignmentRecord]) => rdd.filter(_.getReadNegativeStrand))
} else if (!args.onlyNegativeStrands && args.onlyPositiveStrands) {
- readsRdd.transform((rdd: RDD[AlignmentRecord]) => rdd.filter(!_.getReadNegativeStrand))
+ alignmentsRdd.transform((rdd: RDD[AlignmentRecord]) => rdd.filter(!_.getReadNegativeStrand))
} else {
- readsRdd
+ alignmentsRdd
}

- val coverage = finalReads.toCoverage()
+ val coverage = finalAlignments.toCoverage()

val maybeCollapsedCoverage = if (args.collapse) {
val sortedCoverage = if (args.sortLexicographically) {
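Editor's note: to make the renamed command concrete, here is a sketch of the same pipeline through ADAM's Scala API. The load, filter, and toCoverage calls mirror the diff above; the collapse() call, the save signature, and the paths are assumptions, not taken from this commit.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.AlignmentRecord

def coverageSketch(sc: SparkContext): Unit = {
  // load alignments from INPUT, as run() does above
  val alignments = sc.loadAlignments("alignments.adam")

  // keep only negative-strand reads, mirroring -only_negative_strands
  val negativeStrand = alignments.transform(
    (rdd: RDD[AlignmentRecord]) => rdd.filter(_.getReadNegativeStrand))

  // per-position depths; collapse() assumed to merge equal-count
  // neighboring records, mirroring -collapse
  val coverage = negativeStrand.toCoverage().collapse()

  // assumed save signature; writes coverage to OUTPUT
  coverage.save("coverage.adam", asSingleFile = false, disableFastConcat = false)
}
```
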
@@ -53,98 +53,148 @@ object TransformAlignments extends BDGCommandCompanion {
}

class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {

@Argument(required = true, metaVar = "INPUT", usage = "The ADAM, BAM or SAM file to apply the transforms to", index = 0)
var inputPath: String = null

@Argument(required = true, metaVar = "OUTPUT", usage = "Location to write the transformed data in ADAM/Parquet format", index = 1)
var outputPath: String = null

@Args4jOption(required = false, name = "-limit_projection", usage = "Only project necessary fields. Only works for Parquet files.")
var limitProjection: Boolean = false

@Args4jOption(required = false, name = "-aligned_read_predicate", usage = "Only load aligned reads. Only works for Parquet files. Exclusive of region predicate.")
var useAlignedReadPredicate: Boolean = false

@Args4jOption(required = false, name = "-region_predicate", usage = "Only load a specific range of regions. Mutually exclusive with aligned read predicate.")
var regionPredicate: String = null
@Args4jOption(required = false, name = "-sort_reads", usage = "Sort the reads by referenceId and read position")
var sortReads: Boolean = false
@Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.")
var sortLexicographically: Boolean = false

@Args4jOption(required = false, name = "-sort_by_read_name", usage = "Sort alignments by read name.")
var sortByReadName: Boolean = false

@Args4jOption(required = false, name = "-sort_by_reference_position", usage = "Sort alignments by reference position, with references ordered by name.")
var sortByReferencePosition: Boolean = false

@Args4jOption(required = false, name = "-sort_by_reference_position_and_index", usage = "Sort alignments by reference position, with references ordered by index.")
var sortByReferencePositionAndIndex: Boolean = false

@Args4jOption(required = false, name = "-mark_duplicate_reads", usage = "Mark duplicate reads")
var markDuplicates: Boolean = false

@Args4jOption(required = false, name = "-recalibrate_base_qualities", usage = "Recalibrate the base quality scores (ILLUMINA only)")
var recalibrateBaseQualities: Boolean = false

@Args4jOption(required = false, name = "-min_acceptable_quality", usage = "Minimum acceptable quality for recalibrating a base in a read. Default is 5.")
var minAcceptableQuality: Int = 5

@Args4jOption(required = false, name = "-sampling_fraction", usage = "Fraction of reads to sample during recalibration; omitted by default.")
var samplingFraction: Double = 0.0

@Args4jOption(required = false, name = "-sampling_seed", usage = "Seed to use when sampling during recalibration; omitted by default by setting to 0 (thus, 0 is an invalid seed).")
var samplingSeed: Long = 0L

@Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to LENIENT")
var stringency: String = "LENIENT"

@Args4jOption(required = false, name = "-known_snps", usage = "Sites-only VCF giving location of known SNPs")
var knownSnpsFile: String = null

@Args4jOption(required = false, name = "-realign_indels", usage = "Locally realign indels present in reads.")
var locallyRealign: Boolean = false

@Args4jOption(required = false, name = "-known_indels", usage = "VCF file including locations of known INDELs. If none is provided, default consensus model will be used.")
var knownIndelsFile: String = null

@Args4jOption(required = false, name = "-max_indel_size", usage = "The maximum length of an INDEL to realign to. Default value is 500.")
var maxIndelSize = 500

@Args4jOption(required = false, name = "-max_consensus_number", usage = "The maximum number of consensus to try realigning a target region to. Default value is 30.")
var maxConsensusNumber = 30

@Args4jOption(required = false, name = "-log_odds_threshold", usage = "The log-odds threshold for accepting a realignment. Default value is 5.0.")
var lodThreshold = 5.0

@Args4jOption(required = false, name = "-unclip_reads", usage = "If true, unclips reads during realignment.")
var unclipReads = false

@Args4jOption(required = false, name = "-max_target_size", usage = "The maximum length of a target region to attempt realigning. Default length is 3000.")
var maxTargetSize = 3000

@Args4jOption(required = false, name = "-max_reads_per_target", usage = "The maximum number of reads attached to a target considered for realignment. Default is 20000.")
var maxReadsPerTarget = 20000

@Args4jOption(required = false, name = "-reference", usage = "Path to a reference file to use for indel realignment.")
var reference: String = null

@Args4jOption(required = false, name = "-repartition", usage = "Set the number of partitions to map data to")
var repartition: Int = -1

@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1

@Args4jOption(required = false, name = "-force_shuffle_coalesce", usage = "Even if the repartitioned RDD has fewer partitions, force a shuffle.")
var forceShuffle: Boolean = false

@Args4jOption(required = false, name = "-sort_fastq_output", usage = "Sets whether to sort the FASTQ output, if saving as FASTQ. False by default. Ignored if not saving as FASTQ.")
var sortFastqOutput: Boolean = false

@Args4jOption(required = false, name = "-force_load_bam", usage = "Forces TransformAlignments to load from BAM/SAM.")
var forceLoadBam: Boolean = false

@Args4jOption(required = false, name = "-force_load_fastq", usage = "Forces TransformAlignments to load from unpaired FASTQ.")
var forceLoadFastq: Boolean = false

@Args4jOption(required = false, name = "-force_load_ifastq", usage = "Forces TransformAlignments to load from interleaved FASTQ.")
var forceLoadIFastq: Boolean = false

@Args4jOption(required = false, name = "-force_load_parquet", usage = "Forces TransformAlignments to load from Parquet.")
var forceLoadParquet: Boolean = false

@Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
var asSingleFile: Boolean = false

@Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output")
var deferMerging: Boolean = false

@Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-paired_fastq", usage = "When converting two (paired) FASTQ files to ADAM, pass the path to the second file here.")
var pairedFastqFile: String = null

@Args4jOption(required = false, name = "-read_group", usage = "Set converted FASTQs' read-group identifiers to this value; if empty-string is passed, use the basename of the input file, minus the extension.")
var fastqReadGroup: String = null

@Args4jOption(required = false, name = "-concat", usage = "Concatenate this file with <INPUT> and write the result to <OUTPUT>")
var concatFilename: String = null

@Args4jOption(required = false, name = "-add_md_tags", usage = "Add MD Tags to reads based on the FASTA (or equivalent) file passed to this option.")
var mdTagsReferenceFile: String = null

@Args4jOption(required = false, name = "-md_tag_fragment_size", usage = "When adding MD tags to reads, load the reference in fragments of this size.")
var mdTagsFragmentSize: Long = 1000000L

@Args4jOption(required = false, name = "-md_tag_overwrite", usage = "When adding MD tags to reads, overwrite existing incorrect tags.")
var mdTagsOverwrite: Boolean = false

@Args4jOption(required = false, name = "-bin_quality_scores", usage = "Rewrites quality scores of reads into bins from a string of bin descriptions, e.g. 0,20,10;20,40,30.")
var binQualityScores: String = null

@Args4jOption(required = false, name = "-cache", usage = "Cache data to avoid recomputing between stages.")
var cache: Boolean = false

@Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching.")
var storageLevel: String = "MEMORY_ONLY"

@Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
var disableProcessingStep = false

@Args4jOption(required = false, name = "-partition_by_start_pos", usage = "Save the data partitioned by genomic range bins based on start pos using Hive-style partitioning.")
var partitionByStartPos: Boolean = false

@Args4jOption(required = false, name = "-partition_bin_size", usage = "Partition bin size used in Hive-style partitioning. Defaults to 1Mbp (1,000,000) base pairs).")
var partitionedBinSize = 1000000

@Args4jOption(required = false, name = "-max_read_length", usage = "Maximum FASTQ read length, defaults to 10,000 base pairs (bp).")
var maxReadLength: Int = 0

@@ -333,7 +383,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
*/
private def maybeSort(ds: AlignmentRecordDataset,
sl: StorageLevel): AlignmentRecordDataset = {
- if (args.sortReads) {
+ if (args.sortByReadName || args.sortByReferencePosition || args.sortByReferencePositionAndIndex) {

// cache the input if requested. sort is two stages:
// 1. sample to create partitioner
@@ -342,13 +392,15 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
ds.rdd.persist(sl)
}

info("Sorting reads")

// are we sorting lexicographically or using legacy SAM sort order?
val sortedDs = if (args.sortLexicographically) {
ds.sortReadsByReferencePosition()
val sortedDs = if (args.sortByReadName) {
info("Sorting alignments by read name")
ds.sortByReadName()
} else if (args.sortByReferencePosition) {
info("Sorting alignments by reference position, with references ordered by name")
ds.sortByReferencePosition()
} else {
ds.sortReadsByReferencePositionAndIndex()
info("Sorting alignments by reference position, with references ordered by index")
ds.sortByReferencePositionAndIndex()
}

// unpersist the cached rdd, if caching was requested
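
Editor's note: the three sort entry points used above are the new public methods on AlignmentRecordDataset; the helper below is an illustrative sketch of the same dispatch, not code from this commit. The import path reflects the package layout at the time of this commit.

```scala
import org.bdgenomics.adam.rdd.read.AlignmentRecordDataset

// Illustrative dispatch mirroring maybeSort: exactly one of the three
// orders is applied, defaulting to index-ordered reference position.
def sortAlignments(alignments: AlignmentRecordDataset,
                   byReadName: Boolean,
                   byReferencePosition: Boolean): AlignmentRecordDataset = {
  if (byReadName) {
    alignments.sortByReadName()                  // queryname order
  } else if (byReferencePosition) {
    alignments.sortByReferencePosition()         // references ordered by name
  } else {
    alignments.sortByReferencePositionAndIndex() // references ordered by index
  }
}
```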
@@ -442,29 +494,34 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
// throw exception if aligned read predicate or limit projection flags are used improperly
if (args.useAlignedReadPredicate && forceNonParquet()) {
throw new IllegalArgumentException(
"-aligned_read_predicate only applies to Parquet files, but a non-Parquet force load flag was passed."
"-aligned_read_predicate only applies to Parquet files, but a non-Parquet force load flag was passed"
)
}
if (args.limitProjection && forceNonParquet()) {
throw new IllegalArgumentException(
"-limit_projection only applies to Parquet files, but a non-Parquet force load flag was passed."
"-limit_projection only applies to Parquet files, but a non-Parquet force load flag was passed"
)
}
if (args.useAlignedReadPredicate && isNonParquet(args.inputPath)) {
throw new IllegalArgumentException(
"-aligned_read_predicate only applies to Parquet files, but a non-Parquet input path was specified."
"-aligned_read_predicate only applies to Parquet files, but a non-Parquet input path was specified"
)
}
if (args.limitProjection && isNonParquet(args.inputPath)) {
throw new IllegalArgumentException(
"-limit_projection only applies to Parquet files, but a non-Parquet input path was specified."
"-limit_projection only applies to Parquet files, but a non-Parquet input path was specified"
)
}
if (args.useAlignedReadPredicate && args.regionPredicate != null) {
throw new IllegalArgumentException(
"-aligned_read_predicate and -region_predicate are mutually exclusive"
)
}
+ if (Seq(args.sortByReadName, args.sortByReferencePosition, args.sortByReferencePositionAndIndex).count(b => b) > 1) {
+ throw new IllegalArgumentException(
+ "only one of -sort_by_read_name, -sort_by_reference_position, and -sort_by_reference_position_and_index may be specified"
+ )
+ }
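
Editor's note: the count-of-set-flags check above generalizes; the helper below is a hypothetical illustration of the same pattern, not part of this change.

```scala
// Hypothetical helper: reject combinations where more than one of a set
// of mutually exclusive boolean flags is set, naming the offenders.
def requireAtMostOne(flags: (String, Boolean)*): Unit = {
  val set = flags.collect { case (name, true) => name }
  if (set.size > 1) {
    throw new IllegalArgumentException(
      s"only one of ${set.mkString(", ")} may be specified")
  }
}

// e.g. requireAtMostOne(
//   "-sort_by_read_name" -> args.sortByReadName,
//   "-sort_by_reference_position" -> args.sortByReferencePosition,
//   "-sort_by_reference_position_and_index" -> args.sortByReferencePositionAndIndex)
```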

val loadedDs: AlignmentRecordDataset =
if (args.forceLoadBam) {
@@ -572,17 +629,10 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
// run our transformation
val outputDs = this.apply(newDs)

- // if we are sorting, we must strip the indices from the sequence dictionary
+ // if we are sorting by reference, we must strip the indices from the sequence dictionary
// and sort the sequence dictionary
//
// we must do this because we do a lexicographic sort, not an index-based sort
- val sdFinal = if (args.sortReads) {
- if (args.sortLexicographically) {
- mergedSd.stripIndices
- .sorted
- } else {
- mergedSd
- }
+ val sdFinal = if (args.sortByReferencePosition) {
+ mergedSd.stripIndices.sorted
} else {
mergedSd
}
@@ -594,7 +644,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
outputDs.saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
} else {
outputDs.save(args,
- isSorted = args.sortReads || args.sortLexicographically)
+ isSorted = args.sortByReadName || args.sortByReferencePosition || args.sortByReferencePositionAndIndex)
}
}
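
Editor's note: taken together, a sketch of the new sort semantics via the API, roughly equivalent to running `transformAlignments -sort_by_reference_position`; the paths and the single-argument saveAsParquet overload are assumptions.

```scala
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._

def transformSketch(sc: SparkContext): Unit = {
  // load alignments, sort with references ordered by name, and
  // write ADAM/Parquet output (illustrative paths)
  val alignments = sc.loadAlignments("sample.bam")
  val sorted = alignments.sortByReferencePosition()
  sorted.saveAsParquet("sample.alignments.adam")
}
```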
