Takes locality and the sum of partition lengths into account.
watermen committed Jul 20, 2015
1 parent 79ec072 commit cb72d0f
Showing 1 changed file with 9 additions and 1 deletion.
core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -295,7 +295,15 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:

     val r1 = rnd.nextInt(groupArr.size)
     val r2 = rnd.nextInt(groupArr.size)
-    val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
+    val minPowerOfTwo = if (p.isInstanceOf[HadoopPartition]) {
+      val groupLen1 = groupArr(r1).arr.map(part =>
+        part.asInstanceOf[HadoopPartition].inputSplit.value.getLength).sum
+      val groupLen2 = groupArr(r2).arr.map(part =>
+        part.asInstanceOf[HadoopPartition].inputSplit.value.getLength).sum
+      if (groupLen1 < groupLen2) groupArr(r1) else groupArr(r2)
+    } else {
+      if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
+    }
     if (prefPart.isEmpty) {
       // if no preferred locations, just use basic power of two
       return minPowerOfTwo
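The change keeps the coalescer's "power of two choices" sampling (draw two groups at random, keep the smaller one) but, when the input partitions are HadoopPartitions, compares the two candidates by the sum of their input-split lengths rather than by how many partitions each group already holds. The standalone sketch below illustrates that selection rule only; Group, smallerOf, and pickBin here are hypothetical, simplified stand-ins for Spark's PartitionGroup and PartitionCoalescer.pickBin, not the actual implementation.

import scala.util.Random

// Minimal sketch of the selection rule in the diff above. `Group` is a
// hypothetical stand-in for Spark's PartitionGroup; the real code reads
// split sizes via HadoopPartition.inputSplit.value.getLength.
object PowerOfTwoSketch {
  final case class Group(splitLengths: Seq[Long]) {
    def partitionCount: Int = splitLengths.size // old metric: partitions per group
    def totalLength: Long = splitLengths.sum    // new metric: summed split bytes
  }

  // Compare two candidate groups by the chosen metric and return the smaller.
  def smallerOf(g1: Group, g2: Group, byLength: Boolean): Group =
    if (byLength) { if (g1.totalLength < g2.totalLength) g1 else g2 }
    else { if (g1.partitionCount < g2.partitionCount) g1 else g2 }

  // "Power of two choices": draw two groups at random and keep the smaller,
  // which keeps groups roughly balanced without scanning all of them.
  def pickBin(groups: IndexedSeq[Group], byLength: Boolean, rnd: Random): Group =
    smallerOf(groups(rnd.nextInt(groups.size)), groups(rnd.nextInt(groups.size)), byLength)

  def main(args: Array[String]): Unit = {
    val few  = Group(Seq(512L, 512L))       // 2 partitions, 1024 bytes
    val many = Group(Seq(128L, 128L, 128L)) // 3 partitions, 384 bytes
    println(smallerOf(few, many, byLength = false).totalLength) // 1024: fewer partitions wins
    println(smallerOf(few, many, byLength = true).totalLength)  // 384: fewer bytes wins
    println(pickBin(IndexedSeq(few, many), byLength = true, new Random(7)).totalLength)
  }
}

The byLength = false branch mirrors the pre-existing behaviour (and the else branch of the diff), which the coalescer still needs as a fallback when partitions are not HadoopPartitions and no byte sizes are available.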
