# Lab 3 - Data processing at scale - Introduction to Spark

---

**Date:**

**Group:**
 - *Student Name 1*
 - *Student Name 2*
---

In this lab, we'll use the [Spark](http://spark.apache.org) framework to process and analyse some data (see the [quick overview](http://spark.apache.org/docs/1.6.2/quick-start.html)).

The `pyspark` Python module provides the necessary bindings to the Spark engine.

In [None]:
# General dependencies
# !! run this cell first before any other ones
import sys
import json
import os
import os.path
from pandas import DataFrame
import random
import matplotlib.pyplot as plt
from IPython.display import display
%matplotlib inline

from pyspark import SparkContext, SparkConf
import pyspark.rdd
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

## Spark

### Introduction


We first define the Spark Context:

In [None]:
if not globals().get('sc'):
    sc = SparkContext('local', 'test')

You can have a look at the functions provided by the SparkContext in this notebook using the `help()` function, or [online](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.SparkContext):

In [None]:
help(sc)

Once the Spark Context is started, you can access its web UI to inspect jobs and stages:
http://127.0.0.1:4040/ (change the IP address to the correct one if needed)

We create a random list of integers as a sample of data to perform some analysis

In [None]:
data = [random.randint(0,10) for i in range(0, 1000)]

We can then create an RDD from this data using the `parallelize` function:

In [None]:
help(sc.parallelize)

`numSlices` indicates the number of partitions into which the data will be split. Each partition represents a subset of the data on which Spark will apply your transformations/processing in parallel.

For instance to create 4 partitions for our data:

In [None]:
my_rdd = sc.parallelize(data, 4)
print('RDD type:', type(my_rdd))
print('Num partitions:', my_rdd.getNumPartitions())
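
To picture what splitting data into partitions means, here is a plain-Python sketch that cuts a list into contiguous slices. It does not use Spark, the helper `split_into_slices` is a hypothetical name, and Spark's actual slicing logic may distribute the elements differently.

```python
# Conceptual sketch only: cut a list into `num_slices` contiguous chunks.
# This illustrates the idea of partitions; it is not Spark's implementation.
def split_into_slices(items, num_slices):
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

sample = list(range(10))
slices = split_into_slices(sample, 4)
print(slices)  # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
```

In the notebook, `my_rdd.glom().map(len).collect()` should show how many elements actually ended up in each partition of `my_rdd`.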

The RDD also has its own set of functions ([online](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.RDD) documentation):

In [None]:
help(pyspark.rdd.RDD)

We can simply count the number of objects in the RDD using the `count()` action:

In [None]:
print('Total number of elements:', my_rdd.count())

If you access the UI at http://127.0.0.1:4040/jobs/, you can see that a new job (computation) has been executed and Spark gives some information about it. You can see for instance that 4 tasks were created, which correspond to the initial partitioning of the data set.

By following the link for this stage (http://127.0.0.1:4040/stages/stage/?id=0&attempt=0), you can also display the execution graph (DAG visualization) and get more metrics about the computation.

### Filtering

Using the `filter` method, we can filter elements in an RDD.

In [None]:
help(pyspark.rdd.RDD.filter)

For instance, to filter elements greater than 5:

In [None]:
greater_than_5 = my_rdd.filter(lambda x: x > 5)
greater_than_5

As you can see in the UI (http://127.0.0.1:4040/jobs/), no jobs have been submitted yet. Spark only executes code when an action is requested (see https://spark.apache.org/docs/1.6.2/programming-guide.html#actions for the list of actions).

`filter` is a transformation: https://spark.apache.org/docs/1.6.2/programming-guide.html#transformations
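
The difference between a lazy transformation and an action can be illustrated without Spark. The sketch below is only an analogy: Python's built-in `filter` is also lazy, so the predicate does not run until something consumes the result. The names `numbers`, `calls` and `is_greater_than_5` are made up for the illustration.

```python
calls = []

def is_greater_than_5(x):
    calls.append(x)  # record every time the predicate actually runs
    return x > 5

numbers = [3, 8, 6, 1, 9]

# Like a transformation: this only describes the work, nothing runs yet
lazy = filter(is_greater_than_5, numbers)
calls_before = len(calls)

# Like an action: consuming the result forces the computation
result = list(lazy)
calls_after = len(calls)

print(calls_before, calls_after, result)  # 0 5 [8, 6, 9]
```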

To get the 10 largest elements among those greater than 5, we can use the `top` action, which returns them sorted in descending order:

In [None]:
help(pyspark.rdd.RDD.top)

In [None]:
print('10 largest elements greater than 5:', greater_than_5.top(10))

You can see that a new job has appeared in the UI: http://127.0.0.1:4040/jobs

To count the number of elements greater than 5:

In [None]:
print('Number of elements greater than 5:', greater_than_5.count())

Rather than sorting the RDD with `top()`, we can just `take()` the first elements of the RDD (unsorted):

In [None]:
help(pyspark.rdd.RDD.take)

In [None]:
greater_than_5.take(10)
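
The difference between `top()` and `take()` can be mimicked on a plain Python list. This is an analogy rather than Spark code, and `values` is a made-up sample.

```python
import heapq

values = [7, 9, 6, 10, 8, 6, 9]

# Like take(3): the first three elements, in their existing order
first_three = values[:3]

# Like top(3): the three largest elements, in descending order
largest_three = heapq.nlargest(3, values)

print(first_three)    # [7, 9, 6]
print(largest_three)  # [10, 9, 9]
```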

Suppose we want to know the distinct values contained in the RDD. We can use the `distinct()` function:

In [None]:
help(pyspark.rdd.RDD.distinct)

In [None]:
d_values = greater_than_5.distinct()

And in order to return all the elements in the new RDD `d_values`, we can use the `collect()` function:

In [None]:
help(d_values.collect)

In [None]:
d_values.collect()

We can define a proper Python function and use it as a filter rather than using a lambda function. This is useful when the filtering or processing is more complex:

In [None]:
def my_filter(value):
    return value<5

lower_than_5 = my_rdd.filter(my_filter)

print('5 largest elements lower than 5:', lower_than_5.top(5))
print('list of all elements lower than 5:', lower_than_5.collect())

### Storing data into files

As you can see, `collect()` returns all the elements and data can be stored in a local Python variable. 

For large data sets, this may not be practical, and the result can instead be stored in files using the `saveAsTextFile` action:

In [None]:
help(pyspark.rdd.RDD.saveAsTextFile)

In [None]:
if not os.path.exists('lower_than_5'):
    lower_than_5.saveAsTextFile('lower_than_5')
else:
    print('Directory already exists')

The output is written to the `lower_than_5` directory, with one file per partition of the RDD (`part-XXXXX`):

In [None]:
os.listdir('lower_than_5')    

* Each `.crc` file contains a checksum of the associated file, used to check its integrity (CRC stands for Cyclic Redundancy Check)
* `_SUCCESS` means the operation completed successfully
* Data is stored in the `part-*` files

In [None]:
# Get the first 10 lines of the part-00001 file
! head lower_than_5/part-00001

In [None]:
# count the number of lines in each generated part-xxxxx file
! wc -l lower_than_5/part*

### Map / Reduce
We have looked at `filter`, which is a Spark transformation; the Spark API provides many more transformations.

The `map` transformation applies some processing to every element of the RDD.

In [None]:
help(pyspark.rdd.RDD.map)

For instance, to multiply all the elements by 2:

In [None]:
multiply_by_2 = my_rdd.map(lambda x: x*2)

print('Multiply by 2:', multiply_by_2.take(10))

A reduce function is also available. Let's count the number of occurrences of each integer using the `reduceByKey` function:

In [None]:
help(pyspark.rdd.RDD.reduceByKey)

The concept is similar to Map/Reduce: for each integer of the RDD we emit a tuple `(int, 1)`, then `reduceByKey` will:
 * group the new RDD by its key (first element of the tuple) 
 * sum up the list of values associated to this key

In [None]:
# emit a tuple (x, 1) for each record in the original data set
values = my_rdd.map(lambda x: (x, 1))

print('Example of record in values:', values.take(10))

# Group the values by the first element of the tuple (key) and sum all the values (second element of the tuple)
group_by_value = values.reduceByKey(lambda x,y: x+y)

print('Example of record in group_by_value', group_by_value.take(5))

# Display the result
print('Group by value:')
for (k, v) in group_by_value.collect():
    print('\tvalue %s: count: %s' % (k, v))
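
To make the semantics of `reduceByKey` concrete, here is a plain-Python sketch of the result it computes for the counting example above. The helper `reduce_by_key` is a hypothetical name, and the sketch ignores how Spark distributes the work across partitions.

```python
# Conceptual sketch: merge the values of each key with a two-argument function.
# Spark does this per partition first, then merges the partial results.
def reduce_by_key(pairs, func):
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

pairs = [(x, 1) for x in [2, 5, 2, 7, 5, 2]]
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)  # {2: 3, 5: 2, 7: 1}
```

Because Spark merges partial results in no guaranteed order, the function passed to `reduceByKey` should be associative and commutative, as addition is here.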

### Loading data

External data sets can be loaded. For instance, we can load a text file:

In [None]:
help(sc.textFile)

In [None]:
lines = sc.textFile('lower_than_5/part-00000')

`lines` is an RDD:

In [None]:
type(lines)

To get the number of lines for instance:

In [None]:
print('Number of lines:', lines.count())

## Exercise 1 - Word Count

In this exercise, we consider the list of all unique artist terms (Echo Nest tags) from the [Million Song Data Set](http://labrosa.ee.columbia.edu/millionsong/pages/getting-dataset).

You can download the data set at:

    https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/unique_terms.txt

This is a text file containing a list of tags associated with artists in the Million Song Data Set. There is one tag per line.

In [None]:
# Download the data set
!curl -O https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/unique_terms.txt

In [None]:
# check/display full path
!ls $(pwd)/unique_terms.txt

In [None]:
! head $(pwd)/unique_terms.txt

Using this data set, answer the following questions:

**Q:** Create an RDD from this file using the `textFile()` function.

In [None]:
# your code here
lines = sc.textFile(...)

**Q:** Generate a new RDD by transforming each line and splitting it to get all the words (use the [`flatMap` function](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.RDD.flatMap)). You can use the `split` function on a Python string to return a list of words.

In [None]:
help(str.split)
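
As a hint, the difference between `map` and `flatMap` can be pictured with plain Python lists. This is an analogy only, and `sample_lines` is a made-up sample rather than the content of the data set.

```python
sample_lines = ["hard rock", "jazz", "indie pop rock"]

# Like map: one output element (a list of words) per input line
mapped = [line.split() for line in sample_lines]

# Like flatMap: the per-line lists are flattened into one sequence of words
flat_mapped = [word for line in sample_lines for word in line.split()]

print(mapped)       # [['hard', 'rock'], ['jazz'], ['indie', 'pop', 'rock']]
print(flat_mapped)  # ['hard', 'rock', 'jazz', 'indie', 'pop', 'rock']
```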

In [None]:
# your code here
words = lines.flatMap(...)

**Q:** Count the number of words by emitting a key-value pair `(word, 1)` and using the `reduceByKey` function.

In [None]:
# your code here
word_count = ...

We can then collect the top 10 by count (look at the [`takeOrdered` function](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.RDD.takeOrdered) of an RDD):

In [None]:
help(pyspark.rdd.RDD.takeOrdered)

In [None]:
for word, count in word_count.takeOrdered(10, key=lambda x: -x[1]):
    print("'%s': %d" % (word, count))
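
The `key=lambda x: -x[1]` argument deserves a word: `takeOrdered` returns the smallest elements according to the key, so negating the count yields the largest counts first. The plain-Python sketch below mimics this with `heapq.nsmallest`; `sample_counts` is made-up data.

```python
import heapq

sample_counts = [("rock", 12), ("jazz", 3), ("pop", 8), ("folk", 5)]

# Smallest elements by negated count == largest counts, in descending order
top_two = heapq.nsmallest(2, sample_counts, key=lambda x: -x[1])

print(top_two)  # [('rock', 12), ('pop', 8)]
```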

Make sure the `wordcloud` library is present in your environment:

In [None]:
import wordcloud

If the previous command failed, install the library by running the following command:

In [None]:
!pip install --user wordcloud

We can now display a word cloud of the data set using the `fit_words` method
(see http://amueller.github.io/word_cloud/auto_examples/simple.html for an example):

In [None]:
# transform the result data set into a dictionary word -> count to use with the fit_words method
frequencies = {}
for el in word_count.collect():
    frequencies[el[0]] = el[1]
    
# build the WordCloud object
wc = wordcloud.WordCloud()

# build & display the image
wc.fit_words(frequencies)
plt.imshow(wc)
plt.axis("off")
plt.show()

## Exercise 2 - Data exploration 1
 
This data set contains the listening history (timestamped plays) of about 1K Last.fm users.

See http://www.dtic.upf.edu/~ocelma/MusicRecommendationDataset/lastfm-1K.html

You can download the specific file at:
    
    https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv.gz
    
For instance you could run the following shell commands in a terminal:

    curl -O https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv.gz
    gunzip userid-timestamp-artid-artname-traid-traname.tsv.gz

To simplify testing and validation, first run your code on a smaller subset:

    https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname-1000.tsv

In [None]:
! curl -O https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname-1000.tsv

Once you are confident, you can run the code on the full data set.

The format of this file is a TSV (Tab Separated Values) with the following structure:

    userid \t timestamp \t musicbrainz-artist-id \t artist-name \t musicbrainz-track-id \t track-name
    
As artist and track identifiers, you will use the `artist-name` and `track-name` fields (the `musicbrainz-*` fields may not be provided for all rows).
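
As a starting point, a line in this format could be split into named fields as sketched below. The `Play` tuple, the `parse_play` helper and the sample line are all made up for the illustration, and the sketch assumes every line has exactly six tab-separated fields.

```python
from collections import namedtuple

# Hypothetical record type mirroring the six columns described above
Play = namedtuple("Play", ["userid", "timestamp", "artist_mbid",
                           "artist_name", "track_mbid", "track_name"])

def parse_play(line):
    # Assumes exactly six fields; empty MusicBrainz ids become empty strings
    return Play(*line.rstrip("\n").split("\t"))

# Made-up sample line with both MusicBrainz ids missing
sample_line = "user_000001\t2009-05-04T23:08:57Z\t\tSome Artist\t\tSome Track"
play = parse_play(sample_line)

# A track is identified by its artist name together with its track name
track_key = (play.artist_name, play.track_name)
print(track_key)  # ('Some Artist', 'Some Track')
```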

Using this data set, answer the following questions.

**Q:** Load the lastfm data set into Spark.

In [None]:
# your code here

**Q:** Compute the total number of plays by `artist-name` and display the top 10.

In [None]:
# your code here

**Q:** Compute the total number of plays by `userid` and display the top 10.

In [None]:
# your code here

**Q:** Compute the top 20 most played tracks. Hint: several artists may have created tracks with the same name; these are different tracks and should be counted independently of each other.

In [None]:
# your code here

**Q:** Create an RDD containing the words in the track names.

In [None]:
# your code here

**Q:** How many distinct words are there in total?

In [None]:
# your code here

**Q:** What are the 10 most frequent words?

In [None]:
# your code here

**Q:** Filter out prepositions, articles, etc. and compute the new top 10.

In [None]:
# your code here

**Q:** Display a Word Cloud of all the words

In [None]:
# Your code here

**Q:** Generate the word cloud for the full data set:

    https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv.gz

If needed, extend the list of excluded words to keep only the relevant ones (e.g. `in`, `and`, `for`, ... should also be removed).

In [None]:
# your code here

## Exercise 3 - Collaborative filtering

### Introduction
From Wikipedia (https://en.wikipedia.org/wiki/Collaborative_filtering):

> Collaborative filtering (CF) is a technique used by some recommender systems [...]

> [...] collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption of the collaborative filtering approach is that if a person A has the same opinion as a person B on an issue, A is more likely to have B's opinion on a different issue x than to have the opinion on x of a person chosen randomly. For example, a collaborative filtering recommendation system for television tastes could make predictions about which television show a user should like given a partial list of that user's tastes (likes or dislikes). Note that these predictions are specific to the user, but use information gleaned from many users. This differs from the simpler approach of giving an average (non-specific) score for each item of interest, for example based on its number of votes.

For this exercise we'll use the [Last.fm Dataset - 360K users](http://www.dtic.upf.edu/~ocelma/MusicRecommendationDataset/lastfm-360K.html) to build a model that recommends new artists to users based on their listening activity.

A reduced data set (for testing) and the full one are available at:

    https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv.1000
    https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv.gz

In [None]:
! curl -O https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv.1000    

In [None]:
! curl -O  https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv.gz

    
These are TSV files with the following format:

    user-mboxsha1 \t musicbrainz-artist-id \t artist-name \t plays

The Spark MLlib library implements a commonly used algorithm for collaborative filtering based on the Alternating Least Squares (ALS) method. See http://spark.apache.org/docs/1.6.2/mllib-collaborative-filtering.html for examples and details.

### Data preparation

To apply the ALS technique on the data set, we first need to convert all values to numerical fields to build a [`Rating` object](https://spark.apache.org/docs/1.6.3/api/python/pyspark.mllib.html?highlight=rating#pyspark.mllib.recommendation.Rating).

Consequently, we need to assign unique integer values to users and artists, which are currently identified by strings (SHA-1 hashes and UUIDs respectively).

**Q:** Load the file into an RDD using the `textFile` method

In [None]:
# your code here
source = ...

We go through the data set, list all artists and assign them a unique ID.

For this:
 * We write a function that, given a line of the file, splits it and returns the tuple `(musicbrainz-artist-id, artist-name)`

In [None]:
def get_artist(line):
    data = line.split('\t')
    return (data[1], data[2])

We can then run this function through the data set and get the list of unique artists using the `distinct()` function:

In [None]:
artists = source.map(get_artist).distinct()
print('Number of artists:', artists.count())

Later on, to train the model, we need to map each artist to an integer. To achieve this, we can for instance build an index with the [`zipWithUniqueId` function](https://spark.apache.org/docs/1.6.3/api/python/pyspark.html?highlight=zipwithuniqueid#pyspark.RDD.zipWithUniqueId) and use the Spark `broadcast` feature to make it available to all the Spark workers.

* Create a unique integer ID for each artist

In [None]:
artists_with_unique_id = artists.zipWithUniqueId().collect()
artists_with_unique_id[0:2]
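
According to the Spark API documentation, `zipWithUniqueId` gives the items of the k-th partition the ids k, n+k, 2n+k, ..., where n is the number of partitions. The plain-Python sketch below reproduces that numbering on made-up partitions; the helper name `zip_with_unique_id` is hypothetical.

```python
# Conceptual sketch of the numbering scheme: item i of partition k gets k + i * n
def zip_with_unique_id(partitions):
    n = len(partitions)
    return [(item, k + i * n)
            for k, part in enumerate(partitions)
            for i, item in enumerate(part)]

partitions = [["a", "b", "c"], ["d"], ["e", "f"]]
pairs = zip_with_unique_id(partitions)
print(pairs)  # [('a', 0), ('b', 3), ('c', 6), ('d', 1), ('e', 2), ('f', 5)]
```

The ids are unique but not necessarily consecutive: in this example the value 4 is never assigned.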

* Build an index mapping each artist to its ID, which can be used by Spark


In [None]:
# Each element is ((musicbrainz-artist-id, artist-name), unique_id), so dict() builds the index directly
artists_index = sc.broadcast(dict(artists_with_unique_id))

`artists_index` is a Spark `Broadcast` object.

In [None]:
print(type(artists_index))

The `value` property of the `Broadcast` object is a Python dictionary:

In [None]:
print(type(artists_index.value))

The dictionary contains the mapping artist --> unique integer id

In [None]:
print(artists_index.value[('5e7ccd92-6277-451a-aab9-1efd587c50f3', 'steve vai')])
print(artists_index.value[('82dc508a-dbda-4954-aedc-28895edfa42e', "2 many dj's")])

Now it's your turn to do the same for users:

**Q:** Write a function to get the `userid` from a line of the source RDD

In [None]:
# your code here
def get_user(line):
    ...
    return ...

**Q:** Create an RDD containing all unique users

In [None]:
# your code here
users = ...

**Q:** Create a unique ID for each user using the previous technique

In [None]:
# your code here

**Q:** Create a Spark broadcast variable to distribute the user index to the Spark workers

In [None]:
# Your code here

We now have to parse the data set and create a `Rating` object for each data point, of the form:
```
Rating(user_id, artist_id, listens)
```
where:
* `user_id` is the integer id we have assigned
* `artist_id` is the integer id of the artist we have assigned
* `listens` is the number of listens for this user and artist

In order to generate the Ratings:

**Q:** Write a function that will parse each line of the source and build the above `Rating` object.

In [None]:
# Your code
def extract_rating(line):
    ...
    return Rating(...)

**Q:** Apply the function to the data set

In [None]:
# your code here
ratings = ...

**Q:** Display the first 5 ratings of the data set

In [None]:
# your code here

### Data splitting

We need to split the data set into two subsets:
 * training
 * test

To split the data set, we randomly assign each entry to one of the two subsets.

**Q:** Use the `randomSplit()` function on the `ratings` RDD to split the data set into a `training` and a `test` data set.

In [None]:
help(ratings.randomSplit)
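
To get an intuition for weighted random splitting, here is a plain-Python sketch in which each item is independently assigned to a split according to normalised weights. The helper `random_split` is a hypothetical stand-in; Spark's `randomSplit(weights, seed)` may be implemented differently and will not produce the same assignment.

```python
import random

# Conceptual sketch: draw a uniform number per item and pick the split whose
# cumulative (normalised) weight range contains it.
def random_split(items, weights, seed=None):
    rng = random.Random(seed)
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            if draw < bound:
                splits[index].append(item)
                break
        else:
            splits[-1].append(item)  # guard against floating-point rounding
    return splits

train_part, test_part = random_split(range(1000), [0.8, 0.2], seed=42)
print(len(train_part), len(test_part))
```

The split sizes are only approximately proportional to the weights, and fixing the seed makes the split reproducible.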

In [None]:
# your code
training, test = ...

### Modelling
We can now train the recommender on this data set:

In [None]:
# Build the recommendation model using Alternating Least Squares with implicit ratings
rank = 10
numIterations = 10
alpha = 0.01
lambda_ = 0.01

model = ALS.trainImplicit(training, rank, numIterations, alpha=alpha, lambda_=lambda_, nonnegative=True)

The `trainImplicit` method triggers a set of Spark jobs to train the model. Use the Spark UI to have a look at the different steps generated (http://localhost:4040/jobs/)
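
The role of `alpha` can be illustrated with the implicit-feedback formulation of Hu, Koren and Volinsky, which the MLlib collaborative filtering documentation cites: a play count is turned into a binary preference and a confidence that grows with the count. The sketch below is plain Python, the helper name is hypothetical, and whether Spark's internals match it exactly is an assumption.

```python
# Sketch of the implicit-feedback idea (assumed, not Spark's code):
# preference says whether the user listened at all,
# confidence says how much we trust that observation.
def preference_and_confidence(plays, alpha):
    preference = 1.0 if plays > 0 else 0.0
    confidence = 1.0 + alpha * plays
    return preference, confidence

for plays in (0, 50, 2000):
    print(plays, preference_and_confidence(plays, alpha=0.01))
```

With a small `alpha`, heavy listening raises the confidence only moderately; a larger `alpha` gives high play counts more weight during training.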

### Prediction

The generated model can be saved to disk for later use:

In [None]:
model.save(sc, "myCollaborativeFilter")

... and then loaded when needed

In [None]:
recommender = MatrixFactorizationModel.load(sc, "myCollaborativeFilter")

The `recommender` object can now be used to perform some recommendations.

**Q:** Pick a random user in the training set and predict the 10 artists they are most likely to be interested in (`recommendProducts()`)

In [None]:
help(recommender.recommendProducts)

In [None]:
# Your code

**Q:** Pick a random artist and predict the 10 users most likely to be interested in this artist

In [None]:
# your code

**Q:** Now that we have all the elements in place, re-run your model against the full data set available at:
        
    https://s3-eu-west-1.amazonaws.com/scimus-data/scimus-2017/lab3/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv.gz

This means:
 * loading the data into an RDD
 * splitting it into training and test
 * creating a new model based on the training set

In [None]:
# your code here

### Evaluation

We have arbitrarily chosen some parameters for training the model (*lambda*, *rank*, *iterations*, ...).

In practice, the recommender has to be evaluated on a data set and tuned according to an evaluation metric.

For simplicity, we'll evaluate the model on whether or not the top 10 recommendations are in the user's playlist.

* Write a function that, for each user, counts the number of artists common to the user's listening history and the recommendations
* Compute the average number of common elements on the test data set
* Investigate the impact of different parameters on the average value
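
The metric itself can be sketched on toy data before writing the Spark version. Everything below is made up for illustration (integer ids, the two dictionaries and the helper `common_artists`); it shows only the arithmetic, not how to obtain the recommendations from the model.

```python
# Count how many recommended artists the user actually listened to
def common_artists(recommended, listened):
    return len(set(recommended) & set(listened))

# Made-up data: user id -> recommended artist ids / artist ids in the test set
recommendations = {1: [10, 11, 12], 2: [10, 13, 14]}
test_listens = {1: [11, 12, 99], 2: [15]}

overlaps = [common_artists(recommendations[u], test_listens[u])
            for u in test_listens]
average_overlap = sum(overlaps) / len(overlaps)

print(overlaps, average_overlap)  # [2, 0] 1.0
```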

In [None]:
# your code here