diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
index 5f9ce37aab..3aff7f0149 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -18,15 +18,10 @@ package org.bdgenomics.adam.rdd
 
 import java.io.{ File, FileNotFoundException, InputStream }
+
 import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency }
 import htsjdk.samtools.util.Locatable
-import htsjdk.variant.vcf.{
-  VCFHeader,
-  VCFCompoundHeaderLine,
-  VCFFormatHeaderLine,
-  VCFHeaderLine,
-  VCFInfoHeaderLine
-}
+import htsjdk.variant.vcf.{ VCFCompoundHeaderLine, VCFFormatHeaderLine, VCFHeader, VCFHeaderLine, VCFInfoHeaderLine }
 import org.apache.avro.Schema
 import org.apache.avro.file.DataFileStream
 import org.apache.avro.generic.{ GenericDatumReader, GenericRecord, IndexedRecord }
@@ -42,66 +37,22 @@ import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.MetricsContext._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{ Dataset, SparkSession, SQLContext }
+import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext, SparkSession }
 import org.bdgenomics.adam.converters._
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.io._
 import org.bdgenomics.adam.models._
-import org.bdgenomics.adam.projections.{
-  FeatureField,
-  Projection
-}
-import org.bdgenomics.adam.rdd.contig.{
-  DatasetBoundNucleotideContigFragmentRDD,
-  NucleotideContigFragmentRDD,
-  ParquetUnboundNucleotideContigFragmentRDD,
-  RDDBoundNucleotideContigFragmentRDD
-}
+import org.bdgenomics.adam.projections.{ FeatureField, Projection }
+import org.bdgenomics.adam.rdd.contig.{ DatasetBoundNucleotideContigFragmentRDD, NucleotideContigFragmentRDD, ParquetUnboundNucleotideContigFragmentRDD, RDDBoundNucleotideContigFragmentRDD }
 import org.bdgenomics.adam.rdd.feature._
-import org.bdgenomics.adam.rdd.fragment.{
-  DatasetBoundFragmentRDD,
-  FragmentRDD,
-  ParquetUnboundFragmentRDD,
-  RDDBoundFragmentRDD
-}
-import org.bdgenomics.adam.rdd.read.{
-  AlignmentRecordRDD,
-  DatasetBoundAlignmentRecordRDD,
-  RepairPartitions,
-  ParquetUnboundAlignmentRecordRDD,
-  RDDBoundAlignmentRecordRDD
-}
+import org.bdgenomics.adam.rdd.fragment.{ DatasetBoundFragmentRDD, FragmentRDD, ParquetUnboundFragmentRDD, RDDBoundFragmentRDD }
+import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, DatasetBoundAlignmentRecordRDD, ParquetUnboundAlignmentRecordRDD, RDDBoundAlignmentRecordRDD, RepairPartitions }
 import org.bdgenomics.adam.rdd.variant._
 import org.bdgenomics.adam.rich.RichAlignmentRecord
-import org.bdgenomics.adam.sql.{
-  AlignmentRecord => AlignmentRecordProduct,
-  Feature => FeatureProduct,
-  Fragment => FragmentProduct,
-  Genotype => GenotypeProduct,
-  NucleotideContigFragment => NucleotideContigFragmentProduct,
-  Variant => VariantProduct,
-  VariantContext => VariantContextProduct
-}
+import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct, Feature => FeatureProduct, Fragment => FragmentProduct, Genotype => GenotypeProduct, NucleotideContigFragment => NucleotideContigFragmentProduct, Variant => VariantProduct, VariantContext => VariantContextProduct }
 import org.bdgenomics.adam.util.FileExtensions._
-import org.bdgenomics.adam.util.{
-  GenomeFileReader,
-  ReferenceContigMap,
-  ReferenceFile,
-  SequenceDictionaryReader,
-  TwoBitFile
-}
-import org.bdgenomics.formats.avro.{
-  AlignmentRecord,
-  Contig,
-  Feature,
-  Fragment,
-  Genotype,
-  NucleotideContigFragment,
-  ProcessingStep,
-  RecordGroup => RecordGroupMetadata,
-  Sample,
-  Variant
-}
+import org.bdgenomics.adam.util.{ GenomeFileReader, ReferenceContigMap, ReferenceFile, SequenceDictionaryReader, TwoBitFile }
+import org.bdgenomics.formats.avro.{ RecordGroup => RecordGroupMetadata, _ }
 import org.bdgenomics.utils.instrumentation.Metrics
 import org.bdgenomics.utils.io.LocalFileByteAccess
 import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
