Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to filter genotype RDD with FeatureRDD #890

NeillGibson opened this issue Nov 29, 2015 · 10 comments

How to filter genotype RDD with FeatureRDD #890

NeillGibson opened this issue Nov 29, 2015 · 10 comments


Copy link

@NeillGibson NeillGibson commented Nov 29, 2015


How can I filter a genotypeRDD with a FeatureRDD? I get the following error:

(BroadcastRegionJoin.partitionAndJoin(genes, humanGenotypesRDD).map(_._2))
<console>:39: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[org.bdgenomics.formats.avro.Feature]
 required: org.apache.spark.rdd.RDD[(org.bdgenomics.adam.models.ReferenceRegion, ?)]
              (BroadcastRegionJoin.partitionAndJoin(genesRDD, genotypesRDD).map(_._2))

with this code:

import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.formats.avro._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.BroadcastRegionJoin
import org.bdgenomics.adam.rdd.ADAMContext._

 # Load the genotype RDDs using the AdamContext
val ac = new ADAMContext(sc)
val genotypesRDD = ac.loadGenotypes("/user/ec2-user/1kg/chr22.adam")

val genesRDD = ac.loadFeatures("/user/ec2-user/1kg/gene_annotation.adam")

# based on example from Spark Analytics book (page 213 bottom)
genotypesInGenesRdd = (BroadcastRegionJoin.partitionAndJoin(genesRDD , genotypesRDD).map(_._2))

Do I need to convert the genotypeRDD and FeatureRDD to a ReferenceRegionRDD ? Is this an implicit conversion done automatically by importing a certain class?

Thank you,


Copy link

@fnothaft fnothaft commented Nov 30, 2015

Hi @NeillGibson!

You'll need to key each RDD with a ReferenceRegion, e.g.:

