Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making join return types consistent #1737

Merged
merged 2 commits into from Sep 25, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 32 additions & 40 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
Expand Up @@ -756,19 +756,18 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
*
* @see broadcastRegionJoinAgainst
*/
def broadcastRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]](
def broadcastRegionJoin[X, Y <: GenomicRDD[X, Y]](
genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
txTag: ClassTag[(T, X)]): GenomicRDD[(T, X), Z] = InnerBroadcastJoin.time {
txTag: ClassTag[(T, X)]): GenericGenomicRDD[(T, X)] = InnerBroadcastJoin.time {

// key the RDDs and join
GenericGenomicRDD[(T, X)](InnerTreeRegionJoin[T, X]().broadcastAndJoin(
buildTree(flattenRddByRegions()),
genomicRdd.flattenRddByRegions()),
sequences ++ genomicRdd.sequences,
kv => { getReferenceRegions(kv._1) ++ genomicRdd.getReferenceRegions(kv._2) })
.asInstanceOf[GenomicRDD[(T, X), Z]]
}

/**
Expand All @@ -790,17 +789,16 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
*
* @see broadcastRegionJoin
*/
def broadcastRegionJoinAgainst[X, Z <: GenomicRDD[(X, T), Z]](
def broadcastRegionJoinAgainst[X](
broadcastTree: Broadcast[IntervalArray[ReferenceRegion, X]])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(X, T), Z] = InnerBroadcastJoin.time {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenericGenomicRDD[(X, T)] = InnerBroadcastJoin.time {

// key the RDDs and join
GenericGenomicRDD[(X, T)](InnerTreeRegionJoin[X, T]().join(
broadcastTree,
flattenRddByRegions()),
sequences,
kv => { getReferenceRegions(kv._2) }) // FIXME
.asInstanceOf[GenomicRDD[(X, T), Z]]
}

/**
Expand All @@ -821,11 +819,11 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
*
* @see rightOuterBroadcastRegionJoinAgainst
*/
def rightOuterBroadcastRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], X), Z]](
def rightOuterBroadcastRegionJoin[X, Y <: GenomicRDD[X, Y]](
genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
otxTag: ClassTag[(Option[T], X)]): GenomicRDD[(Option[T], X), Z] = RightOuterBroadcastJoin.time {
otxTag: ClassTag[(Option[T], X)]): GenericGenomicRDD[(Option[T], X)] = RightOuterBroadcastJoin.time {

// key the RDDs and join
GenericGenomicRDD[(Option[T], X)](RightOuterTreeRegionJoin[T, X]().broadcastAndJoin(
Expand All @@ -836,7 +834,6 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
Seq(kv._1.map(v => getReferenceRegions(v))).flatten.flatten ++
genomicRdd.getReferenceRegions(kv._2)
})
.asInstanceOf[GenomicRDD[(Option[T], X), Z]]
}

/**
Expand All @@ -860,9 +857,9 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
*
* @see rightOuterBroadcastRegionJoin
*/
def rightOuterBroadcastRegionJoinAgainst[X, Z <: GenomicRDD[(Option[X], T), Z]](
def rightOuterBroadcastRegionJoinAgainst[X](
broadcastTree: Broadcast[IntervalArray[ReferenceRegion, X]])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[X], T), Z] = RightOuterBroadcastJoin.time {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenericGenomicRDD[(Option[X], T)] = RightOuterBroadcastJoin.time {

// key the RDDs and join
GenericGenomicRDD[(Option[X], T)](RightOuterTreeRegionJoin[X, T]().join(
Expand All @@ -872,7 +869,6 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
kv => {
getReferenceRegions(kv._2) // FIXME
})
.asInstanceOf[GenomicRDD[(Option[X], T), Z]]
}

/**
Expand All @@ -890,18 +886,17 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
*
* @see broadcastRegionJoinAgainstAndGroupByRight
*/
def broadcastRegionJoinAndGroupByRight[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Iterable[T], X), Z]](genomicRdd: GenomicRDD[X, Y])(
def broadcastRegionJoinAndGroupByRight[X, Y <: GenomicRDD[X, Y]](genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
itxTag: ClassTag[(Iterable[T], X)]): GenomicRDD[(Iterable[T], X), Z] = BroadcastJoinAndGroupByRight.time {
itxTag: ClassTag[(Iterable[T], X)]): GenericGenomicRDD[(Iterable[T], X)] = BroadcastJoinAndGroupByRight.time {

// key the RDDs and join
GenericGenomicRDD[(Iterable[T], X)](InnerTreeRegionJoinAndGroupByRight[T, X]().broadcastAndJoin(
buildTree(flattenRddByRegions()),
genomicRdd.flattenRddByRegions()),
sequences ++ genomicRdd.sequences,
kv => { (kv._1.flatMap(getReferenceRegions) ++ genomicRdd.getReferenceRegions(kv._2)).toSeq })
.asInstanceOf[GenomicRDD[(Iterable[T], X), Z]]
}

/**
Expand All @@ -923,17 +918,16 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
*
* @see broadcastRegionJoinAndGroupByRight
*/
def broadcastRegionJoinAgainstAndGroupByRight[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Iterable[X], T), Z]](
def broadcastRegionJoinAgainstAndGroupByRight[X, Y <: GenomicRDD[X, Y]](
broadcastTree: Broadcast[IntervalArray[ReferenceRegion, X]])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[X], T), Z] = BroadcastJoinAndGroupByRight.time {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenericGenomicRDD[(Iterable[X], T)] = BroadcastJoinAndGroupByRight.time {

// key the RDDs and join
GenericGenomicRDD[(Iterable[X], T)](InnerTreeRegionJoinAndGroupByRight[X, T]().join(
broadcastTree,
flattenRddByRegions()),
sequences,
kv => { getReferenceRegions(kv._2).toSeq })
.asInstanceOf[GenomicRDD[(Iterable[X], T), Z]]
}

/**
Expand All @@ -954,10 +948,10 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
*
* @see rightOuterBroadcastRegionJoinAgainstAndGroupByRight
*/
def rightOuterBroadcastRegionJoinAndGroupByRight[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Iterable[T], X), Z]](genomicRdd: GenomicRDD[X, Y])(
def rightOuterBroadcastRegionJoinAndGroupByRight[X, Y <: GenomicRDD[X, Y]](genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
itxTag: ClassTag[(Iterable[T], X)]): GenomicRDD[(Iterable[T], X), Z] = RightOuterBroadcastJoinAndGroupByRight.time {
itxTag: ClassTag[(Iterable[T], X)]): GenericGenomicRDD[(Iterable[T], X)] = RightOuterBroadcastJoinAndGroupByRight.time {

// key the RDDs and join
GenericGenomicRDD[(Iterable[T], X)](RightOuterTreeRegionJoinAndGroupByRight[T, X]().broadcastAndJoin(
Expand All @@ -968,7 +962,6 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
Seq(kv._1.map(v => getReferenceRegions(v))).flatten.flatten ++
genomicRdd.getReferenceRegions(kv._2)
})
.asInstanceOf[GenomicRDD[(Iterable[T], X), Z]]
}

/**
Expand All @@ -992,17 +985,16 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
*
* @see rightOuterBroadcastRegionJoinAndGroupByRight
*/
def rightOuterBroadcastRegionJoinAgainstAndGroupByRight[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Iterable[X], T), Z]](
def rightOuterBroadcastRegionJoinAgainstAndGroupByRight[X, Y <: GenomicRDD[X, Y]](
broadcastTree: Broadcast[IntervalArray[ReferenceRegion, X]])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[X], T), Z] = RightOuterBroadcastJoinAndGroupByRight.time {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenericGenomicRDD[(Iterable[X], T)] = RightOuterBroadcastJoinAndGroupByRight.time {

// key the RDDs and join
GenericGenomicRDD[(Iterable[X], T)](RightOuterTreeRegionJoinAndGroupByRight[X, T]().join(
broadcastTree,
flattenRddByRegions()),
sequences,
kv => getReferenceRegions(kv._2).toSeq)
.asInstanceOf[GenomicRDD[(Iterable[X], T), Z]]
}

/**
Expand All @@ -1014,7 +1006,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* None, the number of partitions on the resulting RDD does not change.
* @return a tuple of the left and right RDDs, each keyed by ReferenceRegion, prepared for ShuffleRegionJoins
*/
private def prepareForShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]](
private def prepareForShuffleRegionJoin[X, Y <: GenomicRDD[X, Y]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): (RDD[(ReferenceRegion, T)], RDD[(ReferenceRegion, X)]) = {
Expand Down Expand Up @@ -1085,12 +1077,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* overlapped in the genomic coordinate space, and all keys from the
* right RDD that did not overlap a key in the left RDD.
*/
def rightOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], X), Z]](
def rightOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
otxTag: ClassTag[(Option[T], X)]): GenomicRDD[(Option[T], X), Z] = RightOuterShuffleJoin.time {
otxTag: ClassTag[(Option[T], X)]): GenericGenomicRDD[(Option[T], X)] = RightOuterShuffleJoin.time {

val (leftRddToJoin, rightRddToJoin) =
prepareForShuffleRegionJoin(genomicRdd, optPartitions)
Expand All @@ -1105,7 +1097,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
kv => {
Seq(kv._1.map(v => getReferenceRegions(v))).flatten.flatten ++
genomicRdd.getReferenceRegions(kv._2)
}).asInstanceOf[GenomicRDD[(Option[T], X), Z]]
})
}

