Skip to content

Commit

Permalink
add treeReduce
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Jun 17, 2014
1 parent fe42a5e commit eb71c33
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
26 changes: 26 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,32 @@ abstract class RDD[T: ClassTag](
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

/**
 * Reduces the elements of this RDD in a multi-level tree pattern.
 *
 * Instead of sending every partition's partial result straight to the driver
 * (as plain `reduce` does), intermediate combine steps run on the cluster via
 * `treeAggregate`, which helps when there are many partitions or large
 * partial results.
 *
 * @param f associative and commutative reduce function
 * @param level suggested number of levels in the aggregation tree (>= 1)
 * @return the reduced value
 * @throws UnsupportedOperationException if this RDD is empty
 */
def treeReduce(f: (T, T) => T, level: Int): T = {
  // NOTE: the check is >= 1, so the message must say "greater than or equal to 1".
  require(level >= 1, s"Level must be greater than or equal to 1 but got $level.")
  val cleanF = sc.clean(f)
  // Reduce each partition locally; None marks an empty partition so that
  // empty partitions do not poison the tree-level combine.
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  val local = this.mapPartitions(it => Iterator(reducePartition(it)))
  // Merge two optional partial results; absent sides pass the other through.
  val op: (Option[T], Option[T]) => Option[T] = {
    case (Some(a), Some(b)) => Some(cleanF(a, b))
    case (a @ Some(_), None) => a
    case (None, b) => b
  }
  // Both seqOp and combOp are the same Option-merging function; the final
  // None (all partitions empty) becomes an explicit error, matching reduce().
  local.treeAggregate(Option.empty[T])(op, op, level)
    .getOrElse(throw new UnsupportedOperationException("empty collection"))
}

/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
* given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
Expand Down
8 changes: 8 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -779,4 +779,12 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sum === -1000L)
}
}

test("treeReduce") {
  // Sum of -1000 until 1000 is -1000; verify the tree-based reduce agrees
  // for every supported tree depth from 1 through 9.
  val data = sc.makeRDD(-1000 until 1000, 10)
  (1 until 10).foreach { depth =>
    assert(data.treeReduce(_ + _, depth) === -1000)
  }
}
}

0 comments on commit eb71c33

Please sign in to comment.