From da40d84d3712bfc421bed8ec41dba798b1c06732 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Mon, 18 May 2015 09:37:09 +0100 Subject: [PATCH 1/2] Document current limitation of rdd.fold. --- python/pyspark/rdd.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 545c5ad20cb96..0d5d0f82e63c6 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -820,6 +820,14 @@ def fold(self, zeroValue, op): as its result value to avoid object allocation; however, it should not modify C{t2}. + Currently, this implementation only works correctly if the fold operation + is commutative -- if op(a,b) == op(b,a) for any a and b. For example, + sc.parallelize(...).fold(0,lambda a,b:a+1) should return the count of + elements in the RDD, but currently does not work as intended. However, + sc.parallelize(...).fold(0,lambda a,b:a+b) works as intended to return + the sum of elements. In effect, it acts like a reduce operation with a + default value. See SPARK-6416. + >>> from operator import add >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) 15 From 9fef39f4498caa4f85a78625d71066b146ef9794 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 19 May 2015 21:12:00 +0100 Subject: [PATCH 2/2] Add comment to other languages; reword to highlight the difference from non-distributed collections and to not suggest it is a bug that is to be fixed --- .../apache/spark/api/java/JavaRDDLike.scala | 13 ++++++++++--- .../main/scala/org/apache/spark/rdd/RDD.scala | 13 ++++++++++--- python/pyspark/rdd.py | 18 +++++++++--------- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 8bf0627fc420d..74db7643224f5 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -386,9 +386,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Aggregate the elements of each partition, and then the results for all the partitions, using a - * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to - * modify t1 and return it as its result value to avoid object allocation; however, it should not - * modify t2. + * given associative and commutative function and a neutral "zero value". The function + * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object + * allocation; however, it should not modify t2. + * + * This behaves somewhat differently from fold operations implemented for non-distributed + * collections in functional languages like Scala. This fold operation may be applied to + * partitions individually, and then fold those results into the final result, rather than + * apply the fold to each element sequentially in some defined ordering. For functions + * that are not commutative, the result may differ from that of a fold applied to a + * non-distributed collection. */ def fold(zeroValue: T)(f: JFunction2[T, T, T]): T = rdd.fold(zeroValue)(f) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f7fa37e4cdcdc..d772f03f76651 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1015,9 +1015,16 @@ abstract class RDD[T: ClassTag]( /** * Aggregate the elements of each partition, and then the results for all the partitions, using a - * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to - * modify t1 and return it as its result value to avoid object allocation; however, it should not - * modify t2. + * given associative and commutative function and a neutral "zero value". The function + * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object + * allocation; however, it should not modify t2. + * + * This behaves somewhat differently from fold operations implemented for non-distributed + * collections in functional languages like Scala. This fold operation may be applied to + * partitions individually, and then fold those results into the final result, rather than + * apply the fold to each element sequentially in some defined ordering. For functions + * that are not commutative, the result may differ from that of a fold applied to a + * non-distributed collection. */ def fold(zeroValue: T)(op: (T, T) => T): T = withScope { // Clone the zero value since we will also be serializing it as part of tasks diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 0d5d0f82e63c6..6ecf151d7c731 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -813,20 +813,20 @@ def op(x, y): def fold(self, zeroValue, op): """ Aggregate the elements of each partition, and then the results for all - the partitions, using a given associative function and a neutral "zero - value." + the partitions, using a given associative and commutative function and + a neutral "zero value." The function C{op(t1, t2)} is allowed to modify C{t1} and return it as its result value to avoid object allocation; however, it should not modify C{t2}. - Currently, this implementation only works correctly if the fold operation - is commutative -- if op(a,b) == op(b,a) for any a and b. For example, - sc.parallelize(...).fold(0,lambda a,b:a+1) should return the count of - elements in the RDD, but currently does not work as intended. However, - sc.parallelize(...).fold(0,lambda a,b:a+b) works as intended to return - the sum of elements. In effect, it acts like a reduce operation with a - default value. See SPARK-6416. + This behaves somewhat differently from fold operations implemented + for non-distributed collections in functional languages like Scala. + This fold operation may be applied to partitions individually, and then + fold those results into the final result, rather than apply the fold + to each element sequentially in some defined ordering. For functions + that are not commutative, the result may differ from that of a fold + applied to a non-distributed collection. >>> from operator import add >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)