Commit 01af076
Add inferSequenceDictionary ctr to FeatureRDD.
heuermh committed Mar 21, 2017
1 parent dbf4f85 commit 01af076
Showing 6 changed files with 48 additions and 37 deletions.
@@ -49,17 +49,21 @@ class TransformFeaturesArgs extends Args4jBase with ParquetSaveArgs {
usage = "Save as a single file, for the text formats.")
var single: Boolean = false

@Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching.")
@Args4jOption(required = false, name = "-cache", usage = "Cache before building the sequence dictionary. Recommended for formats other than IntervalList and Parquet.")
var cache: Boolean = false

@Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching. Defaults to MEMORY_ONLY.")
var storageLevel: String = "MEMORY_ONLY"
}

class TransformFeatures(val args: TransformFeaturesArgs)
extends BDGSparkCommand[TransformFeaturesArgs] {
val companion = TransformFeatures
val storageLevel = StorageLevel.fromString(args.storageLevel)
val optStorageLevel = if (args.cache) Some(storageLevel) else None

def run(sc: SparkContext) {
sc.loadFeatures(args.featuresFile, storageLevel, None, Option(args.numPartitions))
sc.loadFeatures(args.featuresFile, optStorageLevel, None, Option(args.numPartitions))
.save(args.outputPath, args.single)
}
}
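For reference, the two new options combine so that -cache gates whether any caching pass happens and -storage_level selects which level is used; a minimal standalone sketch of that mapping, with a hypothetical helper name that is not part of this commit:

import org.apache.spark.storage.StorageLevel

// Illustrative helper: how the -cache flag and the -storage_level string
// combine into the Option[StorageLevel] handed to loadFeatures.
def toOptStorageLevel(cache: Boolean, storageLevel: String): Option[StorageLevel] = {
  // StorageLevel.fromString accepts names such as "MEMORY_ONLY" or "MEMORY_AND_DISK".
  if (cache) Some(StorageLevel.fromString(storageLevel)) else None
}

// toOptStorageLevel(cache = false, "MEMORY_ONLY")    == None
// toOptStorageLevel(cache = true, "MEMORY_AND_DISK") == Some(StorageLevel.MEMORY_AND_DISK)
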
41 changes: 23 additions & 18 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -1149,7 +1149,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* Loads features stored in GFF3 format.
*
* @param filePath The path to the file to load.
* @param storageLevel Storage level to use for cache before building the SequenceDictionary.
* @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary.
* Defaults to StorageLevel.MEMORY_ONLY.
* @param minPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism.
* @param stringency Optional stringency to pass. LENIENT stringency will warn
@@ -1158,20 +1159,21 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @return Returns a FeatureRDD.
*/
def loadGff3(filePath: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
minPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = {
val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
.flatMap(new GFF3Parser().parse(_, stringency))
if (Metrics.isRecording) records.instrument() else records
FeatureRDD(records, storageLevel)
FeatureRDD.inferSequenceDictionary(records, optStorageLevel)
}

/**
* Loads features stored in GFF2/GTF format.
*
* @param filePath The path to the file to load.
* @param storageLevel Storage level to use for cache before building the SequenceDictionary.
* @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary.
* Defaults to StorageLevel.MEMORY_ONLY.
* @param minPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism.
* @param stringency Optional stringency to pass. LENIENT stringency will warn
@@ -1180,20 +1182,21 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @return Returns a FeatureRDD.
*/
def loadGtf(filePath: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
minPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = {
val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
.flatMap(new GTFParser().parse(_, stringency))
if (Metrics.isRecording) records.instrument() else records
FeatureRDD(records, storageLevel)
FeatureRDD.inferSequenceDictionary(records, optStorageLevel)
}

/**
* Loads features stored in BED6/12 format.
*
* @param filePath The path to the file to load.
* @param storageLevel Storage level to use for cache before building the SequenceDictionary.
* @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary.
* Defaults to StorageLevel.MEMORY_ONLY.
* @param minPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism.
* @param stringency Optional stringency to pass. LENIENT stringency will warn
@@ -1202,20 +1205,21 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @return Returns a FeatureRDD.
*/
def loadBed(filePath: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
minPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = {
val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
.flatMap(new BEDParser().parse(_, stringency))
if (Metrics.isRecording) records.instrument() else records
FeatureRDD(records, storageLevel)
FeatureRDD.inferSequenceDictionary(records, optStorageLevel)
}

/**
* Loads features stored in NarrowPeak format.
*
* @param filePath The path to the file to load.
* @param storageLevel Storage level to use for cache before building the SequenceDictionary.
* @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary.
* Defaults to StorageLevel.MEMORY_ONLY.
* @param minPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism.
* @param stringency Optional stringency to pass. LENIENT stringency will warn
@@ -1224,13 +1228,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @return Returns a FeatureRDD.
*/
def loadNarrowPeak(filePath: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
minPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = {
val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
.flatMap(new NarrowPeakParser().parse(_, stringency))
if (Metrics.isRecording) records.instrument() else records
FeatureRDD(records, storageLevel)
FeatureRDD.inferSequenceDictionary(records, optStorageLevel)
}

/**
@@ -1324,7 +1328,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* IntervalList. If none of these match, we fall back to Parquet.
*
* @param filePath The path to the file to load.
* @param storageLevel Storage level to use for cache before building the SequenceDictionary.
* @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary.
* Defaults to StorageLevel.MEMORY_ONLY.
* @param projection An optional projection to push down.
* @param minPartitions An optional minimum number of partitions to use. For
* textual formats, if this is None, we fall back to the Spark default
@@ -1339,24 +1344,24 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @see loadParquetFeatures
*/
def loadFeatures(filePath: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
projection: Option[Schema] = None,
minPartitions: Option[Int] = None): FeatureRDD = LoadFeatures.time {

if (filePath.endsWith(".bed")) {
log.info(s"Loading $filePath as BED and converting to features. Projection is ignored.")
loadBed(filePath, storageLevel, minPartitions)
loadBed(filePath, optStorageLevel, minPartitions)
} else if (filePath.endsWith(".gff3")) {
log.info(s"Loading $filePath as GFF3 and converting to features. Projection is ignored.")
loadGff3(filePath, storageLevel, minPartitions)
loadGff3(filePath, optStorageLevel, minPartitions)
} else if (filePath.endsWith(".gtf") ||
filePath.endsWith(".gff")) {
log.info(s"Loading $filePath as GTF/GFF2 and converting to features. Projection is ignored.")
loadGtf(filePath, storageLevel, minPartitions)
loadGtf(filePath, optStorageLevel, minPartitions)
} else if (filePath.endsWith(".narrowPeak") ||
filePath.endsWith(".narrowpeak")) {
log.info(s"Loading $filePath as NarrowPeak and converting to features. Projection is ignored.")
loadNarrowPeak(filePath, storageLevel, minPartitions)
loadNarrowPeak(filePath, optStorageLevel, minPartitions)
} else if (filePath.endsWith(".interval_list")) {
log.info(s"Loading $filePath as IntervalList and converting to features. Projection is ignored.")
loadIntervalList(filePath, minPartitions)
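Callers of the loaders above now pass an Option[StorageLevel] rather than a bare StorageLevel; a short sketch of the resulting call sites, assuming an illustrative input path:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.rdd.ADAMContext._

def loadWithAndWithoutCache(sc: SparkContext): Unit = {
  // Persist with the default level while the sequence dictionary is built.
  val cached = sc.loadFeatures("features.gff3",
    optStorageLevel = Some(StorageLevel.MEMORY_ONLY))

  // Pass None to skip the caching pass, e.g. when the input is small and
  // making two passes over the text is cheap.
  val uncached = sc.loadGff3("features.gff3", optStorageLevel = None)
}
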
@@ -109,15 +109,17 @@ object FeatureRDD {
* aggregate to rebuild the SequenceDictionary.
*
* @param rdd The underlying Feature RDD to build from.
* @param storageLevel Storage level to use for cache before building the SequenceDictionary.
* @param optStorageLevel Optional storage level to use for cache before building the SequenceDictionary.
* @return Returns a new FeatureRDD.
*/
def apply(
def inferSequenceDictionary(
rdd: RDD[Feature],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): FeatureRDD = BuildSequenceDictionary.time {
optStorageLevel: Option[StorageLevel]): FeatureRDD = BuildSequenceDictionary.time {

// cache the rdd, since we're making multiple passes
rdd.persist(storageLevel)
// optionally cache the rdd, since we're making multiple passes
optStorageLevel.map(
rdd.persist(_)
)

// create sequence records with length max(start, end) + 1L
val sequenceRecords = rdd
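The FeatureRDD.apply(rdd, storageLevel) entry point above is now the explicitly named inferSequenceDictionary and takes an Option[StorageLevel]; a minimal usage sketch with illustrative feature values:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.rdd.feature.FeatureRDD
import org.bdgenomics.formats.avro.Feature

def buildFeatureRDD(sc: SparkContext): FeatureRDD = {
  val f1 = Feature.newBuilder().setContigName("chr1").setStart(1L).setEnd(100L).build()
  val f2 = Feature.newBuilder().setContigName("chr2").setStart(10L).setEnd(200L).build()
  val rdd = sc.parallelize(Seq(f1, f2))

  // Persist the RDD while contig lengths are aggregated into a SequenceDictionary;
  // passing None instead skips the persist, as the updated test suites below do.
  FeatureRDD.inferSequenceDictionary(rdd, Some(StorageLevel.MEMORY_ONLY))
}
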
@@ -129,7 +129,7 @@ class ADAMContextSuite extends ADAMFunSuite {

sparkTest("Can read a .bed file without cache") {
val path = testFile("gencode.v7.annotation.trunc10.bed")
val features: RDD[Feature] = sc.loadFeatures(path, StorageLevel.NONE).rdd
val features: RDD[Feature] = sc.loadFeatures(path, Some(StorageLevel.NONE)).rdd
assert(features.count === 10)
}

@@ -50,7 +50,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val featureRDD: FeatureRDD = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f1, f2, f3)), None)
val coverageRDD: CoverageRDD = featureRDD.toCoverage

val outputFile = tmpLocation(".bed")
@@ -71,7 +71,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val featureRDD: FeatureRDD = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f1, f2, f3)), None)
val coverageRDD: CoverageRDD = featureRDD.toCoverage

val outputFile = tmpLocation(".adam")
@@ -88,7 +88,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build()
val f3 = Feature.newBuilder().setContigName("chr1").setStart(7).setEnd(20).setScore(4.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val featureRDD: FeatureRDD = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f1, f2, f3)), None)
val coverageRDD: CoverageRDD = featureRDD.toCoverage
val coverage = coverageRDD.coverage(bpPerBin = 4)

@@ -100,7 +100,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build()
val f3 = Feature.newBuilder().setContigName("chr1").setStart(7).setEnd(20).setScore(4.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val featureRDD: FeatureRDD = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f1, f2, f3)), None)
val coverageRDD: CoverageRDD = featureRDD.toCoverage

val coverage = coverageRDD
@@ -417,7 +417,7 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
val f6 = fb.setContigName("1").setStart(10L).setEnd(110L).clearStrand().build() // null strand last
val f7 = fb.setContigName("2").build()

val features = FeatureRDD(sc.parallelize(Seq(f7, f6, f5, f4, f3, f2, f1)))
val features = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f7, f6, f5, f4, f3, f2, f1)), None)
val sorted = features.sortByReference().rdd.collect()

assert(f1 == sorted(0))
@@ -439,7 +439,7 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
val f6 = fb.setScore(0.9).build() // Double defaults to increasing sort order
val f7 = fb.clearScore().build() // nulls last

val features = FeatureRDD(sc.parallelize(Seq(f7, f6, f5, f4, f3, f2, f1)))
val features = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f7, f6, f5, f4, f3, f2, f1)), None)
val sorted = features.sortByReference().rdd.collect()

assert(f1 == sorted(0))
@@ -457,7 +457,7 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
val f2 = fb.setGeneId("gene2").build()
val f3 = fb.clearGeneId().build() // nulls last

val features = FeatureRDD(sc.parallelize(Seq(f3, f2, f1)))
val features = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f3, f2, f1)), None)
val sorted = features.sortByReference().rdd.collect()

assert(f1 == sorted(0))
@@ -473,7 +473,7 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
val f4 = fb.setGeneId("gene2").setTranscriptId("transcript2").build()
val f5 = fb.setGeneId("gene2").clearTranscriptId().build() // nulls last

val features = FeatureRDD(sc.parallelize(Seq(f5, f4, f3, f2, f1)))
val features = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f5, f4, f3, f2, f1)), None)
val sorted = features.sortByReference().rdd.collect()

assert(f1 == sorted(0))
@@ -495,7 +495,7 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
val f8 = fb.setGeneId("gene2").setTranscriptId("transcript1").setAttributes(ImmutableMap.of("rank", "2")).build()
val f9 = fb.setGeneId("gene2").setTranscriptId("transcript1").clearAttributes().build() // nulls last

val features = FeatureRDD(sc.parallelize(Seq(f9, f8, f7, f6, f5, f4, f3, f2, f1)))
val features = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f9, f8, f7, f6, f5, f4, f3, f2, f1)), None)
val sorted = features.sortByReference().rdd.collect()

assert(f1 == sorted(0))
@@ -517,7 +517,7 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
val f4 = fb.setAttributes(ImmutableMap.of("rank", "2")).build()
val f5 = fb.clearAttributes().build() // nulls last

val features = FeatureRDD(sc.parallelize(Seq(f5, f4, f3, f2, f1)))
val features = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f5, f4, f3, f2, f1)), None)
val sorted = features.sortByReference().rdd.collect()

assert(f1 == sorted(0))
@@ -532,7 +532,7 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val featureRDD: FeatureRDD = FeatureRDD.inferSequenceDictionary(sc.parallelize(Seq(f1, f2, f3)), None)
val coverageRDD: CoverageRDD = featureRDD.toCoverage
val coverage = coverageRDD.flatten

