From 40d43b8e845ca41de8dbb43832270b6d8cecbd7d Mon Sep 17 00:00:00 2001 From: Justin Paschall Date: Tue, 23 Jan 2018 09:39:22 -0500 Subject: [PATCH] Address PR comments - part 1 --- .../org/bdgenomics/adam/rdd/ADAMContext.scala | 205 ++++++++---------- .../org/bdgenomics/adam/rdd/GenomicRDD.scala | 18 +- .../adam/rdd/feature/FeatureRDDSuite.scala | 2 +- .../rdd/read/AlignmentRecordRDDSuite.scala | 4 +- .../adam/rdd/variant/GenotypeRDDSuite.scala | 2 +- .../adam/rdd/variant/VariantRDDSuite.scala | 2 +- 6 files changed, 108 insertions(+), 125 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 621d0c5372..9a872e75f4 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -18,20 +18,15 @@ package org.bdgenomics.adam.rdd import java.io.{ File, FileNotFoundException, InputStream } + import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency } import htsjdk.samtools.util.Locatable -import htsjdk.variant.vcf.{ - VCFHeader, - VCFCompoundHeaderLine, - VCFFormatHeaderLine, - VCFHeaderLine, - VCFInfoHeaderLine -} +import htsjdk.variant.vcf.{ VCFCompoundHeaderLine, VCFFormatHeaderLine, VCFHeader, VCFHeaderLine, VCFInfoHeaderLine } import org.apache.avro.Schema import org.apache.avro.file.DataFileStream import org.apache.avro.generic.{ GenericDatumReader, GenericRecord, IndexedRecord } import org.apache.avro.specific.{ SpecificDatumReader, SpecificRecord, SpecificRecordBase } -import org.apache.hadoop.fs.{ FileSystem, Path, PathFilter } +import org.apache.hadoop.fs.{ FileStatus, FileSystem, Path, PathFilter } import org.apache.hadoop.io.{ LongWritable, Text } import org.apache.hadoop.io.compress.CompressionCodecFactory import org.apache.hadoop.mapreduce.lib.input.TextInputFormat @@ -42,65 +37,22 @@ import org.apache.parquet.hadoop.util.ContextUtil import org.apache.spark.SparkContext import org.apache.spark.rdd.MetricsContext._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{ SparkSession, Dataset } +import org.apache.spark.sql.{ Dataset, SparkSession } import org.bdgenomics.adam.converters._ import org.bdgenomics.adam.instrumentation.Timers._ import org.bdgenomics.adam.io._ import org.bdgenomics.adam.models._ -import org.bdgenomics.adam.projections.{ - FeatureField, - Projection -} -import org.bdgenomics.adam.rdd.contig.{ - DatasetBoundNucleotideContigFragmentRDD, - NucleotideContigFragmentRDD, - ParquetUnboundNucleotideContigFragmentRDD, - RDDBoundNucleotideContigFragmentRDD -} +import org.bdgenomics.adam.projections.{ FeatureField, Projection } +import org.bdgenomics.adam.rdd.contig.{ DatasetBoundNucleotideContigFragmentRDD, NucleotideContigFragmentRDD, ParquetUnboundNucleotideContigFragmentRDD, RDDBoundNucleotideContigFragmentRDD } import org.bdgenomics.adam.rdd.feature._ -import org.bdgenomics.adam.rdd.fragment.{ - DatasetBoundFragmentRDD, - FragmentRDD, - ParquetUnboundFragmentRDD, - RDDBoundFragmentRDD -} -import org.bdgenomics.adam.rdd.read.{ - AlignmentRecordRDD, - DatasetBoundAlignmentRecordRDD, - RepairPartitions, - ParquetUnboundAlignmentRecordRDD, - RDDBoundAlignmentRecordRDD -} +import org.bdgenomics.adam.rdd.fragment.{ DatasetBoundFragmentRDD, FragmentRDD, ParquetUnboundFragmentRDD, RDDBoundFragmentRDD } +import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, DatasetBoundAlignmentRecordRDD, ParquetUnboundAlignmentRecordRDD, 
RDDBoundAlignmentRecordRDD, RepairPartitions } import org.bdgenomics.adam.rdd.variant._ import org.bdgenomics.adam.rich.RichAlignmentRecord -import org.bdgenomics.adam.sql.{ - AlignmentRecord => AlignmentRecordProduct, - Feature => FeatureProduct, - Fragment => FragmentProduct, - Genotype => GenotypeProduct, - NucleotideContigFragment => NucleotideContigFragmentProduct, - Variant => VariantProduct -} +import org.bdgenomics.adam.sql.{ AlignmentRecord => AlignmentRecordProduct, Feature => FeatureProduct, Fragment => FragmentProduct, Genotype => GenotypeProduct, NucleotideContigFragment => NucleotideContigFragmentProduct, Variant => VariantProduct } import org.bdgenomics.adam.util.FileExtensions._ -import org.bdgenomics.adam.util.{ - GenomeFileReader, - ReferenceContigMap, - ReferenceFile, - SequenceDictionaryReader, - TwoBitFile -} -import org.bdgenomics.formats.avro.{ - AlignmentRecord, - Contig, - Feature, - Fragment, - Genotype, - NucleotideContigFragment, - ProcessingStep, - RecordGroup => RecordGroupMetadata, - Sample, - Variant -} +import org.bdgenomics.adam.util.{ GenomeFileReader, ReferenceContigMap, ReferenceFile, SequenceDictionaryReader, TwoBitFile } +import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig, Feature, Fragment, Genotype, NucleotideContigFragment, ProcessingStep, Sample, Variant, RecordGroup => RecordGroupMetadata } import org.bdgenomics.utils.instrumentation.Metrics import org.bdgenomics.utils.io.LocalFileByteAccess import org.bdgenomics.utils.misc.{ HadoopUtil, Logging } @@ -108,6 +60,7 @@ import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods._ import org.seqdoop.hadoop_bam._ import org.seqdoop.hadoop_bam.util._ + import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag @@ -1369,7 +1322,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } fs.listStatus(path, filter) } else { - val paths = fs.globStatus(path) + val paths: Array[FileStatus] = fs.globStatus(path) if (paths == null || paths.isEmpty) { throw new FileNotFoundException( s"Couldn't find any files matching ${path.toUri}" @@ -1874,10 +1827,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log * @param pathName The path name to load alignment records from. * Globs/directories are supported. * @param regions Optional list of genomic regions to load. - * @param addChrPrefix Flag to add "chr" prefix to contigs * @return Returns an AlignmentRecordRDD. 
*/ - def loadPartitionedParquetAlignments(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): AlignmentRecordRDD = { + def loadPartitionedParquetAlignments(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): AlignmentRecordRDD = { require(checkPartitionedParquetFlag(pathName), "Input Parquet files (%s) are not partitioned.".format(pathName)) @@ -1891,10 +1843,15 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log val pgs = loadAvroPrograms(pathName) val reads: AlignmentRecordRDD = ParquetUnboundAlignmentRecordRDD(sc, pathName, sd, rgd, pgs) - val datasetBoundAlignmentRecordRDD: AlignmentRecordRDD = regions match { - case Some(x) => DatasetBoundAlignmentRecordRDD(reads.dataset.filter(referenceRegionsToDatasetQueryString(x)), reads.sequences, reads.recordGroups, reads.processingSteps) - case _ => DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps) - } + val datasetBoundAlignmentRecordRDD = + if (regions.nonEmpty) { + DatasetBoundAlignmentRecordRDD(reads.dataset.filter(referenceRegionsToDatasetQueryString(regions)), + reads.sequences, + reads.recordGroups, + reads.processingSteps) + } else { + DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps) + } datasetBoundAlignmentRecordRDD } @@ -2287,10 +2244,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log * @param pathName The path name to load alignment records from. * Globs/directories are supported. * @param regions Optional list of genomic regions to load. - * @param addChrPrefix Flag to add "chr" prefix to contigs * @return Returns a GenotypeRDD. */ - def loadPartitionedParquetGenotypes(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): GenotypeRDD = { + def loadPartitionedParquetGenotypes(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): GenotypeRDD = { require(checkPartitionedParquetFlag(pathName), "Input Parquet files (%s) are not partitioned.".format(pathName)) // load header lines @@ -2302,11 +2258,15 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log val genotypes = ParquetUnboundGenotypeRDD(sc, pathName, sd, samples, headers) - val datasetBoundGenotypeRDD: GenotypeRDD = regions match { - case Some(x) => DatasetBoundGenotypeRDD(genotypes.dataset - .filter(referenceRegionsToDatasetQueryString(x)), genotypes.sequences, genotypes.samples, genotypes.headerLines) - case _ => DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines) - } + val datasetBoundGenotypeRDD = + if (regions.nonEmpty) { + DatasetBoundGenotypeRDD(genotypes.dataset.filter(referenceRegionsToDatasetQueryString(regions)), + genotypes.sequences, + genotypes.samples, + genotypes.headerLines) + } else { + DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines) + } datasetBoundGenotypeRDD } @@ -2349,11 +2309,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log * @param pathName The path name to load alignment records from. * Globs/directories are supported. * @param regions Optional list of genomic regions to load. 
-   * @param addChrPrefix Flag to add "chr" prefix to contigs
    * @return Returns a VariantRDD
    */
-  def loadPartitionedParquetVariants(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): VariantRDD = {
+  def loadPartitionedParquetVariants(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): VariantRDD = {
     require(checkPartitionedParquetFlag(pathName),
       "Input Parquet files (%s) are not partitioned.".format(pathName))
 
     val sd = loadAvroSequenceDictionary(pathName)
@@ -2363,11 +2322,14 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
 
     val variants = ParquetUnboundVariantRDD(sc, pathName, sd, headers)
 
-    val datasetBoundVariantRDD: VariantRDD = regions match {
-      case Some(x) => DatasetBoundVariantRDD(variants.dataset
-        .filter(referenceRegionsToDatasetQueryString(x)), variants.sequences, headers)
-      case _ => DatasetBoundVariantRDD(variants.dataset, variants.sequences, headers)
-    }
+    val datasetBoundVariantRDD =
+      if (regions.nonEmpty) {
+        DatasetBoundVariantRDD(variants.dataset.filter(referenceRegionsToDatasetQueryString(regions)),
+          variants.sequences,
+          headers)
+      } else {
+        DatasetBoundVariantRDD(variants.dataset, variants.sequences, headers)
+      }
 
     datasetBoundVariantRDD
   }
@@ -2693,24 +2655,22 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    * @param pathName The path name to load alignment records from.
    *   Globs/directories are supported.
    * @param regions Optional list of genomic regions to load.
-   * @param addChrPrefix Flag to add "chr" prefix to contigs
    * @return Returns a FeatureRDD.
    */
-
-  def loadPartitionedParquetFeatures(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): FeatureRDD = {
+  def loadPartitionedParquetFeatures(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): FeatureRDD = {
     require(checkPartitionedParquetFlag(pathName),
       "Input Parquet files (%s) are not partitioned.".format(pathName))
 
     val sd = loadAvroSequenceDictionary(pathName)
 
     val features = ParquetUnboundFeatureRDD(sc, pathName, sd)
 
-    val datasetBoundFeatureRDD: FeatureRDD = regions match {
-      case Some(x) => DatasetBoundFeatureRDD(features.dataset
-        .filter(referenceRegionsToDatasetQueryString(x)), features.sequences)
-      case _ => DatasetBoundFeatureRDD(features.dataset, features.sequences)
-    }
-
+    val datasetBoundFeatureRDD =
+      if (regions.nonEmpty) {
+        DatasetBoundFeatureRDD(features.dataset.filter(referenceRegionsToDatasetQueryString(regions)),
+          features.sequences)
+      } else {
+        DatasetBoundFeatureRDD(features.dataset, features.sequences)
+      }
     datasetBoundFeatureRDD
-
   }
 
 /**
@@ -2751,21 +2711,23 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    * @param pathName The path name to load alignment records from.
    *   Globs/directories are supported.
    * @param regions Optional list of genomic regions to load. 
-   * @param addChrPrefix Flag to add "chr" prefix to contigs
    * @return Returns a NucleotideContigFragmentRDD
    */
-  def loadPartitionedParquetFragments(pathName: String, regions: Option[Iterable[ReferenceRegion]] = None): NucleotideContigFragmentRDD = {
+  def loadPartitionedParquetFragments(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): NucleotideContigFragmentRDD = {
     require(checkPartitionedParquetFlag(pathName),
       "Input Parquet files (%s) are not partitioned.".format(pathName))
 
     val sd = loadAvroSequenceDictionary(pathName)
 
     val nucleotideContigFragments = ParquetUnboundNucleotideContigFragmentRDD(sc, pathName, sd)
 
-    val datasetBoundNucleotideContigFragmentRDD: NucleotideContigFragmentRDD = regions match {
-      case Some(x) => DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments.dataset
-        .filter(referenceRegionsToDatasetQueryString(x)), nucleotideContigFragments.sequences)
-      case _ => DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments.dataset, nucleotideContigFragments.sequences)
-    }
-
+    val datasetBoundNucleotideContigFragmentRDD =
+      if (regions.nonEmpty) {
+        DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments
+          .dataset.filter(referenceRegionsToDatasetQueryString(regions)),
+          nucleotideContigFragments.sequences)
+      } else {
+        DatasetBoundNucleotideContigFragmentRDD(nucleotideContigFragments.dataset,
+          nucleotideContigFragments.sequences)
+      }
     datasetBoundNucleotideContigFragmentRDD
   }
 
@@ -3192,39 +3154,56 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
   }
 
   /**
-   * Test if Parquet files are partitioned
+   * Return true if the specified path of Parquet + Avro files is partitioned.
    *
    * @param filePath Path in which to look for the partitioned flag.
-   * @return Return True if partitioned flag found, False otherwise.
+   * @return Returns true if the partitioned flag was found. If the flag is absent,
+   *         getFsAndFilesWithFilter throws a FileNotFoundException instead of returning false.
+   *         Behavior is undefined if some paths in the glob contain the partitioned flag and some do not.
    */
-
   def checkPartitionedParquetFlag(filePath: String): Boolean = {
-    val path = new Path(filePath, "_isPartitionedByStartPos")
-    val fs = path.getFileSystem(sc.hadoopConfiguration)
-    fs.exists(path)
+    // getFsAndFilesWithFilter throws a FileNotFoundException if no file named
+    // _isPartitionedByStartPos is found, so reaching the return means the flag exists
+    getFsAndFilesWithFilter(filePath, new FileFilter("_isPartitionedByStartPos"))
+    true
   }
 
   /**
-   * Returns a query string used to filter a dataset based on an input list of ReferenceRegion
+   * Returns a query string used to filter a dataset based on zero or more ReferenceRegions.
    *
-   * @param regions list of regions to include in query
-   * @param partitionSize size of genomic range partitions used when writing parquet
-   * @param addChrPrefix add the prefix "chr" to the contig names - use if the original loaded data and sequence
-   *                     dictionary used "chr" prefix
-   * @return
+   * @param regions Zero or more regions to include in a query.
+   * @param partitionSize The size of the partitions used when writing Parquet, in base pairs. 
Defaults to 1000000.
+   * @return Returns a query string used to filter a dataset based on zero or more ReferenceRegions.
    */
   def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion], partitionSize: Int = 1000000): String = {
 
-    var regionQueryString = "(contigName=" + "\'" + regions.head.referenceName + "\' and posBin >= \'" +
-      scala.math.floor(regions.head.start / partitionSize).toInt + "\' and posBin < \'" + (scala.math.floor(regions.head.end / partitionSize).toInt + 1) + "\' and start >= " + regions.head.start + " and end <= " + regions.head.end + ")"
-
-    if (regions.size > 1) {
-      regions.foreach((i) => {
-        regionQueryString = regionQueryString + " or " + "(contigName=" + "\'" +
-          i.referenceName + "\' and posBin >= \'" + scala.math.floor(i.start / partitionSize).toInt + "\' and posBin < \'" + (scala.math.floor(i.end / partitionSize).toInt + 1) + "\' and start >= " + i.start + " and end <= " + i.end + ")"
-      })
-    }
-
-    regionQueryString
-  }
+    // build one predicate per region, restricting the scan to the region's
+    // contig and position bins before applying the exact start/end bounds,
+    // then join the per-region predicates with "or"
+    regions.map(r => "(contigName=" + "\'" + r.referenceName + "\' and positionBin >= \'" +
+      scala.math.floor(r.start / partitionSize).toInt + "\' and positionBin < \'" +
+      (scala.math.floor(r.end / partitionSize).toInt + 1) +
+      "\' and start >= " + r.start + " and end <= " + r.end + ")")
+      .mkString(" or ")
+  }
 }
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
index ac67b7ede5..ea81436975 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -2519,27 +2519,31 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
     saveAsParquet(new JavaSaveArgs(filePath))
   }
 
+  /**
+   * Writes a flag file to disk indicating that this Parquet + Avro store is
+   * partitioned into range bins by start position.
+   *
+   * @param filePath Path of the partitioned Parquet + Avro store.
+   * @return Returns true if the flag file did not already exist and was created.
+   */
   def writePartitionedParquetFlag(filePath: String): Boolean = {
     val path = new Path(filePath, "_isPartitionedByStartPos")
-    val fs = path.getFileSystem(toDF().sqlContext.sparkContext.hadoopConfiguration)
+    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
     fs.createNewFile(path)
   }
 
-  def saveAsPartitionedParquet(filePath: String,
-                               compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
-                               partitionSize: Int = 1000000) {
+  private[rdd] def saveAsPartitionedParquet(filePath: String,
+                                            compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
+                                            partitionSize: Int = 1000000) {
     log.warn("Saving directly as Hive-partitioned Parquet from SQL. 
" + "Options other than compression codec are ignored.") val df = toDF() - df.withColumn("posBin", floor(df("start") / partitionSize)) + df.withColumn("positionBin", floor(df("start") / partitionSize)) .write - .partitionBy("contigName", "posBin") + .partitionBy("contigName", "positionBin") .format("parquet") .option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase()) .save(filePath) writePartitionedParquetFlag(filePath) - //rdd.context.writePartitionedParquetFlag(filePath) saveMetadata(filePath) } diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala index 2bce87cce7..b52ef92d90 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala @@ -925,7 +925,7 @@ class FeatureRDDSuite extends ADAMFunSuite { assert(rdd3.dataset.count === 4) } - sparkTest("load paritioned parquet to sql, save, re-read from avro") { + sparkTest("load partitioned parquet to sql, save, re-read from avro") { def testMetadata(fRdd: FeatureRDD) { val sequenceRdd = fRdd.addSequence(SequenceRecord("aSequence", 1000L)) assert(sequenceRdd.sequences.containsRefName("aSequence")) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala index f4e17351a0..b822215f52 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala @@ -638,7 +638,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { assert(rdd3.dataset.count === 20) } - sparkTest("load from sam, save as partitioend parquet, and and re-read from partitioned parquet") { + sparkTest("load from sam, save as partitioned parquet, and and re-read from partitioned parquet") { def testMetadata(arRdd: AlignmentRecordRDD) { val sequenceRdd = arRdd.addSequence(SequenceRecord("aSequence", 1000L)) assert(sequenceRdd.sequences.containsRefName("aSequence")) @@ -658,7 +658,7 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { assert(rdd2.rdd.count === 3) assert(rdd2.dataset.count === 3) - val rdd3 = sc.loadPartitionedParquetAlignments(outputPath, Option(List(ReferenceRegion("1", 240000000L, 241000000L), ReferenceRegion("2", 189000000L, 190000000L)))) + val rdd3 = sc.loadPartitionedParquetAlignments(outputPath, List(ReferenceRegion("1", 240000000L, 241000000L), ReferenceRegion("2", 189000000L, 190000000L))) assert(rdd3.dataset.count === 2) assert(rdd3.rdd.count === 2) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala index 8cd127615c..6cf50b56fa 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala @@ -128,7 +128,7 @@ class GenotypeRDDSuite extends ADAMFunSuite { assert(starts(752790L)) } - sparkTest("round trip to paritioned parquet") { + sparkTest("round trip to partitioned parquet") { val genotypes = sc.loadGenotypes(testFile("small.vcf")) val outputPath = tmpLocation() genotypes.saveAsPartitionedParquet(outputPath) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala 
b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala index 765f79cf3d..fdb538d3f5 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala @@ -155,7 +155,7 @@ class VariantRDDSuite extends ADAMFunSuite { assert(unfilteredVariants.rdd.count === 6) assert(unfilteredVariants.dataset.count === 6) - val regionsVariants = sc.loadPartitionedParquetVariants(outputPath, Option(List(ReferenceRegion("2", 19000L, 21000L), ReferenceRegion("13", 752700L, 752750L)))) + val regionsVariants = sc.loadPartitionedParquetVariants(outputPath, List(ReferenceRegion("2", 19000L, 21000L), ReferenceRegion("13", 752700L, 752750L))) assert(regionsVariants.rdd.count === 2) assert(regionsVariants.dataset.count === 2) }
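
A minimal sketch of the updated call sites after this change, assuming a store
previously written with saveAsPartitionedParquet and the ADAMContext implicits
in scope (import org.bdgenomics.adam.rdd.ADAMContext._); the path below is
hypothetical:

    import org.bdgenomics.adam.models.ReferenceRegion

    // regions now defaults to Iterable.empty, so an unfiltered load no longer
    // passes an Option wrapper
    val allReads = sc.loadPartitionedParquetAlignments("sample.alignments.adam")

    // a filtered load passes the regions directly, rather than Option(List(...))
    val someReads = sc.loadPartitionedParquetAlignments(
      "sample.alignments.adam",
      List(ReferenceRegion("1", 240000000L, 241000000L)))

For the single region above, with the default 1000000 bp partition size,
referenceRegionsToDatasetQueryString should produce the Dataset filter:

    (contigName='1' and positionBin >= '240' and positionBin < '242' and start >= 240000000 and end <= 241000000)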