Adding examples of how to use joins in the real world #1605

Merged
59 changes: 29 additions & 30 deletions docs/source/55_api.md
@@ -275,10 +275,10 @@ datasets have high cardinality.

The BroadcastRegionJoin performs an overlap join by broadcasting a copy of the
entire left dataset to each node. The BroadcastRegionJoin should be used when
you have a dataset that is small enough to be collected and broadcast out, the
larger side of the join is unsorted and either the data is so skewed that it is
hard to load balance, the data is too large to be worth shuffling, or you don't
want sorted output.
the right side of your join is small enough to be collected and broadcast out,
and the larger side of the join is unsorted and the data is too large to be
worth shuffling, the data is sufficiently skewed that it is hard to load
balance, or you can tolerate unsorted output.
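
To make the trade-off concrete, here is a minimal sketch of the broadcast strategy using plain Scala collections. It is not ADAM's implementation: `Region`, `broadcastInnerJoin`, and the half-open coordinate convention are assumptions made for illustration. The small side is indexed in memory, and the large side is scanned once in arrival order, which is why the large side needs no sort or shuffle and why the output comes back unsorted.

```scala
object BroadcastJoinSketch {
  // Hypothetical stand-in for a genomic region: half-open [start, end) on a contig.
  final case class Region(contig: String, start: Long, end: Long) {
    def overlaps(other: Region): Boolean =
      contig == other.contig && start < other.end && other.start < end
  }

  // `small` plays the role of the broadcast side: it is indexed in memory once.
  // `large` is consumed lazily in arrival order, so it is never sorted or shuffled.
  def broadcastInnerJoin[A, B](
      small: Seq[(Region, A)],
      large: Iterator[(Region, B)]): Iterator[(A, B)] = {
    val byContig = small.groupBy { case (region, _) => region.contig }
    large.flatMap { case (region, b) =>
      byContig
        .getOrElse(region.contig, Seq.empty)
        .collect { case (r, a) if r.overlaps(region) => (a, b) }
    }
  }

  def main(args: Array[String]): Unit = {
    val features = Seq(Region("chr1", 100L, 200L) -> "exon1")
    val genotypes = Iterator(
      Region("chr1", 150L, 151L) -> "g1", // falls inside exon1
      Region("chr1", 300L, 301L) -> "g2"  // overlaps nothing, so it is dropped
    )
    broadcastInnerJoin(features, genotypes).foreach(println)
  }
}
```

In a real Spark job the in-memory index would be shipped to every executor, so its size is the limiting factor; the sketch only shows why the order and partitioning of the large side do not matter.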

Another important distinction between ShuffleRegionJoin and
BroadcastRegionJoin is the join operations available in ADAM. Since the
@@ -305,18 +305,19 @@ read a genomic dataset into memory, this condition is met.
ADAM has a variety of region join types that you can perform on your data, and
all are called in a similar way:

[Joins Available](img/join_examples.png)
####[Joins Available](img/join_examples.png)

| Join call | action | Availability |
|----------|--------|--------|
| ```left.shuffleRegionJoin(right)``` ```left.broadcastRegionJoin(right)``` ```right.broadcastRegionJoinAgainst(broadcastedLeft)``` | perform an inner join | ShuffleRegionJoin BroadcastRegionJoin |
| ```left.fullOuterShuffleRegionJoin(right)``` | perform an outer join | ShuffleRegionJoin |
| ```left.leftOuterShuffleRegionJoin(right)``` | perform a left outer join | ShuffleRegionJoin |
| ```left.rightOuterShuffleRegionJoin(right)``` ```left.rightOuterBroadcastRegionJoin(right)``` ```right.rightOuterBroadcastRegionJoinAgainst(broadcastedLeft)``` | perform a right outer join | ShuffleRegionJoin BroadcastRegionJoin |
| ```left.shuffleRegionJoinAndGroupByLeft(right)``` | perform an inner join and group joined values by the records on the left | ShuffleRegionJoin |
| ```left.broadcastRegionJoinAndGroupByRight(right)``` ```right.broadcastRegionJoinAgainstAndGroupByRight(broadcastedLeft)``` | perform an inner join and group joined values by the records on the right | ShuffleRegionJoin |
| ```left.rightOuterShuffleRegionJoinAndGroupByLeft(right)``` | perform a right outer join and group joined values by the records on the left | ShuffleRegionJoin |
| ```left.rightOuterBroadcastRegionJoinAndGroupByRight(right)``` ```right.rightOuterBroadcastRegionJoinAgainstAndGroupByRight(broadcastedLeft)``` | perform a right outer join and group joined values by the records on the right | BroadcastRegionJoin |
* Joins implemented across both shuffle and broadcast
  * Inner join
  * Right outer join
* Shuffle-only joins
  * Full outer join
  * Inner join and group by left
  * Left outer join
  * Right outer join and group by left
* Broadcast-only joins
  * Inner join and group by right
  * Right outer join and group by right
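
The join types listed above differ mainly in which unmatched records survive. The following is a rough sketch in plain Scala, not ADAM's API: the `Region` type and the function names are hypothetical, and the `Option`-based result shapes are an assumption of this sketch.

```scala
object JoinTypesSketch {
  // Hypothetical stand-in for a genomic region: half-open [start, end) on a contig.
  final case class Region(contig: String, start: Long, end: Long) {
    def overlaps(other: Region): Boolean =
      contig == other.contig && start < other.end && other.start < end
  }

  // Inner join: only overlapping pairs are kept.
  def innerJoin(left: Seq[Region], right: Seq[Region]): Seq[(Region, Region)] =
    for (l <- left; r <- right if l.overlaps(r)) yield (l, r)

  // Right outer join: every right record appears at least once;
  // a right record with no overlapping left record is paired with None.
  def rightOuterJoin(left: Seq[Region], right: Seq[Region]): Seq[(Option[Region], Region)] =
    right.flatMap { r =>
      val hits = left.filter(_.overlaps(r))
      if (hits.isEmpty) Seq((Option.empty[Region], r))
      else hits.map(l => (Option(l), r))
    }

  // Full outer join: the right outer result plus left records that matched nothing.
  def fullOuterJoin(left: Seq[Region], right: Seq[Region]): Seq[(Option[Region], Option[Region])] = {
    val matchedOrRightOnly = rightOuterJoin(left, right).map { case (l, r) => (l, Option(r)) }
    val leftOnly = left
      .filterNot(l => right.exists(r => l.overlaps(r)))
      .map(l => (Option(l), Option.empty[Region]))
    matchedOrRightOnly ++ leftOnly
  }
}
```

A left outer join is the mirror image of `rightOuterJoin`, with the roles of the two inputs swapped.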

One common pattern involves joining a single dataset against many datasets. An
example of this is joining an RDD of features (e.g., gene/exon coordinates)
@@ -357,16 +358,15 @@ be expanded from.
###### Filter Genotypes by Features

This query joins an RDD of Genotypes against an RDD of Features using an inner
join. The inner join will result in an RDD of key-value pairs, where the key is
a genotype and the value is the feature that it overlaps. Because this is an
inner join, records from either dataset that don't pair to the other are
automatically dropped, providing the filter we are interested in. This query is
useful for trying to identify genotypes that overlap features of interest. For
example, if our feature file contains all the exonic regions of the genome,
this query would extract all genotypes that fall in exonic regions.
join. Because this is an inner join, records from either dataset that don't
pair to the other are automatically dropped, providing the filter we are
interested in. This query is useful for trying to identify genotypes that
overlap features of interest. For example, if our feature file contains all the
exonic regions of the genome, this query would extract all genotypes that fall
in exonic regions.

```scala
// Inner join will filter out genotypic data not represented in the feature dataset
// Inner join will filter out genotypes not covered by a feature
val genotypes = sc.loadGenotypes("my/genotypes.adam")
val features = sc.loadFeatures("my/features.adam")

@@ -387,10 +387,9 @@ manipulate it into providing the answer to our question. Since we were
interested in the `Genotype`s that overlap a `Feature`, we map over the tuples
and select just the `Genotype`.

The difference between the ShuffleRegionJoin and BroadcastRegionJoin strategies
is that a broadcast join sends the left dataset to all executors. In this case,
we chose to send the `features` dataset because feature data is usually smaller
in size than genotypic data.
Since a broadcast join sends the left dataset to all executors, we chose to
send the `features` dataset because feature data is usually smaller in size
Member

data is → data are

than genotypic data.

###### Group overlapping variant data by the gene they overlap

@@ -414,9 +413,9 @@ val variantsByFeatureShuffle = features.shuffleRegionJoinAndGroupByLeft(variants
val variantsByFeatureBcast = variants.broadcastRegionJoinAndGroupByRight(features)
```

When we switch join strategies, we need to change the dataset that is on the
left side of the join. BroadcastRegionJoin only supports grouping by the right
dataset, and ShuffleRegionJoin supports only grouping by the left dataset.
When we switch join strategies, to swap which dataset is on the left side of
Member

to swap which -> we swap which

the join. BroadcastRegionJoin only supports grouping by the right dataset, and
ShuffleRegionJoin supports only grouping by the left dataset.
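
Whichever strategy is used, the result has the same shape: one entry per grouping record, carrying every record from the other dataset that overlaps it. A minimal sketch of that shape in plain Scala follows. `Region` and `joinAndGroupBy` are hypothetical names, and dropping grouping records that have no overlaps is an assumption made here to mirror inner-join behaviour; this is not ADAM's implementation.

```scala
object GroupByJoinSketch {
  // Hypothetical stand-in for a genomic region: half-open [start, end) on a contig.
  final case class Region(contig: String, start: Long, end: Long) {
    def overlaps(other: Region): Boolean =
      contig == other.contig && start < other.end && other.start < end
  }

  // Inner join followed by a group-by on the `grouping` side: each grouping
  // record that overlaps at least one value is emitted once, together with
  // all of the values that overlap it.
  def joinAndGroupBy[K, V](
      grouping: Seq[(Region, K)],
      values: Seq[(Region, V)]): Seq[(K, Seq[V])] =
    grouping.flatMap { case (groupRegion, key) =>
      val hits = values.collect { case (r, v) if r.overlaps(groupRegion) => v }
      if (hits.isEmpty) Seq.empty else Seq(key -> hits)
    }
}
```

In the terms of the example above, `grouping` corresponds to `features` and `values` to `variants`; only the side of the call on which `features` sits changes between the two strategies.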

The reason BroadcastRegionJoin does not have a `joinAndGroupByLeft`
implementation is due to the fact that the left dataset is broadcasted to all
Member

The point about locality is correct, but the comment about the broadcast is somewhat misleading. If the right dataset were sorted and we did a broadcast join, we could do a shuffle-free join. But we don't guarantee any sort invariants when running the broadcast join, so we can't provide a broadcastJoinAndGroupByLeft that has predictable performance.

Member Author

Please see rephrased sentence below.

Member

Seems reasonable. Might tighten up to:

Unlike shuffle joins, broadcast joins don't maintain a sort order invariant. Because of this, we would need to shuffle all data to perform a group-by on the left side of the join, and there is no opportunity to optimize by combining the join and group-by.

Member

is broadcasted → is broadcast
