Skip to content

Commit

Permalink
add treeAggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Jun 17, 2014
1 parent 23a12ce commit fe42a5e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
24 changes: 24 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,30 @@ abstract class RDD[T: ClassTag](
jobResult
}

/**
 * Aggregates the elements of this RDD in a multi-level tree pattern.
 *
 * Each partition is first folded into a single value with `seqOp`, starting from `zeroValue`.
 * The per-partition results are then merged with `combOp` through zero or more intermediate
 * `reduceByKey` rounds, each of which shrinks the number of partitions by a common factor,
 * before a final `reduce` produces the result.
 *
 * @param zeroValue the initial value for each partition's accumulation; a clone of it is
 *                  returned when this RDD has no partitions
 * @param seqOp     folds one element of type `T` into an accumulator of type `U`
 * @param combOp    merges two accumulators of type `U`
 * @param level     the suggested depth of the aggregation tree; must be at least 1
 * @return the aggregated value
 */
@DeveloperApi
def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    level: Int): U = {
  require(level >= 1, s"Level must be greater than or equal to 1 but got $level.")
  if (this.partitions.size == 0) {
    // Nothing to aggregate: hand back a copy of the zero value made via the closure serializer.
    Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    // One partial result per partition.
    var local = this.mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = local.partitions.size
    // Per-level reduction factor: roughly the level-th root of the partition count, at least 2.
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / level)).toInt, 2)
    // Keep adding intermediate levels while the loop condition still holds.
    while (numPartitions > scale + numPartitions / scale) {
      numPartitions /= scale
      // Snapshot the current count in a val. The closure below is evaluated lazily, so
      // capturing the `numPartitions` var directly would make every level see the value
      // left by the last loop iteration instead of the value intended for its own level.
      val curNumPartitions = numPartitions
      local = local.mapPartitionsWithIndex { (i, iter) =>
        iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    local.reduce(cleanCombOp)
  }
}

/**
* Return the number of elements in the RDD.
*/
Expand Down
10 changes: 10 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -769,4 +769,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
mutableDependencies += dep
}
}

test("treeAggregate") {
  // `-1000 until 1000` excludes the upper bound, so the elements sum to -1000.
  val numbers = sc.makeRDD(-1000 until 1000, 10)
  val addElement = (acc: Long, elem: Int) => acc + elem
  val mergeSums = (left: Long, right: Long) => left + right
  // The result must not depend on the requested tree depth.
  (1 until 10).foreach { level =>
    val total = numbers.treeAggregate(0L)(addElement, mergeSums, level)
    assert(total === -1000L)
  }
}
}

0 comments on commit fe42a5e

Please sign in to comment.