Permalink
Browse files

[ADAM-1220] Fix optPartitions parameter in shuffle region join hooks …

…in GenomicRDD.

Resolves #1220. Adds a function called in each of the shuffle join implementations
that calculates the sequence dictionary after the join, as well as the partition sizes
to request.
  • Loading branch information...
fnothaft committed Nov 8, 2016
1 parent ac2142a commit 11725a8f662fdc19d622bc921899b8c7d77ccbdd
Showing with 37 additions and 48 deletions.
  1. +37 −48 adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -437,6 +437,31 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
.asInstanceOf[GenomicRDD[(Option[T], X), Z]]
}
/**
* Computes the partition size and final sequence dictionary for a join.
*
* @param optPartitions Optional user-requested number of partitions for the
* end of the shuffle.
* @param genomicRdd The genomic RDD we are joining against.
* @return Returns a tuple containing the (partition size, final sequence
* dictionary after the join).
*/
private[rdd] def joinPartitionSizeAndSequences[X, Y <: GenomicRDD[X, Y]](
optPartitions: Option[Int],
genomicRdd: GenomicRDD[X, Y]): (Long, SequenceDictionary) = {
// what sequences do we wind up with at the end?
val finalSequences = sequences ++ genomicRdd.sequences
// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)
(finalSequences.records.map(_.length).sum / partitions,
finalSequences)
}
/**
* Performs a sort-merge inner join between this RDD and another RDD.
*
@@ -454,18 +479,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {
// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)
// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
// key the RDDs and join
GenericGenomicRDD[(T, X)](
InnerShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
@@ -493,18 +512,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {
// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)
// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
// key the RDDs and join
GenericGenomicRDD[(Option[T], X)](
RightOuterShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
@@ -535,18 +548,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Option[X]), Z] = {
// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)
// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
// key the RDDs and join
GenericGenomicRDD[(T, Option[X])](
LeftOuterShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
@@ -576,18 +583,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Option[X]), Z] = {
// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)
// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
// key the RDDs and join
GenericGenomicRDD[(Option[T], Option[X])](
FullOuterShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
@@ -618,18 +619,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Iterable[X]), Z] = {
// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)
// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
// key the RDDs and join
GenericGenomicRDD[(T, Iterable[X])](
InnerShuffleRegionJoinAndGroupByLeft[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
@@ -662,18 +657,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Iterable[X]), Z] = {
// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)
// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)
// key the RDDs and join
GenericGenomicRDD[(Option[T], Iterable[X])](
RightOuterShuffleRegionJoinAndGroupByLeft[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,

0 comments on commit 11725a8

Please sign in to comment.