Commit

Merge 089214a into afa55a4

Henry Davidge committed Jun 20, 2018
2 parents afa55a4 + 089214a commit 82e03c9
Showing 2 changed files with 149 additions and 84 deletions.
137 changes: 58 additions & 79 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -18,15 +18,10 @@
package org.bdgenomics.adam.rdd

import java.io.{ File, FileNotFoundException, InputStream }

import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency }
import htsjdk.samtools.util.Locatable
import htsjdk.variant.vcf.{
VCFHeader,
VCFCompoundHeaderLine,
VCFFormatHeaderLine,
VCFHeaderLine,
VCFInfoHeaderLine
}
import htsjdk.variant.vcf.{ VCFCompoundHeaderLine, VCFFormatHeaderLine, VCFHeader, VCFHeaderLine, VCFInfoHeaderLine }
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{ GenericDatumReader, GenericRecord, IndexedRecord }
@@ -42,66 +37,22 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Dataset, SparkSession, SQLContext }
import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext, SparkSession }
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.projections.{
FeatureField,
Projection
}
import org.bdgenomics.adam.rdd.contig.{
DatasetBoundNucleotideContigFragmentRDD,
NucleotideContigFragmentRDD,
ParquetUnboundNucleotideContigFragmentRDD,
RDDBoundNucleotideContigFragmentRDD
}
import org.bdgenomics.adam.projections.{ FeatureField, Projection }
import org.bdgenomics.adam.rdd.contig.{ DatasetBoundNucleotideContigFragmentRDD, NucleotideContigFragmentRDD, ParquetUnboundNucleotideContigFragmentRDD, RDDBoundNucleotideContigFragmentRDD }
import org.bdgenomics.adam.rdd.feature._
import org.bdgenomics.adam.rdd.fragment.{
DatasetBoundFragmentRDD,
FragmentRDD,
ParquetUnboundFragmentRDD,
RDDBoundFragmentRDD
}
import org.bdgenomics.adam.rdd.read.{
AlignmentRecordRDD,
DatasetBoundAlignmentRecordRDD,
RepairPartitions,
ParquetUnboundAlignmentRecordRDD,
RDDBoundAlignmentRecordRDD
}
import org.bdgenomics.adam.rdd.fragment.{ DatasetBoundFragmentRDD, FragmentRDD, ParquetUnboundFragmentRDD, RDDBoundFragmentRDD }
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, DatasetBoundAlignmentRecordRDD, ParquetUnboundAlignmentRecordRDD, RDDBoundAlignmentRecordRDD, RepairPartitions }
import org.bdgenomics.adam.rdd.variant._
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.sql.{
AlignmentRecord => AlignmentRecordProduct,
Feature => FeatureProduct,
Fragment => FragmentProduct,
Genotype => GenotypeProduct,
NucleotideContigFragment => NucleotideContigFragmentProduct,
Variant => VariantProduct,
VariantContext => VariantContextProduct
}
import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct, Feature => FeatureProduct, Fragment => FragmentProduct, Genotype => GenotypeProduct, NucleotideContigFragment => NucleotideContigFragmentProduct, Variant => VariantProduct, VariantContext => VariantContextProduct }
import org.bdgenomics.adam.util.FileExtensions._
import org.bdgenomics.adam.util.{
GenomeFileReader,
ReferenceContigMap,
ReferenceFile,
SequenceDictionaryReader,
TwoBitFile
}
import org.bdgenomics.formats.avro.{
AlignmentRecord,
Contig,
Feature,
Fragment,
Genotype,
NucleotideContigFragment,
ProcessingStep,
RecordGroup => RecordGroupMetadata,
Sample,
Variant
}
import org.bdgenomics.adam.util.{ GenomeFileReader, ReferenceContigMap, ReferenceFile, SequenceDictionaryReader, TwoBitFile }
import org.bdgenomics.formats.avro.{ RecordGroup => RecordGroupMetadata, _ }
import org.bdgenomics.utils.instrumentation.Metrics
import org.bdgenomics.utils.io.LocalFileByteAccess
import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
@@ -1121,6 +1072,8 @@ private class NoPrefixFileFilter(private val prefix: String) extends PathFilter
* @param sc The SparkContext to wrap.
*/
class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging {
@transient val spark = SQLContext.getOrCreate(sc).sparkSession
import spark.implicits._

/**
* @param samHeader The header to extract a sequence dictionary from.
@@ -1885,15 +1838,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
optPredicate: Option[FilterPredicate] = None,
optProjection: Option[Schema] = None): AlignmentRecordRDD = {

// convert avro to sequence dictionary
val sd = loadAvroSequenceDictionary(pathName)

// convert avro to sequence dictionary
val rgd = loadAvroRecordGroupDictionary(pathName)

// load processing step descriptions
val pgs = loadAvroPrograms(pathName)

(optPredicate, optProjection) match {
case (None, None) => {
ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs)
@@ -2371,6 +2318,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

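/**
* Load a DataFrame with the variant context schema into a VariantContextRDD.
*
* @param df DataFrame of variant contexts to load.
* @param metadataPath The path name to load Avro sequence dictionary, sample, and header line metadata from.
* @return Returns a VariantContextRDD.
*/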
def loadVariantContexts(df: DataFrame, metadataPath: String): VariantContextRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
val samples = loadAvroSamples(metadataPath)
val headerLines = loadHeaderLines(metadataPath)
DatasetBoundVariantContextRDD(df.as[VariantContextProduct], sd, samples, headerLines)
}

/**
* Load a path name in Parquet + Avro format into a VariantContextRDD.
*
@@ -2380,21 +2334,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
*/
def loadParquetVariantContexts(
pathName: String): VariantContextRDD = {

// load header lines
val headers = loadHeaderLines(pathName)

// load sequence info
val sd = loadAvroSequenceDictionary(pathName)

// load avro record group dictionary and convert to samples
val samples = loadAvroSamples(pathName)

val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
val ds = sqlContext.read.parquet(pathName).as[VariantContextProduct]

new DatasetBoundVariantContextRDD(ds, sd, samples, headers)
val df = sqlContext.read.parquet(pathName)
loadVariantContexts(df, pathName)
}

/**
@@ -2986,6 +2928,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

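/**
* Load a DataFrame with the feature schema into a FeatureRDD.
*
* @param df DataFrame of features to load.
* @param metadataPath The path name to load Avro sequence dictionary metadata from.
* @return Returns a FeatureRDD.
*/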
def loadFeatures(df: DataFrame, metadataPath: String): FeatureRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
DatasetBoundFeatureRDD(df.as[FeatureProduct], sd)
}

/**
* Load reference sequences into a broadcastable ReferenceFile.
*
@@ -3085,6 +3032,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

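/**
* Load a DataFrame with the nucleotide contig fragment schema into a NucleotideContigFragmentRDD.
*
* @param df DataFrame of nucleotide contig fragments to load.
* @param metadataPath The path name to load Avro sequence dictionary metadata from.
* @return Returns a NucleotideContigFragmentRDD.
*/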
def loadContigFragments(df: DataFrame, metadataPath: String): NucleotideContigFragmentRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
DatasetBoundNucleotideContigFragmentRDD(df.as[NucleotideContigFragmentProduct], sd)
}

/**
* Load genotypes into a GenotypeRDD.
*
@@ -3120,6 +3072,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

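/**
* Load a DataFrame with the genotype schema into a GenotypeRDD.
*
* @param df DataFrame of genotypes to load.
* @param metadataPath The path name to load Avro sequence dictionary, sample, and header line metadata from.
* @return Returns a GenotypeRDD.
*/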
def loadGenotypes(df: DataFrame, metadataPath: String): GenotypeRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
val samples = loadAvroSamples(metadataPath)
val headerLines = loadHeaderLines(metadataPath)
DatasetBoundGenotypeRDD(df.as[GenotypeProduct], sd, samples, headerLines)
}

/**
* Load variants into a VariantRDD.
*
@@ -3154,6 +3113,12 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

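/**
* Load a DataFrame with the variant schema into a VariantRDD.
*
* @param df DataFrame of variants to load.
* @param metadataPath The path name to load Avro sequence dictionary and header line metadata from.
* @return Returns a VariantRDD.
*/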
def loadVariants(df: DataFrame, metadataPath: String): VariantRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
val headerLines = loadHeaderLines(metadataPath)
DatasetBoundVariantRDD(df.as[VariantProduct], sd, headerLines)
}

/**
* Load alignment records into an AlignmentRecordRDD.
*
@@ -3224,6 +3189,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

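/**
* Load a DataFrame with the alignment record schema into an AlignmentRecordRDD.
*
* @param df DataFrame of alignment records to load.
* @param metadataPath The path name to load Avro sequence dictionary, record group, and processing step metadata from.
* @return Returns an AlignmentRecordRDD.
*/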
def loadAlignments(df: DataFrame, metadataPath: String): AlignmentRecordRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
val rgd = loadAvroRecordGroupDictionary(metadataPath)
val process = loadAvroPrograms(metadataPath)
new DatasetBoundAlignmentRecordRDD(df.as[AlignmentRecordProduct], sd, rgd, process)
}

/**
* Load fragments into a FragmentRDD.
*
@@ -3282,6 +3254,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

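/**
* Load a DataFrame with the fragment schema into a FragmentRDD.
*
* @param df DataFrame of fragments to load.
* @param metadataPath The path name to load Avro sequence dictionary, record group, and processing step metadata from.
* @return Returns a FragmentRDD.
*/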
def loadFragments(df: DataFrame, metadataPath: String): FragmentRDD = {
val sd = loadAvroSequenceDictionary(metadataPath)
val rgd = loadAvroRecordGroupDictionary(metadataPath)
val processingSteps = loadAvroPrograms(metadataPath)
DatasetBoundFragmentRDD(df.as[FragmentProduct], sd, rgd, processingSteps)
}

/**
* Return length of partitions in base pairs if the specified path of Parquet + Avro files is partitioned.
*
96 changes: 91 additions & 5 deletions adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
@@ -17,22 +17,21 @@
*/
package org.bdgenomics.adam.rdd

import htsjdk.samtools.{
SAMFormatException,
SAMProgramRecord,
ValidationStringency
}
import htsjdk.samtools.{ SAMFormatException, SAMProgramRecord, ValidationStringency }
import java.io.{ File, FileNotFoundException }

import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.PhredUtils._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.adam.sql.{ VariantContext => VCProduct }
import org.bdgenomics.formats.avro._
import org.seqdoop.hadoop_bam.CRAMInputFormat
import org.seqdoop.hadoop_bam.util.SAMHeaderReader
@@ -760,4 +759,91 @@ class ADAMContextSuite extends ADAMFunSuite {
assert(secondPg.head.getCommandLine === "\"myProg 456\"")
assert(secondPg.head.getVersion === "1.0.0")
}

sparkTest("load variant contexts from dataframe") {
val path = testFile("small.vcf")
val vcs = sc.loadVcf(path)
val outputDir = tmpLocation()
vcs.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadVariantContexts(df, outputDir)
assert(reloaded.sequences == vcs.sequences)
assert(reloaded.headerLines.toSet == vcs.headerLines.toSet)
assert(reloaded.samples == vcs.samples)
assert(reloaded.rdd.collect().map(VCProduct.fromModel).deep ==
vcs.rdd.collect().map(VCProduct.fromModel).deep)
}

sparkTest("load features from dataframe") {
val path = testFile("dvl1.200.bed")
val features = sc.loadFeatures(path)
val outputDir = tmpLocation()
features.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadFeatures(df, outputDir)
assert(reloaded.sequences == features.sequences)
assert(reloaded.rdd.collect().deep == features.rdd.collect().deep)
}

sparkTest("load contig fragments from dataframe") {
val inputPath = testFile("chr20.250k.fa.gz")
val contigFragments = sc.loadFasta(inputPath, 10000L)
val outputDir = tmpLocation()
contigFragments.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadContigFragments(df, outputDir)
assert(reloaded.sequences == contigFragments.sequences)
assert(reloaded.rdd.collect().deep == contigFragments.rdd.collect().deep)
}

sparkTest("load genotypes from dataframe") {
val path = testFile("small.vcf")
val gts = sc.loadGenotypes(path)
val outputDir = tmpLocation()
gts.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadGenotypes(df, outputDir)
assert(reloaded.sequences == gts.sequences)
assert(reloaded.headerLines.toSet == gts.headerLines.toSet)
assert(reloaded.samples == gts.samples)
assert(reloaded.rdd.collect().deep == gts.rdd.collect().deep)
}

sparkTest("load variants from dataframe") {
val path = testFile("gvcf_dir/gvcf_multiallelic.g.vcf")
val variants = sc.loadVcf(path).toVariants()
val outputDir = tmpLocation()
variants.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadVariants(df, outputDir)
assert(reloaded.sequences == variants.sequences)
assert(reloaded.headerLines.toSet == variants.headerLines.toSet)
assert(reloaded.rdd.collect().deep == variants.rdd.collect().deep)
}

sparkTest("load alignments from dataframe") {
val path = testFile("bqsr1-r1.fq")
val alignments = sc.loadAlignments(path)
val outputDir = tmpLocation()
alignments.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadAlignments(df, outputDir)
assert(reloaded.sequences == alignments.sequences)
assert(reloaded.recordGroups == alignments.recordGroups)
assert(reloaded.processingSteps == alignments.processingSteps)
assert(reloaded.rdd.collect().deep == alignments.rdd.collect().deep)
}

sparkTest("load fragments from dataframe") {
val path = testFile("sample1.query.sam")
val fragments = sc.loadFragments(path)
val outputDir = tmpLocation()
fragments.saveAsParquet(outputDir)
val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
val reloaded = sc.loadFragments(df, outputDir)
assert(reloaded.sequences == fragments.sequences)
assert(reloaded.recordGroups == fragments.recordGroups)
assert(reloaded.processingSteps == fragments.processingSteps)
assert(reloaded.rdd.collect().deep == fragments.rdd.collect().deep)
}
}
