Permalink
Browse files

[ADAM-1334] Clean up serialization issues in Broadcast region join.

Resolves #1334. Eliminates type erasure problems by providing a separate concrete
IntervalArray implementation (and serializer) for each broadcast record type. Depends on
bigdatagenomics/utils#97.
  • Loading branch information...
fnothaft authored and heuermh committed Jan 3, 2017
1 parent 3dc8302 commit 5dcd70b111657b1ee874f8f277b32c8462583dfe
Showing with 363 additions and 46 deletions.
  1. +1 −1 adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala
  2. +28 −1 adam-core/src/main/scala/org/bdgenomics/adam/models/VariantContext.scala
  3. +8 −4 adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
  4. +1 −1 adam-core/src/main/scala/org/bdgenomics/adam/rdd/TreeRegionJoin.scala
  5. +33 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala
  6. +34 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala
  7. +32 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
  8. +33 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
  9. +32 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
  10. +33 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala
  11. +36 −2 adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala
  12. +34 −0 adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala
  13. +20 −19 adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala
  14. +23 −12 adam-core/src/test/scala/org/bdgenomics/adam/rdd/InnerTreeRegionJoinSuite.scala
  15. +5 −2 adam-core/src/test/scala/org/bdgenomics/adam/rdd/RightOuterTreeRegionJoinSuite.scala
  16. +7 −1 adam-core/src/test/scala/org/bdgenomics/adam/rdd/TreeRegionJoinSuite.scala
  17. +1 −1 pom.xml
  18. +1 −1 scripts/move_to_scala_2.10.sh
  19. +1 −1 scripts/move_to_scala_2.11.sh
@@ -20,7 +20,7 @@ package org.bdgenomics.adam.models
import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.intervalarray.Interval
import org.bdgenomics.utils.interval.array.Interval
import scala.math.{ max, min }
trait ReferenceOrdering[T <: ReferenceRegion] extends Ordering[T] {
@@ -17,8 +17,11 @@
*/
package org.bdgenomics.adam.models
import org.bdgenomics.formats.avro.{ Genotype, Variant, VariantAnnotation }
import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.adam.serialization.AvroSerializer
import org.bdgenomics.formats.avro.{ Genotype, Variant, VariantAnnotation }
/**
* Singleton object for building VariantContexts.
@@ -84,3 +87,27 @@ class VariantContext(
val variant: RichVariant,
val genotypes: Iterable[Genotype]) extends Serializable {
}
/**
 * Kryo serializer for [[VariantContext]].
 *
 * The wire layout is the reference position, then the variant record, then
 * an Int genotype count followed by that many genotype records. `read`
 * consumes the fields in exactly the order that `write` emits them.
 */
class VariantContextSerializer extends Serializer[VariantContext] {

  // Delegate serializers for the position, the variant and each genotype.
  val rpSerializer = new ReferencePositionSerializer
  val vSerializer = new AvroSerializer[Variant]
  val gtSerializer = new AvroSerializer[Genotype]

  /**
   * Writes a variant context to the given output.
   *
   * @param kryo Kryo instance handed through to the delegate serializers.
   * @param output Output to write to.
   * @param obj The variant context to serialize.
   */
  def write(kryo: Kryo, output: Output, obj: VariantContext) = {
    rpSerializer.write(kryo, output, obj.position)
    vSerializer.write(kryo, output, obj.variant.variant)

    // The count is written ahead of the genotypes so that the reader knows
    // how many records follow.
    val genotypes = obj.genotypes
    output.writeInt(genotypes.size)
    for (genotype <- genotypes) {
      gtSerializer.write(kryo, output, genotype)
    }
  }

