From a4a580424b8eea3264ae9c4ae9ae2bec22af6201 Mon Sep 17 00:00:00 2001
From: uncleGen
Date: Wed, 19 Nov 2014 19:09:11 +0800
Subject: [PATCH 1/3] add control over map-side aggregation

---
 python/pyspark/rdd.py | 31 +++++++++++++++++--------------
 1 file changed, 17 insertions(+), 14 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 50535d2711708..08eaf0cf8d53f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1319,7 +1319,7 @@ def values(self):
         """
         return self.map(lambda (k, v): v)
 
-    def reduceByKey(self, func, numPartitions=None):
+    def reduceByKey(self, func, numPartitions=None, mapSideCombine=True):
         """
         Merge the values for each key using an associative reduce function.
 
@@ -1334,7 +1334,7 @@ def reduceByKey(self, func, numPartitions=None):
         >>> sorted(rdd.reduceByKey(add).collect())
         [('a', 2), ('b', 1)]
         """
-        return self.combineByKey(lambda x: x, func, func, numPartitions)
+        return self.combineByKey(lambda x: x, func, func, numPartitions, mapSideCombine)
 
     def reduceByKeyLocally(self, func):
         """
@@ -1516,9 +1516,8 @@ def add_shuffle_key(split, iterator):
         rdd._partitionFunc = partitionFunc
         return rdd
 
-    # TODO: add control over map-side aggregation
     def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
-                     numPartitions=None):
+                     numPartitions=None, mapSideCombine=True):
         """
         Generic function to combine the elements for each key using a custom
         set of aggregation functions.
@@ -1559,18 +1558,21 @@ def combineLocally(iterator):
             merger.mergeValues(iterator)
             return merger.iteritems()
 
-        locally_combined = self.mapPartitions(combineLocally)
-        shuffled = locally_combined.partitionBy(numPartitions)
-
         def _mergeCombiners(iterator):
             merger = ExternalMerger(agg, memory, serializer) \
                 if spill else InMemoryMerger(agg)
             merger.mergeCombiners(iterator)
             return merger.iteritems()
 
-        return shuffled.mapPartitions(_mergeCombiners, True)
+        if mapSideCombine == True:
+            locally_combined = self.mapPartitions(combineLocally)
+            shuffled = locally_combined.partitionBy(numPartitions)
+            return shuffled.mapPartitions(_mergeCombiners)
+        else:
+            shuffled = self.partitionBy(numPartitions)
+            return shuffled.mapPartitions(combineLocally)
 
-    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
+    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None, mapSideCombine=True):
         """
         Aggregate the values of each key, using given combine functions and a neutral
         "zero value". This function can return a different result type, U, than the type
@@ -1584,9 +1586,9 @@ def createZero():
             return copy.deepcopy(zeroValue)
 
         return self.combineByKey(
-            lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
+            lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions, mapSideCombine)
 
-    def foldByKey(self, zeroValue, func, numPartitions=None):
+    def foldByKey(self, zeroValue, func, numPartitions=None, mapSideCombine=True):
         """
         Merge the values for each key using an associative function "func"
         and a neutral "zeroValue" which may be added to the result an
@@ -1601,10 +1603,11 @@ def foldByKey(self, zeroValue, func, numPartitions=None):
         def createZero():
             return copy.deepcopy(zeroValue)
 
-        return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
+        return self.combineByKey(
+            lambda v: func(createZero(), v), func, func, numPartitions, mapSideCombine)
 
     # TODO: support variant with custom partitioner
-    def groupByKey(self, numPartitions=None):
+    def groupByKey(self, numPartitions=None, mapSideCombine=True):
         """
         Group the values for each key in the RDD into a single sequence.
         Hash-partitions the resulting RDD with into numPartitions partitions.
@@ -1630,7 +1633,7 @@ def mergeCombiners(a, b):
             return a
 
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
-                                 numPartitions).mapValues(lambda x: ResultIterable(x))
+                                 numPartitions, mapSideCombine).mapValues(lambda x: ResultIterable(x))
 
     def flatMapValues(self, f):
         """

From e3b0bc4f3a97e50a9584bf2281ddc6aa8034b3d6 Mon Sep 17 00:00:00 2001
From: uncleGen
Date: Wed, 19 Nov 2014 19:28:31 +0800
Subject: [PATCH 2/3] fix

---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 08eaf0cf8d53f..53f911988545a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1564,7 +1564,7 @@ def _mergeCombiners(iterator):
             merger.mergeCombiners(iterator)
             return merger.iteritems()
 
-        if mapSideCombine == True:
+        if mapSideCombine:
             locally_combined = self.mapPartitions(combineLocally)
             shuffled = locally_combined.partitionBy(numPartitions)
             return shuffled.mapPartitions(_mergeCombiners)

From 66561d4aed9a02aeaaa84009ac679401ac4f4bfd Mon Sep 17 00:00:00 2001
From: uncleGen
Date: Wed, 19 Nov 2014 19:46:03 +0800
Subject: [PATCH 3/3] fix

---
 python/pyspark/rdd.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 53f911988545a..af189071d8a5c 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1632,8 +1632,8 @@ def mergeCombiners(a, b):
             a.extend(b)
             return a
 
-        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
-                                 numPartitions, mapSideCombine).mapValues(lambda x: ResultIterable(x))
+        return self.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions,
+                                 mapSideCombine).mapValues(lambda x: ResultIterable(x))
 
     def flatMapValues(self, f):
         """
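
Usage note (not part of the patches): a minimal sketch of how the proposed
mapSideCombine flag would be exercised once this series is applied. The input
data, partition count, and app name below are illustrative only; SparkContext,
parallelize, reduceByKey, and groupByKey are existing PySpark APIs, while the
mapSideCombine keyword exists only with these patches applied.

    from operator import add
    from pyspark import SparkContext

    sc = SparkContext("local[2]", "map-side-combine-demo")  # illustrative app name
    pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)] * 1000, 4)

    # Default path (mapSideCombine=True): values are pre-combined inside
    # each map partition, so the shuffle moves fewer, pre-aggregated records.
    counts = pairs.reduceByKey(add)

    # With the patch, mapSideCombine=False skips the map-side merger: raw
    # (key, value) pairs are shuffled and combined only on the reduce side.
    # This avoids building per-partition hash maps when pre-aggregation
    # would not shrink the data, e.g. mostly unique keys, or groupByKey,
    # where map-side combining buffers all values before the shuffle anyway.
    counts_no_msc = pairs.reduceByKey(add, mapSideCombine=False)
    groups = pairs.groupByKey(mapSideCombine=False)

    # Both paths must agree on the final aggregates.
    assert sorted(counts.collect()) == sorted(counts_no_msc.collect())

This mirrors the Scala side, where combineByKey already exposes a
mapSideCombine parameter and groupByKey disables it, since grouping gains
nothing from pre-shuffle combining.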