From d14ec49e9bb45138d5becd830b3b426c478f6465 Mon Sep 17 00:00:00 2001 From: devin-petersohn Date: Thu, 13 Apr 2017 14:58:53 -0700 Subject: [PATCH 1/2] Update to include tests and full working merge pipeline --- .../scala/org/bdgenomics/lime/cli/Merge.scala | 6 -- .../lime/set_theory/Intersect.scala | 7 ++ .../bdgenomics/lime/set_theory/Merge.scala | 17 ++-- .../lime/set_theory/SetTheory.scala | 80 ++++++++++++++++--- lime-core/src/test/resources/cpg_20merge.bed | 20 +++++ .../lime/set_theory/MergeSuite.scala | 8 +- 6 files changed, 109 insertions(+), 29 deletions(-) create mode 100644 lime-core/src/test/resources/cpg_20merge.bed diff --git a/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Merge.scala b/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Merge.scala index b44261b..a1ac9fd 100644 --- a/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Merge.scala +++ b/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Merge.scala @@ -3,14 +3,10 @@ package org.bdgenomics.lime.cli import org.apache.spark.SparkContext import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs -import org.bdgenomics.adam.rdd.feature.FeatureRDD import org.bdgenomics.lime.set_theory.DistributedMerge import org.bdgenomics.utils.cli._ import org.kohsuke.args4j.Argument -/** - * Created by DevinPetersohn on 4/6/17. - */ object Merge extends BDGCommandCompanion { val commandName = "merge" val commandDescription = "Merges the regions in a file." @@ -40,8 +36,6 @@ object Merge extends BDGCommandCompanion { DistributedMerge(genomicRdd.flattenRddByRegions(), genomicRdd.partitionMap.get) .compute() - .collect - .foreach(println) } } diff --git a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersect.scala b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersect.scala index ee34fa9..fc0e513 100644 --- a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersect.scala +++ b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersect.scala @@ -11,6 +11,13 @@ sealed abstract class Intersect[T: ClassTag] extends SetTheory[T] { currRegion.intersection(tempRegion) } + + def condition(firstRegion: ReferenceRegion, + secondRegion: ReferenceRegion, + distanceThreshold: Long = 0L): Boolean = { + true + + } } case class DistributedIntersect[T: ClassTag](rddToCompute: RDD[(ReferenceRegion, T)], diff --git a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Merge.scala b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Merge.scala index 6b4ec15..75db102 100644 --- a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Merge.scala +++ b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Merge.scala @@ -5,16 +5,21 @@ import org.bdgenomics.adam.models.ReferenceRegion import scala.reflect.ClassTag sealed abstract class Merge[T: ClassTag] extends SetTheoryWithSingleCollection[T] { - def primitive(currRegion: ReferenceRegion, - tempRegion: ReferenceRegion, + def primitive(firstRegion: ReferenceRegion, + secondRegion: ReferenceRegion, distanceThreshold: Long = 0L): ReferenceRegion = { - currRegion.merge(tempRegion, distanceThreshold) + firstRegion.merge(secondRegion, distanceThreshold) + } + + def condition(firstRegion: ReferenceRegion, + secondRegion: ReferenceRegion, + distanceThreshold: Long = 0L): Boolean = { + + firstRegion.overlaps(secondRegion, distanceThreshold) } } case class DistributedMerge[T: ClassTag](rddToCompute: RDD[(ReferenceRegion, T)], partitionMap: Array[Option[(ReferenceRegion, ReferenceRegion)]], - distanceThreshold: Long = 0L) extends Merge[T] { - -} \ No newline at end of file + distanceThreshold: Long = 0L) extends Merge[T] \ No newline at end of file diff --git a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/SetTheory.scala b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/SetTheory.scala index 8512697..b666571 100644 --- a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/SetTheory.scala +++ b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/SetTheory.scala @@ -3,6 +3,7 @@ package org.bdgenomics.lime.set_theory import org.apache.spark.rdd.RDD import org.bdgenomics.adam.models.ReferenceRegion import org.bdgenomics.lime.util.Partitioners.ReferenceRegionRangePartitioner +import scala.annotation.tailrec import scala.collection.mutable.ListBuffer import scala.reflect.ClassTag @@ -14,6 +15,10 @@ abstract class SetTheory[T: ClassTag] extends Serializable { secondRegion: ReferenceRegion, distanceThreshold: Long = 0L): ReferenceRegion + def condition(firstRegion: ReferenceRegion, + secondRegion: ReferenceRegion, + distanceThreshold: Long = 0L): Boolean + } abstract class SetTheoryBetweenCollections[T: ClassTag, U: ClassTag, RT, RU] extends SetTheory[T] { @@ -28,7 +33,7 @@ abstract class SetTheoryWithSingleCollection[T: ClassTag] extends SetTheory[T] { def compute(): RDD[(ReferenceRegion, Iterable[T])] = { val localComputed = localCompute(rddToCompute.map(f => (f._1, Iterable(f._2))), distanceThreshold) - externalCompute(localComputed, distanceThreshold, 2) + externalCompute(localComputed, partitionMap, distanceThreshold, 2) } protected def localCompute(rdd: RDD[(ReferenceRegion, Iterable[T])], distanceThreshold: Long): RDD[(ReferenceRegion, Iterable[T])] = { @@ -42,7 +47,7 @@ abstract class SetTheoryWithSingleCollection[T: ClassTag] extends SetTheory[T] { while (iter.hasNext) { val tempTuple = iter.next() val tempRegion = tempTuple._1 - if (currRegion.isNearby(tempRegion, distanceThreshold)) { + if (condition(currRegion, tempRegion, distanceThreshold)) { currRegion = primitive(currRegion, tempRegion, distanceThreshold) tempValueListBuffer ++= currTuple._2 } else { @@ -58,29 +63,47 @@ abstract class SetTheoryWithSingleCollection[T: ClassTag] extends SetTheory[T] { }) } - protected def externalCompute(rdd: RDD[(ReferenceRegion, Iterable[T])], - distanceThreshold: Long, round: Int): RDD[(ReferenceRegion, Iterable[T])] = { + /** + * Computes the primitives between partitions. + * + * @param rdd The rdd to compute on. + * @param partitionMap The current partition map for rdd. + * @param distanceThreshold The distance threshold for the condition and primitive. + * @param round The current round of computation in the recursion tree. Increments by a factor of 2 each round. + * @return The computed rdd for this round. + */ + @tailrec private def externalCompute(rdd: RDD[(ReferenceRegion, Iterable[T])], + partitionMap: Array[Option[(ReferenceRegion, ReferenceRegion)]], + distanceThreshold: Long, + round: Int): RDD[(ReferenceRegion, Iterable[T])] = { - if (round == partitionMap.length) { + if (round > partitionMap.length) { return rdd } - val partitionedRdd = rdd.mapPartitionsWithIndex((idx, iter) => { + lazy val partitionedRdd = rdd.mapPartitionsWithIndex((idx, iter) => { val indexWithinParent = idx % round - val partnerPartitionBounds = + val partnerPartition = { + var i = 1 if (idx > 0) { - partitionMap(idx - 1).get + while (partitionMap(idx - i).isEmpty) { + i += 1 + } + idx - i } else { - partitionMap(idx).get + idx } + } + + val partnerPartitionBounds = partitionMap(partnerPartition) iter.map(f => { val (region, value) = f if (indexWithinParent == round / 2 && - (region.covers(partnerPartitionBounds._2, distanceThreshold) || - region.compareTo(partnerPartitionBounds._2) <= 0)) { + (region.covers(partnerPartitionBounds.get._2, distanceThreshold) || + region.compareTo(partnerPartitionBounds.get._2) <= 0)) { - ((region, idx - 1), value) + ((region, partnerPartition), value) } else { ((region, idx), value) } @@ -88,6 +111,37 @@ abstract class SetTheoryWithSingleCollection[T: ClassTag] extends SetTheory[T] { }).repartitionAndSortWithinPartitions(new ReferenceRegionRangePartitioner(partitionMap.length)) .map(f => (f._1._1, f._2)) - externalCompute(localCompute(partitionedRdd, distanceThreshold), distanceThreshold, round * 2) + + lazy val newPartitionMap = partitionedRdd.mapPartitions(iter => { + getRegionBoundsFromPartition(iter) + }).collect + + externalCompute(localCompute(partitionedRdd, distanceThreshold), newPartitionMap, distanceThreshold, round * 2) + } + + /** + * Gets the partition bounds from a ReferenceRegion keyed Iterator + * + * @param iter The data on a given partition. ReferenceRegion keyed + * @return The bounds of the ReferenceRegions on that partition, in an Iterator + */ + private def getRegionBoundsFromPartition(iter: Iterator[(ReferenceRegion, Iterable[T])]): Iterator[Option[(ReferenceRegion, ReferenceRegion)]] = { + if (iter.isEmpty) { + // This means that there is no data on the partition, so we have no bounds + Iterator(None) + } else { + val firstRegion = iter.next + val lastRegion = + if (iter.hasNext) { + // we have to make sure we get the full bounds of this partition, this + // includes any extremely long regions. we include the firstRegion for + // the case that the first region is extremely long + (iter ++ Iterator(firstRegion)).maxBy(f => (f._1.referenceName, f._1.end, f._1.start)) + // only one record on this partition, so this is the extent of the bounds + } else { + firstRegion + } + Iterator(Some((firstRegion._1, lastRegion._1))) + } } } diff --git a/lime-core/src/test/resources/cpg_20merge.bed b/lime-core/src/test/resources/cpg_20merge.bed new file mode 100644 index 0000000..e7d4617 --- /dev/null +++ b/lime-core/src/test/resources/cpg_20merge.bed @@ -0,0 +1,20 @@ +chr1 28735 29810 CpG:_116 +chr1 29800 29820 CpG:_30 +chr1 29815 29830 CpG:_29 +chr1 29825 29840 CpG:_84 +chr1 29835 29850 CpG:_99 +chr1 29845 29860 CpG:_94 +chr1 29855 29870 CpG:_171 +chr1 29865 29880 CpG:_60 +chr1 29875 29890 CpG:_115 +chr1 29885 29900 CpG:_28 +chr1 29895 29910 CpG:_24 +chr1 29905 29920 CpG:_50 +chr1 29915 29930 CpG:_83 +chr1 29925 29940 CpG:_153 +chr1 29935 29950 CpG:_16 +chr1 29945 29960 CpG:_257 +chr1 29955 29970 CpG:_178 +chr1 29965 29980 CpG:_246 +chr1 29975 29990 CpG:_18 +chr1 29985 30000 CpG:_615 diff --git a/lime-core/src/test/scala/org/bdgenomics/lime/set_theory/MergeSuite.scala b/lime-core/src/test/scala/org/bdgenomics/lime/set_theory/MergeSuite.scala index 75ded2a..8dbef9a 100644 --- a/lime-core/src/test/scala/org/bdgenomics/lime/set_theory/MergeSuite.scala +++ b/lime-core/src/test/scala/org/bdgenomics/lime/set_theory/MergeSuite.scala @@ -8,10 +8,10 @@ import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.lime.LimeFunSuite class MergeSuite extends LimeFunSuite { - sparkTest("test local merge") { - val genomicRdd = sc.loadBed(resourcesFile("/cpg.bed")).repartitionAndSort() - + sparkTest("test local merge when all data merges to a single region") { + val genomicRdd = sc.loadBed(resourcesFile("/cpg_20merge.bed")).repartitionAndSort() val x = DistributedMerge(genomicRdd.flattenRddByRegions, genomicRdd.partitionMap.get).compute() - x.collect.foreach(println) } + + } \ No newline at end of file From 33b9b351b34a6bf4aed0359aec5467ab1cc4677b Mon Sep 17 00:00:00 2001 From: devin-petersohn Date: Thu, 13 Apr 2017 16:58:49 -0700 Subject: [PATCH 2/2] Intersection implementation Minor formatting changes --- .../bdgenomics/lime/cli/Intersection.scala | 3 +- .../org/bdgenomics/lime/cli/LimeMain.scala | 2 +- .../scala/org/bdgenomics/lime/cli/Merge.scala | 1 + .../lime/set_theory/Intersect.scala | 25 ------ .../lime/set_theory/Intersection.scala | 83 +++++++++++++++++++ .../bdgenomics/lime/set_theory/Merge.scala | 2 +- .../lime/set_theory/SetTheory.scala | 53 ++++++------ .../lime/set_theory/MergeSuite.scala | 1 - 8 files changed, 113 insertions(+), 57 deletions(-) delete mode 100644 lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersect.scala create mode 100644 lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersection.scala diff --git a/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Intersection.scala b/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Intersection.scala index 2712a59..cac7b97 100644 --- a/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Intersection.scala +++ b/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Intersection.scala @@ -3,6 +3,7 @@ package org.bdgenomics.lime.cli import org.apache.spark.SparkContext import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs +import org.bdgenomics.lime.set_theory.DistributedIntersection import org.bdgenomics.utils.cli._ import org.kohsuke.args4j.Argument @@ -39,8 +40,6 @@ object Intersection extends BDGCommandCompanion { val leftGenomicRDD = sc.loadBed(args.leftInput) val rightGenomicRDD = sc.loadBed(args.rightInput) - leftGenomicRDD.shuffleRegionJoin(rightGenomicRDD) - leftGenomicRDD.rdd.collect.foreach(println) } } } \ No newline at end of file diff --git a/lime-cli/src/main/scala/org/bdgenomics/lime/cli/LimeMain.scala b/lime-cli/src/main/scala/org/bdgenomics/lime/cli/LimeMain.scala index 220db5f..197d2af 100644 --- a/lime-cli/src/main/scala/org/bdgenomics/lime/cli/LimeMain.scala +++ b/lime-cli/src/main/scala/org/bdgenomics/lime/cli/LimeMain.scala @@ -10,7 +10,7 @@ private object LimeMain { } private class LimeMain(args: Array[String]) extends Logging { - private def commands: List[BDGCommandCompanion] = List(Intersection, Sort) + private def commands: List[BDGCommandCompanion] = List(Intersection, Sort, Merge) private def printVersion() { println("Version 0") diff --git a/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Merge.scala b/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Merge.scala index a1ac9fd..df555f8 100644 --- a/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Merge.scala +++ b/lime-cli/src/main/scala/org/bdgenomics/lime/cli/Merge.scala @@ -36,6 +36,7 @@ object Merge extends BDGCommandCompanion { DistributedMerge(genomicRdd.flattenRddByRegions(), genomicRdd.partitionMap.get) .compute() + .collect.foreach(println) } } diff --git a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersect.scala b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersect.scala deleted file mode 100644 index fc0e513..0000000 --- a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersect.scala +++ /dev/null @@ -1,25 +0,0 @@ -package org.bdgenomics.lime.set_theory - -import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.models.ReferenceRegion -import scala.reflect.ClassTag - -sealed abstract class Intersect[T: ClassTag] extends SetTheory[T] { - def primitive(currRegion: ReferenceRegion, - tempRegion: ReferenceRegion, - distanceThreshold: Long = 0L): ReferenceRegion = { - - currRegion.intersection(tempRegion) - } - - def condition(firstRegion: ReferenceRegion, - secondRegion: ReferenceRegion, - distanceThreshold: Long = 0L): Boolean = { - true - - } -} - -case class DistributedIntersect[T: ClassTag](rddToCompute: RDD[(ReferenceRegion, T)], - partitionMap: Array[Option[(ReferenceRegion, ReferenceRegion)]], - distanceThreshold: Long = 0L) extends Intersect[T] \ No newline at end of file diff --git a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersection.scala b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersection.scala new file mode 100644 index 0000000..f521945 --- /dev/null +++ b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Intersection.scala @@ -0,0 +1,83 @@ +package org.bdgenomics.lime.set_theory + +import org.apache.spark.rdd.RDD +import org.bdgenomics.adam.models.ReferenceRegion +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +sealed abstract class Intersection[T: ClassTag, U: ClassTag] extends SetTheoryBetweenCollections[T, U, T, U] { + + val threshold: Long + + def primitive(currRegion: ReferenceRegion, + tempRegion: ReferenceRegion, + minimumOverlap: Long = 0L): ReferenceRegion = { + + currRegion.intersection(tempRegion, minimumOverlap) + } + + def condition(firstRegion: ReferenceRegion, + secondRegion: ReferenceRegion, + minimumOverlap: Long = 0L): Boolean = { + + firstRegion.overlapsBy(secondRegion).exists(_ >= threshold) + } +} + +case class DistributedIntersection[T: ClassTag, U: ClassTag](leftRdd: RDD[(ReferenceRegion, T)], + rightRdd: RDD[(ReferenceRegion, U)], + partitionMap: Array[Option[(ReferenceRegion, ReferenceRegion)]], + threshold: Long = 0L) extends Intersection[T, U] { + + private val cache: ListBuffer[(ReferenceRegion, U)] = ListBuffer.empty[(ReferenceRegion, U)] + + def compute(): RDD[(ReferenceRegion, (T, U))] = { + leftRdd.zipPartitions(rightRdd)(sweep) + } + + private def sweep(leftIter: Iterator[(ReferenceRegion, T)], + rightIter: Iterator[(ReferenceRegion, U)]): Iterator[(ReferenceRegion, (T, U))] = { + + makeIterator(leftIter.buffered, rightIter.buffered) + } + + private def makeIterator(left: BufferedIterator[(ReferenceRegion, T)], + right: BufferedIterator[(ReferenceRegion, U)]): Iterator[(ReferenceRegion, (T, U))] = { + + def advanceCache(until: ReferenceRegion) = { + while (right.hasNext && (right.head._1.compareTo(until) <= 0 || + right.head._1.covers(until))) { + + cache += right.next + } + } + + def pruneCache(to: ReferenceRegion) = { + cache.trimStart({ + val index = cache.indexWhere(f => !(f._1.compareTo(to) < 0 && !f._1.covers(to))) + if (index <= 0) { + 0 + } else { + index + } + }) + } + + left.flatMap(f => { + val (currentRegion, _) = f + advanceCache(currentRegion) + pruneCache(currentRegion) + processHits(f) + }) + } + + private def processHits(current: (ReferenceRegion, T)): Iterator[(ReferenceRegion, (T, U))] = { + + val (currentRegion, _) = current + cache.filter(f => f._1.overlapsBy(currentRegion).exists(_ >= threshold)) + .map(g => { + (currentRegion.intersection(g._1, threshold), (current._2, g._2)) + }).iterator + } + +} diff --git a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Merge.scala b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Merge.scala index 75db102..e4d8f09 100644 --- a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Merge.scala +++ b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/Merge.scala @@ -22,4 +22,4 @@ sealed abstract class Merge[T: ClassTag] extends SetTheoryWithSingleCollection[T case class DistributedMerge[T: ClassTag](rddToCompute: RDD[(ReferenceRegion, T)], partitionMap: Array[Option[(ReferenceRegion, ReferenceRegion)]], - distanceThreshold: Long = 0L) extends Merge[T] \ No newline at end of file + threshold: Long = 0L) extends Merge[T] \ No newline at end of file diff --git a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/SetTheory.scala b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/SetTheory.scala index b666571..053de73 100644 --- a/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/SetTheory.scala +++ b/lime-core/src/main/scala/org/bdgenomics/lime/set_theory/SetTheory.scala @@ -7,9 +7,9 @@ import scala.annotation.tailrec import scala.collection.mutable.ListBuffer import scala.reflect.ClassTag -abstract class SetTheory[T: ClassTag] extends Serializable { +abstract class SetTheory extends Serializable { val partitionMap: Array[Option[(ReferenceRegion, ReferenceRegion)]] - val distanceThreshold: Long + val threshold: Long def primitive(firstRegion: ReferenceRegion, secondRegion: ReferenceRegion, @@ -21,19 +21,19 @@ abstract class SetTheory[T: ClassTag] extends Serializable { } -abstract class SetTheoryBetweenCollections[T: ClassTag, U: ClassTag, RT, RU] extends SetTheory[T] { +abstract class SetTheoryBetweenCollections[T: ClassTag, U: ClassTag, RT, RU] extends SetTheory { val leftRdd: RDD[(ReferenceRegion, T)] val rightRdd: RDD[(ReferenceRegion, U)] def compute(): RDD[(ReferenceRegion, (RT, RU))] } -abstract class SetTheoryWithSingleCollection[T: ClassTag] extends SetTheory[T] { +abstract class SetTheoryWithSingleCollection[T: ClassTag] extends SetTheory { val rddToCompute: RDD[(ReferenceRegion, T)] def compute(): RDD[(ReferenceRegion, Iterable[T])] = { - val localComputed = localCompute(rddToCompute.map(f => (f._1, Iterable(f._2))), distanceThreshold) - externalCompute(localComputed, partitionMap, distanceThreshold, 2) + val localComputed = localCompute(rddToCompute.map(f => (f._1, Iterable(f._2))), threshold) + externalCompute(localComputed, partitionMap, threshold, 2) } protected def localCompute(rdd: RDD[(ReferenceRegion, Iterable[T])], distanceThreshold: Long): RDD[(ReferenceRegion, Iterable[T])] = { @@ -64,24 +64,24 @@ abstract class SetTheoryWithSingleCollection[T: ClassTag] extends SetTheory[T] { } /** - * Computes the primitives between partitions. - * - * @param rdd The rdd to compute on. - * @param partitionMap The current partition map for rdd. - * @param distanceThreshold The distance threshold for the condition and primitive. - * @param round The current round of computation in the recursion tree. Increments by a factor of 2 each round. - * @return The computed rdd for this round. - */ + * Computes the primitives between partitions. + * + * @param rdd The rdd to compute on. + * @param partitionMap The current partition map for rdd. + * @param distanceThreshold The distance threshold for the condition and primitive. + * @param round The current round of computation in the recursion tree. Increments by a factor of 2 each round. + * @return The computed rdd for this round. + */ @tailrec private def externalCompute(rdd: RDD[(ReferenceRegion, Iterable[T])], - partitionMap: Array[Option[(ReferenceRegion, ReferenceRegion)]], - distanceThreshold: Long, - round: Int): RDD[(ReferenceRegion, Iterable[T])] = { + partitionMap: Array[Option[(ReferenceRegion, ReferenceRegion)]], + distanceThreshold: Long, + round: Int): RDD[(ReferenceRegion, Iterable[T])] = { if (round > partitionMap.length) { return rdd } - lazy val partitionedRdd = rdd.mapPartitionsWithIndex((idx, iter) => { + val partitionedRdd = rdd.mapPartitionsWithIndex((idx, iter) => { val indexWithinParent = idx % round val partnerPartition = { var i = 1 @@ -111,20 +111,19 @@ abstract class SetTheoryWithSingleCollection[T: ClassTag] extends SetTheory[T] { }).repartitionAndSortWithinPartitions(new ReferenceRegionRangePartitioner(partitionMap.length)) .map(f => (f._1._1, f._2)) - - lazy val newPartitionMap = partitionedRdd.mapPartitions(iter => { - getRegionBoundsFromPartition(iter) - }).collect + val newPartitionMap = partitionedRdd.mapPartitions(iter => { + getRegionBoundsFromPartition(iter) + }).collect externalCompute(localCompute(partitionedRdd, distanceThreshold), newPartitionMap, distanceThreshold, round * 2) } /** - * Gets the partition bounds from a ReferenceRegion keyed Iterator - * - * @param iter The data on a given partition. ReferenceRegion keyed - * @return The bounds of the ReferenceRegions on that partition, in an Iterator - */ + * Gets the partition bounds from a ReferenceRegion keyed Iterator + * + * @param iter The data on a given partition. ReferenceRegion keyed + * @return The bounds of the ReferenceRegions on that partition, in an Iterator + */ private def getRegionBoundsFromPartition(iter: Iterator[(ReferenceRegion, Iterable[T])]): Iterator[Option[(ReferenceRegion, ReferenceRegion)]] = { if (iter.isEmpty) { // This means that there is no data on the partition, so we have no bounds diff --git a/lime-core/src/test/scala/org/bdgenomics/lime/set_theory/MergeSuite.scala b/lime-core/src/test/scala/org/bdgenomics/lime/set_theory/MergeSuite.scala index 8dbef9a..57bc25d 100644 --- a/lime-core/src/test/scala/org/bdgenomics/lime/set_theory/MergeSuite.scala +++ b/lime-core/src/test/scala/org/bdgenomics/lime/set_theory/MergeSuite.scala @@ -13,5 +13,4 @@ class MergeSuite extends LimeFunSuite { val x = DistributedMerge(genomicRdd.flattenRddByRegions, genomicRdd.partitionMap.get).compute() } - } \ No newline at end of file