This notebook is part of the material from this MOOC: https://www.edx.org/course/introduction-apache-spark-uc-berkeleyx-cs105x

Last update: 13/02/2019 (updated to Python 3)

Created by: Vanessa Gömez Verdejo (vanessa@tsc.uc3m.es)

# **Introduction to Spark and RDDs**

In this notebook we review the main operations on RDDs (Resilient Distributed Datasets). As you know, there are two types of operations on RDDs:
- *Transformation*: an operation that manipulates an RDD to produce another RDD. Transformations run on the workers and are lazy: nothing is computed until an action is executed. Examples: map, filter, join, …
- *Action*: an operation that produces a result in the driver. Examples: count, collect, reduce, …
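
The laziness of transformations can be illustrated without Spark. The following is only a plain-Python analogy, not Spark code: Python's built-in `map` is lazy, so it stands in for a transformation, and `list(...)` stands in for an action. The helper `to_upper` and the list `calls` are hypothetical names introduced for the illustration.

```python
# Plain-Python analogy (no Spark needed): a lazy map plays the role of a
# transformation, and consuming it plays the role of an action.
calls = []  # records every time the mapping function actually runs

def to_upper(word):
    calls.append(word)
    return word.upper()

fruits = ['apple', 'orange', 'banana']

# "Transformation": builds a recipe, runs nothing yet.
lazy_upper = map(to_upper, fruits)
calls_before_action = len(calls)   # to_upper has not been called so far

# "Action": forces the computation and returns a concrete result.
result = list(lazy_upper)
calls_after_action = len(calls)    # one call per element

print(calls_before_action, calls_after_action, result)
# 0 3 ['APPLE', 'ORANGE', 'BANANA']
```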

On the website http://nbviewer.jupyter.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb you can find a summary of the main RDD operations.

Before starting to work, you may need to install the PyPI package "spark_mooc_meta", which includes some useful utilities:
* Go to Workspace --> Create --> Library
* Change source to "Upload Python Egg or PyPi"
* Type "spark_mooc_meta" into the PyPi Package field and click Install Library. 
This should add the library to your cluster.

### **1.- Creating my first RDD**

We will create a simple RDD with 4 data partitions and apply some basic operations over it...

In [5]:
# Creating an RDD from a Python list with sc.parallelize
fruits = ['apple', 'orange', 'banana', 'grape', 'watermelon', 'apple', 'orange', 'apple']
number_partitions = 4
dataRDD = sc.parallelize(fruits, number_partitions)
print(type(dataRDD))

#### **Exercise:**

Apply the corresponding operation to:
- obtain the total number of elements in the RDD (count)
- get the first two elements in the RDD (take)
- get the first two alphabetically sorted elements in the RDD (takeOrdered)

In [7]:
#Complete the #FILL IN# gaps

N_data = dataRDD.count()
print("There are %d elements in the RDD\n" % N_data)

First_elements = dataRDD.take(num=2)
print("These are the first two:")
print(First_elements)

First_sorted_elements = dataRDD.takeOrdered(num=2)
print("\nThese are the first two, alphabetically ordered:")
print(First_sorted_elements)

**_The answer should be_:**
<pre><code>
There are 8 elements in the RDD

These are the first two:
['apple', 'orange']

These are the first two, alphabetically ordered:
['apple', 'apple']
</code></pre>

### **2.- Simple transformations**

#### **Exercise:**
Complete the code of the function 'complete_word' so that it adds the word 'fruit' to the input word. Use this function to process all elements in the RDD using a map() transformation. Print all of the elements in the resulting RDD using collect().

In [10]:
#Complete the #FILL IN# gaps

def complete_word(word):
    return (word + " fruit")  # the blank space keeps the two words separate

print("Testing the function:")
print(complete_word('apple'))

dataRDDprocessed = dataRDD.map(complete_word)

print("\nResult of collect:")
print(dataRDDprocessed.collect())

**_The answer should be_:**
<pre><code>
Testing the function:
apple fruit

Result of collect:
['apple fruit', 'orange fruit', 'banana fruit', 'grape fruit', 'watermelon fruit', 'apple fruit', 'orange fruit', 'apple fruit']
</code></pre>

Python supports the use of small one-line anonymous functions, called **lambda functions**, that are not bound to a name at runtime.

`lambda` functions can be used wherever function objects are required. They are syntactically restricted to a single expression. Remember that `lambda` functions are a matter of style and using them is never required - semantically, they are just syntactic sugar for a normal function definition. You can always define a separate normal function instead, but using a `lambda` function is an equivalent and more compact form of coding. Ideally you should consider using `lambda` functions where you want to encapsulate non-reusable code without littering your code with one-line functions.

For example, we can create an RDD and apply an operation to its elements with a lambda function:
<pre><code>
x = sc.parallelize([1,2,3]) #  creating an RDD from the passed object
y = x.map(lambda x: (x,x**2)) # apply a lambda function over each element
print(x.collect())  # collect original RDD elements to a list on the driver
print(y.collect())  # collect transformed RDD elements to a list on the driver
[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]
</code></pre>

So, let's try to repeat the above operation (adding the word 'fruit' to an input word) using a lambda function.

In [13]:
# Complete the #FILL IN# gaps
dataRDDprocessed_lambda = dataRDD.map(lambda x: (x+" fruit"))

print("Result with a lambda function:")
print(dataRDDprocessed_lambda.collect())

**_The answer should be_:**
<pre><code>
Result with a lambda function:
['apple fruit', 'orange fruit', 'banana fruit', 'grape fruit', 'watermelon fruit', 'apple fruit', 'orange fruit', 'apple fruit']
</code></pre>

Now let's count the number of characters of every processed word.

In [16]:
# Complete the #FILL IN# gaps
wordLengths = dataRDDprocessed_lambda.map(lambda x: len(x))
wordLengths = wordLengths.collect()
print("Length of each word:")
print(wordLengths)

**_The answer should be_:**
<pre><code>
Length of each word:
[11, 12, 12, 11, 16, 11, 12, 11]
</code></pre>

Examine the next code where we obtain a single string with all the words in the original RDD using two different approaches...

In [19]:
# Approach 1: Collect all the elements of the RDD and join them with a blank space
string1 = " ".join(dataRDD.collect())
print(type(string1))
print(string1)

# Approach 2: Join the RDD elements with a blank space in a distributed way and print the final result
string2 = dataRDD.reduce(lambda x, y: x + " " + y)
print(type(string2))
print(string2)

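**Discussion:** In a real application, with a very large amount of data...
* Which approach could be applied?

  The second one: `reduce()` combines the elements on the workers and returns only the combined value to the driver.
* Could you compute the `collect()` of a very large RDD?

  No: `collect()` returns every element to the driver, which must hold them all in memory.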
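
To make the difference concrete, here is a plain-Python sketch of how a distributed `reduce` can proceed. It is an illustration under stated assumptions, not Spark's actual implementation: the even split into 4 partitions of 2 elements is hypothetical, and `functools.reduce` stands in for the work done on each worker.

```python
from functools import reduce

fruits = ['apple', 'orange', 'banana', 'grape', 'watermelon', 'apple', 'orange', 'apple']

# Hypothetical partitioning: 4 partitions of 2 elements each.
partitions = [fruits[i:i + 2] for i in range(0, len(fruits), 2)]

def join_words(x, y):
    return x + " " + y

# Step 1 (on the workers): each partition is reduced independently.
partials = [reduce(join_words, part) for part in partitions]

# Step 2 (on the driver): only one partial result per partition is combined.
string2 = reduce(join_words, partials)

print(partials)
# ['apple orange', 'banana grape', 'watermelon apple', 'orange apple']
print(string2)
# apple orange banana grape watermelon apple orange apple
```

Note that the Spark documentation asks for the function passed to `reduce` to be commutative and associative; string concatenation is not commutative, so the order of the words in the result depends on the order in which the partial results are combined.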

#### **Exercise:**
Working with distributed operations, propose an alternative code to obtain the total number of characters in the RDD:

In [22]:
# Complete the #FILL IN# gaps

# Computing the total number of characters in the driver
Nchars = sum(dataRDD.map(len).collect())
print(Nchars)

# Computing the total number of characters in a distributed way
Nchars = dataRDD.map(lambda x: len(x)).reduce(lambda x,y:x+y)
print(Nchars)

**_The answer should be_:**
<pre><code>
48
48
</code></pre>

### **3.- Counting words**

Now, let's count the number of times that each word (in our example, each fruit) appears in a document (in our example, the list of fruits stored in the RDD). For this purpose, complete the following exercises:

**Exercise:** Transform each element of the original RDD into a tuple `(k,v)` where `k`, the key, is the fruit name, and `v`, the value, is fixed to 1 for all the elements.

In [26]:
#Complete the #FILL IN# gaps
pairRDD = dataRDD.map(lambda word: (word, 1))  # one (fruit, 1) pair per element
print(pairRDD.collect())

**_The answer should be_:**
<pre><code>
[('apple', 1), ('orange', 1), ('banana', 1), ('grape', 1), ('watermelon', 1), ('apple', 1), ('orange', 1), ('apple', 1)]
</code></pre>

**Exercise:** Use the reduceByKey operator to count the number of times that each word appears in the original list. Note that reduceByKey applies a given reduce function to the values of the elements that share the same key.

In [29]:
#Complete the #FILL IN# gaps
print("Result: (key, count):")
countingWords = pairRDD.reduceByKey(lambda a,b:a+b)
print(countingWords.collect())

**_The answer should be_:**
<pre><code>
Result: (key, count):
[('orange', 2), ('watermelon', 1), ('grape', 1), ('apple', 3), ('banana', 1)]
</code></pre>
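
The semantics of `reduceByKey` can be sketched in plain Python. The helper `reduce_by_key` below is hypothetical and runs on a single machine; it only mimics what the operator computes, not how Spark distributes the work.

```python
def reduce_by_key(pairs, func):
    """Plain-Python stand-in for the semantics of reduceByKey (hypothetical helper)."""
    merged = {}
    for key, value in pairs:
        # The first value seen for a key is stored as is;
        # later values for the same key are folded in with func.
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

fruits = ['apple', 'orange', 'banana', 'grape', 'watermelon', 'apple', 'orange', 'apple']
pairs = [(word, 1) for word in fruits]

counts = reduce_by_key(pairs, lambda a, b: a + b)
print(counts)
# [('apple', 3), ('orange', 2), ('banana', 1), ('grape', 1), ('watermelon', 1)]
```

In Spark the order of the resulting pairs is not guaranteed, which is why the expected answer above lists the same counts in a different order.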

### **4.- Filtering an RDD**

**Exercise:** Now, complete the following code to count the number of words that only appear once in the dataset. You may need the filter() operator.

In [32]:
#Complete the #FILL IN# gaps
N_unique_words = countingWords.filter(lambda a:a[1]==1).count()
print(N_unique_words)

**_The answer should be_:**
<pre><code>
3
</code></pre>

### **5.- Counting words in a file**

Now let's work with a data file: the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). To be able to work with it, we have to upload the data file to the Databricks platform:
* Go to Data (on the left bar menu) and click the '+' sign next to Tables (new table)
* Select:
  - Data source: 'Upload File' (default option) 
  - Upload to DBFS: don't do anything (we will use a default path)
  - File: drop or select the file 'shakespeare.txt' 
  
Then, you will see "File uploaded to /FileStore/tables/shakespeare.txt". This indicates that the file has been successfully uploaded and also gives you the file path ("/FileStore/tables/shakespeare.txt").

Now, go back to the notebook and run the following cell to create an RDD with the content of the text file. Note that each line of the file will become an element of your RDD.

In [35]:
#textRDD = sc.textFile('/FileStore/tables/100_0-67cbc.txt', 8)
textRDD = sc.textFile('/FileStore/tables/shakespeare.txt', 8)

print("Number of lines of text = %d \n" % textRDD.count())

print("First 10 lines of the file:")
print(textRDD.take(10))

**Exercise:** Use the code written in the previous section ("3.- Counting words") to obtain the counts for every word in the text. Print the first 10 results. Observe the result: is this what we want? What is going wrong?

In [37]:
#Complete the #FILL IN# gaps
# Same steps as in section 3: each RDD element (here, a whole line) becomes a key
countingWords = (textRDD
                 .map(lambda x: (x, 1))
                 .reduceByKey(lambda a, b: a + b))
print(countingWords.take(10))

**_The answer should be_:**
<pre><code>
[(u'', 9493), (u'    thou diest in thine unthankfulness, and thine ignorance makes', 1), (u"    Which I shall send you written, be assur'd", 1), (u'    I do beseech you, take it not amiss:', 1), (u'    their mastiffs are of unmatchable courage.', 1), (u'    With us in Venice, if it be denied,', 1), (u"  Hot. I'll have it so. A little charge will do it.", 1), (u'     By what yourself, too, late have spoke and done,', 1), (u"  FIRST LORD. He's but a mad lord, and nought but humours sways him.", 1), (u'    none will entertain it.', 1)]
</code></pre>

**Exercise:** Modify the above code by introducing an operation that splits each line into its words and makes each word an element of the RDD.

Note: you may need the Python method "split" (a method of string objects) and the "flatMap" operation of Spark.
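
The difference between mapping and flat-mapping a split can be sketched in plain Python. This is a single-machine analogy, not Spark code; the two-line `lines` list is a hypothetical stand-in for the text file.

```python
from itertools import chain

lines = ["  to be or", "not to be"]   # hypothetical two-line "file"

# map-like: one output element per input line (a list of lists).
mapped = [line.split() for line in lines]

# flatMap-like: the per-line lists are flattened into a single sequence of words.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

# split(" ") keeps the empty strings produced by leading or repeated spaces;
# split() with no argument discards them.
with_empty = lines[0].split(" ")
without_empty = lines[0].split()

print(mapped)         # [['to', 'be', 'or'], ['not', 'to', 'be']]
print(flat_mapped)    # ['to', 'be', 'or', 'not', 'to', 'be']
print(with_empty)     # ['', '', 'to', 'be', 'or']
print(without_empty)  # ['to', 'be', 'or']
```

Since many lines of the file start with blank spaces, calling `split()` without arguments avoids counting empty strings as words.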

In [40]:
#Complete the #FILL IN# gaps
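countingWords = (textRDD
                 .flatMap(lambda x: x.split())  # one RDD element per word
                 .map(lambda x: (x, 1))
                 .reduceByKey(lambda a, b: a + b))

print(countingWords.take(10))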

**_The answer should be_:**
<pre><code>
[(u'fawn', 11), (u'bishops.', 2), (u'divinely', 1), (u'mustachio', 1), (u'four', 114), (u'reproach-', 1), (u'drollery.', 1), (u'conjuring', 1), (u'slew.', 1), (u'Calen', 1)]
</code></pre>

**Exercise:** Modify the code to obtain 5 words that appear exactly 111 times in the text.

In [43]:
#Complete the #FILL IN# gaps
# Keep the result in a new variable so that countingWords still holds all the word counts
words111 = countingWords.filter(lambda a: a[1]==111)

print(words111.take(5))

**_The answer should be_:**
<pre><code>
[(u'think,', 111), (u'see,', 111), (u'gone.', 111), (u"King's", 111), (u'having', 111)]
</code></pre>

**Exercise:** Modify the code to obtain the 5 most frequently used words in the text.

In [46]:
#Complete the #FILL IN# gaps
counting = countingWords.top(5,key=lambda x:x[1])

print(counting)

**_The answer should be_:**
<pre><code>
[(u'the', 23197), (u'I', 19540), (u'and', 18263), (u'to', 15592), (u'of', 15507)]
</code></pre>
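
`top(n, key=...)` returns the n largest elements according to `key`, while `takeOrdered(n, key=...)` returns the n smallest, so negating the key turns one into the other. The plain-Python sketch below shows the same idea with the standard `heapq` module; it is an analogy on a small local list (with counts taken from the outputs above), not a description of Spark internals.

```python
import heapq

# A few (word, count) pairs taken from the expected outputs above, in arbitrary order.
counts = [('four', 114), ('of', 15507), ('the', 23197), ('fawn', 11),
          ('to', 15592), ('I', 19540), ('and', 18263)]

# Analogue of countingWords.top(3, key=lambda x: x[1])
largest = heapq.nlargest(3, counts, key=lambda x: x[1])

# Analogue of countingWords.takeOrdered(3, key=lambda x: -x[1])
smallest_negated = heapq.nsmallest(3, counts, key=lambda x: -x[1])

print(largest)
# [('the', 23197), ('I', 19540), ('and', 18263)]
print(largest == smallest_negated)
# True
```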

### **6.- Cleaning the text**

You may notice in the results that some words appear in capital letters and that some punctuation characters appear as well. We will incorporate a cleaning function into the code to eliminate unwanted characters. For now, we provide a simple cleaning function that lowercases all the characters.

**Exercise:** Use the clean_text function in the code of the above cell (which obtained the 5 most frequent words in the text) and verify, for example, that the word "I" is printed as "i".

**Note:** Since we are modifying the strings, the counts will differ with respect to the previous values.

In [49]:
def clean_text(string):
    string = string.lower()
    return string    

In [50]:
#Complete the #FILL IN# gaps
counting = #FILL IN#

print(counting)

**_The answer should be_:**
<pre><code>
[(u'the', 27267), (u'and', 25340), (u'i', 19540), (u'to', 18656), (u'of', 17301)]
</code></pre>

We will now search for non-alphabetic characters in the dataset. We can use the Python string method 'isalpha' to decide whether or not a string is composed only of alphabetic characters.
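
As a quick check of how `isalpha` behaves, the following plain-Python sketch applies it to a few hypothetical sample strings. Note that `isalpha` accepts any Unicode letter, not only a-z, and returns False for the empty string.

```python
samples = ["lord", "i'll", "you,", "1599", "", "café"]

# True only when the string is non-empty and every character is a letter.
flags = {s: s.isalpha() for s in samples}

# Non-empty strings that contain at least one non-alphabetic character.
needs_cleaning = [s for s in samples if s and not s.isalpha()]

print(flags)
# {'lord': True, "i'll": False, 'you,': False, '1599': False, '': False, 'café': True}
print(needs_cleaning)
# ["i'll", 'you,', '1599']
```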

**Exercise:** Use that method to print the 20 most frequent words that contain non-alphabetic characters and print the total number of strings with non-alphabetic characters.

In [53]:
#Complete the #FILL IN# gaps
counting = #FILL IN#
              
print(counting)

**_The answer should be_:**
<pre><code>
The database has 40957 words that need cleaning, for example:

[(u"i'll", 1737), (u'you,', 1478), (u"'tis", 1367), (u'sir,', 1235), (u'me,', 1219), (u"th'", 1146), (u'o,', 1008), (u'lord,', 977), (u'come,', 875), (u'me.', 823), (u'you.', 813), (u'why,', 805), (u'now,', 785), (u'it.', 784), (u'him.', 755), (u'lord.', 702), (u'him,', 698), (u'ay,', 661), (u'well,', 647), (u'and,', 647)]
</code></pre>

You can clearly observe now all the punctuation symbols that have not been removed yet. 

**Exercise:** Write a new cleaning function, new_clean_text, that removes all the unwanted symbols. As a hint, we provide the code for removing the symbol '.'.

In [56]:
def new_clean_text(string):
    # Modify this code to remove all punctuation symbols (['.', ',', ';', "'", '"', '[', ']', '?', '-', '!', ':', '&', '(', ')','<','_','|', '}', '`'])
    string = string.lower()
    string = string.replace('.','')
    return string    

countsRDD = (textRDD
              .flatMap(lambda x: x.split())
              .map(new_clean_text)
              .filter(lambda x: not x.isnumeric())
              .filter(lambda x: len(x)>0)  
              .filter(lambda x: not x.isalnum())                     
              .map(lambda x: (x, 1))
              .reduceByKey(lambda x, y: x + y)
              )
countsRDD.cache()

Npreprocess = countsRDD.count()
print("The database has %d elements that need preprocessing, for example:" % Npreprocess)
print(countsRDD.takeOrdered(20,key = lambda x: -x[1]))

**_The answer should be_:**
<pre><code>
The database has 0 elements that need preprocessing, for example:
[]
</code></pre>
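
One possible way to complete the cleaning function, offered as a sketch rather than the reference solution, is to delete all the listed symbols in a single pass with `str.translate`. The names `PUNCTUATION` and `REMOVE_TABLE` are introduced here for illustration; the symbol list is the one given in the comment of the cell above.

```python
# Symbols to remove, copied from the hint in the exercise.
PUNCTUATION = ['.', ',', ';', "'", '"', '[', ']', '?', '-', '!', ':', '&',
               '(', ')', '<', '_', '|', '}', '`']

# Translation table that maps every listed symbol to None, i.e. deletes it.
REMOVE_TABLE = str.maketrans('', '', ''.join(PUNCTUATION))

def new_clean_text(string):
    # Lowercase first, then drop all the unwanted symbols in one pass.
    return string.lower().translate(REMOVE_TABLE)

print(new_clean_text("I'll"))       # ill
print(new_clean_text("King's,"))    # kings
print(new_clean_text("reproach-"))  # reproach
```

Chaining one `replace` call per symbol, as the hint suggests, gives the same result; the translation table just avoids repeating the call for each symbol.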

**Exercise:** Now that we have completely cleaned the words, try to find the 20 most frequent cleaned strings.

In [59]:
#Complete the #FILL IN# gaps
print("Processing the dataset to find the 20 most frequent strings:\n")
counting = #FILL IN#
        
print(counting)



**_The answer should be_:**
<pre><code>
Processing the dataset to find the 20 most frequent strings:

[(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463), (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890), (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678), (u'for', 7558), (u'be', 6857), (u'his', 6857), (u'your', 6655), (u'this', 6602)]
</code></pre>

### **7.- Removing stopwords**

Many of the most frequent words obtained in the previous section are irrelevant to many tasks; they are known as stop-words. We will use here a stop list (a list of such uninformative words) to filter out those terms.

Upload the file "stopwords.txt" to Databricks and use the next code to load it. Note that we are now loading the file as a standard Python variable (instead of an RDD), so we are using the standard open and read file operations, and the path used to load the file is "/dbfs/FileStore/tables/stopwords.txt" (if we wanted to load the data with Spark methods, the path would be "/FileStore/tables/stopwords.txt").

In [62]:
with open('/dbfs/FileStore/tables/stopwords.txt','r') as file:
    stopwords = file.read()
    stopwords=stopwords.split()
    
print(stopwords[:10])

**Exercise:** Apply an extra filter that removes the stop words from the calculations. Print the 50 most frequent words (**only the words**, separated with blank spaces).
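
The filtering logic can be tried out locally before applying it to the RDD. The sketch below is plain Python on hypothetical toy data (a tiny stop list and a short word list), not the Spark solution; converting the stop list to a `set` is a suggestion to make each membership test fast.

```python
from collections import Counter

# Hypothetical tiny stop list; a set makes "w in stopwords" a fast lookup.
stopwords = set(['the', 'and', 'i', 'to', 'of'])

# Hypothetical cleaned words.
words = ['the', 'king', 'and', 'love', 'i', 'king', 'of', 'thou', 'love', 'king']

# Same predicate that a filter() on the RDD would use.
kept = [w for w in words if w not in stopwords]

# Local analogue of counting and taking the most frequent words.
counts = Counter(kept).most_common(2)

# Only the words, separated with blank spaces.
words_only = " ".join(word for word, _ in counts)

print(counts)      # [('king', 3), ('love', 2)]
print(words_only)  # king love
```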

In [64]:
#Complete the #FILL IN# gaps
counts = #FILL IN#

print(counts)


**_The answer should be_:**
<pre><code>
These are the most frequent words:

[(u'thou', 5485), (u'thy', 4032), (u'shall', 3591), (u'thee', 3178), (u'lord', 3059), (u'king', 2861), (u'good', 2812), (u'sir', 2754), (u'o', 2607), (u'come', 2507), (u'well', 2462), (u'would', 2293), (u'let', 2099), (u'enter', 2098), (u'love', 2053), (u'ill', 1972), (u'hath', 1941), (u'man', 1835), (u'one', 1779), (u'go', 1733), (u'upon', 1731), (u'like', 1701), (u'say', 1680), (u'know', 1647), (u'may', 1633), (u'make', 1629), (u'us', 1621), (u'yet', 1569), (u'must', 1491), (u'see', 1438), (u'tis', 1405), (u'give', 1327), (u'take', 1197), (u'speak', 1165), (u'mine', 1161), (u'first', 1159), (u'th', 1148), (u'duke', 1071), (u'tell', 1059), (u'time', 1046), (u'exeunt', 1035), (u'much', 1035), (u'think', 1024), (u'never', 1011), (u'heart', 985), (u'exit', 984), (u'queen', 943), (u'doth', 939), (u'art', 915), (u'great', 899)]

</code></pre>