From 23bf400e0ef4fd4f5eae65659741e96a68ae9ff8 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 8 May 2014 17:44:57 -0700 Subject: [PATCH 1/7] SPARK-554. Add aggregateByKey. --- .../apache/spark/rdd/PairRDDFunctions.scala | 50 +++++++++++++++++++ .../spark/rdd/PairRDDFunctionsSuite.scala | 13 +++++ docs/programming-guide.md | 4 ++ 3 files changed, 67 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 8909980957058..44ce782160f0e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -118,6 +118,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions)) } + /** + * Aggregate the values of each key, using given combine functions and a neutral "zero value". + * This function can return a different result type, U, than the type of the values in this RDD, + * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + * as in scala.TraversableOnce. The former operation is used for merging values within a partition, + * and the latter is used for merging values between partitions. To avoid memory allocation, both + * of these functions are allowed to modify and return their first argument instead of creating a + * new U. + */ + def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, + combOp: (U, U) => U): RDD[(K, U)] = { + // Serialize the zero value to a byte array so that we can get a new clone of it on each key + val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue) + val zeroArray = new Array[Byte](zeroBuffer.limit) + zeroBuffer.get(zeroArray) + + lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance() + def createZero() = cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) + + combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner) + } + + /** + * Aggregate the values of each key, using given combine functions and a neutral "zero value". + * This function can return a different result type, U, than the type of the values in this RDD, + * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + * as in scala.TraversableOnce. The former operation is used for merging values within a partition, + * and the latter is used for merging values between partitions. To avoid memory allocation, both + * of these functions are allowed to modify and return their first argument instead of creating a + * new U. + */ + def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, + combOp: (U, U) => U): RDD[(K, U)] = { + aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp) + } + + /** + * Aggregate the values of each key, using given combine functions and a neutral "zero value". + * This function can return a different result type, U, than the type of the values in this RDD, + * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + * as in scala.TraversableOnce. The former operation is used for merging values within a partition, + * and the latter is used for merging values between partitions. To avoid memory allocation, both + * of these functions are allowed to modify and return their first argument instead of creating a + * new U. 
+ */ + def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, + combOp: (U, U) => U): RDD[(K, U)] = { + aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp) + } + /** * Merge the values for each key using an associative function and a neutral "zero value" which * may be added to the result an arbitrary number of times, and must not change the result diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 9ddafc451878d..0b9004448a63e 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -30,6 +30,19 @@ import org.apache.spark.SparkContext._ import org.apache.spark.{Partitioner, SharedSparkContext} class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { + test("aggregateByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 1), (3, 2), (5, 1), (5, 3)), 2) + + val sets = pairs.aggregateByKey(new HashSet[Int]())(_ += _, _ ++= _).collect() + assert(sets.size === 3) + val valuesFor1 = sets.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1)) + val valuesFor3 = sets.find(_._1 == 3).get._2 + assert(valuesFor3.toList.sorted === List(2)) + val valuesFor5 = sets.find(_._1 == 5).get._2 + assert(valuesFor5.toList.sorted === List(1, 3)) + } + test("groupByKey") { val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) val groups = pairs.groupByKey().collect() diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 7989e02dfb732..79784682bfd1b 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -890,6 +890,10 @@ for details. reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. + + aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) + When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. + sortByKey([ascending], [numTasks]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. 
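A usage sketch for readers following the series (not part of the patches): a minimal Scala driver exercising the API added in PATCH 1/7. The object name, the local[2] master, and the sample data are illustrative assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // brings the PairRDDFunctions implicits into scope (Spark 1.x)

// Per-key mean: the accumulator type (sum, count) differs from the value type
// Int, which is exactly the case aggregateByKey exists for.
object AggregateByKeyExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("example").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 5), ("b", 2), ("b", 4)), 2)

    val sumCounts = pairs.aggregateByKey((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: merge a V into a U within a partition
      (a, b) => (a._1 + b._1, a._2 + b._2))   // combOp: merge two U's across partitions

    sumCounts.mapValues { case (sum, count) => sum.toDouble / count }
      .collect().foreach(println)             // prints (a,3.0) and (b,3.0)
    sc.stop()
  }
}
```

Because the zero value and seqOp run per key as values stream through, no intermediate per-key collection is materialized, unlike the equivalent groupByKey-then-mapValues formulation.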
From c2be415da0fc13a547a8b6f505072e3ca1f6a835 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Tue, 27 May 2014 09:48:32 -0700 Subject: [PATCH 2/7] Java and Python aggregateByKey --- .../apache/spark/api/java/JavaPairRDD.scala | 44 +++++++++++++++++++ .../java/org/apache/spark/JavaAPISuite.java | 31 +++++++++++++ python/pyspark/rdd.py | 11 +++++ python/pyspark/tests.py | 15 +++++++ 4 files changed, 101 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 7dcfbf741c4f1..165083f703193 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -228,6 +228,50 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) : PartialResult[java.util.Map[K, BoundedDouble]] = rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap) + /** + * Aggregate the values of each key, using given combine functions and a neutral "zero value". + * This function can return a different result type, U, than the type of the values in this RDD, + * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + * as in scala.TraversableOnce. The former operation is used for merging values within a partition, + * and the latter is used for merging values between partitions. To avoid memory allocation, both + * of these functions are allowed to modify and return their first argument instead of creating a + * new U. + */ + def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U], + combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = { + implicit val ctag: ClassTag[U] = fakeClassTag + fromRDD(rdd.aggregateByKey(zeroValue, partitioner)(seqFunc, combFunc)) + } + + /** + * Aggregate the values of each key, using given combine functions and a neutral "zero value". + * This function can return a different result type, U, than the type of the values in this RDD, + * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + * as in scala.TraversableOnce. The former operation is used for merging values within a partition, + * and the latter is used for merging values between partitions. To avoid memory allocation, both + * of these functions are allowed to modify and return their first argument instead of creating a + * new U. + */ + def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U], + combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = { + implicit val ctag: ClassTag[U] = fakeClassTag + fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc, combFunc)) + } + + /** + * Aggregate the values of each key, using given combine functions and a neutral "zero value". + * This function can return a different result type, U, than the type of the values in this RDD, + * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's. + * The former operation is used for merging values within a partition, and the latter is used for + * merging values between partitions. To avoid memory allocation, both of these functions are + * allowed to modify and return their first argument instead of creating a new U. 
+ */ + def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): + JavaPairRDD[K, U] = { + implicit val ctag: ClassTag[U] = fakeClassTag + fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc)) + } + /** * Merge the values for each key using an associative function and a neutral "zero value" which * may be added to the result an arbitrary number of times, and must not change the result diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 50a62129116f1..ef41bfb88de9d 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -317,6 +317,37 @@ public Integer call(Integer a, Integer b) { Assert.assertEquals(33, sum); } + @Test + public void aggregateByKey() { + JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs( + Arrays.asList( + new Tuple2<Integer, Integer>(1, 1), + new Tuple2<Integer, Integer>(1, 1), + new Tuple2<Integer, Integer>(3, 2), + new Tuple2<Integer, Integer>(5, 1), + new Tuple2<Integer, Integer>(5, 3)), 2); + + Map<Integer, Set<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(), + new Function2<Set<Integer>, Integer, Set<Integer>>() { + @Override + public Set<Integer> call(Set<Integer> a, Integer b) { + a.add(b); + return a; + } + }, + new Function2<Set<Integer>, Set<Integer>, Set<Integer>>() { + @Override + public Set<Integer> call(Set<Integer> a, Set<Integer> b) { + a.addAll(b); + return a; + } + }).collectAsMap(); + Assert.assertEquals(3, sets.size()); + Assert.assertEquals(new HashSet<Integer>(Arrays.asList(1)), sets.get(1)); + Assert.assertEquals(new HashSet<Integer>(Arrays.asList(2)), sets.get(3)); + Assert.assertEquals(new HashSet<Integer>(Arrays.asList(1, 3)), sets.get(5)); + } + @SuppressWarnings("unchecked") @Test public void foldByKey() { diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 9c69c79236edc..104a3a18e05b2 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1178,6 +1178,17 @@ def _mergeCombiners(iterator): combiners[k] = mergeCombiners(combiners[k], v) return combiners.iteritems() return shuffled.mapPartitions(_mergeCombiners) + + def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None): + """ + Aggregate the values of each key, using given combine functions and a neutral "zero value". + This function can return a different result type, U, than the type of the values in this RDD, + V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + The former operation is used for merging values within a partition, and the latter is used + for merging values between partitions. To avoid memory allocation, both of these functions are + allowed to modify and return their first argument instead of creating a new U.
+ """ + return self.combineByKey(lambda v: func(zeroValue, v), seqFunc, combFunc, numPartitions) def foldByKey(self, zeroValue, func, numPartitions=None): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 184ee810b861b..398aae1b3d03d 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -188,6 +188,21 @@ def test_deleting_input_files(self): os.unlink(tempFile.name) self.assertRaises(Exception, lambda: filtered_data.count()) + def testAggregateByKey(self): + data = self.sc.parallelize([(1, 1), (1, 1), (3, 2), (5, 1), (5, 3)], 2) + def seqOp(x, y): + x.add(y) + return x + + def combOp(x, y): + x |= y + return x + + sets = dict(pairs.aggregateByKey(set(), seqOp, combOp).collect()) + self.assertEqual(3, len(sets)) + self.assertEqual(set([1]), sets[1]) + self.assertEqual(set([2]), sets[3]) + self.assertEqual(set([1, 3]), sets[5]) class TestIO(PySparkTestCase): From ae567465b88f6375e2dcac7d24f47e8a95b60a67 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 28 May 2014 21:53:45 -0700 Subject: [PATCH 3/7] Fix doc (replace T with V) --- .../main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 6 +++--- .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 6 +++--- python/pyspark/rdd.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 165083f703193..e9d62904eb755 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -231,7 +231,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, - * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, * as in scala.TraversableOnce. The former operation is used for merging values within a partition, * and the latter is used for merging values between partitions. To avoid memory allocation, both * of these functions are allowed to modify and return their first argument instead of creating a @@ -246,7 +246,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, - * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, * as in scala.TraversableOnce. The former operation is used for merging values within a partition, * and the latter is used for merging values between partitions. To avoid memory allocation, both * of these functions are allowed to modify and return their first argument instead of creating a @@ -261,7 +261,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, - * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's. + * V. 
Thus, we need one operation for merging a T into a U and one operation for merging two U's. + * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's. * The former operation is used for merging values within a partition, and the latter is used for * merging values between partitions. To avoid memory allocation, both of these functions are * allowed to modify and return their first argument instead of creating a new U. diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 44ce782160f0e..961a1d080a724 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -121,7 +121,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, - * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, * as in scala.TraversableOnce. The former operation is used for merging values within a partition, * and the latter is used for merging values between partitions. To avoid memory allocation, both * of these functions are allowed to modify and return their first argument instead of creating a @@ -143,7 +143,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, - * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, * as in scala.TraversableOnce. The former operation is used for merging values within a partition, * and the latter is used for merging values between partitions. To avoid memory allocation, both * of these functions are allowed to modify and return their first argument instead of creating a @@ -157,7 +157,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, - * V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, * as in scala.TraversableOnce. The former operation is used for merging values within a partition, * and the latter is used for merging values between partitions. To avoid memory allocation, both * of these functions are allowed to modify and return their first argument instead of creating a diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 104a3a18e05b2..126fafd22d9a1 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1183,7 +1183,7 @@ def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None): """ Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, - V. Thus, we need one operation for merging a T into a U and one operation for merging two U's, + V.
Thus, we need one operation for merging a V into a U and one operation for merging two U's, The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U. From 0b735e92e8dbd063b9829812c65bd3d3f5c7dbb2 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 2 Jun 2014 19:25:57 -0700 Subject: [PATCH 4/7] Fix line lengths --- .../apache/spark/api/java/JavaPairRDD.scala | 16 ++++++------- .../apache/spark/rdd/PairRDDFunctions.scala | 24 +++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index e9d62904eb755..14fa9d8135afe 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -232,10 +232,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, - * as in scala.TraversableOnce. The former operation is used for merging values within a partition, - * and the latter is used for merging values between partitions. To avoid memory allocation, both - * of these functions are allowed to modify and return their first argument instead of creating a - * new U. + * as in scala.TraversableOnce. The former operation is used for merging values within a + * partition, and the latter is used for merging values between partitions. To avoid memory + * allocation, both of these functions are allowed to modify and return their first argument + * instead of creating a new U. */ def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = { @@ -247,10 +247,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, - * as in scala.TraversableOnce. The former operation is used for merging values within a partition, - * and the latter is used for merging values between partitions. To avoid memory allocation, both - * of these functions are allowed to modify and return their first argument instead of creating a - * new U. + * as in scala.TraversableOnce. The former operation is used for merging values within a + * partition, and the latter is used for merging values between partitions. To avoid memory + * allocation, both of these functions are allowed to modify and return their first argument + * instead of creating a new U. 
*/ def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 961a1d080a724..b6ad9b6c3e168 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -122,10 +122,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, - * as in scala.TraversableOnce. The former operation is used for merging values within a partition, - * and the latter is used for merging values between partitions. To avoid memory allocation, both - * of these functions are allowed to modify and return their first argument instead of creating a - * new U. + * as in scala.TraversableOnce. The former operation is used for merging values within a + * partition, and the latter is used for merging values between partitions. To avoid memory + * allocation, both of these functions are allowed to modify and return their first argument + * instead of creating a new U. */ def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = { @@ -144,10 +144,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, - * as in scala.TraversableOnce. The former operation is used for merging values within a partition, - * and the latter is used for merging values between partitions. To avoid memory allocation, both - * of these functions are allowed to modify and return their first argument instead of creating a - * new U. + * as in scala.TraversableOnce. The former operation is used for merging values within a + * partition, and the latter is used for merging values between partitions. To avoid memory + * allocation, both of these functions are allowed to modify and return their first argument + * instead of creating a new U. */ def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = { @@ -158,10 +158,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, - * as in scala.TraversableOnce. The former operation is used for merging values within a partition, - * and the latter is used for merging values between partitions. To avoid memory allocation, both - * of these functions are allowed to modify and return their first argument instead of creating a - * new U. + * as in scala.TraversableOnce. The former operation is used for merging values within a + * partition, and the latter is used for merging values between partitions. 
To avoid memory + * allocation, both of these functions are allowed to modify and return their first argument + * instead of creating a new U. */ def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = { From 2f3afa381556568ded7945609c5a2e2c19ce0053 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Sun, 8 Jun 2014 21:44:57 -0400 Subject: [PATCH 5/7] Fix Python test --- python/pyspark/rdd.py | 2 +- python/pyspark/tests.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 126fafd22d9a1..954fd0a83d235 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1188,7 +1188,7 @@ def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None): for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U. """ - return self.combineByKey(lambda v: func(zeroValue, v), seqFunc, combFunc, numPartitions) + return self.combineByKey(lambda v: seqFunc(zeroValue, v), seqFunc, combFunc, numPartitions) def foldByKey(self, zeroValue, func, numPartitions=None): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 398aae1b3d03d..c15bb457759ed 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -198,7 +198,7 @@ def combOp(x, y): x |= y return x - sets = dict(pairs.aggregateByKey(set(), seqOp, combOp).collect()) + sets = dict(data.aggregateByKey(set(), seqOp, combOp).collect()) self.assertEqual(3, len(sets)) self.assertEqual(set([1]), sets[1]) self.assertEqual(set([2]), sets[3]) From f52e0ade968403206069bc25937686e783a9e8c7 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 11 Jun 2014 17:44:28 -0700 Subject: [PATCH 6/7] Fix Python tests for real --- python/pyspark/rdd.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 954fd0a83d235..2fe446b8256b9 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1188,7 +1188,10 @@ def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None): for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U. 
""" - return self.combineByKey(lambda v: seqFunc(zeroValue, v), seqFunc, combFunc, numPartitions) + def createZero(): + return copy.deepcopy(zeroValue) + + return self.combineByKey(lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions) def foldByKey(self, zeroValue, func, numPartitions=None): """ @@ -1201,7 +1204,10 @@ def foldByKey(self, zeroValue, func, numPartitions=None): >>> rdd.foldByKey(0, add).collect() [('a', 2), ('b', 1)] """ - return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions) + def createZero(): + return copy.deepcopy(zeroValue) + + return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions) # TODO: support variant with custom partitioner From 2302b8f87e186828867a524bb8df0e505ee0a4d4 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 12 Jun 2014 01:16:28 -0700 Subject: [PATCH 7/7] Add MIMA exclude --- project/MimaExcludes.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index dd7efceb23c96..f087005805378 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -52,7 +52,10 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"), ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1") + "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$" + + "createZero$1") ) ++ MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++ MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++