
Commit

Flesh out Java APIs in GenomicRDDs.
fnothaft committed May 22, 2017
1 parent 2820e94 commit 770dc32
Showing 8 changed files with 233 additions and 12 deletions.
12 changes: 12 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkFiles
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.api.java.function.{ Function => JFunction }
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.{
@@ -136,6 +137,17 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
replaceRdd(tFn(rdd))
}

/**
* Applies a function that transforms the underlying RDD into a new RDD.
*
* Java friendly variant.
*
* @param tFn A function that transforms the underlying RDD.
* @return A new RDD where the RDD of genomic data has been replaced, but the
* metadata (sequence dictionary, etc.) is copied without modification.
*/
def transform(tFn: JFunction[JavaRDD[T], JavaRDD[T]]): U = {
replaceRdd(tFn.call(jrdd).rdd)
}

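A minimal Java usage sketch for the new overload, assuming reads is an AlignmentRecordRDD obtained elsewhere (the variable name and filter predicate are illustrative):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD;
import org.bdgenomics.formats.avro.AlignmentRecord;

// Keep only mapped reads; transform copies the metadata (sequence
// dictionary, record groups) over unchanged.
Function<JavaRDD<AlignmentRecord>, JavaRDD<AlignmentRecord>> keepMapped =
    rdd -> rdd.filter(r -> r.getReadMapped());
AlignmentRecordRDD mapped = reads.transform(keepMapped);
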
/**
* Sorts our genome aligned data by reference positions, with contigs ordered
* by index.
adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala
@@ -18,6 +18,7 @@
package org.bdgenomics.adam.rdd.contig

import com.google.common.base.Splitter
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.FragmentConverter
import org.bdgenomics.adam.models.{
@@ -278,6 +279,21 @@ case class NucleotideContigFragmentRDD(
}
}

/**
* For all adjacent records in the RDD, we extend the records so that the adjacent
* records now overlap by _n_ bases, where _n_ is the flank length.
*
* Java friendly variant.
*
* @param flankLength The length to extend adjacent records by.
* @return Returns the RDD, with all adjacent fragments extended with flanking sequence.
*/
def flankAdjacentFragments(
flankLength: java.lang.Integer): NucleotideContigFragmentRDD = {
val flank: Int = flankLength
flankAdjacentFragments(flank)
}

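Since the parameter is a boxed java.lang.Integer, Java callers can pass a plain int literal, which autoboxes. A minimal sketch, assuming contigs is a NucleotideContigFragmentRDD loaded elsewhere:

// Extend adjacent fragments so that neighbors overlap by 25 flanking bases.
NucleotideContigFragmentRDD flanked = contigs.flankAdjacentFragments(25);
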
/**
* For all adjacent records in the RDD, we extend the records so that the adjacent
* records now overlap by _n_ bases, where _n_ is the flank length.
@@ -306,4 +322,20 @@ case class NucleotideContigFragmentRDD(
.map(k => (k, 1L))
}).reduceByKey((k1: Long, k2: Long) => k1 + k2)
}

/**
* Counts the k-mers contained in a FASTA contig.
*
* Java friendly variant.
*
* @param kmerLength The length of k-mers to count.
* @return Returns an RDD containing k-mer/count pairs.
*/
def countKmers(
kmerLength: java.lang.Integer): JavaRDD[(String, java.lang.Long)] = {
val k: Int = kmerLength
countKmers(k).map(p => {
(p._1, p._2: java.lang.Long)
}).toJavaRDD()
}
}
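A Java-side sketch of the boxed k-mer counter above, again assuming contigs is a NucleotideContigFragmentRDD; the k-mer length is arbitrary:

import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Each element pairs a 21-mer with its boxed occurrence count.
JavaRDD<Tuple2<String, Long>> kmerCounts = contigs.countKmers(21);
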
adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala
@@ -169,15 +169,31 @@ case class CoverageRDD(rdd: RDD[Coverage],
*
* @return Returns a FeatureRDD from CoverageRDD.
*/
def toFeatureRDD: FeatureRDD = {
def toFeatureRDD(): FeatureRDD = {
val featureRdd = rdd.map(_.toFeature)
FeatureRDD(featureRdd, sequences)
}

/**
* Gets coverage overlapping specified ReferenceRegion.
* For large ReferenceRegions, base pairs per bin (bpPerBin) can be specified to bin together ReferenceRegions of
* equal size. The coverage of each bin is coverage of the first base pair in that bin.
*
* For large ReferenceRegions, base pairs per bin (bpPerBin) can be specified
* to bin together ReferenceRegions of equal size. The coverage of each bin is
* coverage of the first base pair in that bin. Java friendly variant.
*
* @param bpPerBin base pairs per bin, number of bases to combine to one bin.
* @return RDD of Coverage Records.
*/
def coverage(bpPerBin: java.lang.Integer): CoverageRDD = {
val bp: Int = bpPerBin
coverage(bpPerBin = bp)
}

/**
* Gets coverage overlapping specified ReferenceRegion.
* For large ReferenceRegions, base pairs per bin (bpPerBin) can be specified
* to bin together ReferenceRegions of equal size. The coverage of each bin is
* coverage of the first base pair in that bin.
*
* @param bpPerBin base pairs per bin, number of bases to combine to one bin.
* @return RDD of Coverage Records.
@@ -196,9 +212,26 @@
}

/**
* Gets coverage overlapping specified ReferenceRegion. For large ReferenceRegions,
* base pairs per bin (bpPerBin) can be specified to bin together ReferenceRegions of
* equal size. The coverage of each bin is the mean coverage over all base pairs in that bin.
* Gets coverage overlapping specified ReferenceRegion.
*
* For large ReferenceRegions, base pairs per bin (bpPerBin) can be specified
* to bin together ReferenceRegions of equal size. The coverage of each bin is
* the mean coverage over all base pairs in that bin. Java friendly variant.
*
* @param bpPerBin base pairs per bin, number of bases to combine to one bin.
* @return RDD of Coverage Records.
*/
def aggregatedCoverage(bpPerBin: java.lang.Integer): CoverageRDD = {
val bp: Int = bpPerBin
aggregatedCoverage(bpPerBin = bp)
}

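A sketch of both binning overloads from Java, assuming coverage is a CoverageRDD; the 1000 bp bin width is illustrative:

// One Coverage record per 1000 bp bin, taking the first base pair's coverage...
CoverageRDD binned = coverage.coverage(1000);
// ...or the mean coverage across all base pairs in the bin.
CoverageRDD averaged = coverage.aggregatedCoverage(1000);
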
/**
* Gets coverage overlapping specified ReferenceRegion.
*
* For large ReferenceRegions, base pairs per bin (bpPerBin) can be specified
* to bin together ReferenceRegions of equal size. The coverage of each bin is
* the mean coverage over all base pairs in that bin.
*
* @param bpPerBin base pairs per bin, number of bases to combine to one bin.
* @return RDD of Coverage Records.
adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
@@ -309,7 +309,7 @@ case class FeatureRDD(rdd: RDD[Feature],
*
* @return CoverageRDD containing RDD of Coverage.
*/
def toCoverage: CoverageRDD = {
def toCoverage(): CoverageRDD = {
val coverageRdd = rdd.map(f => Coverage(f))
CoverageRDD(coverageRdd, sequences)
}
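The added empty parameter list makes the conversion callable with ordinary Java method syntax. A minimal sketch, assuming features is a FeatureRDD:

// Round-trip features through per-base coverage and back.
CoverageRDD cov = features.toCoverage();
FeatureRDD back = cov.toFeatureRDD();
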
adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
@@ -25,6 +25,7 @@ import java.net.URI
import java.nio.file.Paths
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
@@ -165,7 +166,7 @@ case class AlignmentRecordRDD(
* @return Returns a FragmentRDD where all reads have been grouped together by
* the original sequence fragment they come from.
*/
def toFragments: FragmentRDD = {
def toFragments(): FragmentRDD = {
FragmentRDD(groupReadsByFragment().map(_.toFragment),
sequences,
recordGroups)
@@ -381,6 +382,22 @@ case class AlignmentRecordRDD(
(convertedRDD, header)
}

/**
* Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
*
* Java friendly variant.
*
* @param kmerLength The value of _k_ to use for cutting _k_-mers.
* @return Returns an RDD containing k-mer/count pairs.
*/
def countKmers(kmerLength: java.lang.Integer): JavaRDD[(String, java.lang.Long)] = {
val k: Int = kmerLength
countKmers(k).map(kv => {
(kv._1, kv._2: java.lang.Long)
}).toJavaRDD()
}

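As with the contig variant, counts come back boxed for Java. A sketch, assuming reads as before; the k-mer length and cutoff are illustrative:

import java.util.List;
import scala.Tuple2;

// Collect the 16-mers seen more than 1000 times.
List<Tuple2<String, Long>> frequent = reads.countKmers(16)
    .filter(kv -> kv._2() > 1000L)
    .collect();
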
/**
* Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
*
@@ -656,6 +673,29 @@ case class AlignmentRecordRDD(
replaceRdd(MarkDuplicates(this))
}

/**
* Runs base quality score recalibration on a set of reads. Uses a table of
* known SNPs to mask true variation during the recalibration process.
*
* Java friendly variant.
*
* @param knownSnps A table of known SNPs to mask valid variants.
* @param minAcceptableQuality The minimum quality score to recalibrate.
* @param storageLevel An optional storage level to set for the output
* of the first stage of BQSR. Set to null to omit.
* @return Returns an RDD of recalibrated reads.
*/
def recalibrateBaseQualities(
knownSnps: SnpTable,
minAcceptableQuality: java.lang.Integer,
storageLevel: StorageLevel): AlignmentRecordRDD = {
val bcastSnps = rdd.context.broadcast(knownSnps)
val sMinQual: Int = minAcceptableQuality
recalibrateBaseQualities(bcastSnps,
minAcceptableQuality = sMinQual,
optStorageLevel = Option(storageLevel))
}

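A sketch of the Java-friendly BQSR call, assuming knownSnps is a SnpTable built elsewhere; the quality threshold of 5 is an assumed value, not a requirement:

import org.apache.spark.storage.StorageLevel;

// Recalibrate base qualities, caching the first BQSR pass in memory;
// passing null as the storage level would skip caching.
AlignmentRecordRDD recalibrated = reads.recalibrateBaseQualities(
    knownSnps, 5, StorageLevel.MEMORY_ONLY());
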
/**
* Runs base quality score recalibration on a set of reads. Uses a table of
* known SNPs to mask true variation during the recalibration process.
@@ -677,6 +717,39 @@
optStorageLevel))
}

/**
* Realigns indels using a consensus-based heuristic.
*
* Java friendly variant.
*
* @param consensusModel The model to use for generating consensus sequences
* to realign against.
* @param isSorted If the input data is sorted, setting this parameter to true
* avoids a second sort.
* @param maxIndelSize The size of the largest indel to use for realignment.
* @param maxConsensusNumber The maximum number of consensus sequences to
* realign against per target region.
* @param lodThreshold Log-odds threshold to use when realigning; realignments
* are only finalized if the log-odds threshold is exceeded.
* @param maxTargetSize The maximum width of a single target region for
* realignment.
* @return Returns an RDD of mapped reads which have been realigned.
*/
def realignIndels(
consensusModel: ConsensusGenerator,
isSorted: java.lang.Boolean,
maxIndelSize: java.lang.Integer,
maxConsensusNumber: java.lang.Integer,
lodThreshold: java.lang.Double,
maxTargetSize: java.lang.Integer): AlignmentRecordRDD = {
replaceRdd(RealignIndels(rdd,
consensusModel,
isSorted: Boolean,
maxIndelSize: Int,
maxConsensusNumber: Int,
lodThreshold: Double,
maxTargetSize: Int))
}

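A sketch of the boxed realignment call; the argument values mirror the defaults documented for the Scala variant, and the no-argument ConsensusGeneratorFromReads constructor is assumed:

import org.bdgenomics.adam.algorithms.consensus.ConsensusGeneratorFromReads;

AlignmentRecordRDD realigned = reads.realignIndels(
    new ConsensusGeneratorFromReads(), // consensuses drawn from the reads
    false, // isSorted: input not known to be sorted
    500,   // maxIndelSize
    30,    // maxConsensusNumber
    5.0,   // lodThreshold
    3000); // maxTargetSize
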
/**
* Realigns indels using a consensus-based heuristic.
*
@@ -761,6 +834,35 @@ case class AlignmentRecordRDD(
SingleReadBucket(rdd)
}

/**
* Saves these AlignmentRecords to two FASTQ files.
*
* The files are one for the first mate in each pair, and the other for the
* second mate in the pair. Java friendly variant.
*
* @param fileName1 Path at which to save a FASTQ file containing the first
* mate of each pair.
* @param fileName2 Path at which to save a FASTQ file containing the second
* mate of each pair.
* @param outputOriginalBaseQualities If true, writes out reads with the base
* qualities from the original qualities (SAM "OQ") field. If false, writes
* out reads with the base qualities from the qual field. Default is false.
* @param validationStringency Iff strict, throw an exception if any read in
* this RDD is not accompanied by its mate.
* @param persistLevel The persistence level to cache reads at between passes.
* Set to null to omit caching between passes.
*/
def saveAsPairedFastq(
fileName1: String,
fileName2: String,
outputOriginalBaseQualities: java.lang.Boolean,
validationStringency: ValidationStringency,
persistLevel: StorageLevel) {
saveAsPairedFastq(fileName1, fileName2,
outputOriginalBaseQualities = outputOriginalBaseQualities: Boolean,
validationStringency = validationStringency,
persistLevel = Option(persistLevel))
}

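A Java sketch of the paired save; the file paths are placeholders:

import htsjdk.samtools.ValidationStringency;
import org.apache.spark.storage.StorageLevel;

// Write first and second mates to separate files, failing on unpaired reads.
reads.saveAsPairedFastq(
    "sample_1.fq",
    "sample_2.fq",
    false,                       // use current base qualities, not SAM "OQ"
    ValidationStringency.STRICT, // throw if a read's mate is missing
    StorageLevel.DISK_ONLY());
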
/**
* Saves these AlignmentRecords to two FASTQ files.
*
@@ -885,6 +987,32 @@ case class AlignmentRecordRDD(
maybeUnpersist(secondInPairRecords)
}

/**
* Saves reads in FASTQ format.
*
* Java friendly variant.
*
* @param fileName Path to save files at.
* @param outputOriginalBaseQualities If true, writes out reads with the base
* qualities from the original qualities (SAM "OQ") field. If false, writes
* out reads with the base qualities from the qual field. Default is false.
* @param sort Whether to sort the FASTQ files by read name or not. Defaults
* to false. Sorting the output will recover pair order, if desired.
* @param validationStringency Iff strict, throw an exception if any read in
* this RDD is not accompanied by its mate.
*/
def saveAsFastq(
fileName: String,
outputOriginalBaseQualities: java.lang.Boolean,
sort: java.lang.Boolean,
validationStringency: ValidationStringency) {
saveAsFastq(fileName, fileName2Opt = None,
outputOriginalBaseQualities = outputOriginalBaseQualities: Boolean,
sort = sort: Boolean,
validationStringency = validationStringency,
persistLevel = None)
}

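And the single-file variant, here sorting by read name to recover pair order; the path is again a placeholder:

// Write all reads to one FASTQ file, sorted by read name.
reads.saveAsFastq("sample.fq", false, true, ValidationStringency.LENIENT);
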
/**
* Saves reads in FASTQ format.
*
@@ -936,6 +1064,22 @@
}
}

/**
* Reassembles read pairs from two sets of unpaired reads. The assumption is that the two sets
* were _originally_ paired together. Java friendly variant.
*
* @note The RDD that this is called on should be the RDD with the first read from the pair.
* @param secondPairRdd The RDD containing the second read from each pair.
* @param validationStringency How stringently to validate the reads.
* @return Returns an RDD with the pair information recomputed.
*/
def reassembleReadPairs(
secondPairRdd: JavaRDD[AlignmentRecord],
validationStringency: ValidationStringency): AlignmentRecordRDD = {
reassembleReadPairs(secondPairRdd.rdd,
validationStringency = validationStringency)
}

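The second mates arrive as a JavaRDD, which Java callers can obtain from another genomic RDD via the jrdd accessor used by transform above. A sketch, assuming firstMates and secondMates were loaded from the two halves of a read pair:

// Recompute pairing information across the two unpaired RDDs.
AlignmentRecordRDD paired = firstMates.reassembleReadPairs(
    secondMates.jrdd(),
    ValidationStringency.STRICT);
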
/**
* Reassembles read pairs from two sets of unpaired reads. The assumption is that the two sets
* were _originally_ paired together.
adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala
@@ -105,7 +105,7 @@ case class GenotypeRDD(rdd: RDD[Genotype],
/**
* @return Returns this GenotypeRDD squared off as a VariantContextRDD.
*/
def toVariantContextRDD: VariantContextRDD = {
def toVariantContextRDD(): VariantContextRDD = {
val vcIntRdd: RDD[(RichVariant, Genotype)] = rdd.keyBy(g => {
RichVariant.genotypeToRichVariant(g)
})
adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala
@@ -110,7 +110,7 @@ case class VariantContextRDD(rdd: RDD[VariantContext],
/**
* @return Returns a GenotypeRDD containing the Genotypes in this RDD.
*/
def toGenotypeRDD: GenotypeRDD = {
def toGenotypeRDD(): GenotypeRDD = {
GenotypeRDD(rdd.flatMap(_.genotypes),
sequences,
samples,
@@ -120,7 +120,7 @@
/**
* @return Returns the Variants in this RDD.
*/
def toVariantRDD: VariantRDD = {
def toVariantRDD(): VariantRDD = {
VariantRDD(rdd.map(_.variant.variant),
sequences,
headerLines)
adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala
@@ -139,7 +139,7 @@ case class VariantRDD(rdd: RDD[Variant],
/**
* @return Returns this VariantRDD as a VariantContextRDD.
*/
def toVariantContextRDD: VariantContextRDD = {
def toVariantContextRDD(): VariantContextRDD = {
VariantContextRDD(rdd.map(VariantContext(_)), sequences, Seq.empty[Sample], headerLines)
}

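With the parentheses added across these conversions, the chain is reachable from Java; a VariantContextRDD carrying genotypes can likewise be projected out via toGenotypeRDD(). A minimal sketch, assuming variants is a VariantRDD:

// Square off variants into variant contexts and back.
VariantContextRDD contexts = variants.toVariantContextRDD();
VariantRDD roundTripped = contexts.toVariantRDD();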
