diff --git a/docs/source/55_api.md b/docs/source/55_api.md index 8223587a19..acceff7799 100644 --- a/docs/source/55_api.md +++ b/docs/source/55_api.md @@ -86,7 +86,7 @@ class LoadReads { JavaSparkContext jsc) { // create an ADAMContext first ADAMContext ac = new ADAMContext(jsc.sc()); - + // then wrap that in a JavaADAMContext JavaADAMContext jac = new JavaADAMContext(ac); @@ -163,7 +163,7 @@ described above, the `ADAMContext` adds the implicit methods needed for using ## Working with genomic data using GenomicRDDs {#genomic-rdd} -As described in the section on using the [ADAMContext}(#adam-context), ADAM +As described in the section on using the [ADAMContext](#adam-context), ADAM loads genomic data into a `GenomicRDD` which is specialized for each datatype. This `GenomicRDD` wraps Apache Spark's Resilient Distributed Dataset (RDD, [@zaharia12]) API with genomic metadata. The `RDD` abstraction presents an @@ -177,7 +177,7 @@ read group that a dataset came from. Specifically, ADAM supports the following metadata: * `GenomicRDD` base: A sequence dictionary, which describes the reference - assembly that data is aligned to, if it is aligned. Applies to all types. + assembly that data are aligned to, if it is aligned. Applies to all types. * `MultisampleGenomicRDD`: Adds metadata about the samples in a dataset. Applies to `GenotypeRDD`. * `ReadGroupGenomicRDD`: Adds metadata about the read groups attached to a @@ -257,30 +257,35 @@ bypassing the ADAM APIs. Another useful API implemented in ADAM is the RegionJoin API, which joins two genomic datasets that contain overlapping regions. This primitive is useful for -a number of applications including variant calling (identifying all of the reads -that overlap a candidate variant), coverage analysis (determining the coverage -depth for each region in a reference), and INDEL realignment (identify INDELs +a number of applications including variant calling (identifying all of the reads +that overlap a candidate variant), coverage analysis (determining the coverage +depth for each region in a reference), and INDEL realignment (identify INDELs aligned against a reference). -There are two overlap join implementations available in ADAM: -BroadcastRegionJoin and ShuffleRegionJoin. The result of a ShuffleRegionJoin -is identical to the BroadcastRegionJoin, however they serve different +There are two overlap join implementations available in ADAM: +BroadcastRegionJoin and ShuffleRegionJoin. The result of a ShuffleRegionJoin +is identical to the BroadcastRegionJoin, however they serve different purposes depending on the content of the two datasets. -The ShuffleRegionJoin is at its core a distributed sort-merge overlap join. -To ensure that the data is appropriately colocated, we perform a copartition -on the right dataset before the each node conducts the join locally. The -BroadcastRegionJoin performs an overlap join by broadcasting a copy of the -entire left dataset to each node. ShuffleRegionJoin should be used if the right -dataset is too large to send to all nodes and both datasets have low -cardinality. The BroadcastRegionJoin should be used when you are joining a -smaller dataset to a larger one and/or the datasets in the join have high -cardinality. - -Another important distinction between ShuffleRegionJoin and -BroadcastRegionJoin is the join operations available in ADAM. See the table -below for an exact list of what joins are available for each type of -RegionJoin. +The ShuffleRegionJoin is a distributed sort-merge overlap join. To ensure that +the data are appropriately colocated, we perform a copartition on the right +dataset before the each node conducts the join locally. ShuffleRegionJoin +should be used if the right dataset is too large to send to all nodes and both +datasets have high cardinality. + +The BroadcastRegionJoin performs an overlap join by broadcasting a copy of the +entire left dataset to each node. The BroadcastRegionJoin should be used when +the right side of your join is small enough to be collected and broadcast out, +and the larger side of the join is unsorted and the data are too large to be +worth shuffling, the data are sufficiently skewed that it is hard to load +balance, or you can tolerate unsorted output. + +Another important distinction between ShuffleRegionJoin and +BroadcastRegionJoin is the join operations available in ADAM. Since the +broadcast join doesn't co-partition the datasets and instead sends the full +right table to all nodes, some joins (e.g. left/full outer joins) cannot be +written as broadcast joins. See the table below for an exact list of what joins +are available for each type of region join. To perform a ShuffleRegionJoin, use the following: @@ -294,26 +299,25 @@ To perform a BroadcastRegionJoin, use the following: dataset1.broadcastRegionJoin(dataset2) ``` -Where `dataset1` and `dataset2` are `GenomicRDD`s. If you used the ADAMContext to +Where `dataset1` and `dataset2` are `GenomicRDD`s. If you used the ADAMContext to read a genomic dataset into memory, this condition is met. -ADAM has a variety of ShuffleRegionJoin types that you can perform on your -data, and all are called in a similar way: - -![Joins Available] -(img/join_examples.png) +ADAM has a variety of region join types that you can perform on your data, and +all are called in a similar way: +#### [Joins Available](img/join_examples.png) -Join call | action | Availability -----------|--------| -```dataset1.shuffleRegionJoin(dataset2) ``` ```dataset1.broadcastRegionJoin(dataset2)```| perform an inner join | ShuffleRegionJoin BroadcastRegionJoin -```dataset1.fullOuterShuffleRegionJoin(datset2)```|perform an outer join | ShuffleRegionJoin -```dataset1.leftOuterShuffleRegionJoin(dataset2)```|perform a left outer join | ShuffleRegionJoin -```dataset1.rightOuterShuffleRegionJoin(dataset2)``` ```dataset1.rightOuterBroadcastRegionJoin(dataset2)```|perform a right outer join | ShuffleRegionJoin BroadcastRegionJoin -```dataset1.shuffleRegionJoinAndGroupByLeft(dataset2)``` |perform an inner join and group joined values by the records on the left | ShuffleRegionJoin -```dataset1.broadcastRegionJoinAndGroupByRight(dataset2)``` | perform an inner join and group joined values by the records on the right | ShuffleRegionJoin -```dataset1.rightOuterShuffleRegionJoinAndGroupByLeft(dataset2)```|perform a right outer join and group joined values by the records on the left | ShuffleRegionJoin -```rightOuterBroadcastRegionJoinAndGroupByRight``` | perform a right outer join and group joined values by the records on the right | BroadcastRegionJoin +* Joins implemented across both shuffle and broadcast + * Inner join + * Right outer join +* Shuffle-only joins + * Full outer join + * Inner join and group by left + * Left outer join + * Right outer join and group by left +* Broadcast-only joins + * Inner join and group by right + * Right outer join and group by right One common pattern involves joining a single dataset against many datasets. An example of this is joining an RDD of features (e.g., gene/exon coordinates) @@ -339,6 +343,130 @@ val bcastFeatures = sc.loadFeatures("my/features.adam").broadcast() val readsByFeature = reads.broadcastRegionJoinAgainst(bcastFeatures) ``` +To demonstrate how the RegionJoin APIs can be used to answer scientific +questions, we will walk through three common queries that can be written using +the RegionJoin API. First, we will perform a simple filter on genotypes based +on a file of features. We will then demonstrate a join and group by on variants +and features, providing variant data grouped by the feature they overlap. +Finally, we will separate reads into those that overlap and those that do not +overlap features from a feature file. + +These demonstrations illustrate the difference between calling +ShuffleRegionJoin and BroadcastRegionJoin and provide example code to expand +from. + +###### Filter Genotypes by Features + +This query joins an RDD of Genotypes against an RDD of Features using an inner +join. Because this is an inner join, records from either dataset that don't +pair to the other are automatically dropped, providing the filter we are +interested in. This query is useful for trying to identify genotypes that +overlap features of interest. For example, if our feature file contains all the +exonic regions of the genome, this query would extract all genotypes that fall +in exonic regions. + +```scala +// Inner join will filter out genotypes not covered by a feature +val genotypes = sc.loadGenotypes("my/genotypes.adam") +val features = sc.loadFeatures("my/features.adam") + +// We can use ShuffleRegionJoin… +val joinedGenotypesShuffle = genotypes.shuffleRegionJoin(features) + +// …or BroadcastRegionJoin +val joinedGenotypesBcast = features.broadcastRegionJoin(genotypes) + +// In the case that we only want Genotypes, we can use a simple projection +val filteredGenotypesShuffle = joinedGenotypesShuffle.rdd.map(_._1) + +val filteredGenotypesBcast = joinedGenotypesBcast.rdd.map(_._2) +``` + +After the join, we can perform a transform function on the resulting RDD to +manipulate it into providing the answer to our question. Since we were +interested in the `Genotype`s that overlap a `Feature`, we map over the tuples +and select just the `Genotype`. + +Since a broadcast join sends the left dataset to all executors, we chose to +send the `features` dataset because feature data are usually smaller in size +than genotypic data. + +###### Group overlapping variant data by the gene they overlap + +This query joins an RDD of Variants against an RDD of Features, and immediately +performs a group-by on the Feature. This produces an RDD whose elements are a +tuple containing a Feature, and all of the Variants overlapping the Feature. +This produces an RDD whose elements are tuples containing a Feature and all of +the Variants overlapping the Feature.This query is useful for trying to +identify annotated variants that may interact (identifying frameshift mutations +within a transcript that may act as a pair to shift and then restore the +reading frame) or as the start of a query that computes variant density over a +set of genomic features. + +```scala +// Inner join with a group by on the features +val features = sc.loadFeatures("my/features.adam") +val variants = sc.loadVariants("my/variants.adam") + +// As a ShuffleRegionJoin, it can be implemented as follows: +val variantsByFeatureShuffle = features.shuffleRegionJoinAndGroupByLeft(variants) + +// As a BroadcastRegionJoin, it can be implemented as follows: +val variantsByFeatureBcast = variants.broadcastRegionJoinAndGroupByRight(features) +``` + +When we switch join strategies, we swap which dataset is on the left side of +the join. BroadcastRegionJoin only supports grouping by the right dataset, and +ShuffleRegionJoin supports only grouping by the left dataset. + +The reason BroadcastRegionJoin does not have a `joinAndGroupByLeft` +implementation is due to the fact that the left dataset is broadcast to all +nodes. Unlike shuffle joins, broadcast joins don't maintain a sort order +invariant. Because of this, we would need to shuffle all data to a group-by on +the left side of the dataset, and there is no opportunity to optimize by +combining the join and group-by. + +###### Separate reads into overlapping and non-overlapping features + +This query joins an RDD of reads with an RDD of features using an outer join. +The outer join will produce an RDD where each read is optionally mapped to a +feature. If a given read does not overlap with any features provided, it is +paired with a `None`. After we perform the join, we use a predicate to separate +the reads into two RDDs. This query is useful for filtering out reads based on +feature data. For example, identifying reads that overlap with ATAC-seq data to +perform chromatin accessibility studies. It may be useful to separate the reads +to perform distinct analyses on each resulting dataset. + +```scala +// An outer join provides us with both overlapping and non-overlapping data +val reads = sc.loadAlignments("my/reads.adam") +val features = sc.loadFeatures("my/features.adam") + +// As a ShuffleRegionJoin, we can use a LeftOuterShuffleRegionJoin: +val readsToFeatures = reads.leftOuterShuffleRegionJoin(features) + +// As a BroadcastRegionJoin, we can use a RightOuterBroadcastRegionJoin: +val featuresToReads = features.rightOuterBroadcastRegionJoin(reads) + +// After we have our join, we need to separate the RDD +// If we used the ShuffleRegionJoin, we filter by None in the values +val overlapsFeatures = readsToFeatures.rdd.filter(_._2.isDefined) +val notOverlapsFeatures = readsToFeatures.rdd.filter(_._2.isEmpty) + +// If we used BroadcastRegionJoin, we filter by None in the keys +val overlapsFeatures = featuresToReads.rdd.filter(_._1.isDefined) +val notOverlapsFeatures = featuresToReads.rdd.filter(_._1.isEmpty) +``` + +Because of the difference in how ShuffleRegionJoin and BroadcastRegionJoin are +called, the predicate changes between them. It is not possible to call a +`leftOuterJoin` using the BroadcastRegionJoin. As previously mentioned, the +BroadcastRegionJoin broadcasts the left dataset, so a left outer join would +require an additional shuffle phase. For an outer join, using a +ShuffleRegionJoin will be cheaper if your reads are already sorted, however if +the feature dataset is small and the reads are not sorted, the +BroadcastRegionJoin call would likely be more performant. + ## Using ADAM's Pipe API {#pipes} ADAM's `GenomicRDD` API provides support for piping the underlying genomic data @@ -351,8 +479,8 @@ strings to the pipe. ADAM's pipe API adds several important functions: and get variants back from the pipe) * It adds the ability to set environment variables and to make local files (e.g., a reference genome) available to the run command -* If the data is aligned, we ensure that each subcommand runs over a contiguious - section of the reference genome, and that data is sorted on this chunk. We +* If the data are aligned, we ensure that each subcommand runs over a contiguous + section of the reference genome, and that data are sorted on this chunk. We provide control over the size of any flanking region that is desired. The method signature of a pipe command is below: