In [1]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('RDD Paired')
sc = SparkContext.getOrCreate(conf=conf)

## Creating paired RDD

In [2]:
rdd1 = sc.parallelize([('a1', 'b1', 'b2', 'b3'), ('a2', 'c1','c2','c3')])
rdd1.map(lambda x: (x[0], list(x[1:]))).collect()

[('a1', ['b1', 'b2', 'b3']), ('a2', ['c1', 'c2', 'c3'])]

## Transformations on Paired RDD


## reduceByKey(func)

combines the values of each key using `func`, which should be associative and commutative

```sc.parallelize([(key, value)])```

In [6]:
from operator import add
rdd2 = sc.parallelize([(1, 3), (3, 5), (3, 6)])
rdd2.reduceByKey(add).take(10)

[(1, 3), (3, 11)]
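To make the semantics concrete, here is a plain-Python sketch (no Spark involved) of what `reduceByKey` computes on the same data. The helper name `reduce_by_key` is hypothetical, and the sketch ignores partitioning and shuffling, so treat it as a model of the result rather than of how Spark executes it.

```python
from operator import add

def reduce_by_key(pairs, func):
    """Fold the values of each key with func (hypothetical helper, local model only)."""
    result = {}
    for key, value in pairs:
        # The first value seen for a key is stored as-is; later values are merged into it.
        result[key] = func(result[key], value) if key in result else value
    return sorted(result.items())

pairs = [(1, 3), (3, 5), (3, 6)]
reduced = reduce_by_key(pairs, add)
print(reduced)  # [(1, 3), (3, 11)]
```

Because Spark merges values within and across partitions in no guaranteed order, the function passed to `reduceByKey` should give the same answer regardless of grouping and order, as `add` does here.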

## groupByKey()

groups the values of each key into a single iterable; unlike `reduceByKey`, it takes no combining function

```sc.parallelize([(key, value)])```

### mapValues(func)

Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD’s partitioning.

```python
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
x.mapValues(f).collect()
```
output: ```[('a', 3), ('b', 1)]```

In [12]:
rdd3 = sc.parallelize([(1, 3), (2, 7), (2, 8)])
rdd3.groupByKey().mapValues(list).collect()

[(1, [3]), (2, [7, 8])]

## flatMapValues(func)

applies a function that returns an iterable to each value of the paired RDD and, for each element returned, produces a key-value entry with the original key; often used for tokenization.

In simple terms, flatMapValues applies the function to every value and emits one pair per result, keeping the key unchanged

In [21]:
rdd4 = sc.parallelize([(1, 2), (2, 1), (4, 4), (5, 3)])
rdd4.flatMapValues(lambda x: range(x, 5)).collect()

[(1, 2),
 (1, 3),
 (1, 4),
 (2, 1),
 (2, 2),
 (2, 3),
 (2, 4),
 (4, 4),
 (5, 3),
 (5, 4)]
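Since tokenization is the typical use, the following plain-Python sketch (no Spark involved) models what `flatMapValues` does with a splitting function. The helper `flat_map_values` and the sample documents are hypothetical and only illustrate the shape of the result.

```python
def flat_map_values(pairs, func):
    """Emit one (key, item) pair for every item in func(value) (local model only)."""
    return [(key, item) for key, value in pairs for item in func(value)]

# Hypothetical sample data: document id -> text
docs = [("doc1", "spark makes pairs"), ("doc2", "hello")]
tokens = flat_map_values(docs, str.split)
print(tokens)
# [('doc1', 'spark'), ('doc1', 'makes'), ('doc1', 'pairs'), ('doc2', 'hello')]
```

On a real paired RDD the corresponding call would be along the lines of `rdd.flatMapValues(lambda text: text.split())`.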

## keys()

returns an RDD containing only the keys

In [25]:
sc.parallelize([(1, 3), (2, 5), (7, 8), (9, 10)]).keys().collect()

[1, 2, 7, 9]

## sortByKey()

returns an RDD sorted by key

In [27]:
sc.parallelize([(10, 2), (5, 20), (1, 21), (7, 15)]).sortByKey().take(10)

[(1, 21), (5, 20), (7, 15), (10, 2)]

# Transformations on two paired RDDs



## subtractByKey()

removes elements with the key present in the other RDD

In [32]:
rdd_1 = sc.parallelize([(1, 2), (3, 4), (3, 6)])
rdd_2 = sc.parallelize([(3, 9)])

rdd_1.subtractByKey(rdd_2).collect()

[(1, 2)]

## join()

performs inner join between the RDDs.

Keys must be present in both the RDDs.

In [36]:
rdd_1 = sc.parallelize([(1, 2), (3, 4), (3, 6)])
rdd_2 = sc.parallelize([(3, 9)])

rdd_1.join(rdd_2).take(10)

[(3, (4, 9)), (3, (6, 9))]

For example:
There are 3 employee IDs: 100, 101 and 103
There are 5 department IDs: 500, 600, 700, 800 and 1100

emp_id 100 works for dept_id 500 as well as dept_id 1100
emp_id 101 works for dept_id 600 as well as dept_id 800

The task is to find the ```(empId, [deptIds])``` pairs for the employees who work for multiple departments


In [37]:
rdd_3 = sc.parallelize([(100, 500), (101, 600), (103, 700)])
rdd_4 = sc.parallelize([(100, 1100),(101, 800)])
rdd_3.join(rdd_4).collect()

[(100, (500, 1100)), (101, (600, 800))]
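The join returns each employee's departments as a tuple, while the task asks for a list. The plain-Python sketch below (no Spark involved) models the inner join and then converts the tuples to lists. The helper `inner_join` is hypothetical; on the RDDs above the same conversion could be done with `rdd_3.join(rdd_4).mapValues(list)`.

```python
from collections import defaultdict

def inner_join(left, right):
    """Pair every left value with every right value that shares its key (local model only)."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Keys missing on the right produce no output, which is what makes the join "inner".
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]

first_dept = [(100, 500), (101, 600), (103, 700)]
second_dept = [(100, 1100), (101, 800)]

joined = inner_join(first_dept, second_dept)
multi_dept = [(emp_id, list(dept_ids)) for emp_id, dept_ids in joined]
print(multi_dept)  # [(100, [500, 1100]), (101, [600, 800])]
```

Employee 103 drops out because it has no entry in the second data set.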

## rightOuterJoin()

keeps every key of the other (right) RDD; a key missing from the first RDD gets `None` as its left value.

In [42]:
rdd_5 = sc.parallelize([(1, 2), (3, 4), (3, 6)])
rdd_6 = sc.parallelize([(3, 9), (5, 7)])
rdd_5.rightOuterJoin(rdd_6).collect()

[(3, (4, 9)), (3, (6, 9)), (5, (None, 7))]

In [43]:
rdd_5 = sc.parallelize([(1, 2), (3, 4), (3, 6), (5, 11)])
rdd_6 = sc.parallelize([(3, 9), (5, 7)])
rdd_5.rightOuterJoin(rdd_6).collect()

[(3, (4, 9)), (3, (6, 9)), (5, (11, 7))]

## leftOuterJoin()

keeps every key of the first (left) RDD; a key missing from the other RDD gets `None` as its right value.

In [44]:
rdd_5 = sc.parallelize([(1, 2), (3, 4), (3, 6)])
rdd_6 = sc.parallelize([(3, 9)])
rdd_5.leftOuterJoin(rdd_6).collect()

[(1, (2, None)), (3, (4, 9)), (3, (6, 9))]
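The two outer joins are mirror images of each other, which is easy to mix up. The plain-Python sketch below (no Spark involved) models both on the data used above; the helpers `left_outer_join` and `right_outer_join` are hypothetical and ignore partitioning.

```python
def left_outer_join(left, right):
    """Keep every left pair; unmatched keys get None on the right (local model only)."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    out = []
    for key, lv in left:
        # An unmatched left key is still emitted once, paired with None.
        for rv in right_by_key.get(key, [None]):
            out.append((key, (lv, rv)))
    return out

def right_outer_join(left, right):
    """Mirror image: join with the sides swapped, then swap each value pair back."""
    return [(key, (lv, rv)) for key, (rv, lv) in left_outer_join(right, left)]

left = [(1, 2), (3, 4), (3, 6)]
right = [(3, 9), (5, 7)]

left_result = left_outer_join(left, right)
right_result = right_outer_join(left, right)
print(left_result)   # [(1, (2, None)), (3, (4, 9)), (3, (6, 9))]
print(right_result)  # [(3, (4, 9)), (3, (6, 9)), (5, (None, 7))]
```

Key 1 exists only on the left, so it survives only the left outer join; key 5 exists only on the right, so it survives only the right outer join.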

## cogroup()

groups the data from both RDDs having the same key.

## RDD.cogroup(other, numPartitions=None)

For each key k in self or other, return a resulting RDD that contains a tuple with the list of values for that key in self as well as other.

### Examples

```python
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
```

Output : ```[('a', ([1], [2])), ('b', ([4], []))]```

In [49]:
rdd_5 = sc.parallelize([(1, 2), (3, 4), (3, 6)])
rdd_6 = sc.parallelize([(3, 9)])
[(x, tuple(map(list, y))) for x, y in sorted(list(rdd_5.cogroup(rdd_6).collect()))]

[(1, ([2], [])), (3, ([4, 6], [9]))]
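As a rough mental model, `cogroup` collects, for every key found on either side, the list of values from each RDD. The plain-Python sketch below (no Spark involved) reproduces the result above; the helper `cogroup_local` is hypothetical and trades efficiency for readability.

```python
def cogroup_local(left, right):
    """For each key on either side, return (key, (left values, right values))."""
    keys = sorted({k for k, _ in left} | {k for k, _ in right})
    return [
        (key,
         ([v for k, v in left if k == key],    # values for this key on the left
          [v for k, v in right if k == key]))  # values for this key on the right
        for key in keys
    ]

grouped = cogroup_local([(1, 2), (3, 4), (3, 6)], [(3, 9)])
print(grouped)  # [(1, ([2], [])), (3, ([4, 6], [9]))]
```

A key missing on one side gets an empty list there rather than `None`, which is the main difference from the outer joins.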

# RDD Lineage

- RDD lineage is a graph of all the parent RDDs of an RDD.
- It is built by applying transformations to an RDD, which creates a logical execution plan.

Consider the series of transformations below:

In [55]:
sc.parallelize([(1, 3), (2, 3)]).flatMapValues(lambda c: range(c, 5)).toDebugString()

b'(4) PythonRDD[305] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[304] at readRDDFromFile at PythonRDD.scala:262 []'

An RDD lineage graph is hence a graph of the transformations that need to be executed once an action has been called

We can inspect the RDD lineage graph using the ```RDD.toDebugString``` method
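In the output above the lineage comes back as a `bytes` object with an embedded `\n`, which is hard to read. A minimal sketch, using the literal output from the cell above rather than a live RDD, decodes it and prints one lineage step per line; the variable names are hypothetical.

```python
# Literal copy of the toDebugString() output shown above.
debug_bytes = (
    b'(4) PythonRDD[305] at RDD at PythonRDD.scala:53 []\n'
    b' |  ParallelCollectionRDD[304] at readRDDFromFile at PythonRDD.scala:262 []'
)

# Decode to text and split on newlines: the first line is the final RDD,
# the indented line below it is its parent.
lineage_lines = debug_bytes.decode("utf-8").split("\n")
for line in lineage_lines:
    print(line)
```

With a live RDD, the same decoding could be applied directly to the value returned by `toDebugString()`.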