In [15]:
from pyspark import SparkConf, SparkContext

In [16]:
number_cores = 2
memory_gb = 4
# Create a configuration object and
# set the name of the application
conf = (
    SparkConf()
        .setAppName("SparkExample")
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
)
# Create a Spark Context object
sc = SparkContext(conf=conf)

# Group by key

In [17]:
pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("c", 4)],
                            numSlices=4)

In [18]:
pairs.groupByKey().collect()

                                                                                

[('b', <pyspark.resultiterable.ResultIterable at 0x108970760>),
 ('c', <pyspark.resultiterable.ResultIterable at 0x108b4f790>),
 ('a', <pyspark.resultiterable.ResultIterable at 0x108b4f610>)]

In [19]:
[x for x in pairs.groupByKey().collect()[0][1]]

[3]

# Reduce by key

In [20]:
states = sc.parallelize(["TX", "TX", "CA", "TX", "CA"])

In [21]:
import operator

states.map(lambda x:(x,1)).reduceByKey(operator.add).collect()

[('TX', 3), ('CA', 2)]

# Aggregate by key

In [22]:
zero_value = set()

def seq_op(x,y):
    x.add(y)
    return x

def comb_op(x,y):
    return x.union(y)

In [23]:
numbers = sc.parallelize([0,0,1,2,5,4,5,5,5]).map(
    lambda x: ["even" if (x % 2 == 0) else "odd", x])

In [24]:
numbers.collect()

[['even', 0],
 ['even', 0],
 ['odd', 1],
 ['even', 2],
 ['odd', 5],
 ['even', 4],
 ['odd', 5],
 ['odd', 5],
 ['odd', 5]]

In [25]:
numbers.aggregateByKey(zero_value, seq_op, comb_op).collect()

[('even', {0, 2, 4}), ('odd', {1, 5})]

# Sort by key

In [26]:
pairs = sc.parallelize([("B", 1), ("a", 2), ["A", 3], ("d", 4)])

In [27]:
pairs.sortByKey().collect()

[('A', 3), ('B', 1), ('a', 2), ('d', 4)]

In [28]:
pairs.sortByKey(ascending=False).collect()

[('d', 4), ('a', 2), ('B', 1), ('A', 3)]

In [29]:
pairs.sortByKey(numPartitions=1).glom().collect()

[[['A', 3], ('B', 1), ('a', 2), ('d', 4)]]

In [30]:
pairs.sortByKey(numPartitions=3).glom().collect()

[[('A', 3), ('B', 1)], [('a', 2)], [('d', 4)]]

In [31]:
pairs.sortByKey(keyfunc=lambda x:x.lower()).collect()

[('a', 2), ('A', 3), ('B', 1), ('d', 4)]

# Join

In [32]:
a = sc.parallelize([(1, "a"), (2, "a")])

In [33]:
b = sc.parallelize([(2, "b"), (3, "b")])

In [34]:
a.join(b).collect()

[(2, ('a', 'b'))]

In [35]:
c = sc.parallelize([(2, "b"), (3, "b"), (2, "c")])

In [36]:
a.join(c).collect()

[(2, ('a', 'b')), (2, ('a', 'c'))]

In [37]:
a.leftOuterJoin(b).collect()

[(1, ('a', None)), (2, ('a', 'b'))]

# CoGroup

In [38]:
a = sc.parallelize([(1, "a"), (2, "a")])

In [39]:
b = sc.parallelize([(2,"b"), (2, "c"), (3, "d")])

In [40]:
a.join(b).collect()

[(2, ('a', 'b')), (2, ('a', 'c'))]

In [41]:
a.cogroup(b).mapValues(lambda x:[list(x[0]), list(x[1])]).collect()

[(1, [['a'], []]), (2, [['a'], ['b', 'c']]), (3, [[], ['d']])]

# Stop the Spark Context

In [42]:
sc.stop()