  /**
   * Reads back a variant context that was written by `write`.
   *
   * @param kryo Kryo instance handed through to the delegate serializers.
   * @param input Input to read from.
   * @param klazz Class of the object being read; not used by this method.
   * @return The reconstructed variant context.
   */
  def read(kryo: Kryo, input: Input, klazz: Class[VariantContext]): VariantContext = {
    val position = rpSerializer.read(kryo, input, classOf[ReferencePosition])
    val variant = vSerializer.read(kryo, input, classOf[Variant])

    val genotypeCount = input.readInt()
    val genotypes = new Array[Genotype](genotypeCount)
    for (i <- 0 until genotypeCount) {
      genotypes(i) = gtSerializer.read(kryo, input, classOf[Genotype])
    }

    new VariantContext(position, new RichVariant(variant), genotypes.toIterable)
  }
}
@@ -32,7 +32,7 @@ import org.bdgenomics.adam.models.{
}
import org.bdgenomics.formats.avro.{ Contig, RecordGroupMetadata, Sample }
import org.bdgenomics.utils.cli.SaveArgs
import org.bdgenomics.utils.intervalarray.IntervalArray
import org.bdgenomics.utils.interval.array.IntervalArray
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -386,9 +386,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
protected def buildTree(
rdd: RDD[(ReferenceRegion, T)])(
implicit tTag: ClassTag[T]): IntervalArray[ReferenceRegion, T] = {
IntervalArray(rdd)
}
implicit tTag: ClassTag[T]): IntervalArray[ReferenceRegion, T]
/**
* Performs a broadcast inner join between this RDD and another RDD.
@@ -768,6 +766,12 @@ private case class GenericGenomicRDD[T](rdd: RDD[T],
sequences: SequenceDictionary,
regionFn: T => Seq[ReferenceRegion]) extends GenomicRDD[T, GenericGenomicRDD[T]] {
/**
 * Builds an interval array from region-keyed records using the generic
 * IntervalArray factory (no record-specific array class is supplied).
 *
 * @param rdd Records keyed by the reference region they cover.
 * @param tTag Class tag for the record type.
 * @return The interval array built from the RDD.
 */
protected def buildTree(rdd: RDD[(ReferenceRegion, T)])(
  implicit tTag: ClassTag[T]): IntervalArray[ReferenceRegion, T] =
  IntervalArray(rdd)
/**
 * Returns a copy of this genomic RDD that wraps a different underlying RDD;
 * all other fields are carried over unchanged.
 *
 * @param newRdd The RDD to wrap.
 * @return A copy of this object with `rdd` replaced by `newRdd`.
 */
protected def replaceRdd(newRdd: RDD[T]): GenericGenomicRDD[T] =
  copy(rdd = newRdd)
@@ -20,7 +20,7 @@ package org.bdgenomics.adam.rdd
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.utils.intervalarray.IntervalArray
import org.bdgenomics.utils.interval.array.IntervalArray
import scala.reflect.ClassTag
/**
@@ -22,14 +22,42 @@ import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.FragmentConverter
import org.bdgenomics.adam.models.{
ReferenceRegion,
ReferenceRegionSerializer,
SequenceRecord,
SequenceDictionary
}
import org.bdgenomics.adam.rdd.{ AvroGenomicRDD, JavaSaveArgs }
import org.bdgenomics.adam.serialization.AvroSerializer
import org.bdgenomics.adam.util.ReferenceFile
import org.bdgenomics.formats.avro.{ AlignmentRecord, NucleotideContigFragment }
import org.bdgenomics.utils.interval.array.{
IntervalArray,
IntervalArraySerializer
}
import scala.collection.JavaConversions._
import scala.math.max
import scala.reflect.ClassTag
/**
 * Concrete IntervalArray holding (ReferenceRegion, NucleotideContigFragment)
 * pairs.
 *
 * @param array The region-keyed contig fragments.
 * @param maxIntervalWidth Presumably the width of the widest region in
 *   `array`; confirm against IntervalArray.
 */
private[adam] case class NucleotideContigFragmentArray(
    array: Array[(ReferenceRegion, NucleotideContigFragment)],
    maxIntervalWidth: Long)
  extends IntervalArray[ReferenceRegion, NucleotideContigFragment] {

  /**
   * Creates a new array of this concrete type from the given contents.
   */
  protected def replace(
    arr: Array[(ReferenceRegion, NucleotideContigFragment)],
    maxWidth: Long): IntervalArray[ReferenceRegion, NucleotideContigFragment] =
    NucleotideContigFragmentArray(array = arr, maxIntervalWidth = maxWidth)
}
/**
 * Serializer for NucleotideContigFragmentArray, built on
 * IntervalArraySerializer.
 */
