## Working with RDDs (ch 3)

An RDD is simply an immutable, distributed collection of objects. 

Users create RDDs in two ways: by loading an external dataset, or by distributing a collection of objects (e.g., a list or set) in their driver program. 

First example (3.1): load a text file using the SparkContext sc

In [1]:
lines = sc.textFile("s3n://csc8101-spark-example-data/ricottaCake.txt") 
lines.count()

NameError: name 'sc' is not defined

In [3]:
lines.first()

'Ricotta cake algorithm (recipe, really)'

In the PySpark shell or notebook, sc is a built-in variable, already bound to a SparkContext. In a standalone application (for example, one written in Java or Scala), you need to create the SparkContext yourself.

In [2]:
sc

''
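When `sc` is not predefined (the `NameError` shown earlier is what that looks like), a standalone script has to build the context itself. The following is a minimal sketch, assuming PySpark is installed and a local master is acceptable. The helper name `make_local_context`, the app name, and the `local[*]` master are illustrative choices, not part of the original notes.

```python
def make_local_context(app_name="rdd-notes", master="local[*]"):
    """Return a SparkContext for a standalone script (hypothetical helper)."""
    # Imported lazily so this module still loads where pyspark is missing.
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName(app_name).setMaster(master)
    # getOrCreate reuses an already-running context instead of failing.
    return SparkContext.getOrCreate(conf)


# Usage, in a script or a notebook without a predefined context:
# sc = make_local_context()
```

Using `getOrCreate` rather than the constructor is a design choice for notebooks, where re-running the cell would otherwise try to start a second context.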

Passing a (lambda) function to a worker

In [10]:
pythonLines = lines.filter(lambda line: "dough" in line)

Once created, RDDs offer two types of operations: transformations and actions. Transformations construct a new RDD from a previous one. For example, one common transformation is filtering data that matches a predicate.

Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS). One example of an action we called earlier is first():

In [15]:
pythonLines.first()

"Cooking time = rest time for dough + baking time = 1hr + 40'"

In [16]:
pythonLines.count()

5

**Lazy evaluation:** 

Although you can define new RDDs at any time, Spark computes them lazily, that is, only the first time they are used in an action.

If Spark were to load and store all the lines in the file as soon as we wrote lines = sc.textFile(...), it would waste a lot of storage space, given that we then immediately filter out many lines. Instead, once Spark sees the whole chain of transformations, it can compute just the data needed for its result. In fact, for the first() action, Spark scans the file only until it finds the first matching line; it doesn’t even read the whole file.
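The early-stopping behaviour described above can be imitated in plain Python with generators. This is only an analogy, not Spark code: the `recipe` list is a hypothetical stand-in for the text file, and `read_lines` and `read_log` are illustrative names used to record how many lines are actually read.

```python
read_log = []  # every line actually pulled from the "file"


def read_lines(source):
    """Yield lines one at a time, logging each read."""
    for line in source:
        read_log.append(line)
        yield line


# Hypothetical stand-in for the recipe file.
recipe = ["Ricotta cake algorithm", "Mix the dough", "Rest the dough", "Bake"]

lines = read_lines(recipe)                           # like sc.textFile: nothing read yet
dough_lines = (l for l in lines if "dough" in l)     # like filter: still nothing read
reads_before_action = len(read_log)

first_match = next(dough_lines)                      # like first(): stops at the first hit
reads_after_action = len(read_log)

print(first_match, reads_before_action, reads_after_action)
```

Only the lines up to and including the first match are read; the remaining lines are never touched.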

**Persistence:**
    
By default, Spark recomputes an RDD each time you run an action on it. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist().

In [17]:
pythonLines.persist()

PythonRDD[11] at RDD at PythonRDD.scala:43
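To see why persisting matters when an RDD feeds several actions, here is a toy model in plain Python. It is not how Spark is implemented: `ToyDataset` and its `computations` counter are hypothetical, and they only imitate "recompute on every action unless persisted".

```python
class ToyDataset:
    """Toy stand-in for an RDD: recomputed per action unless persisted."""

    def __init__(self, compute):
        self._compute = compute   # zero-argument function producing the data
        self._persist = False
        self._cache = None
        self.computations = 0     # how many times the data was recomputed

    def persist(self):
        self._persist = True
        return self

    def _materialize(self):
        if self._cache is not None:
            return self._cache
        self.computations += 1
        data = list(self._compute())
        if self._persist:
            self._cache = data
        return data

    def count(self):
        return len(self._materialize())

    def first(self):
        return self._materialize()[0]


source = ["Mix the dough", "Rest the dough"]

plain = ToyDataset(lambda: source)
plain.count()
plain.first()

kept = ToyDataset(lambda: source).persist()
kept.count()
kept.first()

print(plain.computations, kept.computations)
```

The unpersisted dataset is computed once per action, while the persisted one is computed once and then served from its cache.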

### Creating RDDs and RDD Operations: Actions and Transformations

The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext’s parallelize() method.

Ex 3.5:

In [19]:
lines = sc.parallelize(["pandas", "i like pandas"])

Ex - create a new RDD by loading a dataset

In [2]:
inputRDD = sc.textFile("s3n://csc8101-spark-example-data/sample_movielens_movies.txt")

**RDD Operations**

RDDs support two types of operations: transformations and actions. *Transformations* are operations on RDDs that return a new RDD, such as map() and filter(). *Actions* are operations that return a result to the driver program or write it to storage, and kick off a computation, such as count() and first(). Spark treats transformations and actions very differently, so understanding which type of operation you are performing will be important.

Examples of transformations:

In [3]:
thrillerRDD = inputRDD.filter(lambda x: "Thriller" in x)

In [4]:
comedyRDD = inputRDD.filter(lambda x: "Comedy" in x)

In [5]:
whatILikeRDD = thrillerRDD.union(comedyRDD)

*Actions* - printing the first few lines of each movie category with take()

In [6]:
thrillerRDD.take(10)

['2::Movie 2::Romance|Thriller',
 '9::Movie 9::Anime|Thriller',
 '13::Movie 13::Thriller|Action',
 '15::Movie 15::Comedy|Thriller',
 '17::Movie 17::Thriller|Action',
 '21::Movie 21::Romance|Thriller',
 '28::Movie 28::Thriller|Anime',
 '30::Movie 30::Thriller|Romance',
 '31::Movie 31::Thriller|Romance',
 '35::Movie 35::Action|Thriller']

In [8]:
comedyRDD.take(10)

['0::Movie 0::Romance|Comedy',
 '4::Movie 4::Anime|Comedy',
 '6::Movie 6::Action|Comedy',
 '7::Movie 7::Anime|Comedy',
 '8::Movie 8::Comedy|Action',
 '12::Movie 12::Anime|Comedy',
 '14::Movie 14::Anime|Comedy',
 '15::Movie 15::Comedy|Thriller',
 '18::Movie 18::Action|Comedy',
 '23::Movie 23::Comedy|Comedy']

In [9]:
whatILikeRDD.take(20)

['2::Movie 2::Romance|Thriller',
 '9::Movie 9::Anime|Thriller',
 '13::Movie 13::Thriller|Action',
 '15::Movie 15::Comedy|Thriller',
 '17::Movie 17::Thriller|Action',
 '21::Movie 21::Romance|Thriller',
 '28::Movie 28::Thriller|Anime',
 '30::Movie 30::Thriller|Romance',
 '31::Movie 31::Thriller|Romance',
 '35::Movie 35::Action|Thriller',
 '38::Movie 38::Thriller|Romance',
 '41::Movie 41::Comedy|Thriller',
 '43::Movie 43::Thriller|Anime',
 '52::Movie 52::Thriller|Action',
 '54::Movie 54::Romance|Thriller',
 '58::Movie 58::Thriller|Romance',
 '59::Movie 59::Thriller|Comedy',
 '63::Movie 63::Romance|Thriller',
 '67::Movie 67::Thriller|Anime',
 '68::Movie 68::Thriller|Romance']

Note: RDDs also have a collect() function to retrieve the entire RDD. Use it only when the whole dataset fits in the memory of the driver program.

Action: count()

In [10]:
whatILikeRDD.count()

73

**Passing functions to Spark** (ch 3 pg. 30)

In [18]:
def isThriller(s): 
    return "Thriller" in s
thrillers = whatILikeRDD.filter(isThriller)
for t in thrillers.collect():
    print(t)

2::Movie 2::Romance|Thriller
9::Movie 9::Anime|Thriller
13::Movie 13::Thriller|Action
15::Movie 15::Comedy|Thriller
17::Movie 17::Thriller|Action
21::Movie 21::Romance|Thriller
28::Movie 28::Thriller|Anime
30::Movie 30::Thriller|Romance
31::Movie 31::Thriller|Romance
35::Movie 35::Action|Thriller
38::Movie 38::Thriller|Romance
41::Movie 41::Comedy|Thriller
43::Movie 43::Thriller|Anime
52::Movie 52::Thriller|Action
54::Movie 54::Romance|Thriller
58::Movie 58::Thriller|Romance
59::Movie 59::Thriller|Comedy
63::Movie 63::Romance|Thriller
67::Movie 67::Thriller|Anime
68::Movie 68::Thriller|Romance
70::Movie 70::Thriller|Thriller
72::Movie 72::Thriller|Romance
79::Movie 79::Thriller|Thriller
83::Movie 83::Comedy|Thriller
85::Movie 85::Thriller|Anime
87::Movie 87::Thriller|Thriller
88::Movie 88::Romance|Thriller
91::Movie 91::Anime|Thriller
93::Movie 93::Romance|Thriller
94::Movie 94::Thriller|Comedy
96::Movie 96::Thriller|Romance
97::Movie 97::Thriller|Thriller
98::Movie 98::Thriller|Comedy

**Common Transformations and Actions**

Element-wise transformations: **map**  (pg 34)

In [25]:
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print('{i}'.format(i=num))

1
4
9
16


**flatMap** and map -- this is ex. 3.29

In [29]:
lines = sc.parallelize(["the quick red fox", "advanced in the woods"])
words = lines.flatMap(lambda line: line.split(" "))
lists = lines.map(lambda line: line.split(" "))
for w in words.collect():
    print(w)
lists.collect()

the
quick
red
fox
advanced
in
the
woods


[['the', 'quick', 'red', 'fox'], ['advanced', 'in', 'the', 'woods']]
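The difference between the two results can be reproduced without Spark. The sketch below is a plain-Python analogy that treats flatMap as "map, then remove one level of nesting"; the variable names `mapped` and `flat_mapped` are illustrative.

```python
from itertools import chain

lines = ["the quick red fox", "advanced in the woods"]

# map: one output element (a list of words) per input line
mapped = [line.split(" ") for line in lines]

# flatMap: the same per-line results, concatenated into a single sequence
flat_mapped = list(chain.from_iterable(mapped))

print(len(mapped), len(flat_mapped))
```

map() keeps one element per input line (two lists here), whereas flatMap() yields one element per word (eight strings).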

Pseudo set operations: **union**

In [38]:
unionSet = thrillerRDD.union(comedyRDD)
unionSet.collect()

['2::Movie 2::Romance|Thriller',
 '9::Movie 9::Anime|Thriller',
 '13::Movie 13::Thriller|Action',
 '15::Movie 15::Comedy|Thriller',
 '17::Movie 17::Thriller|Action',
 '21::Movie 21::Romance|Thriller',
 '28::Movie 28::Thriller|Anime',
 '30::Movie 30::Thriller|Romance',
 '31::Movie 31::Thriller|Romance',
 '35::Movie 35::Action|Thriller',
 '38::Movie 38::Thriller|Romance',
 '41::Movie 41::Comedy|Thriller',
 '43::Movie 43::Thriller|Anime',
 '52::Movie 52::Thriller|Action',
 '54::Movie 54::Romance|Thriller',
 '58::Movie 58::Thriller|Romance',
 '59::Movie 59::Thriller|Comedy',
 '63::Movie 63::Romance|Thriller',
 '67::Movie 67::Thriller|Anime',
 '68::Movie 68::Thriller|Romance',
 '70::Movie 70::Thriller|Thriller',
 '72::Movie 72::Thriller|Romance',
 '79::Movie 79::Thriller|Thriller',
 '83::Movie 83::Comedy|Thriller',
 '85::Movie 85::Thriller|Anime',
 '87::Movie 87::Thriller|Thriller',
 '88::Movie 88::Romance|Thriller',
 '91::Movie 91::Anime|Thriller',
 '93::Movie 93::Romance|Thriller',
 '94::

**distinct()**: extract genres from movie lines, then select distinct

In [57]:
comedyGenres = comedyRDD.map(lambda line: line.split("::")).map(lambda l: l[2])
comedyGenres.distinct().collect()

['Comedy|Action',
 'Thriller|Comedy',
 'Anime|Comedy',
 'Romance|Comedy',
 'Action|Comedy',
 'Comedy|Comedy',
 'Comedy|Anime',
 'Comedy|Thriller',
 'Comedy|Romance']

In [58]:
thrillerGenres = thrillerRDD.map(lambda line: line.split("::")).map(lambda l: l[2])
thrillerGenres.distinct().collect()

['Thriller|Thriller',
 'Thriller|Comedy',
 'Anime|Thriller',
 'Romance|Thriller',
 'Thriller|Action',
 'Thriller|Anime',
 'Action|Thriller',
 'Comedy|Thriller',
 'Thriller|Romance']

**intersection**

In [60]:
thrillerGenres.intersection(comedyGenres).collect()

['Comedy|Thriller', 'Thriller|Comedy']

**set difference**

In [62]:
thrillerGenres.subtract(comedyGenres).distinct().collect()

['Anime|Thriller',
 'Thriller|Anime',
 'Thriller|Romance',
 'Thriller|Action',
 'Thriller|Thriller',
 'Romance|Thriller',
 'Action|Thriller']

**cartesian**

In [63]:
thrillerGenres.cartesian(comedyGenres).count()

1326
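The pseudo set operations differ in how they treat duplicates, which is why distinct() is applied after subtract() above. The following plain-Python sketch mirrors those semantics on two tiny, made-up genre lists. It is an analogy under that assumption, not Spark code, and the results are sorted only to make them deterministic (Spark gives no ordering guarantee here).

```python
from itertools import product

# Hypothetical miniature versions of thrillerGenres and comedyGenres.
thriller = ["Comedy|Thriller", "Thriller|Action", "Thriller|Action"]
comedy = ["Comedy|Thriller", "Anime|Comedy"]

union = thriller + comedy                            # union keeps duplicates
distinct = sorted(set(union))                        # distinct removes them
intersection = sorted(set(thriller) & set(comedy))   # intersection also deduplicates
comedy_set = set(comedy)
subtract = [g for g in thriller if g not in comedy_set]  # left-side duplicates survive
cartesian = list(product(thriller, comedy))          # every (thriller, comedy) pair

print(len(union), distinct, intersection, subtract, len(cartesian))
```

The size of the cartesian product is always the product of the two input sizes, which is why cartesian() becomes expensive quickly on large RDDs.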

Refer to Table 3-2 for a summary of basic RDD transformations.

Computing averages using map and reduce

In [73]:
nums = sc.parallelize([2, 3, 4, 5])
# Pair each value with a count of 1 so that sum and count are reduced together.
pairs = nums.map(lambda x: (x, 1))
(sums, count) = pairs.reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
print(sums / float(count))

3.5


Computing average using the **aggregate()** function

In [74]:
sumCount = nums.aggregate((0, 0),
                          (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                          (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
sumCount[0] / float(sumCount[1])

3.5
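The two functions passed to aggregate() play different roles: the first folds the values of one partition into an accumulator, and the second merges accumulators from different partitions. The sketch below models that in plain Python with functools.reduce. The `aggregate` helper and the two-way split in `partitions` are assumptions made for illustration; Spark decides the real partitioning.

```python
from functools import reduce


def aggregate(partitions, zero, seq_op, comb_op):
    """Toy model of RDD.aggregate over an explicit list of partitions."""
    # Fold each partition independently, starting from the zero value.
    partials = [reduce(seq_op, part, zero) for part in partitions]
    # Merge the per-partition accumulators into one result.
    return reduce(comb_op, partials, zero)


# Hypothetical split of [2, 3, 4, 5] across two partitions.
partitions = [[2, 3], [4, 5]]

total, count = aggregate(
    partitions,
    (0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)
average = total / count
print(average)
```

Because the zero value is used once per partition and again in the final merge, it has to be neutral for both functions, as (0, 0) is here.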