In [None]:
from pyspark import SparkConf, SparkContext

# Build (or reuse) the SparkContext used by every cell of this notebook.
# getOrCreate returns the already-running context if there is one, so
# re-executing this cell no longer fails with "Cannot run multiple
# SparkContexts at once". Note: an existing context keeps its original conf.
conf = SparkConf().setAppName("Spark RDD Course")
sc = SparkContext.getOrCreate(conf=conf)

# Introduction to `Spark` RDD

In [None]:
# A small RDD of three integers, distributed with the default number of partitions.
rdd = sc.parallelize([2, 3, 4])

Note that `parallelize` takes an optional second argument (`numSlices`) to choose the number of partitions.

In [None]:
# Number of partitions chosen by default (depends on the Spark configuration).
rdd.getNumPartitions()

In [None]:
# Request 10 partitions explicitly through the `numSlices` argument.
rdd = sc.parallelize(range(1000), numSlices=10)
rdd.getNumPartitions()

## Transformations

### `map`

In [None]:
# Each element n becomes the list [1, ..., n - 1]; nothing is computed yet.
rdd = sc.parallelize([2, 3, 4]).map(lambda n: list(range(1, n)))

`map` is a transformation, so it is lazily evaluated: the computation is delayed until an action (such as `collect`) is met in the DAG.

In [None]:
# `collect` is an action: it triggers the lazy `map` and returns the results as a list.
rdd.collect()

### Exercise: `map` with a method

**Warning.** This example shows bad practice! Don't do this at home.

In [None]:
class TelephoneDB:
    """Toy phone book whose ``add_tel`` method gets mapped over an RDD.

    Looking up a name that is not in the book raises ``KeyError``.
    """

    def __init__(self):
        # Known people and their phone numbers.
        self.tel = dict(stephane=1234, yasser=4567)

    def add_tel(self, name):
        """Return the pair ``(name, number)`` for a known ``name``."""
        number = self.tel[name]
        return name, number

In [None]:
tel_db = TelephoneDB()
names = ['stephane', 'yasser']
# `collect()` is an action that returns a plain Python list, not an RDD,
# so the result gets a name that says what it holds.
tel_pairs = sc.parallelize(names).map(tel_db.add_tel).collect()
tel_pairs

- Replace the `tel` dictionary with a `defaultdict` whose default number is `999`
- Use it on an RDD containing names as above, including an unknown one, and try it

In [None]:
from collections import defaultdict

class TelephoneDefaultDB:
    """Phone book that answers 999 for any name it does not know."""

    def __init__(self):
        known = {'stephane': 1234, 'yasser': 4567}
        # Unknown names fall back to 999 (and are inserted on lookup).
        self.tel = defaultdict(lambda: 999, known)

    def add_tel(self, name):
        """Return ``(name, number)``, using 999 when ``name`` is unknown."""
        return (name, self.tel[name])

    def add_tel_rdd(self, rdd):
        """Return ``rdd.map(self.add_tel)``."""
        pairs = rdd.map(self.add_tel)
        return pairs

In [None]:
tel_db = TelephoneDefaultDB()
names = ['stephane', 'yasser', 'yiyang']  # 'yiyang' is unknown -> 999
# `add_tel_rdd` is equivalent to `.map(tel_db.add_tel)`; `collect()` then
# returns a plain Python list (not an RDD), hence the name.
tel_pairs = tel_db.add_tel_rdd(sc.parallelize(names)).collect()
tel_pairs

**Warning**. Once again, it is a bad idea to pass bound methods to Spark's `map`.
Since `add_tel` needs `self`, the whole object is serialized so that Spark can use it.
This breaks if `tel` is large, or if it is not serializable.

### `flatMap`

In [None]:
rdd = sc.parallelize([2, 3, 4, 5])
# Unlike `map`, `flatMap` flattens each returned range into the output RDD.
rdd.flatMap(lambda x: range(1, x)).collect()

### `filter`

In [None]:
rdd = sc.parallelize(range(10))
# Keep the even numbers only.
rdd.filter(lambda n: not n % 2).collect()

### `distinct`

In [None]:
rdd = sc.parallelize([1, 1, 4, 2, 1, 3, 3])
# Duplicates are removed; the order of the result is not guaranteed.
rdd.distinct().collect()

### "Pseudo-set" operations

In [None]:
rdd1 = sc.parallelize(range(5))
rdd2 = sc.parallelize(range(3, 9))
# `union` keeps duplicates (3 and 4 appear twice), so it is not a true set union.
rdd3 = rdd1.union(rdd2)
rdd3.collect()

In [None]:
# Adding `distinct` gives the actual set union.
rdd3.distinct().collect()

In [None]:
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(["a", "b"])
# Every (number, letter) pair: 2 x 2 = 4 elements.
rdd1.cartesian(rdd2).collect()

## Actions

Well, `collect` is obviously an action...

### `count`, `countByValue`

In [None]:
rdd = sc.parallelize([1, 3, 1, 2, 2, 2])
# Total number of elements (duplicates included).
rdd.count()

In [None]:
# Number of occurrences of each distinct value.
rdd.countByValue()

### `take`, `takeOrdered`

In [None]:
# (number, letter) pairs; tuples compare element by element.
rdd = sc.parallelize([(3, 'a'), (1, 'b'), (2, 'd')])

In [None]:
# The 2 smallest elements in natural tuple order (i.e. by the number first).
rdd.takeOrdered(2)

In [None]:
# The 2 smallest elements when ordering by the letter instead.
rdd.takeOrdered(2, key=lambda x: x[1])

### `reduce`, `fold`

In [None]:
# sc.range excludes its end bound: elements 1, 2, 3.
rdd = sc.range(1, 4)
rdd.reduce(lambda a, b: a + b)

In [None]:
# Same sum, starting from the zero value 0 (a neutral element for +).
rdd.fold(0, lambda a, b: a + b)

In [None]:
rdd = sc.parallelize([1, 2, 4], 2)
# The zero value is used once per partition AND once more when merging the
# partition results: expected 1 + 2 + 4 + (2 + 1) * 2.5 = 14.5.
rdd.fold(2.5, lambda a, b: a + b)

In [None]:
rdd = sc.parallelize([1, 2, 3], 5)
# More partitions than elements: empty partitions still contribute the zero
# value, so expect 1 + 2 + 3 + (5 + 1) * 2 = 18.
rdd.fold(2, lambda a, b: a + b)

In [None]:
# 5 partitions, as requested above, even though there are only 3 elements.
rdd.getNumPartitions()

### `aggregate`

In [None]:
def seqOp(acc, value):
    """Fold one element into the running (sum, count) pair."""
    return acc[0] + value, acc[1] + 1


def combOp(left, right):
    """Merge two partial (sum, count) pairs coming from different partitions."""
    return left[0] + right[0], left[1] + right[1]


sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)

### Exercise: sum of powers with `aggregate`

- Using `aggregate`, compute the sum, the sum of squares and the sum of cubes, i.e. $\sum x$, $\sum x^2$ and $\sum x^3$
for $x \in \{1, \ldots, 10\}$.
- Check your computations using `numpy`

In [None]:
def seqOp(acc, y):
    """Fold ``y`` into a running (sum, sum of squares, sum of cubes) triple."""
    return (acc[0] + y, acc[1] + y ** 2, acc[2] + y ** 3)

In [None]:
def combOp(x, y):
    """Element-wise sum of two partial (sum, sum of squares, sum of cubes) triples."""
    return tuple(x[i] + y[i] for i in range(3))

In [None]:
# Integers 1..10 (end bound excluded); result is (sum, sum of squares, sum of cubes).
sc.range(1, 11).aggregate((0, 0, 0), seqOp, combOp)

In [None]:
import numpy as np

# The same integers 1..10 as a numpy array, to check the Spark results.
x = np.arange(1, 11)
x

In [None]:
# Reference values computed with numpy: sums of x, x**2 and x**3.
tuple((x ** k).sum() for k in (1, 2, 3))

# `PairRDD`

In [None]:
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
# Turn each [k, v1, v2] list into a (key, value) pair: (k, [v1, v2]).
rdd = rdd.map(lambda x: (x[0], x[1:]))
rdd.collect()

## Transformations

### `keys`, `values`

In [None]:
# First component of each (key, value) pair.
rdd.keys().collect()

In [None]:
# Second component of each (key, value) pair.
rdd.values().collect()

**Warning**. Each element must be a tuple with two elements (the key and the value).

In [None]:
# Raw 3-element lists, NOT (key, value) pairs.
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
# keys() only picks the first item of each element, so this still looks right.
rdd.keys().collect()

In [None]:
# values() only picks the second item of each element: 7, 13 and 17 are lost.
rdd.values().collect()

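The values are **not** what we expected: `values()` only keeps the second item of each element (`'a'`, `'b'`, `'c'`), not the rest of the list. So we **must** first map each element to a `(key, value)` pair: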

In [None]:
# Turn each [k, v1, v2] list into a proper (key, value) pair before using keys().
rdd = (
    sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
    .map(lambda row: (row[0], row[1:]))
)
rdd.keys().collect()

In [None]:
# Now each value is the remaining list, e.g. ['a', 7].
rdd.values().collect()

In [None]:
# NOTE(review): this cell duplicates the previous one and can be removed.
rdd.values().collect()

Now the values are correct.

### `mapValues`, `flatMapValues`

In [None]:
rdd = sc.parallelize([("a", "x y z"), ("b", "p r")])
# Each value becomes a list of words; keys are untouched.
rdd.mapValues(lambda v: v.split(' ')).collect()

In [None]:
# One (key, word) pair per word, each keeping the key of its original pair.
rdd.flatMapValues(lambda v: v.split(' ')).collect()

### `groupByKey`

In [None]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 3), ("c", 42)])
# groupByKey yields an iterable per key; mapValues(list) materializes it for display.
rdd.groupByKey().mapValues(list).collect()

### `reduceByKey`

In [None]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
# Sum the values of each key.
rdd.reduceByKey(lambda a, b: a + b).collect()

### `combineByKey`

In [None]:
def add(a, b):
    """Append ``str(b)`` to the string accumulator ``a``.

    Used both to fold a new value into a combiner and to merge two
    combiners (already strings, so ``str`` leaves them unchanged).
    """
    return a + str(b)


rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 13)])
rdd.combineByKey(createCombiner=str, mergeValue=add, mergeCombiners=add).collect()

### `join`, `rightOuterJoin`, `leftOuterJoin`

In [None]:
# (department id, employee name) pairs. Keys 33 and 34 are repeated, and
# "Williams" has no department (key None).
employees = sc.parallelize([
    (31, "Rafferty"),
    (33, "Jones"),
    (33, "Heisenberg"),
    (34, "Robinson"),
    (34, "Smith"),
    (None, "Williams")
])

In [None]:
# (department id, department name) pairs. Department 35 has no employee.
departments = sc.parallelize([
    (31, "Sales"),
    (33, "Engineering"),
    (34, "Clerical"),
    (35, "Marketing")
])

In [None]:
# Inner join: only keys present on both sides (None and 35 are dropped).
employees.join(departments).sortByKey().collect()

In [None]:
# Keeps every department; 35 ("Marketing") is paired with None as employee.
employees.rightOuterJoin(departments).sortByKey().collect()

In [None]:
# Keeps every employee; "Williams" (key None) gets None as department.
# No sortByKey here: the None key cannot be ordered against ints in Python 3.
employees.leftOuterJoin(departments).collect()

## Actions

In [None]:
# Number of employees per department id (the None key included).
employees.countByKey()

In [None]:
# All values stored under key 33: both employees are returned.
employees.lookup(33)

In [None]:
# None works as a key too.
employees.lookup(None)

In [None]:
# WARNING: keys 33 and 34 are duplicated, but a dict holds one value per key,
# so some employees are silently lost here.
employees.collectAsMap()