Removed feature and gene contexts.
fnothaft authored and massie committed Feb 12, 2015
1 parent 32f51ce commit 3f0eadb
Showing 9 changed files with 87 additions and 149 deletions.
adam-cli/src/main/scala/org/bdgenomics/adam/cli/Features2ADAM.scala
@@ -22,7 +22,6 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.BaseFeature
import org.bdgenomics.adam.rdd.ADAMContext._
-import org.bdgenomics.adam.rdd.features.FeaturesContext._
import org.bdgenomics.formats.avro.Feature
import org.kohsuke.args4j.Argument

@@ -49,16 +48,6 @@ class Features2ADAM(val args: Features2ADAMArgs)
val companion = Features2ADAM

def run(sc: SparkContext, job: Job) {
-    // get file extension
-    // regex: anything . (extension) EOL
-    val extensionPattern = """.*[.]([^.]*)$""".r
-    val extensionPattern(extension) = args.featuresFile
-    val features: RDD[Feature] = extension.toLowerCase match {
-      case "gff" => sc.adamGTFFeatureLoad(args.featuresFile) // TODO(Timothy) write a GFF-specific loader?
-      case "gtf" => sc.adamGTFFeatureLoad(args.featuresFile)
-      case "bed" => sc.adamBEDFeatureLoad(args.featuresFile)
-      case "narrowpeak" => sc.adamNarrowPeakFeatureLoad(args.featuresFile)
-    }
-    features.adamParquetSave(args)
+    sc.loadFeatures(args.featuresFile).adamParquetSave(args)
}
}
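
Aside on the removed dispatch: the old command bound the file extension with Scala's regex extractor and matched on it, and sc.loadFeatures now does the same sniffing internally. A minimal, self-contained sketch of the extractor idiom (toy object and paths, not ADAM code):

```scala
object ExtensionExtractorDemo extends App {
  // anything, then a dot, then a captured extension at end of line
  val extensionPattern = """.*[.]([^.]*)$""".r

  // A Regex in pattern position binds its capture group. The removed code's
  // bare `val extensionPattern(extension) = ...` form would instead throw a
  // MatchError on a path with no extension.
  "features.narrowPeak" match {
    case extensionPattern(extension) => println(extension.toLowerCase) // prints "narrowpeak"
    case _                           => println("no extension found")
  }
}
```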
10 changes: 3 additions & 7 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/PrintGenes.scala
@@ -18,15 +18,12 @@
package org.bdgenomics.adam.cli

import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models._
-import org.bdgenomics.adam.rdd.features.FeaturesContext._
-import org.bdgenomics.adam.models.GeneContext._
-import org.bdgenomics.adam.rdd.features.GeneFeatureRDD._
-import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.Feature
import org.kohsuke.args4j.{ Option => option, Argument }
+import org.bdgenomics.adam.rdd.ADAMContext._
+import org.apache.spark.SparkContext

object PrintGenes extends ADAMCommandCompanion {
val commandName: String = "print_genes"
@@ -48,8 +45,7 @@ class PrintGenes(protected val args: PrintGenesArgs)
val companion = PrintGenes

def run(sc: SparkContext, job: Job): Unit = {
-    val features: RDD[Feature] = sc.adamGTFFeatureLoad(args.gtfInput)
-    val genes: RDD[Gene] = features.asGenes()
+    val genes: RDD[Gene] = sc.loadGenes(args.gtfInput)

genes.map(printGene).collect().foreach(println)
}
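
The retained printing pattern deserves a gloss: map formats the genes on the executors, collect() ships the strings to the driver, and the driver runs println so the output lands in one console. A small stand-alone sketch under those assumptions (toy data, local master; not part of this commit):

```scala
import org.apache.spark.SparkContext

object DriverPrintDemo extends App {
  val sc = new SparkContext("local", "driver-print-demo")

  // map runs on executors; collect() brings the results to the driver,
  // which is where these println calls actually execute
  sc.parallelize(Seq(1, 2, 3))
    .map(n => s"gene-$n")
    .collect()
    .foreach(println)

  sc.stop()
}
```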
18 changes: 1 addition & 17 deletions adam-core/src/main/scala/org/bdgenomics/adam/models/Gene.scala
@@ -19,7 +19,7 @@ package org.bdgenomics.adam.models

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
-import org.bdgenomics.adam.rdd.features.GeneFeatureRDD._
+import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.SequenceUtils
import org.bdgenomics.formats.avro.{ Strand, Feature }

@@ -215,22 +215,6 @@ case class UTR(transcriptId: String, strand: Boolean, region: ReferenceRegion)
extends BlockExtractable(strand, region) {
}

-object GeneContext {
-  implicit def sparkContextToGeneContext(sc: SparkContext): GeneContext = new GeneContext(sc)
-}
-
-class GeneContext(sc: SparkContext) {
-
-  // This import has to go here and not at the top level, because there's a
-  // conflict with another implicit used in asGenes, below.
-  import org.bdgenomics.adam.rdd.ADAMContext._
-
-  def loadGTFGenes(filename: String): RDD[Gene] = {
-    val features: RDD[Feature] = sc.adamLoad(filename)
-    features.asGenes()
-  }
-}

object ReferenceUtils {

def unionReferenceSet(refs: Iterable[ReferenceRegion]): Iterable[ReferenceRegion] = {
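The deleted GeneContext carried a comment about keeping an import method-local to dodge an implicit conflict; folding the conversions into ADAMContext is what retires that workaround. A minimal sketch of the underlying Scala issue, with hypothetical names:

```scala
import scala.language.implicitConversions

object ImplicitScopeDemo {
  class Rich(s: String) { def shout: String = s.toUpperCase }

  object ConversionsA { implicit def enrichA(s: String): Rich = new Rich(s) }
  object ConversionsB { implicit def enrichB(s: String): Rich = new Rich(s) }

  def demo(): String = {
    // Importing ConversionsA._ and ConversionsB._ together would make
    // "hi".shout ambiguous; scoping one import to the method avoids that,
    // at the cost of repeating it in every method that needs it.
    import ConversionsA._
    "hi".shout
  }
}
```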
47 changes: 47 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -36,6 +36,7 @@ import org.bdgenomics.adam.models._
import org.bdgenomics.adam.predicates.ADAMPredicate
import org.bdgenomics.adam.projections.{ AlignmentRecordField, NucleotideContigFragmentField, Projection }
import org.bdgenomics.adam.rdd.contig.NucleotideContigFragmentRDDFunctions
+import org.bdgenomics.adam.rdd.features._
import org.bdgenomics.adam.rdd.pileup.{ PileupRDDFunctions, RodRDDFunctions }
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordContext, AlignmentRecordRDDFunctions }
import org.bdgenomics.adam.rdd.variation.VariationContext._
@@ -69,6 +70,9 @@ object ADAMContext {
// Add methods specific to the ADAMNucleotideContig RDDs
implicit def rddToContigFragmentRDD(rdd: RDD[NucleotideContigFragment]) = new NucleotideContigFragmentRDDFunctions(rdd)

+  // add gene feature rdd functions
+  implicit def convertBaseFeatureRDDToGeneFeatureRDD(rdd: RDD[Feature]) = new GeneFeatureRDDFunctions(rdd)

// Add implicits for the rich adam objects
implicit def recordToRichRecord(record: AlignmentRecord): RichAlignmentRecord = new RichAlignmentRecord(record)

@@ -330,6 +334,49 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
}
}

+  private def maybeLoadGTF(filePath: String): Option[RDD[Feature]] = {
+    if (filePath.endsWith(".gtf") || filePath.endsWith(".gff")) {
+      Some(sc.textFile(filePath).flatMap(new GTFParser().parse))
+    } else {
+      None
+    }
+  }
+
+  private def maybeLoadBED(filePath: String): Option[RDD[Feature]] = {
+    if (filePath.endsWith(".bed")) {
+      Some(sc.textFile(filePath).flatMap(new BEDParser().parse))
+    } else {
+      None
+    }
+  }
+
+  private def maybeLoadNarrowPeak(filePath: String): Option[RDD[Feature]] = {
+    if (filePath.toLowerCase.endsWith(".narrowpeak")) {
+      Some(sc.textFile(filePath).flatMap(new NarrowPeakParser().parse))
+    } else {
+      None
+    }
+  }
+
+  def loadFeatures[U <: ADAMPredicate[Feature]](
+    filePath: String,
+    predicate: Option[Class[U]] = None,
+    projection: Option[Schema] = None): RDD[Feature] = {
+    maybeLoadBED(filePath).orElse(
+      maybeLoadGTF(filePath)
+    ).orElse(
+      maybeLoadNarrowPeak(filePath)
+    ).fold(adamLoad[Feature, U](filePath, predicate, projection))(applyPredicate(_, predicate))
+  }
+
+  def loadGenes[U <: ADAMPredicate[Feature]](filePath: String,
+                                             predicate: Option[Class[U]] = None,
+                                             projection: Option[Schema] = None): RDD[Gene] = {
+    new GeneFeatureRDDFunctions(maybeLoadGTF(filePath)
+      .fold(adamLoad[Feature, U](filePath, predicate, projection))(applyPredicate(_, predicate)))
+      .asGenes()
+  }
+
def loadSequence[U <: ADAMPredicate[NucleotideContigFragment]](
filePath: String,
predicate: Option[Class[U]] = None,
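The dispatch in loadFeatures is terse: each maybeLoadX returns Some only when its extension matches, orElse chains the sniffers, and Option.fold(ifEmpty)(f) falls back to Parquet adamLoad when every sniffer declines. A stand-alone sketch of that shape with toy loaders (strings stand in for RDDs):

```scala
object ExtensionDispatchDemo extends App {
  def maybeBed(path: String): Option[String] =
    if (path.endsWith(".bed")) Some("parsed as BED") else None

  def maybeGtf(path: String): Option[String] =
    if (path.endsWith(".gtf") || path.endsWith(".gff")) Some("parsed as GTF/GFF") else None

  def load(path: String): String =
    maybeBed(path)
      .orElse(maybeGtf(path))
      // fold's first argument handles None; the second maps the Some value
      .fold("loaded as Parquet")(identity)

  println(load("genes.gtf")) // parsed as GTF/GFF
  println(load("data.adam")) // loaded as Parquet
}
```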

This file was deleted.

@@ -25,19 +25,13 @@ import org.bdgenomics.adam.rich.ReferenceMappingContext.FeatureReferenceMapping
import org.bdgenomics.formats.avro.{ Strand, Feature }
import scala.collection.JavaConversions._

-object GeneFeatureRDD {
-  implicit def convertBaseFeatureRDDToGeneFeatureRDD(rdd: RDD[Feature]): GeneFeatureRDD =
-    new GeneFeatureRDD(rdd)
-
-  def strand(str: Strand): Boolean =
-    str match {
-      case Strand.Forward => true
-      case Strand.Reverse => false
-      case Strand.Independent => true
-    }
-}
+class GeneFeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable with Logging {

-class GeneFeatureRDD(featureRDD: RDD[Feature]) extends Serializable with Logging {
+  private def strand(str: Strand): Boolean = str match {
+    case Strand.Forward => true
+    case Strand.Reverse => false
+    case Strand.Independent => true
+  }

def asGenes(): RDD[Gene] = {

@@ -77,23 +71,23 @@ class GeneFeatureRDD(featureRDD: RDD[Feature]) extends Serializable with Logging
case ("exon", ftr: Feature) =>
val ids: Seq[String] = ftr.getParentIds.map(_.toString)
ids.map(transcriptId => (transcriptId,
-        Exon(ftr.getFeatureId.toString, transcriptId, GeneFeatureRDD.strand(ftr.getStrand), FeatureReferenceMapping.getReferenceRegion(ftr))))
+        Exon(ftr.getFeatureId.toString, transcriptId, strand(ftr.getStrand), FeatureReferenceMapping.getReferenceRegion(ftr))))
}.groupByKey()

val cdsByTranscript: RDD[(String, Iterable[CDS])] =
typePartitioned.filter(_._1 == "CDS").flatMap {
case ("CDS", ftr: Feature) =>
val ids: Seq[String] = ftr.getParentIds.map(_.toString)
ids.map(transcriptId => (transcriptId,
-        CDS(transcriptId, GeneFeatureRDD.strand(ftr.getStrand), FeatureReferenceMapping.getReferenceRegion(ftr))))
+        CDS(transcriptId, strand(ftr.getStrand), FeatureReferenceMapping.getReferenceRegion(ftr))))
}.groupByKey()

val utrsByTranscript: RDD[(String, Iterable[UTR])] =
typePartitioned.filter(_._1 == "UTR").flatMap {
case ("UTR", ftr: Feature) =>
val ids: Seq[String] = ftr.getParentIds.map(_.toString)
ids.map(transcriptId => (transcriptId,
-        UTR(transcriptId, GeneFeatureRDD.strand(ftr.getStrand), FeatureReferenceMapping.getReferenceRegion(ftr))))
+        UTR(transcriptId, strand(ftr.getStrand), FeatureReferenceMapping.getReferenceRegion(ftr))))
}.groupByKey()

// Step #3
@@ -113,7 +107,7 @@ class GeneFeatureRDD(featureRDD: RDD[Feature]) extends Serializable with Logging
val geneIds: Seq[String] = tgtf.getParentIds.map(_.toString) // should be length 1
geneIds.map(geneId => (geneId,
Transcript(transcriptId, Seq(transcriptId), geneId,
-          GeneFeatureRDD.strand(tgtf.getStrand),
+          strand(tgtf.getStrand),
exons, cds.getOrElse(Seq()), utrs.getOrElse(Seq()))))
}.groupByKey()

@@ -123,7 +117,7 @@ class GeneFeatureRDD(featureRDD: RDD[Feature]) extends Serializable with Logging
}.leftOuterJoin(transcriptsByGene).map {
case (geneId: String, (ggtf: Feature, transcripts: Option[Iterable[Transcript]])) =>
Gene(geneId, Seq(geneId),
-        GeneFeatureRDD.strand(ggtf.getStrand),
+        strand(ggtf.getStrand),
transcripts.getOrElse(Seq()))
}

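For readers tracing asGenes: the overall shape is to group child records by parent id, leftOuterJoin so parents with no children survive with an empty collection, then group again one level up. A compact sketch of that pattern with hypothetical transcript/exon pairs (local master, toy data):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD functions (Spark 1.x style)

object GeneAssemblyShapeDemo extends App {
  val sc = new SparkContext("local", "assembly-shape-demo")

  // (transcriptId, geneId) and (transcriptId, exon label)
  val transcripts = sc.parallelize(Seq(("t1", "gene1"), ("t2", "gene1")))
  val exonsByTranscript = sc.parallelize(Seq(("t1", "exon-a"), ("t1", "exon-b"))).groupByKey()

  val byGene = transcripts
    .leftOuterJoin(exonsByTranscript) // t2 has no exons, so it pairs with None
    .map { case (tId, (geneId, exons)) => (geneId, (tId, exons.getOrElse(Seq()))) }
    .groupByKey()

  byGene.collect().foreach(println)
  sc.stop()
}
```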
adam-core/src/test/scala/org/bdgenomics/adam/models/GeneSuite.scala
@@ -19,23 +19,18 @@ package org.bdgenomics.adam.models

import java.io.{ FileInputStream, File }
import java.util.zip.GZIPInputStream

-import org.apache.spark.SparkContext._
-import org.bdgenomics.adam.rdd.ADAMContext._
-import GeneContext._
import org.apache.spark.rdd.RDD
-import org.bdgenomics.adam.rdd.features.GeneFeatureRDD._
+import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite
-import org.bdgenomics.adam.rdd.features.FeaturesContext._
import org.bdgenomics.formats.avro.Feature

import scala.io.Source

class GeneSuite extends ADAMFunSuite {

sparkTest("can load a set of gene models from an Ensembl GTF file") {
val path = testFile("features/Homo_sapiens.GRCh37.75.trun100.gtf")
-    val features: RDD[Feature] = sc.adamGTFFeatureLoad(path)
+    val features: RDD[Feature] = sc.loadFeatures(path)

val genes: RDD[Gene] = features.asGenes()
assert(genes.count() === 4)
@@ -49,7 +44,7 @@

sparkTest("can load a set of gene models from a Gencode GTF file") {
val path = testFile("features/gencode.v19.annotation.chr20.250k.gtf")
-    val features: RDD[Feature] = sc.adamGTFFeatureLoad(path)
+    val features: RDD[Feature] = sc.loadFeatures(path)

val genes: RDD[Gene] = features.asGenes()
assert(genes.count() === 8)
@@ -102,7 +97,7 @@
(key.split("\\|").head, seq)
}

-    val features: RDD[Feature] = sc.adamGTFFeatureLoad(path)
+    val features: RDD[Feature] = sc.loadFeatures(path)
val genes: RDD[Gene] = features.asGenes()
val transcripts: Seq[Transcript] = genes.flatMap(g => g.transcripts).take(100)

adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala
@@ -26,7 +26,7 @@ import org.bdgenomics.adam.predicates.HighQualityReadPredicate
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.PhredUtils._
import org.bdgenomics.adam.util.ADAMFunSuite
-import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig }
+import org.bdgenomics.formats.avro._

class ADAMContextSuite extends ADAMFunSuite {

@@ -178,5 +178,24 @@ class ADAMContextSuite extends ADAMFunSuite {
val tempDir = tempFile.getParentFile
new File(tempDir, tempFile.getName + suffix).getAbsolutePath
}

sparkTest("Can read a .gtf file") {
val path = testFile("features/Homo_sapiens.GRCh37.75.trun20.gtf")
val features: RDD[Feature] = sc.loadFeatures(path)
assert(features.count === 15)
}

sparkTest("Can read a .bed file") {
// note: this .bed doesn't actually conform to the UCSC BED spec...sigh...
val path = testFile("features/gencode.v7.annotation.trunc10.bed")
val features: RDD[Feature] = sc.loadFeatures(path)
assert(features.count === 10)
}

sparkTest("Can read a .narrowPeak file") {
val path = testFile("features/wgEncodeOpenChromDnaseGm19238Pk.trunc10.narrowPeak")
val annot: RDD[Feature] = sc.loadFeatures(path)
assert(annot.count === 10)
}
}

This file was deleted.
