In [1]:
from pyspark.sql import SparkSession

In [2]:
# Obtain (or reuse) the Spark session that every cell below runs against.
spark = (
    SparkSession.builder
    .appName('combineByKey')
    .getOrCreate()
)

In [55]:
rdd = spark.sparkContext.parallelize([("a", 1), ("b", 1), ("a", 2)])

# combineByKey: generic function to combine the elements for each key using a
# custom set of aggregation functions.
#
# It turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
# type" C.
#
# Users provide three functions:
#
# - createCombiner, which turns a V into a C (e.g., creates a one-element list)
# - mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
# - mergeCombiners, to combine two C's into a single one (e.g., merges the
#   lists)
#
# All three must return a C: the result of mergeValue and mergeCombiners is
# fed back into later merges, so returning a different type breaks as soon as
# a key's values span more than two partitions.
#
# To avoid memory allocation, both mergeValue and mergeCombiners are allowed to
# modify and return their first argument instead of creating a new C.
#
# In addition, users can control the partitioning of the output RDD
# (the numPartitions / partitionFunc arguments).


def combiner(a):
    """createCombiner for combineByKey: start a key's list with its first value *a*."""
    initial = [a]
    return initial

def mergeValue(a, b):
    """mergeValue for combineByKey: fold one more value *b* into the list *a*.

    The list is grown in place and the same object is returned, which the
    combineByKey contract permits (it avoids allocating a new list).
    """
    a += [b]
    return a

def mergeCombiner(a, b):
    """mergeCombiners for combineByKey: concatenate list *b* onto list *a*.

    *a* is extended in place and returned, so no new list is created.
    """
    a += b
    return a

# Group every value per key into a list, then sort by key for stable display.
sorted(
    rdd.combineByKey(
        createCombiner=combiner,
        mergeValue=mergeValue,
        mergeCombiners=mergeCombiner,
    ).collect()
)

[('a', [1, 2]), ('b', [1])]

In [54]:
data = [
    ('A', 2.0), ('A', 4.0), ('A', 9.0),
    ('B', 10.0), ('B', 20.0),
    ('Z', 3.0), ('Z', 5.0), ('Z', 8.0), ('Z', 12.0),
]

rdd = spark.sparkContext.parallelize(data)

# Per-key combiner is a (running_sum, running_count) tuple.
sumCount = rdd.combineByKey(
    createCombiner=lambda value: (value, 1),
    mergeValue=lambda acc, value: (acc[0] + value, acc[1] + 1),
    mergeCombiners=lambda left, right: (left[0] + right[0], left[1] + right[1]),
)
sumCount.collect()

[('A', (15.0, 3)), ('B', (30.0, 2)), ('Z', (28.0, 4))]

In [56]:
data = [
        ('A', 2.), ('A', 4.), ('A', 9.), 
        ('B', 10.), ('B', 20.), 
        ('Z', 3.), ('Z', 5.), ('Z', 8.), ('Z', 12.) 
       ]

rdd = spark.sparkContext.parallelize( data )

# Per-key sum via combineByKey.
# mergeCombiners must return the same combined type as the other two functions
# (a (sum, count) tuple here). Returning only the float sum would make a key's
# result depend on how its values were partitioned, and would raise TypeError
# (float[0]) once a key spans three or more partitions. So keep the tuple
# throughout and project out the sum afterwards.
sumCount = rdd.combineByKey(lambda value: (value, 1),
                            lambda x, value: (x[0] + value, x[1] + 1),
                            lambda x, y: (x[0] + y[0], x[1] + y[1])
                           ).mapValues(lambda x: x[0])
sumCount.collect()

[('A', 15.0), ('B', 30.0), ('Z', 28.0)]