Skip to content

Commit

Permalink
[ADAM-1813] Delegate right outer shuffle region join to left OSRJ imp…
Browse files Browse the repository at this point in the history
…lementation.

Left and right outer joins are symmetric: that is to say, a right outer join
can be rewritten as a left outer join by swapping the two input tables, and by
modifying the layout of the output. To resolve the mismatch between the left and
right outer joins, this PR deletes the right outer join implementation and
delegates back to the left outer join + tuple order swap. Resolves #1813.
  • Loading branch information
fnothaft authored and heuermh committed Dec 1, 2017
1 parent 895fd64 commit 573e544
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 91 deletions.
Expand Up @@ -1283,8 +1283,9 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
val combinedSequences = sequences ++ genomicRdd.sequences

GenericGenomicRDD[(Option[T], X)](
RightOuterShuffleRegionJoin[T, X](leftRddToJoin, rightRddToJoin)
.compute(),
LeftOuterShuffleRegionJoin[X, T](rightRddToJoin, leftRddToJoin)
.compute()
.map(_.swap),
combinedSequences,
kv => {
// pad by -1 * flankSize to undo pad from preprocessing
Expand Down
Expand Up @@ -248,51 +248,6 @@ case class LeftOuterShuffleRegionJoinAndGroupByLeft[T: ClassTag, U: ClassTag](le
}
}

case class RightOuterShuffleRegionJoin[T: ClassTag, U: ClassTag](leftRdd: RDD[(ReferenceRegion, T)],
                                                                 rightRdd: RDD[(ReferenceRegion, U)])
    extends SortedIntervalPartitionJoinWithVictims[T, U, Option[T], U] {

  /**
   * Formats the output when the left or the right iterator is empty.
   *
   * The left iterator is not consulted; every right record is emitted with no
   * left-side partner.
   *
   * @param left The left iterator (unused).
   * @param right The right iterator.
   * @return An iterator of (None, right value) tuples, one per right record.
   */
  protected def emptyFn(left: Iterator[(ReferenceRegion, T)],
                        right: Iterator[(ReferenceRegion, U)]): Iterator[(Option[T], U)] = {
    // drop the region key, keep only the right-hand value
    right.map(keyed => (None, keyed._2))
  }

  /**
   * Formats the hits found for a single left value.
   *
   * @param iter The hits (right values) found for currentLeft.
   * @param currentLeft The current left value.
   * @return One (Some(currentLeft), hit) tuple per hit, or an empty iterable
   *   when there were no hits.
   */
  protected def postProcessHits(iter: Iterable[U],
                                currentLeft: T): Iterable[(Option[T], U)] = {
    if (iter.isEmpty) {
      // nothing joined against this left value, so nothing is emitted
      Iterable.empty
    } else {
      // pair each hit with the left value that produced it
      iter.map(hit => (Some(currentLeft), hit))
    }
  }

  /**
   * Formats a right value that did not join with any left value.
   *
   * @param pruned A record on the right.
   * @return A tuple of None and the pruned value.
   */
  protected def postProcessPruned(pruned: U): (Option[T], U) = {
    (None, pruned)
  }
}

case class FullOuterShuffleRegionJoin[T: ClassTag, U: ClassTag](leftRdd: RDD[(ReferenceRegion, T)],
rightRdd: RDD[(ReferenceRegion, U)])
extends SortedIntervalPartitionJoinWithVictims[T, U, Option[T], Option[U]] {
Expand Down

This file was deleted.

0 comments on commit 573e544

Please sign in to comment.