Commit 62fb35a

Add additional filter by convenience methods.

heuermh committed Apr 13, 2018
1 parent 75b51e7
Showing 2 changed files with 200 additions and 0 deletions.

@@ -315,6 +315,34 @@ case class DatasetBoundFeatureRDD private[rdd] (
.withColumnRenamed("score", "count")
.as[Coverage], sequences)
}

override def filterByFeatureType(featureType: String): FeatureRDD = {
transformDataset(dataset => dataset.filter(dataset.col("featureType").eqNullSafe(featureType)))
}

override def filterByGene(geneId: String): FeatureRDD = {
transformDataset(dataset => dataset.filter(dataset.col("geneId").eqNullSafe(geneId)))
}

override def filterByTranscript(transcriptId: String): FeatureRDD = {
transformDataset(dataset => dataset.filter(dataset.col("transcriptId").eqNullSafe(transcriptId)))
}

override def filterByExon(exonId: String): FeatureRDD = {
transformDataset(dataset => dataset.filter(dataset.col("exonId").eqNullSafe(exonId)))
}

override def filterByScore(minimumScore: Double): FeatureRDD = {
transformDataset(dataset => dataset.filter(dataset.col("score").geq(minimumScore)))
}

override def filterByParent(parentId: String): FeatureRDD = {
transformDataset(dataset => dataset.filter(dataset.col("parentIds").contains(parentId)))
}

override def filterByAttribute(key: String, value: String): FeatureRDD = {
transformDataset(dataset => dataset.filter(dataset.col("attributes").getItem(key).eqNullSafe(value)))
}
}

case class RDDBoundFeatureRDD private[rdd] (
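
For context, a minimal standalone sketch (not part of this commit) of the null handling the Dataset-bound filters above rely on. The SparkSession setup and column name here are illustrative only: eqNullSafe (Spark's <=> operator) evaluates to false rather than null when the column is null, mirroring the Option(...).exists(...) guards in the RDD-bound defaults below.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
.appName("eqNullSafe-sketch")
.master("local[*]")
.getOrCreate()
import spark.implicits._

// Three rows, one with a missing featureType.
val df = Seq(Some("gene"), Some("exon"), None).toDF("featureType")

// The null row is dropped: <=> yields false instead of null, so the
// predicate stays total even when the field is missing.
df.filter(df.col("featureType").eqNullSafe("gene")).count() // returns 1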
@@ -427,6 +455,77 @@ sealed abstract class FeatureRDD extends AvroGenomicDataset[Feature, FeatureProd
*/
def toCoverage(): CoverageRDD

/**
* Filter this FeatureRDD by feature type.
*
* @param featureType Feature type to filter by.
* @return FeatureRDD filtered by the specified feature type.
*/
def filterByFeatureType(featureType: String): FeatureRDD = {
transform(rdd => rdd.filter(f => Option(f.getFeatureType).exists(_.equals(featureType))))
}

/**
* Filter this FeatureRDD by gene.
*
* @param geneId Gene ID to filter by.
* @return FeatureRDD filtered by the specified gene.
*/
def filterByGene(geneId: String): FeatureRDD = {
transform(rdd => rdd.filter(f => Option(f.getGeneId).exists(_.equals(geneId))))
}

/**
* Filter this FeatureRDD by transcript.
*
* @param transcriptId Transcript ID to filter by.
* @return FeatureRDD filtered by the specified transcript.
*/
def filterByTranscript(transcriptId: String): FeatureRDD = {
transform(rdd => rdd.filter(f => Option(f.getTranscriptId).exists(_.equals(transcriptId))))
}

/**
* Filter this FeatureRDD by exon.
*
* @param exonId Exon ID to filter by.
* @return FeatureRDD filtered by the specified exon.
*/
def filterByExon(exonId: String): FeatureRDD = {
transform(rdd => rdd.filter(f => Option(f.getExonId).exists(_.equals(exonId))))
}

/**
* Filter this FeatureRDD by score.
*
* @param minimumScore Minimum score to filter by, inclusive.
* @return FeatureRDD filtered by the specified minimum score.
*/
def filterByScore(minimumScore: Double): FeatureRDD = {
transform(rdd => rdd.filter(f => Option(f.getScore).exists(_ >= minimumScore)))
}

/**
* Filter this FeatureRDD by parent.
*
* @param parentId Parent ID to filter by.
* @return FeatureRDD filtered by the specified parent.
*/
def filterByParent(parentId: String): FeatureRDD = {
transform(rdd => rdd.filter(f => Option(f.getParentIds).exists(_.contains(parentId))))
}

/**
* Filter this FeatureRDD by attribute.
*
* @param key Attribute key to filter by.
* @param value Attribute value to filter by.
* @return FeatureRDD filtered by the specified attribute.
*/
def filterByAttribute(key: String, value: String): FeatureRDD = {
transform(rdd => rdd.filter(f => Option(f.getAttributes.get(key)).exists(_.equals(value))))
}
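
/*
* Editor's sketch (not part of this commit): each filter returns a
* FeatureRDD, so the convenience methods compose by chaining. The file
* name and identifiers below are illustrative, borrowed from the test
* suite in this commit.
*
* val features = sc.loadFeatures("dvl1.200.gff3")
* val exonsOfTranscript = features
* .filterByFeatureType("exon")
* .filterByParent("ENST00000445648")
* exonsOfTranscript.rdd.count()
*/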

/**
* @param newRdd The RDD to replace the underlying RDD with.
* @return Returns a new FeatureRDD with the underlying RDD replaced.

@@ -1150,4 +1150,105 @@ class FeatureRDDSuite extends ADAMFunSuite {

checkSave(variantContexts)
}

sparkTest("filter RDD bound features by feature type") {
val features = sc.loadFeatures(testFile("dvl1.200.gff3"))
assert(features.filterByFeatureType("gene").rdd.count() === 22)
}

sparkTest("filter dataset bound features by feature type") {
val features = sc.loadFeatures(testFile("dvl1.200.gff3"))
val featuresDs = features.transformDataset(ds => ds)
assert(featuresDs.filterByFeatureType("gene").dataset.count() === 22)
}

sparkTest("filter RDD bound features by gene") {
val fb = Feature.newBuilder()
val f1 = fb.setContigName("1").setStart(1L).setEnd(101L).setGeneId("DVL1").build()
val f2 = fb.setContigName("1").setStart(2L).setEnd(102L).setGeneId("CCDS22.1").build()
val f3 = fb.setContigName("1").setStart(3L).setEnd(103L).setGeneId("CCDS22.1").build()
val features = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
assert(features.filterByGene("CCDS22.1").rdd.count() === 2)
}

sparkTest("filter dataset bound features by gene") {
val fb = Feature.newBuilder()
val f1 = fb.setContigName("1").setStart(1L).setEnd(101L).setGeneId("DVL1").build()
val f2 = fb.setContigName("1").setStart(2L).setEnd(102L).setGeneId("CCDS22.1").build()
val f3 = fb.setContigName("1").setStart(3L).setEnd(103L).setGeneId("CCDS22.1").build()
val features = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val featuresDs = features.transformDataset(ds => ds)
assert(featuresDs.filterByGene("CCDS22.1").dataset.count() === 2)
}

sparkTest("filter RDD bound features by transcript") {
val fb = Feature.newBuilder()
val f1 = fb.setContigName("1").setStart(1L).setEnd(101L).setTranscriptId("ENST00000339381").build()
val f2 = fb.setContigName("1").setStart(2L).setEnd(102L).setTranscriptId("ENST00000445648").build()
val f3 = fb.setContigName("1").setStart(3L).setEnd(103L).setTranscriptId("ENST00000445648").build()
val features = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
assert(features.filterByTranscript("ENST00000445648").rdd.count() === 2)
}

sparkTest("filter dataset bound features by transcript") {
val fb = Feature.newBuilder()
val f1 = fb.setContigName("1").setStart(1L).setEnd(101L).setTranscriptId("ENST00000339381").build()
val f2 = fb.setContigName("1").setStart(2L).setEnd(102L).setTranscriptId("ENST00000445648").build()
val f3 = fb.setContigName("1").setStart(3L).setEnd(103L).setTranscriptId("ENST00000445648").build()
val features = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val featuresDs = features.transformDataset(ds => ds)
assert(featuresDs.filterByTranscript("ENST00000445648").dataset.count() === 2)
}

sparkTest("filter RDD bound features by exon") {
val fb = Feature.newBuilder()
val f1 = fb.setContigName("1").setStart(1L).setEnd(101L).setExonId("ENSE00001691126").build()
val f2 = fb.setContigName("1").setStart(2L).setEnd(102L).setExonId("ENSE00001779983").build()
val f3 = fb.setContigName("1").setStart(3L).setEnd(103L).setExonId("ENSE00001779983").build()
val features = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
assert(features.filterByExon("ENSE00001779983").rdd.count() === 2)
}

sparkTest("filter dataset bound features by exon") {
val fb = Feature.newBuilder()
val f1 = fb.setContigName("1").setStart(1L).setEnd(101L).setExonId("ENSE00001691126").build()
val f2 = fb.setContigName("1").setStart(2L).setEnd(102L).setExonId("ENSE00001779983").build()
val f3 = fb.setContigName("1").setStart(3L).setEnd(103L).setExonId("ENSE00001779983").build()
val features = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val featuresDs = features.transformDataset(ds => ds)
assert(featuresDs.filterByExon("ENSE00001779983").dataset.count() === 2)
}

sparkTest("filter RDD bound features by score") {
val features = sc.loadFeatures(testFile("dvl1.200.bed"))
assert(features.filterByScore(10.0d).rdd.count() === 23)
}

sparkTest("filter dataset bound features by score") {
val features = sc.loadFeatures(testFile("dvl1.200.bed"))
val featuresDs = features.transformDataset(ds => ds)
assert(featuresDs.filterByScore(10.0d).dataset.count() === 23)
}

sparkTest("filter RDD bound features by parent") {
val features = sc.loadFeatures(testFile("dvl1.200.gff3"))
assert(features.filterByParent("ENSG00000107404").rdd.count() === 8)
}

sparkTest("filter dataset bound features by parent") {
val features = sc.loadFeatures(testFile("dvl1.200.gff3"))
val featuresDs = features.transformDataset(ds => ds)
assert(featuresDs.filterByParent("ENSG00000107404").dataset.count() === 8)
}

sparkTest("filter RDD bound features by attribute") {
val features = sc.loadFeatures(testFile("dvl1.200.gff3"))
assert(features.filterByAttribute("biotype", "protein_coding").rdd.count() === 68)
}

sparkTest("filter dataset bound features by attribute") {
val features = sc.loadFeatures(testFile("dvl1.200.gff3"))
val featuresDs = features.transformDataset(ds => ds)
assert(featuresDs.filterByAttribute("biotype", "protein_coding").dataset.count() === 68)
}
}
