[ADAM-1867] Add support for processing VariantContexts from SQL.
Frank Austin Nothaft committed Mar 7, 2018
1 parent f616784 commit 3756675
Showing 4 changed files with 251 additions and 28 deletions.
50 changes: 47 additions & 3 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -42,7 +42,7 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{ SparkSession, Dataset }
+import org.apache.spark.sql.{ Dataset, SparkSession, SQLContext }
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
@@ -79,7 +79,8 @@ import org.bdgenomics.adam.sql.{
Fragment => FragmentProduct,
Genotype => GenotypeProduct,
NucleotideContigFragment => NucleotideContigFragmentProduct,
-  Variant => VariantProduct
+  Variant => VariantProduct,
+  VariantContext => VariantContextProduct
}
import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.adam.util.{
@@ -630,7 +631,7 @@ object ADAMContext {
implicit def genericToVariantContextsConversionFn[Y <: GenericGenomicRDD[_]](
gRdd: Y,
rdd: RDD[VariantContext]): VariantContextRDD = {
-    new VariantContextRDD(rdd,
+    new RDDBoundVariantContextRDD(rdd,
gRdd.sequences,
Seq.empty,
DefaultHeaderLines.allHeaderLines,
@@ -2250,6 +2251,49 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
 * Load a path name in VCF or Parquet + Avro format into a VariantContextRDD.
 *
 * @param pathName The path name to load variant contexts from.
 *   Globs/directories are supported.
 * @return Returns a VariantContextRDD.
 */
def loadVariantContexts(
pathName: String): VariantContextRDD = {

if (isVcfExt(pathName)) {
loadVcf(pathName)
} else {
loadParquetVariantContexts(pathName)
}
}
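
For illustration, a minimal usage sketch of the dispatch above, assuming the ADAMContext implicits are imported so these loaders hang off the SparkContext; both input paths are hypothetical:

import org.apache.spark.{ SparkConf, SparkContext }
import org.bdgenomics.adam.rdd.ADAMContext._

val sc = new SparkContext(
  new SparkConf().setAppName("vc-load").setMaster("local[2]"))

// a VCF extension routes to loadVcf
val fromVcf = sc.loadVariantContexts("calls.vcf")

// any other extension is treated as Parquet + Avro
val fromParquet = sc.loadVariantContexts("calls.variant_contexts.adam")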

/**
 * Load a path name in Parquet + Avro format into a VariantContextRDD.
 *
 * @param pathName The path name to load variant contexts from.
 *   Globs/directories are supported.
 * @return Returns a VariantContextRDD.
 */
def loadParquetVariantContexts(
pathName: String): VariantContextRDD = {

// load header lines
val headers = loadHeaderLines(pathName)

// load sequence info
val sd = loadAvroSequenceDictionary(pathName)

// load avro samples
val samples = loadAvroSamples(pathName)

val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
val ds = sqlContext.read.parquet(pathName).as[VariantContextProduct]

new DatasetBoundVariantContextRDD(ds, sd, samples, headers)
}

/**
* Load a path name in Parquet + Avro format into a VariantRDD.
*
adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextRDD.scala
@@ -24,11 +24,18 @@ import htsjdk.variant.vcf.{
VCFHeader,
VCFHeaderLine
}
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.IndexedRecord
import org.apache.avro.specific.{ SpecificDatumWriter, SpecificRecordBase }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Dataset, SQLContext }
import org.bdgenomics.adam.converters.{
DefaultHeaderLines,
VariantContextConverter
@@ -43,12 +50,14 @@ import org.bdgenomics.adam.models.{
}
import org.bdgenomics.adam.rdd.{
ADAMSaveAnyArgs,
GenomicDataset,
MultisampleGenomicRDD,
VCFHeaderUtils,
VCFSupportingGenomicRDD
}
import org.bdgenomics.adam.sql.{ VariantContext => VariantContextProduct }
import org.bdgenomics.adam.util.{ FileMerger, FileExtensions }
-import org.bdgenomics.formats.avro.Sample
+import org.bdgenomics.formats.avro.{ Contig, Sample }
import org.bdgenomics.utils.misc.Logging
import org.bdgenomics.utils.interval.array.{
IntervalArray,
@@ -61,6 +70,7 @@ import org.seqdoop.hadoop_bam.util.{
}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

private[adam] case class VariantContextArray(
array: Array[(ReferenceRegion, VariantContext)],
@@ -102,51 +112,168 @@ object VariantContextRDD extends Serializable {
sequences: SequenceDictionary,
samples: Iterable[Sample],
headerLines: Seq[VCFHeaderLine]): VariantContextRDD = {
-    VariantContextRDD(rdd, sequences, samples.toSeq, headerLines, None)
+    RDDBoundVariantContextRDD(rdd, sequences, samples.toSeq, headerLines, None)
}

def apply(rdd: RDD[VariantContext],
sequences: SequenceDictionary,
samples: Iterable[Sample]): VariantContextRDD = {
-    VariantContextRDD(rdd, sequences, samples.toSeq, null)
+    RDDBoundVariantContextRDD(rdd, sequences, samples.toSeq, null)
}
}

-/**
- * An RDD containing VariantContexts attached to a reference and samples.
- *
- * @param rdd The underlying RDD of VariantContexts.
- * @param sequences The genome sequence these variants were called against.
- * @param samples The genotyped samples in this RDD of VariantContexts.
- * @param headerLines The VCF header lines that cover all INFO/FORMAT fields
- *   needed to represent this RDD of VariantContexts.
- */
-case class VariantContextRDD(rdd: RDD[VariantContext],
-                             sequences: SequenceDictionary,
-                             @transient samples: Seq[Sample],
-                             @transient headerLines: Seq[VCFHeaderLine],
-                             optPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]]) extends MultisampleGenomicRDD[VariantContext, VariantContextRDD]
-    with Logging
-    with VCFSupportingGenomicRDD[VariantContext, VariantContextRDD] {
+case class DatasetBoundVariantContextRDD private[rdd] (
+  dataset: Dataset[VariantContextProduct],
+  sequences: SequenceDictionary,
+  @transient samples: Seq[Sample],
+  @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines) extends VariantContextRDD {

protected lazy val optPartitionMap = None

lazy val rdd = dataset.rdd.map(_.toModel)

override def transformDataset(
tFn: Dataset[VariantContextProduct] => Dataset[VariantContextProduct]): VariantContextRDD = {
copy(dataset = tFn(dataset))
}

def replaceSequences(
newSequences: SequenceDictionary): VariantContextRDD = {
copy(sequences = newSequences)
}

def replaceHeaderLines(newHeaderLines: Seq[VCFHeaderLine]): VariantContextRDD = {
copy(headerLines = newHeaderLines)
}

def replaceSamples(newSamples: Iterable[Sample]): VariantContextRDD = {
copy(samples = newSamples.toSeq)
}
}
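
transformDataset keeps the result Dataset-bound, so a transformation like the filter below never round-trips through the RDD representation. A sketch, assuming vcs is a VariantContextRDD and "1" is a hypothetical contig name:

// select a single contig purely in the SQL domain
val chr1 = vcs.transformDataset(ds => ds.filter(ds("contigName") === "1"))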

case class RDDBoundVariantContextRDD private[rdd] (
rdd: RDD[VariantContext],
sequences: SequenceDictionary,
@transient samples: Seq[Sample],
@transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines,
optPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = None) extends VariantContextRDD {

  /**
-   * Replaces the header lines attached to this RDD.
-   *
-   * @param newHeaderLines The new header lines to attach to this RDD.
-   * @return A new RDD with the header lines replaced.
+   * A SQL Dataset of variant contexts.
   */
lazy val dataset: Dataset[VariantContextProduct] = {
val sqlContext = SQLContext.getOrCreate(rdd.context)
import sqlContext.implicits._
sqlContext.createDataset(rdd.map(VariantContextProduct.fromModel))
}

def replaceSequences(
newSequences: SequenceDictionary): VariantContextRDD = {
copy(sequences = newSequences)
}

def replaceHeaderLines(newHeaderLines: Seq[VCFHeaderLine]): VariantContextRDD = {
copy(headerLines = newHeaderLines)
}

def replaceSamples(newSamples: Iterable[Sample]): VariantContextRDD = {
copy(samples = newSamples.toSeq)
}
}
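
The two concrete classes are duals: the RDD-bound variant builds its Dataset lazily via VariantContextProduct.fromModel, while the Dataset-bound variant builds its RDD lazily via toModel. A sketch of the round trip, which only compiles inside the org.bdgenomics.adam.rdd package since both constructors are private[rdd]; the variable names are hypothetical:

val rddBound = RDDBoundVariantContextRDD(vcRdd, sequences, samples, headerLines)
val ds = rddBound.dataset        // RDD -> Dataset, deferred until first use
val dsBound = DatasetBoundVariantContextRDD(ds, sequences, samples, headerLines)
val back = dsBound.rdd           // Dataset -> RDD, also deferred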

/**
* An RDD containing VariantContexts attached to a reference and samples.
*/
sealed abstract class VariantContextRDD extends MultisampleGenomicRDD[VariantContext, VariantContextRDD]
    with GenomicDataset[VariantContext, VariantContextProduct, VariantContextRDD]
    with Logging
    with VCFSupportingGenomicRDD[VariantContext, VariantContextRDD] {

@transient val uTag: TypeTag[VariantContextProduct] = typeTag[VariantContextProduct]

val headerLines: Seq[VCFHeaderLine]

protected def saveAvro[U <: SpecificRecordBase](pathName: String,
                                                sc: SparkContext,
                                                schema: Schema,
                                                avro: Seq[U])(implicit tTag: ClassTag[U]) {

// get our current file system
val path = new Path(pathName)
val fs = path.getFileSystem(sc.hadoopConfiguration)

// get an output stream
val os = fs.create(path)

// set up avro for writing
val dw = new SpecificDatumWriter[U](schema)
val fw = new DataFileWriter[U](dw)
fw.create(schema, os)

// write all our records
avro.foreach(r => fw.append(r))

// close the file
fw.close()
os.close()
}

protected def saveSequences(filePath: String): Unit = {
// convert sequence dictionary to avro form and save
val contigs = sequences.toAvro

saveAvro("%s/_seqdict.avro".format(filePath),
rdd.context,
Contig.SCHEMA$,
contigs)
}

protected def saveSamples(filePath: String): Unit = {
// get file to write to
saveAvro("%s/_samples.avro".format(filePath),
rdd.context,
Sample.SCHEMA$,
samples)
}

def saveVcfHeaders(filePath: String): Unit = {
// write vcf headers to file
VCFHeaderUtils.write(new VCFHeader(headerLines.toSet),
new Path("%s/_header".format(filePath)),
rdd.context.hadoopConfiguration,
false,
false)
}

protected def saveMetadata(filePath: String): Unit = {
saveSequences(filePath)
saveSamples(filePath)
saveVcfHeaders(filePath)
}

def saveAsParquet(filePath: String,
blockSize: Int = 128 * 1024 * 1024,
pageSize: Int = 1 * 1024 * 1024,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
disableDictionaryEncoding: Boolean = false) {
log.warn("Saving directly as Parquet from SQL. Options other than compression codec are ignored.")
dataset.toDF()
.write
.format("parquet")
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
.save(filePath)
saveMetadata(filePath)
}
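
Paired with loadParquetVariantContexts above, this gives a save/load round trip; the output path is hypothetical:

// writes the Parquet data plus the _seqdict.avro, _samples.avro,
// and _header sidecar files via saveMetadata
vcs.saveAsParquet("out.variant_contexts.adam")

// comes back as a DatasetBoundVariantContextRDD
val reloaded = sc.loadParquetVariantContexts("out.variant_contexts.adam")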

def transformDataset(
tFn: Dataset[VariantContextProduct] => Dataset[VariantContextProduct]): VariantContextRDD = {
DatasetBoundVariantContextRDD(tFn(dataset), sequences, samples, headerLines)
}

/**
* Replaces the header lines attached to this RDD.
*
* @param newHeaderLines The new header lines to attach to this RDD.
* @return A new RDD with the header lines replaced.
*/
def replaceHeaderLines(newHeaderLines: Seq[VCFHeaderLine]): VariantContextRDD

protected def buildTree(rdd: RDD[(ReferenceRegion, VariantContext)])(
implicit tTag: ClassTag[VariantContext]): IntervalArray[ReferenceRegion, VariantContext] = {
@@ -356,7 +483,11 @@ case class VariantContextRDD(rdd: RDD[VariantContext],
*/
protected def replaceRdd(newRdd: RDD[VariantContext],
newPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]] = None): VariantContextRDD = {
-    copy(rdd = newRdd, optPartitionMap = newPartitionMap)
+    RDDBoundVariantContextRDD(newRdd,
+      sequences,
+      samples,
+      headerLines,
+      newPartitionMap)
}

/**
adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala
@@ -250,7 +250,7 @@ sealed abstract class VariantRDD extends AvroGenomicRDD[Variant, VariantProduct,
* @return Returns this VariantRDD as a VariantContextRDD.
*/
def toVariantContexts(): VariantContextRDD = {
-    VariantContextRDD(rdd.map(VariantContext(_)),
+    new RDDBoundVariantContextRDD(rdd.map(VariantContext(_)),
sequences,
Seq.empty[Sample],
headerLines,
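
For illustration, a minimal sketch of the conversion this hunk rewires, assuming a hypothetical Parquet path of variants; the resulting contexts carry variants but no genotypes:

val variants = sc.loadVariants("calls.variant.adam")
val vcs = variants.toVariantContexts()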
adam-core/src/main/scala/org/bdgenomics/adam/sql/VariantContext.scala
@@ -0,0 +1,48 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.sql

import org.bdgenomics.adam.models.{
ReferencePosition,
VariantContext => VariantContextModel
}
import org.bdgenomics.adam.rich.RichVariant

object VariantContext {

def fromModel(vc: VariantContextModel): VariantContext = {
VariantContext(vc.position.referenceName,
vc.position.start,
vc.variant.variant.getEnd,
Variant.fromAvro(vc.variant.variant),
vc.genotypes.map(g => Genotype.fromAvro(g)).toSeq)
}
}

case class VariantContext(contigName: String,
start: Long,
end: Long,
variant: Variant,
genotypes: Seq[Genotype]) {

def toModel(): VariantContextModel = {
new VariantContextModel(new ReferencePosition(contigName, start),
RichVariant(variant.toAvro),
genotypes.map(_.toAvro))
}
}
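
For illustration, a sketch of the conversion pair this product class provides, assuming a models.VariantContext named modelVc is already in scope:

// model -> flat product that Spark SQL can encode
val product = VariantContext.fromModel(modelVc)

// product -> model; the position is rebuilt from contigName and start
val model = product.toModel()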
