
Merge ef72554 into 0ad63f1

fnothaft committed Jan 3, 2017
2 parents 0ad63f1 + ef72554 commit 9b8e3edf595ec4270648b72a986c9726ba1085af
Showing 18 changed files with 481 additions and 63 deletions.
  1. +1 −1 adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala
  2. +36 −1 adam-core/src/main/scala/org/bdgenomics/adam/models/VariantContext.scala
  3. +19 −4 adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
  4. +60 −18 adam-core/src/main/scala/org/bdgenomics/adam/rdd/TreeRegionJoin.scala
  5. +33 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala
  6. +34 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala
  7. +32 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
  8. +33 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
  9. +32 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
  10. +33 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala
  11. +37 −1 adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantAnnotationRDD.scala
  12. +36 −2 adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala
  13. +37 −1 adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala
  14. +22 −19 adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala
  15. +23 −12 adam-core/src/test/scala/org/bdgenomics/adam/rdd/InnerTreeRegionJoinSuite.scala
  16. +5 −2 adam-core/src/test/scala/org/bdgenomics/adam/rdd/RightOuterTreeRegionJoinSuite.scala
  17. +7 −1 adam-core/src/test/scala/org/bdgenomics/adam/rdd/TreeRegionJoinSuite.scala
  18. +1 −1 pom.xml
@@ -20,7 +20,7 @@ package org.bdgenomics.adam.models
import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import org.bdgenomics.formats.avro._
-import org.bdgenomics.utils.intervalarray.Interval
+import org.bdgenomics.utils.interval.array.Interval
import scala.math.{ max, min }
trait ReferenceOrdering[T <: ReferenceRegion] extends Ordering[T] {
@@ -17,8 +17,11 @@
*/
package org.bdgenomics.adam.models
-import org.bdgenomics.formats.avro.{ Genotype, Variant, VariantAnnotation }
+import com.esotericsoftware.kryo.io.{ Input, Output }
+import com.esotericsoftware.kryo.{ Kryo, Serializer }
import org.bdgenomics.adam.rich.RichVariant
+import org.bdgenomics.adam.serialization.AvroSerializer
+import org.bdgenomics.formats.avro.{ Genotype, Variant, VariantAnnotation }
/**
* Singleton object for building VariantContexts.
@@ -158,3 +161,35 @@ class VariantContext(
val genotypes: Iterable[Genotype],
val annotations: Option[VariantAnnotation] = None) extends Serializable {
}
class VariantContextSerializer extends Serializer[VariantContext] {
val rpSerializer = new ReferencePositionSerializer
val vSerializer = new AvroSerializer[Variant]
val gtSerializer = new AvroSerializer[Genotype]
val vaSerializer = new AvroSerializer[VariantAnnotation]
def write(kryo: Kryo, output: Output, obj: VariantContext) = {
rpSerializer.write(kryo, output, obj.position)
vSerializer.write(kryo, output, obj.variant.variant)
output.writeInt(obj.genotypes.size)
obj.genotypes.foreach(gt => gtSerializer.write(kryo, output, gt))
output.writeBoolean(obj.annotations.isDefined)
obj.annotations.foreach(va => vaSerializer.write(kryo, output, va))
}
def read(kryo: Kryo, input: Input, klazz: Class[VariantContext]): VariantContext = {
val rp = rpSerializer.read(kryo, input, classOf[ReferencePosition])
val v = vSerializer.read(kryo, input, classOf[Variant])
val gts = new Array[Genotype](input.readInt())
gts.indices.foreach(idx => {
gts(idx) = gtSerializer.read(kryo, input, classOf[Genotype])
})
val optVa = if (input.readBoolean()) {
Some(vaSerializer.read(kryo, input, classOf[VariantAnnotation]))
} else {
None
}
new VariantContext(rp, new RichVariant(v), gts.toIterable, optVa)
}
}
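The new VariantContextSerializer only takes effect if Kryo knows about it. A minimal registration sketch (the class name below is hypothetical; the real wiring lives in ADAMKryoRegistrator.scala, which this commit also modifies):

import com.esotericsoftware.kryo.Kryo
import org.bdgenomics.adam.models.{ VariantContext, VariantContextSerializer }

// Hypothetical registrator sketch: binds the custom serializer so Kryo
// round-trips VariantContext field by field instead of falling back to
// generic serialization.
class ExampleRegistrator {
  def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[VariantContext], new VariantContextSerializer)
  }
}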
@@ -32,6 +32,7 @@ import org.bdgenomics.adam.models.{
}
import org.bdgenomics.formats.avro.{ Contig, RecordGroupMetadata, Sample }
import org.bdgenomics.utils.cli.SaveArgs
import org.bdgenomics.utils.interval.array.IntervalArray
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -383,6 +384,10 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
}))
}
protected def buildTree(
rdd: RDD[(ReferenceRegion, T)])(
implicit tTag: ClassTag[T]): IntervalArray[ReferenceRegion, T]
/**
* Performs a broadcast inner join between this RDD and another RDD.
*
@@ -401,7 +406,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {
// key the RDDs and join
-GenericGenomicRDD[(T, X)](InnerTreeRegionJoin[T, X]().partitionAndJoin(flattenRddByRegions(),
+GenericGenomicRDD[(T, X)](InnerTreeRegionJoin[T, X]().broadcastAndJoin(
+buildTree(flattenRddByRegions()),
genomicRdd.flattenRddByRegions()),
sequences ++ genomicRdd.sequences,
kv => { getReferenceRegions(kv._1) ++ genomicRdd.getReferenceRegions(kv._2) })
@@ -429,7 +435,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {
// key the RDDs and join
-GenericGenomicRDD[(Option[T], X)](RightOuterTreeRegionJoin[T, X]().partitionAndJoin(flattenRddByRegions(),
+GenericGenomicRDD[(Option[T], X)](RightOuterTreeRegionJoin[T, X]().broadcastAndJoin(
+buildTree(flattenRddByRegions()),
genomicRdd.flattenRddByRegions()),
sequences ++ genomicRdd.sequences,
kv => {
@@ -492,7 +499,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = {
// key the RDDs and join
-GenericGenomicRDD[(Iterable[T], X)](InnerTreeRegionJoinAndGroupByRight[T, X]().partitionAndJoin(flattenRddByRegions(),
+GenericGenomicRDD[(Iterable[T], X)](InnerTreeRegionJoinAndGroupByRight[T, X]().broadcastAndJoin(
+buildTree(flattenRddByRegions()),
genomicRdd.flattenRddByRegions()),
sequences ++ genomicRdd.sequences,
kv => { (kv._1.flatMap(getReferenceRegions) ++ genomicRdd.getReferenceRegions(kv._2)).toSeq })
@@ -519,7 +527,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = {
// key the RDDs and join
-GenericGenomicRDD[(Iterable[T], X)](RightOuterTreeRegionJoinAndGroupByRight[T, X]().partitionAndJoin(flattenRddByRegions(),
+GenericGenomicRDD[(Iterable[T], X)](RightOuterTreeRegionJoinAndGroupByRight[T, X]().broadcastAndJoin(
+buildTree(flattenRddByRegions()),
genomicRdd.flattenRddByRegions()),
sequences ++ genomicRdd.sequences,
kv => {
@@ -757,6 +766,12 @@ private case class GenericGenomicRDD[T](rdd: RDD[T],
sequences: SequenceDictionary,
regionFn: T => Seq[ReferenceRegion]) extends GenomicRDD[T, GenericGenomicRDD[T]] {
protected def buildTree(
rdd: RDD[(ReferenceRegion, T)])(
implicit tTag: ClassTag[T]): IntervalArray[ReferenceRegion, T] = {
IntervalArray(rdd)
}
protected def replaceRdd(newRdd: RDD[T]): GenericGenomicRDD[T] = {
copy(rdd = newRdd)
}
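Taken together, these GenomicRDD changes swap the old partitionAndJoin call shape for a two-step pattern: build an IntervalArray from the left side once via the new abstract buildTree, then pass it to broadcastAndJoin. A minimal sketch of the buildTree contract, using a plain String payload instead of a real ADAM record type:

import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.utils.interval.array.IntervalArray
import scala.reflect.ClassTag

object BuildTreeSketch {
  // Mirrors the generic fallback in GenericGenomicRDD above; the typed RDDs
  // in this commit instead pass a concrete constructor, such as
  // CoverageArray.apply(_, _).
  def buildTree(
    rdd: RDD[(ReferenceRegion, String)])(
      implicit tTag: ClassTag[String]): IntervalArray[ReferenceRegion, String] = {
    IntervalArray(rdd)
  }
}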
@@ -20,7 +20,7 @@ package org.bdgenomics.adam.rdd
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.ReferenceRegion
-import org.bdgenomics.utils.intervalarray.IntervalArray
+import org.bdgenomics.utils.interval.array.IntervalArray
import scala.reflect.ClassTag
/**
@@ -31,6 +31,29 @@ import scala.reflect.ClassTag
*/
trait TreeRegionJoin[T, U] {
private[rdd] def runJoinAndGroupByRightWithTree(
tree: IntervalArray[ReferenceRegion, T],
rightRdd: RDD[(ReferenceRegion, U)])(
implicit tTag: ClassTag[T]): RDD[(Iterable[T], U)] = {
RunningMapSideJoin.time {
// broadcast this tree
val broadcastTree = rightRdd.context
.broadcast(tree)
// map and join
rightRdd.map(kv => {
val (rr, u) = kv
// what values does this key overlap in the tree?
val overlappingValues = broadcastTree.value
.get(rr)
.map(_._2)
(overlappingValues, u)
})
}
}
/**
* Performs an inner region join between two RDDs, and groups by the
* value on the right side of the join.
@@ -49,23 +72,8 @@ trait TreeRegionJoin[T, U] {
// build the tree from the left RDD
val tree = IntervalArray(leftRdd)
-RunningMapSideJoin.time {
-// broadcast this tree
-val broadcastTree = leftRdd.context
-.broadcast(tree)
-// map and join
-rightRdd.map(kv => {
-val (rr, u) = kv
-// what values keys does this overlap in the tree?
-val overlappingValues = broadcastTree.value
-.get(rr)
-.map(_._2)
-(overlappingValues, u)
-})
-}
+// and join
+runJoinAndGroupByRightWithTree(tree, rightRdd)
}
}
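The extracted runJoinAndGroupByRightWithTree pairs each right record with the Iterable of every overlapping left value, yielding an empty Iterable when nothing overlaps; the concrete join classes below differ only in how they reshape that output. A toy trace of the underlying tree lookup (hypothetical data; assumes the ReferenceRegion(name, start, end) constructor):

import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.utils.interval.array.IntervalArray

object TreeLookupTrace {
  def trace(sc: SparkContext): Unit = {
    val left = sc.parallelize(Seq(
      (ReferenceRegion("chr1", 0L, 100L), "a"),
      (ReferenceRegion("chr1", 25L, 75L), "b")))
    val tree = IntervalArray(left)

    // get(rr) returns every (region, value) pair overlapping the query region
    val hits = tree.get(ReferenceRegion("chr1", 50L, 60L)).map(_._2)
    // hits contains "a" and "b"; a chr2 query would yield an empty Iterable,
    // which the outer/grouping variants below turn into None or filter away
  }
}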
@@ -74,6 +82,15 @@ trait TreeRegionJoin[T, U] {
*/
case class InnerTreeRegionJoin[T: ClassTag, U: ClassTag]() extends RegionJoin[T, U, T, U] with TreeRegionJoin[T, U] {
def broadcastAndJoin(tree: IntervalArray[ReferenceRegion, T],
joinedRDD: RDD[(ReferenceRegion, U)]): RDD[(T, U)] = {
runJoinAndGroupByRightWithTree(tree, joinedRDD)
.flatMap(kv => {
val (leftIterable, right) = kv
leftIterable.map(left => (left, right))
})
}
/**
* Performs an inner region join between two RDDs.
*
@@ -101,6 +118,20 @@ case class RightOuterTreeRegionJoin[T: ClassTag, U: ClassTag]()
extends RegionJoin[T, U, Option[T], U]
with TreeRegionJoin[T, U] {
def broadcastAndJoin(tree: IntervalArray[ReferenceRegion, T],
joinedRDD: RDD[(ReferenceRegion, U)]): RDD[(Option[T], U)] = {
runJoinAndGroupByRightWithTree(tree, joinedRDD)
.flatMap(kv => {
val (leftIterable, right) = kv
if (leftIterable.isEmpty) {
Iterable((None, right))
} else {
leftIterable.map(left => (Some(left), right))
}
})
}
/**
* Performs a right outer region join between two RDDs.
*
@@ -136,6 +167,12 @@ case class InnerTreeRegionJoinAndGroupByRight[T: ClassTag, U: ClassTag]()
extends RegionJoin[T, U, Iterable[T], U]
with TreeRegionJoin[T, U] {
def broadcastAndJoin(tree: IntervalArray[ReferenceRegion, T],
joinedRDD: RDD[(ReferenceRegion, U)]): RDD[(Iterable[T], U)] = {
runJoinAndGroupByRightWithTree(tree, joinedRDD)
.filter(_._1.nonEmpty)
}
/**
* Performs an inner join between two RDDs, followed by a groupBy on the
* right object.
@@ -165,6 +202,11 @@ case class RightOuterTreeRegionJoinAndGroupByRight[T: ClassTag, U: ClassTag]()
extends RegionJoin[T, U, Iterable[T], U]
with TreeRegionJoin[T, U] {
def broadcastAndJoin(tree: IntervalArray[ReferenceRegion, T],
joinedRDD: RDD[(ReferenceRegion, U)]): RDD[(Iterable[T], U)] = {
runJoinAndGroupByRightWithTree(tree, joinedRDD)
}
/**
* Performs an inner join between two RDDs, followed by a groupBy on the
* right object.
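End to end, the new broadcastAndJoin entry points let a caller build the tree once and reuse it across several joins. A usage sketch with toy data (hypothetical values; assumes the ReferenceRegion(name, start, end) constructor):

import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.{ InnerTreeRegionJoin, RightOuterTreeRegionJoin }
import org.bdgenomics.utils.interval.array.IntervalArray

object BroadcastJoinSketch {
  def run(sc: SparkContext): Unit = {
    val left = sc.parallelize(Seq((ReferenceRegion("chr1", 0L, 100L), "left")))
    val right = sc.parallelize(Seq(
      (ReferenceRegion("chr1", 50L, 60L), "hit"),
      (ReferenceRegion("chr2", 0L, 10L), "orphan")))

    // build the broadcast tree once, then reuse it for both joins
    val tree = IntervalArray(left)

    val inner = InnerTreeRegionJoin[String, String]().broadcastAndJoin(tree, right)
    // expected pair: ("left", "hit"); the chr2 record drops out

    val outer = RightOuterTreeRegionJoin[String, String]().broadcastAndJoin(tree, right)
    // expected pairs: (Some("left"), "hit") and (None, "orphan")
  }
}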
@@ -22,14 +22,42 @@ import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.FragmentConverter
import org.bdgenomics.adam.models.{
ReferenceRegion,
ReferenceRegionSerializer,
SequenceRecord,
SequenceDictionary
}
import org.bdgenomics.adam.rdd.{ AvroGenomicRDD, JavaSaveArgs }
import org.bdgenomics.adam.serialization.AvroSerializer
import org.bdgenomics.adam.util.ReferenceFile
import org.bdgenomics.formats.avro.{ AlignmentRecord, NucleotideContigFragment }
import org.bdgenomics.utils.interval.array.{
IntervalArray,
IntervalArraySerializer
}
import scala.collection.JavaConversions._
import scala.math.max
import scala.reflect.ClassTag
private[adam] case class NucleotideContigFragmentArray(
array: Array[(ReferenceRegion, NucleotideContigFragment)],
maxIntervalWidth: Long) extends IntervalArray[ReferenceRegion, NucleotideContigFragment] {
protected def replace(arr: Array[(ReferenceRegion, NucleotideContigFragment)],
maxWidth: Long): IntervalArray[ReferenceRegion, NucleotideContigFragment] = {
NucleotideContigFragmentArray(arr, maxWidth)
}
}
private[adam] class NucleotideContigFragmentArraySerializer extends IntervalArraySerializer[ReferenceRegion, NucleotideContigFragment, NucleotideContigFragmentArray] {
protected val kSerializer = new ReferenceRegionSerializer
protected val tSerializer = new AvroSerializer[NucleotideContigFragment]
protected def builder(arr: Array[(ReferenceRegion, NucleotideContigFragment)],
maxIntervalWidth: Long): NucleotideContigFragmentArray = {
NucleotideContigFragmentArray(arr, maxIntervalWidth)
}
}
private[rdd] object NucleotideContigFragmentRDD extends Serializable {
@@ -72,6 +100,11 @@ case class NucleotideContigFragmentRDD(
rdd: RDD[NucleotideContigFragment],
sequences: SequenceDictionary) extends AvroGenomicRDD[NucleotideContigFragment, NucleotideContigFragmentRDD] with ReferenceFile {
protected def buildTree(rdd: RDD[(ReferenceRegion, NucleotideContigFragment)])(
implicit tTag: ClassTag[NucleotideContigFragment]): IntervalArray[ReferenceRegion, NucleotideContigFragment] = {
IntervalArray(rdd, NucleotideContigFragmentArray.apply(_, _))
}
/**
* Converts an RDD of nucleotide contig fragments into reads. Adjacent contig fragments are
* combined.
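The NucleotideContigFragmentArray and NucleotideContigFragmentArraySerializer pair above is the template this commit repeats for Coverage, Feature, Fragment, AlignmentRecord, and the variant-related RDDs. Because these classes are private[adam], any code registering them must sit under the org.bdgenomics.adam package; a hypothetical sketch (the real registrations are in ADAMKryoRegistrator.scala, whose diff is not expanded above):

package org.bdgenomics.adam.serialization

import com.esotericsoftware.kryo.Kryo
import org.bdgenomics.adam.rdd.contig.{
  NucleotideContigFragmentArray,
  NucleotideContigFragmentArraySerializer
}

// Hypothetical helper: the package declaration above keeps the
// private[adam] array class and its serializer visible.
object ExampleArrayRegistration {
  def register(kryo: Kryo): Unit = {
    kryo.register(classOf[NucleotideContigFragmentArray],
      new NucleotideContigFragmentArraySerializer)
  }
}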
@@ -17,14 +17,43 @@
*/
package org.bdgenomics.adam.rdd.feature
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.FieldSerializer
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{
Coverage,
ReferenceRegion,
ReferenceRegionSerializer,
SequenceDictionary
}
import org.bdgenomics.adam.rdd.GenomicRDD
import org.bdgenomics.utils.interval.array.{
IntervalArray,
IntervalArraySerializer
}
import scala.annotation.tailrec
import scala.reflect.ClassTag
private[adam] case class CoverageArray(
array: Array[(ReferenceRegion, Coverage)],
maxIntervalWidth: Long) extends IntervalArray[ReferenceRegion, Coverage] {
protected def replace(arr: Array[(ReferenceRegion, Coverage)],
maxWidth: Long): IntervalArray[ReferenceRegion, Coverage] = {
CoverageArray(arr, maxWidth)
}
}
private[adam] class CoverageArraySerializer(kryo: Kryo) extends IntervalArraySerializer[ReferenceRegion, Coverage, CoverageArray] {
protected val kSerializer = new ReferenceRegionSerializer
protected val tSerializer = new FieldSerializer[Coverage](kryo, classOf[Coverage])
protected def builder(arr: Array[(ReferenceRegion, Coverage)],
maxIntervalWidth: Long): CoverageArray = {
CoverageArray(arr, maxIntervalWidth)
}
}
/**
* An RDD containing Coverage data.
@@ -36,6 +65,11 @@ import scala.annotation.tailrec
case class CoverageRDD(rdd: RDD[Coverage],
sequences: SequenceDictionary) extends GenomicRDD[Coverage, CoverageRDD] {
protected def buildTree(rdd: RDD[(ReferenceRegion, Coverage)])(
implicit tTag: ClassTag[Coverage]): IntervalArray[ReferenceRegion, Coverage] = {
IntervalArray(rdd, CoverageArray.apply(_, _))
}
/**
* Saves coverage as feature file.
*