@@ -1121,6 +1072,8 @@ private class NoPrefixFileFilter(private val prefix: String) extends PathFilter
  * @param sc The SparkContext to wrap.
  */
 class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging {
+  @transient val spark = SQLContext.getOrCreate(sc).sparkSession
+  import spark.implicits._
 
   /**
    * @param samHeader The header to extract a sequence dictionary from.
@@ -1885,15 +1838,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     optPredicate: Option[FilterPredicate] = None,
     optProjection: Option[Schema] = None): AlignmentRecordRDD = {
 
-    // convert avro to sequence dictionary
     val sd = loadAvroSequenceDictionary(pathName)
-
-    // convert avro to sequence dictionary
     val rgd = loadAvroRecordGroupDictionary(pathName)
-
-    // load processing step descriptions
     val pgs = loadAvroPrograms(pathName)
-
     (optPredicate, optProjection) match {
       case (None, None) => {
         ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs)
@@ -2371,6 +2318,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }
 
+  def loadVariantContexts(df: DataFrame, metadataPath: String): VariantContextRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    val samples = loadAvroSamples(metadataPath)
+    val headerLines = loadHeaderLines(metadataPath)
+    DatasetBoundVariantContextRDD(df.as[VariantContextProduct], sd, samples, headerLines)
+  }
+
   /**
    * Load a path name in Parquet + Avro format into a VariantContextRDD.
    *
@@ -2380,21 +2334,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    */
   def loadParquetVariantContexts(
     pathName: String): VariantContextRDD = {
-
-    // load header lines
-    val headers = loadHeaderLines(pathName)
-
-    // load sequence info
-    val sd = loadAvroSequenceDictionary(pathName)
-
-    // load avro record group dictionary and convert to samples
-    val samples = loadAvroSamples(pathName)
-    val sqlContext = SQLContext.getOrCreate(sc)
-    import sqlContext.implicits._
-    val ds = sqlContext.read.parquet(pathName).as[VariantContextProduct]
-
-    new DatasetBoundVariantContextRDD(ds, sd, samples, headers)
+    val df = spark.read.parquet(pathName)
+    loadVariantContexts(df, pathName)
   }
 
   /**
@@ -2986,6 +2928,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }
 
+  def loadFeatures(df: DataFrame, metadataPath: String): FeatureRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    DatasetBoundFeatureRDD(df.as[FeatureProduct], sd)
+  }
+
   /**
    * Load reference sequences into a broadcastable ReferenceFile.
    *
@@ -3085,6 +3032,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }
 
+  def loadContigFragments(df: DataFrame, metadataPath: String): NucleotideContigFragmentRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    DatasetBoundNucleotideContigFragmentRDD(df.as[NucleotideContigFragmentProduct], sd)
+  }
+
   /**
    * Load genotypes into a GenotypeRDD.
    *
@@ -3120,6 +3072,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }
 
+  def loadGenotypes(df: DataFrame, metadataPath: String): GenotypeRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    val samples = loadAvroSamples(metadataPath)
+    val headerLines = loadHeaderLines(metadataPath)
+    DatasetBoundGenotypeRDD(df.as[GenotypeProduct], sd, samples, headerLines)
+  }
+
   /**
    * Load variants into a VariantRDD.
    *
@@ -3154,6 +3113,12 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }
 
+  def loadVariants(df: DataFrame, metadataPath: String): VariantRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    val headerLines = loadHeaderLines(metadataPath)
+    DatasetBoundVariantRDD(df.as[VariantProduct], sd, headerLines)
+  }
+
   /**
    * Load alignment records into an AlignmentRecordRDD.
    *
@@ -3224,6 +3189,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }
 
+  def loadAlignments(df: DataFrame, metadataPath: String): AlignmentRecordRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    val rgd = loadAvroRecordGroupDictionary(metadataPath)
+    val process = loadAvroPrograms(metadataPath)
+    new DatasetBoundAlignmentRecordRDD(df.as[AlignmentRecordProduct], sd, rgd, process)
+  }
+
   /**
    * Load fragments into a FragmentRDD.
    *
@@ -3282,6 +3254,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     }
   }
 
+  def loadFragments(df: DataFrame, metadataPath: String): FragmentRDD = {
+    val sd = loadAvroSequenceDictionary(metadataPath)
+    val rgd = loadAvroRecordGroupDictionary(metadataPath)
+    val processingSteps = loadAvroPrograms(metadataPath)
+    DatasetBoundFragmentRDD(df.as[FragmentProduct], sd, rgd, processingSteps)
+  }
+
   /**
    * Return length of partitions in base pairs if the specified path of Parquet + Avro files is partitioned.
    *
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
index c14e367fff..0e30f6ed8f 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
@@ -17,22 +17,21 @@
  */
 package org.bdgenomics.adam.rdd
 
-import htsjdk.samtools.{
-  SAMFormatException,
-  SAMProgramRecord,
-  ValidationStringency
-}
+import htsjdk.samtools.{ SAMFormatException, SAMProgramRecord, ValidationStringency }
 import java.io.{ File, FileNotFoundException }
+
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.filter2.dsl.Dsl._
 import org.apache.parquet.filter2.predicate.FilterPredicate
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
 import org.bdgenomics.adam.models._
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.util.PhredUtils._
 import org.bdgenomics.adam.util.ADAMFunSuite
+import org.bdgenomics.adam.sql.{ VariantContext => VCProduct }
 import org.bdgenomics.formats.avro._
 import org.seqdoop.hadoop_bam.CRAMInputFormat
 import org.seqdoop.hadoop_bam.util.SAMHeaderReader
@@ -760,4 +759,91 @@ class ADAMContextSuite extends ADAMFunSuite {
     assert(secondPg.head.getCommandLine === "\"myProg 456\"")
     assert(secondPg.head.getVersion === "1.0.0")
   }
+
+  sparkTest("load variant contexts from dataframe") {
+    val path = testFile("small.vcf")
+    val vcs = sc.loadVcf(path)
+    val outputDir = tmpLocation()
+    vcs.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadVariantContexts(df, outputDir)
+    assert(reloaded.sequences == vcs.sequences)
+    assert(reloaded.headerLines.toSet == vcs.headerLines.toSet)
+    assert(reloaded.samples == vcs.samples)
+    assert(reloaded.rdd.collect().map(VCProduct.fromModel).deep ==
+      vcs.rdd.collect().map(VCProduct.fromModel).deep)
+  }
+
+  sparkTest("load features from dataframe") {
+    val path = testFile("dvl1.200.bed")
+    val features = sc.loadFeatures(path)
+    val outputDir = tmpLocation()
+    features.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadFeatures(df, outputDir)
+    assert(reloaded.sequences == features.sequences)
+    assert(reloaded.rdd.collect().deep == features.rdd.collect().deep)
+  }
+
+  sparkTest("load contig fragments from dataframe") {
+    val inputPath = testFile("chr20.250k.fa.gz")
+    val contigFragments = sc.loadFasta(inputPath, 10000L)
+    val outputDir = tmpLocation()
+    contigFragments.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadContigFragments(df, outputDir)
+    assert(reloaded.sequences == contigFragments.sequences)
+    assert(reloaded.rdd.collect().deep == contigFragments.rdd.collect().deep)
+  }
+
+  sparkTest("load genotypes from dataframe") {
+    val path = testFile("small.vcf")
+    val gts = sc.loadGenotypes(path)
+    val outputDir = tmpLocation()
+    gts.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadGenotypes(df, outputDir)
+    assert(reloaded.sequences == gts.sequences)
+    assert(reloaded.headerLines.toSet == gts.headerLines.toSet)
+    assert(reloaded.samples == gts.samples)
+    assert(reloaded.rdd.collect().deep == gts.rdd.collect().deep)
+  }
+
+  sparkTest("load variants from dataframe") {
+    val path = testFile("gvcf_dir/gvcf_multiallelic.g.vcf")
+    val variants = sc.loadVcf(path).toVariants()
+    val outputDir = tmpLocation()
+    variants.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadVariants(df, outputDir)
+    assert(reloaded.sequences == variants.sequences)
+    assert(reloaded.headerLines.toSet == variants.headerLines.toSet)
+    assert(reloaded.rdd.collect().deep == variants.rdd.collect().deep)
+  }
+
+  sparkTest("load alignments from dataframe") {
+    val path = testFile("bqsr1-r1.fq")
+    val alignments = sc.loadAlignments(path)
+    val outputDir = tmpLocation()
+    alignments.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadAlignments(df, outputDir)
+    assert(reloaded.sequences == alignments.sequences)
+    assert(reloaded.recordGroups == alignments.recordGroups)
+    assert(reloaded.processingSteps == alignments.processingSteps)
+    assert(reloaded.rdd.collect().deep == alignments.rdd.collect().deep)
+  }
+
+  sparkTest("load fragments from dataframe") {
+    val path = testFile("sample1.query.sam")
+    val fragments = sc.loadFragments(path)
+    val outputDir = tmpLocation()
+    fragments.saveAsParquet(outputDir)
+    val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
+    val reloaded = sc.loadFragments(df, outputDir)
+    assert(reloaded.sequences == fragments.sequences)
+    assert(reloaded.recordGroups == fragments.recordGroups)
+    assert(reloaded.processingSteps == fragments.processingSteps)
+    assert(reloaded.rdd.collect().deep == fragments.rdd.collect().deep)
+  }
 }