# Lab 4 - Data processing at scale - Introduction to Spark

----
**Group**:
 * Student Name
 * Student Name

---- 

We'll be using the [Spark](http://spark.apache.org) framework to process and analyse some data ([Quick overview](http://spark.apache.org/docs/1.6.2/quick-start.html)).

The `pyspark` module provides the necessary bindings to the Spark engine.

In [None]:
# dependencies - run this cell first
import sys
import json
import os
import os.path
from pandas import DataFrame
import random
import matplotlib.pyplot as plt
from IPython.display import display
%matplotlib inline

from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

## Introduction



We first define the Spark Context:

In [None]:
if not globals().get('sc'):
    sc = SparkContext('local', 'test')

You can look at the functions provided by the SparkContext from within this notebook using the `help()` function, or read the [online documentation](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.SparkContext).

In [None]:
help(sc)

Once the SparkContext has started, you can open the Spark web UI to inspect the running jobs and stages:
http://127.0.0.1:4040/ (change the IP address to the correct one if needed)

We create a list of 1000 random integers between 0 and 10 (inclusive) as sample data to analyse:

In [None]:
data = [random.randint(0,10) for i in range(0, 1000)]

We can then create an RDD from this data using the `parallelize` function:

In [None]:
help(sc.parallelize)

`numSlices` indicates the number of partitions into which the data will be split. Each partition represents a subset of the data on which Spark will apply your transformations/processing in parallel.
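
To make the idea of partitions concrete, here is a minimal plain-Python sketch that cuts a list into contiguous chunks of near-equal size. The helper `split_into_partitions` is hypothetical and only illustrates the concept; the way Spark actually distributes elements across partitions is an implementation detail and may differ.

```python
def split_into_partitions(items, num_slices):
    """Split a list into num_slices contiguous chunks of near-equal size."""
    n = len(items)
    return [items[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]

sample = list(range(10))
partitions = split_into_partitions(sample, 4)

# Each chunk could be processed independently, e.g. counted on its own,
# and the partial results combined afterwards.
partial_counts = [len(p) for p in partitions]
print(partitions)
print('total: %s' % sum(partial_counts))
```

Every element ends up in exactly one chunk, which is what allows per-partition results to be combined into a global result.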

For instance, to create 4 partitions for our data:

In [None]:
my_rdd = sc.parallelize(data, 4)
# the format-string form of print works with both Python 2 and Python 3
print('RDD type: %s' % type(my_rdd))
print('Num partitions: %s' % my_rdd.getNumPartitions())

The RDD also has its own set of functions ([online documentation](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.RDD)):

In [None]:
help(my_rdd)

We can simply count the number of objects in the RDD:

In [None]:
print('Total number of elements: %s' % my_rdd.count())

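To keep only the elements greater than 5, fetch the 10 largest with `top()`, and count how many there are:

In [None]:
# strictly greater than 5 (the previous test, x>=5, also kept the value 5)
greater_than_5 = my_rdd.filter(lambda x: x > 5)
# top(n) returns the n largest elements, in descending order
print('10 largest elements greater than 5: %s' % greater_than_5.top(10))
print('count of elements greater than 5: %s' % greater_than_5.count())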

Rather than sorting the RDD with `top()`, we can just `take()` the first elements of the RDD (unsorted):

In [None]:
greater_than_5.take(10)

To find the distinct values contained in the RDD, we can use the `distinct()` function:

In [None]:
d_values = greater_than_5.distinct()
d_values.collect()

We can pass a named function to `filter()` instead of a lambda when the filtering logic is more complex:

In [None]:
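def my_filter(value):
    # keep only the values strictly lower than 5
    return value < 5

lower_than_5 = my_rdd.filter(my_filter)

# top(5) returns the 5 largest elements of the filtered RDD
print('5 largest elements lower than 5: %s' % lower_than_5.top(5))
print('list of all elements lower than 5: %s' % lower_than_5.collect())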

As you can see, `collect()` returns all the elements to the notebook, where they can be stored in a local Python variable.

For large data sets this is not practical, because the whole result has to fit in local memory. The result can be written to files instead:

In [None]:
if not os.path.exists('lower_than_5'):
    lower_than_5.saveAsTextFile('lower_than_5')
else:
    print('Directory already exists')

The output is written to the `lower_than_5` directory, with one file per partition of the RDD (`part-XXXXX`):

In [None]:
os.listdir('lower_than_5')    

* Each `.crc` file contains a checksum of the associated file, used to check its integrity (CRC stands for Cyclic Redundancy Check)
* `_SUCCESS` means the operation completed successfully

Transformations can be applied to an RDD to derive new data:

In [None]:
multiply_by_2 = my_rdd.map(lambda x: x*2)

print('Multiply by 2: %s' % multiply_by_2.take(10))

Let's count the number of occurrences of each integer using the `reduceByKey` function:

In [None]:
help(my_rdd.reduceByKey)

The concept is similar to Map/Reduce: for each integer of the RDD we emit a tuple `(int, 1)`. `reduceByKey` will then:
 * group the new RDD by its key (first element of the tuple)
 * sum up the list of values associated with this key
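
The two steps above can be sketched in plain Python. This is only an illustration of the semantics on a small hand-picked list; `reduce_by_key` is a hypothetical helper, not the Spark implementation (Spark performs the same logical operation across partitions).

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Group (key, value) pairs by key, then fold each group's values with func."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

sample = [3, 1, 3, 2, 3, 1]
pairs = [(x, 1) for x in sample]          # the "map" step: emit (int, 1)
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(sorted(counts.items()))             # [(1, 2), (2, 1), (3, 3)]
```

The function passed in only ever combines two values at a time, which is why the same lambda can be handed to `reduceByKey` unchanged.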

In [None]:
group_by_value = my_rdd.map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)

print('Group by value:')
for el in group_by_value.collect():
    # el is a (value, count) tuple
    print('\tvalue %s: count: %s' % el)

External data sets can also be loaded. For instance, we can load a text file:

In [None]:
lines = sc.textFile('lower_than_5/part-00000')

To get the number of lines:

In [None]:
print('Number of lines: %s' % lines.count())

## Exercise 1 - Word Count

In this exercise we consider the list of all unique artist terms (Echo Nest tags) from the [Million Song Data Set](http://labrosa.ee.columbia.edu/millionsong/pages/getting-dataset).

You can download the data set at:

    https://s3-eu-west-1.amazonaws.com/scimus-data/lab4/unique_terms.txt

Using this data set, answer the following questions:

* create an RDD from this file
* generate a new RDD by transforming each line and splitting it to get all the words (use the [`flatMap` function](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.RDD.flatMap))
* count the number of words by emitting a key-value pair `(word, 1)`
* aggregate the count by word
* collect the top 10 by count (look at the [`takeOrdered` function](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.RDD.takeOrdered) of an RDD)
* display a word cloud of the data set
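
The second step relies on the difference between `map` and `flatMap`. As a rough plain-Python analogy (the two sample lines are made up for illustration), `map` produces one result per input line, whereas `flatMap` flattens the per-line results into a single sequence:

```python
sample_lines = ["hard rock", "indie pop rock"]  # hypothetical sample lines

# map-like: one output element per line (here, a list of words per line)
mapped = [line.split() for line in sample_lines]

# flatMap-like: the per-line lists are flattened into one sequence of words
flat_mapped = [word for line in sample_lines for word in line.split()]

print(mapped)       # [['hard', 'rock'], ['indie', 'pop', 'rock']]
print(flat_mapped)  # ['hard', 'rock', 'indie', 'pop', 'rock']
```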

For the last question (Word Cloud), use the `wordcloud` library (https://github.com/amueller/word_cloud)
* Install it in your python environment if required

        pip install --user wordcloud

* Generate the cloud using the method `fit_words`
* Display the image (see http://amueller.github.io/word_cloud/auto_examples/simple.html as an example)
        
        plt.imshow(wordcloud)
        plt.axis("off")
        plt.show()
    

## Exercise 2 - Data exploration 1

This data set contains the listening history of about 1,000 Last.fm users.

See http://www.dtic.upf.edu/~ocelma/MusicRecommendationDataset/lastfm-1K.html

You can download the specific file at:
    
    https://s3-eu-west-1.amazonaws.com/scimus-data/lab4/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv.gz
    
For instance:

    curl -O https://s3-eu-west-1.amazonaws.com/scimus-data/lab4/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv.gz
    gunzip userid-timestamp-artid-artname-traid-traname.tsv.gz
    

To simplify testing and validation, first run your code on a smaller subset:

    https://s3-eu-west-1.amazonaws.com/scimus-data/lab4/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname-1000.tsv

Once you are confident, you can run the code on the full data set.

The format of this file is a TSV (Tab Separated Values) with the following structure:

    userid \t timestamp \t musicbrainz-artist-id \t artist-name \t musicbrainz-track-id \t track-name
    
Use the `artist-name` field as the artist identifier and the `track-name` field as the track identifier (the `musicbrainz-*` fields may be missing for some rows).
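
As a minimal sketch of how one line of this format can be split into named fields, assuming every line has exactly six tab-separated columns: the `Play` tuple, the `parse_play` helper and the sample line below are all hypothetical and only serve as illustration.

```python
from collections import namedtuple

# Field names follow the column order described above (hypothetical naming).
Play = namedtuple('Play', ['userid', 'timestamp', 'artist_mbid',
                           'artist_name', 'track_mbid', 'track_name'])

def parse_play(line):
    """Split one TSV line into its six fields."""
    return Play(*line.rstrip('\n').split('\t'))

# Made-up sample line with both MusicBrainz identifiers left empty.
sample_line = 'user_000001\t2009-05-04T23:08:57Z\t\tSome Artist\t\tSome Track'
play = parse_play(sample_line)
print('%s / %s' % (play.artist_name, play.track_name))
```

Empty identifier columns are kept as empty strings, so the field positions stay aligned. A line with a different number of columns would raise a `TypeError` here, so a length check may be a sensible guard on the real file.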

For this exercise, you will:

* Load the lastfm data set into Spark
* Compute the total number of plays by `artist-name` and display the top 10.
* Compute the total number of plays by `userid` and display the top 10.
* Compute the top 100 most played tracks. Hint: several artists may have recorded tracks with the same name; these are different tracks and must be counted separately.
* Create an RDD containing the words in the track names to answer the following questions:
  * How many distinct words are there in total?
  * What are the 10 most frequent words?
  * Filter out prepositions, articles, etc. and compute the new top 10
  * Display a Word Cloud of all the words

In [None]:
# Your code here

## Exercise 3 - Collaborative filtering

### Introduction
From [Wikipedia](https://en.wikipedia.org/wiki/Collaborative_filtering):

> Collaborative filtering (CF) is a technique used by some recommender systems [...]

> [...] collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption of the collaborative filtering approach is that if a person A has the same opinion as a person B on an issue, A is more likely to have B's opinion on a different issue x than to have the opinion on x of a person chosen randomly. For example, a collaborative filtering recommendation system for television tastes could make predictions about which television show a user should like given a partial list of that user's tastes (likes or dislikes). Note that these predictions are specific to the user, but use information gleaned from many users. This differs from the simpler approach of giving an average (non-specific) score for each item of interest, for example based on its number of votes.

In this exercise we'll use the [Last.fm Dataset - 360K users](http://www.dtic.upf.edu/~ocelma/MusicRecommendationDataset/lastfm-360K.html) to build a model that recommends new artists to users based on their listening activity.

A reduced data set (for testing) and the full data set are available at:

    https://s3-eu-west-1.amazonaws.com/scimus-data/lab4/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv.1000
    https://s3-eu-west-1.amazonaws.com/scimus-data/lab4/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv.gz
    
These are TSV files with the following format:

    user-mboxsha1 \t musicbrainz-artist-id \t artist-name \t plays

Spark's MLlib library implements a commonly used collaborative filtering algorithm based on the Alternating Least Squares (ALS) method. See http://spark.apache.org/docs/1.6.2/mllib-collaborative-filtering.html for an example and details.

### Data preparation

To apply the ALS technique on the data set, we first need to convert all values to numerical fields to build a [`Rating` object](https://spark.apache.org/docs/1.6.3/api/python/pyspark.mllib.html?highlight=rating#pyspark.mllib.recommendation.Rating).

Consequently, we need to assign unique integer IDs to users and artists (they are currently identified by strings: SHA-1 hashes for users and UUIDs for artists).

First, load the file into an RDD using the `textFile` method:

In [None]:
# your code
source = ...

We go through the data set, list all artists and assign them a unique ID.

For this:
 * We write a function that, given a line of the file, splits it and returns the tuple `(musicbrainz-artist-id, artist-name)`

In [None]:
def get_artist(line):
    data = line.split('\t')
    return (data[1], data[2])

We can then run this function through the data set and get the list of unique artists using the `distinct()` function:

In [None]:
artists = source.map(get_artist).distinct()
print('Number of artists: %s' % artists.count())

Later on we'll need to look up the unique ID of each `(musicbrainz-artist-id, artist-name)` tuple, so we build a quick index with the [`zipWithUniqueId` function](https://spark.apache.org/docs/1.6.3/api/python/pyspark.html?highlight=zipwithuniqueid#pyspark.RDD.zipWithUniqueId) and use the Spark `broadcast` feature to make it available to all the Spark workers.
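
According to the Spark documentation, `zipWithUniqueId` gives the items of the k-th partition the ids k, n+k, 2n+k, ..., where n is the number of partitions. The ids are therefore unique but not necessarily consecutive. The plain-Python sketch below reproduces this numbering on hand-written partitions; `zip_with_unique_id` is a hypothetical stand-in, not the Spark function itself.

```python
def zip_with_unique_id(partitions):
    """Pair the k-th item of partition i (out of n) with the id k * n + i."""
    n = len(partitions)
    return [(item, k * n + i)
            for i, partition in enumerate(partitions)
            for k, item in enumerate(partition)]

# Three hand-written partitions holding five items in total.
parts = [['a'], ['b', 'c'], ['d', 'e']]
indexed = zip_with_unique_id(parts)
print(indexed)  # [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]

# A dictionary built from the pairs gives the item -> id lookup.
index = dict(indexed)
print(index['c'])  # 4
```

Note that id 3 is never assigned in this example: gaps are expected when partitions have different sizes.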

In [None]:
# create a unique ID for each artist
artists_with_unique_id = artists.zipWithUniqueId().collect()
artists_with_unique_id[0:2]

In [None]:
# build an index mapping each artist to its id, which can be used by Spark
# (each element is an ((artist-id, artist-name), unique_id) pair)
artists_index = sc.broadcast(dict(artists_with_unique_id))

# artists_index is a Broadcast object
print(type(artists_index))
# whose .value property is a Python dictionary
print(type(artists_index.value))
# the dictionary contains the mapping artist --> unique integer id
print(list(artists_index.value.items())[0:10])

Now it's your turn to do the same for users:
* write a function to get the user identifier (the `user-mboxsha1` field) from a line of the source RDD
* create an RDD containing all unique users
* create a unique ID for each user using the previous technique
* create a spark broadcast variable to distribute it to the Spark workers

In [None]:
# Your code

We now have to parse the data set and create a new `Rating` object for each data point. Each object will have the form:
```
Rating(user_id, artist_id, listens)
```
where:
* `user_id` is the integer id we have assigned
* `artist_id` is the integer id of the artist we have assigned
* `listens` is the number of listens for this user and artist

In order to generate the Ratings:
* Write a function that will parse each line of the source and build the above `Rating` object.
* Apply the function to the data set
* Display the first 5 ratings of the data set

In [None]:
# Your code

### Data splitting

We need to split the data set into two subsets:
 * training
 * test

To split the data set, we randomly assign each entry to one of the two subsets. This can be done using the RDD's `randomSplit()` function.
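
To illustrate what a weighted random split does, here is a plain-Python sketch in which each item is independently assigned to one subset with a probability proportional to its weight. The helper `random_split`, the 80/20 weights and the seed are assumptions made for the example; this is not Spark's implementation.

```python
import random

def random_split(items, weights, seed=None):
    """Assign each item independently to one split, with probability
    proportional to the corresponding weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # cumulative upper bounds of each split's share of [0, 1)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()
        for idx, bound in enumerate(bounds):
            # the last split also catches any floating-point rounding gap
            if u < bound or idx == len(bounds) - 1:
                splits[idx].append(item)
                break
    return splits

training_sample, test_sample = random_split(range(1000), [0.8, 0.2], seed=42)
print('%s training / %s test' % (len(training_sample), len(test_sample)))
```

Because the assignment is random, the subset sizes only approximate the requested proportions; fixing the seed makes the split reproducible.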

In [None]:
# your code

### Modelling
We can now train the recommender on the training set:

In [None]:
# Build the recommendation model using Alternating Least Squares with implicit ratings
rank = 10
numIterations = 10
alpha = 0.01
lambda_ = 0.01

model = ALS.trainImplicit(training, rank, numIterations, alpha=alpha, lambda_=lambda_, nonnegative=True)

### Prediction

The generated model can be saved for later use:

In [None]:
model.save(sc, "myCollaborativeFilter")

... and then loaded when needed:

In [None]:
recommender = MatrixFactorizationModel.load(sc, "myCollaborativeFilter")

The `recommender` object can now be used to make recommendations.

* Pick a random user in the training set and predict the 10 artists they are most likely to be interested in (`recommendProducts()`)

In [None]:
# Your code

* For a few users in the test set, compute the list of unique artists each user listened to and compare it with the recommendations for that user

In [None]:
# your code

* Pick a random artist and predict the 10 users most likely to be interested in this artist (`recommendUsers()`)

In [None]:
# your code

### Evaluation

We have arbitrarily chosen some parameters for training the model (*lambda*, *rank*, *iterations*, ...).

In practice, the recommender has to be evaluated on a data set and tuned according to an evaluation metric.

For simplicity, we'll evaluate the model on how many of the top 10 recommendations appear in the user's listening history.

* Write a function that, for each user, counts the number of artists common to the user's listening history and the recommendations
* Compute the average number of common elements on the test data set
* Investigate the impact of different parameters on the average value
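
The metric itself can be sketched in plain Python on toy data. The dictionaries below (user id to list of artist ids) and both helper names are hypothetical; how to obtain the recommendations and listening histories from Spark is left to the exercise.

```python
def common_artists(recommended, listened):
    """Number of recommended artists that the user actually listened to."""
    return len(set(recommended) & set(listened))

def average_common(recommendations, history):
    """Average overlap over the users present in both dictionaries."""
    users = [u for u in recommendations if u in history]
    if not users:
        return 0.0
    total = sum(common_artists(recommendations[u], history[u]) for u in users)
    return total / float(len(users))

# Toy data: user id -> artist ids (made up for illustration).
toy_recommendations = {1: [10, 11, 12], 2: [20, 21, 22]}
toy_history = {1: [10, 12, 99], 2: [30]}

print(average_common(toy_recommendations, toy_history))  # 1.0
```

User 1 has two artists in common and user 2 has none, hence the average of 1.0. A higher average indicates recommendations closer to what users actually listened to.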

In [None]:
# your code here