<center><img src='./figs/cs-logo.png' width=200></center>



<h6><center>Big Data: Informatique pour les données et calculs massifs</center></h6>

<h1>
<hr style=" border:none; height:3px;">
<center>Lab Assignment 3: Introduction to Spark</center>
<hr style=" border:none; height:3px;">
</h1>

# 1. Introduction (PLEASE READ ME)


<p align="justify">
<font size="3">
In this lab assignment you'll learn basic Spark programming skills that are necessary to develop simple, yet powerful, applications to be executed in a distributed environment.
</font>
</p>

<p align="justify">
<font size="3">
The assignment is presented in this __Jupyter Notebook__, an interface that offers support for text, code, images and other media. Essentially, a Jupyter Notebook consists of multiple _cells_, either containing some text, like the one that you are reading, or code that you can execute. 
</font>
</p>

<p align="justify">
<font size="3">
The cells that contain code are marked with the label "In [  ]" that is found on the left of the cell. In order to execute the code in the cell, you need to select the cell and press Shift+Enter (hold Shift while pressing Enter). During the execution of the code, the label on the left of the cell will change to "In [ * ]"; when the execution is over, the asterisk is replaced by a number. IMPORTANT: wait for the execution of the cell to be over before proceeding in the Notebook. 
Whenever you define a variable or a function in a cell, that variable and function will be visible in the cells below. 
This means that one can split the code of an application across different cells to interleave it with textual explanations.
    </font>
</p>

<p align="justify">
<font size="3">
Spark supports four programming languages: Scala (the one used to implement Spark itself), Java, R and Python. In this assignment we use Python.
The assignment will guide you through the Spark programming notions by using simple examples. 
After the examples, the exercises will give you the chance to practice those notions.
</font>
</p>


<p align="justify">
<font size="3">
**Apache Spark** is a cluster computing framework for parallel data processing that was conceived to address the inefficiencies of Hadoop with respect to iterative computations. 
Spark is used by both data scientists, who analyze large datasets, and engineers, who develop data processing applications. Spark allows both to concentrate on their application by hiding all the complexity of running applications in a distributed environment: distributed systems programming, network communication and fault tolerance.
</font>
</p>

<p align="justify">
<font size="3">
In order to run a distributed computation on Spark, one has to develop a **Spark application**.
A Spark application runs as a set of independent processes (called the _Executors_) across the machines (a.k.a., _Worker_ nodes) of a cluster, coordinated by the _Driver_, the process that runs the $main()$ function of the application.
</font>
</p>

<p align="justify">
<font size="3">
The _Driver_ creates an object called _SparkContext_ that communicates with the underlying cluster manager and coordinates the distribution of the computation across the _Executors_.
For example, if we were running an application to count the number of lines in a file, 
different machines might count lines in different ranges of the file. 
</font>
</p>

<img src="./figs/spark-execution.png" width=400>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Execute the following cell in order to initialize the _SparkContext_.**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
import findspark
findspark.init()

import pyspark
import random
sc = pyspark.SparkContext(appName="td3")
print("Initialization successful")

# 2. Resilient Distributed Dataset (RDD)


<p align="justify">
<font size="3">
Spark distributes the data and the computations across the machines of a cluster by using the notion of **Resilient Distributed Dataset (RDD)**. 
An RDD is an immutable distributed collection of data. 
Each element of an RDD can be an instance of any Python type, including a user-defined class.
The _SparkContext_  splits an RDD into multiple _partitions_ and scatters them across different machines of the cluster. 
</font>
</p>

<p align="justify">
<font size="3">
The distribution of the partitions of an RDD is completely transparent to the application.
The only thing a Spark application has to do is to create some RDDs and 
specify the computations on these RDDs, by using special functions that Spark provides to this purpose. 
</font>
</p>

## 2.1 Creating RDDs

<p align="justify">
<font size="3">
Spark provides two ways to create an RDD:

<ul>
<li> By distributing a collection of objects.
<li> By loading an external dataset (either in a file or a database).
</ul>
</font>
</p>

<p>
<hr style=" border:none; height:2px;">
 <font size="3" color='#91053d'>**Execute the following code to create an RDD called $words$, where each element is a string taken from the list $wordList$.**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
wordList = ["Al", "Ani", "Jackie", "Lalitha", "Mark", "Neil", "Nick", "Shirin"]
words = sc.parallelize(wordList)
print("Done!")

<p align="justify">
<font size="3">
Once an RDD has been created, Spark provides two types of operations on it:
<ol>
<li> **Transformations**. A transformation takes in one or more RDDs and returns a new RDD.
<li> **Actions**. An action takes in an RDD and outputs a value.
</ol>
</font>
</p>

<p>
</p>

<p align="justify">
<font size="3">
One common transformation is filtering data that matches a predicate by using the function $filter$.
The function $filter$ is applied on an RDD and takes in a predicate.
It loops through each element of the RDD and verifies whether that element satisfies the predicate. The function $filter$ outputs a new RDD whose elements are those that satisfy the predicate.
</font>
</p>

<p>
<hr style=" border:none; height:2px;">
 <font size="3" color='#91053d'>**Execute the following code to create an RDD called $nNames$ by retaining only the names whose first letter is 'N' from the RDD $words$.**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
'''
As mentioned before, the function filter takes in a predicate that is itself a function (returning a boolean value).
An elegant way to pass a function to a function in Python is the lambda notation.

In the code below, the argument of the function filter is a function that takes in a variable called 
"name"; the type of this variable must match the type of the elements of the RDD words, in this case 
a string.
Then the function returns whether the first character of the string "name" is N. 

Another way to express the same thing without the lambda notation would be to explicitly define the predicate, 
as follows:

def predicate(name):
    return name[:1]=='N'

nNames = words.filter(predicate)

'''
nNames = words.filter(lambda name: name[:1]=='N') 
print("done!")

<p align="justify">
<font size="3">
The transformation $filter$ above is not executed by Spark until an action is called on the RDD $nNames$.
In general, Spark postpones the execution of a transformation on an RDD until an action is invoked on that RDD or on an RDD derived from it. This is called _lazy evaluation_. The reason for this approach will be clearer in the next example.
    </font>
</p>
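
<p align="justify">
<font size="3">
Lazy evaluation can be observed without Spark. The sketch below is only a plain-Python analogy, not Spark code: Python's built-in $filter$ is lazy too, and the list that records the predicate calls is a helper introduced purely for illustration. The predicate is never called until a value is requested, much as Spark runs no transformation until an action is invoked.
</font>
</p>

```python
# Plain-Python analogy of lazy evaluation (no Spark involved).
word_list = ["Al", "Ani", "Jackie", "Lalitha", "Mark", "Neil", "Nick", "Shirin"]

# Hypothetical helper: records every name the predicate is called on.
checked = []

def starts_with_n(name):
    checked.append(name)
    return name[:1] == "N"

# Like a transformation: this only describes the computation.
lazy_n_names = filter(starts_with_n, word_list)
calls_before = len(checked)

# Like an action: requesting a value triggers the work.
first_n_name = next(lazy_n_names)
calls_after = len(checked)

print("Predicate calls before requesting a value:", calls_before)
print("First name starting with N:", first_n_name)
print("Predicate calls after requesting a value:", calls_after)
```

<p align="justify">
<font size="3">
The predicate is evaluated only for the names up to and including the first match; the remaining names are never inspected.
</font>
</p>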

<p align="justify">
<font size="3">
One example of action is the function $first()$ that returns the first element in the RDD.
    </font>
</p>

<p>
<hr style=" border:none; height:2px;">
 <font size="3" color='#91053d'>**Execute the following code to print the first element of the RDD $nNames$.**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
'''
REMEMBER: the variable nNames has been defined in the cell above, so 
it is VISIBLE in this cell as well as in the cells below the current one
'''
print(nNames.first())

<p align="justify">
<font size="3">
The previous example clearly shows that a Spark application is just a sequence of operations that create, transform and perform some actions on RDDs.
</font>
</p>

<p align="justify">
<font size="3">
The following code creates an RDD from an external dataset, more precisely a text file that contains log information of a Neo4j database. 
The function $textFile()$ takes in the path to the input text file and returns an RDD, where each element is a line of the file.
The code goes through the following steps:

<ol>
<li> **RDD creation**. Creates an RDD called $lines$, where each element is a line from the input text file.
<li> **RDD filter**. Creates an RDD called $exceptions$ from the RDD $lines$ by only retaining the lines containing the string "exception".
<li> **Action count()**. Counts the number of elements of the RDD $exceptions$. 
<li> **Print first line**. Prints the first line of the RDD $exceptions$. 
</ol>
</font>
</p>

<p>
</p>

<p align="justify">
<font size="3">
Here we can see the advantage of the _lazy evaluation_ of transformations. 
Spark does not execute the function $textFile()$ as soon as it is invoked, which would result
in loading into main memory the whole content of the log file (which can be very large). 
Instead, it waits until the first action, $count()$, is invoked.
Since this action is called on a filtered version of the RDD $lines$, 
Spark can apply the filter while it reads the file and keep only the lines that contain the word "exception".
</font>
</p>
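
<p align="justify">
<font size="3">
The benefit of filtering while reading can be sketched in plain Python. This is an analogy under stated assumptions, not Spark code: the log lines are made up, and an in-memory text stream stands in for the log file. The lines are consumed one at a time and only the matching ones are retained.
</font>
</p>

```python
import io

# Made-up log content standing in for a (possibly very large) log file.
fake_log = io.StringIO(
    "INFO  database started\n"
    "ERROR exception in thread main\n"
    "INFO  checkpoint completed\n"
    "WARN  exception while closing channel\n"
)

def read_lines(handle):
    """Yield the lines one at a time, without loading the whole file."""
    for line in handle:
        yield line.rstrip("\n")

lines_read = 0
exception_lines = []
for line in read_lines(fake_log):
    lines_read += 1
    # Only the matching lines are kept in memory.
    if "exception" in line:
        exception_lines.append(line)

print("Lines read:", lines_read)
print("Lines kept:", len(exception_lines))
```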

<p>
<hr style=" border:none; height:2px;">
 <font size="3" color='#91053d'>**Execute the following code.**</font>
 <hr style=" border:none; height:2px;">
</p>

In [None]:
data_file = "./data/neo4j.log"

# 1. RDD creation
lines = sc.textFile(data_file)

#2. RDD filter
exceptions = lines.filter(lambda line : "exception" in line)

#3. Action count()
nbLines = exceptions.count()
print("Number of exception lines ", nbLines)

#4. Print first line.
exceptions.take(1)


## 2.2 Transformations

<p align="justify">
<font size="3">
Here are some common transformations in Spark. 
In the following list, $r$ denotes the RDD on which the transformation is invoked.
<ol>
<li> $r.filter()$. Returns an RDD consisting of only the elements of the input RDD $r$ that satisfy a predicate.
<li> $r.map(func)$. Applies a function $func$ to each element of the input RDD $r$ and returns an RDD of the result.
<li> $r.flatMap(func)$. Same as $map()$, but used when $map()$ would return an RDD where each element is a list.
<li> $r.union(other)$. Takes in two RDDs ($r$ and $other$) and returns an RDD that contains the elements from both. Unlike the mathematical operation, $union$ in Spark does not remove the duplicates.
<li> $r.intersection(other)$. Takes in two RDDs ($r$ and $other$) and returns an RDD that contains the elements found in both.
<li> $r.subtract(other)$. Takes in two RDDs ($r$ and $other$) and returns an RDD that contains the elements from the RDD $r$, except those that are found in $other$.
<li> $r.cartesian(other)$. Takes in two RDDs ($r$ and $other$) and returns an RDD that contains the Cartesian product of both.
<li> $r.distinct()$. Returns an RDD that contains the same elements as the input RDD $r$ without duplicates.
</ol>
    
We are now going to look at an example of use of these transformations.
</font>    
</p>


<p>
<hr style=" border:none; height:2px;">
 <font size="3" color='#91053d'>**Execute the following code to create the two RDDs $r1$ and $r2$**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
r1 = sc.parallelize([1, 2, 3, 4])
r2 = sc.parallelize([3, 4, 5, 6, 7])
print("done")

### Use of $map()$

<p align="justify">
<font size="3">
Here we see an example of use of the transformation $map()$.
</font>
</p>

<p>
<hr style=" border:none; height:2px;">
<font size="3" color='#91053d'>**Execute the following code to create:
 <ol>
 <li> an RDD $square$, where each element is the square of the corresponding element in $r1$; 
 <li> an RDD $half$, where each element is the half of the corresponding element in $r2$.**
 </ol>
 </font>
 <hr style=" border:none; height:2px;">
</p>

In [None]:
square = r1.map(lambda x : x*x)
half = r2.map(lambda x: x/2)

# The function collect() is an action that transforms the RDD into a Python list that can be printed.
print("Elements of RDD square ", square.collect())
print("Elements of the RDD half ", half.collect())

### Use of $flatMap()$

<p align="justify">
<font size="3">
The transformation $flatMap()$ works by applying the transformation $map()$ on the input RDD; 
if each element of the resulting RDD is a list, $flatMap()$ returns an RDD where all lists are merged.
In other words, when $map()$ returns an RDD where the elements are lists, $flatMap()$ returns an RDD where the elements are the values of the list.
</font>
</p>

<p align="justify">
<font size="3">
Let's see an example. Suppose that we want to return an RDD from $r1$ where each element is associated to its square. 
More precisely, the output RDD will be as follows:
<center>
[ [1, 1], [2, 4], [3, 9], [4, 16] ]
</center>
</font>    
</p>

<p>
<hr style=" border:none; height:2px;">
 <font size="3" color='#91053d'>**Execute the following code to create an RDD $squares$ where each element of $r1$ is associated to its square.**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
squares = r1.map(lambda x : [x, x*x])
print("Elements of RDD squares ", squares.collect())

<p align="justify">
<font size="3">

As you can see, each element of the RDD $squares$ is a list of two elements; indeed, 
after calling the action $collect()$, we obtain a list of lists in Python. 
In order to obtain a simple list of elements, we flatten the lists with $flatMap()$.
More precisely, with $flatMap()$ we obtain the following RDD:
<center>
[1, 1, 2, 4, 3, 9, 4, 16]
</center>
</font>
</p>
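
<p align="justify">
<font size="3">
The difference between the two transformations can also be reproduced on an ordinary Python list. The sketch below is a plain-Python analogy (no Spark involved) that uses the same values as $r1$: a list comprehension plays the role of $map()$, and flattening the result plays the role of $flatMap()$.
</font>
</p>

```python
from itertools import chain

r1_values = [1, 2, 3, 4]

# Analogue of map(): one output element (here, a list) per input element.
mapped = [[x, x * x] for x in r1_values]

# Analogue of flatMap(): the inner lists are merged into a single sequence.
flat_mapped = list(chain.from_iterable(mapped))

print("map-like result:    ", mapped)
print("flatMap-like result:", flat_mapped)
```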

<p>
<hr style=" border:none; height:2px;">

 <font size="3" color='#91053d'>**Execute the following code to create an RDD $squares$ where each element of $r1$ is associated to its square (but each element is just a value instead of a list of values)**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
squares = r1.flatMap(lambda x : [x, x*x])
print("Elements of RDD squares ", squares.collect())

### Use of set transformations

<p align="justify">
<font size="3">
The set transformations are $union$, $intersection$, $subtract$ and $cartesian$.
</font>
</p>
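
<p align="justify">
<font size="3">
As a point of comparison, the sketch below computes the same four results on ordinary Python lists with the values of $r1$ and $r2$. It is a plain-Python analogy, not Spark code; in particular, the order in which Spark returns the elements may differ from the order shown here.
</font>
</p>

```python
from itertools import product

r1_values = [1, 2, 3, 4]
r2_values = [3, 4, 5, 6, 7]

# union: concatenation, duplicates are kept.
union_values = r1_values + r2_values

# intersection: elements found in both, without duplicates.
intersection_values = sorted(set(r1_values) & set(r2_values))

# subtract: elements of the first list that are not in the second.
excluded = set(r2_values)
subtract_values = [x for x in r1_values if x not in excluded]

# cartesian: every pair (a, b) with a from the first list and b from the second.
cartesian_values = list(product(r1_values, r2_values))

print("union       ", union_values)
print("intersection", intersection_values)
print("subtract    ", subtract_values)
print("cartesian   ", len(cartesian_values), "pairs")
```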

<p>
<hr style=" border:none; height:2px;">
 <font size="3" color='#91053d'>**Execute the following code to see an example of these transformations**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
union = r1.union(r2)
intersection = r1.intersection(r2)
subtract = r1.subtract(r2)
cartesian = r1.cartesian(r2)

print("Elements of RDD r1 ", r1.collect())
print("Elements of RDD r2 ", r2.collect())
print("Elements of RDD union ", union.collect())
print("Elements of RDD intersection ", intersection.collect())
print("Elements of RDD subtract ", subtract.collect())
print("Elements of RDD cartesian ", cartesian.collect())


### Use of $distinct()$

<p align="justify">
<font size="3">
The transformation $distinct()$ returns an RDD that contains the same elements as the input RDD without the duplicates.
</font>
</p>

<p>
<hr style=" border:none; height:2px;">

 <font size="3" color='#91053d'>**Execute the following code to see an example of use of $distinct()$**</font>
 <hr style=" border:none; height:2px;">
</p>

In [None]:
nodup = union.distinct()
print("Elements of the RDD union: ", union.collect())
print("Elements of the RDD union (with no duplicates): ", nodup.collect())

## 2.3 Actions

<p align="justify">
<font size="3">
Here are some common actions in Spark. 
As with transformations, $r$ denotes the RDD on which the action is invoked.

<ol>
<li> $r.reduce(func)$. Performs a pair-wise application of the given function $func$ to the elements of the input RDD $r$.
<li> $r.collect()$. Returns a list with all the elements of the input RDD $r$.
<li> $r.count()$. Returns the number of elements in the input RDD $r$.
<li> $r.countByValue()$. Returns the number of times each element occurs in the input RDD $r$.
<li> $r.take(num)$. Returns a list with the first $num$ elements of the input RDD $r$.
<li> $r.top(num)$. Returns a list with the $num$ largest elements of the input RDD $r$ (sorted in decreasing order).
</ol>
</font>
</p>
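
<p align="justify">
<font size="3">
The difference between $take$ and $top$ is easy to miss: the former follows the order of the elements, the latter their value. The sketch below illustrates this on an ordinary Python list that holds the values of $r1$ followed by those of $r2$. It is a plain-Python analogy, not Spark code.
</font>
</p>

```python
import heapq

values = [1, 2, 3, 4, 3, 4, 5, 6, 7]

# Analogue of take(3): the first three elements, in their current order.
first_three = values[:3]

# Analogue of top(2): the two largest elements, in decreasing order.
top_two = heapq.nlargest(2, values)

print("take-like result:", first_three)
print("top-like result: ", top_two)
```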

### Use of $reduce(func)$

<p align="justify">
<font size="3">
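The action $reduce(func)$ performs a pair-wise application of the given function $func$ on the elements of the input RDD. 
The function $func$ must be commutative and associative, because Spark applies it separately within each partition and then combines the partial results.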
</font>
</p>
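
<p align="justify">
<font size="3">
The function given to $reduce$ should be commutative and associative, because the elements are reduced partition by partition before the partial results are combined. The sketch below is a plain-Python simulation under simplifying assumptions (the helper function and the way the values are split into partitions are hypothetical, and the partial results are combined in partition order). Addition gives the same result however the data is split, whereas subtraction does not.
</font>
</p>

```python
from functools import reduce
import operator

def reduce_in_partitions(func, partitions):
    """Hypothetical helper: reduce each partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)

one_partition = [[1, 2, 3, 4]]
two_partitions = [[1, 2], [3, 4]]

# Addition is commutative and associative: the split does not matter.
sum_one = reduce_in_partitions(operator.add, one_partition)
sum_two = reduce_in_partitions(operator.add, two_partitions)

# Subtraction is neither: the result depends on the split.
diff_one = reduce_in_partitions(operator.sub, one_partition)
diff_two = reduce_in_partitions(operator.sub, two_partitions)

print("Addition:   ", sum_one, sum_two)
print("Subtraction:", diff_one, diff_two)
```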

<p>
<hr style=" border:none; height:2px;">
<font size="3" color='#91053d'>**Execute the following code to sum all values of the RDD $r1$**</font>
<hr style=" border:none; height:2px;">
</p>


In [None]:
'''
The function passed to reduce MUST take in two arguments 
that have the same type as the elements of the input RDD
'''
# The result is stored in total (not sum) to avoid shadowing Python's built-in sum().
total = r1.reduce(lambda x, y : x + y)
print("Elements of the RDD r1 ", r1.collect())
print("Sum of the elements of the RDD r1: ", total)


### Use of $countByValue()$

<p align="justify">
<font size="3">
The action $countByValue()$ counts the number of the occurrences of each element of the input RDD.
The result is a Python dictionary, where a key is an element of the input RDD and the corresponding value the number of its occurrences.
</font>
</p>
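
<p align="justify">
<font size="3">
The shape of the result can be previewed in plain Python. The sketch below is an analogy, not Spark code: it counts the occurrences in an ordinary list that holds the values of $r1$ followed by those of $r2$.
</font>
</p>

```python
from collections import Counter

union_values = [1, 2, 3, 4, 3, 4, 5, 6, 7]

# Counter is a dictionary subclass: element -> number of occurrences.
value_counts = Counter(union_values)

for element, count in sorted(value_counts.items()):
    print(element, " --> ", count, " occurrences")
```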

<p>
<hr style=" border:none; height:2px;">

<font size="3" color='#91053d'>**Execute the following code to get the number of occurrences of each element in the RDD $union$**</font>

<hr style=" border:none; height:2px;">
</p>

In [None]:
occurrences = union.countByValue()
print("Elements of the RDD union ", union.collect())
print("Occurrences of each element in the RDD union:")
for k, v in occurrences.items():
    print(k, " --> ",  v, " occurrences")

# 3. Exercises

<p align="justify">
<font size="3">
Now it's your turn! 
You're going to apply the transformations and the actions that we've seen so far yourself. 
To this end, we will use the file **./data/moby-dick.txt** that contains the text of the famous novel by Herman Melville. 
The goal of this exercise is to count the number of words in the file and the number of occurrences of each word.
</font>
</p>
<p align="justify">
<font size="3">
First, we are going to preprocess the input text by removing punctuation and special characters and by lowercasing all words.
</font>
</p>    

<hr style=" border:none; height:2px;">

###  Exercise 1

<p align="justify">
<font size="3">
The function $preprocess()$ defined below takes in an RDD that contains the lines of the input text file and returns an RDD where each element is a word. In the code below, the first step of the function $preprocess()$, which removes the non-letter characters from the input RDD by using a regular expression, is already implemented.
</font>
</p>

<p align="justify">
<font size="3" color='#91053d'>
**Complete the function $preprocess()$ with the following steps:**
<ol start="2">
<li>    Apply a transformation on the RDD $text$ to obtain an RDD $words$, where each element is a word from the file.
<li>    Filter out from the RDD $words$ the words with length 0.
<li>    Lowercase all words of the RDD $words$.
<li>    Return the RDD $words$.
</ol>
Execute the code, which will print the number of words in the input text file and the top-100 most frequent words.
</font>
</p>

<hr style=" border:none; height:2px;">

In [None]:
import operator
import re


def preprocess(text):
    '''
    Regular expression for removing all non-letter characters in the file.
    '''
    regex = re.compile('[^a-zA-Z ]')
    '''
    Step 1. 
    Remove the non-letter characters.
    After this transformation, text is an RDD where each element is a line of the file
    '''
    text = text.map(lambda line: regex.sub('', line))
    
    '''
    COMPLETE STEPS 2-4
    '''
    
    '''
    Step 5.
    Returns the RDD words.
    '''
    return words


# Reads the novel into a RDD
mobydick = sc.textFile('./data/moby-dick.txt')

words = preprocess(mobydick)

print("Number of words ", words.count())

'''
Top-100 most frequent words
'''
# countByValue() counts the number of occurrences of each word.
# It returns a dictionary. 
# The function sorted sorts by value and returns a list of tuples (w, f), where w is a word 
# and f its number of occurrences.
# The list will be sorted in ascending order of number of occurrences
occurrences = sorted(words.countByValue().items(), key=operator.itemgetter(1))

# Reverse the order
occurrences.reverse()
print("Top-100 most frequent words")
i = 0
for (w, f) in occurrences:
    i += 1
    if i > 100:
        break
    print(w,"--->",f)
    

<hr style=" border:none; height:2px;">

### Exercise 2

<p align="justify">
<font size="3">
In the previous example, you've probably noticed that the top-100 most frequent words are _function_ words, such as articles, pronouns and conjunctions. These words have little lexical meaning and express a grammatical relationship with the other words. 
In many applications, these function words are not useful and can be ignored.
For instance, when you submit a query against a search engine (e.g., "restaurants in Paris"), function words (e.g., "in") are not useful to retrieve the Web pages relevant to the query.
</font>
</p>

<p align="justify">
<font size="3">
In text processing applications, function words are part of a list of _stop words_ that include all words that are frequently used and give little semantic contribution to the content of a text.
Since there isn't any agreement as to the definition of _stop word_, it's possible to find many such lists on the Web.
</font>
</p>

<p align="justify">
<font size="3" color='#91053d'>**Write a function $preprocess()$ that applies the same operations as the function that you defined above and, in addition, removes the stop words. This function $preprocess()$ takes in: (i) an RDD that contains the lines of the input text file (as above) and (ii) an RDD containing the stop words. The function returns an RDD with the words of the input text file.**</font>
</p>

<p align="justify">
<font size="3">
HINT: The file with the list of stop words is **./data/stopwords.txt**. All stop words are already lowercased.
</font>
</p>

<hr style=" border:none; height:2px;">

In [None]:
def preprocess(text, stopwords):
    '''
    Regular expression for removing all non-letter characters in the file.
    '''
    regex = re.compile('[^a-zA-Z ]')
    '''
    Step 1. 
    Remove the non-letter characters.
    After this transformation, text is an RDD where each element is a line of the file
    '''
    text = text.map(lambda line: regex.sub('', line))
    
    '''
    COMPLETE STEPS 2-5
    '''
    words = words.subtract(stopwords)
    
    # Returns the words
    return words


# Load the stopwords to an RDD
stopwords = sc.textFile("./data/stopwords.txt")
words = preprocess(mobydick, stopwords)
print("Number of words after stopword removal ", words.count())

# Count the number of occurrences as before.
occurrences = sorted(words.countByValue().items(), key=operator.itemgetter(1))
# Reverse the order
occurrences.reverse()
print("Top-100 most frequent words")
i = 0
for (w, f) in occurrences:
    i += 1
    if i > 100:
        break
    print(w, ", ", f)
    


# 4. Pair RDDs 

<p align="justify">
<font size="3">
Pair RDDs are simply RDDs where each element is a key-value pair. They are useful for expressing _MapReduce_ computations. 
</font>
</p>

<p align="justify">
<font size="3">
We are now going to look at examples of  transformations and actions on pair RDDs.
</font>
</p>

## 4.1 Creation of Pair RDDs

<p align="justify">
<font size="3">
One common way to create a pair RDD is to transform an existing RDD with a $map()$. 
</font>
</p>

<hr style=" border:none; height:2px;">
<p align="justify">
<font size="3" color='#91053d'>**Execute the following code to create a Pair RDD $kvwords$ from the RDD $words$. Each element of $kvwords$ is a pair where the key is a word and the value is 1**</font>
</p>
<hr style=" border:none; height:2px;">

In [None]:
kvwords = words.map(lambda word : (word, 1))
print(kvwords.take(100))

## 4.2 Transformations on Pair RDDs


<p align="justify">
<font size="3">
All transformations applied to standard RDDs can be applied to Pair RDDs as well. The only difference is that any function that is passed to a transformation must take in tuples instead of single values.
</font>
</p>

<p align="justify">
<font size="3">
In addition, the following transformations are commonly used on Pair RDDs. All of them except $sortBy$, which is available on any RDD, can **only** be applied to Pair RDDs:
<ol>
<li> $r.reduceByKey(func)$. It applies the given function $func$ pairwise to all elements of the input RDD $r$ that are associated to the same key. 
<li> $r.sortBy(func, asc)$. Returns an RDD where the elements of the input RDD $r$ are sorted according to the given criteria.
<li> $r.groupByKey()$. Groups values with the same key from the input RDD $r$.
<li> $r.keys()$. Returns an RDD where the elements are the keys from the input RDD $r$.
<li> $r.values()$. Returns an RDD where the elements are the values from the input RDD $r$.
</ol>
</font>
</p>
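
<p align="justify">
<font size="3">
The behaviour of $reduceByKey$ can be sketched in plain Python. The code below is an analogy, not Spark code: the helper function and the sample pairs are hypothetical. The given function is applied pairwise to the values that share a key, and the result holds one pair per key.
</font>
</p>

```python
def reduce_by_key(func, pairs):
    """Hypothetical helper: combine, with func, the values that share a key."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)
        else:
            result[key] = value
    return result

# Made-up sample in the form (word, 1).
kv_pairs = [("whale", 1), ("sea", 1), ("whale", 1), ("ship", 1), ("whale", 1)]

counts = reduce_by_key(lambda x, y: x + y, kv_pairs)

# Analogue of sortBy on the value, in descending order.
by_frequency = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)

print(by_frequency)
```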

<hr style=" border:none; height:2px;">
<font size="3" color='#91053d'>**Execute the following code to count the number of occurrences of each word in the RDD $kvwords$ and print the top-100 most frequent words**</font>
<hr style=" border:none; height:2px;">

In [None]:
'''
kvwords is an RDD where each element is (w, 1), w is a word from the novel Moby Dick.
If a word w occurs k times, there will be k pairs (w, 1) in the RDD kvwords.
To compute the number of occurrences of each word, 
we should just sum the 1s associated to all pairs with the same key, for all keys.
This is achieved with the reduceByKey() function.
'''
occurrences = kvwords.reduceByKey(lambda x, y : x + y)

'''
Each element of the RDD occurrences is a pair (w, f), where w is a word and f is the number of occurrences of w.
Finally, we sort the RDD occurrences by value by using sortBy(). 
The first argument of the function sortBy() is a function that takes in a key-value pair and returns the 
value (x[1]). 
The second argument specifies the order (ascending or descending).
'''
occurrences = occurrences.sortBy(lambda x: x[1], ascending=False)
occurrences.take(100)


<p align="justify">
<font size="3">
Another way to obtain the same result would be to:
<ol>
<li> Apply the transformation $groupByKey()$ to the RDD $kvwords$. The result is an RDD $occurrences$, where each element is a pair $(w, L)$: $w$ is a word and $L$ is a collection of 1s (as many as the number of occurrences of $w$).
<li> Apply a transformation $map()$ to the RDD $occurrences$ to obtain a new RDD $occurrences$, where each element is a pair $(w, len(L))$, $w$ being a word and $len(L)$ being the number of elements of $L$.
<li> Sort the RDD $occurrences$ as before.
</ol>
</font>
</p>
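
<p align="justify">
<font size="3">
The three steps above can be sketched in plain Python. The code below is an analogy, not Spark code, and the sample pairs are made up: the values are first grouped by key, then each group is replaced by its length, and finally the pairs are sorted by frequency.
</font>
</p>

```python
from collections import defaultdict

# Made-up sample in the form (word, 1).
kv_pairs = [("whale", 1), ("sea", 1), ("whale", 1), ("ship", 1), ("whale", 1)]

# Step 1: analogue of groupByKey(), word -> collection of 1s.
groups = defaultdict(list)
for word, one in kv_pairs:
    groups[word].append(one)

# Step 2: analogue of map(), word -> number of 1s.
counts = {word: len(ones) for word, ones in groups.items()}

# Step 3: sort by number of occurrences, in descending order.
by_frequency = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)

print(by_frequency)
```

<p align="justify">
<font size="3">
Note that this approach builds the whole collection of 1s for each word before counting them. In Spark, $reduceByKey$ is generally the preferred choice for this kind of aggregation, because it can combine the values within each partition before they are sent across the network.
</font>
</p>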

<hr style=" border:none; height:2px;">
<font size="3" color='#91053d'>**Execute the following code to see the result with this alternative solution**</font>
<hr style=" border:none; height:2px;">

In [None]:
occurrences = kvwords.groupByKey()
occurrences = occurrences.map(lambda x : (x[0], len(x[1])))
occurrences = occurrences.sortBy(lambda x: x[1], ascending=False)
occurrences.take(100)

## 4.3 Actions on Pair RDDs

<p align="justify">
<font size="3">
As with the transformations, all actions available for standard RDDs can be used on Pair RDDs as well.
In addition, the following actions can be performed on Pair RDDs:
<ol>
<li> $countByKey()$. Counts the number of values associated with the same key. Returns a dictionary.
<li> $collectAsMap()$. Collects the RDD as a dictionary (in the same way as the function $collect()$ returns a list from a standard RDD).
<li> $lookup(key)$. Returns a list with all the values associated with the given _key_.
</ol>
</font>
</p>
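
<p align="justify">
<font size="3">
The three actions return different shapes of result, which the plain-Python sketch below illustrates on a made-up list of pairs. It is an analogy, not Spark code. In particular, a dictionary holds a single value per key, so collecting pairs with duplicate keys into a dictionary loses values; here the last value wins, and it is an assumption of this sketch that $collectAsMap()$ behaves in the same way.
</font>
</p>

```python
from collections import Counter

# Made-up key-value pairs; the key "a" appears twice.
pairs = [("a", 1), ("b", 2), ("a", 3)]

# Analogue of countByKey(): key -> number of pairs with that key.
count_by_key = Counter(key for key, _ in pairs)

# Analogue of collectAsMap(): one value per key (here, the last one seen).
as_map = dict(pairs)

# Analogue of lookup("a"): all the values associated with the key.
values_for_a = [value for key, value in pairs if key == "a"]

print(dict(count_by_key))
print(as_map)
print(values_for_a)
```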

<p align="justify">
<font size="3">
Referring again to the example of counting the occurrences of each word, the following code is yet another way to solve the problem by using the action $countByKey()$ on the RDD $kvwords$.
</font>
</p>

<hr style=" border:none; height:2px;">
<font size="3" color='#91053d'>**Execute the following code**</font>
<hr style=" border:none; height:2px;">

In [None]:
occurrences = sorted(kvwords.countByKey().items(), key=operator.itemgetter(1))
# Reverse the order
occurrences.reverse()
print("Top-100 most frequent words")
i = 0
for (w, f) in occurrences:
    i += 1
    if i > 100:
        break
    print(w, ", ", f)
    

# 5. Exercises

<p align="justify">
<font size="3">
Now it's your turn to apply the notions that you've learned so far to create some more complex applications.
</font>
</p>

<hr style=" border:none; height:2px;">

##  Exercise 3

<p align="justify">
<font size="3">
In the folder _./data/bbc_ you'll find a collection of 50 documents from the BBC news website corresponding to stories in five topics from 2004-2005. The five topics are: _business_, _entertainment_, _politics_, _sport_ and _tech_. 
The stories are text files (named _001.txt_, _002.txt_...) organized into five directories, one per topic.
</font>
</p>

<p align="justify">
<font size="3">
In this exercise, we want to create an **inverted index**, one that associates each word to the list of files the word occurs in.
More precisely, for each word, the inverted index will have a list of the names  of the files (path relative to the folder _./data_) that contain the word. The figure below shows the entry in the index for the word "family".

<img src="./figs/inverted-index.png" width=400>
    An inverted index is an essential component of a search engine. In fact, given any word, the inverted index allows the search engine to quickly retrieve all documents containing that word.
</font>
</p>

<p align="justify">
<font size="3">
The idea is to use a **MapReduce** scheme. 
The **Map** task will create a key-value pair $(w, d)$ for each word $w$ that appears in the document $d$. $d$ is the path of the document relative to the folder _./data_ (e.g., _./data/bbc/business/001.txt_).
The **Reduce** task will group by key all the key-value pairs with the same key to form the inverted index.
</font>
</p>
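
<p align="justify">
<font size="3">
The Map and Reduce steps can be illustrated on a toy collection in plain Python. The sketch below is not Spark code and is not a solution to the exercise: the three documents and their contents are made up, and an ordinary dictionary stands in for the pair RDD.
</font>
</p>

```python
from collections import defaultdict

# Made-up documents: path -> text (already preprocessed).
documents = {
    "./data/bbc/business/001.txt": "family business grows",
    "./data/bbc/sport/002.txt": "team wins the cup",
    "./data/bbc/tech/003.txt": "family tech gadgets",
}

# Map: one pair (w, d) for each distinct word w of each document d.
pairs = []
for path, text in documents.items():
    for word in set(text.split()):
        pairs.append((word, path))

# Reduce: group the pairs by key to obtain word -> list of documents.
inverted_index = defaultdict(list)
for word, path in pairs:
    inverted_index[word].append(path)

family_docs = sorted(inverted_index["family"])
print(family_docs)
```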

<p align="justify">
<font size="3" color='#91053d'>**Write a Spark application to create an inverted index for the BBC collection. By using the function $lookup$ (already provided), print the documents that contain the word 'family'.**
<br>
HINT:
<ol>
<li> Your program needs to go through all the documents in each of the five folders of the BBC collection. Use the function $getFiles()$ to get the list of all filenames in the collection.
<li> To preprocess the text of a document, you can use the function $preprocess()$ defined above. The RDD $words$ returned by this function contains a word as many times as it appears in a document. Since for an inverted index we just need to know that a word is in a specific document, you should remove the duplicates from the RDD $words$.
</ol>
</font>
</p>

<hr style=" border:none; height:2px;">

In [None]:
import operator
import re
import os


'''
This function is given.
It takes in the inverted index and a word and returns 
the list of documents where the word occurs.
'''
def lookup(iindex, word):
    print("The following documents contain the word ", word)
    ld = iindex.lookup(word)[0]
    for d in ld:
        print(d)

'''
Get all the textual files.
'''
def getFiles():
    files=[]
    # The five topics (also, the names of the subdirectories under directory bbc)
    topics = ['business', 'entertainment', 'politics', 'sport', 'tech']
    for topic in topics:
        # Current directory
        cd = "./data/bbc/"+topic+"/"
        # For each document under the current topic
        for document in os.listdir(cd):
            # Get the name of the current document
            filename = cd + os.fsdecode(document)
            # This is to ensure that  we do not get any hidden file by mistake 
            if not filename.endswith(".txt"):
                continue
            files.append(filename)
    return files

'''
First step: Map part. 
Create a Pair RDD, where each element (w, d) is such that w is a word and d is the 
filename of the document that contains the word.
'''
stopwords = sc.textFile("./data/stopwords.txt")

# Initialize the pair RDD.
kv = sc.parallelize([])

# Get all files
files = getFiles()

'''
COMPLETE THE CODE
'''

<hr style=" border:none; height:2px;">

##  Exercise 4

<p align="justify">
<font size="3">
Given the BBC collection, we want to calculate the **co-occurrence matrix** $M$, such that $M[w_1][w_2]$ is the number of documents in which two words $w_1$ and $w_2$ appear together.
</font>
</p>

<p align="justify">
<font size="3">
A co-occurrence matrix is generally sparse (i.e., many entries are zero), because we expect the pairs of words that never co-occur to far outnumber the pairs that co-occur at least once. It is therefore wise to represent only the non-zero elements of the matrix.
To this end, we use a **MapReduce** schema.
</font>
</p>

<p align="justify">
<font size="3">
For any two words $w_i$, $w_j$ that co-occur in the same document, the **Map** task will output a key-value pair $((w_i, w_j), 1)$, where the key is the pair of co-occurring words $(w_i, w_j)$ and the value is 1.
Note that the two key-value pairs $((w_i, w_j), 1)$ and $((w_j, w_i), 1)$ ought to be considered as having the same key, because the order of occurrence of the two words does not matter (the only thing that matters is whether the two words co-occur in the same document). 
For this reason, the key-value pair $((w_i, w_j), 1)$ returned by the Map task is such that $w_i < w_j$, where $<$ is the lexicographic order.
</font>
</p>

<p align="justify">
<font size="3">
In the output of the Map task, a key-value pair $((w_i, w_j), 1)$ appears once for each document in which $w_i$ and $w_j$ co-occur. In order to obtain the number of co-occurrences of $w_i$ and $w_j$, the **Reduce** task just has to sum the values associated with the key $(w_i, w_j)$.
</font>
</p>
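
<p align="justify">
<font size="3">
The scheme above can be sketched in plain Python (no Spark) on hypothetical toy documents. The helper name `cooccurrence_pairs` and the word lists are assumptions made for this illustration; sorting the distinct words before pairing them is one possible way to guarantee $w_i < w_j$ in every key.
</font>
</p>

```python
from collections import Counter
from itertools import combinations

def cooccurrence_pairs(words):
    """Return ((w_i, w_j), 1) for every pair of distinct words, with w_i < w_j."""
    ordered = sorted(set(words))
    # combinations() over a sorted list yields each pair once, smaller word first.
    return [(pair, 1) for pair in combinations(ordered, 2)]

# Hypothetical toy documents, already preprocessed.
docs = [
    ["bank", "market", "profit"],
    ["profit", "bank"],
    ["film", "award"],
]

# Reduce: sum the values that share the same key.
counts = Counter()
for words in docs:
    for key, value in cooccurrence_pairs(words):
        counts[key] += value

print(counts[("bank", "profit")])
```

<p align="justify">
<font size="3">
Here the pair ("bank", "profit") is counted twice because it occurs in two documents, regardless of the order in which the words appear. On a pair RDD, the summation could be expressed with `reduceByKey`.
</font>
</p>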



<p align="justify">
<font size="3" color='#91053d'>**Complete the functions $Map$ and $Reduce$ in the code below.**
<br>
</font>
</p>

<hr style=" border:none; height:2px;">

In [None]:

'''
Function that implements the Map task.
Takes in: the list of words from a document d.
Returns: the list containing the key-value pairs ((w_i, w_j), 1) for all pairs of words (w_i, w_j), with w_i < w_j, in the document d.
'''
def Map(words):
    kvpairs = []
    '''
    ############ COMPLETE THE FUNCTION ############
    '''
    return kvpairs
            
'''
Function that implements the Reduce task.
Takes in: the RDD containing all the key-value pairs output by the Map task.
Returns: the RDD with the co-occurrence matrix.
'''
def Reduce(kv):
    '''
    ############ COMPLETE THE FUNCTION ############
    '''
    return 


# The stop words
stopwords = sc.textFile("./data/stopwords.txt")

files = getFiles()

# Initialize the RDD that will contain the pairs output by the Map task.
kv = sc.parallelize([])
# Iterate over all the documents
for file in files:
    # Obtain an RDD with the textual content of the document
    text = sc.textFile(file)
    # Preprocess the text. Obtain the words
    words = preprocess(text, stopwords)
    # Remove the duplicates
    words = words.distinct()
    # Invoke the Map task and append its output to the RDD kv
    kv = kv.union(sc.parallelize(Map(words.collect())))

# Reduce task
matrix = Reduce(kv)
matrix = matrix.sortBy(lambda x: x[1], ascending=False)
matrix.take(10)

<hr style=" border:none; height:2px;">

##  Exercise 5

<p align="justify">
<font size="3">
One important task in the analysis of textual data is the computation of the similarity between two textual documents.
Intuitively, the more words two textual documents share, the more similar they are.
</font>
</p>

<p align="justify">
<font size="3">
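Let $d_1$ and $d_2$ be two documents, and let $W(d_1)$ and $W(d_2)$ be the sets of words in $d_1$ and $d_2$, respectively. The similarity $S(d_1, d_2)$ between $d_1$ and $d_2$ can be computed with the _Jaccard_ score, that is, the number of words that the two documents share divided by the number of distinct words that appear in at least one of them: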
</font>
</p>

<p>
</p>

$$S(d_1, d_2) = \frac{|W(d_1) \cap W(d_2)|}{|W(d_1) \cup W(d_2)|}$$
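
<p align="justify">
<font size="3">
As a worked example, the score can be computed on plain Python sets, reading the formula as the ratio between the size of the intersection and the size of the union. The function name `jaccard_sets`, the toy word lists and the convention of returning 0 for two empty documents are assumptions made for this sketch.
</font>
</p>

```python
def jaccard_sets(w1, w2):
    """Jaccard score of two collections of words, computed on Python sets."""
    a, b = set(w1), set(w2)
    union = a | b
    if not union:
        # Convention chosen here: two empty documents have similarity 0.
        return 0.0
    return len(a & b) / len(union)

d1_words = ["bank", "market", "profit"]
d2_words = ["bank", "profit", "tax", "growth"]

# 2 shared words out of 5 distinct words overall.
print(jaccard_sets(d1_words, d2_words))
```

<p align="justify">
<font size="3">
With RDDs, the same quantities could be obtained from the transformations `intersection` and `union` followed by `count`; note that `union` on RDDs keeps duplicates, so a `distinct` would be needed before counting.
</font>
</p>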


<p align="justify">
<font size="3" color='#91053d'>**Write a function $jaccard$ that takes in two RDDs containing the words of two documents and returns the Jaccard similarity score of the documents. Test the function on the documents of the BBC.**
</font>
</p>

<hr style=" border:none; height:2px;">

In [None]:
'''
The Jaccard score.
Takes in: two RDDs with the words of two documents d1, d2
Returns: the Jaccard similarity of the two documents.
'''
def jaccard(d1w, d2w):
    '''
    ############ COMPLETE THE FUNCTION ############
    '''

# The stop words
stopwords = sc.textFile("./data/stopwords.txt")

# The two input documents.
d1 = sc.textFile("./data/bbc/politics/001.txt")
d2 = sc.textFile("./data/bbc/politics/001.txt")

# Preprocess
d1w = preprocess(d1, stopwords)
d1w = d1w.distinct()
d2w = preprocess(d2, stopwords)
d2w = d2w.distinct()

sim = jaccard(d1w, d2w)

print("similarity ", sim)


<hr style=" border:none; height:2px;">

##  Exercise 6 (Optional Part)

<p align="justify">
<font size="3">
We are now going to use the similarity function that we defined in Exercise 5 to create a simple text classifier.
A _text classifier_ is an application that takes in a textual document and determines the topic that the document covers.
Usually, text classification is based on supervised machine learning algorithms, like Naive Bayes and SVM; 
since these algorithms are out of the scope of this course, we limit ourselves to a simple classifier that assigns a document to the topic with the highest similarity to the document itself.
</font>
</p>

<p align="justify">
<font size="3">
More precisely, let's consider five documents $d_1, \ldots, d_5$, one for each topic of the BBC dataset; arbitrarily, we say that each of these documents is representative of one topic. 
Now, consider a document $d$ that is different from the representative documents. 
The text classifier computes the similarity of $d$ to $d_1, \ldots, d_5$, selects the document $d_i$ that has the highest similarity to $d$ and assigns $d$ to the topic covered by $d_i$.
</font>
</p>
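
<p align="justify">
<font size="3">
The selection rule described above can be sketched in plain Python on sets of words. The names `jaccard_sets` and `classify_sets`, as well as the toy representative documents, are hypothetical; with this particular sketch, ties are resolved in favour of the first representative in the list.
</font>
</p>

```python
def jaccard_sets(w1, w2):
    """Jaccard score of two collections of words, computed on Python sets."""
    a, b = set(w1), set(w2)
    union = a | b
    return len(a & b) / len(union) if union else 0.0

def classify_sets(words, representatives):
    """Return the topic of the representative most similar to `words`.

    `representatives` is a list of pairs (words, topic).
    """
    best = max(representatives, key=lambda rep: jaccard_sets(words, rep[0]))
    return best[1]

# Hypothetical representative documents, one per topic.
representatives = [
    (["bank", "profit", "market"], "business"),
    (["match", "goal", "team"], "sport"),
    (["film", "award", "actor"], "entertainment"),
]

print(classify_sets(["team", "goal", "season"], representatives))
```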

<p align="justify">
<font size="3" color='#91053d'>**Write a function $classify$ that takes in a document $d$ (already preprocessed) and the list of representative documents (already preprocessed) for each topic and outputs the predicted topic of $d$.**
</font>
</p>
<p>
<font size="3" color='#91053d'>
HINT: You can use the files named _001.txt_ as the representative documents for each topic.   
</font>
</p>

<hr style=" border:none; height:2px;">

In [None]:
'''
Classification function.
Takes in: RDD d of the preprocessed input document and the list of pairs (RDD, topic) of the preprocessed representative documents.
Returns: the predicted topic of the input document.
'''
def classify(d, representative):
    '''
    ############ COMPLETE THE FUNCTION ############
    '''
    return topic


# The stop words
stopwords = sc.textFile("./data/stopwords.txt")

'''
The representative documents.
Each item of this list is a pair (path, topic), associating a path of a document to the name of its topic.
'''
representative = [("./data/bbc/business/001.txt", "business"), 
                  ("./data/bbc/entertainment/001.txt", "entertainment"), 
                  ("./data/bbc/politics/001.txt", "politics"), 
                  ("./data/bbc/sport/001.txt", "sport"),
                  ("./data/bbc/tech/001.txt", "tech")]

# The RDDs with the preprocessed representative documents, each paired with its topic
repwords = []

# Pre-process the representative documents
for rep in representative:
    repwords.append((preprocess(sc.textFile(rep[0]), stopwords).distinct(),rep[1]))
    


# Classify a document.
topic = classify(preprocess(sc.textFile("./data/bbc/business/001.txt"), stopwords).distinct(), repwords)
print("Predicted topic ", topic)

<hr style=" border:none; height:2px;">

##  Exercise 7 (Optional Part)

<p align="justify">
<font size="3">

After implementing a classifier, we need to evaluate it. The evaluation is based on the **confusion matrix** that describes the performance of the classifier on a test dataset for which the true values are known.
An example of a confusion matrix is shown in the figure below. 
Each row contains the instances in a predicted class, while each column contains the instances in an actual class.
From the matrix in the figure, we learn that there are 10 documents that are in the class "business" (see the total of the first column), of which 5 are correctly predicted as being in the class "business", 2 are incorrectly classified in the class "entertainment" and 3 are incorrectly classified in the class "sport".
</font>
<img src="./figs/confusion-matrix.png" width=800>
</p>

<p align="justify">
<font size="3">
For each class $C$, we can define the notion of true positives (TP), false positives (FP) and false negatives (FN) for that class (the figure shows the TP, FP and FN for the class "Business"):
<ul>
<li> **True positives** (TP). Set of documents whose actual and predicted class is $C$.
<li> **False positives** (FP). Set of documents whose predicted class is $C$, while the actual class is not $C$.
<li> **False negatives** (FN). Set of documents whose predicted class is not $C$, while the actual class is $C$.
</ul>
</font>
</p>
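
<p align="justify">
<font size="3">
The three definitions above translate into a simple counting rule: a correct prediction is a true positive for its class, while a wrong prediction is at the same time a false negative for the actual class and a false positive for the predicted class. A minimal sketch on a hypothetical list of (actual, predicted) labels:
</font>
</p>

```python
from collections import Counter

# Hypothetical outcomes of a classifier: (actual class, predicted class).
results = [
    ("business", "business"),
    ("business", "sport"),
    ("sport", "sport"),
    ("sport", "sport"),
    ("tech", "business"),
]

tp, fp, fn = Counter(), Counter(), Counter()
for actual, predicted in results:
    if actual == predicted:
        tp[actual] += 1
    else:
        # One mistake counts against both classes involved.
        fn[actual] += 1
        fp[predicted] += 1

for topic in ["business", "sport", "tech"]:
    print(topic, "TP =", tp[topic], "FP =", fp[topic], "FN =", fn[topic])
```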


<p align="justify">
<font size="3">
For each class $C$, we can compute three evaluation measures, called _precision_ ($P_C$), _recall_ ($R_C$) and 
_f-measure_ ($F_C$), as follows:
</font>
</p>

$$P_C = \frac{|TP|}{|TP| + |FP|}\quad R_C = \frac{|TP|}{|TP| + |FN|}\quad F_C = 2\cdot \frac{P_C\cdot R_C}{P_C + R_C}$$

<p align="justify">
<font size="3">
The _precision_ $P_C$ is the fraction of the documents classified in $C$ that are correctly classified;
the _recall_ $R_C$ is the fraction of the documents actually in $C$ that are correctly classified in $C$;
the _f-measure_ is the harmonic mean of _precision_ and _recall_ and gives an indication of the overall quality of the classifier on the class $C$.
</font>
</p>
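
<p align="justify">
<font size="3">
As a worked example of the three formulas, consider the class "business" with $|TP| = 5$ and $|FN| = 5$, as in the example discussed earlier; the value $|FP| = 3$ is a hypothetical number chosen for this illustration. The helper name `prf` and the convention of returning 0 when a denominator is empty are also assumptions of this sketch.
</font>
</p>

```python
def prf(tp, fp, fn):
    """Return (precision, recall, f-measure) from the three counts."""
    precision = tp / (tp + fp) if tp + fp > 0 else 0.0
    recall = tp / (tp + fn) if tp + fn > 0 else 0.0
    if precision + recall > 0:
        fmeasure = 2 * precision * recall / (precision + recall)
    else:
        fmeasure = 0.0
    return precision, recall, fmeasure

# TP and FN as in the "business" example; FP = 3 is an assumed value.
precision, recall, fmeasure = prf(tp=5, fp=3, fn=5)
print("P =", precision, "R =", recall, "F =", round(fmeasure, 3))
```

<p align="justify">
<font size="3">
With these counts, $P_C = 5/8 = 0.625$, $R_C = 5/10 = 0.5$ and $F_C = 5/9 \approx 0.556$.
</font>
</p>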


<p align="justify">
<font size="3" color='#91053d'>**Write a program that evaluates the classifier implemented in the previous exercise on the whole BBC dataset. Print precision, recall and f-measure for each class.
Compare the classifiers obtained by choosing different representative documents for the topics.**
</font>
</p>


<hr style=" border:none; height:2px;">

In [None]:
topics = ['business', 'entertainment', 'politics', 'sport', 'tech']

# true positives for all topics
tp = {"business": 0, "entertainment": 0, "politics": 0, "sport":0, "tech":0}
# false positives for all topics
fp = {"business": 0, "entertainment": 0, "politics": 0, "sport":0, "tech":0}
# false negatives for all topics
fn = {"business": 0, "entertainment": 0, "politics": 0, "sport":0, "tech":0}

print("Evaluating classifier...")
for topic in topics:
    # Current directory
    cd = "./data/bbc/"+topic+"/"
    # For each document under the current topic
    for document in os.listdir(cd):
        # Get the name of the current document
        filename = cd + os.fsdecode(document)
        # This ensures that we do not pick up any hidden file by mistake
        if not filename.endswith(".txt"):
            continue
        # classify the document
        predicted = classify(preprocess(sc.textFile(filename), stopwords).distinct(), repwords)
        if predicted == topic:
            tp[predicted] += 1
        else:
            fn[topic] += 1
            fp[predicted] += 1


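for topic in topics:
    print("Class ", topic)
    # Guard against empty denominators (e.g., a class that is never predicted)
    predicted_total = tp[topic] + fp[topic]
    actual_total = tp[topic] + fn[topic]
    precision = tp[topic] / predicted_total if predicted_total > 0 else 0.0
    recall = tp[topic] / actual_total if actual_total > 0 else 0.0
    fmeasure = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0
    print("P = ", precision, " R = ", recall, " F = ", fmeasure)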
    