Commit 92db52a

Add lazy binder for Datasets, allow read directly from sql. Demo only on AlignmentRecord.

fnothaft committed May 11, 2017
1 parent 6f0fad3 commit 92db52a
Showing 14 changed files with 194 additions and 47 deletions.
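
A rough usage sketch of what this change enables (illustrative only; the paths and the readMapped column name are assumptions, and sc is assumed to be an existing SparkContext with the ADAMContext implicits in scope):

    import org.bdgenomics.adam.rdd.ADAMContext._

    // Loading Parquet alignments with no predicate or projection now returns a
    // lazily bound RDD: no Avro records are materialized until .rdd or .dataset
    // is actually touched.
    val reads = sc.loadAlignments("/data/sample.alignments.adam")

    // transformDataset runs against Spark SQL; "readMapped" is a field of
    // AlignmentRecord (assumed column name).
    val mapped = reads.transformDataset(ds => ds.filter(ds("readMapped") === true))

    // Saving a Dataset-bound RDD writes Parquet through the Spark SQL writer.
    mapped.saveAsParquet("/data/sample.mapped.adam")
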
21 changes: 16 additions & 5 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -54,7 +54,11 @@ import org.bdgenomics.adam.projections.{
import org.bdgenomics.adam.rdd.contig.NucleotideContigFragmentRDD
import org.bdgenomics.adam.rdd.feature._
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, RepairPartitions }
import org.bdgenomics.adam.rdd.read.{
AlignmentRecordRDD,
RepairPartitions,
ParquetUnboundAlignmentRecordRDD
}
import org.bdgenomics.adam.rdd.variant._
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.util.FileExtensions._
@@ -800,16 +804,23 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
optPredicate: Option[FilterPredicate] = None,
optProjection: Option[Schema] = None): AlignmentRecordRDD = {

// load from disk
val rdd = loadParquet[AlignmentRecord](pathName, optPredicate, optProjection)

// convert avro to sequence dictionary
val sd = loadAvroSequenceDictionary(pathName)

// convert avro to record group dictionary
val rgd = loadAvroRecordGroupDictionary(pathName)

AlignmentRecordRDD(rdd, sd, rgd)
(optPredicate, optProjection) match {
case (None, None) => {
ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd)
}
case (_, _) => {
// load from disk
val rdd = loadParquet[AlignmentRecord](pathName, optPredicate, optProjection)

AlignmentRecordRDD(rdd, sd, rgd)
}
}
}

/**
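
A sketch of how the new dispatch behaves in the method shown above, loadParquetAlignments (illustrative only; the path and readMapped column are assumptions): only a call with neither predicate nor projection binds lazily, while a Parquet pushdown predicate still takes the eager Avro loadParquet branch.

    import org.apache.parquet.filter2.predicate.{ FilterApi, FilterPredicate }
    import org.bdgenomics.adam.rdd.ADAMContext._

    // No predicate/projection: returns a ParquetUnboundAlignmentRecordRDD, nothing read yet.
    val lazyReads = sc.loadParquetAlignments("/data/sample.alignments.adam")

    // With a pushdown predicate: falls through to the eager loadParquet branch.
    val mappedOnly: FilterPredicate =
      FilterApi.eq(FilterApi.booleanColumn("readMapped"), java.lang.Boolean.TRUE)
    val eagerReads = sc.loadParquetAlignments(
      "/data/sample.alignments.adam",
      optPredicate = Some(mappedOnly))
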
39 changes: 34 additions & 5 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkFiles
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{ Function => JFunction }
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Dataset }
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.{
RecordGroupDictionary,
@@ -795,13 +796,41 @@ trait MultisampleGenomicRDD[T, U <: MultisampleGenomicRDD[T, U]] extends Genomic
val samples: Seq[Sample]
}

/**
* A trait describing a GenomicRDD that also supports the Spark SQL APIs.
*/
trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends GenomicRDD[T, V] {

/**
* This data as a Spark SQL Dataset.
*/
val dataset: Dataset[U]

/**
* @return This data as a Spark SQL DataFrame.
*/
def toDF(): DataFrame = {
dataset.toDF()
}

/**
* Applies a function that transforms the underlying Dataset into a new Dataset
* using the Spark SQL API.
*
* @param tFn A function that transforms the underlying Dataset.
* @return A new RDD where the underlying Dataset has been replaced, but the
* metadata (sequence dictionary, etc.) is copied without modification.
*/
def transformDataset(tFn: Dataset[U] => Dataset[U]): V
}

/**
* An abstract class describing a GenomicRDD where:
*
* * The data are Avro IndexedRecords.
* * The data are associated to read groups (i.e., they are reads or fragments).
*/
abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroReadGroupGenomicRDD[T, U]] extends AvroGenomicRDD[T, U] {
abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: AvroReadGroupGenomicRDD[T, U, V]] extends AvroGenomicRDD[T, U, V] {

/**
* A dictionary describing the read groups attached to this GenomicRDD.
@@ -831,8 +860,8 @@ abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroRe
* An abstract class that extends the MultisampleGenomicRDD trait, where the data
* are Avro IndexedRecords.
*/
abstract class MultisampleAvroGenomicRDD[T <% IndexedRecord: Manifest, U <: MultisampleAvroGenomicRDD[T, U]] extends AvroGenomicRDD[T, U]
with MultisampleGenomicRDD[T, U] {
abstract class MultisampleAvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: MultisampleAvroGenomicRDD[T, U, V]] extends AvroGenomicRDD[T, U, V]
with MultisampleGenomicRDD[T, V] {

/**
* The header lines attached to the file.
@@ -866,8 +895,8 @@ abstract class MultisampleAvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Mult
* Avro IndexedRecords. This abstract class provides methods for saving to
* Parquet, and provides hooks for writing the metadata.
*/
abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroGenomicRDD[T, U]] extends ADAMRDDFunctions[T]
with GenomicRDD[T, U] {
abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: AvroGenomicRDD[T, U, V]] extends ADAMRDDFunctions[T]
with GenomicDataset[T, U, V] {

/**
* Called in saveAsParquet after saving RDD to Parquet to save metadata.
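
The practical effect of the new GenomicDataset trait is that SQL-aware genomic RDDs can be handled generically. A minimal sketch under the type bounds defined above (illustrative only, not part of the commit):

    import org.apache.spark.sql.Dataset
    import org.bdgenomics.adam.rdd.GenomicDataset

    // Works for any GenomicDataset: report a row count through Spark SQL,
    // then apply a caller-supplied Dataset transform, keeping metadata intact.
    def countAndTransform[T, U <: Product, V <: GenomicDataset[T, U, V]](
      data: GenomicDataset[T, U, V])(tFn: Dataset[U] => Dataset[U]): V = {
      println(s"rows (via Spark SQL): ${data.toDF().count()}")
      data.transformDataset(tFn)
    }
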
@@ -105,7 +105,7 @@ private[rdd] object NucleotideContigFragmentRDD extends Serializable {
*/
case class NucleotideContigFragmentRDD(
rdd: RDD[NucleotideContigFragment],
sequences: SequenceDictionary) extends AvroGenomicRDD[NucleotideContigFragment, NucleotideContigFragmentRDD] with ReferenceFile {
sequences: SequenceDictionary) extends AvroGenomicRDD[NucleotideContigFragment, NucleotideContigFragmentProduct, NucleotideContigFragmentRDD] with ReferenceFile {

protected def buildTree(rdd: RDD[(ReferenceRegion, NucleotideContigFragment)])(
implicit tTag: ClassTag[NucleotideContigFragment]): IntervalArray[ReferenceRegion, NucleotideContigFragment] = {
@@ -144,9 +144,9 @@ case class NucleotideContigFragmentRDD(
}

/**
* @return Creates a SQL Dataset of contig fragments.
* A SQL Dataset of contig fragments.
*/
def toDataset(): Dataset[NucleotideContigFragmentProduct] = {
lazy val dataset: Dataset[NucleotideContigFragmentProduct] = {
val sqlContext = SQLContext.getOrCreate(rdd.context)
import sqlContext.implicits._
sqlContext.createDataset(rdd.map(NucleotideContigFragmentProduct.fromAvro))
@@ -162,7 +162,7 @@ case class NucleotideContigFragmentRDD(
*/
def transformDataset(
tFn: Dataset[NucleotideContigFragmentProduct] => Dataset[NucleotideContigFragmentProduct]): NucleotideContigFragmentRDD = {
replaceRdd(tFn(toDataset()).rdd.map(_.toAvro))
replaceRdd(tFn(dataset).rdd.map(_.toAvro))
}

/**
@@ -20,13 +20,14 @@ package org.bdgenomics.adam.rdd.feature
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.FieldSerializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Dataset, SQLContext }
import org.bdgenomics.adam.models.{
Coverage,
ReferenceRegion,
ReferenceRegionSerializer,
SequenceDictionary
}
import org.bdgenomics.adam.rdd.GenomicRDD
import org.bdgenomics.adam.rdd.GenomicDataset
import org.bdgenomics.utils.interval.array.{
IntervalArray,
IntervalArraySerializer
@@ -67,13 +68,24 @@ private[adam] class CoverageArraySerializer(kryo: Kryo) extends IntervalArraySer
* @param sequences A dictionary describing the reference genome.
*/
case class CoverageRDD(rdd: RDD[Coverage],
sequences: SequenceDictionary) extends GenomicRDD[Coverage, CoverageRDD] {
sequences: SequenceDictionary) extends GenomicDataset[Coverage, Coverage, CoverageRDD] {

protected def buildTree(rdd: RDD[(ReferenceRegion, Coverage)])(
implicit tTag: ClassTag[Coverage]): IntervalArray[ReferenceRegion, Coverage] = {
IntervalArray(rdd, CoverageArray.apply(_, _))
}

lazy val dataset: Dataset[Coverage] = {
val sqlContext = SQLContext.getOrCreate(rdd.context)
import sqlContext.implicits._
sqlContext.createDataset(rdd)
}

def transformDataset(
tFn: Dataset[Coverage] => Dataset[Coverage]): CoverageRDD = {
copy(rdd = tFn(dataset).rdd)
}

/**
* Saves coverage as feature file.
*
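
Because Coverage is a plain case class rather than an Avro record, CoverageRDD binds its Dataset directly as Dataset[Coverage] with no Product conversion. A minimal usage sketch (illustrative only; the path is hypothetical and the count field name is assumed from the Coverage model):

    import org.bdgenomics.adam.rdd.ADAMContext._

    val coverage = sc.loadCoverage("/data/sample.coverage.adam")

    // Filter through Spark SQL; the transformed Dataset is converted back to an
    // RDD[Coverage] by CoverageRDD.transformDataset.
    val deepCoverage = coverage.transformDataset(ds => ds.filter(ds("count") >= 30.0))
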
@@ -245,17 +245,17 @@ object FeatureRDD {
* @param sequences The reference genome this data is aligned to.
*/
case class FeatureRDD(rdd: RDD[Feature],
sequences: SequenceDictionary) extends AvroGenomicRDD[Feature, FeatureRDD] with Logging {
sequences: SequenceDictionary) extends AvroGenomicRDD[Feature, FeatureProduct, FeatureRDD] with Logging {

protected def buildTree(rdd: RDD[(ReferenceRegion, Feature)])(
implicit tTag: ClassTag[Feature]): IntervalArray[ReferenceRegion, Feature] = {
IntervalArray(rdd, FeatureArray.apply(_, _))
}

/**
* @return Creates a SQL Dataset of genotypes.
* A SQL Dataset of features.
*/
def toDataset(): Dataset[FeatureProduct] = {
lazy val dataset: Dataset[FeatureProduct] = {
val sqlContext = SQLContext.getOrCreate(rdd.context)
import sqlContext.implicits._
sqlContext.createDataset(rdd.map(FeatureProduct.fromAvro))
@@ -271,7 +271,7 @@ case class FeatureRDD(rdd: RDD[Feature],
*/
def transformDataset(
tFn: Dataset[FeatureProduct] => Dataset[FeatureProduct]): FeatureRDD = {
replaceRdd(tFn(toDataset()).rdd.map(_.toAvro))
replaceRdd(tFn(dataset).rdd.map(_.toAvro))
}

/**
@@ -107,7 +107,7 @@ object FragmentRDD {
*/
case class FragmentRDD(rdd: RDD[Fragment],
sequences: SequenceDictionary,
recordGroups: RecordGroupDictionary) extends AvroReadGroupGenomicRDD[Fragment, FragmentRDD] {
recordGroups: RecordGroupDictionary) extends AvroReadGroupGenomicRDD[Fragment, FragmentProduct, FragmentRDD] {

protected def buildTree(rdd: RDD[(ReferenceRegion, Fragment)])(
implicit tTag: ClassTag[Fragment]): IntervalArray[ReferenceRegion, Fragment] = {
Expand All @@ -126,9 +126,9 @@ case class FragmentRDD(rdd: RDD[Fragment],
}

/**
* @return Creates a SQL Dataset of fragments.
* A SQL Dataset of fragments.
*/
def toDataset(): Dataset[FragmentProduct] = {
lazy val dataset: Dataset[FragmentProduct] = {
val sqlContext = SQLContext.getOrCreate(rdd.context)
import sqlContext.implicits._
sqlContext.createDataset(rdd.map(FragmentProduct.fromAvro))
@@ -144,7 +144,7 @@ case class FragmentRDD(rdd: RDD[Fragment],
*/
def transformDataset(
tFn: Dataset[FragmentProduct] => Dataset[FragmentProduct]): FragmentRDD = {
replaceRdd(tFn(toDataset()).rdd.map(_.toAvro))
replaceRdd(tFn(dataset).rdd.map(_.toAvro))
}

/**
@@ -25,6 +25,8 @@ import java.net.URI
import java.nio.file.Paths
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.MetricsContext._
@@ -98,21 +100,80 @@ object AlignmentRecordRDD {
SequenceDictionary.empty,
RecordGroupDictionary.empty)
}

def apply(rdd: RDD[AlignmentRecord],
sequences: SequenceDictionary,
recordGroups: RecordGroupDictionary): AlignmentRecordRDD = {
RDDBoundAlignmentRecordRDD(rdd, sequences, recordGroups)
}

def apply(ds: Dataset[AlignmentRecordProduct],
sequences: SequenceDictionary,
recordGroups: RecordGroupDictionary): AlignmentRecordRDD = {
DatasetBoundAlignmentRecordRDD(ds, sequences, recordGroups)
}
}

case class AlignmentRecordRDD(
case class ParquetUnboundAlignmentRecordRDD private[rdd] (
private val sc: SparkContext,
private val parquetFilename: String,
sequences: SequenceDictionary,
recordGroups: RecordGroupDictionary) extends AlignmentRecordRDD {

lazy val rdd: RDD[AlignmentRecord] = {
sc.loadParquet(parquetFilename)
}

lazy val dataset = {
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
sqlContext.read.parquet(parquetFilename).as[AlignmentRecordProduct]
}
}

case class DatasetBoundAlignmentRecordRDD private[rdd] (
dataset: Dataset[AlignmentRecordProduct],
sequences: SequenceDictionary,
recordGroups: RecordGroupDictionary) extends AlignmentRecordRDD {

lazy val rdd = dataset.rdd.map(_.toAvro)

override def saveAsParquet(filePath: String,
blockSize: Int = 128 * 1024 * 1024,
pageSize: Int = 1 * 1024 * 1024,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
disableDictionaryEncoding: Boolean = false) {
log.warn("Saving directly as Parquet from SQL. Options other than compression codec are ignored.")
dataset.toDF()
.write
.format("parquet")
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
.save(filePath)
saveMetadata(filePath)
}

override def transformDataset(
tFn: Dataset[AlignmentRecordProduct] => Dataset[AlignmentRecordProduct]): AlignmentRecordRDD = {
copy(dataset = tFn(dataset))
}
}

case class RDDBoundAlignmentRecordRDD private[rdd] (
rdd: RDD[AlignmentRecord],
sequences: SequenceDictionary,
recordGroups: RecordGroupDictionary) extends AvroReadGroupGenomicRDD[AlignmentRecord, AlignmentRecordRDD] {
recordGroups: RecordGroupDictionary) extends AlignmentRecordRDD {

/**
* @return Creates a SQL Dataset of reads.
* A SQL Dataset of reads.
*/
def toDataset(): Dataset[AlignmentRecordProduct] = {
lazy val dataset: Dataset[AlignmentRecordProduct] = {
val sqlContext = SQLContext.getOrCreate(rdd.context)
import sqlContext.implicits._
sqlContext.createDataset(rdd.map(AlignmentRecordProduct.fromAvro))
}
}

abstract class AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord, AlignmentRecordProduct, AlignmentRecordRDD] {

/**
* Applies a function that transforms the underlying RDD into a new RDD using
@@ -124,7 +185,8 @@ case class AlignmentRecordRDD(
*/
def transformDataset(
tFn: Dataset[AlignmentRecordProduct] => Dataset[AlignmentRecordProduct]): AlignmentRecordRDD = {
replaceRdd(tFn(toDataset()).rdd.map(_.toAvro))
DatasetBoundAlignmentRecordRDD(dataset, sequences, recordGroups)
.transformDataset(tFn)
}

/**
@@ -136,13 +198,13 @@ case class AlignmentRecordRDD(
*/
protected def replaceRddAndSequences(newRdd: RDD[AlignmentRecord],
newSequences: SequenceDictionary): AlignmentRecordRDD = {
AlignmentRecordRDD(newRdd,
RDDBoundAlignmentRecordRDD(newRdd,
newSequences,
recordGroups)
}

protected def replaceRdd(newRdd: RDD[AlignmentRecord]): AlignmentRecordRDD = {
copy(rdd = newRdd)
RDDBoundAlignmentRecordRDD(newRdd, sequences, recordGroups)
}

protected def buildTree(rdd: RDD[(ReferenceRegion, AlignmentRecord)])(
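
AlignmentRecordRDD is now an abstract class with three concrete bindings: ParquetUnboundAlignmentRecordRDD (nothing read until rdd or dataset is touched), DatasetBoundAlignmentRecordRDD (backed by a Spark SQL Dataset), and RDDBoundAlignmentRecordRDD (the classic Avro RDD). A sketch of binding directly from Spark SQL (illustrative only; it assumes an existing AlignmentRecordRDD named reads for its metadata, and the import path for the SQL wrapper class is an assumption):

    import org.apache.spark.sql.SQLContext
    import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
    // Assumed location of the SQL wrapper class, aliased as in this diff.
    import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct }

    val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._

    // Read the Parquet file straight into the SQL wrapper type.
    val ds = sqlContext.read
      .parquet("/data/sample.alignments.adam")
      .as[AlignmentRecordProduct]

    // The companion apply dispatches on the argument: a Dataset yields a
    // DatasetBoundAlignmentRecordRDD, an RDD an RDDBoundAlignmentRecordRDD.
    val sqlBound = AlignmentRecordRDD(ds, reads.sequences, reads.recordGroups)

    // transformDataset stays in SQL; saveAsParquet on a Dataset-bound RDD goes
    // through the SQL Parquet writer (see the override above).
    sqlBound.transformDataset(_.limit(1000)).saveAsParquet("/data/head.alignments.adam")
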
@@ -79,17 +79,17 @@ private[adam] class GenotypeArraySerializer extends IntervalArraySerializer[Refe
case class GenotypeRDD(rdd: RDD[Genotype],
sequences: SequenceDictionary,
@transient samples: Seq[Sample],
@transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines) extends MultisampleAvroGenomicRDD[Genotype, GenotypeRDD] {
@transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines) extends MultisampleAvroGenomicRDD[Genotype, GenotypeProduct, GenotypeRDD] {

protected def buildTree(rdd: RDD[(ReferenceRegion, Genotype)])(
implicit tTag: ClassTag[Genotype]): IntervalArray[ReferenceRegion, Genotype] = {
IntervalArray(rdd, GenotypeArray.apply(_, _))
}

/**
* @return Creates a SQL Dataset of genotypes.
* A SQL Dataset of genotypes.
*/
def toDataset(): Dataset[GenotypeProduct] = {
lazy val dataset: Dataset[GenotypeProduct] = {
val sqlContext = SQLContext.getOrCreate(rdd.context)
import sqlContext.implicits._
sqlContext.createDataset(rdd.map(GenotypeProduct.fromAvro))
@@ -105,7 +105,7 @@ case class GenotypeRDD(rdd: RDD[Genotype],
*/
def transformDataset(
tFn: Dataset[GenotypeProduct] => Dataset[GenotypeProduct]): GenotypeRDD = {
replaceRdd(tFn(toDataset()).rdd.map(_.toAvro))
replaceRdd(tFn(dataset).rdd.map(_.toAvro))
}

/**