/**
Expand All @@ -1124,12 +1116,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* overlapped in the genomic coordinate space, and all keys from the
* left RDD that did not overlap a key in the right RDD.
*/
def leftOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, Option[X]), Z]](
def leftOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
toxTag: ClassTag[(T, Option[X])]): GenomicRDD[(T, Option[X]), Z] = LeftOuterShuffleJoin.time {
toxTag: ClassTag[(T, Option[X])]): GenericGenomicRDD[(T, Option[X])] = LeftOuterShuffleJoin.time {

val (leftRddToJoin, rightRddToJoin) =
prepareForShuffleRegionJoin(genomicRdd, optPartitions)
Expand All @@ -1144,7 +1136,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
kv => {
Seq(kv._2.map(v => genomicRdd.getReferenceRegions(v))).flatten.flatten ++
getReferenceRegions(kv._1)
}).asInstanceOf[GenomicRDD[(T, Option[X]), Z]]
})
}

/**
Expand All @@ -1162,12 +1154,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* overlapped in the genomic coordinate space, and values that did not
* overlap will be paired with a `None`.
*/
def fullOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], Option[X]), Z]](
def fullOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
otoxTag: ClassTag[(Option[T], Option[X])]): GenomicRDD[(Option[T], Option[X]), Z] = FullOuterShuffleJoin.time {
otoxTag: ClassTag[(Option[T], Option[X])]): GenericGenomicRDD[(Option[T], Option[X])] = FullOuterShuffleJoin.time {

val (leftRddToJoin, rightRddToJoin) =
prepareForShuffleRegionJoin(genomicRdd, optPartitions)
Expand All @@ -1182,7 +1174,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
kv => {
Seq(kv._2.map(v => genomicRdd.getReferenceRegions(v)),
kv._1.map(v => getReferenceRegions(v))).flatten.flatten
}).asInstanceOf[GenomicRDD[(Option[T], Option[X]), Z]]
})
}

