diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala
index 305bc169c8..bbb5487f44 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFeatures.scala
@@ -18,7 +18,6 @@ package org.bdgenomics.adam.cli
 
 import org.apache.spark.SparkContext
-import org.apache.spark.storage.StorageLevel
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.utils.cli._
 import org.kohsuke.args4j.{ Argument, Option ⇒ Args4jOption }
@@ -52,25 +51,16 @@ class TransformFeaturesArgs extends Args4jBase with ParquetSaveArgs {
   @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
   var disableFastConcat: Boolean = false
-
-  @Args4jOption(required = false, name = "-cache", usage = "Cache before building the sequence dictionary. Recommended for formats other than IntervalList and Parquet.")
-  var cache: Boolean = false
-
-  @Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching. Defaults to MEMORY_ONLY.")
-  var storageLevel: String = "MEMORY_ONLY"
 }
 
 class TransformFeatures(val args: TransformFeaturesArgs)
     extends BDGSparkCommand[TransformFeaturesArgs] {
   val companion = TransformFeatures
-  val storageLevel = StorageLevel.fromString(args.storageLevel)
-  val optStorageLevel = if (args.cache) Some(storageLevel) else None
 
   def run(sc: SparkContext) {
     sc.loadFeatures(
       args.featuresFile,
-      optStorageLevel = optStorageLevel,
       optMinPartitions = Option(args.numPartitions),
       optProjection = None
     ).save(args.outputPath, args.single, args.disableFastConcat)
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala
index 8075fb48f8..5177a0c680 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala
@@ -137,7 +137,4 @@ object Timers extends Metrics {
   val FullOuterShuffleJoin = timer("Full outer shuffle region join")
   val ShuffleJoinAndGroupByLeft = timer("Shuffle join followed by group-by on left")
   val RightOuterShuffleJoinAndGroupByLeft = timer("Right outer shuffle join followed by group-by on left")
-
-  // org.bdgenomics.adam.rdd.feature.FeatureRDD
-  val BuildSequenceDictionary = timer("Build SequenceDictionary for Features")
 }
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
index 4f3f88745a..be9663dfe7 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -42,7 +42,6 @@ import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.MetricsContext._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
 import org.bdgenomics.adam.converters._
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.io._
@@ -1310,8 +1309,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    * Globs/directories are supported, although file extension must be present
    * for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats.
    * @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
-   * @param optStorageLevel Optional storage level to use for cache before building
-   *   the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
    * @param optMinPartitions An optional minimum number of partitions to use. For
    *   textual formats, if this is None, fall back to the Spark default
    *   parallelism. Defaults to None.
@@ -1326,7 +1323,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
   def loadCoverage(
     pathName: String,
     optSequenceDictionary: Option[SequenceDictionary] = None,
-    optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
     optMinPartitions: Option[Int] = None,
     optPredicate: Option[FilterPredicate] = None,
     optProjection: Option[Schema] = None,
@@ -1334,7 +1330,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     loadFeatures(pathName,
       optSequenceDictionary = optSequenceDictionary,
-      optStorageLevel = optStorageLevel,
       optMinPartitions = optMinPartitions,
       optPredicate = optPredicate,
       optProjection = optProjection,
@@ -1380,8 +1375,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    * @param pathName The path name to load features in GFF3 format from.
    *   Globs/directories are supported.
    * @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
-   * @param optStorageLevel Optional storage level to use for cache before building
-   *   the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
    * @param optMinPartitions An optional minimum number of partitions to load. If
    *   not set, falls back to the configured Spark default parallelism. Defaults to None.
    * @param stringency The validation stringency to use when validating GFF3 format.
@@ -1391,7 +1384,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
   def loadGff3(
     pathName: String,
     optSequenceDictionary: Option[SequenceDictionary] = None,
-    optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
     optMinPartitions: Option[Int] = None,
     stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadGff3.time {
@@ -1400,7 +1392,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     if (Metrics.isRecording) records.instrument() else records
 
     optSequenceDictionary
-      .fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
+      .fold(FeatureRDD(records))(FeatureRDD(records, _))
   }
 
   /**
@@ -1409,8 +1401,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    * @param pathName The path name to load features in GTF/GFF2 format from.
    *   Globs/directories are supported.
    * @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
-   * @param optStorageLevel Optional storage level to use for cache before building
-   *   the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
    * @param optMinPartitions An optional minimum number of partitions to load. If
    *   not set, falls back to the configured Spark default parallelism. Defaults to None.
    * @param stringency The validation stringency to use when validating GTF/GFF2 format.
@@ -1420,7 +1410,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
   def loadGtf(
     pathName: String,
     optSequenceDictionary: Option[SequenceDictionary] = None,
-    optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
     optMinPartitions: Option[Int] = None,
     stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadGtf.time {
@@ -1429,7 +1418,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     if (Metrics.isRecording) records.instrument() else records
 
     optSequenceDictionary
-      .fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
+      .fold(FeatureRDD(records))(FeatureRDD(records, _))
   }
 
   /**
@@ -1438,8 +1427,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    * @param pathName The path name to load features in BED6/12 format from.
    *   Globs/directories are supported.
    * @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
-   * @param optStorageLevel Optional storage level to use for cache before building
-   *   the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
    * @param optMinPartitions An optional minimum number of partitions to load. If
    *   not set, falls back to the configured Spark default parallelism. Defaults to None.
    * @param stringency The validation stringency to use when validating BED6/12 format.
@@ -1449,7 +1436,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
   def loadBed(
     pathName: String,
     optSequenceDictionary: Option[SequenceDictionary] = None,
-    optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
     optMinPartitions: Option[Int] = None,
     stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadBed.time {
@@ -1458,7 +1444,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     if (Metrics.isRecording) records.instrument() else records
 
     optSequenceDictionary
-      .fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
+      .fold(FeatureRDD(records))(FeatureRDD(records, _))
   }
 
   /**
@@ -1467,8 +1453,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    * @param pathName The path name to load features in NarrowPeak format from.
    *   Globs/directories are supported.
    * @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
-   * @param optStorageLevel Optional storage level to use for cache before building
-   *   the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
    * @param optMinPartitions An optional minimum number of partitions to load. If
    *   not set, falls back to the configured Spark default parallelism. Defaults to None.
    * @param stringency The validation stringency to use when validating NarrowPeak format.
@@ -1478,7 +1462,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
   def loadNarrowPeak(
     pathName: String,
     optSequenceDictionary: Option[SequenceDictionary] = None,
-    optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
     optMinPartitions: Option[Int] = None,
     stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadNarrowPeak.time {
@@ -1487,7 +1470,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     if (Metrics.isRecording) records.instrument() else records
 
     optSequenceDictionary
-      .fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
+      .fold(FeatureRDD(records))(FeatureRDD(records, _))
   }
 
   /**
@@ -1643,8 +1626,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
    * Globs/directories are supported, although file extension must be present
    * for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats.
    * @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
-   * @param optStorageLevel Optional storage level to use for cache before building
-   *   the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
    * @param optMinPartitions An optional minimum number of partitions to use. For
    *   textual formats, if this is None, fall back to the Spark default
    *   parallelism. Defaults to None.
@@ -1659,7 +1640,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
   def loadFeatures(
     pathName: String,
     optSequenceDictionary: Option[SequenceDictionary] = None,
-    optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
     optMinPartitions: Option[Int] = None,
     optPredicate: Option[FilterPredicate] = None,
     optProjection: Option[Schema] = None,
@@ -1670,28 +1650,24 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
       log.info(s"Loading $pathName as BED and converting to Features.")
       loadBed(pathName,
         optSequenceDictionary = optSequenceDictionary,
-        optStorageLevel = optStorageLevel,
         optMinPartitions = optMinPartitions,
         stringency = stringency)
     } else if (isGff3Ext(trimmedPathName)) {
       log.info(s"Loading $pathName as GFF3 and converting to Features.")
       loadGff3(pathName,
         optSequenceDictionary = optSequenceDictionary,
-        optStorageLevel = optStorageLevel,
         optMinPartitions = optMinPartitions,
         stringency = stringency)
     } else if (isGtfExt(trimmedPathName)) {
       log.info(s"Loading $pathName as GTF/GFF2 and converting to Features.")
       loadGtf(pathName,
         optSequenceDictionary = optSequenceDictionary,
-        optStorageLevel = optStorageLevel,
         optMinPartitions = optMinPartitions,
         stringency = stringency)
     } else if (isNarrowPeakExt(trimmedPathName)) {
       log.info(s"Loading $pathName as NarrowPeak and converting to Features.")
       loadNarrowPeak(pathName,
         optSequenceDictionary = optSequenceDictionary,
-        optStorageLevel = optStorageLevel,
         optMinPartitions = optMinPartitions,
         stringency = stringency)
     } else if (isIntervalListExt(trimmedPathName)) {
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
index e395b433aa..f46c4b5ea4 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
@@ -24,7 +24,6 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ Dataset, SQLContext }
-import org.apache.spark.storage.StorageLevel
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.models._
 import org.bdgenomics.adam.rdd.ADAMContext._
@@ -116,7 +115,7 @@ object FeatureRDD {
   * A GenomicRDD that wraps a dataset of Feature data.
   *
   * @param ds A Dataset of genomic Features.
-  * @param sequences The reference genome this data is aligned to.
+  * @param sequences The reference genome these data are aligned to.
   */
  def apply(ds: Dataset[FeatureProduct],
            sequences: SequenceDictionary): FeatureRDD = {
@@ -124,38 +123,20 @@ object FeatureRDD {
  }
 
  /**
-  * Builds a FeatureRDD without SequenceDictionary information by running an
-  * aggregate to rebuild the SequenceDictionary.
+  * Builds a FeatureRDD with an empty sequence dictionary.
   *
   * @param rdd The underlying Feature RDD to build from.
-  * @param optStorageLevel Optional storage level to use for cache before
-  *   building the SequenceDictionary.
   * @return Returns a new FeatureRDD.
   */
- def apply(
-   rdd: RDD[Feature],
-   optStorageLevel: Option[StorageLevel]): FeatureRDD = BuildSequenceDictionary.time {
-
-   // optionally cache the rdd, since we're making multiple passes
-   optStorageLevel.foreach(rdd.persist(_))
-
-   // create sequence records with length max(start, end) + 1L
-   val sequenceRecords = rdd
-     .keyBy(_.getContigName)
-     .map(kv => (kv._1, max(kv._2.getStart, kv._2.getEnd) + 1L))
-     .reduceByKey(max(_, _))
-     .map(kv => SequenceRecord(kv._1, kv._2))
-
-   val sd = new SequenceDictionary(sequenceRecords.collect.toVector)
-
-   FeatureRDD(rdd, sd)
+ def apply(rdd: RDD[Feature]): FeatureRDD = {
+   FeatureRDD(rdd, SequenceDictionary.empty)
  }
 
  /**
-  * Builds a FeatureRDD without a partitionMap.
+  * Builds a FeatureRDD given a sequence dictionary.
   *
-  * @param rdd The underlying Feature RDD.
-  * @param sd The Sequence Dictionary for the Feature RDD.
+  * @param rdd The underlying Feature RDD to build from.
+  * @param sd The sequence dictionary for this FeatureRDD.
   * @return Returns a new FeatureRDD.
   */
  def apply(rdd: RDD[Feature], sd: SequenceDictionary): FeatureRDD = {
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
index 5d6a6201bc..e9e9578e2f 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
@@ -25,7 +25,6 @@ import org.apache.parquet.filter2.dsl.Dsl._
 import org.apache.parquet.filter2.predicate.FilterPredicate
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
 import org.bdgenomics.adam.models._
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.util.PhredUtils._
@@ -147,12 +146,6 @@ class ADAMContextSuite extends ADAMFunSuite {
     assert(features.count === 4)
   }
 
-  sparkTest("Can read a .bed file without cache") {
-    val path = testFile("gencode.v7.annotation.trunc10.bed")
-    val features: RDD[Feature] = sc.loadFeatures(path, optStorageLevel = Some(StorageLevel.NONE)).rdd
-    assert(features.count === 10)
-  }
-
   sparkTest("Can read a .narrowPeak file") {
     val path = testFile("wgEncodeOpenChromDnaseGm19238Pk.trunc10.narrowPeak")
     val annot: RDD[Feature] = sc.loadFeatures(path).rdd
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/CoverageRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/CoverageRDDSuite.scala
index 206d2bba67..982a0da118 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/CoverageRDDSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/CoverageRDDSuite.scala
@@ -49,7 +49,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
     val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
     val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()
 
-    val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
+    val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
     val coverageRDD: CoverageRDD = featureRDD.toCoverage
 
     val outputFile = tmpLocation(".bed")
@@ -103,7 +103,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
     val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
     val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()
 
-    val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
+    val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
     val coverageRDD: CoverageRDD = featureRDD.toCoverage
 
     val outputFile = tmpLocation(".adam")
@@ -120,7 +120,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
     val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build()
     val f3 = Feature.newBuilder().setContigName("chr1").setStart(7).setEnd(20).setScore(4.0).build()
 
-    val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
+    val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
     val coverageRDD: CoverageRDD = featureRDD.toCoverage
 
     val coverage = coverageRDD.coverage(bpPerBin = 4)
@@ -132,7 +132,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
     val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build()
     val f3 = Feature.newBuilder().setContigName("chr1").setStart(7).setEnd(20).setScore(4.0).build()
 
-    val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
+    val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
     val coverageRDD: CoverageRDD = featureRDD.toCoverage
 
     val coverage = coverageRDD
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala
index 52fe8bc9c2..566f2eb28b 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala
@@ -426,7 +426,7 @@ class FeatureRDDSuite extends ADAMFunSuite {
     val f6 = fb.setContigName("1").setStart(10L).setEnd(110L).clearStrand().build() // null strand last
     val f7 = fb.setContigName("2").build()
 
-    val features = FeatureRDD(sc.parallelize(Seq(f7, f6, f5, f4, f3, f2, f1)), optStorageLevel = None)
+    val features = FeatureRDD(sc.parallelize(Seq(f7, f6, f5, f4, f3, f2, f1)))
     val sorted = features.sortByReference().rdd.collect()
 
     assert(f1 == sorted(0))
@@ -448,7 +448,7 @@ class FeatureRDDSuite extends ADAMFunSuite {
     val f6 = fb.setScore(0.9).build() // Double defaults to increasing sort order
     val f7 = fb.clearScore().build() // nulls last
 
-    val features = FeatureRDD(sc.parallelize(Seq(f7, f6, f5, f4, f3, f2, f1)), optStorageLevel = None)
+    val features = FeatureRDD(sc.parallelize(Seq(f7, f6, f5, f4, f3, f2, f1)))
     val sorted = features.sortByReference().rdd.collect()
 
     assert(f1 == sorted(0))
@@ -466,7 +466,7 @@ class FeatureRDDSuite extends ADAMFunSuite {
     val f2 = fb.setGeneId("gene2").build()
     val f3 = fb.clearGeneId().build() // nulls last
 
-    val features = FeatureRDD(sc.parallelize(Seq(f3, f2, f1)), optStorageLevel = None)
+    val features = FeatureRDD(sc.parallelize(Seq(f3, f2, f1)))
     val sorted = features.sortByReference().rdd.collect()
 
     assert(f1 == sorted(0))
@@ -482,7 +482,7 @@ class FeatureRDDSuite extends ADAMFunSuite {
     val f4 = fb.setGeneId("gene2").setTranscriptId("transcript2").build()
     val f5 = fb.setGeneId("gene2").clearTranscriptId().build() // nulls last
 
-    val features = FeatureRDD(sc.parallelize(Seq(f5, f4, f3, f2, f1)), optStorageLevel = None)
+    val features = FeatureRDD(sc.parallelize(Seq(f5, f4, f3, f2, f1)))
     val sorted = features.sortByReference().rdd.collect()
 
     assert(f1 == sorted(0))
@@ -504,7 +504,7 @@ class FeatureRDDSuite extends ADAMFunSuite {
     val f8 = fb.setGeneId("gene2").setTranscriptId("transcript1").setAttributes(ImmutableMap.of("rank", "2")).build()
     val f9 = fb.setGeneId("gene2").setTranscriptId("transcript1").clearAttributes().build() // nulls last
 
-    val features = FeatureRDD(sc.parallelize(Seq(f9, f8, f7, f6, f5, f4, f3, f2, f1)), optStorageLevel = None)
+    val features = FeatureRDD(sc.parallelize(Seq(f9, f8, f7, f6, f5, f4, f3, f2, f1)))
     val sorted = features.sortByReference().rdd.collect()
 
     assert(f1 == sorted(0))
@@ -526,7 +526,7 @@ class FeatureRDDSuite extends ADAMFunSuite {
     val f4 = fb.setAttributes(ImmutableMap.of("rank", "2")).build()
     val f5 = fb.clearAttributes().build() // nulls last
 
-    val features = FeatureRDD(sc.parallelize(Seq(f5, f4, f3, f2, f1)), optStorageLevel = None)
+    val features = FeatureRDD(sc.parallelize(Seq(f5, f4, f3, f2, f1)))
     val sorted = features.sortByReference().rdd.collect()
 
     assert(f1 == sorted(0))
@@ -541,7 +541,7 @@ class FeatureRDDSuite extends ADAMFunSuite {
     val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
     val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()
 
-    val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
+    val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
     val coverageRDD: CoverageRDD = featureRDD.toCoverage
 
     val coverage = coverageRDD.flatten
@@ -747,35 +747,6 @@ class FeatureRDDSuite extends ADAMFunSuite {
     val features2 = sc.loadGff3(testFile("dvl1.200.gff3"))
     val union = features1.union(features2)
     assert(union.rdd.count === (features1.rdd.count + features2.rdd.count))
-    // only a single contig between the two
-    assert(union.sequences.size === 1)
-  }
-
-  sparkTest("estimate sequence dictionary contig lengths from GTF format") {
-    val inputPath = testFile("Homo_sapiens.GRCh37.75.trun100.gtf")
-    val features = sc.loadGtf(inputPath)
-    // max(start,end) = 1 36081
-    assert(features.sequences.containsRefName("1"))
-    assert(features.sequences.apply("1").isDefined)
-    assert(features.sequences.apply("1").get.length >= 36081L)
-  }
-
-  sparkTest("estimate sequence dictionary contig lengths from GFF3 format") {
-    val inputPath = testFile("dvl1.200.gff3")
-    val features = sc.loadGff3(inputPath)
-    // max(start, end) = 1 1356705
-    assert(features.sequences.containsRefName("1"))
-    assert(features.sequences.apply("1").isDefined)
-    assert(features.sequences.apply("1").get.length >= 1356705L)
-  }
-
-  sparkTest("estimate sequence dictionary contig lengths from BED format") {
-    val inputPath = testFile("dvl1.200.bed")
-    val features = sc.loadBed(inputPath)
-    // max(start, end) = 1 1358504
-    assert(features.sequences.containsRefName("1"))
-    assert(features.sequences.apply("1").isDefined)
-    assert(features.sequences.apply("1").get.length >= 1358504L)
   }
 
   sparkTest("obtain sequence dictionary contig lengths from header in IntervalList format") {
@@ -794,15 +765,6 @@ class FeatureRDDSuite extends ADAMFunSuite {
     assert(features.sequences.apply("chr2").get.length >= 243199373L)
   }
 
-  sparkTest("estimate sequence dictionary contig lengths from NarrowPeak format") {
-    val inputPath = testFile("wgEncodeOpenChromDnaseGm19238Pk.trunc10.narrowPeak")
-    val features = sc.loadNarrowPeak(inputPath)
-    // max(start, end) = chr1 794336
-    assert(features.sequences.containsRefName("chr1"))
-    assert(features.sequences.apply("chr1").isDefined)
-    assert(features.sequences.apply("chr1").get.length >= 794336L)
-  }
-
   sparkTest("don't lose any features when piping as BED format") {
     val inputPath = testFile("dvl1.200.bed")
     val frdd = sc.loadBed(inputPath)
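
Usage note (editorial, not part of the diff): after this change the feature loaders no longer accept optStorageLevel and no sequence dictionary is estimated from the data; the one-argument FeatureRDD.apply now yields an empty dictionary. A minimal sketch of how call sites adapt, using only signatures shown above; the input path and the commented-out dictionary value are hypothetical.

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.models.SequenceDictionary
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.feature.FeatureRDD

def adaptedCallSites(sc: SparkContext): Unit = {
  // Load features without optStorageLevel; supply a dictionary explicitly if one is known.
  val features = sc.loadFeatures("features.bed") // hypothetical path; sequences will be empty
  // val withDict = sc.loadFeatures("features.bed",
  //   optSequenceDictionary = Some(knownDictionary)) // knownDictionary is a hypothetical value

  // Callers that previously relied on -cache/-storage_level can persist the underlying RDD themselves.
  features.rdd.persist(StorageLevel.MEMORY_ONLY)

  // Direct construction: one argument gives an empty dictionary, two arguments set it.
  val noDict = FeatureRDD(features.rdd)
  val withSd = FeatureRDD(features.rdd, SequenceDictionary.empty)
}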