Adding examples of how to use joins in the real world #1605

Merged
204 changes: 165 additions & 39 deletions docs/source/55_api.md
@@ -86,7 +86,7 @@ class LoadReads {
JavaSparkContext jsc) {
// create an ADAMContext first
ADAMContext ac = new ADAMContext(jsc.sc());

// then wrap that in a JavaADAMContext
JavaADAMContext jac = new JavaADAMContext(ac);

@@ -163,7 +163,7 @@ described above, the `ADAMContext` adds the implicit methods needed for using

## Working with genomic data using GenomicRDDs {#genomic-rdd}

As described in the section on using the [ADAMContext](#adam-context), ADAM
loads genomic data into a `GenomicRDD` which is specialized for each datatype.
This `GenomicRDD` wraps Apache Spark's Resilient Distributed Dataset (RDD,
[@zaharia12]) API with genomic metadata. The `RDD` abstraction presents an
@@ -257,30 +257,35 @@ bypassing the ADAM APIs.

Another useful API implemented in ADAM is the RegionJoin API, which joins two
genomic datasets that contain overlapping regions. This primitive is useful for
a number of applications including variant calling (identifying all of the reads
that overlap a candidate variant), coverage analysis (determining the coverage
depth for each region in a reference), and INDEL realignment (identifying INDELs
aligned against a reference).

There are two overlap join implementations available in ADAM:
BroadcastRegionJoin and ShuffleRegionJoin. The result of a ShuffleRegionJoin
is identical to that of a BroadcastRegionJoin, but the two serve different
purposes depending on the content of the two datasets.

The ShuffleRegionJoin is a distributed sort-merge overlap join. To ensure that
the data is appropriately colocated, we perform a copartition on the right
dataset before each node conducts the join locally. ShuffleRegionJoin
should be used if the right dataset is too large to send to all nodes and both
datasets have high cardinality.
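
The sort-merge strategy itself is easy to sketch in plain Scala. The toy,
single-contig implementation below illustrates the idea only; it is not ADAM's
code, and `Interval` is a hypothetical stand-in for ADAM's `ReferenceRegion`:

```scala
// Toy single-contig sort-merge overlap join; NOT ADAM's implementation.
// Interval is a hypothetical stand-in for ADAM's ReferenceRegion.
case class Interval(start: Long, end: Long) {
  def overlaps(other: Interval): Boolean =
    start < other.end && other.start < end
}

def sortMergeOverlapJoin[L, R](left: Seq[(Interval, L)],
                               right: Seq[(Interval, R)]): Seq[(L, R)] = {
  // in the real join, the shuffle stage delivers both partitions sorted by start
  val ls = left.sortBy(_._1.start)
  val rs = right.sortBy(_._1.start)
  val out = scala.collection.mutable.ArrayBuffer.empty[(L, R)]
  var window = Vector.empty[(Interval, R)] // right records that may still overlap
  var ri = 0
  for ((lr, lv) <- ls) {
    // admit right records that start before the current left record ends
    while (ri < rs.length && rs(ri)._1.start < lr.end) {
      window :+= rs(ri)
      ri += 1
    }
    // drop right records that end at or before the current left record's start;
    // they cannot overlap this or any later left record
    window = window.filter(_._1.end > lr.start)
    for ((rr, rv) <- window if lr.overlaps(rr)) out += ((lv, rv))
  }
  out.toSeq
}
```

Because both inputs arrive sorted, each record is scanned once plus a small
sliding window, which is what makes the shuffle join efficient when both
datasets are large and high cardinality.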

The BroadcastRegionJoin performs an overlap join by broadcasting a copy of the
entire left dataset to each node. The BroadcastRegionJoin should be used when
the left side of your join is small enough to be collected and broadcast out,
and one of the following holds: the larger side of the join is unsorted and
too large to be worth shuffling; the data is sufficiently skewed that it is
hard to load balance; or you can tolerate unsorted output.
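
The broadcast strategy can be sketched the same way: collect the small left
side, ship a copy of it to every node, and stream each partition of the larger
side past that copy. Again a toy illustration, not ADAM's code, with `Interval`
as a hypothetical stand-in for `ReferenceRegion`:

```scala
// Toy broadcast-style overlap join; NOT ADAM's implementation.
case class Interval(start: Long, end: Long) {
  def overlaps(other: Interval): Boolean =
    start < other.end && other.start < end
}

def broadcastOverlapJoin[L, R](
    smallLeft: Seq[(Interval, L)],
    rightPartition: Iterator[(Interval, R)]): Iterator[(L, R)] = {
  // in Spark, this copy would be shipped to each executor via sc.broadcast
  val broadcastCopy = smallLeft.toArray
  // the right side needs no sort and no shuffle; each partition streams past
  // the broadcast copy independently
  rightPartition.flatMap { case (rr, rv) =>
    broadcastCopy.collect { case (lr, lv) if lr.overlaps(rr) => (lv, rv) }
  }
}
```

Nothing here depends on either side being sorted, which is why broadcast joins
tolerate unsorted, skewed input but cannot guarantee sorted output.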

Another important distinction between ShuffleRegionJoin and
BroadcastRegionJoin is the set of join operations available in ADAM. Since the
broadcast join doesn't co-partition the datasets and instead sends the full
left table to all nodes, some joins (e.g., left/full outer joins) cannot be
written as broadcast joins. See the list below for exactly which joins are
available for each type of region join.

To perform a ShuffleRegionJoin, use the following:

```scala
dataset1.shuffleRegionJoin(dataset2)
```

To perform a BroadcastRegionJoin, use the following:

```scala
dataset1.broadcastRegionJoin(dataset2)
```

Where `dataset1` and `dataset2` are `GenomicRDD`s. If you used the ADAMContext to
read a genomic dataset into memory, this condition is met.

ADAM has a variety of region join types that you can perform on your data, and
all are called in a similar way:

![Joins Available](img/join_examples.png)

* Joins implemented across both shuffle and broadcast
    * Inner join
    * Right outer join
* Shuffle-only joins
    * Full outer join
    * Inner join and group by left
    * Left outer join
    * Right outer join and group by left
* Broadcast-only joins
    * Inner join and group by right
    * Right outer join and group by right
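
To make the shapes of these joins concrete, the naive reference implementation
below shows what three of them return on plain Scala collections. This is
illustrative only; the ADAM joins return `GenomicRDD`s of pairs, and `Interval`
is a hypothetical stand-in for `ReferenceRegion`:

```scala
// Naive reference semantics for three region join shapes; NOT the ADAM API.
case class Interval(start: Long, end: Long) {
  def overlaps(other: Interval): Boolean =
    start < other.end && other.start < end
}

// inner join: only overlapping pairs survive
def inner[L, R](l: Seq[(Interval, L)], r: Seq[(Interval, R)]): Seq[(L, R)] =
  for ((li, lv) <- l; (ri, rv) <- r if li.overlaps(ri)) yield (lv, rv)

// left outer join: lefts with no overlap are kept, paired with None
def leftOuter[L, R](l: Seq[(Interval, L)],
                    r: Seq[(Interval, R)]): Seq[(L, Option[R])] =
  l.flatMap { case (li, lv) =>
    val hits = r.collect { case (ri, rv) if li.overlaps(ri) => rv }
    if (hits.isEmpty) Seq((lv, None)) else hits.map(rv => (lv, Some(rv)))
  }

// inner join and group by left: one element per left record that overlaps
def joinAndGroupByLeft[L, R](l: Seq[(Interval, L)],
                             r: Seq[(Interval, R)]): Seq[(L, Seq[R])] =
  l.map { case (li, lv) =>
    (lv, r.collect { case (ri, rv) if li.overlaps(ri) => rv })
  }.filter(_._2.nonEmpty)
```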

One common pattern involves joining a single dataset against many datasets. An
example of this is joining an RDD of features (e.g., gene/exon coordinates)
Expand All @@ -339,6 +343,128 @@ val bcastFeatures = sc.loadFeatures("my/features.adam").broadcast()
val readsByFeature = reads.broadcastRegionJoinAgainst(bcastFeatures)
```

To demonstrate how the RegionJoin APIs can be used to answer scientific
questions, we will walk through three common queries that can be written using
the RegionJoin API. First, we will perform a simple filter on genotypes based
on a file of features. We will then demonstrate a join and group by on variants
and features, providing variant data grouped by the feature they overlap.
Finally, we will separate reads into those that overlap and those that do not
overlap features from a feature file.

These demonstrations illustrate the difference between calling
ShuffleRegionJoin and BroadcastRegionJoin and provide example code to expand
from.

###### Filter Genotypes by Features

This query joins an RDD of Genotypes against an RDD of Features using an inner
join. Because this is an inner join, records from either dataset that don't
pair to the other are automatically dropped, providing the filter we are
interested in. This query is useful for trying to identify genotypes that
overlap features of interest. For example, if our feature file contains all the
exonic regions of the genome, this query would extract all genotypes that fall
in exonic regions.

```scala
// Inner join will filter out genotypes not covered by a feature
val genotypes = sc.loadGenotypes("my/genotypes.adam")
val features = sc.loadFeatures("my/features.adam")

// We can use ShuffleRegionJoin…
val joinedGenotypesShuffle = genotypes.shuffleRegionJoin(features)

// …or BroadcastRegionJoin
val joinedGenotypesBcast = features.broadcastRegionJoin(genotypes)

// In the case that we only want Genotypes, we can use a simple projection
val filteredGenotypesShuffle = joinedGenotypesShuffle.rdd.map(_._1)

val filteredGenotypesBcast = joinedGenotypesBcast.rdd.map(_._2)
```

After the join, we can transform the resulting RDD to answer our question.
Since we are interested in the `Genotype`s that overlap a `Feature`, we map
over the tuples and select just the `Genotype`.

Since a broadcast join sends the left dataset to all executors, we chose to
send the `features` dataset, because feature data are usually smaller in size
than genotypic data.

###### Group overlapping variant data by the gene they overlap

This query joins an RDD of Variants against an RDD of Features, and
immediately performs a group-by on the Feature. This produces an RDD whose
elements are tuples containing a Feature and all of the Variants overlapping
that Feature. This query is useful for trying to identify annotated variants
that may interact (identifying frameshift mutations within a transcript that
may act as a pair to shift and then restore the reading frame) or as the start
of a query that computes variant density over a set of genomic features.

```scala
// Inner join with a group by on the features
val features = sc.loadFeatures("my/features.adam")
val variants = sc.loadVariants("my/variants.adam")

// As a ShuffleRegionJoin, it can be implemented as follows:
val variantsByFeatureShuffle = features.shuffleRegionJoinAndGroupByLeft(variants)

// As a BroadcastRegionJoin, it can be implemented as follows:
val variantsByFeatureBcast = variants.broadcastRegionJoinAndGroupByRight(features)
```

When we switch join strategies, we swap which dataset is on the left side of
the join. BroadcastRegionJoin only supports grouping by the right dataset, and
ShuffleRegionJoin only supports grouping by the left dataset.

The reason BroadcastRegionJoin does not have a `joinAndGroupByLeft`
implementation is that the left dataset is broadcast to all nodes. Unlike
shuffle joins, broadcast joins don't maintain a sort order invariant. Because
of this, we would need to shuffle all of the data to perform a group-by on the
left side of the dataset, and there is no opportunity to optimize by combining
the join and group-by.
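
The sort invariant is what makes the combined join-and-group-by cheap:
sort-merge output emits every pair sharing a left record adjacently, so
grouping is a single linear pass. A toy sketch of that pass (plain Scala, not
ADAM code):

```scala
// Group adjacent pairs by their left element in one linear pass. This yields
// correct groups only when pairs sharing a left element are adjacent, which
// sorted shuffle join output guarantees and broadcast join output does not.
def groupAdjacentByLeft[L, R](pairs: Seq[(L, R)]): Seq[(L, Seq[R])] =
  pairs.foldLeft(List.empty[(L, List[R])]) {
    case ((hl, hr) :: tail, (l, r)) if hl == l => (hl, r :: hr) :: tail
    case (acc, (l, r))                         => (l, List(r)) :: acc
  }.reverse.map { case (l, rs) => (l, rs.reverse) }
```

On unsorted broadcast output, the same grouping would first require shuffling
every pair that shares a left record onto one node.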

###### Separate reads into overlapping and non-overlapping features

This query joins an RDD of reads with an RDD of features using an outer join.
The outer join will produce an RDD where each read is optionally mapped to a
feature. If a given read does not overlap with any features provided, it is
paired with a `None`. After we perform the join, we use a predicate to separate
the reads into two RDDs. This query is useful for filtering out reads based on
feature data. For example, we may want to identify reads that overlap with
ATAC-seq data in order to perform chromatin accessibility studies. It may be
useful to separate the reads to perform distinct analyses on each resulting
dataset.

```scala
// An outer join provides us with both overlapping and non-overlapping data
val reads = sc.loadAlignments("my/reads.adam")
val features = sc.loadFeatures("my/features.adam")

// As a ShuffleRegionJoin, we can use a LeftOuterShuffleRegionJoin:
val readsToFeatures = reads.leftOuterShuffleRegionJoin(features)

// As a BroadcastRegionJoin, we can use a RightOuterBroadcastRegionJoin:
val featuresToReads = features.rightOuterBroadcastRegionJoin(reads)

// After we have our join, we need to separate the RDD
// If we used the ShuffleRegionJoin, the optional feature is the second
// element of each pair
val overlapsFeaturesShuffle = readsToFeatures.rdd.filter(_._2.isDefined)
val notOverlapsFeaturesShuffle = readsToFeatures.rdd.filter(_._2.isEmpty)

// If we used the BroadcastRegionJoin, the optional feature is the first
// element of each pair
val overlapsFeaturesBcast = featuresToReads.rdd.filter(_._1.isDefined)
val notOverlapsFeaturesBcast = featuresToReads.rdd.filter(_._1.isEmpty)
```

Because of the difference in how ShuffleRegionJoin and BroadcastRegionJoin are
called, the predicate changes between them. It is not possible to call a
`leftOuterJoin` using the BroadcastRegionJoin. As previously mentioned, the
BroadcastRegionJoin broadcasts the left dataset, so a left outer join would
require an additional shuffle phase. For an outer join, using a
ShuffleRegionJoin will be cheaper if your reads are already sorted; however,
if the feature dataset is small and the reads are not sorted, the
BroadcastRegionJoin call would likely be more performant.

## Using ADAM's Pipe API {#pipes}

ADAM's `GenomicRDD` API provides support for piping the underlying genomic data
Expand All @@ -351,7 +477,7 @@ strings to the pipe. ADAM's pipe API adds several important functions:
and get variants back from the pipe)
* It adds the ability to set environment variables and to make local files
(e.g., a reference genome) available to the run command
* If the data is aligned, we ensure that each subcommand runs over a contiguous
section of the reference genome, and that data is sorted on this chunk. We
provide control over the size of any flanking region that is desired.