/**
Expand All @@ -1201,12 +1193,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* overlapped in the genomic coordinate space, grouped together by
* the value they overlapped in the left RDD.
*/
def shuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, Iterable[X]), Z]](
def shuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
tixTag: ClassTag[(T, Iterable[X])]): GenomicRDD[(T, Iterable[X]), Z] = ShuffleJoinAndGroupByLeft.time {
tixTag: ClassTag[(T, Iterable[X])]): GenericGenomicRDD[(T, Iterable[X])] = ShuffleJoinAndGroupByLeft.time {

val (leftRddToJoin, rightRddToJoin) =
prepareForShuffleRegionJoin(genomicRdd, optPartitions)
Expand All @@ -1221,7 +1213,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
kv => {
(kv._2.flatMap(v => genomicRdd.getReferenceRegions(v)) ++
getReferenceRegions(kv._1)).toSeq
}).asInstanceOf[GenomicRDD[(T, Iterable[X]), Z]]
})
}

/**
Expand All @@ -1244,12 +1236,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* the value they overlapped in the left RDD, and all values from the
* right RDD that did not overlap an item in the left RDD.
*/
def rightOuterShuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], Iterable[X]), Z]](
def rightOuterShuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T],
xTag: ClassTag[X],
otixTag: ClassTag[(Option[T], Iterable[X])]): GenomicRDD[(Option[T], Iterable[X]), Z] = RightOuterShuffleJoinAndGroupByLeft.time {
otixTag: ClassTag[(Option[T], Iterable[X])]): GenericGenomicRDD[(Option[T], Iterable[X])] = RightOuterShuffleJoinAndGroupByLeft.time {

val (leftRddToJoin, rightRddToJoin) =
prepareForShuffleRegionJoin(genomicRdd, optPartitions)
Expand All @@ -1264,7 +1256,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
kv => {
(kv._2.flatMap(v => genomicRdd.getReferenceRegions(v)) ++
kv._1.toSeq.flatMap(v => getReferenceRegions(v))).toSeq
}).asInstanceOf[GenomicRDD[(Option[T], Iterable[X]), Z]]
})
}

/**
Expand Down