Skip to content

Commit

Permalink
Reflecting newest changes
Browse files Browse the repository at this point in the history
  • Loading branch information
devin-petersohn committed Sep 26, 2017
1 parent 9445658 commit 7f12ed2
Showing 1 changed file with 8 additions and 7 deletions.
Expand Up @@ -1540,7 +1540,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* @param genomicRdd The right RDD in the join.
* @param optPartitions Optionally sets the number of output partitions. If
* None, the number of partitions on the resulting RDD does not change.
* @param threshold Sets a threshold for the distance between elements to be
* @param flankSize Sets a flankSize for the distance between elements to be
* joined. If set to 0, an overlap is required to join two elements.
* @return Returns a new genomic RDD containing all pairs of keys that
* overlapped in the genomic coordinate space, and all keys from the
Expand All @@ -1549,13 +1549,13 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
def leftOuterShuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int],
threshold: Long)(
flankSize: Long)(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
toxTag: ClassTag[(T, Iterable[X])]): GenericGenomicRDD[(T, Iterable[X])] = LeftOuterShuffleJoin.time {

val (leftRddToJoin, rightRddToJoin) =
prepareForShuffleRegionJoin(genomicRdd, optPartitions, threshold)
prepareForShuffleRegionJoin(genomicRdd, optPartitions, flankSize)

// what sequences do we wind up with at the end?
val combinedSequences = sequences ++ genomicRdd.sequences
Expand All @@ -1565,7 +1565,8 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
.compute(),
combinedSequences,
kv => {
getReferenceRegions(kv._1).map(_.pad(-1 * threshold)) ++
// pad by -1 * flankSize to undo flank from preprocessing
getReferenceRegions(kv._1).map(_.pad(-1 * flankSize)) ++
Seq(kv._2.map(v => genomicRdd.getReferenceRegions(v))).flatten.flatten
})
}
Expand Down Expand Up @@ -1610,20 +1611,20 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* RDD, it will be paired with a `None` in the product of the join.
*
* @param genomicRdd The right RDD in the join.
* @param threshold Sets a threshold for the distance between elements to be
* @param flankSize Sets a flankSize for the distance between elements to be
* joined. If set to 0, an overlap is required to join two elements.
* @return Returns a new genomic RDD containing all pairs of keys that
* overlapped in the genomic coordinate space, and all keys from the
* left RDD that did not overlap a key in the right RDD.
*/
def leftOuterShuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y]](
genomicRdd: GenomicRDD[X, Y],
threshold: Long)(
flankSize: Long)(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
toxTag: ClassTag[(T, Iterable[X])]): GenericGenomicRDD[(T, Iterable[X])] = {

leftOuterShuffleRegionJoinAndGroupByLeft(genomicRdd, None, threshold)
leftOuterShuffleRegionJoinAndGroupByLeft(genomicRdd, None, flankSize)
}

/**
Expand Down

0 comments on commit 7f12ed2

Please sign in to comment.