Commit
[SPARK-14408][CORE] Changed RDD.treeAggregate to use fold instead of reduce

## What changes were proposed in this pull request?

Previously, `RDD.treeAggregate` used `reduceByKey` and `reduce` in its implementation, neither of which technically allows the `seqOp`/`combOp` functions to modify and return their first arguments.

This PR uses `foldByKey` and `fold` instead, and notes in the Scaladoc that `aggregate` and `treeAggregate` are semantically identical.
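
As a quick, hedged illustration of the contract this change is meant to support (a minimal standalone sketch, not part of the commit; the object name and local-mode setup are assumptions), both ops below mutate and return their first argument, a one-element array used as a reusable accumulator, which is what a `fold`/`foldByKey`-based implementation permits because accumulation starts from per-task copies of the `zeroValue` rather than from a data element:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone example; names here are illustrative only.
object TreeAggregateMutationDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("demo"))
    try {
      val rdd = sc.parallelize(-1000 until 1000, 10)

      // Both ops mutate and return their first argument (a one-element Array
      // used as a reusable accumulator) instead of allocating a new value.
      val seqOp = (acc: Array[Int], x: Int) => { acc(0) += x; acc }
      val combOp = (a: Array[Int], b: Array[Int]) => { a(0) += b(0); a }

      val sum = rdd.treeAggregate(Array(0))(seqOp, combOp, depth = 2)
      println(sum(0)) // -1000, same result as a non-mutating version
    } finally {
      sc.stop()
    }
  }
}
```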

Note that this change initially had some test failures for unknown reasons; these were actually fixed in e355460.

The root cause was that the `zeroValue` is now an `AFTAggregator` whose `totalCnt` is 0, and the merge logic compares its own `totalCnt`: as the partial aggregates are merged in one by one, it keeps returning `this` with `totalCnt` still 0. So the failures were not caused by a bug in the current change.

This is now fixed in the commit, so the tests should pass.
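
For context, here is a simplified, hypothetical sketch of that failure mode (the class and method names are made up; this is not the actual `AFTAggregator` code, whose fix lives in e355460): a merge that guards on its own `totalCnt` keeps returning an empty result once `fold` supplies the empty `zeroValue` as the left operand, which the earlier `reduce`-based path never did.

```scala
// Simplified, hypothetical aggregator illustrating the pitfall described above.
class CountAgg(var totalCnt: Long = 0L) extends Serializable {

  // Buggy guard: checks this.totalCnt instead of other.totalCnt, so an empty
  // zero value ignores every partial result it is merged with.
  def mergeBuggy(other: CountAgg): this.type = {
    if (totalCnt != 0L) totalCnt += other.totalCnt
    this
  }

  // Fixed guard: merge whenever the other side carries data.
  def mergeFixed(other: CountAgg): this.type = {
    if (other.totalCnt != 0L) totalCnt += other.totalCnt
    this
  }
}

object ZeroValueMergeDemo {
  def main(args: Array[String]): Unit = {
    val partials = Seq(new CountAgg(3L), new CountAgg(4L))
    // fold-style combination starts from an empty zero value:
    val buggy = partials.foldLeft(new CountAgg())((acc, p) => acc.mergeBuggy(p))
    val fixed = partials.foldLeft(new CountAgg())((acc, p) => acc.mergeFixed(p))
    println(buggy.totalCnt) // 0 -- the zero value keeps returning itself unchanged
    println(fixed.totalCnt) // 7
  }
}
```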

## How was this patch tested?

Test cases added in `RDDSuite`.

Closes #12217

Author: Joseph K. Bradley <joseph@databricks.com>
Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18198 from HyukjinKwon/SPARK-14408.
jkbradley authored and srowen committed Jun 9, 2017
1 parent 2a23cdd commit 5a33718
Showing 2 changed files with 35 additions and 5 deletions.
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1118,9 +1118,9 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Aggregates the elements of this RDD in a multi-level tree pattern.
+   * This method is semantically identical to [[org.apache.spark.rdd.RDD#aggregate]].
    *
    * @param depth suggested depth of the tree (default: 2)
-   * @see [[org.apache.spark.rdd.RDD#aggregate]]
    */
   def treeAggregate[U: ClassTag](zeroValue: U)(
       seqOp: (U, T) => U,
@@ -1134,7 +1134,7 @@ abstract class RDD[T: ClassTag](
       val cleanCombOp = context.clean(combOp)
       val aggregatePartition =
         (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-      var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
+      var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
       var numPartitions = partiallyAggregated.partitions.length
       val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
       // If creating an extra level doesn't help reduce
@@ -1146,9 +1146,10 @@ abstract class RDD[T: ClassTag](
         val curNumPartitions = numPartitions
         partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
           (i, iter) => iter.map((i % curNumPartitions, _))
-        }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+        }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
       }
-      partiallyAggregated.reduce(cleanCombOp)
+      val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+      partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
     }
   }
 
31 changes: 30 additions & 1 deletion core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -192,6 +192,23 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(ser.serialize(union.partitions.head).limit() < 2000)
   }
 
+  test("fold") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10)
+    def op: (Int, Int) => Int = (c: Int, x: Int) => c + x
+    val sum = rdd.fold(0)(op)
+    assert(sum === -1000)
+  }
+
+  test("fold with op modifying first arg") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10).map(x => Array(x))
+    def op: (Array[Int], Array[Int]) => Array[Int] = { (c: Array[Int], x: Array[Int]) =>
+      c(0) += x(0)
+      c
+    }
+    val sum = rdd.fold(Array(0))(op)
+    assert(sum(0) === -1000)
+  }
+
   test("aggregate") {
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = HashMap[String, Int]
@@ -218,7 +235,19 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     def combOp: (Long, Long) => Long = (c1: Long, c2: Long) => c1 + c2
     for (depth <- 1 until 10) {
       val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth)
-      assert(sum === -1000L)
+      assert(sum === -1000)
     }
   }
 
+  test("treeAggregate with ops modifying first args") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10).map(x => Array(x))
+    def op: (Array[Int], Array[Int]) => Array[Int] = { (c: Array[Int], x: Array[Int]) =>
+      c(0) += x(0)
+      c
+    }
+    for (depth <- 1 until 10) {
+      val sum = rdd.treeAggregate(Array(0))(op, op, depth)
+      assert(sum(0) === -1000)
+    }
+  }
+
