From afe14dce508b1e51820f16e33f09c9aa402bca3e Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 23 Feb 2016 16:54:30 +0000 Subject: [PATCH 1/7] [SPARK-11316] coalesce setupGroups doesn't handle UnionRDD with partial localtiy properly --- .../org/apache/spark/rdd/CoalescedRDD.scala | 43 ++++++++++-- .../scala/org/apache/spark/rdd/RDDSuite.scala | 67 +++++++++++++++++++ 2 files changed, 103 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 90d9735cb3f69..693dfab1a0b2e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -192,7 +192,8 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: def resetIterator(): Iterator[(String, Partition)] = { val iterators = (0 to 2).map( x => prev.partitions.iterator.flatMap(p => { - if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None + val locs = currPrefLocs(p) + if (locs.size > x) Some((locs(x), p)) else None } ) ) iterators.reduceLeft((x, y) => x ++ y) @@ -272,14 +273,8 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: val pgroup = PartitionGroup(nxt_replica) groupArr += pgroup groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup - var tries = 0 - while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part - nxt_part = rotIt.next()._2 - tries += 1 - } numCreated += 1 } - } /** @@ -324,6 +319,40 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: } } } else { + // It is possible to have unionRDD where one rdd has preferred locations and another rdd + // that doesn't. To make sure we end up with the requested number of partitions, + // make sure to put a partitions in every group. 
+ + if (groupArr.size > initialHash.size) { + // we don't have a partition assigned to every group yet so first try to fill them + // with the partitions with preferred locations + var tries = 0 + val rotIt = new LocationIterator(prev) + while (tries < prev.partitions.length && initialHash.size < groupArr.size) { + // if the number of partitions with preferred locations is less then + // number of total partitions this might loop over some more then once but we need to + // handle both cases and its not easy to get # of partitions with preferred locs + var (nxt_replica, nxt_part) = rotIt.next() + if (!initialHash.contains(nxt_part)) { + groupArr.find(pg => pg.size == 0).map(firstEmpty => { + firstEmpty.arr += nxt_part + initialHash += nxt_part + }) + } + tries += 1 + } + } + // we have went through all with preferred locations now just make sure one + // partition per group + val numEmptyPartitionGroups = groupArr.length - getPartitions.length + val partitionsNotInGroups = prev.partitions.filter(p => !initialHash.contains(p)) + for (i <- 0 until math.min(numEmptyPartitionGroups, partitionsNotInGroups.length)) { + groupArr.find(pg => pg.size == 0).map(firstEmpty => { + firstEmpty.arr += partitionsNotInGroups(i) + initialHash += partitionsNotInGroups(i) + }) + } + // finally pick bin for the rest for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group pickBin(p).arr += p } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 80347b800a7b4..615f6971bb9af 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -361,6 +361,33 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back") } + test("coalesced RDDs with partial locality") { + // Make an RDD that has some locality preferences and some without. This can happen + // with UnionRDD + val data = sc.makeRDD((1 to 9).map(i => { + if (i > 4) { + (i, (i to (i + 2)).map { j => "m" + (j % 6) }) + } else { + (i, Vector()) + } + })) + val coalesced1 = data.coalesce(3) + assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing") + + val splits = coalesced1.glom().collect().map(_.toList).toList + assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length) + + assert(splits.forall(_.length >= 1) === true, "Some partitions were empty") + + // If we try to coalesce into more partitions than the original RDD, it should just + // keep the original number of partitions. + val coalesced4 = data.coalesce(20) + val listOfLists = coalesced4.glom().collect().map(_.toList).toList + val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) } + assert(sortedList === (1 to 9). 
+ map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back") + } + test("coalesced RDDs with locality, large scale (10K partitions)") { // large scale experiment import collection.mutable @@ -402,6 +429,46 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { } } + test("coalesced RDDs with partial locality, large scale (10K partitions)") { + // large scale experiment + import collection.mutable + val halfpartitions = 5000 + val partitions = 10000 + val numMachines = 50 + val machines = mutable.ListBuffer[String]() + (1 to numMachines).foreach(machines += "m" + _) + val rnd = scala.util.Random + for (seed <- 1 to 5) { + rnd.setSeed(seed) + + val firstBlocks = (1 to halfpartitions).map { i => + (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList) + } + val blocksNoLocality = (halfpartitions + 1 to partitions).map { i => + (i, List()) + } + val blocks = firstBlocks ++ blocksNoLocality + + val data2 = sc.makeRDD(blocks) + + // first try going to same number of partitions + val coalesced2 = data2.coalesce(partitions) + + // test that we have 10000 partitions + assert(coalesced2.partitions.size == 10000, "Expected 10000 partitions, but got " + coalesced2.partitions.size) + + // test that we have 100 partitions + val coalesced3 = data2.coalesce(numMachines * 2) + assert(coalesced3.partitions.size == 100, "Expected 100 partitions, but got " + coalesced3.partitions.size) + + // test that the groups are load balanced with 100 +/- 20 elements in each + val maxImbalance3 = coalesced3.partitions + .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size) + .foldLeft(0)((dev, curr) => math.max(math.abs(100 - curr), dev)) + assert(maxImbalance3 <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance3) + } + } + // Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception test("coalesced RDDs with locality, fail first pass") { val initialPartitions = 1000 From c9eb032af8e453a5ba6776279cf0cd6946d0cd55 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 23 Feb 2016 19:55:59 +0000 Subject: [PATCH 2/7] fix scalastyle --- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 615f6971bb9af..d2c18519a4178 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -455,11 +455,13 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { val coalesced2 = data2.coalesce(partitions) // test that we have 10000 partitions - assert(coalesced2.partitions.size == 10000, "Expected 10000 partitions, but got " + coalesced2.partitions.size) + assert(coalesced2.partitions.size == 10000, "Expected 10000 partitions, but got " + + coalesced2.partitions.size) // test that we have 100 partitions val coalesced3 = data2.coalesce(numMachines * 2) - assert(coalesced3.partitions.size == 100, "Expected 100 partitions, but got " + coalesced3.partitions.size) + assert(coalesced3.partitions.size == 100, "Expected 100 partitions, but got " + + coalesced3.partitions.size) // test that the groups are load balanced with 100 +/- 20 elements in each val maxImbalance3 = coalesced3.partitions From 86651146e90d5126d379a4abc6d73a8c6b7a50df Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 23 Feb 2016 21:12:29 +0000 Subject: [PATCH 3/7] remove extra 
whitespace --- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index d2c18519a4178..0a0492e274bc8 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -455,12 +455,12 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { val coalesced2 = data2.coalesce(partitions) // test that we have 10000 partitions - assert(coalesced2.partitions.size == 10000, "Expected 10000 partitions, but got " + + assert(coalesced2.partitions.size == 10000, "Expected 10000 partitions, but got " + coalesced2.partitions.size) // test that we have 100 partitions val coalesced3 = data2.coalesce(numMachines * 2) - assert(coalesced3.partitions.size == 100, "Expected 100 partitions, but got " + + assert(coalesced3.partitions.size == 100, "Expected 100 partitions, but got " + coalesced3.partitions.size) // test that the groups are load balanced with 100 +/- 20 elements in each From 300326f505a8dd5c03a60c43bb0610e9dbc2e540 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 2 May 2016 12:55:12 +0000 Subject: [PATCH 4/7] Rework to remove the rotation iterator to minimize calls to get preferred locations --- .../org/apache/spark/rdd/CoalescedRDD.scala | 129 +++++++++--------- 1 file changed, 63 insertions(+), 66 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 693dfab1a0b2e..ad2be3d6ab18c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -173,44 +173,38 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: var noLocality = true // if true if no preferredLocations exists for parent RDD + // contains all the partitions from the previous RDD that don't have preferred locations + val partsWithoutLocs = ArrayBuffer[Partition]() + // contains all the partitions from the previous RDD that have preferred locations along with + val partsWithLocs = getAllPrefLocs(prev) + // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) def currPrefLocs(part: Partition): Seq[String] = { prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host) } - // this class just keeps iterating and rotating infinitely over the partitions of the RDD - // next() returns the next preferred machine that a partition is replicated on - // the rotator first goes through the first replica copy of each partition, then second, third - // the iterators return type is a tuple: (replicaString, partition) - class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] { - - var it: Iterator[(String, Partition)] = resetIterator() - - override val isEmpty = !it.hasNext - - // initializes/resets to start iterating from the beginning - def resetIterator(): Iterator[(String, Partition)] = { - val iterators = (0 to 2).map( x => - prev.partitions.iterator.flatMap(p => { - val locs = currPrefLocs(p) - if (locs.size > x) Some((locs(x), p)) else None - } ) - ) - iterators.reduceLeft((x, y) => x ++ y) - } - - // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD - override def hasNext: Boolean = { !isEmpty } - - // return the next preferredLocation of some partition of the RDD 
- override def next(): (String, Partition) = { - if (it.hasNext) { - it.next() - } else { - it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning - it.next() - } - } + // has side affect of filling in partitions without locations as well + def getAllPrefLocs(prev: RDD[_]): Array[(String, Partition)] = { + val partsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]() + // first get the locations for each partition, only do this once since it can be expensive + prev.partitions.foreach(p => { + val locs = currPrefLocs(p) + if (locs.size > 0) { + partsWithLocs.put(p, locs) + } else { + partsWithoutLocs += p + } + } + ) + // convert it into an array of host to partition + val allLocs = (0 to 2).map(x => + partsWithLocs.toArray.flatMap(parts => { + val p = parts._1 + val locs = parts._2 + if (locs.size > x) Some((locs(x), p)) else None + } ) + ) + allLocs.reduceLeft((x, y) => x ++ y) } /** @@ -232,33 +226,32 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: } /** - * Initializes targetLen partition groups and assigns a preferredLocation - * This uses coupon collector to estimate how many preferredLocations it must rotate through - * until it has seen most of the preferred locations (2 * n log(n)) + * Initializes targetLen partition groups. If there are preferred locations, each group + * is assigned a preferredLocation. This uses coupon collector to estimate how many + * preferredLocations it must rotate through until it has seen most of the preferred + * locations (2 * n log(n)) * @param targetLen */ def setupGroups(targetLen: Int) { - val rotIt = new LocationIterator(prev) - // deal with empty case, just create targetLen partition groups with no preferred location - if (!rotIt.hasNext) { + if (partsWithLocs.isEmpty) { (1 to targetLen).foreach(x => groupArr += PartitionGroup()) return } noLocality = false - // number of iterations needed to be certain that we've seen most preferred locations val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt var numCreated = 0 var tries = 0 // rotate through until either targetLen unique/distinct preferred locations have been created - // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations, - // i.e. 
likely targetLen >> number of preferred locations (more buckets than there are machines) - while (numCreated < targetLen && tries < expectedCoupons2) { + // OR (we have went through either all partitions OR we've rotated expectedCoupons2 - in which case we + // have likely seen all preferred locations) + val numPartsToLookAt = math.min(expectedCoupons2, partsWithLocs.length) + while (numCreated < targetLen && tries < numPartsToLookAt) { + val (nxt_replica, nxt_part) = partsWithLocs(tries) tries += 1 - val (nxt_replica, nxt_part) = rotIt.next() if (!groupHash.contains(nxt_replica)) { val pgroup = PartitionGroup(nxt_replica) groupArr += pgroup @@ -267,13 +260,17 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: numCreated += 1 } } - - while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates - var (nxt_replica, nxt_part) = rotIt.next() + tries = 0 + // if we don't have enough partition groups, create duplicates + while (numCreated < targetLen) { + var (nxt_replica, nxt_part) = partsWithLocs(tries) + tries += 1 val pgroup = PartitionGroup(nxt_replica) groupArr += pgroup groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup + addPartToPGroup(nxt_part, pgroup) numCreated += 1 + if (tries >= partsWithLocs.length) tries = 0 } } @@ -285,7 +282,8 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: * @return partition group (bin to be put in) */ def pickBin(p: Partition): PartitionGroup = { - val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs + val preflocs = partsWithLocs.filter(_._2 == p).map(_._1).toSeq + val pref = preflocs.map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs val prefPart = if (pref == Nil) None else pref.head val r1 = rnd.nextInt(groupArr.size) @@ -321,37 +319,36 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: } else { // It is possible to have unionRDD where one rdd has preferred locations and another rdd // that doesn't. To make sure we end up with the requested number of partitions, - // make sure to put a partitions in every group. + // make sure to put a partition in every group. 
if (groupArr.size > initialHash.size) { // we don't have a partition assigned to every group yet so first try to fill them // with the partitions with preferred locations - var tries = 0 - val rotIt = new LocationIterator(prev) - while (tries < prev.partitions.length && initialHash.size < groupArr.size) { - // if the number of partitions with preferred locations is less then - // number of total partitions this might loop over some more then once but we need to - // handle both cases and its not easy to get # of partitions with preferred locs - var (nxt_replica, nxt_part) = rotIt.next() + val partIter = partsWithLocs.iterator + while (partIter.hasNext && initialHash.size < groupArr.size) { + var (nxt_replica, nxt_part) = partIter.next() if (!initialHash.contains(nxt_part)) { groupArr.find(pg => pg.size == 0).map(firstEmpty => { firstEmpty.arr += nxt_part initialHash += nxt_part }) } - tries += 1 } } - // we have went through all with preferred locations now just make sure one - // partition per group - val numEmptyPartitionGroups = groupArr.length - getPartitions.length - val partitionsNotInGroups = prev.partitions.filter(p => !initialHash.contains(p)) - for (i <- 0 until math.min(numEmptyPartitionGroups, partitionsNotInGroups.length)) { - groupArr.find(pg => pg.size == 0).map(firstEmpty => { - firstEmpty.arr += partitionsNotInGroups(i) - initialHash += partitionsNotInGroups(i) - }) + + // if we didn't get one partitions per group from partitions with preferred locations + // use partitions without preferred locations + val partNoLocIter = partsWithoutLocs.iterator + while (partNoLocIter.hasNext && initialHash.size < groupArr.size) { + var nxt_part = partNoLocIter.next() + if (!initialHash.contains(nxt_part)) { + groupArr.find(pg => pg.size == 0).map(firstEmpty => { + firstEmpty.arr += nxt_part + initialHash += nxt_part + }) + } } + // finally pick bin for the rest for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group pickBin(p).arr += p From 2eff583d896b1032477a299aa9ae488711d5f01c Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 2 May 2016 19:34:18 +0000 Subject: [PATCH 5/7] Rework to upmerge to latest --- .../org/apache/spark/rdd/CoalescedRDD.scala | 98 ++++++++++--------- 1 file changed, 52 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index eb5b5acb588be..9801dc068da2a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -169,40 +169,43 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) var noLocality = true // if true if no preferredLocations exists for parent RDD - // contains all the partitions from the previous RDD that don't have preferred locations - val partsWithoutLocs = ArrayBuffer[Partition]() - // contains all the partitions from the previous RDD that have preferred locations along with - val partsWithLocs = getAllPrefLocs(prev) + class PartitionLocations(prev: RDD[_]) { + + // contains all the partitions from the previous RDD that don't have preferred locations + val partsWithoutLocs = ArrayBuffer[Partition]() + // contains all the partitions from the previous RDD that have preferred locations + val partsWithLocs:Array[(String, Partition)] = getAllPrefLocs(prev) + + // has side affect of filling in partitions without locations as well + def getAllPrefLocs(prev: RDD[_]): Array[(String, Partition)] 
= { + val partsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]() + // first get the locations for each partition, only do this once since it can be expensive + prev.partitions.foreach(p => { + val locs = currPrefLocs(p, prev) + if (locs.size > 0) { + partsWithLocs.put(p, locs) + } else { + partsWithoutLocs += p + } + } + ) + // convert it into an array of host to partition + val allLocs = (0 to 2).map(x => + partsWithLocs.toArray.flatMap(parts => { + val p = parts._1 + val locs = parts._2 + if (locs.size > x) Some((locs(x), p)) else None + } ) + ) + allLocs.reduceLeft((x, y) => x ++ y) + } + } // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = { prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host) } - // has side affect of filling in partitions without locations as well - def getAllPrefLocs(prev: RDD[_]): Array[(String, Partition)] = { - val partsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]() - // first get the locations for each partition, only do this once since it can be expensive - prev.partitions.foreach(p => { - val locs = currPrefLocs(p) - if (locs.size > 0) { - partsWithLocs.put(p, locs) - } else { - partsWithoutLocs += p - } - } - ) - // convert it into an array of host to partition - val allLocs = (0 to 2).map(x => - partsWithLocs.toArray.flatMap(parts => { - val p = parts._1 - val locs = parts._2 - if (locs.size > x) Some((locs(x), p)) else None - } ) - ) - allLocs.reduceLeft((x, y) => x ++ y) - } - /** * Sorts and gets the least element of the list associated with key in groupHash * The returned PartitionGroup is the least loaded of all groups that represent the machine "key" @@ -229,9 +232,9 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) * locations (2 * n log(n)) * @param targetLen */ - def setupGroups(targetLen: Int, prev: RDD[_]) { + def setupGroups(targetLen: Int, partitionLocs: PartitionLocations) { // deal with empty case, just create targetLen partition groups with no preferred location - if (partsWithLocs.isEmpty) { + if (partitionLocs.partsWithLocs.isEmpty) { (1 to targetLen).foreach(x => groupArr += new PartitionGroup()) return } @@ -245,9 +248,9 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) // rotate through until either targetLen unique/distinct preferred locations have been created // OR (we have went through either all partitions OR we've rotated expectedCoupons2 - in which case we // have likely seen all preferred locations) - val numPartsToLookAt = math.min(expectedCoupons2, partsWithLocs.length) + val numPartsToLookAt = math.min(expectedCoupons2, partitionLocs.partsWithLocs.length) while (numCreated < targetLen && tries < numPartsToLookAt) { - val (nxt_replica, nxt_part) = partsWithLocs(tries) + val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries) tries += 1 if (!groupHash.contains(nxt_replica)) { val pgroup = new PartitionGroup(Some(nxt_replica)) @@ -260,14 +263,14 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) tries = 0 // if we don't have enough partition groups, create duplicates while (numCreated < targetLen) { - var (nxt_replica, nxt_part) = partsWithLocs(tries) + var (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries) tries += 1 val pgroup = new PartitionGroup(Some(nxt_replica)) groupArr += pgroup groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup addPartToPGroup(nxt_part, pgroup) numCreated 
+= 1 - if (tries >= partsWithLocs.length) tries = 0 + if (tries >= partitionLocs.partsWithLocs.length) tries = 0 } } @@ -281,9 +284,10 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) * imbalance in favor of locality * @return partition group (bin to be put in) */ - def pickBin(p: Partition, prev: RDD[_], balanceSlack: Double): PartitionGroup = { + def pickBin(p: Partition, prev: RDD[_], balanceSlack: Double, + partitionLocs: PartitionLocations): PartitionGroup = { val slack = (balanceSlack * prev.partitions.length).toInt - val preflocs = partsWithLocs.filter(_._2 == p).map(_._1).toSeq + val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq // least loaded pref locs val pref = preflocs.map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs val prefPart = if (pref == Nil) None else pref.head @@ -313,7 +317,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) } } - def throwBalls(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { + def throwBalls(maxPartitions: Int, prev: RDD[_], + balanceSlack: Double, partitionLocs: PartitionLocations) { if (noLocality) { // no preferredLocations in parent RDD, no randomization needed if (maxPartitions > groupArr.size) { // just return prev.partitions for ((p, i) <- prev.partitions.zipWithIndex) { @@ -334,12 +339,12 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) if (groupArr.size > initialHash.size) { // we don't have a partition assigned to every group yet so first try to fill them // with the partitions with preferred locations - val partIter = partsWithLocs.iterator + val partIter = partitionLocs.partsWithLocs.iterator while (partIter.hasNext && initialHash.size < groupArr.size) { var (nxt_replica, nxt_part) = partIter.next() if (!initialHash.contains(nxt_part)) { - groupArr.find(pg => pg.size == 0).map(firstEmpty => { - firstEmpty.arr += nxt_part + groupArr.find(pg => pg.numPartitions == 0).map(firstEmpty => { + firstEmpty.partitions += nxt_part initialHash += nxt_part }) } @@ -348,12 +353,12 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) // if we didn't get one partitions per group from partitions with preferred locations // use partitions without preferred locations - val partNoLocIter = partsWithoutLocs.iterator + val partNoLocIter = partitionLocs.partsWithoutLocs.iterator while (partNoLocIter.hasNext && initialHash.size < groupArr.size) { var nxt_part = partNoLocIter.next() if (!initialHash.contains(nxt_part)) { - groupArr.find(pg => pg.size == 0).map(firstEmpty => { - firstEmpty.arr += nxt_part + groupArr.find(pg => pg.numPartitions == 0).map(firstEmpty => { + firstEmpty.partitions += nxt_part initialHash += nxt_part }) } @@ -361,7 +366,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) // finally pick bin for the rest for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group - pickBin(p, prev, balanceSlack).partitions += p + pickBin(p, prev, balanceSlack, partitionLocs).partitions += p } } } @@ -375,8 +380,9 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) * @return array of partition groups */ def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = { - setupGroups(math.min(prev.partitions.length, maxPartitions), prev) // setup the groups (bins) - throwBalls(maxPartitions, prev, balanceSlack) // assign partitions (balls) to each group (bins) + val partitionLocs = new 
PartitionLocations(prev) + setupGroups(math.min(prev.partitions.length, maxPartitions), partitionLocs) // setup the groups (bins) + throwBalls(maxPartitions, prev, balanceSlack, partitionLocs) // assign partitions (balls) to each group (bins) getPartitions } } From f012cd5fc20feb20088c808275cf283d0f594cec Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 2 May 2016 20:22:09 +0000 Subject: [PATCH 6/7] fix style --- .../org/apache/spark/rdd/CoalescedRDD.scala | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 9801dc068da2a..6f9a84698c61a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -174,7 +174,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) // contains all the partitions from the previous RDD that don't have preferred locations val partsWithoutLocs = ArrayBuffer[Partition]() // contains all the partitions from the previous RDD that have preferred locations - val partsWithLocs:Array[(String, Partition)] = getAllPrefLocs(prev) + val partsWithLocs: Array[(String, Partition)] = getAllPrefLocs(prev) // has side affect of filling in partitions without locations as well def getAllPrefLocs(prev: RDD[_]): Array[(String, Partition)] = { @@ -187,7 +187,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) } else { partsWithoutLocs += p } - } + } ) // convert it into an array of host to partition val allLocs = (0 to 2).map(x => @@ -227,8 +227,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) /** * Initializes targetLen partition groups. If there are preferred locations, each group - * is assigned a preferredLocation. This uses coupon collector to estimate how many - * preferredLocations it must rotate through until it has seen most of the preferred + * is assigned a preferredLocation. 
This uses coupon collector to estimate how many + * preferredLocations it must rotate through until it has seen most of the preferred * locations (2 * n log(n)) * @param targetLen */ @@ -246,8 +246,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) var tries = 0 // rotate through until either targetLen unique/distinct preferred locations have been created - // OR (we have went through either all partitions OR we've rotated expectedCoupons2 - in which case we - // have likely seen all preferred locations) + // OR (we have went through either all partitions OR we've rotated expectedCoupons2 - in + // which case we have likely seen all preferred locations) val numPartsToLookAt = math.min(expectedCoupons2, partitionLocs.partsWithLocs.length) while (numCreated < targetLen && tries < numPartsToLookAt) { val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries) @@ -262,7 +262,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) } tries = 0 // if we don't have enough partition groups, create duplicates - while (numCreated < targetLen) { + while (numCreated < targetLen) { var (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries) tries += 1 val pgroup = new PartitionGroup(Some(nxt_replica)) @@ -284,7 +284,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) * imbalance in favor of locality * @return partition group (bin to be put in) */ - def pickBin(p: Partition, prev: RDD[_], balanceSlack: Double, + def pickBin(p: Partition, prev: RDD[_], balanceSlack: Double, partitionLocs: PartitionLocations): PartitionGroup = { val slack = (balanceSlack * prev.partitions.length).toInt val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq @@ -317,7 +317,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) } } - def throwBalls(maxPartitions: Int, prev: RDD[_], + def throwBalls(maxPartitions: Int, prev: RDD[_], balanceSlack: Double, partitionLocs: PartitionLocations) { if (noLocality) { // no preferredLocations in parent RDD, no randomization needed if (maxPartitions > groupArr.size) { // just return prev.partitions @@ -381,8 +381,10 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) */ def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = { val partitionLocs = new PartitionLocations(prev) - setupGroups(math.min(prev.partitions.length, maxPartitions), partitionLocs) // setup the groups (bins) - throwBalls(maxPartitions, prev, balanceSlack, partitionLocs) // assign partitions (balls) to each group (bins) + // setup the groups (bins) + setupGroups(math.min(prev.partitions.length, maxPartitions), partitionLocs) + // assign partitions (balls) to each group (bins) + throwBalls(maxPartitions, prev, balanceSlack, partitionLocs) getPartitions } } From 305a7db60a2fc836035ed06c8207ce772c5e3b23 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 3 May 2016 16:23:18 +0000 Subject: [PATCH 7/7] fix comments from review --- .../org/apache/spark/rdd/CoalescedRDD.scala | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 6f9a84698c61a..c19ed1529bbf6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -174,38 +174,35 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) // 
contains all the partitions from the previous RDD that don't have preferred locations val partsWithoutLocs = ArrayBuffer[Partition]() // contains all the partitions from the previous RDD that have preferred locations - val partsWithLocs: Array[(String, Partition)] = getAllPrefLocs(prev) + val partsWithLocs = ArrayBuffer[(String, Partition)]() - // has side affect of filling in partitions without locations as well - def getAllPrefLocs(prev: RDD[_]): Array[(String, Partition)] = { - val partsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]() + getAllPrefLocs(prev) + + // gets all the preffered locations of the previous RDD and splits them into partitions + // with preferred locations and ones without + def getAllPrefLocs(prev: RDD[_]) { + val tmpPartsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]() // first get the locations for each partition, only do this once since it can be expensive prev.partitions.foreach(p => { - val locs = currPrefLocs(p, prev) + val locs = prev.context.getPreferredLocs(prev, p.index).map(tl => tl.host) if (locs.size > 0) { - partsWithLocs.put(p, locs) + tmpPartsWithLocs.put(p, locs) } else { partsWithoutLocs += p } } ) // convert it into an array of host to partition - val allLocs = (0 to 2).map(x => - partsWithLocs.toArray.flatMap(parts => { + (0 to 2).map(x => + tmpPartsWithLocs.foreach(parts => { val p = parts._1 val locs = parts._2 - if (locs.size > x) Some((locs(x), p)) else None + if (locs.size > x) partsWithLocs += ((locs(x), p)) } ) ) - allLocs.reduceLeft((x, y) => x ++ y) } } - // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) - def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = { - prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host) - } - /** * Sorts and gets the least element of the list associated with key in groupHash * The returned PartitionGroup is the least loaded of all groups that represent the machine "key" @@ -246,7 +243,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) var tries = 0 // rotate through until either targetLen unique/distinct preferred locations have been created - // OR (we have went through either all partitions OR we've rotated expectedCoupons2 - in + // OR (we have went through either all partitions OR we've rotated expectedCoupons2 - in // which case we have likely seen all preferred locations) val numPartsToLookAt = math.min(expectedCoupons2, partitionLocs.partsWithLocs.length) while (numCreated < targetLen && tries < numPartsToLookAt) { @@ -284,7 +281,10 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) * imbalance in favor of locality * @return partition group (bin to be put in) */ - def pickBin(p: Partition, prev: RDD[_], balanceSlack: Double, + def pickBin( + p: Partition, + prev: RDD[_], + balanceSlack: Double, partitionLocs: PartitionLocations): PartitionGroup = { val slack = (balanceSlack * prev.partitions.length).toInt val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq @@ -317,7 +317,9 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) } } - def throwBalls(maxPartitions: Int, prev: RDD[_], + def throwBalls( + maxPartitions: Int, + prev: RDD[_], balanceSlack: Double, partitionLocs: PartitionLocations) { if (noLocality) { // no preferredLocations in parent RDD, no randomization needed if (maxPartitions > groupArr.size) { // just return prev.partitions @@ -336,17 +338,15 @@ private class DefaultPartitionCoalescer(val 
balanceSlack: Double = 0.10) // that doesn't. To make sure we end up with the requested number of partitions, // make sure to put a partition in every group. - if (groupArr.size > initialHash.size) { - // we don't have a partition assigned to every group yet so first try to fill them - // with the partitions with preferred locations - val partIter = partitionLocs.partsWithLocs.iterator - while (partIter.hasNext && initialHash.size < groupArr.size) { + // if we don't have a partition assigned to every group first try to fill them + // with the partitions with preferred locations + val partIter = partitionLocs.partsWithLocs.iterator + groupArr.filter(pg => pg.numPartitions == 0).foreach { pg => + while (partIter.hasNext && pg.numPartitions == 0) { var (nxt_replica, nxt_part) = partIter.next() if (!initialHash.contains(nxt_part)) { - groupArr.find(pg => pg.numPartitions == 0).map(firstEmpty => { - firstEmpty.partitions += nxt_part - initialHash += nxt_part - }) + pg.partitions += nxt_part + initialHash += nxt_part } } } @@ -354,13 +354,13 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) // if we didn't get one partitions per group from partitions with preferred locations // use partitions without preferred locations val partNoLocIter = partitionLocs.partsWithoutLocs.iterator - while (partNoLocIter.hasNext && initialHash.size < groupArr.size) { - var nxt_part = partNoLocIter.next() - if (!initialHash.contains(nxt_part)) { - groupArr.find(pg => pg.numPartitions == 0).map(firstEmpty => { - firstEmpty.partitions += nxt_part + groupArr.filter(pg => pg.numPartitions == 0).foreach { pg => + while (partNoLocIter.hasNext && pg.numPartitions == 0) { + var nxt_part = partNoLocIter.next() + if (!initialHash.contains(nxt_part)) { + pg.partitions += nxt_part initialHash += nxt_part - }) + } } } @@ -384,7 +384,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) // setup the groups (bins) setupGroups(math.min(prev.partitions.length, maxPartitions), partitionLocs) // assign partitions (balls) to each group (bins) - throwBalls(maxPartitions, prev, balanceSlack, partitionLocs) + throwBalls(maxPartitions, prev, balanceSlack, partitionLocs) getPartitions } }
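
Reviewer note, not part of the patches: the heart of the fix is that throwBalls() now guarantees every PartitionGroup receives at least one partition when only part of the parent RDD (e.g. a UnionRDD) carries preferred locations. Below is a minimal standalone sketch of that seeding step; Group, the Int partition ids and seedEmptyGroups are invented stand-ins for PartitionGroup, Partition and the relevant piece of throwBalls(), kept dependency-free so it compiles without Spark.

import scala.collection.mutable.{ArrayBuffer, HashSet}

// A "partition" is just an Int id here, and Group is a mutable bucket of ids
// (hypothetical stand-in for Spark's PartitionGroup, for illustration only).
final case class Group(prefLoc: Option[String]) {
  val partitions = ArrayBuffer[Int]()
  def numPartitions: Int = partitions.size
}

object PartialLocalitySeeding {

  // Mirrors the shape of the PATCH 7 throwBalls() change: before the usual pickBin()
  // pass, make sure every group holds at least one partition, preferring partitions
  // that actually have locality information.
  def seedEmptyGroups(
      groups: Seq[Group],
      partsWithLocs: Seq[(String, Int)], // (host, partition) pairs, replicas flattened
      partsWithoutLocs: Seq[Int],
      assigned: HashSet[Int]): Unit = {

    // First pass: fill empty groups from partitions that have preferred locations.
    val withLocsIter = partsWithLocs.iterator
    groups.filter(_.numPartitions == 0).foreach { pg =>
      while (withLocsIter.hasNext && pg.numPartitions == 0) {
        val (_, part) = withLocsIter.next()
        if (!assigned.contains(part)) {
          pg.partitions += part
          assigned += part
        }
      }
    }

    // Second pass: any group still empty gets a partition without locality, so the
    // requested number of output partitions survives a partially local union.
    val noLocsIter = partsWithoutLocs.iterator
    groups.filter(_.numPartitions == 0).foreach { pg =>
      while (noLocsIter.hasNext && pg.numPartitions == 0) {
        val part = noLocsIter.next()
        if (!assigned.contains(part)) {
          pg.partitions += part
          assigned += part
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 3 target groups, but only partitions 0 and 1 have locality; 2, 3, 4 do not.
    val groups = Seq(Group(Some("m1")), Group(Some("m2")), Group(None))
    val assigned = HashSet[Int]()
    seedEmptyGroups(groups, Seq(("m1", 0), ("m2", 1)), Seq(2, 3, 4), assigned)
    groups.foreach(g => println(s"${g.prefLoc} -> ${g.partitions.toList}"))
  }
}

Whatever is still unassigned after the two passes corresponds to the partitions the real code then routes through pickBin().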
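
Patches 4-7 drop the rotating LocationIterator in favour of a PartitionLocations helper that asks the DAGScheduler for preferred locations exactly once per partition and flattens them into (host, partition) pairs: first replica of every partition, then the second, then the third. A standalone sketch of that flattening, with Int ids and an in-memory list standing in for the RDD (all names here are invented for the example):

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object PrefLocFlattening {
  // Splits partitions into those with and without preferred locations and flattens the
  // located ones by replica index, preserving the old LocationIterator visiting order
  // while looking locations up only once per partition.
  def split(prefLocs: Seq[(Int, Seq[String])]): (Array[(String, Int)], ArrayBuffer[Int]) = {
    val partsWithoutLocs = ArrayBuffer[Int]()
    val withLocs = mutable.LinkedHashMap[Int, Seq[String]]()
    prefLocs.foreach { case (p, locs) =>
      if (locs.nonEmpty) withLocs.put(p, locs) else partsWithoutLocs += p
    }
    // replica index 0, then 1, then 2, across all partitions that have that many replicas
    val flattened = (0 to 2).flatMap { x =>
      withLocs.toSeq.collect { case (p, locs) if locs.size > x => (locs(x), p) }
    }.toArray
    (flattened, partsWithoutLocs)
  }

  def main(args: Array[String]): Unit = {
    val (withLocs, withoutLocs) = split(Seq(
      0 -> Seq("m1", "m2"),
      1 -> Seq("m3"),
      2 -> Seq.empty[String],
      3 -> Seq("m2", "m4", "m5")))
    println(withLocs.toList)    // List((m1,0), (m3,1), (m2,3), (m2,0), (m4,3), (m5,3))
    println(withoutLocs.toList) // List(2)
  }
}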
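
setupGroups() keeps bounding its scan by the coupon-collector estimate 2 * (n ln n + n + 0.5), now applied to the flattened partsWithLocs array rather than an endless iterator: once that many (host, partition) entries have been examined, all distinct preferred locations have most likely been seen. A small standalone simulation, not Spark code, of why that bound is generous enough (it treats the scan as if hosts were drawn uniformly at random, which is the assumption behind the estimate):

import scala.util.Random

object CouponCollectorBound {
  // Same formula as expectedCoupons2 in setupGroups().
  def bound(targetLen: Int): Int =
    2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt

  // How many uniform random draws it takes to see all n distinct hosts at least once.
  def drawsToSeeAll(n: Int, rnd: Random): Int = {
    val seen = scala.collection.mutable.HashSet[Int]()
    var draws = 0
    while (seen.size < n) { seen += rnd.nextInt(n); draws += 1 }
    draws
  }

  def main(args: Array[String]): Unit = {
    val n = 50 // e.g. 50 distinct hosts
    val rnd = new Random(42)
    val avg = (1 to 100).map(_ => drawsToSeeAll(n, rnd)).sum / 100
    println(s"bound($n) = ${bound(n)}, average draws to see all $n hosts = $avg")
    // The bound (~2 n ln n) comfortably exceeds the ~n ln n average, which is why going
    // past it means we have "likely seen all preferred locations".
  }
}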
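
pickBin() keeps its original trade-off - take the least-loaded group on one of the partition's preferred hosts unless it is more than slack = (balanceSlack * prev.partitions.length).toInt partitions heavier than the smaller of two randomly sampled groups - it merely reads the preferred hosts from PartitionLocations now. A simplified standalone sketch of that decision; Group, groupsByHost and the sizes are made up for illustration, and the real method works on PartitionGroup and the coalescer's groupHash:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object PickBinSketch {
  final case class Group(host: Option[String]) {
    val partitions = ArrayBuffer[Int]()
    def size: Int = partitions.size
  }

  def pickBin(
      prefHosts: Seq[String],
      groups: IndexedSeq[Group],
      groupsByHost: Map[String, Seq[Group]],
      slack: Int,
      rnd: Random): Group = {
    // least-loaded group among those pinned to one of the partition's preferred hosts
    val prefGroup =
      prefHosts.flatMap(h => groupsByHost.getOrElse(h, Nil)).sortBy(_.size).headOption
    // "power of two choices": sample two random groups and keep the smaller one
    val r1 = groups(rnd.nextInt(groups.size))
    val r2 = groups(rnd.nextInt(groups.size))
    val minPowerOfTwo = if (r1.size < r2.size) r1 else r2
    prefGroup match {
      // locality wins as long as the preferred group is within `slack` of the random pick
      case Some(pg) if pg.size < minPowerOfTwo.size + slack => pg
      case _ => minPowerOfTwo
    }
  }

  def main(args: Array[String]): Unit = {
    val rnd = new Random(7)
    val groups = IndexedSeq(Group(Some("m1")), Group(Some("m2")), Group(None))
    groups(0).partitions ++= Seq(1, 2, 3) // the m1 group already holds 3 partitions
    val groupsByHost: Map[String, Seq[Group]] =
      groups.filter(_.host.isDefined).groupBy(_.host.get).map { case (h, gs) => h -> gs.toSeq }
    // with balanceSlack = 0.1 over ~10 partitions, slack is 1
    val chosen = pickBin(Seq("m1"), groups, groupsByHost, 1, rnd)
    println(s"picked group on ${chosen.host}, current size ${chosen.size}")
  }
}

The slack keeps locality from defeating the whole point of coalescing: a heavily loaded preferred group loses to an emptier random group once the imbalance exceeds the allowance.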
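
Finally, an end-to-end way to observe what the new "coalesced RDDs with partial locality" test pins down, using only public API (SparkConf, SparkContext.makeRDD with location preferences, coalesce, glom). This demo is not part of the patch: the host names are fabricated hints that will not match local executors (which does not matter for the partition counts), and the final assertion only holds once this fix is applied.

import org.apache.spark.{SparkConf, SparkContext}

object PartialLocalityCoalesceDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[4]").setAppName("partial-locality"))
    try {
      // Elements 5..9 carry preferred locations, 1..4 carry none - the UnionRDD-like
      // shape that used to make coalesce() return fewer partitions than requested.
      val data = sc.makeRDD((1 to 9).map { i =>
        if (i > 4) (i, (i to (i + 2)).map(j => "m" + (j % 6))) else (i, Seq.empty[String])
      })
      val coalesced = data.coalesce(3)
      val splits = coalesced.glom().collect().map(_.toList).toList
      println(s"got ${splits.length} partitions: $splits")
      // With the fix applied, all three requested partitions come back non-empty.
      assert(splits.length == 3 && splits.forall(_.nonEmpty),
        "expected 3 non-empty partitions")
    } finally {
      sc.stop()
    }
  }
}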