import org.bdgenomics.adam.models.ReferenceRegion
genotypesInGenesRdd = (BroadcastRegionJoin.partitionAndJoin(genesRDD.keyBy(ReferenceRegion(_) , genotypesRDD.keyBy(g => ReferenceRegion(g.getVariant.getContig.getContigName, g.getVariant.getStart, g.getVariant.getEnd))).map(_._2))
Copy link
Contributor Author

@NeillGibson NeillGibson commented Dec 1, 2015

Hi @fnothaft . Thank you for the information, needed to move some parentheses but the command starts to execute.

val genotypesInGenesRDD = (BroadcastRegionJoin.partitionAndJoin(genesRDD.keyBy(ReferenceRegion(_)),genotypesRDD.keyBy(g => ReferenceRegion(g.getVariant.getContig.getContigName,g.getVariant.getStart, g.getVariant.getEnd)))).map(_._2)

Tasks of the operation keep failing thoug and keep being resubmitted:

15/12/01 07:57:24 WARN TaskSetManager: Lost task 6.0 in stage 3.1 (TID 170, Failed to connect to
15/12/01 07:58:24 WARN TaskSetManager: Lost task 6.1 in stage 3.1 (TID 171, Failed to connect to
15/12/01 08:40:16 WARN TaskSetManager: Lost task 21.1 in stage 4.1 (TID 300, FetchFailed(null, shuffleId=1, mapId=-1, reduceId=22, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
15/12/01 08:40:16 WARN TaskSetManager: Lost task 44.0 in stage 4.1 (TID 307, FetchFailed(null, shuffleId=1, mapId=-1, reduceId=45, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1

15/12/01 09:04:43 INFO DAGScheduler: Resubmitting ShuffleMapStage 3 (flatMap at BroadcastRegionJoin.scala:117) because some of its tasks had failed: 0, 1, 2, 4, 5, 6, 7, 8, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 24, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 45, 47, 48, 49, 50, 51, 52, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 67, 69, 71, 72, 73, 74, 75, 76, 77, 78, 79, 81, 82, 83
15/12/01 09:04:43 INFO DAGScheduler: Submitting ShuffleMapStage 3 (MapPartitionsRDD[13] at flatMap at BroadcastRegionJoin.scala:117), which has no missing parents

Is this by any chance a very resource intensive operation? I am running this on 11 m3.xlarge machines. Or is there another likely cause for the taks that keep failing / being resubmitted. In the end I just aborted the process.

The annoation file that I am using is

(removed the chr from the chromosome names to match the chromosome names in the 1000 genomes vcf file, reference version should match.)

Copy link

@fnothaft fnothaft commented Dec 1, 2015

It shouldn't be terribly expensive, however you may want to try running the ShuffleRegionJoin instead of the BroadcastRegionJoin. I'll need to look over the GTF loading code, but I'm not sure off of the top of my head whether that code maps each feature in a GTF to a Feature object, or just the Gene/Transcript. The reason I'm concerned about that is that if you have just genes as Features, you'll have an RDD of O(20k) Features, while if you have all GTF features (gene, transcript, exon, intron, cds, etc), it could be several orders of magnitude bigger and thus might not fit on the driver, which would cause the BroadcastRegionJoin to fail where the ShuffleRegionJoin would succeed. That being said, I'm a bit skeptical of that as the problem. Are you joining against the whole 1000Genomes variant calls? I can try to repro this on my side.

Copy link
Contributor Author

@NeillGibson NeillGibson commented Dec 2, 2015

Hi @fnothaft. Thank you for the information. I can try to increase the driver memory and the ShuffleRegionJoin.

The vcf file I am testing with is
This is based on the b37 assembly

Some searching online led me to this gene annotation file (the previous one mentioned is for an older genome build).
This is based on the GRCh37 which is the same as b37, just with chr in front of the chromosome names (fixed this with sed).

The basic thing I am trying to achieve is just to see how long it takes with Adam to filter a large set of genotypes based on a gene model. It doesn't need to be this exact gene model, or this detailed of a gene model. If you have a gene model which is compatible with the 1000 genomes data I would be happy to use that one.

I am only using chr22 because the full data set is much bigger. Queries with just chr22 already take some time. And I have other queries that I've tested on chr22. I'll filter the gene model (gtf file) also for just the chr22 gene annotations, before converting to Feature format.

I will post my results here after testing with the correct genome build and the increased driver memory and or ShuffleRegionJoin method.

Copy link
Contributor Author

@NeillGibson NeillGibson commented Dec 6, 2015

Hi @fnothaft,

Increasing the driver memory from 8 to 14 GB (is max machine mem) did not help. Still the same errors.

I am now trying to use the ShuffleRegionJoin to filter the genotypeRDD with the FeatureRDD.

import org.bdgenomics.formats.avro._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.models.{

import org.bdgenomics.adam.rdd.ShuffleRegionJoin

# Load the genotype RDDs using the AdamContext
val genotypesRDD = sc.loadGenotypes("/user/ec2-user/1kg/chr22.adam")

# Load GRCh37 / hg19 / b37 gene annotation
val genesRDD = sc.loadFeatures("/user/ec2-user/1kg/gene_annotation.adam")

# create sequence dictionary
val seqDict = SequenceDictionary( SequenceRecord("22", 51304566, url = "test://chrom1"))
val partitionSize = 20

val genotypesInGenesRDD = (ShuffleRegionJoin(seqDict, partitionSize).partitionAndJoin(genesRDD.keyBy(ReferenceRegion(_)),humanGenotypesRDD.keyBy(g => ReferenceRegion(g.getVariant.getContig.getContigName,g.getVariant.getStart, g.getVariant.getEnd)))).map(_._2).count

On what should I base the partitionSize? If I set this to 3 (example that I found in Adam test forShuffleRegionJoin ) or 11 (number of nodes) I get an out of memory error.

15/12/06 08:06:00 WARN TaskSetManager: Lost task 10.0 in stage 2.0 (TID 22, java.lang.OutOfMemoryError: Java heap space

If I set partitionSize to 100 I get a lot of small tasks. And processing takes a long time, I did not wait for it to finish.

Do you maybe have an example somewhere with a public vcf file and feature file of real sizes that works, ie the filter a genotypeRDD with BroadCastRegoinJoin or ShuffleRegionJoin. I could then look if I can run that example.

Maybe the issue is just with my gene annotation file.....

@fnothaft fnothaft added this to the 0.21.0 milestone Jul 20, 2016
@heuermh heuermh modified the milestones: 0.21.0, 0.22.0 Oct 13, 2016
@heuermh heuermh added this to Triage in Release 0.23.0 Mar 8, 2017
Copy link

@fnothaft fnothaft commented May 12, 2017

@devin-petersohn Can you take this as part of #1324? I think that all that is needed to resolve this issue is to:

  • Add a unit test demonstrating how to do this exact join.
  • Add docs demonstrating this join to the region join Pandocs.

I'm assigning it to you for now; let me know if you'd rather not.

Copy link

@devin-petersohn devin-petersohn commented Jun 13, 2017

This will be a part of set difference implemented in #1561.

Copy link

@fnothaft fnothaft commented Jul 11, 2017

Ping @devin-petersohn; can you put together a small doc snippet for this? I would like to close this in 0.23.0.

Copy link

@heuermh heuermh commented Jul 13, 2017

I describe something similar here

The gene feature ADAM file came from Ensembl and was filtered via

val features = sc.loadFeatures("Homo_sapiens.GRCh38.89.chr.gff3.gz")
val geneFeatures = features.transform(_.filter(f => f.featureType == "gene"))
Copy link

@devin-petersohn devin-petersohn commented Jul 13, 2017

I am adding multiple additional examples for various things one could do with joins. I should have a PR in tonight. It didn't seem right to simply have the one real-world example.

fnothaft added a commit that referenced this issue Jul 21, 2017
@heuermh heuermh moved this from Triage to Completed in Release 0.23.0 Jan 4, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Linked pull requests

Successfully merging a pull request may close this issue.

None yet
4 participants
You can’t perform that action at this time.