In [1]:
# Build the two source RDDs: one holding single-letter strings and one
# holding integer keys. Both lists have nine elements.
# NOTE(review): `sc` is not created in this notebook excerpt; it is assumed
# to be an already-running SparkContext supplied by the environment.
valueRDDA = sc.parallelize(
    ["k", "f", "x", "w", "y", "f", "y", "k", "f"]
)

rddB = sc.parallelize(
    [1, 2, 1, 2, 1, 2, 1, 1, 1]
)


In [2]:
# Zip the two RDDs position by position so that every integer from rddB
# becomes the key of the string at the same index in valueRDDA.
rdd1 = rddB.zip(valueRDDA)

# Bring the (key, value) pairs back to the driver and show them.
rdd1_pairs = rdd1.collect()
print("RDD 1 : ", rdd1_pairs)


RDD 1 :  [(1, 'k'), (2, 'f'), (1, 'x'), (2, 'w'), (1, 'y'), (2, 'f'), (1, 'y'), (1, 'k'), (1, 'f')]


# reduceByKey()

In [3]:
# Transformation that finds all the unique strings corresponding to each key.
# Scala equivalent:
#   rdd.map(kv => (kv._1, new Set[String]() + kv._2)).reduceByKey(_ ++ _)

# Wrap each value in a one-element set literal. Calling set(value) instead
# would iterate over the string and split a multi-character value such as
# "foo" into {'f', 'o'}; it only appeared to work because every value in
# rdd1 is a single letter.
rddResult = rdd1.map(lambda kv: (kv[0], {kv[1]})).reduceByKey(lambda x, y: x.union(y))
rddResult.collect()

[(1, {'f', 'k', 'x', 'y'}), (2, {'f', 'w'})]

# aggregateByKey()


In [6]:
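# A better option is aggregateByKey(): it starts every key from an empty set
# and adds each value to it, instead of first wrapping every value in its own set.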
def my_add(x, y):
    """Insert the single value ``y`` into the set ``x`` and return ``x``.

    The accumulator is modified in place and the very same object is handed
    back, which is the shape aggregateByKey() expects from the function that
    merges one value into a per-key accumulator.
    """
    # In-place union with a one-element set: mutates ``x`` rather than
    # building a new set, so the returned object is still ``x``.
    x |= {y}
    return x


# zeroValue:  the empty set every key starts from
# seqFunc:    folds one value into a key's set (my_add)
# combFunc:   unions two sets built for the same key
rddResult2 = rdd1.aggregateByKey(
    zeroValue=set(),
    seqFunc=my_add,
    combFunc=lambda left, right: left | right,
)
rddResult2.collect()

[(1, {'f', 'k', 'x', 'y'}), (2, {'f', 'w'})]

# combineByKey()

In [19]:
# Using combineByKey()


# We need to provide 3 functions. 

# createCombiner, which turns a V into a C (e.g., creates a one-element list)
# mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
# mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)

def to_set(a):
    """Create the initial combiner for a key: a set holding only ``a``.

    Args:
        a: The first value seen for a key. Must be hashable.

    Returns:
        A new one-element set ``{a}``.
    """
    # A set literal keeps the value whole. set(a) would iterate over ``a``,
    # turning "foo" into {'f', 'o'} and raising TypeError for values that
    # are not iterable (e.g. integers).
    return {a}


def my_add(x, y):
    """Merge one value ``y`` into the set combiner ``x`` and return ``x``.

    The combiner is updated in place; the object returned is the same set
    that was passed in.
    """
    # In-place union with a one-element set: mutates ``x`` rather than
    # building a new set, so the returned object is still ``x``.
    x |= {y}
    return x


# createCombiner -> to_set, mergeValue -> my_add, mergeCombiners -> set union.
# The createCombiner argument used to be spelled `toSet`, a name that is not
# defined in the visible notebook (the function is `to_set`), so the cell
# failed with NameError unless a stale definition was left in the kernel.
rddResult2 = rdd1.combineByKey(to_set, my_add, lambda x, y: x.union(y))

rddResult2.collect()

[(1, {'f', 'k', 'x', 'y'}), (2, {'f', 'w'})]

In [18]:
# We can also aggregate to a list. 
def to_list(a):
    """Create the initial combiner for a key: a new list holding only ``a``."""
    combiner = []
    combiner.append(a)
    return combiner

def addToList(x, y):
    """Append the single value ``y`` to the list ``x`` in place and return ``x``."""
    # Augmented assignment on a list extends it in place, so the caller's
    # list object is the one that grows and is returned.
    x += [y]
    return x

def extend(x, y):
    """Append every element of ``y`` to the list ``x`` in place and return ``x``."""
    # In-place concatenation: ``x`` itself is lengthened, no new list is made.
    x += y
    return x

# Collect every value of a key into a list (duplicates are kept):
#   createCombiner  starts a list from the first value of a key,
#   mergeValue      appends a further value to that list,
#   mergeCombiners  joins two lists built for the same key.
rddResult2 = rdd1.combineByKey(
    createCombiner=to_list,
    mergeValue=addToList,
    mergeCombiners=extend,
)

rddResult2.collect()

[(1, ['k', 'x', 'y', 'y', 'k', 'f']), (2, ['f', 'w', 'f'])]