private[adam] class NucleotideContigFragmentArraySerializer
  extends IntervalArraySerializer[ReferenceRegion, NucleotideContigFragment, NucleotideContigFragmentArray] {

  // Serializer for the ReferenceRegion keys.
  protected val kSerializer = new ReferenceRegionSerializer

  // Serializer for the NucleotideContigFragment (Avro) values.
  protected val tSerializer = new AvroSerializer[NucleotideContigFragment]

  /**
   * Builds the concrete array type from the given contents.
   */
  protected def builder(
    arr: Array[(ReferenceRegion, NucleotideContigFragment)],
    maxIntervalWidth: Long): NucleotideContigFragmentArray =
    NucleotideContigFragmentArray(array = arr, maxIntervalWidth = maxIntervalWidth)
}
private[rdd] object NucleotideContigFragmentRDD extends Serializable {
@@ -72,6 +100,11 @@ case class NucleotideContigFragmentRDD(
rdd: RDD[NucleotideContigFragment],
sequences: SequenceDictionary) extends AvroGenomicRDD[NucleotideContigFragment, NucleotideContigFragmentRDD] with ReferenceFile {
/**
 * Builds an interval array from region-keyed contig fragments, using
 * NucleotideContigFragmentArray as the concrete array type.
 *
 * @param rdd Contig fragments keyed by reference region.
 * @param tTag Class tag for NucleotideContigFragment.
 * @return The interval array built from the RDD.
 */
protected def buildTree(
  rdd: RDD[(ReferenceRegion, NucleotideContigFragment)])(
    implicit tTag: ClassTag[NucleotideContigFragment]): IntervalArray[ReferenceRegion, NucleotideContigFragment] =
  IntervalArray(rdd, (arr, maxWidth) => NucleotideContigFragmentArray(arr, maxWidth))
/**
* Converts an RDD of nucleotide contig fragments into reads. Adjacent contig fragments are
* combined.
@@ -17,14 +17,43 @@
*/
package org.bdgenomics.adam.rdd.feature
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.FieldSerializer
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{
Coverage,
ReferenceRegion,
ReferenceRegionSerializer,
SequenceDictionary
}
import org.bdgenomics.adam.rdd.GenomicRDD
import org.bdgenomics.utils.interval.array.{
IntervalArray,
IntervalArraySerializer
}
import scala.annotation.tailrec
import scala.reflect.ClassTag
/**
 * Concrete IntervalArray holding (ReferenceRegion, Coverage) pairs.
 *
 * @param array The region-keyed coverage records.
 * @param maxIntervalWidth Presumably the width of the widest region in
 *   `array`; confirm against IntervalArray.
 */
private[adam] case class CoverageArray(
    array: Array[(ReferenceRegion, Coverage)],
    maxIntervalWidth: Long)
  extends IntervalArray[ReferenceRegion, Coverage] {

  /**
   * Creates a new array of this concrete type from the given contents.
   */
  protected def replace(
    arr: Array[(ReferenceRegion, Coverage)],
    maxWidth: Long): IntervalArray[ReferenceRegion, Coverage] =
    CoverageArray(array = arr, maxIntervalWidth = maxWidth)
}
/**
 * Serializer for CoverageArray, built on IntervalArraySerializer.
 *
 * @param kryo Kryo instance used to construct the field serializer for
 *   Coverage values.
 */
private[adam] class CoverageArraySerializer(kryo: Kryo)
  extends IntervalArraySerializer[ReferenceRegion, Coverage, CoverageArray] {

  // Serializer for the ReferenceRegion keys.
  protected val kSerializer = new ReferenceRegionSerializer

  // Coverage values go through Kryo's FieldSerializer rather than the
  // AvroSerializer used by the other array serializers.
  protected val tSerializer = new FieldSerializer[Coverage](kryo, classOf[Coverage])

  /**
   * Builds the concrete array type from the given contents.
   */
  protected def builder(
    arr: Array[(ReferenceRegion, Coverage)],
    maxIntervalWidth: Long): CoverageArray =
    CoverageArray(array = arr, maxIntervalWidth = maxIntervalWidth)
}
/**
* An RDD containing Coverage data.
@@ -36,6 +65,11 @@ import scala.annotation.tailrec
case class CoverageRDD(rdd: RDD[Coverage],
sequences: SequenceDictionary) extends GenomicRDD[Coverage, CoverageRDD] {
/**
 * Builds an interval array from region-keyed coverage records, using
 * CoverageArray as the concrete array type.
 *
 * @param rdd Coverage records keyed by reference region.
 * @param tTag Class tag for Coverage.
 * @return The interval array built from the RDD.
 */
protected def buildTree(
  rdd: RDD[(ReferenceRegion, Coverage)])(
    implicit tTag: ClassTag[Coverage]): IntervalArray[ReferenceRegion, Coverage] =
  IntervalArray(rdd, (arr, maxWidth) => CoverageArray(arr, maxWidth))
/**
* Saves coverage as feature file.
*
@@ -28,9 +28,36 @@ import org.bdgenomics.adam.rdd.{
JavaSaveArgs,
SAMHeaderWriter
}
import org.bdgenomics.adam.serialization.AvroSerializer
import org.bdgenomics.formats.avro.{ Feature, Strand }
import org.bdgenomics.utils.interval.array.{
IntervalArray,
IntervalArraySerializer
}
import org.bdgenomics.utils.misc.Logging
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
/**
 * Concrete IntervalArray holding (ReferenceRegion, Feature) pairs.
 *
 * @param array The region-keyed features.
 * @param maxIntervalWidth Presumably the width of the widest region in
 *   `array`; confirm against IntervalArray.
 */
private[adam] case class FeatureArray(
    array: Array[(ReferenceRegion, Feature)],
    maxIntervalWidth: Long)
  extends IntervalArray[ReferenceRegion, Feature] {

  /**
   * Creates a new array of this concrete type from the given contents.
   */
  protected def replace(
    arr: Array[(ReferenceRegion, Feature)],
    maxWidth: Long): IntervalArray[ReferenceRegion, Feature] =
    FeatureArray(array = arr, maxIntervalWidth = maxWidth)
}
/**
 * Serializer for FeatureArray, built on IntervalArraySerializer.
 */
private[adam] class FeatureArraySerializer
  extends IntervalArraySerializer[ReferenceRegion, Feature, FeatureArray] {

  // Serializer for the ReferenceRegion keys.
  protected val kSerializer = new ReferenceRegionSerializer

  // Serializer for the Feature (Avro) values.
  protected val tSerializer = new AvroSerializer[Feature]

  /**
   * Builds the concrete array type from the given contents.
   */
  protected def builder(
    arr: Array[(ReferenceRegion, Feature)],
    maxIntervalWidth: Long): FeatureArray =
    FeatureArray(array = arr, maxIntervalWidth = maxIntervalWidth)
}
private trait FeatureOrdering[T <: Feature] extends Ordering[T] {
def allowNull(s: java.lang.String): java.lang.Integer = {
@@ -213,6 +240,11 @@ object FeatureRDD {
case class FeatureRDD(rdd: RDD[Feature],
sequences: SequenceDictionary) extends AvroGenomicRDD[Feature, FeatureRDD] with Logging {
/**
 * Builds an interval array from region-keyed features, using FeatureArray
 * as the concrete array type.
 *
 * @param rdd Features keyed by reference region.
 * @param tTag Class tag for Feature.
 * @return The interval array built from the RDD.
 */
protected def buildTree(
  rdd: RDD[(ReferenceRegion, Feature)])(
    implicit tTag: ClassTag[Feature]): IntervalArray[ReferenceRegion, Feature] =
  IntervalArray(rdd, (arr, maxWidth) => FeatureArray(arr, maxWidth))
/**
* Java friendly save function. Automatically detects the output format.
*
@@ -22,6 +22,7 @@ import org.bdgenomics.adam.converters.AlignmentRecordConverter
import org.bdgenomics.adam.models.{
RecordGroupDictionary,
ReferenceRegion,
ReferenceRegionSerializer,
SequenceDictionary
}
import org.bdgenomics.adam.rdd.{ AvroReadGroupGenomicRDD, JavaSaveArgs }
@@ -30,9 +31,36 @@ import org.bdgenomics.adam.rdd.read.{
AlignmentRecordRDD,
UnalignedReadRDD
}
import org.bdgenomics.adam.serialization.AvroSerializer
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.interval.array.{
IntervalArray,
IntervalArraySerializer
}
import org.bdgenomics.utils.misc.Logging
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
/**
 * Concrete IntervalArray holding (ReferenceRegion, Fragment) pairs.
 *
 * @param array The region-keyed fragments.
 * @param maxIntervalWidth Presumably the width of the widest region in
 *   `array`; confirm against IntervalArray.
 */
private[adam] case class FragmentArray(
    array: Array[(ReferenceRegion, Fragment)],
    maxIntervalWidth: Long)
  extends IntervalArray[ReferenceRegion, Fragment] {

  /**
   * Creates a new array of this concrete type from the given contents.
   */
  protected def replace(
    arr: Array[(ReferenceRegion, Fragment)],
    maxWidth: Long): IntervalArray[ReferenceRegion, Fragment] =
    FragmentArray(array = arr, maxIntervalWidth = maxWidth)
}
/**
 * Serializer for FragmentArray, built on IntervalArraySerializer.
 */
private[adam] class FragmentArraySerializer
  extends IntervalArraySerializer[ReferenceRegion, Fragment, FragmentArray] {

  // Serializer for the ReferenceRegion keys.
  protected val kSerializer = new ReferenceRegionSerializer

  // Serializer for the Fragment (Avro) values.
  protected val tSerializer = new AvroSerializer[Fragment]

  /**
   * Builds the concrete array type from the given contents.
   */
  protected def builder(
    arr: Array[(ReferenceRegion, Fragment)],
    maxIntervalWidth: Long): FragmentArray =
    FragmentArray(array = arr, maxIntervalWidth = maxIntervalWidth)
}
/**
* Helper singleton object for building FragmentRDDs.
@@ -61,6 +89,11 @@ case class FragmentRDD(rdd: RDD[Fragment],
sequences: SequenceDictionary,
recordGroups: RecordGroupDictionary) extends AvroReadGroupGenomicRDD[Fragment, FragmentRDD] {
/**
 * Builds an interval array from region-keyed fragments, using FragmentArray
 * as the concrete array type.
 *
 * @param rdd Fragments keyed by reference region.
 * @param tTag Class tag for Fragment.
 * @return The interval array built from the RDD.
 */
protected def buildTree(
  rdd: RDD[(ReferenceRegion, Fragment)])(
    implicit tTag: ClassTag[Fragment]): IntervalArray[ReferenceRegion, Fragment] =
  IntervalArray(rdd, (arr, maxWidth) => FragmentArray(arr, maxWidth))
/**
* Replaces the underlying RDD with a new RDD.
*
@@ -49,15 +49,47 @@ import org.bdgenomics.adam.rdd.feature.CoverageRDD
import org.bdgenomics.adam.rdd.read.realignment.RealignIndels
import org.bdgenomics.adam.rdd.read.recalibration.BaseQualityRecalibration
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.adam.serialization.AvroSerializer
import org.bdgenomics.adam.util.ReferenceFile
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.interval.array.{
IntervalArray,
IntervalArraySerializer
}
import org.seqdoop.hadoop_bam._
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.math.{ abs, min }
import scala.reflect.ClassTag
/**
 * Concrete IntervalArray holding (ReferenceRegion, AlignmentRecord) pairs.
 *
 * @param array The region-keyed alignment records.
 * @param maxIntervalWidth Presumably the width of the widest region in
 *   `array`; confirm against IntervalArray.
 */
private[adam] case class AlignmentRecordArray(
    array: Array[(ReferenceRegion, AlignmentRecord)],
    maxIntervalWidth: Long)
  extends IntervalArray[ReferenceRegion, AlignmentRecord] {

  /**
   * Creates a new array of this concrete type from the given contents.
   */
  protected def replace(
    arr: Array[(ReferenceRegion, AlignmentRecord)],
    maxWidth: Long): IntervalArray[ReferenceRegion, AlignmentRecord] =
    AlignmentRecordArray(array = arr, maxIntervalWidth = maxWidth)
}
/**
 * Serializer for AlignmentRecordArray, built on IntervalArraySerializer.
 */
private[adam] class AlignmentRecordArraySerializer
  extends IntervalArraySerializer[ReferenceRegion, AlignmentRecord, AlignmentRecordArray] {

  // Serializer for the ReferenceRegion keys.
  protected val kSerializer = new ReferenceRegionSerializer

  // Serializer for the AlignmentRecord (Avro) values.
  protected val tSerializer = new AvroSerializer[AlignmentRecord]

  /**
   * Builds the concrete array type from the given contents.
   */
  protected def builder(
    arr: Array[(ReferenceRegion, AlignmentRecord)],
    maxIntervalWidth: Long): AlignmentRecordArray =
    AlignmentRecordArray(array = arr, maxIntervalWidth = maxIntervalWidth)
}
sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, AlignmentRecordRDD] {
/**
 * Builds an interval array from region-keyed alignment records, using
 * AlignmentRecordArray as the concrete array type.
 *
 * @param rdd Alignment records keyed by reference region.
 * @param tTag Class tag for AlignmentRecord.
 * @return The interval array built from the RDD.
 */
protected def buildTree(
  rdd: RDD[(ReferenceRegion, AlignmentRecord)])(
    implicit tTag: ClassTag[AlignmentRecord]): IntervalArray[ReferenceRegion, AlignmentRecord] =
  IntervalArray(rdd, (arr, maxWidth) => AlignmentRecordArray(arr, maxWidth))
/**
* Replaces the underlying RDD and SequenceDictionary and emits a new object.
*
Oops, something went wrong.

0 comments on commit 5dcd70b

Please sign in to comment.