# Programming with RDD


## RDD Creation examples

1. Parallelizing a local collection (`sc.parallelize`)
2. Reading from an external source, such as a text file (`sc.textFile`)

In [None]:
sc

In [None]:
localList = range(1,20)  # the integers 1 to 19: the upper bound is excluded

In [None]:
listRDD = sc.parallelize(localList)

In [None]:
listRDD

We've now created an RDD of integers. Let's peek into it:

In [None]:
listRDD.take(2)


Let's now read a text file from the (distributed) filesystem. It contains information about people, and each record has the following fields:

- name: String
- surname: String 
- town: String 
- age: Int 
- gender: String 
- bloodType: String 
- occupation: String

Let's define a type representing our domain, plus a helper that parses one line of the file into it:

In [None]:
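from collections import namedtuple

Person = namedtuple("Person", ["name", "surname", "town", "age", "gender", "bloodType", "occupation"])

# Parsing util. Field order in the file, as implied by the indices below:
# gender;name;surname;town;age;bloodType;occupation
def personFromString(inputString):
    # Build a list (not a lazy map object) so the fields can be indexed in Python 3
    rawFields = [x.strip() for x in inputString.split(";")]
    return Person(rawFields[1],
                  rawFields[2],
                  rawFields[3],
                  int(rawFields[4]),
                  rawFields[0],
                  rawFields[5],
                  rawFields[6])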


In [None]:
rawPeopleRdd = sc.textFile("/localpath/to/the/file/peopleEnriched.csv")

In [None]:
rawPeopleRdd

In [None]:
rawPeopleRdd.take(1)

In [None]:
rawPeopleRdd.count()

In [None]:
peopleRDD = rawPeopleRdd.map(lambda line: personFromString(line))
peopleRDD.cache()

In [None]:
peopleRDD.count()

In [None]:
peopleRDD.take(1)


# Distinct names and surnames in the dataset

In order to complete this assignment we need to:

- _extract_ the name of **each** Person from our _peopleRDD_

- remove duplicates from the above intermediate result

- invoke a count

- repeat the above steps for the surname 
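The three steps can be tried locally in plain Python, without Spark: `map` becomes a comprehension, `distinct` a set and `count` a `len`. This is only a sketch. The sample people below are hypothetical and do not come from peopleEnriched.csv.

```python
from collections import namedtuple

# Minimal stand-in for the Person type above (only the fields this sketch needs)
Person = namedtuple("Person", ["name", "surname"])

# Hypothetical sample data, not taken from peopleEnriched.csv
people = [
    Person("Nadia", "Marino"),
    Person("Luca", "Rossi"),
    Person("Nadia", "Rossi"),
]

names = [p.name for p in people]          # map: extract the name of each person
distinct_names = set(names)               # distinct: drop duplicates
distinct_name_count = len(distinct_names) # count

# Same steps for the surname
distinct_surname_count = len({p.surname for p in people})
print(distinct_name_count, distinct_surname_count)  # 2 2
```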

In [None]:
nameRDD = peopleRDD.map(lambda person: person.name)

In [None]:
distinctNameRDD = nameRDD.distinct()

In [None]:
distinctNameRDD.count()

In [None]:
peopleRDD.map(lambda p: p.surname).distinct().count()

# Most frequent name and surname in the dataset 

This assignment is trickier: we need to count each name in the dataset, then find the most frequent.

Let's break the problem into two parts:

#### Count the frequency of each name

- We assign each occurrence of a name a count of one.
- From peopleRDD we build a **pair RDD** `name -> 1`.
- We then **reduce** those ones **by name**, summing them.
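To make the pair-RDD idea concrete, here is a plain-Python sketch of what `map(lambda p: (p.name, 1))` followed by `reduceByKey(lambda a, b: a + b)` computes. The helper `reduce_by_key` is hypothetical and only models the semantics locally. Spark itself reduces inside each partition first and then merges the partial results across the cluster, which is why the reduce function must be associative and commutative.

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Hypothetical local model of reduceByKey: fold all values sharing a key with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

# Hypothetical names, not taken from the dataset
names = ["Nadia", "Luca", "Nadia", "Sara", "Nadia", "Luca"]

name_one = [(name, 1) for name in names]                  # name -> 1
name_count = reduce_by_key(name_one, lambda a, b: a + b)  # sum the ones per name
print(name_count)  # {'Nadia': 3, 'Luca': 2, 'Sara': 1}
```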

In [None]:
nameOneRDD = peopleRDD.map(lambda p: (p.name,1))

In [None]:
nameOneRDD.take(2)

In [None]:
nameCountRDD = nameOneRDD.reduceByKey(lambda a,b: a+b)

In [None]:
nameCountRDD.take(2)

We now need to **find the max** in our RDD according to some order: here, by count.

From the documentation:

> max(key=None)
>
> Find the maximum item in this RDD.
> 
> Parameters:	key – A function used to generate key for comparing
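The `key` argument plays the same role as the `key` argument of Python's built-in `max`. It maps each element, here a `(name, count)` pair, to the value used for the comparison. The following local sketch uses hypothetical counts:

```python
# Hypothetical (name, count) pairs, shaped like the elements of nameCountRDD
name_counts = [("Nadia", 3), ("Luca", 2), ("Sara", 1)]

# Compare pairs by their count (index 1), not by the name
most_frequent = max(name_counts, key=lambda kv: kv[1])
print(most_frequent)  # ('Nadia', 3)

# Without a key, tuples compare element-wise, so the alphabetically last name wins
alphabetical_max = max(name_counts)
print(alphabetical_max)  # ('Sara', 1)
```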

In [None]:
nameCountRDD.max(key = lambda kv: kv[1])

In [None]:
peopleRDD.map(lambda p: (p.surname,1)).reduceByKey(lambda a,b: a+b).max(key = lambda kv: kv[1])

To retrieve the top N names, one option is to sort the whole RDD globally by count, in descending order, and take the first N elements.

In [None]:
nameCountRDDordered = nameCountRDD.sortBy(lambda kv: kv[1], ascending=False)

In [None]:
nameCountRDDordered.take(5)
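Sorting the whole RDD is not the only option. PySpark RDDs also provide two alternatives:

- `top(num, key=None)` returns the N largest elements in descending order.
- `takeOrdered(num, key=None)` returns the N smallest elements.

Both keep only N candidates per partition instead of ordering everything. So `nameCountRDD.top(5, key=lambda kv: kv[1])` should return the same five names as the sorted version, up to ties. The plain-Python sketch below contrasts the two strategies on hypothetical counts, using `sorted` and `heapq.nlargest` as local analogues.

```python
import heapq

# Hypothetical (name, count) pairs, shaped like the elements of nameCountRDD
name_counts = [("Nadia", 3), ("Luca", 2), ("Sara", 1), ("Marco", 5), ("Anna", 4)]
n = 3

# Strategy 1: sort everything, then keep the first n (like sortBy + take)
top_by_sort = sorted(name_counts, key=lambda kv: kv[1], reverse=True)[:n]

# Strategy 2: keep only the n largest while scanning (the idea behind RDD.top)
top_by_heap = heapq.nlargest(n, name_counts, key=lambda kv: kv[1])

print(top_by_sort)  # [('Marco', 5), ('Anna', 4), ('Nadia', 3)]
```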

## Double check

Now let's select only the people with name "Nadia" and count the results

In [None]:
peopleRDD.filter(lambda x: x.name == "Nadia").count()



Just as we expected. Now let's check the count of the people with surname "Marino"

In [None]:
peopleRDD.filter(lambda x: x.surname == "Marino").count()

Perfect :-)


## Find every age distance between people with the same occupation

- we first need to **join** together people with the same occupation
- we then need to filter out the pairs in which a person has been joined with themselves (same name and surname)
- finally we select the age distances for the occupation "Music conductor"
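Joining an RDD keyed by occupation with itself produces, for each occupation, every ordered pair of people who share it. That includes each person paired with themselves. The plain-Python sketch below models that join with a hypothetical `self_join` helper on made-up records. It then applies the same two steps as the notebook: drop self-pairs and compute the absolute age difference. Because the pairs are ordered, every distance shows up twice, once as (p1, p2) and once as (p2, p1).

```python
from collections import namedtuple

# Stand-in for Person with only the fields this sketch needs
Person = namedtuple("Person", ["name", "surname", "age", "occupation"])

# Hypothetical records, not taken from peopleEnriched.csv
people = [
    Person("Nadia", "Marino", 30, "Music conductor"),
    Person("Luca", "Rossi", 45, "Music conductor"),
    Person("Sara", "Bianchi", 28, "Mail handler"),
]

def self_join(records, key_fn):
    """Hypothetical local model of rdd.keyBy(key_fn).join(itself): all pairs sharing a key."""
    keyed = [(key_fn(r), r) for r in records]
    return [(k1, (r1, r2)) for k1, r1 in keyed for k2, r2 in keyed if k1 == k2]

joined = self_join(people, lambda p: p.occupation)

# Drop pairs where a person was joined with themselves (same name and surname)
no_same_person = [
    (occ, (p1, p2)) for occ, (p1, p2) in joined
    if p1.name != p2.name or p1.surname != p2.surname
]

# Age distance per pair, keeping the occupation as key (like mapValues)
age_distance = [(occ, abs(p1.age - p2.age)) for occ, (p1, p2) in no_same_person]
print(age_distance)  # [('Music conductor', 15), ('Music conductor', 15)]
```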

In [None]:
peopleByOccupationRDD = peopleRDD.keyBy(lambda person: person.occupation)

In [None]:
peopleByOccupationRDD.take(2)

In [None]:
joinedPeopleRDD = peopleByOccupationRDD.join(peopleByOccupationRDD)

In [None]:
joinedPeopleRDD.take(3)

In [None]:
# kv = (occupation, (p1, p2)): keep only pairs made of two different people
noSamePersonRDD = joinedPeopleRDD.filter(lambda kv: (kv[1][0].name, kv[1][0].surname) != (kv[1][1].name, kv[1][1].surname))

In [None]:
noSamePersonRDD.count()

In [None]:
# Values are (p1, p2) pairs: keep the occupation key, compute the absolute age difference
ageDistanceRDD = noSamePersonRDD.mapValues(lambda pair: abs(pair[0].age - pair[1].age))

In [None]:
musicConductorRDD = ageDistanceRDD.filter(lambda kv: kv[0] == "Music conductor")

In [None]:
musicConductorRDD.collect()

In [None]:
# Largest age distance across all occupations
ageDistanceRDD.max(key = lambda kv: kv[1])

## Double check

Now let's list the people whose occupation is "Music conductor"

In [None]:
peopleRDD.filter(lambda p: p.occupation == "Music conductor").collect()

# RDD Persistence


#### Let's start with a bad example

In [None]:
# Group people ages by occupation
occupationAges = peopleRDD.map( lambda p: (p.occupation,p.age)).groupByKey()

In [None]:
#Get average age of people per occupation
occupationAvgAgeRDD = occupationAges.mapValues(lambda ages: sum(ages) / float(len(ages)))

In [None]:
occupationAvgAgeRDD.take(5)

In [None]:
occupationAvgAgeRDD.max(key=lambda kv: kv[1])

In [None]:
occupationAvgAgeRDD.min(key=lambda kv: kv[1])

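#### Good example
Every time we invoke an action on an uncached RDD such as occupationAvgAgeRDD, Spark recomputes it from its lineage (DAG). Calling cache() keeps the partitions computed by the first action in memory, so the following actions reuse them.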

In [None]:
occupationAges = peopleRDD.map( lambda p: (p.occupation,p.age)).groupByKey()
occupationAvgAgeRDD = occupationAges.mapValues(lambda ages: sum(ages) / float(len(ages)))
occupationAvgAgeRDD.cache()

In [None]:
occupationAvgAgeRDD.take(5)

In [None]:
occupationAvgAgeRDD.max(key=lambda kv: kv[1])

In [None]:
occupationAvgAgeRDD.min(key=lambda kv: kv[1])

#### The result is not affected by caching, but performance is

### A detour on groupByKey

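groupByKey sends all the values of a given key to a single partition, shuffling every single value across the network. This can lead to slow tasks, and memory pressure, when the data is skewed, i.e. when a few keys hold most of the values. For aggregations there's a better way: reduceByKey combines values inside each partition before the shuffle, so only partial results travel over the network.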
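The trick behind the reduceByKey version is as follows:

1. Carry a `(sum, count)` pair per key.
2. Merge pairs with an associative function.
3. Divide only at the very end.

Partial pairs can be merged in any grouping. That is what lets Spark pre-aggregate inside each partition, so a single small pair per key and partition crosses the network. The plain-Python sketch below uses hypothetical ages split into two made-up partitions. It checks that this strategy yields the same averages as grouping every value first.

```python
from collections import defaultdict

# Hypothetical (occupation, age) records split into two "partitions"
partitions = [
    [("Mail handler", 30), ("Music conductor", 50), ("Mail handler", 40)],
    [("Music conductor", 60), ("Mail handler", 20)],
]

def combine(a, b):
    """Merge two (sum, count) pairs; associative, so the grouping does not matter."""
    return (a[0] + b[0], a[1] + b[1])

# Step 1: combine locally inside each partition (map-side combine)
partials = []
for part in partitions:
    local = {}
    for occupation, age in part:
        pair = (age, 1)
        local[occupation] = combine(local[occupation], pair) if occupation in local else pair
    partials.append(local)

# Step 2: merge the per-partition results (what happens after the shuffle)
merged = {}
for local in partials:
    for occupation, pair in local.items():
        merged[occupation] = combine(merged[occupation], pair) if occupation in merged else pair

avg_by_reduce = {occ: s / float(c) for occ, (s, c) in merged.items()}

# Reference: collect every age per key first, then average (the groupByKey approach)
grouped = defaultdict(list)
for part in partitions:
    for occupation, age in part:
        grouped[occupation].append(age)
avg_by_group = {occ: sum(ages) / float(len(ages)) for occ, ages in grouped.items()}

print(avg_by_reduce)  # {'Mail handler': 30.0, 'Music conductor': 55.0}
```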

In [None]:
# Group people ages by occupation
occupationAges = peopleRDD.map( lambda p: (p.occupation,(p.age,1)))
occupationAges.cache()

In [None]:
occupationAges.take(1)

In [None]:
# Sum ages and counts element-wise: (age1, cnt1) + (age2, cnt2)
occupationAgeReducedRDD = occupationAges.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

In [None]:
occupationAgeReducedRDD.take(1)

In [None]:
occupationAvgAgeRDD = occupationAgeReducedRDD.mapValues(lambda ageCount: ageCount[0] / float(ageCount[1]))

In [None]:
occupationAvgAgeRDD.max(key=lambda kv: kv[1])

In [None]:
occupationAvgAgeRDD.min(key=lambda kv: kv[1])

# Shared variables


## Accumulators

In [None]:
countAcc = sc.accumulator(0)

In [None]:
countAcc

In [None]:
def person2nameSurname(person):
    # Count every invocation, i.e. every element this map processes
    countAcc.add(1)
    return (person.name, person.surname)

In [None]:
mappedRDD = peopleRDD.map(person2nameSurname)

In [None]:
countAcc.value

In [None]:
mappedRDD.count()

In [None]:
mappedRDD.take(5)

In [None]:
countAcc.value

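That is quite unexpected... but wait, we invoked **two actions** on mappedRDD. Transformations are lazy, so the map (and the accumulator update inside it) did not run when mappedRDD was defined. It ran once for count() and then again for the elements processed by take(5).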
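The effect can be mimicked in plain Python with a lazy generator pipeline. This is an analogy, not Spark code. A hypothetical `counter` dictionary stands in for `countAcc`, and each call to `mapped()` plays the role of an action re-running an uncached map. `islice` imitates `take`, which only pulls the elements it needs.

```python
from itertools import islice

# Hypothetical stand-in for the accumulator: a plain mutable counter (not Spark's API)
counter = {"value": 0}

def person_to_name(person):
    counter["value"] += 1          # side effect, like countAcc.add(1)
    return person

data = ["Nadia", "Luca", "Sara", "Marco"]   # hypothetical records

def mapped():
    """Like an uncached RDD: every call builds a fresh lazy pipeline over the data."""
    return (person_to_name(p) for p in data)

print(counter["value"])                  # 0: nothing has run yet (lazy)
n = sum(1 for _ in mapped())             # first "action", like count()
print(counter["value"])                  # 4
first_two = list(islice(mapped(), 2))    # second "action", like take(2)
print(counter["value"])                  # 6: the map ran again on 2 elements
```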

#### The right way

In [None]:
countAcc = sc.accumulator(0) #let's reset our accumulator

In [None]:
mappedRDD2 = peopleRDD.map(person2nameSurname).cache()

In [None]:
mappedRDD2.count()

In [None]:
countAcc.value

In [None]:
mappedRDD2.take(5)

In [None]:
countAcc.value

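The map operation is carried out **once** and its result is kept in memory, so the second action reads the cached partitions instead of calling person2nameSurname again. Caching is not a hard guarantee, though. Evicted partitions are recomputed, and failed or speculative tasks may run again. For that reason Spark only guarantees that accumulator updates are applied exactly once when they happen inside actions. It's advisable to use accumulators inside transformations only for debugging purposes.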

## Broadcast variables

In [None]:
ageMap = {69 : "Old", 0 : "Child", 88 : "Old", 5 : "Child", 10 : "Child", 56 : "Not so young", 42 : "Not so young", 24 : "Young", 37 : "Not so young", 25 : "Young", 52 : "Not so young", 14 : "Child", 20 : "Young", 46 : "Not so young", 93 : "Old", 57 : "Not so young", 78 : "Old", 29 : "Young", 84 : "Old", 61 : "Old", 89 : "Old", 1 : "Child", 74 : "Old", 6 : "Child", 60 : "Not so young", 85 : "Old", 28 : "Young", 38 : "Not so young", 70 : "Old", 21 : "Young", 33 : "Not so young", 92 : "Old", 65 : "Old", 97 : "Old", 9 : "Child", 53 : "Not so young", 77 : "Old", 96 : "Old", 13 : "Child", 41 : "Not so young", 73 : "Old", 2 : "Child", 32 : "Not so young", 34 : "Not so young", 45 : "Not so young", 64 : "Old", 17 : "Young", 22 : "Young", 44 : "Not so young", 59 : "Not so young", 27 : "Young", 71 : "Old", 12 : "Child", 54 : "Not so young", 49 : "Not so young", 86 : "Old", 81 : "Old", 76 : "Old", 7 : "Child", 39 : "Not so young", 98 : "Old", 91 : "Old", 66 : "Old", 3 : "Child", 80 : "Old", 35 : "Not so young", 48 : "Not so young", 63 : "Old", 18 : "Young", 95 : "Old", 50 : "Not so young", 67 : "Old", 16 : "Young", 31 : "Not so young", 11 : "Child", 72 : "Old", 43 : "Not so young", 99 : "Old", 87 : "Old", 40 : "Not so young", 26 : "Young", 55 : "Not so young", 23 : "Young", 8 : "Child", 75 : "Old", 58 : "Not so young", 82 : "Old", 36 : "Not so young", 30 : "Young", 51 : "Not so young", 19 : "Young", 4 : "Child", 79 : "Old", 94 : "Old", 47 : "Not so young", 15 : "Young", 68 : "Old", 62 : "Old", 90 : "Old", 83 : "Old", 100 : "Old"}
ageMapBC = sc.broadcast(ageMap)

In [None]:
def age2category(person):
    # Read the broadcast dictionary through .value; ages missing from it map to "N/A"
    return ageMapBC.value.get(person.age, "N/A")

In [None]:
ageCatRDD = peopleRDD.map(age2category)

In [None]:
ageCatRDD.collect()

In [None]:
countByCategory = ageCatRDD.countByValue()
countByCategory

# Partitioning

In [None]:
peopleByName = peopleRDD.keyBy(lambda p: p.name).partitionBy(100)


In [None]:
peopleByName.partitioner

In [None]:
peopleNameSurname = peopleByName.map(lambda kv: (kv[0], kv[1].surname))

In [None]:
peopleNameSurname.partitioner

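Partitioner information has been **lost**: `map` may change the keys, so Spark can no longer assume that each record sits in the partition its key hashes to. `mapValues` cannot change the keys, so it preserves the partitioner: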
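Why does `map` drop the partitioner? A hash partitioner places each record in partition `hash(key) % numPartitions`. Any function passed to `map` is free to return new keys, so the old layout may no longer hold. Spark therefore discards the partitioner even when the keys happen to be unchanged. The plain-Python sketch below illustrates this with a hypothetical `partition_of` function. It uses Python's built-in `hash` on small integer keys, not PySpark's own `portable_hash`.

```python
NUM_PARTITIONS = 4

def partition_of(key):
    """Hypothetical hash partitioner: partition index = hash(key) % NUM_PARTITIONS."""
    return hash(key) % NUM_PARTITIONS

# Hypothetical (key, value) records, placed by key as partitionBy would do
records = [(1, "a"), (2, "b"), (5, "c"), (6, "d")]
partitions = {i: [] for i in range(NUM_PARTITIONS)}
for key, value in records:
    partitions[partition_of(key)].append((key, value))

def layout_is_valid(parts):
    """True if every record sits in the partition its key hashes to."""
    return all(partition_of(k) == i for i, part in parts.items() for k, _ in part)

# mapValues-style: values change, keys stay, so the layout stays valid
after_map_values = {i: [(k, v.upper()) for k, v in part] for i, part in partitions.items()}

# map-style: the function may change keys (here k -> k + 1), breaking the layout
after_map = {i: [(k + 1, v) for k, v in part] for i, part in partitions.items()}

print(layout_is_valid(after_map_values))  # True
print(layout_is_valid(after_map))         # False
```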

In [None]:
peopleNameSurname2 = peopleByName.mapValues(lambda p: p.surname)

In [None]:
peopleNameSurname2.partitioner