Add functions to support dataset API from generic genomic datasets.
WIP towards #1728.
Frank Austin Nothaft committed Feb 21, 2018
1 parent ac7cf6e commit b75295e
Showing 10 changed files with 112 additions and 36 deletions.
@@ -1784,10 +1784,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
private[rdd] def extractPartitionMap(
filename: String): Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = {

-val path = new Path(filename + "/_partitionMap.avro")
-val fs = path.getFileSystem(sc.hadoopConfiguration)
-
try {
+val path = new Path(filename + "/_partitionMap.avro")
+val fs = path.getFileSystem(sc.hadoopConfiguration)
+
// get an input stream
val is = fs.open(path)

@@ -1852,8 +1852,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

Some(partitionMapBuilder.toArray)
} catch {
-case e: FileNotFoundException => None
-case e: Throwable => throw e
+case npe: NullPointerException => None
+case e: FileNotFoundException => None
+case e: Throwable => throw e
}
}

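The new NullPointerException case is presumably there because some Hadoop FileSystem implementations report a missing path as an NPE rather than a FileNotFoundException. A minimal, self-contained sketch of the same probe-for-an-optional-sidecar-file pattern (hypothetical helper, Hadoop client API only):

import java.io.FileNotFoundException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Probe for an optional sidecar file, mapping "absent" to None no matter how
// the filesystem chooses to report it, and rethrowing everything else.
def optionalSidecarLength(dir: String, conf: Configuration): Option[Long] = {
  try {
    val path = new Path(dir + "/_partitionMap.avro")
    val fs = path.getFileSystem(conf)
    Some(fs.getFileStatus(path).getLen)
  } catch {
    case _: NullPointerException => None
    case _: FileNotFoundException => None
    case e: Throwable => throw e
  }
}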
113 changes: 82 additions & 31 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -129,6 +129,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
*/
val dataset: Dataset[U]

+protected val productFn: T => U
+protected val unproductFn: U => T
+
/**
* @return This data as a Spark SQL DataFrame.
*/
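These two abstract members are the core of this change: every concrete genomic dataset must now state how to convert its native model type T into the Product type U that Spark SQL can encode, and back. A toy illustration of the contract, with hypothetical (non-ADAM) types:

// Hypothetical types illustrating the T <-> U contract.
case class IntervalProduct(contigName: String, start: Long, end: Long) // U: Spark SQL friendly
class Interval(val contigName: String, val start: Long, val end: Long) // T: native model

val productFn: Interval => IntervalProduct =
  i => IntervalProduct(i.contigName, i.start, i.end)
val unproductFn: IntervalProduct => Interval =
  p => new Interval(p.contigName, p.start, p.end)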
@@ -1000,7 +1003,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
// pad by -1 * flankSize to undo pad from preprocessing
getReferenceRegions(kv._1).map(_.pad(-1 * flankSize)) ++
genomicRdd.getReferenceRegions(kv._2)
-})
+},
+kv => (productFn(kv._1), genomicRdd.productFn(kv._2)),
+kv => (unproductFn(kv._1), genomicRdd.unproductFn(kv._2)))
}

/**
@@ -1050,17 +1055,19 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
def broadcastRegionJoinAgainst[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]](
broadcast: GenomicBroadcast[X, Y, Z])(
implicit tTag: ClassTag[T], xTag: ClassTag[X],
-uyTag: TypeTag[(U, Y)]): GenericGenomicDataset[(X, T), (U, Y)] = InnerBroadcastJoin.time {
+uyTag: TypeTag[(Y, U)]): GenericGenomicDataset[(X, T), (Y, U)] = InnerBroadcastJoin.time {

// key the RDDs and join
-RDDBoundGenericGenomicDataset[(X, T), (U, Y)](InnerTreeRegionJoin[X, T]().join(
+RDDBoundGenericGenomicDataset[(X, T), (Y, U)](InnerTreeRegionJoin[X, T]().join(
broadcast.broadcastTree,
flattenRddByRegions()),
sequences ++ broadcast.backingDataset.sequences,
kv => {
broadcast.backingDataset.getReferenceRegions(kv._1) ++
getReferenceRegions(kv._2)
-})
+},
+kv => (broadcast.backingDataset.productFn(kv._1), productFn(kv._2)),
+kv => (broadcast.backingDataset.unproductFn(kv._1), unproductFn(kv._2)))
}
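The same shape repeats through every join that follows: the two new trailing arguments lift each side's conversion functions onto the joined pair. The TypeTag flip from (U, Y) to (Y, U) above tracks the value order: a broadcast join yields (X, T) pairs, broadcast side first, so the Product-typed pair must be (Y, U). A generic sketch of the lifting, with hypothetical helper names:

// Hypothetical helpers showing how per-side conversions compose over a pair.
def pairProductFn[X, Y, T, U](leftFn: X => Y, rightFn: T => U): ((X, T)) => (Y, U) =
  kv => (leftFn(kv._1), rightFn(kv._2))
def pairUnproductFn[X, Y, T, U](leftFn: Y => X, rightFn: U => T): ((Y, U)) => (X, T) =
  kv => (leftFn(kv._1), rightFn(kv._2))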

/**
@@ -1101,7 +1108,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
Seq(kv._1.map(v => getReferenceRegions(v)
.map(_.pad(-1 * flankSize)))).flatten.flatten ++
genomicRdd.getReferenceRegions(kv._2)
-})
+},
+kv => (kv._1.map(productFn), genomicRdd.productFn(kv._2)),
+kv => (kv._1.map(unproductFn), genomicRdd.unproductFn(kv._2)))
}

/**
@@ -1138,7 +1147,10 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
kv => {
Seq(kv._1.map(v => broadcast.backingDataset.getReferenceRegions(v))).flatten.flatten ++
getReferenceRegions(kv._2)
-})
+},
+kv => (kv._1.map(broadcast.backingDataset.productFn), productFn(kv._2)),
+kv => (kv._1.map(broadcast.backingDataset.unproductFn), unproductFn(kv._2)))
+
}

/**
@@ -1204,7 +1216,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
(kv._1.flatMap(getReferenceRegions) ++
genomicRdd.getReferenceRegions(kv._2))
.toSeq
-})
+},
+kv => (kv._1.map(productFn).toSeq, genomicRdd.productFn(kv._2)),
+kv => (kv._1.map(unproductFn), genomicRdd.unproductFn(kv._2)))
}

/**
@@ -1241,7 +1255,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
(kv._1.flatMap(broadcast.backingDataset.getReferenceRegions) ++
getReferenceRegions(kv._2))
.toSeq
-})
+},
+kv => (kv._1.map(broadcast.backingDataset.productFn).toSeq, productFn(kv._2)),
+kv => (kv._1.map(broadcast.backingDataset.unproductFn), unproductFn(kv._2)))
}

/**
@@ -1307,7 +1323,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
Seq(kv._1.map(v => getReferenceRegions(v)
.map(_.pad(-1 * flankSize)))).flatten.flatten ++
genomicRdd.getReferenceRegions(kv._2)
-})
+},
+kv => (kv._1.map(productFn).toSeq, genomicRdd.productFn(kv._2)),
+kv => (kv._1.map(unproductFn), genomicRdd.unproductFn(kv._2)))
}

/**
@@ -1345,7 +1363,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
// pad by -1 * flankSize to undo pad from preprocessing
Seq(kv._1.map(v => broadcast.backingDataset.getReferenceRegions(v))).flatten.flatten ++
getReferenceRegions(kv._2)
-})
+},
+kv => (kv._1.map(broadcast.backingDataset.productFn).toSeq, productFn(kv._2)),
+kv => (kv._1.map(broadcast.backingDataset.unproductFn), unproductFn(kv._2)))
}

/**
@@ -1449,7 +1469,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
// pad by -1 * flankSize to undo pad from preprocessing
getReferenceRegions(kv._1).map(_.pad(-1 * flankSize)) ++
genomicRdd.getReferenceRegions(kv._2)
-})
+},
+kv => (productFn(kv._1), genomicRdd.productFn(kv._2)),
+kv => (unproductFn(kv._1), genomicRdd.unproductFn(kv._2)))
}

/**
@@ -1546,7 +1568,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
Seq(kv._1.map(v => getReferenceRegions(v)
.map(_.pad(-1 * flankSize)))).flatten.flatten ++
genomicRdd.getReferenceRegions(kv._2)
-})
+},
+kv => (kv._1.map(productFn), genomicRdd.productFn(kv._2)),
+kv => (kv._1.map(unproductFn), genomicRdd.unproductFn(kv._2)))
}

/**
@@ -1647,7 +1671,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
// pad by -1 * flankSize to undo pad from preprocessing
getReferenceRegions(kv._1).map(_.pad(-1 * flankSize)) ++
Seq(kv._2.map(v => genomicRdd.getReferenceRegions(v))).flatten.flatten
-})
+},
+kv => (productFn(kv._1), kv._2.map(genomicRdd.productFn)),
+kv => (unproductFn(kv._1), kv._2.map(genomicRdd.unproductFn)))
}

/**
@@ -1749,7 +1775,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
// pad by -1 * flankSize to undo flank from preprocessing
getReferenceRegions(kv._1).map(_.pad(-1 * flankSize)) ++
Seq(kv._2.map(v => genomicRdd.getReferenceRegions(v))).flatten.flatten
-})
+},
+kv => (productFn(kv._1), kv._2.map(genomicRdd.productFn).toSeq),
+kv => (unproductFn(kv._1), kv._2.map(genomicRdd.unproductFn)))
}

/**
@@ -1851,7 +1879,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
// pad by -1 * flankSize to undo pad from preprocessing
Seq(kv._1.map(v => getReferenceRegions(v).map(_.pad(-1 * flankSize))),
kv._2.map(v => genomicRdd.getReferenceRegions(v))).flatten.flatten
-})
+},
+kv => (kv._1.map(productFn), kv._2.map(genomicRdd.productFn)),
+kv => (kv._1.map(unproductFn), kv._2.map(genomicRdd.unproductFn)))
}

/**
@@ -1951,7 +1981,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
getReferenceRegions(kv._1)
.map(_.pad(-1 * flankSize)) ++
kv._2.flatMap(v => genomicRdd.getReferenceRegions(v))
-})
+},
+kv => (productFn(kv._1), kv._2.map(genomicRdd.productFn).toSeq),
+kv => (unproductFn(kv._1), kv._2.map(genomicRdd.unproductFn)))
}

/**
@@ -2055,7 +2087,9 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
kv._1.toSeq.flatMap(v => getReferenceRegions(v)
.map(_.pad(-1 * flankSize))) ++
kv._2.flatMap(v => genomicRdd.getReferenceRegions(v))
-})
+},
+kv => (kv._1.map(productFn), kv._2.map(genomicRdd.productFn).toSeq),
+kv => (kv._1.map(unproductFn), kv._2.map(genomicRdd.unproductFn)))
}

/**
@@ -2301,13 +2335,6 @@ sealed abstract class GenericGenomicDataset[T, U <: Product: TypeTag] extends Ge

val regionFn: T => Seq[ReferenceRegion]

-val productFn: T => U = {
-???
-}
-val unproductFn: U => T = {
-???
-}
-
@transient val uTag: TypeTag[U] = typeTag[U]

def saveAsParquet(filePath: String,
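The deleted stubs compiled only because ??? typechecks as Nothing; hoisting productFn and unproductFn into the GenomicDataset trait as abstract members turns what was a guaranteed runtime failure into a compile-time obligation on every subtype. For illustration:

// Predef.??? typechecks against any expected type, so this compiled...
val stub: String => Int = { ??? }
// ...but throws scala.NotImplementedError as soon as the val initializes.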
@@ -2337,7 +2364,9 @@ sealed abstract class GenericGenomicDataset[T, U <: Product: TypeTag] extends Ge
case class DatasetBoundGenericGenomicDataset[T, U <: Product: TypeTag](
dataset: Dataset[U],
sequences: SequenceDictionary,
-regionFn: T => Seq[ReferenceRegion])(
+regionFn: T => Seq[ReferenceRegion],
+productFn: T => U,
+unproductFn: U => T)(
implicit tTag: ClassTag[T],
uTag: ClassTag[U]) extends GenericGenomicDataset[T, U] {

@@ -2358,7 +2387,9 @@ case class DatasetBoundGenericGenomicDataset[T, U <: Product: TypeTag](
val iterableRdds = rdds.toSeq
RDDBoundGenericGenomicDataset(rdd.context.union(rdd, iterableRdds.map(_.rdd): _*),
iterableRdds.map(_.sequences).fold(sequences)(_ ++ _),
-regionFn)
+regionFn,
+productFn,
+unproductFn)
}

// this cannot be in the GenericGenomicDataset trait due to need for the
@@ -2367,21 +2398,31 @@ case class DatasetBoundGenericGenomicDataset[T, U <: Product: TypeTag](
newRdd: RDD[T],
newPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = None): GenericGenomicDataset[T, U] = {

-RDDBoundGenericGenomicDataset(newRdd, sequences, regionFn,
+RDDBoundGenericGenomicDataset(newRdd,
+sequences,
+regionFn,
+productFn,
+unproductFn,
optPartitionMap = newPartitionMap)
}

// this cannot be in the GenericGenomicDataset trait due to need for the
// implicit classtag
def transformDataset(tFn: Dataset[U] => Dataset[U]): GenericGenomicDataset[T, U] = {
-DatasetBoundGenericGenomicDataset(tFn(dataset), sequences, regionFn)
+DatasetBoundGenericGenomicDataset(tFn(dataset),
+sequences,
+regionFn,
+productFn,
+unproductFn)
}
}

case class RDDBoundGenericGenomicDataset[T, U <: Product: TypeTag](
rdd: RDD[T],
sequences: SequenceDictionary,
regionFn: T => Seq[ReferenceRegion],
+productFn: T => U,
+unproductFn: U => T,
optPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = None)(
implicit tTag: ClassTag[T],
uTag: ClassTag[U]) extends GenericGenomicDataset[T, U] {
@@ -2404,7 +2445,9 @@ case class RDDBoundGenericGenomicDataset[T, U <: Product: TypeTag](
val iterableRdds = rdds.toSeq
RDDBoundGenericGenomicDataset(rdd.context.union(rdd, iterableRdds.map(_.rdd): _*),
iterableRdds.map(_.sequences).fold(sequences)(_ ++ _),
-regionFn)
+regionFn,
+productFn,
+unproductFn)
}

// this cannot be in the GenericGenomicDataset trait due to need for the
@@ -2413,14 +2456,22 @@ case class RDDBoundGenericGenomicDataset[T, U <: Product: TypeTag](
newRdd: RDD[T],
newPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = None): GenericGenomicDataset[T, U] = {

-RDDBoundGenericGenomicDataset(newRdd, sequences, regionFn,
+RDDBoundGenericGenomicDataset(newRdd,
+sequences,
+regionFn,
+productFn,
+unproductFn,
optPartitionMap = newPartitionMap)
}

// this cannot be in the GenericGenomicDataset trait due to need for the
// implicit classtag
def transformDataset(tFn: Dataset[U] => Dataset[U]): GenericGenomicDataset[T, U] = {
-DatasetBoundGenericGenomicDataset(tFn(dataset), sequences, regionFn)
+DatasetBoundGenericGenomicDataset(tFn(dataset),
+sequences,
+regionFn,
+productFn,
+unproductFn)
}
}
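With conversions threaded through both case classes, building a generic dataset from an RDD now takes three functions instead of one. A hedged construction sketch, reusing the hypothetical Interval/IntervalProduct pair from earlier and assuming an RDD[Interval] and a SequenceDictionary are already in scope:

// Sketch only: intervalRdd and dict are assumed to be in scope.
val gds = RDDBoundGenericGenomicDataset[Interval, IntervalProduct](
  intervalRdd,
  dict,
  regionFn = i => Seq(ReferenceRegion(i.contigName, i.start, i.end)),
  productFn = i => IntervalProduct(i.contigName, i.start, i.end),
  unproductFn = p => new Interval(p.contigName, p.start, p.end))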

@@ -187,6 +187,9 @@ case class RDDBoundNucleotideContigFragmentRDD private[rdd] (

sealed abstract class NucleotideContigFragmentRDD extends AvroGenomicDataset[NucleotideContigFragment, NucleotideContigFragmentProduct, NucleotideContigFragmentRDD] {

+protected val productFn = NucleotideContigFragmentProduct.fromAvro(_)
+protected val unproductFn = (c: NucleotideContigFragmentProduct) => c.toAvro
+
@transient val uTag: TypeTag[NucleotideContigFragmentProduct] = typeTag[NucleotideContigFragmentProduct]

protected def buildTree(rdd: RDD[(ReferenceRegion, NucleotideContigFragment)])(
@@ -154,6 +154,9 @@ case class RDDBoundCoverageRDD private[rdd] (

abstract class CoverageRDD extends GenomicDataset[Coverage, Coverage, CoverageRDD] {

+protected val productFn = (c: Coverage) => c
+protected val unproductFn = (c: Coverage) => c
+
@transient val uTag: TypeTag[Coverage] = typeTag[Coverage]

protected def buildTree(rdd: RDD[(ReferenceRegion, Coverage)])(
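Two conversion flavors appear across the remaining files. Coverage is already a plain case class, hence a Product, which is why CoverageRDD is a GenomicDataset[Coverage, Coverage, CoverageRDD] and both functions above are effectively the identity; the Avro-backed types below instead round-trip through their generated Product wrappers (fromAvro / toAvro). In sketch form:

// Coverage is its own Spark SQL representation, so conversion is free:
val covProductFn: Coverage => Coverage = identity
val covUnproductFn: Coverage => Coverage = identity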
@@ -338,6 +338,9 @@ case class RDDBoundFeatureRDD private[rdd] (

sealed abstract class FeatureRDD extends AvroGenomicDataset[Feature, FeatureProduct, FeatureRDD] {

+protected val productFn = FeatureProduct.fromAvro(_)
+protected val unproductFn = (f: FeatureProduct) => f.toAvro
+
@transient val uTag: TypeTag[FeatureProduct] = typeTag[FeatureProduct]

protected def buildTree(rdd: RDD[(ReferenceRegion, Feature)])(
@@ -258,6 +258,9 @@ case class RDDBoundFragmentRDD private[rdd] (

sealed abstract class FragmentRDD extends AvroRecordGroupGenomicDataset[Fragment, FragmentProduct, FragmentRDD] {

+protected val productFn = FragmentProduct.fromAvro(_)
+protected val unproductFn = (f: FragmentProduct) => f.toAvro
+
@transient val uTag: TypeTag[FragmentProduct] = typeTag[FragmentProduct]

protected def buildTree(rdd: RDD[(ReferenceRegion, Fragment)])(
@@ -341,6 +341,9 @@ private case class AlignmentWindow(contigName: String, start: Long, end: Long) {

sealed abstract class AlignmentRecordRDD extends AvroRecordGroupGenomicDataset[AlignmentRecord, AlignmentRecordProduct, AlignmentRecordRDD] {

+protected val productFn = AlignmentRecordProduct.fromAvro(_)
+protected val unproductFn = (a: AlignmentRecordProduct) => a.toAvro
+
@transient val uTag: TypeTag[AlignmentRecordProduct] = typeTag[AlignmentRecordProduct]

/**
@@ -212,6 +212,9 @@ case class RDDBoundGenotypeRDD private[rdd] (

sealed abstract class GenotypeRDD extends MultisampleAvroGenomicDataset[Genotype, GenotypeProduct, GenotypeRDD] {

+protected val productFn = GenotypeProduct.fromAvro(_)
+protected val unproductFn = (g: GenotypeProduct) => g.toAvro
+
@transient val uTag: TypeTag[GenotypeProduct] = typeTag[GenotypeProduct]

val headerLines: Seq[VCFHeaderLine]
@@ -191,6 +191,9 @@ case class RDDBoundVariantContextRDD private[rdd] (
*/
sealed abstract class VariantContextRDD extends MultisampleGenomicDataset[VariantContext, VariantContextProduct, VariantContextRDD] with Logging {

+protected val productFn = VariantContextProduct.fromModel(_)
+protected val unproductFn = (vc: VariantContextProduct) => vc.toModel
+
@transient val uTag: TypeTag[VariantContextProduct] = typeTag[VariantContextProduct]

val headerLines: Seq[VCFHeaderLine]
@@ -196,6 +196,9 @@ case class RDDBoundVariantRDD private[rdd] (

sealed abstract class VariantRDD extends AvroGenomicDataset[Variant, VariantProduct, VariantRDD] {

+protected val productFn = VariantProduct.fromAvro(_)
+protected val unproductFn = (v: VariantProduct) => v.toAvro
+
@transient val uTag: TypeTag[VariantProduct] = typeTag[VariantProduct]

val headerLines: Seq[VCFHeaderLine]
