From 95b6e9f336c448ac62f61e46f34a4975af9b1df3 Mon Sep 17 00:00:00 2001 From: Henry D Date: Wed, 20 Jun 2018 13:41:25 -0700 Subject: [PATCH 1/4] Add ADAMContext APIs to create genomic RDDs from dataframes --- .../org/bdgenomics/adam/rdd/ADAMContext.scala | 143 ++++++++---------- .../adam/rdd/ADAMContextSuite.scala | 96 +++++++++++- 2 files changed, 155 insertions(+), 84 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 5f9ce37aab..10287374a3 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -18,15 +18,10 @@ package org.bdgenomics.adam.rdd import java.io.{ File, FileNotFoundException, InputStream } + import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency } import htsjdk.samtools.util.Locatable -import htsjdk.variant.vcf.{ - VCFHeader, - VCFCompoundHeaderLine, - VCFFormatHeaderLine, - VCFHeaderLine, - VCFInfoHeaderLine -} +import htsjdk.variant.vcf.{ VCFCompoundHeaderLine, VCFFormatHeaderLine, VCFHeader, VCFHeaderLine, VCFInfoHeaderLine } import org.apache.avro.Schema import org.apache.avro.file.DataFileStream import org.apache.avro.generic.{ GenericDatumReader, GenericRecord, IndexedRecord } @@ -42,66 +37,22 @@ import org.apache.parquet.hadoop.util.ContextUtil import org.apache.spark.SparkContext import org.apache.spark.rdd.MetricsContext._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{ Dataset, SparkSession, SQLContext } +import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext, SparkSession } import org.bdgenomics.adam.converters._ import org.bdgenomics.adam.instrumentation.Timers._ import org.bdgenomics.adam.io._ import org.bdgenomics.adam.models._ -import org.bdgenomics.adam.projections.{ - FeatureField, - Projection -} -import org.bdgenomics.adam.rdd.contig.{ - DatasetBoundNucleotideContigFragmentRDD, - NucleotideContigFragmentRDD, - ParquetUnboundNucleotideContigFragmentRDD, - RDDBoundNucleotideContigFragmentRDD -} +import org.bdgenomics.adam.projections.{ FeatureField, Projection } +import org.bdgenomics.adam.rdd.contig.{ DatasetBoundNucleotideContigFragmentRDD, NucleotideContigFragmentRDD, ParquetUnboundNucleotideContigFragmentRDD, RDDBoundNucleotideContigFragmentRDD } import org.bdgenomics.adam.rdd.feature._ -import org.bdgenomics.adam.rdd.fragment.{ - DatasetBoundFragmentRDD, - FragmentRDD, - ParquetUnboundFragmentRDD, - RDDBoundFragmentRDD -} -import org.bdgenomics.adam.rdd.read.{ - AlignmentRecordRDD, - DatasetBoundAlignmentRecordRDD, - RepairPartitions, - ParquetUnboundAlignmentRecordRDD, - RDDBoundAlignmentRecordRDD -} +import org.bdgenomics.adam.rdd.fragment.{ DatasetBoundFragmentRDD, FragmentRDD, ParquetUnboundFragmentRDD, RDDBoundFragmentRDD } +import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, DatasetBoundAlignmentRecordRDD, ParquetUnboundAlignmentRecordRDD, RDDBoundAlignmentRecordRDD, RepairPartitions } import org.bdgenomics.adam.rdd.variant._ import org.bdgenomics.adam.rich.RichAlignmentRecord -import org.bdgenomics.adam.sql.{ - AlignmentRecord => AlignmentRecordProduct, - Feature => FeatureProduct, - Fragment => FragmentProduct, - Genotype => GenotypeProduct, - NucleotideContigFragment => NucleotideContigFragmentProduct, - Variant => VariantProduct, - VariantContext => VariantContextProduct -} +import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct, Feature => FeatureProduct, 
Fragment => FragmentProduct, Genotype => GenotypeProduct, NucleotideContigFragment => NucleotideContigFragmentProduct, Variant => VariantProduct, VariantContext => VariantContextProduct } import org.bdgenomics.adam.util.FileExtensions._ -import org.bdgenomics.adam.util.{ - GenomeFileReader, - ReferenceContigMap, - ReferenceFile, - SequenceDictionaryReader, - TwoBitFile -} -import org.bdgenomics.formats.avro.{ - AlignmentRecord, - Contig, - Feature, - Fragment, - Genotype, - NucleotideContigFragment, - ProcessingStep, - RecordGroup => RecordGroupMetadata, - Sample, - Variant -} +import org.bdgenomics.adam.util.{ GenomeFileReader, ReferenceContigMap, ReferenceFile, SequenceDictionaryReader, TwoBitFile } +import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig, Feature, Fragment, Genotype, NucleotideContigFragment, ProcessingStep, Sample, Variant, RecordGroup => RecordGroupMetadata } import org.bdgenomics.utils.instrumentation.Metrics import org.bdgenomics.utils.io.LocalFileByteAccess import org.bdgenomics.utils.misc.{ HadoopUtil, Logging } @@ -1120,7 +1071,13 @@ private class NoPrefixFileFilter(private val prefix: String) extends PathFilter * * @param sc The SparkContext to wrap. */ -class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging { +class ADAMContext(@transient val spark: SparkSession) extends Serializable with Logging { + import spark.implicits._ + @transient val sc: SparkContext = spark.sparkContext + + def this(sc: SparkContext) { + this(SQLContext.getOrCreate(sc).sparkSession) + } /** * @param samHeader The header to extract a sequence dictionary from. @@ -1885,15 +1842,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log optPredicate: Option[FilterPredicate] = None, optProjection: Option[Schema] = None): AlignmentRecordRDD = { - // convert avro to sequence dictionary val sd = loadAvroSequenceDictionary(pathName) - - // convert avro to sequence dictionary val rgd = loadAvroRecordGroupDictionary(pathName) - - // load processing step descriptions val pgs = loadAvroPrograms(pathName) - (optPredicate, optProjection) match { case (None, None) => { ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs) @@ -2371,6 +2322,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + def loadVariantContexts(df: DataFrame, metadataPath: String): VariantContextRDD = { + val sd = loadAvroSequenceDictionary(metadataPath) + val samples = loadAvroSamples(metadataPath) + val headerLines = loadHeaderLines(metadataPath) + DatasetBoundVariantContextRDD(df.as[VariantContextProduct], sd, samples, headerLines) + } + /** * Load a path name in Parquet + Avro format into a VariantContextRDD. 
* @@ -2380,21 +2338,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log */ def loadParquetVariantContexts( pathName: String): VariantContextRDD = { - - // load header lines - val headers = loadHeaderLines(pathName) - - // load sequence info - val sd = loadAvroSequenceDictionary(pathName) - - // load avro record group dictionary and convert to samples - val samples = loadAvroSamples(pathName) - val sqlContext = SQLContext.getOrCreate(sc) import sqlContext.implicits._ - val ds = sqlContext.read.parquet(pathName).as[VariantContextProduct] - - new DatasetBoundVariantContextRDD(ds, sd, samples, headers) + val df = sqlContext.read.parquet(pathName) + loadVariantContexts(df, pathName) } /** @@ -2890,6 +2837,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log ParquetUnboundFragmentRDD(sc, pathName, sd, rgd, pgs) } case (_, _) => { + val df = SQLContext.getOrCreate(sc) // load from disk val rdd = loadParquet[Fragment](pathName, optPredicate, optProjection) @@ -2986,6 +2934,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + def loadFeatures(df: DataFrame, metadataPath: String): FeatureRDD = { + val sd = loadAvroSequenceDictionary(metadataPath) + DatasetBoundFeatureRDD(df.as[FeatureProduct], sd) + } + /** * Load reference sequences into a broadcastable ReferenceFile. * @@ -3085,6 +3038,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + def loadContigFragments(df: DataFrame, metadataPath: String): NucleotideContigFragmentRDD = { + val sd = loadAvroSequenceDictionary(metadataPath) + DatasetBoundNucleotideContigFragmentRDD(df.as[NucleotideContigFragmentProduct], sd) + } + /** * Load genotypes into a GenotypeRDD. * @@ -3120,6 +3078,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + def loadGenotypes(df: DataFrame, metadataPath: String): GenotypeRDD = { + val sd = loadAvroSequenceDictionary(metadataPath) + val samples = loadAvroSamples(metadataPath) + val headerLines = loadHeaderLines(metadataPath) + DatasetBoundGenotypeRDD(df.as[GenotypeProduct], sd, samples, headerLines) + } + /** * Load variants into a VariantRDD. * @@ -3154,6 +3119,12 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + def loadVariants(df: DataFrame, metadataPath: String): VariantRDD = { + val sd = loadAvroSequenceDictionary(metadataPath) + val headerLines = loadHeaderLines(metadataPath) + DatasetBoundVariantRDD(df.as[VariantProduct], sd, headerLines) + } + /** * Load alignment records into an AlignmentRecordRDD. * @@ -3224,6 +3195,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + def loadAlignments(df: DataFrame, metadataPath: String): AlignmentRecordRDD = { + val sd = loadAvroSequenceDictionary(metadataPath) + val rgd = loadAvroRecordGroupDictionary(metadataPath) + val process = loadAvroPrograms(metadataPath) + new DatasetBoundAlignmentRecordRDD(df.as[AlignmentRecordProduct], sd, rgd, process) + } + /** * Load fragments into a FragmentRDD. 
* @@ -3282,6 +3260,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + def loadFragments(df: DataFrame, metadataPath: String): FragmentRDD = { + val sd = loadAvroSequenceDictionary(metadataPath) + val rgd = loadAvroRecordGroupDictionary(metadataPath) + val processingSteps = loadAvroPrograms(metadataPath) + DatasetBoundFragmentRDD(df.as[FragmentProduct], sd, rgd, processingSteps) + } + /** * Return length of partitions in base pairs if the specified path of Parquet + Avro files is partitioned. * diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala index c14e367fff..dd9b531de9 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala @@ -17,22 +17,21 @@ */ package org.bdgenomics.adam.rdd -import htsjdk.samtools.{ - SAMFormatException, - SAMProgramRecord, - ValidationStringency -} +import htsjdk.samtools.{ SAMFormatException, SAMProgramRecord, ValidationStringency } import java.io.{ File, FileNotFoundException } + import com.google.common.io.Files import org.apache.hadoop.fs.Path import org.apache.parquet.filter2.dsl.Dsl._ import org.apache.parquet.filter2.predicate.FilterPredicate import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext import org.bdgenomics.adam.models._ import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.util.PhredUtils._ import org.bdgenomics.adam.util.ADAMFunSuite +import org.bdgenomics.adam.sql.{ VariantContext => VCProduct } import org.bdgenomics.formats.avro._ import org.seqdoop.hadoop_bam.CRAMInputFormat import org.seqdoop.hadoop_bam.util.SAMHeaderReader @@ -760,4 +759,91 @@ class ADAMContextSuite extends ADAMFunSuite { assert(secondPg.head.getCommandLine === "\"myProg 456\"") assert(secondPg.head.getVersion === "1.0.0") } + + sparkTest("load variant contexts from dataframe") { + val path = testFile("small.vcf") + val vcs = sc.loadVcf(path) + val outputDir = tmpLocation() + vcs.saveAsParquet(outputDir) + val df = SQLContext.getOrCreate(sc).read.parquet(outputDir) + val reloaded = sc.loadVariantContexts(df, outputDir) + assert(reloaded.sequences == vcs.sequences) + assert(reloaded.headerLines.toSet == vcs.headerLines.toSet) + assert(reloaded.samples == vcs.samples) + assert(reloaded.rdd.collect().map(VCProduct.fromModel).deep.sorted == + vcs.rdd.collect().map(VCProduct.fromModel).deep.sorted) + } + + sparkTest("load features from dataframe") { + val path = testFile("dvl1.200.bed") + val features = sc.loadFeatures(path) + val outputDir = tmpLocation() + features.saveAsParquet(outputDir) + val df = SQLContext.getOrCreate(sc).read.parquet(outputDir) + val reloaded = sc.loadFeatures(df, outputDir) + assert(reloaded.sequences == features.sequences) + assert(reloaded.rdd.collect().deep == features.rdd.collect().deep) + } + + sparkTest("load contig fragments from dataframe") { + val inputPath = testFile("chr20.250k.fa.gz") + val contigFragments = sc.loadFasta(inputPath, 10000L) + val outputDir = tmpLocation() + contigFragments.saveAsParquet(outputDir) + val df = SQLContext.getOrCreate(sc).read.parquet(outputDir) + val reloaded = sc.loadContigFragments(df, outputDir) + assert(reloaded.sequences == contigFragments.sequences) + assert(reloaded.rdd.collect().deep == contigFragments.rdd.collect().deep) + } + + sparkTest("load genotypes 
from dataframe") { + val path = testFile("small.vcf") + val gts = sc.loadGenotypes(path) + val outputDir = tmpLocation() + gts.saveAsParquet(outputDir) + val df = SQLContext.getOrCreate(sc).read.parquet(outputDir) + val reloaded = sc.loadGenotypes(df, outputDir) + assert(reloaded.sequences == gts.sequences) + assert(reloaded.headerLines.toSet == gts.headerLines.toSet) + assert(reloaded.samples == gts.samples) + assert(reloaded.rdd.collect().deep == gts.rdd.collect().deep) + } + + sparkTest("load variants from dataframe") { + val path = testFile("gvcf_dir/gvcf_multiallelic.g.vcf") + val variants = sc.loadVcf(path).toVariants() + val outputDir = tmpLocation() + variants.saveAsParquet(outputDir) + val df = SQLContext.getOrCreate(sc).read.parquet(outputDir) + val reloadedVariants = sc.loadVariants(df, outputDir) + assert(reloadedVariants.sequences == variants.sequences) + assert(reloadedVariants.headerLines.toSet == variants.headerLines.toSet) + assert(reloadedVariants.rdd.collect().deep == variants.rdd.collect().deep) + } + + sparkTest("load alignments from dataframe") { + val path = testFile("bqsr1-r1.fq") + val alignments = sc.loadAlignments(path) + val outputDir = tmpLocation() + alignments.saveAsParquet(outputDir) + val df = SQLContext.getOrCreate(sc).read.parquet(outputDir) + val reloaded = sc.loadAlignments(df, outputDir) + assert(reloaded.sequences == alignments.sequences) + assert(reloaded.recordGroups == alignments.recordGroups) + assert(reloaded.processingSteps == alignments.processingSteps) + assert(reloaded.rdd.collect().deep == alignments.rdd.collect().deep) + } + + sparkTest("load fragments from dataframe") { + val path = testFile("sample1.query.sam") + val fragments = sc.loadFragments(path) + val outputDir = tmpLocation() + fragments.saveAsParquet(outputDir) + val df = SQLContext.getOrCreate(sc).read.parquet(outputDir) + val reloaded = sc.loadFragments(df, outputDir) + assert(reloaded.sequences == fragments.sequences) + assert(reloaded.recordGroups == fragments.recordGroups) + assert(reloaded.processingSteps == fragments.processingSteps) + assert(reloaded.rdd.collect().deep == fragments.rdd.collect().deep) + } } From 6221afcf30118901ebc0c42d6373fab81a7fe35b Mon Sep 17 00:00:00 2001 From: Henry D Date: Wed, 20 Jun 2018 13:58:48 -0700 Subject: [PATCH 2/4] some cleanup --- .../scala/org/bdgenomics/adam/rdd/ADAMContext.scala | 10 +--------- .../org/bdgenomics/adam/rdd/ADAMContextSuite.scala | 12 ++++++------ 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 10287374a3..6b405b9f4f 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -1071,13 +1071,7 @@ private class NoPrefixFileFilter(private val prefix: String) extends PathFilter * * @param sc The SparkContext to wrap. */ -class ADAMContext(@transient val spark: SparkSession) extends Serializable with Logging { - import spark.implicits._ - @transient val sc: SparkContext = spark.sparkContext - - def this(sc: SparkContext) { - this(SQLContext.getOrCreate(sc).sparkSession) - } +class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging { /** * @param samHeader The header to extract a sequence dictionary from. 
@@ -2339,7 +2333,6 @@ class ADAMContext(@transient val spark: SparkSession) extends Serializable with def loadParquetVariantContexts( pathName: String): VariantContextRDD = { val sqlContext = SQLContext.getOrCreate(sc) - import sqlContext.implicits._ val df = sqlContext.read.parquet(pathName) loadVariantContexts(df, pathName) } @@ -2837,7 +2830,6 @@ class ADAMContext(@transient val spark: SparkSession) extends Serializable with ParquetUnboundFragmentRDD(sc, pathName, sd, rgd, pgs) } case (_, _) => { - val df = SQLContext.getOrCreate(sc) // load from disk val rdd = loadParquet[Fragment](pathName, optPredicate, optProjection) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala index dd9b531de9..0e30f6ed8f 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala @@ -770,8 +770,8 @@ class ADAMContextSuite extends ADAMFunSuite { assert(reloaded.sequences == vcs.sequences) assert(reloaded.headerLines.toSet == vcs.headerLines.toSet) assert(reloaded.samples == vcs.samples) - assert(reloaded.rdd.collect().map(VCProduct.fromModel).deep.sorted == - vcs.rdd.collect().map(VCProduct.fromModel).deep.sorted) + assert(reloaded.rdd.collect().map(VCProduct.fromModel).deep == + vcs.rdd.collect().map(VCProduct.fromModel).deep) } sparkTest("load features from dataframe") { @@ -815,10 +815,10 @@ class ADAMContextSuite extends ADAMFunSuite { val outputDir = tmpLocation() variants.saveAsParquet(outputDir) val df = SQLContext.getOrCreate(sc).read.parquet(outputDir) - val reloadedVariants = sc.loadVariants(df, outputDir) - assert(reloadedVariants.sequences == variants.sequences) - assert(reloadedVariants.headerLines.toSet == variants.headerLines.toSet) - assert(reloadedVariants.rdd.collect().deep == variants.rdd.collect().deep) + val reloaded = sc.loadVariants(df, outputDir) + assert(reloaded.sequences == variants.sequences) + assert(reloaded.headerLines.toSet == variants.headerLines.toSet) + assert(reloaded.rdd.collect().deep == variants.rdd.collect().deep) } sparkTest("load alignments from dataframe") { From e0cc666666a8d9b83fdd60f6edc2cc07bca2a52c Mon Sep 17 00:00:00 2001 From: Henry D Date: Wed, 20 Jun 2018 14:01:29 -0700 Subject: [PATCH 3/4] fix compilation --- .../src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 6b405b9f4f..9f8664332f 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -1072,6 +1072,8 @@ private class NoPrefixFileFilter(private val prefix: String) extends PathFilter * @param sc The SparkContext to wrap. */ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging { + @transient val spark = SQLContext.getOrCreate(sc).sparkSession + import spark.implicits._ /** * @param samHeader The header to extract a sequence dictionary from. 
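The compilation fix in PATCH 3/4 is worth a note: DataFrame#as[T] resolves an implicit Encoder[T], and hoisting import spark.implicits._ into the class body makes those encoders visible to every df.as[...] call introduced in the first patch. A minimal sketch of the same pattern, assuming Spark 2.x; VariantRow and toDataset are illustrative names, not part of ADAM:

    import org.apache.spark.sql.{ DataFrame, Dataset, SparkSession }

    // Hypothetical row type, for illustration only.
    case class VariantRow(contigName: String, start: Long, end: Long)

    def toDataset(spark: SparkSession, df: DataFrame): Dataset[VariantRow] = {
      // Without this import, df.as[VariantRow] fails to compile with
      // "Unable to find encoder for type stored in a Dataset".
      import spark.implicits._
      df.as[VariantRow]
    }
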
From 089214a0642bee50695684b895ba53ca692ce0ba Mon Sep 17 00:00:00 2001 From: Henry D Date: Wed, 20 Jun 2018 14:15:10 -0700 Subject: [PATCH 4/4] fix one codacy comment --- .../src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 9f8664332f..3aff7f0149 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -52,7 +52,7 @@ import org.bdgenomics.adam.rich.RichAlignmentRecord import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct, Feature => FeatureProduct, Fragment => FragmentProduct, Genotype => GenotypeProduct, NucleotideContigFragment => NucleotideContigFragmentProduct, Variant => VariantProduct, VariantContext => VariantContextProduct } import org.bdgenomics.adam.util.FileExtensions._ import org.bdgenomics.adam.util.{ GenomeFileReader, ReferenceContigMap, ReferenceFile, SequenceDictionaryReader, TwoBitFile } -import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig, Feature, Fragment, Genotype, NucleotideContigFragment, ProcessingStep, Sample, Variant, RecordGroup => RecordGroupMetadata } +import org.bdgenomics.formats.avro.{ RecordGroup => RecordGroupMetadata, _ } import org.bdgenomics.utils.instrumentation.Metrics import org.bdgenomics.utils.io.LocalFileByteAccess import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
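
Taken together, the series gives ADAMContext a load*(df: DataFrame, metadataPath: String) variant for each genomic type, pairing a caller-built DataFrame with the Avro sidecar metadata (sequence dictionary, record groups, samples, header lines) that ADAM writes next to its Parquet output. A usage sketch under stated assumptions — the output path is a placeholder, readMapped is a field from the bdg-formats AlignmentRecord schema, and the sc.loadAlignments call relies on the ADAMContext._ implicits, exactly as in the tests above:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.bdgenomics.adam.rdd.ADAMContext._
    import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD

    def reloadMappedReads(sc: SparkContext, outputDir: String): AlignmentRecordRDD = {
      // Read the saved Parquet back as a plain DataFrame, filter it with
      // Spark SQL, then rewrap it using the metadata stored alongside.
      val df = SQLContext.getOrCreate(sc).read.parquet(outputDir)
      val mapped = df.filter(df("readMapped"))
      sc.loadAlignments(mapped, outputDir)
    }

The design point of interest is that the DataFrame and the metadata travel separately: any Spark SQL transformation that preserves the AlignmentRecord schema can sit between the read and the reload.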