
Use empty sequence dictionary when loading features, fixes #1588
heuermh authored and fnothaft committed Jul 11, 2017
1 parent e76b0e2 commit 01b0dc7
Showing 7 changed files with 22 additions and 123 deletions.
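In short, the feature loaders no longer make an extra pass over the input to infer a SequenceDictionary, so the caching knobs that supported that pass are removed; unless a dictionary is supplied, a loaded FeatureRDD now carries SequenceDictionary.empty. A minimal sketch of the resulting call, assuming the ADAMContext implicits are in scope and using a placeholder path:

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._

def loadExample(sc: SparkContext): Unit = {
  // No optStorageLevel parameter any more; nothing is cached to build a dictionary.
  val features = sc.loadFeatures("features.bed") // "features.bed" is a placeholder path
  // With no optSequenceDictionary supplied, the dictionary is now empty rather than inferred.
  println(features.sequences)
}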
@@ -18,7 +18,6 @@
package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
@@ -52,25 +51,16 @@ class TransformFeaturesArgs extends Args4jBase with ParquetSaveArgs {
@Args4jOption(required = false, name = "-disable_fast_concat",
usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-cache", usage = "Cache before building the sequence dictionary. Recommended for formats other than IntervalList and Parquet.")
var cache: Boolean = false

@Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching. Defaults to MEMORY_ONLY.")
var storageLevel: String = "MEMORY_ONLY"
}

class TransformFeatures(val args: TransformFeaturesArgs)
extends BDGSparkCommand[TransformFeaturesArgs] {

val companion = TransformFeatures
val storageLevel = StorageLevel.fromString(args.storageLevel)
val optStorageLevel = if (args.cache) Some(storageLevel) else None

def run(sc: SparkContext) {
sc.loadFeatures(
args.featuresFile,
optStorageLevel = optStorageLevel,
optMinPartitions = Option(args.numPartitions),
optProjection = None
).save(args.outputPath, args.single, args.disableFastConcat)
@@ -137,7 +137,4 @@ object Timers extends Metrics {
val FullOuterShuffleJoin = timer("Full outer shuffle region join")
val ShuffleJoinAndGroupByLeft = timer("Shuffle join followed by group-by on left")
val RightOuterShuffleJoinAndGroupByLeft = timer("Right outer shuffle join followed by group-by on left")

// org.bdgenomics.adam.rdd.feature.FeatureRDD
val BuildSequenceDictionary = timer("Build SequenceDictionary for Features")
}
32 changes: 4 additions & 28 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -42,7 +42,6 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
@@ -1310,8 +1309,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* Globs/directories are supported, although file extension must be present
* for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to use. For
* textual formats, if this is None, fall back to the Spark default
* parallelism. Defaults to None.
@@ -1326,15 +1323,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadCoverage(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
optPredicate: Option[FilterPredicate] = None,
optProjection: Option[Schema] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): CoverageRDD = LoadCoverage.time {

loadFeatures(pathName,
optSequenceDictionary = optSequenceDictionary,
optStorageLevel = optStorageLevel,
optMinPartitions = optMinPartitions,
optPredicate = optPredicate,
optProjection = optProjection,
@@ -1380,8 +1375,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load features in GFF3 format from.
* Globs/directories are supported.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism. Defaults to None.
* @param stringency The validation stringency to use when validating GFF3 format.
@@ -1391,7 +1384,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadGff3(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadGff3.time {

@@ -1400,7 +1392,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _))
}

/**
@@ -1409,8 +1401,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load features in GTF/GFF2 format from.
* Globs/directories are supported.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism. Defaults to None.
* @param stringency The validation stringency to use when validating GTF/GFF2 format.
@@ -1420,7 +1410,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadGtf(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadGtf.time {

@@ -1429,7 +1418,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _))
}

/**
@@ -1438,8 +1427,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load features in BED6/12 format from.
* Globs/directories are supported.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism. Defaults to None.
* @param stringency The validation stringency to use when validating BED6/12 format.
@@ -1449,7 +1436,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadBed(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadBed.time {

@@ -1458,7 +1444,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _))
}

/**
@@ -1467,8 +1453,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load features in NarrowPeak format from.
* Globs/directories are supported.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism. Defaults to None.
* @param stringency The validation stringency to use when validating NarrowPeak format.
@@ -1478,7 +1462,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadNarrowPeak(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadNarrowPeak.time {

@@ -1487,7 +1470,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _))
}

/**
@@ -1643,8 +1626,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* Globs/directories are supported, although file extension must be present
* for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to use. For
* textual formats, if this is None, fall back to the Spark default
* parallelism. Defaults to None.
@@ -1659,7 +1640,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadFeatures(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
optPredicate: Option[FilterPredicate] = None,
optProjection: Option[Schema] = None,
@@ -1670,28 +1650,24 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
log.info(s"Loading $pathName as BED and converting to Features.")
loadBed(pathName,
optSequenceDictionary = optSequenceDictionary,
optStorageLevel = optStorageLevel,
optMinPartitions = optMinPartitions,
stringency = stringency)
} else if (isGff3Ext(trimmedPathName)) {
log.info(s"Loading $pathName as GFF3 and converting to Features.")
loadGff3(pathName,
optSequenceDictionary = optSequenceDictionary,
optStorageLevel = optStorageLevel,
optMinPartitions = optMinPartitions,
stringency = stringency)
} else if (isGtfExt(trimmedPathName)) {
log.info(s"Loading $pathName as GTF/GFF2 and converting to Features.")
loadGtf(pathName,
optSequenceDictionary = optSequenceDictionary,
optStorageLevel = optStorageLevel,
optMinPartitions = optMinPartitions,
stringency = stringency)
} else if (isNarrowPeakExt(trimmedPathName)) {
log.info(s"Loading $pathName as NarrowPeak and converting to Features.")
loadNarrowPeak(pathName,
optSequenceDictionary = optSequenceDictionary,
optStorageLevel = optStorageLevel,
optMinPartitions = optMinPartitions,
stringency = stringency)
} else if (isIntervalListExt(trimmedPathName)) {
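Each textual loader above now follows the same pattern: parse the records, then fold over optSequenceDictionary so that a caller-supplied dictionary is attached and its absence yields the empty default. A hedged sketch of the explicit-dictionary path; the contig names, lengths, and file name below are illustrative only:

import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.{ SequenceDictionary, SequenceRecord }
import org.bdgenomics.adam.rdd.ADAMContext._

def loadWithDictionary(sc: SparkContext): Unit = {
  // Hypothetical contig sizes; in practice these would come from reference metadata.
  val sd = new SequenceDictionary(Vector(
    SequenceRecord("chr1", 248956422L),
    SequenceRecord("chr2", 242193529L)))

  // The fold takes its second branch and attaches sd to the resulting FeatureRDD.
  val gff = sc.loadGff3("annotations.gff3", optSequenceDictionary = Some(sd))
  println(gff.sequences)
}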
@@ -24,7 +24,6 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Dataset, SQLContext }
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.ADAMContext._
@@ -116,46 +115,28 @@ object FeatureRDD {
* A GenomicRDD that wraps a dataset of Feature data.
*
* @param ds A Dataset of genomic Features.
* @param sequences The reference genome this data is aligned to.
* @param sequences The reference genome these data are aligned to.
*/
def apply(ds: Dataset[FeatureProduct],
sequences: SequenceDictionary): FeatureRDD = {
new DatasetBoundFeatureRDD(ds, sequences)
}

/**
* Builds a FeatureRDD without SequenceDictionary information by running an
* aggregate to rebuild the SequenceDictionary.
* Builds a FeatureRDD with an empty sequence dictionary.
*
* @param rdd The underlying Feature RDD to build from.
* @param optStorageLevel Optional storage level to use for cache before
* building the SequenceDictionary.
* @return Returns a new FeatureRDD.
*/
def apply(
rdd: RDD[Feature],
optStorageLevel: Option[StorageLevel]): FeatureRDD = BuildSequenceDictionary.time {

// optionally cache the rdd, since we're making multiple passes
optStorageLevel.foreach(rdd.persist(_))

// create sequence records with length max(start, end) + 1L
val sequenceRecords = rdd
.keyBy(_.getContigName)
.map(kv => (kv._1, max(kv._2.getStart, kv._2.getEnd) + 1L))
.reduceByKey(max(_, _))
.map(kv => SequenceRecord(kv._1, kv._2))

val sd = new SequenceDictionary(sequenceRecords.collect.toVector)

FeatureRDD(rdd, sd)
def apply(rdd: RDD[Feature]): FeatureRDD = {
FeatureRDD(rdd, SequenceDictionary.empty)
}

/**
* Builds a FeatureRDD without a partitionMap.
* Builds a FeatureRDD given a sequence dictionary.
*
* @param rdd The underlying Feature RDD.
* @param sd The Sequence Dictionary for the Feature RDD.
* @param rdd The underlying Feature RDD to build from.
* @param sd The sequence dictionary for this FeatureRDD.
* @return Returns a new FeatureRDD.
*/
def apply(rdd: RDD[Feature], sd: SequenceDictionary): FeatureRDD = {
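Callers that relied on the removed overload can still derive the same dictionary by hand before wrapping their RDD; a rough sketch reproducing the dropped aggregation, assuming features is an existing RDD[Feature] and withInferredDictionary is a hypothetical helper:

import scala.math.max
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{ SequenceDictionary, SequenceRecord }
import org.bdgenomics.adam.rdd.feature.FeatureRDD
import org.bdgenomics.formats.avro.Feature

// Same reduction the removed apply performed: one record per contig name,
// with length max(start, end) + 1, collected into a SequenceDictionary.
def withInferredDictionary(features: RDD[Feature]): FeatureRDD = {
  val records = features
    .keyBy(_.getContigName)
    .map(kv => (kv._1, max(kv._2.getStart, kv._2.getEnd) + 1L))
    .reduceByKey(max(_, _))
    .map(kv => SequenceRecord(kv._1, kv._2))
  FeatureRDD(features, new SequenceDictionary(records.collect.toVector))
}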
@@ -25,7 +25,6 @@ import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.PhredUtils._
@@ -147,12 +146,6 @@ class ADAMContextSuite extends ADAMFunSuite {
assert(features.count === 4)
}

sparkTest("Can read a .bed file without cache") {
val path = testFile("gencode.v7.annotation.trunc10.bed")
val features: RDD[Feature] = sc.loadFeatures(path, optStorageLevel = Some(StorageLevel.NONE)).rdd
assert(features.count === 10)
}

sparkTest("Can read a .narrowPeak file") {
val path = testFile("wgEncodeOpenChromDnaseGm19238Pk.trunc10.narrowPeak")
val annot: RDD[Feature] = sc.loadFeatures(path).rdd
@@ -49,7 +49,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val coverageRDD: CoverageRDD = featureRDD.toCoverage

val outputFile = tmpLocation(".bed")
@@ -103,7 +103,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val coverageRDD: CoverageRDD = featureRDD.toCoverage

val outputFile = tmpLocation(".adam")
@@ -120,7 +120,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build()
val f3 = Feature.newBuilder().setContigName("chr1").setStart(7).setEnd(20).setScore(4.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val coverageRDD: CoverageRDD = featureRDD.toCoverage
val coverage = coverageRDD.coverage(bpPerBin = 4)

@@ -132,7 +132,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build()
val f3 = Feature.newBuilder().setContigName("chr1").setStart(7).setEnd(20).setScore(4.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val coverageRDD: CoverageRDD = featureRDD.toCoverage

val coverage = coverageRDD
