diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
index 32ffdca2e1..23a700736f 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -1784,10 +1784,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
   private[rdd] def extractPartitionMap(
     filename: String): Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = {
 
-    val path = new Path(filename + "/_partitionMap.avro")
-    val fs = path.getFileSystem(sc.hadoopConfiguration)
-
     try {
+      val path = new Path(filename + "/_partitionMap.avro")
+      val fs = path.getFileSystem(sc.hadoopConfiguration)
+
       // get an input stream
       val is = fs.open(path)
 
@@ -1852,8 +1852,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
 
       Some(partitionMapBuilder.toArray)
     } catch {
-      case e: FileNotFoundException => None
-      case e: Throwable => throw e
+      case npe: NullPointerException => None
+      case e: FileNotFoundException => None
+      case e: Throwable => throw e
     }
   }
 
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
index 6dfd50b816..2541173988 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -129,6 +129,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
    */
   val dataset: Dataset[U]
 
+  protected val productFn: T => U
+  protected val unproductFn: U => T
+
   /**
    * @return This data as a Spark SQL DataFrame.
    */
@@ -1000,7 +1003,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         // pad by -1 * flankSize to undo pad from preprocessing
         getReferenceRegions(kv._1).map(_.pad(-1 * flankSize)) ++
           genomicRdd.getReferenceRegions(kv._2)
-      })
+      },
+      kv => (productFn(kv._1), genomicRdd.productFn(kv._2)),
+      kv => (unproductFn(kv._1), genomicRdd.unproductFn(kv._2)))
   }
 
   /**
@@ -1050,17 +1055,19 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
   def broadcastRegionJoinAgainst[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]](
     broadcast: GenomicBroadcast[X, Y, Z])(
       implicit tTag: ClassTag[T], xTag: ClassTag[X],
-      uyTag: TypeTag[(U, Y)]): GenericGenomicDataset[(X, T), (U, Y)] = InnerBroadcastJoin.time {
+      uyTag: TypeTag[(Y, U)]): GenericGenomicDataset[(X, T), (Y, U)] = InnerBroadcastJoin.time {
 
     // key the RDDs and join
-    RDDBoundGenericGenomicDataset[(X, T), (U, Y)](InnerTreeRegionJoin[X, T]().join(
+    RDDBoundGenericGenomicDataset[(X, T), (Y, U)](InnerTreeRegionJoin[X, T]().join(
       broadcast.broadcastTree,
       flattenRddByRegions()),
       sequences ++ broadcast.backingDataset.sequences,
       kv => {
         broadcast.backingDataset.getReferenceRegions(kv._1) ++
           getReferenceRegions(kv._2)
-      })
+      },
+      kv => (broadcast.backingDataset.productFn(kv._1), productFn(kv._2)),
+      kv => (broadcast.backingDataset.unproductFn(kv._1), unproductFn(kv._2)))
   }
 
   /**
@@ -1101,7 +1108,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         Seq(kv._1.map(v => getReferenceRegions(v)
           .map(_.pad(-1 * flankSize)))).flatten.flatten ++
           genomicRdd.getReferenceRegions(kv._2)
-      })
+      },
+      kv => (kv._1.map(productFn), genomicRdd.productFn(kv._2)),
+      kv => (kv._1.map(unproductFn), genomicRdd.unproductFn(kv._2)))
   }
 
   /**
@@ -1138,7 +1147,10 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
       kv => {
         Seq(kv._1.map(v => broadcast.backingDataset.getReferenceRegions(v))).flatten.flatten ++
           getReferenceRegions(kv._2)
-      })
+      },
+      kv => (kv._1.map(broadcast.backingDataset.productFn), productFn(kv._2)),
+      kv => (kv._1.map(broadcast.backingDataset.unproductFn), unproductFn(kv._2)))
+
   }
 
   /**
@@ -1204,7 +1216,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         (kv._1.flatMap(getReferenceRegions) ++
           genomicRdd.getReferenceRegions(kv._2))
           .toSeq
-      })
+      },
+      kv => (kv._1.map(productFn).toSeq, genomicRdd.productFn(kv._2)),
+      kv => (kv._1.map(unproductFn), genomicRdd.unproductFn(kv._2)))
   }
 
   /**
@@ -1241,7 +1255,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         (kv._1.flatMap(broadcast.backingDataset.getReferenceRegions) ++
           getReferenceRegions(kv._2))
           .toSeq
-      })
+      },
+      kv => (kv._1.map(broadcast.backingDataset.productFn).toSeq, productFn(kv._2)),
+      kv => (kv._1.map(broadcast.backingDataset.unproductFn), unproductFn(kv._2)))
   }
 
   /**
@@ -1307,7 +1323,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         Seq(kv._1.map(v => getReferenceRegions(v)
           .map(_.pad(-1 * flankSize)))).flatten.flatten ++
           genomicRdd.getReferenceRegions(kv._2)
-      })
+      },
+      kv => (kv._1.map(productFn).toSeq, genomicRdd.productFn(kv._2)),
+      kv => (kv._1.map(unproductFn), genomicRdd.unproductFn(kv._2)))
   }
 
   /**
@@ -1345,7 +1363,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         // pad by -1 * flankSize to undo pad from preprocessing
         Seq(kv._1.map(v => broadcast.backingDataset.getReferenceRegions(v))).flatten.flatten ++
           getReferenceRegions(kv._2)
-      })
+      },
+      kv => (kv._1.map(broadcast.backingDataset.productFn).toSeq, productFn(kv._2)),
+      kv => (kv._1.map(broadcast.backingDataset.unproductFn), unproductFn(kv._2)))
   }
 
   /**
@@ -1449,7 +1469,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         // pad by -1 * flankSize to undo pad from preprocessing
         getReferenceRegions(kv._1).map(_.pad(-1 * flankSize)) ++
           genomicRdd.getReferenceRegions(kv._2)
-      })
+      },
+      kv => (productFn(kv._1), genomicRdd.productFn(kv._2)),
+      kv => (unproductFn(kv._1), genomicRdd.unproductFn(kv._2)))
   }
 
   /**
@@ -1546,7 +1568,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         Seq(kv._1.map(v => getReferenceRegions(v)
          .map(_.pad(-1 * flankSize)))).flatten.flatten ++
           genomicRdd.getReferenceRegions(kv._2)
-      })
+      },
+      kv => (kv._1.map(productFn), genomicRdd.productFn(kv._2)),
+      kv => (kv._1.map(unproductFn), genomicRdd.unproductFn(kv._2)))
   }
 
   /**
@@ -1647,7 +1671,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         // pad by -1 * flankSize to undo pad from preprocessing
         getReferenceRegions(kv._1).map(_.pad(-1 * flankSize)) ++
           Seq(kv._2.map(v => genomicRdd.getReferenceRegions(v))).flatten.flatten
-      })
+      },
+      kv => (productFn(kv._1), kv._2.map(genomicRdd.productFn)),
+      kv => (unproductFn(kv._1), kv._2.map(genomicRdd.unproductFn)))
   }
 
   /**
@@ -1749,7 +1775,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         // pad by -1 * flankSize to undo flank from preprocessing
         getReferenceRegions(kv._1).map(_.pad(-1 * flankSize)) ++
           Seq(kv._2.map(v => genomicRdd.getReferenceRegions(v))).flatten.flatten
-      })
+      },
+      kv => (productFn(kv._1), kv._2.map(genomicRdd.productFn).toSeq),
+      kv => (unproductFn(kv._1), kv._2.map(genomicRdd.unproductFn)))
   }
 
   /**
@@ -1851,7 +1879,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         // pad by -1 * flankSize to undo pad from preprocessing
         Seq(kv._1.map(v => getReferenceRegions(v).map(_.pad(-1 * flankSize))),
           kv._2.map(v => genomicRdd.getReferenceRegions(v))).flatten.flatten
-      })
+      },
+      kv => (kv._1.map(productFn), kv._2.map(genomicRdd.productFn)),
+      kv => (kv._1.map(unproductFn), kv._2.map(genomicRdd.unproductFn)))
   }
 
   /**
@@ -1951,7 +1981,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         getReferenceRegions(kv._1)
           .map(_.pad(-1 * flankSize)) ++
           kv._2.flatMap(v => genomicRdd.getReferenceRegions(v))
-      })
+      },
+      kv => (productFn(kv._1), kv._2.map(genomicRdd.productFn).toSeq),
+      kv => (unproductFn(kv._1), kv._2.map(genomicRdd.unproductFn)))
   }
 
   /**
@@ -2055,7 +2087,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
         kv._1.toSeq.flatMap(v => getReferenceRegions(v)
           .map(_.pad(-1 * flankSize))) ++
           kv._2.flatMap(v => genomicRdd.getReferenceRegions(v))
-      })
+      },
+      kv => (kv._1.map(productFn), kv._2.map(genomicRdd.productFn).toSeq),
+      kv => (kv._1.map(unproductFn), kv._2.map(genomicRdd.unproductFn)))
   }
 
   /**
@@ -2301,13 +2335,6 @@ sealed abstract class GenericGenomicDataset[T, U <: Product: TypeTag] extends Ge
 
   val regionFn: T => Seq[ReferenceRegion]
 
-  val productFn: T => U = {
-    ???
-  }
-  val unproductFn: U => T = {
-    ???
-  }
-
   @transient val uTag: TypeTag[U] = typeTag[U]
 
   def saveAsParquet(filePath: String,
@@ -2337,7 +2364,9 @@ sealed abstract class GenericGenomicDataset[T, U <: Product: TypeTag] extends Ge
 case class DatasetBoundGenericGenomicDataset[T, U <: Product: TypeTag](
     dataset: Dataset[U],
     sequences: SequenceDictionary,
-    regionFn: T => Seq[ReferenceRegion])(
+    regionFn: T => Seq[ReferenceRegion],
+    productFn: T => U,
+    unproductFn: U => T)(
         implicit tTag: ClassTag[T], uTag: ClassTag[U]) extends GenericGenomicDataset[T, U] {
 
@@ -2358,7 +2387,9 @@ case class DatasetBoundGenericGenomicDataset[T, U <: Product: TypeTag](
     val iterableRdds = rdds.toSeq
     RDDBoundGenericGenomicDataset(rdd.context.union(rdd, iterableRdds.map(_.rdd): _*),
       iterableRdds.map(_.sequences).fold(sequences)(_ ++ _),
-      regionFn)
+      regionFn,
+      productFn,
+      unproductFn)
   }
 
   // this cannot be in the GenericGenomicDataset trait due to need for the
@@ -2367,14 +2398,22 @@ case class DatasetBoundGenericGenomicDataset[T, U <: Product: TypeTag](
     newRdd: RDD[T],
     newPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = None): GenericGenomicDataset[T, U] = {
-    RDDBoundGenericGenomicDataset(newRdd, sequences, regionFn,
+    RDDBoundGenericGenomicDataset(newRdd,
+      sequences,
+      regionFn,
+      productFn,
+      unproductFn,
       optPartitionMap = newPartitionMap)
   }
 
   // this cannot be in the GenericGenomicDataset trait due to need for the
   // implicit classtag
   def transformDataset(tFn: Dataset[U] => Dataset[U]): GenericGenomicDataset[T, U] = {
-    DatasetBoundGenericGenomicDataset(tFn(dataset), sequences, regionFn)
+    DatasetBoundGenericGenomicDataset(tFn(dataset),
+      sequences,
+      regionFn,
+      productFn,
+      unproductFn)
   }
 }
@@ -2382,6 +2421,8 @@ case class RDDBoundGenericGenomicDataset[T, U <: Product: TypeTag](
     rdd: RDD[T],
     sequences: SequenceDictionary,
     regionFn: T => Seq[ReferenceRegion],
+    productFn: T => U,
+    unproductFn: U => T,
     optPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = None)(
         implicit tTag: ClassTag[T], uTag: ClassTag[U]) extends GenericGenomicDataset[T, U] {
 
@@ -2404,7 +2445,9 @@ case class RDDBoundGenericGenomicDataset[T, U <: Product: TypeTag](
     val iterableRdds = rdds.toSeq
     RDDBoundGenericGenomicDataset(rdd.context.union(rdd, iterableRdds.map(_.rdd): _*),
       iterableRdds.map(_.sequences).fold(sequences)(_ ++ _),
-      regionFn)
+      regionFn,
+      productFn,
+      unproductFn)
   }
 
   // this cannot be in the GenericGenomicDataset trait due to need for the
@@ -2413,14 +2456,22 @@ case class RDDBoundGenericGenomicDataset[T, U <: Product: TypeTag](
     newRdd: RDD[T],
     newPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = None): GenericGenomicDataset[T, U] = {
-    RDDBoundGenericGenomicDataset(newRdd, sequences, regionFn,
+    RDDBoundGenericGenomicDataset(newRdd,
+      sequences,
+      regionFn,
+      productFn,
+      unproductFn,
       optPartitionMap = newPartitionMap)
   }
 
   // this cannot be in the GenericGenomicDataset trait due to need for the
   // implicit classtag
   def transformDataset(tFn: Dataset[U] => Dataset[U]): GenericGenomicDataset[T, U] = {
-    DatasetBoundGenericGenomicDataset(tFn(dataset), sequences, regionFn)
+    DatasetBoundGenericGenomicDataset(tFn(dataset),
+      sequences,
+      regionFn,
+      productFn,
+      unproductFn)
   }
 }
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala
index fa6532334d..f7a9dc9f35 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala
@@ -187,6 +187,9 @@ case class RDDBoundNucleotideContigFragmentRDD private[rdd] (
 
 sealed abstract class NucleotideContigFragmentRDD extends AvroGenomicDataset[NucleotideContigFragment, NucleotideContigFragmentProduct, NucleotideContigFragmentRDD] {
 
+  protected val productFn = NucleotideContigFragmentProduct.fromAvro(_)
+  protected val unproductFn = (c: NucleotideContigFragmentProduct) => c.toAvro
+
   @transient val uTag: TypeTag[NucleotideContigFragmentProduct] = typeTag[NucleotideContigFragmentProduct]
 
   protected def buildTree(rdd: RDD[(ReferenceRegion, NucleotideContigFragment)])(
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala
index 14e28bbb43..b2d8d91a82 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageRDD.scala
@@ -154,6 +154,9 @@ case class RDDBoundCoverageRDD private[rdd] (
 
 abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRDD] {
 
+  protected val productFn = (c: Coverage) => c
+  protected val unproductFn = (c: Coverage) => c
+
   @transient val uTag: TypeTag[Coverage] = typeTag[Coverage]
 
   protected def buildTree(rdd: RDD[(ReferenceRegion, Coverage)])(
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
index 7e897da97b..eca28701a4 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
@@ -338,6 +338,9 @@ case class RDDBoundFeatureRDD private[rdd] (
 
 sealed abstract class FeatureRDD extends AvroGenomicDataset[Feature, FeatureProduct, FeatureRDD] {
 
+  protected val productFn = FeatureProduct.fromAvro(_)
+  protected val unproductFn = (f: FeatureProduct) => f.toAvro
+
   @transient val uTag: TypeTag[FeatureProduct] = typeTag[FeatureProduct]
 
   protected def buildTree(rdd: RDD[(ReferenceRegion, Feature)])(
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
index 76424c2639..42cf6efc52 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
@@ -258,6 +258,9 @@ case class RDDBoundFragmentRDD private[rdd] (
 
 sealed abstract class FragmentRDD extends AvroRecordGroupGenomicDataset[Fragment, FragmentProduct, FragmentRDD] {
 
+  protected val productFn = FragmentProduct.fromAvro(_)
+  protected val unproductFn = (f: FragmentProduct) => f.toAvro
+
   @transient val uTag: TypeTag[FragmentProduct] = typeTag[FragmentProduct]
 
   protected def buildTree(rdd: RDD[(ReferenceRegion, Fragment)])(
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
index 4dfa1bcef5..ff9cb6bb30 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
@@ -341,6 +341,9 @@ private case class AlignmentWindow(contigName: String, start: Long, end: Long) {
 
 sealed abstract class AlignmentRecordRDD extends AvroRecordGroupGenomicDataset[AlignmentRecord, AlignmentRecordProduct, AlignmentRecordRDD] {
 
+  protected val productFn = AlignmentRecordProduct.fromAvro(_)
+  protected val unproductFn = (a: AlignmentRecordProduct) => a.toAvro
+
   @transient val uTag: TypeTag[AlignmentRecordProduct] = typeTag[AlignmentRecordProduct]
 
   /**
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala
index 3a50ec26ca..8006ec2bc0 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala
@@ -212,6 +212,9 @@ case class RDDBoundGenotypeRDD private[rdd] (
 
 sealed abstract class GenotypeRDD extends MultisampleAvroGenomicDataset[Genotype, GenotypeProduct, GenotypeRDD] {
 
+  protected val productFn = GenotypeProduct.fromAvro(_)
+  protected val unproductFn = (g: GenotypeProduct) => g.toAvro
+
   @transient val uTag: TypeTag[GenotypeProduct] = typeTag[GenotypeProduct]
 
   val headerLines: Seq[VCFHeaderLine]
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala
index 73ecece073..24b130df75 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala
@@ -191,6 +191,9 @@ case class RDDBoundVariantContextRDD private[rdd] (
  */
 sealed abstract class VariantContextRDD extends MultisampleGenomicDataset[VariantContext, VariantContextProduct, VariantContextRDD] with Logging {
 
+  protected val productFn = VariantContextProduct.fromModel(_)
+  protected val unproductFn = (vc: VariantContextProduct) => vc.toModel
+
   @transient val uTag: TypeTag[VariantContextProduct] = typeTag[VariantContextProduct]
 
   val headerLines: Seq[VCFHeaderLine]
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala
index 1d3119942f..994b8f60ac 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala
@@ -196,6 +196,9 @@ case class RDDBoundVariantRDD private[rdd] (
 
 sealed abstract class VariantRDD extends AvroGenomicDataset[Variant, VariantProduct, VariantRDD] {
 
+  protected val productFn = VariantProduct.fromAvro(_)
+  protected val unproductFn = (v: VariantProduct) => v.toAvro
+
   @transient val uTag: TypeTag[VariantProduct] = typeTag[VariantProduct]
 
   val headerLines: Seq[VCFHeaderLine]
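The core of the patch above is that GenericGenomicDataset no longer stubs productFn and unproductFn with ???: every concrete dataset supplies a concrete pair of conversions between its model type T and its Spark SQL Product type U (e.g. FeatureProduct.fromAvro(_) / _.toAvro), and the generic joins thread that pair into the GenericGenomicDataset they return. The sketch below illustrates that round-trip in isolation. It is not ADAM code: VariantAvro, VariantProductLike, and the local SparkSession setup are hypothetical stand-ins chosen only to show how a T => U / U => T pair lets an RDD of model objects be exposed as a Dataset and mapped back.

```scala
import org.apache.spark.sql.{ Dataset, SparkSession }

// Hypothetical stand-in for an Avro-style mutable record (the model type T).
class VariantAvro(var contig: String, var start: Long) extends Serializable

// Hypothetical stand-in for the generated Product wrapper (the Dataset type U).
case class VariantProductLike(contig: String, start: Long) {
  def toAvro: VariantAvro = new VariantAvro(contig, start)
}

object VariantProductLike {
  def fromAvro(v: VariantAvro): VariantProductLike = VariantProductLike(v.contig, v.start)
}

object ProductFnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("productFnSketch").getOrCreate()
    import spark.implicits._

    // The two directions the patch requires from every concrete dataset.
    val productFn: VariantAvro => VariantProductLike = VariantProductLike.fromAvro(_)
    val unproductFn: VariantProductLike => VariantAvro = _.toAvro

    // An RDD of model objects, standing in for the rdd backing an RDDBound* dataset.
    val rdd = spark.sparkContext.parallelize(
      Seq(new VariantAvro("1", 100L), new VariantAvro("2", 200L)))

    // productFn is what allows a Dataset[U] view of the RDD[T] ...
    val ds: Dataset[VariantProductLike] = spark.createDataset(rdd.map(productFn))

    // ... and unproductFn maps the Dataset[U] view back to the model type T.
    val roundTripped = ds.rdd.map(unproductFn).collect()
    roundTripped.foreach(v => println(s"${v.contig}:${v.start}"))

    spark.stop()
  }
}
```

In the patch itself these two functions are carried by DatasetBoundGenericGenomicDataset and RDDBoundGenericGenomicDataset as constructor parameters and are built pairwise inside each join (for example kv => (productFn(kv._1), genomicRdd.productFn(kv._2))), so a joined result can still be surfaced as a Dataset of the corresponding Product pair.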