Skip to content

Commit

Permalink
implement reassignParentIds
Browse files Browse the repository at this point in the history
  • Loading branch information
heuermh committed Jun 14, 2016
1 parent 1ccc99f commit aa5b867
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,79 +47,57 @@ class FeatureFile(parser: FeatureParser) extends Serializable {
}

object GTFParser {
  // Matches a single GTF attribute token of the form: key "value"
  // (optionally preceded by whitespace between the key and the quoted value).
  private val PATTERN = "\\s*([^\\s]+)\\s\"([^\"]+)\"".r

  /**
   * Parses a string of format
   *
   *   token; token; token ...
   *
   * where each 'token' is of the form
   *
   *   key "value"
   *
   * and turns it into a sequence of (key, value) pairs.
   *
   * Tokens that do not match the expected `key "value"` shape are
   * silently dropped (the regex match yields None for them).
   *
   * @param attributeField The original string of tokens
   * @return The attributes as (key, value) pairs, in input order
   */
  def parseAttributes(attributeField: String): Seq[(String, String)] =
    attributeField.split(";").flatMap {
      case token: String =>
        PATTERN.findFirstMatchIn(token).map(m => (m.group(1), m.group(2)))
    }
}

/**
* GTF is a line-based GFF variant.
* Parser for GTF/GFF2 format.
*
* Details of the GTF/GFF format here:
* http://www.ensembl.org/info/website/upload/gff.html
* Specification:
* http://useast.ensembl.org/info/website/upload/gff.html
*/
class GTFParser extends FeatureParser {

override def parse(line: String): Seq[Feature] = {
  // skip header and comment lines
  if (line.startsWith("#")) {
    return Seq()
  }

  val fields = line.split("\t")
  // skip empty or invalid lines; GTF/GFF2 requires nine tab-separated columns
  if (fields.length < 9) {
    return Seq()
  }
  val (seqname, source, featureType, start, end, score, strand, frame, attributes) =
    (fields(0), fields(1), fields(2), fields(3), fields(4), fields(5), fields(6), fields(7), fields(8))

  val f = Feature.newBuilder()
    .setSource(source)
    .setFeatureType(featureType)
    .setContigName(seqname)
    .setStart(start.toLong - 1L) // GTF/GFF2 coordinate system is 1-based
    .setEnd(end.toLong) // GTF/GFF2 ranges are closed

  // set score if specified
  if (score != ".") {
    // fix: previously parsed the `source` column here, which would throw
    // NumberFormatException (or silently mis-set the score) for any scored line
    f.setScore(score.toDouble)
  }
  // set frame if specified
  if (frame != ".") {
    f.setFrame(frame.toInt)
  }
  // set strand if specified ('.' and '?' map to None and are left unset)
  Features.toStrand(strand).foreach(f.setStrand(_))

  // assign values for various feature fields from attributes
  Features.assignAttributes(GTFParser.parseAttributes(attributes), f)

  Seq(f.build())
}
Expand All @@ -132,6 +110,9 @@ object GFF3Parser {

/**
* Parser for GFF3 format.
*
* Specification:
* http://www.sequenceontology.org/gff3.shtml
*/
class GFF3Parser extends FeatureParser {

Expand All @@ -142,6 +123,10 @@ class GFF3Parser extends FeatureParser {
}

val fields = line.split("\t")
// skip empty or invalid lines
if (fields.length < 9) {
return Seq()
}
val (seqid, source, featureType, start, end, score, strand, phase, attributes) =
(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5), fields(6), fields(7), fields(8))

Expand Down Expand Up @@ -170,6 +155,12 @@ class GFF3Parser extends FeatureParser {
}
}

/**
* Parser for IntervalList format.
*
* In lieu of a specification, see:
* https://samtools.github.io/htsjdk/javadoc/htsjdk/htsjdk/samtools/util/IntervalList.html
*/
class IntervalListParser extends Serializable {
def parse(line: String): (Option[SequenceRecord], Option[Feature]) = {
val fields = line.split("[ \t]+")
Expand Down Expand Up @@ -240,6 +231,12 @@ class IntervalListParser extends Serializable {

}

/**
* Parser for BED format.
*
* Specification:
* http://www.genome.ucsc.edu/FAQ/FAQformat.html#format1
*/
class BEDParser extends FeatureParser {

def isHeader(line: String): Boolean = {
Expand Down Expand Up @@ -304,6 +301,12 @@ class BEDParser extends FeatureParser {
}
}

/**
* Parser for NarrowPeak format.
*
* Specification:
* http://www.genome.ucsc.edu/FAQ/FAQformat.html#format12
*/
class NarrowPeakParser extends FeatureParser {

override def parse(line: String): Seq[Feature] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.bdgenomics.adam.rdd.features

import org.bdgenomics.utils.misc.Logging
import com.google.common.base.Joiner
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.bdgenomics.adam.models._
Expand Down Expand Up @@ -135,6 +134,36 @@ class FeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable with Lo
genes
}

/**
 * Reassigns feature ids and parent ids based on feature type, using the
 * gene/transcript/exon identifier fields populated by the GTF/GFF2 parser.
 *
 * For each feature: genes get their gene_id as featureId and no parent;
 * transcripts get transcript_id with the gene as parent; exons get exon_id
 * (synthesized from transcript_id and exon_number if absent) with the
 * transcript as parent; CDS/UTR features keep their featureId with the
 * transcript as parent. Other feature types are left without a parent.
 *
 * @return A new RDD of features with featureId and parentIds reassigned
 *         where the relevant identifiers are defined.
 */
def reassignParentIds(): RDD[Feature] = {
  def reassignParentId(f: Feature): Feature = {
    val fb = Feature.newBuilder(f)

    val featureId: Option[String] = Option(f.getFeatureId)
    val geneId: Option[String] = Option(f.getGeneId)
    val transcriptId: Option[String] = Option(f.getTranscriptId)
    val exonNumber: Option[String] = Option(f.getAttributes.get("exon_number"))
    // prefer an explicit exon_id; otherwise synthesize "<transcriptId>_<exonNumber>"
    val exonId: Option[String] =
      Option(f.getExonId).orElse(transcriptId.flatMap(t => exonNumber.map(e => t + "_" + e)))

    val (_featureId, _parentId) =
      f.getFeatureType match {
        case "gene"        => (geneId, None)
        case "transcript"  => (transcriptId, geneId)
        case "exon"        => (exonId, transcriptId)
        case "CDS" | "UTR" => (featureId, transcriptId)
        case _             => (featureId, None)
      }
    // replace featureId and parentId only when defined
    _featureId.foreach(fb.setFeatureId)
    _parentId.foreach(parentId => fb.setParentIds(List[String](parentId)))

    fb.build()
  }
  featureRDD.map(reassignParentId)
}

def filterByOverlappingRegion(query: ReferenceRegion): RDD[Feature] = {
def overlapsQuery(rec: Feature): Boolean =
rec.getContigName == query.referenceName &&
Expand All @@ -157,8 +186,8 @@ class FeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable with Lo
val score = Option(feature.getScore).getOrElse(".")
val strand = Features.toString(feature.getStrand)
val frame = Option(feature.getFrame).getOrElse(".")
val attributes = Joiner.on("; ").join(Features.gatherAttributes(feature).map(escape))
Joiner.on("\t").join(seqname, source, featureType, start: java.lang.Long, end: java.lang.Long, score, strand, frame, attributes)
val attributes = Features.gatherAttributes(feature).map(escape).mkString("; ")
List(seqname, source, featureType, start, end, score, strand, frame, attributes).mkString("\t")
}
featureRDD.map(toGtf).saveAsTextFile(fileName)
}
Expand All @@ -177,8 +206,8 @@ class FeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable with Lo
val score = Option(feature.getScore).getOrElse(".")
val strand = Features.toString(feature.getStrand)
val phase = Option(feature.getPhase).getOrElse(".")
val attributes = Joiner.on(";").join(Features.gatherAttributes(feature).map(escape))
Joiner.on("\t").join(seqid, source, featureType, start: java.lang.Long, end: java.lang.Long, score, strand, phase, attributes)
val attributes = Features.gatherAttributes(feature).map(escape).mkString(";")
List(seqid, source, featureType, start, end, score, strand, phase, attributes).mkString("\t")
}
featureRDD.map(toGff3).saveAsTextFile(fileName)
}
Expand All @@ -194,7 +223,7 @@ class FeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable with Lo

if (!feature.getAttributes.containsKey("thickStart")) {
// write BED6 format
Joiner.on('\t').join(chrom, start, end, name, score, strand)
List(chrom, start, end, name, score, strand).mkString("\t")
} else {
// write BED12 format
val thickStart = feature.getAttributes.getOrElse("thickStart", ".")
Expand All @@ -203,7 +232,7 @@ class FeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable with Lo
val blockCount = feature.getAttributes.getOrElse("blockCount", ".")
val blockSizes = feature.getAttributes.getOrElse("blockSizes", ".")
val blockStarts = feature.getAttributes.getOrElse("blockStarts", ".")
Joiner.on('\t').join(chrom, start, end, name, score, strand, thickStart, thickEnd, itemRgb, blockCount, blockSizes, blockStarts)
List(chrom, start, end, name, score, strand, thickStart, thickEnd, itemRgb, blockCount, blockSizes, blockStarts).mkString("\t")
}
}
featureRDD.map(toBed).saveAsTextFile(fileName)
Expand All @@ -219,8 +248,8 @@ class FeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable with Lo
val start = feature.getStart + 1 // IntervalList ranges are 1-based
val end = feature.getEnd // IntervalList ranges are closed
val strand = Features.toString(feature.getStrand)
val intervalName = Joiner.on(";").join(Features.gatherAttributes(feature).map(escape))
Joiner.on("\t").join(sequenceName, start, end, strand, intervalName)
val intervalName = Features.gatherAttributes(feature).map(escape).mkString(";")
List(sequenceName, start, end, strand, intervalName).mkString("\t")
}
// todo: SAM style header
featureRDD.map(toInterval).saveAsTextFile(fileName)
Expand All @@ -238,7 +267,7 @@ class FeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable with Lo
val pValue = feature.getAttributes.getOrElse("pValue", "-1")
val qValue = feature.getAttributes.getOrElse("qValue", "-1")
val peak = feature.getAttributes.getOrElse("peak", "-1")
Joiner.on("\t").join(chrom, start, end, name, score, strand, signalValue, pValue, qValue, peak)
List(chrom, start, end, name, score, strand, signalValue, pValue, qValue, peak).mkString("\t")
}
featureRDD.map(toPeak).saveAsTextFile(fileName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ private[features] object Features {
case "Parent" => parentIds += entry._2
case "Dbxref" => toDbxref(entry._2).foreach(dbxrefs += _)
case "Ontology_term" => toOntologyTerm(entry._2).foreach(ontologyTerms += _)
// commonly used keys in GTF/GFF2, e.g. via Ensembl
case "gene_id" => f.setGeneId(entry._2)
case "transcript_id" => f.setTranscriptId(entry._2)
case "exon_id" => f.setExonId(entry._2)
// unrecognized key, save to attributes
case _ => remaining += entry
}
)
Expand Down Expand Up @@ -199,6 +204,9 @@ private[features] object Features {
Option(feature.getGap).foreach(attrs += Tuple2("Gap", _))
Option(feature.getDerivesFrom).foreach(attrs += Tuple2("Derives_from", _))
Option(feature.getIsCircular).foreach(addBooleanTuple)
Option(feature.getGeneId).foreach(attrs += Tuple2("gene_id", _))
Option(feature.getTranscriptId).foreach(attrs += Tuple2("transcript_id", _))
Option(feature.getExonId).foreach(attrs += Tuple2("exon_id", _))
for (alias <- feature.getAliases) attrs += Tuple2("Alias", alias)
for (note <- feature.getNotes) attrs += Tuple2("Note", note)
for (parentId <- feature.getParentIds) attrs += Tuple2("Parent", parentId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class GeneSuite extends ADAMFunSuite {
val path = testFile("features/Homo_sapiens.GRCh37.75.trun100.gtf")
val features: RDD[Feature] = sc.loadFeatures(path)

val genes: RDD[Gene] = features.toGenes()
val fixedParentIds: RDD[Feature] = features.reassignParentIds

val genes: RDD[Gene] = fixedParentIds.toGenes()
assert(genes.count() === 4)

val transcripts = genes.flatMap(_.transcripts)
Expand All @@ -47,7 +49,9 @@ class GeneSuite extends ADAMFunSuite {
val path = testFile("features/gencode.v19.annotation.chr20.250k.gtf")
val features: RDD[Feature] = sc.loadFeatures(path)

val genes: RDD[Gene] = features.toGenes()
val fixedParentIds: RDD[Feature] = features.reassignParentIds

val genes: RDD[Gene] = fixedParentIds.toGenes()
assert(genes.count() === 8)

val transcripts = genes.flatMap(_.transcripts)
Expand Down Expand Up @@ -99,7 +103,8 @@ class GeneSuite extends ADAMFunSuite {
}

val features: RDD[Feature] = sc.loadFeatures(path)
val genes: RDD[Gene] = features.toGenes()
val fixedParentIds: RDD[Feature] = features.reassignParentIds
val genes: RDD[Gene] = fixedParentIds.toGenes()
val transcripts: Seq[Transcript] = genes.flatMap(g => g.transcripts).take(100)

transcripts.foreach { transcript =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,35 +137,35 @@ class FeatureRDDFunctionsSuite extends ADAMFunSuite with TypeCheckedTripleEquals
}

sparkTest("save GFF3 as GTF format") {
val inputPath = resourcePath("features/amphimedon.gff3") // todo: find a more complete example
val inputPath = resourcePath("features/dvl1.gff3")
val features = sc.loadGff3(inputPath)
val outputPath = tempLocation(".gtf")
features.saveAsGtf(outputPath)
}

sparkTest("save GFF3 as BED format") {
val inputPath = resourcePath("features/amphimedon.gff3")
val inputPath = resourcePath("features/dvl1.gff3")
val features = sc.loadGff3(inputPath)
val outputPath = tempLocation(".bed")
features.saveAsBed(outputPath)
}

sparkTest("save GFF3 as IntervalList format") {
val inputPath = resourcePath("features/amphimedon.gff3")
val inputPath = resourcePath("features/dvl1.gff3")
val features = sc.loadGff3(inputPath)
val outputPath = tempLocation(".interval_list")
features.saveAsIntervalList(outputPath)
}

sparkTest("save GFF3 as NarrowPeak format") {
val inputPath = resourcePath("features/amphimedon.gff3")
val inputPath = resourcePath("features/dvl1.gff3")
val features = sc.loadGff3(inputPath)
val outputPath = tempLocation(".narrowPeak")
features.saveAsNarrowPeak(outputPath)
}

sparkTest("round trip GFF3 format") {
val inputPath = resourcePath("features/amphimedon.gff3")
val inputPath = resourcePath("features/dvl1.gff3")
val expected = sc.loadGff3(inputPath)
val outputPath = tempLocation(".gff3")
expected.saveAsGff3(outputPath)
Expand Down

0 comments on commit aa5b867

Please sign in to comment.