# 02 - RDD Basics

This notebook describes the concept of RDD and the operations available on RDDs.

## Startup

First, let's check that Spark is running (don't forget that a **Spark UI** is available):

In [1]:
spark

## Resilient Distributed Datasets (RDD)

* immutable (read-only), distributed and resilient (fault-tolerant)
* data is partitioned and distributed across the nodes of the cluster

The PySpark RDD API documentation is available [here](http://spark.apache.org/docs/default/api/python/pyspark.html#pyspark.RDD).

In [27]:
data = sc.parallelize(range(10))

In [28]:
data

PythonRDD[43] at RDD at PythonRDD.scala:53

An RDD can contain elements of different types.

In [29]:
data = sc.parallelize(["abc",1,2.34,False])

Retrieve all the elements of the RDD using [`collect()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) (**WARNING:** the entire content of the RDD is returned to the Spark driver, *i.e.* the Python kernel of your notebook in this case, so avoid it on large RDDs)

In [30]:
data.collect()

['abc', 1, 2.34, False]

Access the first element of the RDD using [`first()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.first)

In [31]:
data.first()

'abc'

Retrieve the first `n` elements of the RDD using [`take()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take) (**same warning as the previous cell**, although only `n` elements are sent to the driver)

In [32]:
data.take(3)

['abc', 1, 2.34]

## RDD transformations and actions

RDD computations are **lazy**: they are only launched when a result is actually needed.
* [`parallelize()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.parallelize) creates a new RDD; like a **transformation**, it does not launch any job yet
* [`count()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.count) is an **action**: it triggers the computation and returns a non-RDD value (here, a number) to the driver

In [33]:
data = sc.parallelize(range(10))

In [34]:
data

PythonRDD[51] at RDD at PythonRDD.scala:53

In [35]:
data.count()

10

Transformations build an **RDD lineage** (also known as RDD operator graph or RDD dependency graph), which is a logical execution plan, *i.e.* a **Directed Acyclic Graph** (DAG) of all the parent RDDs of an RDD. An action is the final operation of an RDD lineage: it triggers the execution of all the transformations of the DAG in order to compute its output value.

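For instance, in the following cell `parallelize()` creates an RDD on which 2 transformations (`map()` and `filter()`) are chained; nothing is computed until the action [`collect()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) is called: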

In [36]:
data = sc.parallelize(range(10)).map(lambda x: x * 2).filter(lambda x: x > 9)

In [37]:
data

PythonRDD[54] at RDD at PythonRDD.scala:53

In [38]:
data.collect()

[10, 12, 14, 16, 18]
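To make the idea of a lazy lineage concrete, here is a toy model in plain Python, with no Spark involved. The `ToyRDD` class is hypothetical and much simpler than Spark's real implementation: transformations only record a step in the lineage, and the action `collect()` walks the lineage and runs the recorded steps.

```python
# A toy, pure-Python model of a lazy RDD lineage (NOT Spark's implementation).
class ToyRDD:
    def __init__(self, source=None, parent=None, op=None):
        self.source = source  # data of the root RDD
        self.parent = parent  # parent ToyRDD in the lineage
        self.op = op          # (name, function) applied to the parent's output

    def map(self, f):
        # Transformation: only records the step, computes nothing
        return ToyRDD(parent=self, op=("map", f))

    def filter(self, pred):
        # Transformation: only records the step, computes nothing
        return ToyRDD(parent=self, op=("filter", pred))

    def lineage(self):
        # Walk back to the root to list the recorded steps
        steps, node = [], self
        while node.parent is not None:
            steps.append(node.op[0])
            node = node.parent
        return ["parallelize"] + steps[::-1]

    def _compute(self):
        if self.parent is None:
            return iter(self.source)
        name, f = self.op
        upstream = self.parent._compute()
        # builtin map/filter are lazy iterators too
        return map(f, upstream) if name == "map" else filter(f, upstream)

    def collect(self):
        # Action: actually runs every step of the lineage
        return list(self._compute())


calls = []

def double(x):
    calls.append(x)  # record that the function really ran
    return x * 2

toy = ToyRDD(range(10)).map(double).filter(lambda x: x > 9)
print(toy.lineage())  # ['parallelize', 'map', 'filter']
print(len(calls))     # 0: nothing has been computed yet
print(toy.collect())  # [10, 12, 14, 16, 18]
print(len(calls))     # 10: the action triggered the computation
```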

### RDD Transformations

Let's look at some transformation methods; you can probably guess what each one does from its output.

#### [`map()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map)

In [39]:
sc.parallelize(range(10)).map(lambda x: x + 2).collect()

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

#### [`flatMap()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.flatMap)

In [40]:
sc.parallelize(range(10)).flatMap(lambda x: (x, x + 2)).collect()

[0, 2, 1, 3, 2, 4, 3, 5, 4, 6, 5, 7, 6, 8, 7, 9, 8, 10, 9, 11]
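The difference between `map()` and `flatMap()` can be reproduced with plain Python lists (a sketch of the semantics only, not of how Spark runs it): `map()` yields exactly one element per input element, whereas `flatMap()` expects the function to return an iterable and flattens the results.

```python
from itertools import chain

def pair(x):
    # same function as the lambda used in the flatMap() cell above
    return (x, x + 2)

mapped = [pair(x) for x in range(10)]                                # like map(): 10 tuples
flat_mapped = list(chain.from_iterable(pair(x) for x in range(10)))  # like flatMap(): 20 ints

print(mapped[:3])       # [(0, 2), (1, 3), (2, 4)]
print(flat_mapped[:6])  # [0, 2, 1, 3, 2, 4]
```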

#### [`filter()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.filter)

In [41]:
sc.parallelize(['a','b','c','d','e']).filter(lambda x: x != 'a').collect()

['b', 'c', 'd', 'e']

#### [`union()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.union)

In [42]:
rdd1 = sc.parallelize(['a','b','c','d','e'])
rdd2 = sc.parallelize(['d','e','f','g','h'])
rdd1.union(rdd2).collect()

['a', 'b', 'c', 'd', 'e', 'd', 'e', 'f', 'g', 'h']

#### [`intersection()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.intersection)

In [43]:
rdd1 = sc.parallelize(['a','b','c','d','e'])
rdd2 = sc.parallelize(['d','e','f','g','h'])
rdd1.intersection(rdd2).collect()

['e', 'd']
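One difference between these two operations is not visible in the outputs above: `union()` simply concatenates the two RDDs and keeps duplicates, while `intersection()` returns each common element only once (it also performs a shuffle, which is why its output order is not guaranteed). A plain-Python sketch of these semantics, with an extra `'e'` added to the first list as our own example:

```python
rdd1_items = ['a', 'b', 'c', 'd', 'e', 'e']  # duplicate 'e' added for the example
rdd2_items = ['d', 'e', 'f', 'g', 'h']

# union(): plain concatenation, duplicates are kept
union = rdd1_items + rdd2_items
# intersection(): distinct elements present in both inputs (sorted here for a stable output)
intersection = sorted(set(rdd1_items) & set(rdd2_items))

print(union.count('e'))  # 3
print(intersection)      # ['d', 'e']
```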

**TODO:** sort()

### RDD actions

Actions are the final step after a set of transformations: they trigger the computation and return non-RDD results to the driver.

In [19]:
rdd = sc.parallelize([4, 8, 3, 1, 7, 6, 5, 2])

In [20]:
rdd.collect()

[4, 8, 3, 1, 7, 6, 5, 2]

In [21]:
rdd.count()

8

In [22]:
rdd.first()

4

In [23]:
rdd.max()

8

In [24]:
rdd.mean()

4.5

In [25]:
rdd.min()

1

In [26]:
rdd.stdev()

2.29128784747792

In [27]:
rdd.variance()

5.25
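The values above are *population* statistics: `variance()` and `stdev()` divide the sum of squared deviations by N. If you need the *sample* (N - 1) versions, the RDD API also provides `sampleVariance()` and `sampleStdev()`. The arithmetic, checked in plain Python on the same 8 numbers:

```python
import math

values = [4, 8, 3, 1, 7, 6, 5, 2]
n = len(values)
mean = sum(values) / n                              # 4.5, as rdd.mean()
squared_dev = sum((v - mean) ** 2 for v in values)  # 42.0
population_variance = squared_dev / n               # 5.25, as rdd.variance()
sample_variance = squared_dev / (n - 1)             # 6.0, the N - 1 version
population_stdev = math.sqrt(population_variance)   # about 2.2913, as rdd.stdev()

print(mean, population_variance, sample_variance, round(population_stdev, 4))
# 4.5 5.25 6.0 2.2913
```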

In [28]:
rdd.take(3)

[4, 8, 3]

In [29]:
rdd.takeOrdered(10)

[1, 2, 3, 4, 5, 6, 7, 8]

In [30]:
rdd.takeSample(num=6,withReplacement=True,seed=0xCAFEBABE)

[5, 7, 6, 4, 4, 8]

In [31]:
rdd.takeSample(num=6,withReplacement=False,seed=0xCAFEBABE)

[6, 2, 8, 4, 1, 3]

## Pair RDD

A pair RDD is an RDD containing key/value pairs, *i.e.* 2-tuples `(key, value)`. Pair RDDs support additional key-aware transformations, which produce new key/value pairs, and key-aware actions, which give you a collection of values or a single value.

*Note:* Some examples are taken from this [tutorial](https://programmathics.com/big-data/apache-spark/apache-spark-working-with-key-value-pairs-in-rdd/apache-spark-creating-pair-rdds-and-associated-transformations-part-1/).

In [32]:
rdd1= sc.parallelize([(1,2),(3,2),(2,4),(1,2),(3,4),(1,5)])

### [`keys()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.keys)

In [33]:
rdd1.keys().collect()

[1, 3, 2, 1, 3, 1]

### [`values()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.values)

In [34]:
rdd1.values().collect()

[2, 2, 4, 2, 4, 5]

In [35]:
rdd1.countByKey()

defaultdict(int, {1: 3, 3: 2, 2: 1})

In [36]:
rdd1.countByValue()

defaultdict(int, {(1, 2): 2, (3, 2): 1, (2, 4): 1, (3, 4): 1, (1, 5): 1})
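`countByKey()` and `countByValue()` are actions: they return a plain Python dictionary to the driver. Their semantics can be sketched with `collections.Counter` on the same pairs: the first counts the pairs per key, the second counts each whole `(key, value)` element.

```python
from collections import Counter

pairs = [(1, 2), (3, 2), (2, 4), (1, 2), (3, 4), (1, 5)]

by_key = Counter(k for k, _ in pairs)  # like countByKey(): pairs per key
by_value = Counter(pairs)              # like countByValue(): occurrences of each element

print(dict(by_key))      # {1: 3, 3: 2, 2: 1}
print(by_value[(1, 2)])  # 2
```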

In [37]:
rdd1.map(lambda x: (x[0]+1,x[1]+1)).collect()

[(2, 3), (4, 3), (3, 5), (2, 3), (4, 5), (2, 6)]

### [`mapValues()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mapValues)

In [38]:
rdd1.mapValues(lambda x: (x, 3,4)).collect()

[(1, (2, 3, 4)),
 (3, (2, 3, 4)),
 (2, (4, 3, 4)),
 (1, (2, 3, 4)),
 (3, (4, 3, 4)),
 (1, (5, 3, 4))]

### [`flatMapValues()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.flatMapValues)

In [39]:
rdd = sc.parallelize([(1, [10, 11, 12]), (2, [20, 21])])
rdd.flatMapValues(lambda x: (x, 3,4)).collect()

[(1, [10, 11, 12]), (1, 3), (1, 4), (2, [20, 21]), (2, 3), (2, 4)]
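`mapValues()` and `flatMapValues()` leave the keys untouched and only transform the values. In the `flatMapValues()` cell, the function returns the tuple `(x, 3, 4)`, so each input pair becomes three output pairs; a more common use is to return the list itself, which "explodes" it into one pair per element. A plain-Python sketch of both (the helper names are hypothetical):

```python
pairs = [(1, [10, 11, 12]), (2, [20, 21])]

def map_values(items, f):
    # exactly one output pair per input pair; the key is kept as is
    return [(k, f(v)) for k, v in items]

def flat_map_values(items, f):
    # f must return an iterable; each of its elements becomes a separate pair
    return [(k, out) for k, v in items for out in f(v)]

print(map_values(pairs, len))                       # [(1, 3), (2, 2)]
print(flat_map_values(pairs, lambda v: v))          # [(1, 10), (1, 11), (1, 12), (2, 20), (2, 21)]
print(flat_map_values(pairs, lambda v: (v, 3, 4)))  # same as the flatMapValues() cell above
```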

**TODO:** sortByKey()

### Join operations

In [40]:
rdd1 = sc.parallelize([(1,"A"),(2,"B"),(3,"C"),(4,"D"),(5,"E")])
rdd2 = sc.parallelize([(4,"W"),(5,"X"),(6,"Y"),(7,"Z")])

In [41]:
rdd1.leftOuterJoin(rdd2).collect()

[(1, ('A', None)),
 (2, ('B', None)),
 (3, ('C', None)),
 (4, ('D', 'W')),
 (5, ('E', 'X'))]

In [42]:
rdd1.subtractByKey(rdd2).collect()

[(1, 'A'), (2, 'B'), (3, 'C')]

In [43]:
rdd1.join(rdd2).collect()

[(4, ('D', 'W')), (5, ('E', 'X'))]

In [44]:
rdd1.rightOuterJoin(rdd2).collect()

[(4, ('D', 'W')), (5, ('E', 'X')), (6, (None, 'Y')), (7, (None, 'Z'))]
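The join semantics can be sketched on plain lists of pairs. The helper functions below are hypothetical and use nested loops for clarity, whereas Spark shuffles both RDDs by key. Note that when a key appears several times on both sides, every combination of values is produced.

```python
def inner_join(left, right):
    # one output pair for every matching (left value, right value) combination
    return [(k, (lv, rv)) for k, lv in left for k2, rv in right if k == k2]

def left_outer_join(left, right):
    out = []
    for k, lv in left:
        matches = [rv for k2, rv in right if k2 == k] or [None]  # None when no match
        out.extend((k, (lv, rv)) for rv in matches)
    return out

def right_outer_join(left, right):
    out = []
    for k, rv in right:
        matches = [lv for k2, lv in left if k2 == k] or [None]
        out.extend((k, (lv, rv)) for lv in matches)
    return out

def subtract_by_key(left, right):
    # keep only the pairs whose key does not appear in `right`
    right_keys = {k for k, _ in right}
    return [(k, v) for k, v in left if k not in right_keys]

left = [(1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E")]
right = [(4, "W"), (5, "X"), (6, "Y"), (7, "Z")]
print(inner_join(left, right))  # [(4, ('D', 'W')), (5, ('E', 'X'))]
```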

#### [`cartesian()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.cartesian)

In [45]:
rdd1 = sc.parallelize(range(10))
rdd2 = sc.parallelize(range(7,17))
rdd1.cartesian(rdd2).take(10)

[(0, 7),
 (0, 8),
 (1, 7),
 (1, 8),
 (0, 9),
 (0, 10),
 (0, 11),
 (1, 9),
 (1, 10),
 (1, 11)]
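`cartesian()` pairs every element of the first RDD with every element of the second, so its output has `len(rdd1) * len(rdd2)` elements: here 10 * 10 = 100, of which `take(10)` shows only the first ones, in an order that depends on the partitioning. This gets expensive quickly on large RDDs. The same product in plain Python:

```python
from itertools import product

pairs = list(product(range(10), range(7, 17)))

print(len(pairs))  # 100
print(pairs[:3])   # [(0, 7), (0, 8), (0, 9)]
```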

## [`reduce()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduce)

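Action that aggregates all the elements of the RDD using the specified commutative and associative binary operator, and returns the final result to the driver program. Spark first reduces each partition separately and then combines the partial results, so with a function that is not commutative and associative (such as the last two cells below) the result depends on how the data is partitioned.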

In [46]:
rdd = sc.parallelize([1,2,3,2,5])
rdd.reduce(lambda a,b : a + b)

13

In [47]:
rdd.reduce(lambda a,b : a + (1 if b % 2 else 0))

3

In [48]:
rdd.reduce(lambda a,b : [a,b])

[[[1, 2], 3], [2, 5]]
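Here is a plain-Python model of how `reduce()` proceeds: each partition is reduced on its own, then the partial results are reduced on the driver. The two partition splits are assumptions (the 4-partition one happens to reproduce the outputs above); only the sum gives the same result for both. Counting odd numbers with `reduce()` is therefore not reliable; `rdd.filter(lambda x: x % 2 == 1).count()` is a safer way to do it.

```python
from functools import reduce

def spark_like_reduce(f, partitions):
    # Each non-empty partition is reduced on its own,
    # then the driver reduces the list of partial results with the same f.
    partials = [reduce(f, p) for p in partitions if p]
    return reduce(f, partials)

def add(a, b):
    return a + b

def count_odd(a, b):
    # not associative: treats `a` as a counter and `b` as a raw element
    return a + (1 if b % 2 else 0)

def pair_up(a, b):
    return [a, b]

split_a = [[1], [2], [3], [2, 5]]  # hypothetical 4-partition split
split_b = [[1, 2], [3, 2, 5]]      # hypothetical 2-partition split

print(spark_like_reduce(add, split_a), spark_like_reduce(add, split_b))              # 13 13
print(spark_like_reduce(count_odd, split_a), spark_like_reduce(count_odd, split_b))  # 3 1
print(spark_like_reduce(pair_up, split_a))  # [[[1, 2], 3], [2, 5]]
```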

## Group operations

### [`reduceByKey()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey)

Same as `reduce()`, but the values are aggregated separately for each key. For instance, let's do a word count, the classic (Hadoop-like) Map-Reduce example:

In [21]:
data = sc.parallelize("""i decided, very early on, just to accept life unconditionally; i never expected it to do anything special for me, yet i seemed
to accomplish far more than i had ever hoped. most of the time it just happened to me without me ever seeking it.""".split())

In [26]:
data.take(10)

['i',
 'decided,',
 'very',
 'early',
 'on,',
 'just',
 'to',
 'accept',
 'life',
 'unconditionally;']

In [23]:
pairs = data.map(lambda s: (s, 1)).reduceByKey(lambda a, b: a + b)

In [24]:
pairs.collect()

[('i', 4),
 ('yet', 1),
 ('seemed', 1),
 ('far', 1),
 ('ever', 2),
 ('of', 1),
 ('early', 1),
 ('accept', 1),
 ('life', 1),
 ('me,', 1),
 ('accomplish', 1),
 ('without', 1),
 ('decided,', 1),
 ('on,', 1),
 ('just', 2),
 ('unconditionally;', 1),
 ('anything', 1),
 ('than', 1),
 ('it', 2),
 ('for', 1),
 ('hoped.', 1),
 ('do', 1),
 ('happened', 1),
 ('to', 4),
 ('special', 1),
 ('most', 1),
 ('the', 1),
 ('time', 1),
 ('seeking', 1),
 ('very', 1),
 ('never', 1),
 ('more', 1),
 ('it.', 1),
 ('expected', 1),
 ('had', 1),
 ('me', 2)]
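The same word count in plain Python, as a sketch of what `map()` followed by `reduceByKey()` computes (the helper `reduce_by_key` is hypothetical and runs on a single machine). Since the text is split on whitespace only, punctuation stays attached to the words, which is why `'it'` and `'it.'` are counted separately in the output above.

```python
text = """i decided, very early on, just to accept life unconditionally; i never expected it to do anything special for me, yet i seemed
to accomplish far more than i had ever hoped. most of the time it just happened to me without me ever seeking it."""

def reduce_by_key(pairs, f):
    # Hypothetical single-machine equivalent of reduceByKey(): the first value of a
    # key is kept as is, later values of the same key are folded in with f
    result = {}
    for key, value in pairs:
        result[key] = f(result[key], value) if key in result else value
    return result

words = text.split()                               # whitespace split, punctuation kept
pairs = [(w, 1) for w in words]                    # map step: (word, 1)
counts = reduce_by_key(pairs, lambda a, b: a + b)  # reduce step

print(counts["i"], counts["to"], counts["it"], counts["it."])  # 4 4 2 1
```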

### [`groupByKey()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey)
 
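It is an aggregation method with a naive map-reduce approach: every value is sent through the shuffle to the reducers, without any map-side combining, which causes high network I/O. When the values can be combined incrementally, prefer `reduceByKey()`. The cell below uses `groupBy()`, which keys each element with the given function (here the word itself) and then calls `groupByKey()`.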

In [53]:
data.groupBy(lambda x : x).mapValues(lambda x: len(x)).collect()

[('i', 4),
 ('do', 1),
 ('yet', 1),
 ('seemed', 1),
 ('far', 1),
 ('ever', 2),
 ('of', 1),
 ('happened', 1),
 ('early', 1),
 ('to', 4),
 ('accept', 1),
 ('life', 1),
 ('special', 1),
 ('me,', 1),
 ('accomplish', 1),
 ('most', 1),
 ('the', 1),
 ('time', 1),
 ('without', 1),
 ('seeking', 1),
 ('decided,', 1),
 ('very', 1),
 ('on,', 1),
 ('just', 2),
 ('unconditionally;', 1),
 ('never', 1),
 ('anything', 1),
 ('more', 1),
 ('than', 1),
 ('it.', 1),
 ('expected', 1),
 ('it', 2),
 ('for', 1),
 ('had', 1),
 ('hoped.', 1),
 ('me', 2)]
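A tiny plain-Python model of the shuffle cost, on a hypothetical 2-partition list of words: with `groupByKey()` every value crosses the network, while `reduceByKey()` first combines the values inside each partition, so only one partial count per distinct word and per partition is shuffled. Both give the same final counts.

```python
from collections import Counter

# Hypothetical 2-partition split of a tiny list of words (our own example data)
partitions = [["to", "me", "to", "to"], ["me", "to", "i"]]

# groupByKey(): every (word, 1) value is shuffled to the reducers
group_by_key_values = sum(len(p) for p in partitions)

# reduceByKey(): values are first summed inside each partition (map-side combine),
# so only one partial count per distinct word and per partition is shuffled
local_counts = [Counter(p) for p in partitions]
reduce_by_key_values = sum(len(c) for c in local_counts)

# The reducers then merge the partial counts: same final result in both cases
final = sum(local_counts, Counter())

print(group_by_key_values, reduce_by_key_values)  # 7 5
print(final["to"])  # 4
```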

### [`combineByKey()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.combineByKey)

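A generalization of `reduceByKey()` that takes three functions and introduces the notion of a *combiner*, an accumulator whose type may differ from the type of the values:

* first (`createCombiner`): create a combiner from the first value seen for a key in a partition (usually a one-element list)
* second (`mergeValue`): merge a new value into an existing combiner, within a partition
* third (`mergeCombiners`): merge two combiners of the same key coming from different partitions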

The following example is taken from the PySpark documentation.

In [54]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

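# First (createCombiner): start a list from the first value of a key
def to_list(a):
    return [a]

# Second (mergeValue): append a value to an existing list, within a partition
def append(a, b):
    a.append(b)
    return a

# Third (mergeCombiners): concatenate two lists coming from different partitions
def extend(a, b):
    a.extend(b)
    return a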

sorted(rdd.combineByKey(to_list, append, extend).collect())

[('a', [1, 2]), ('b', [1])]

In [55]:
def create_tuple(x):
    return (x,1)

def identity(a):
    return a

def add(a, b):
    return a + b

data.map(create_tuple).combineByKey(identity, add, add).collect()

[('i', 4),
 ('do', 1),
 ('yet', 1),
 ('seemed', 1),
 ('far', 1),
 ('ever', 2),
 ('of', 1),
 ('happened', 1),
 ('early', 1),
 ('to', 4),
 ('accept', 1),
 ('life', 1),
 ('special', 1),
 ('me,', 1),
 ('accomplish', 1),
 ('most', 1),
 ('the', 1),
 ('time', 1),
 ('without', 1),
 ('seeking', 1),
 ('decided,', 1),
 ('very', 1),
 ('on,', 1),
 ('just', 2),
 ('unconditionally;', 1),
 ('never', 1),
 ('anything', 1),
 ('more', 1),
 ('than', 1),
 ('it.', 1),
 ('expected', 1),
 ('it', 2),
 ('for', 1),
 ('had', 1),
 ('hoped.', 1),
 ('me', 2)]

The same with lambdas

In [56]:
data.map(lambda x: (x, 1)).combineByKey(
    lambda value: value,         # createCombiner
    lambda x, value: x + value,  # mergeValue
    lambda x, y: x + y,          # mergeCombiners
).collect()

[('i', 4),
 ('do', 1),
 ('yet', 1),
 ('seemed', 1),
 ('far', 1),
 ('ever', 2),
 ('of', 1),
 ('happened', 1),
 ('early', 1),
 ('to', 4),
 ('accept', 1),
 ('life', 1),
 ('special', 1),
 ('me,', 1),
 ('accomplish', 1),
 ('most', 1),
 ('the', 1),
 ('time', 1),
 ('without', 1),
 ('seeking', 1),
 ('decided,', 1),
 ('very', 1),
 ('on,', 1),
 ('just', 2),
 ('unconditionally;', 1),
 ('never', 1),
 ('anything', 1),
 ('more', 1),
 ('than', 1),
 ('it.', 1),
 ('expected', 1),
 ('it', 2),
 ('for', 1),
 ('had', 1),
 ('hoped.', 1),
 ('me', 2)]
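Because the combiner may have a different type than the values, `combineByKey()` is handy for a per-key average: the combiner is a `(sum, count)` tuple. Below is a plain-Python model of the three steps on a hypothetical 2-partition split of the documentation data (`combine_by_key` is a made-up helper, not Spark's code); the same three lambdas should also work when passed to `rdd.combineByKey()`.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    # Step 1: inside each partition, build one combiner per key
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)
    # Step 2: after the shuffle, merge the combiners of the same key
    result = {}
    for combiners in per_partition:
        for key, comb in combiners.items():
            result[key] = merge_combiners(result[key], comb) if key in result else comb
    return result

partitions = [[("a", 1), ("b", 1)], [("a", 2)]]  # hypothetical 2-partition split
sums_counts = combine_by_key(
    partitions,
    lambda v: (v, 1),                         # createCombiner: (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),  # mergeValue
    lambda x, y: (x[0] + y[0], x[1] + y[1]),  # mergeCombiners
)
averages = {k: s / c for k, (s, c) in sums_counts.items()}
print(averages)  # {'a': 1.5, 'b': 1.0}
```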

**TODO**: `fold()`, `foldByKey()`, `aggregate()` and `aggregateByKey()`

## Reading files

In [57]:
passwd_rdd = sc.textFile('/etc/passwd')

In [58]:
passwd_rdd.count()

46

In [59]:
passwd_uid = passwd_rdd.map(lambda x: x.split(':')[2])

In [60]:
passwd_gid = passwd_rdd.map(lambda x: x.split(':')[3])

In [61]:
passwd_gid.collect()

['0',
 '1',
 '2',
 '3',
 '65534',
 '60',
 '12',
 '7',
 '8',
 '9',
 '10',
 '13',
 '33',
 '34',
 '38',
 '39',
 '41',
 '65534',
 '102',
 '103',
 '106',
 '107',
 '65534',
 '111',
 '112',
 '46',
 '65534',
 '114',
 '29',
 '117',
 '65534',
 '119',
 '120',
 '122',
 '123',
 '7',
 '124',
 '65534',
 '125',
 '100',
 '116',
 '128',
 '65534',
 '130',
 '133',
 '65534']
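Each line of `/etc/passwd` has 7 colon-separated fields (`name:password:UID:GID:GECOS:home:shell`), which is why index `2` gives the UID and index `3` the GID. The extracted values are strings, so convert them with `int()` before doing arithmetic. A plain-Python sketch of the same parsing on a few sample lines (illustrative data, not the content of the machine above), plus a count of the accounts per GID, which `passwd_gid.countByValue()` would give you in Spark:

```python
from collections import Counter

# Sample lines in the /etc/passwd format (illustrative data)
sample = [
    "root:x:0:0:root:/root:/bin/bash",
    "daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin",
    "nobody:x:65534:65534:nobody:/nonexistent:/usr/sbin/nologin",
]

fields = [line.split(":") for line in sample]
uids = [int(f[2]) for f in fields]  # third field, as in passwd_uid
gids = [int(f[3]) for f in fields]  # fourth field, as in passwd_gid
gid_counts = Counter(gids)          # accounts per GID, like countByValue()

print(uids)               # [0, 1, 65534]
print(gid_counts[65534])  # 1
```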