Skip to content

Commit

Permalink
minor update
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Jul 24, 2014
1 parent 60be09e commit a6e35d6
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,28 +154,29 @@ class RangePartitioner[K : Ordering : ClassTag, V](
iter.map(_._1).filter(t => random.nextDouble() < fraction).toArray
}
val weight = (1.0 / fraction).toFloat
val resultHandler: (Int, Array[K]) => Unit = { (index, sample) =>
val resultHandler: (Int, Array[K]) => Unit = { (_, sample) =>
sample.foreach { key =>
candidates += ((key, weight))
}
}
rdd.context.runJob(
rdd, sampleFunc, imbalancedPartitions, allowLocal = false, resultHandler)
}
var sumWeights: Double = 0.0
val numCandidates = candidates.size
var sumWeights = 0.0
candidates.foreach { case (_, weight) =>
sumWeights += weight
}
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
val sorted = candidates.sortBy(_._1)
val orderedCandidates = candidates.sortBy(_._1)
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < sorted.length) && (j < partitions - 1)) {
val (key, weight) = sorted(i)
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = orderedCandidates(i)
cumWeight += weight
if (cumWeight > target) {
// Skip duplicate values.
Expand Down

0 comments on commit a6e35d6

Please sign in to comment.