---------------------------
# Spark Lab Session - Exercises with Spark
---------------------------
*Thomas KOCH*

## 1. Opening Spark

In [1]:
import pyspark

# Create the SparkContext used throughout the session
sc = pyspark.SparkContext(appName="Spark Lab Session")

## 2. First steps with Spark

### 2.1 First RDD

In [2]:
l = list(range(3000))
rddl = sc.parallelize(l)
rddl.take(20)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

### 2.2 Computing the sum of cubes

In [3]:
rddc = rddl.map(lambda x : x*x*x)
rddc.take(10)

[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

In [4]:
rddc.sum()

20236502250000

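Remark: the same sum can also be obtained with `reduce`, but this is redundant here (apart from its pedagogical value). Note that `reduce` is an action: it returns a value and leaves `rddc` unchanged, which is why the `take(10)` below still shows the cubes.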

In [5]:
rddc.reduce(lambda x, y : x+y) # reduce is an action: it returns the sum (not displayed here) and does not modify rddc
rddc.take(10)

[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
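As a sanity check that needs no Spark, the sum of cubes also has a closed form: $0^3 + 1^3 + \dots + m^3 = \left(\frac{m(m+1)}{2}\right)^2$ (Nicomachus' identity). The sketch below compares it with Python's built-in `sum` and with `functools.reduce`, used here as local counterparts of `rddc.sum()` and `rddc.reduce(...)`.

```python
from functools import reduce

n = 3000  # same bound as range(3000) used to build rddl

cubes = [x ** 3 for x in range(n)]

# Local equivalents of rddc.sum() and rddc.reduce(lambda x, y: x + y)
total_sum = sum(cubes)
total_reduce = reduce(lambda x, y: x + y, cubes)

# Nicomachus' identity with m = n - 1 (the largest value in range(n))
m = n - 1
closed_form = (m * (m + 1) // 2) ** 2

print(total_sum, total_reduce, closed_form)  # all three equal 20236502250000
```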

### 2.3 Last digits of elements in C

In [6]:
# First step: compute the list containing
# the last digits
lastDigits = rddc.map(lambda x: x%10)
lastDigits.take(10)

[0, 1, 8, 7, 4, 5, 6, 3, 2, 9]

In [7]:
# Second step: count how many times each
# item appears
countLastDigits = lastDigits.map(lambda x:(x,1)).reduceByKey(lambda x,y: x+y)
countLastDigits.take(20)

[(0, 300),
 (8, 300),
 (4, 300),
 (1, 300),
 (5, 300),
 (9, 300),
 (6, 300),
 (2, 300),
 (7, 300),
 (3, 300)]

In [8]:
# The two steps can also be chained directly
rddc.map(lambda x:(x%10,1)).reduceByKey(lambda x, y : x+y).collect()

[(0, 300),
 (8, 300),
 (4, 300),
 (1, 300),
 (5, 300),
 (9, 300),
 (6, 300),
 (2, 300),
 (7, 300),
 (3, 300)]
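The perfectly uniform counts are not a coincidence. The last digit of $x^3$ depends only on the last digit of $x$, because $x^3 \bmod 10 = (x \bmod 10)^3 \bmod 10$, and cubing permutes the ten digits. Since every last digit occurs exactly 300 times in `range(3000)`, so does every last digit of the cubes. A quick plain-Python check:

```python
from collections import Counter

# Last digit of d**3 for each possible last digit d of x
cube_last_digit = {d: d ** 3 % 10 for d in range(10)}
print(cube_last_digit)
# {0: 0, 1: 1, 2: 8, 3: 7, 4: 4, 5: 5, 6: 6, 7: 3, 8: 2, 9: 9}  <- a permutation of 0..9

# Local equivalent of the map/reduceByKey pipeline above
counts = Counter(x ** 3 % 10 for x in range(3000))
print(sorted(counts.items()))
```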

### 2.4 Digits of C

In [9]:
rddc.flatMap(lambda x : [(e,1) for e in str(x)]).reduceByKey(lambda x, y : x+y).collect()

[('4', 2762),
 ('7', 2787),
 ('6', 2713),
 ('3', 2814),
 ('0', 3127),
 ('1', 3667),
 ('8', 2639),
 ('9', 2521),
 ('2', 3294),
 ('5', 2653)]

Breaking the pipeline down, here are the first `(digit, 1)` pairs produced by `flatMap`:

In [10]:
rddc.flatMap(lambda x : [(e,1) for e in str(x)]).take(20)

[('0', 1),
 ('1', 1),
 ('8', 1),
 ('2', 1),
 ('7', 1),
 ('6', 1),
 ('4', 1),
 ('1', 1),
 ('2', 1),
 ('5', 1),
 ('2', 1),
 ('1', 1),
 ('6', 1),
 ('3', 1),
 ('4', 1),
 ('3', 1),
 ('5', 1),
 ('1', 1),
 ('2', 1),
 ('7', 1)]
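For comparison, the same digit count can be reproduced locally in plain Python, with a generator playing the role of `flatMap` and a `Counter` playing the role of `reduceByKey`. This is only a single-machine sketch that makes the two steps explicit. Whatever the per-digit split, the counts must add up to the total number of digits written, which is 28977 for the cubes of 0 to 2999 (the same total as the Spark output above).

```python
from collections import Counter

cubes = (x ** 3 for x in range(3000))

# flatMap step: each cube yields one (digit, 1) pair per character of its decimal string
pairs = ((digit, 1) for c in cubes for digit in str(c))

# reduceByKey step: add up the 1s for each digit
digit_counts = Counter()
for digit, one in pairs:
    digit_counts[digit] += one

print(sorted(digit_counts.items()))
print(sum(digit_counts.values()))  # total number of digits
```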

## 3. Approximating $\pi$

To approximate $\pi$, you will generate the list of all pairs $(x,y)$ of integers from $0$ to $K-1$. Then you will count the pairs such that $(2x+1)^2+(2y+1)^2$ is at most $(2K)^2$. Four times the ratio between the number of such pairs and the total number of pairs is an approximation of $\pi$. For $K=3000$ you should obtain a value close to 3.14159.
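The factor 4 can be justified by an area argument. The pairs $(x,y)$ with $0 \le x,y \le K-1$ index the $K^2$ squares of side 2 that tile $[0,2K]^2$, and $(2x+1,2y+1)$ is the center of square $(x,y)$. Counting the squares whose center lies in the quarter disk of radius $2K$ approximates the area of that quarter disk:

$$
\frac{\#\{(x,y) : (2x+1)^2+(2y+1)^2 \le (2K)^2\}}{K^2}
\approx \frac{\frac{1}{4}\pi(2K)^2}{(2K)^2} = \frac{\pi}{4},
\qquad\text{so}\qquad
\pi \approx \frac{4\,\#\{(x,y) : (2x+1)^2+(2y+1)^2 \le (2K)^2\}}{K^2}.
$$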

### 3.1 Step 1: computing the set of pairs


### 3.2 Step 2: counting the pairs

### 3.3 Step 3: computing the approximation

In [11]:
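K=3000
intUpToK = sc.parallelize(range(K))
# Step 1: all K*K pairs (x, y) with 0 <= x, y < K
pairs = intUpToK.cartesian(intUpToK)
nbTotal = pairs.count()

# Step 2: keep the pairs whose point (2x+1, 2y+1) lies inside the circle of radius 2K
def isOk(v):
    x,y = v
    return (2*x+1)**2+(2*y+1)**2 <= 4*K*K

nbOk = pairs.filter(isOk).count()
# Step 3: the ratio nbOk/nbTotal approximates pi/4
print(4*float(nbOk)/nbTotal)
print(nbOk)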

3.1415933333333332
7068585
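The `cartesian` step materializes all $K^2$ = 9 million pairs. A possibly cheaper variant, sketched below in plain Python (an alternative, not the approach the lab asks for), uses the fact that for a fixed $x$ the valid $y$ form a prefix $0, 1, \dots, y_{max}$, so each row can be counted with an integer square root instead of testing every pair. It assumes Python 3.8+ for `math.isqrt`; the name `count_ok_pairs` is hypothetical.

```python
import math

def count_ok_pairs(K):
    """Count pairs (x, y) in range(K) x range(K) with (2x+1)^2 + (2y+1)^2 <= (2K)^2,
    one row at a time instead of enumerating all K*K pairs."""
    total = 0
    for x in range(K):
        r = 4 * K * K - (2 * x + 1) ** 2  # room left for (2y+1)^2; always > 0 since 2x+1 < 2K
        s = math.isqrt(r)                 # largest integer s with s*s <= r
        # (2y+1)^2 <= r  <=>  2y+1 <= s  <=>  y in 0..(s-1)//2, i.e. (s+1)//2 values
        total += (s + 1) // 2
    return total

K = 3000
nb_ok = count_ok_pairs(K)
print(nb_ok, 4 * nb_ok / (K * K))
```

In Spark, the loop body could in principle become a per-row function mapped over `intUpToK`, followed by `.sum()`, which avoids building the cartesian product.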


## 4. Using the MovieLens dataset
### 4.1 Getting the dataset
The dataset is in the ***ml-latest-small*** folder.
### 4.2 Getting the dataset into an RDD
To read the dataset, we can use the following code:

In [40]:
import re

# Local folder where ml-latest-small was extracted: adjust it to your own machine
path_data = "/home/p5hngk/Downloads/GitHub/SD_701---Data_Mining/ml-latest-small"

In [44]:
ratingsFile = sc.textFile(path_data+"/ratings.csv")
moviesFile = sc.textFile(path_data+"/movies.csv")
moviesFile.take(5)

['movieId,title,genres',
 '1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance']

In [45]:
ratingsFile.take(5)

['userId,movieId,rating,timestamp',
 '1,1,4.0,964982703',
 '1,3,4.0,964981247',
 '1,6,4.0,964982224',
 '1,47,5.0,964983815']

### 4.3 Cleaning data
We clean the data with the `parseCSV` function, defined as follows:

In [51]:
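# A field is either a run of characters without commas or quotes, or a double-quoted
# string (which may contain commas); it must be followed by a comma or the end of the line
future_pattern = re.compile("""([^,"]+|"[^"]+")(?=,|$)""")

def parseCSV(line):
    # Return the list of fields of one CSV line (quoted fields keep their quotes)
    return future_pattern.findall(line)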
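To see what the pattern does on a title containing a comma, here is a plain-Python check on an illustrative line in the `movies.csv` layout. It shows that the surrounding quotes are kept (as in the title `'"Sandpiper, The (1965)"'` further down) and that an empty field is silently dropped. For comparison, it also parses the same line with Python's standard `csv` module, which is an alternative and not what this lab uses.

```python
import csv
import re

future_pattern = re.compile("""([^,"]+|"[^"]+")(?=,|$)""")

def parseCSV(line):
    return future_pattern.findall(line)

# Illustrative line with a comma inside a quoted title
line = '11,"American President, The (1995)",Comedy|Drama|Romance'

print(parseCSV(line))
# ['11', '"American President, The (1995)"', 'Comedy|Drama|Romance']   <- quotes kept

print(next(csv.reader([line])))
# ['11', 'American President, The (1995)', 'Comedy|Drama|Romance']     <- quotes removed

print(parseCSV("a,,b"))
# ['a', 'b']   <- the empty middle field disappears
```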

We can now use this function to clean our data:

In [52]:
ratings = ratingsFile.map(parseCSV).filter(lambda x: x[0]!="userId") # drop the header line so it does not interfere with the computations
ratings.take(5)

[['1', '1', '4.0', '964982703'],
 ['1', '3', '4.0', '964981247'],
 ['1', '6', '4.0', '964982224'],
 ['1', '47', '5.0', '964983815'],
 ['1', '50', '5.0', '964982931']]

In [55]:
movies = moviesFile.map(parseCSV).filter(lambda x : x[0]!="movieId") # drop the header line
movies.take(5)

[['1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'],
 ['2', 'Jumanji (1995)', 'Adventure|Children|Fantasy'],
 ['3', 'Grumpier Old Men (1995)', 'Comedy|Romance'],
 ['4', 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance'],
 ['5', 'Father of the Bride Part II (1995)', 'Comedy']]

### 4.4 Compute the 10 best-rated movies of all time
We now want to compute the 10 movies with the best average rating, using `sortBy` and `take`.

We first compute the average in the simplest way:
<center>
$\frac{sum(ratings)}{numberOfRatings}$
</center>

In [86]:
# (movieId, (rating, 1)) pairs, then per movie: (sum of ratings, number of ratings)
ratedMovie = ratings.map(lambda x : (x[1],(float(x[2]),1))).reduceByKey(lambda x,y : (x[0]+y[0],x[1]+y[1]))
ratedMovie.take(5)

[('1', (843.0, 215)),
 ('50', (864.5, 204)),
 ('70', (193.0, 55)),
 ('110', (955.5, 237)),
 ('157', (31.5, 11))]
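Each rating is mapped to `(rating, 1)` because `reduceByKey` merges values pairwise, in no particular order, with a function that should be associative and commutative. Adding `(sum, count)` pairs satisfies this; averaging averages does not. The plain-Python sketch below, on hypothetical toy ratings, emulates the per-key reduction and shows what goes wrong with a naive averaging reducer.

```python
from functools import reduce

# Hypothetical toy ratings: (movieId, rating) pairs
toy_ratings = [("1", 4.0), ("1", 5.0), ("50", 3.5), ("1", 3.0), ("50", 4.5)]

def combine(a, b):
    """Same combiner as in reduceByKey above: add the sums and the counts."""
    return (a[0] + b[0], a[1] + b[1])

# Map step (rating -> (rating, 1)), grouped by key, then reduced per key
per_movie = {}
for movie_id, rating in toy_ratings:
    per_movie.setdefault(movie_id, []).append((rating, 1))
sums_counts = {m: reduce(combine, values) for m, values in per_movie.items()}
print(sums_counts)  # {'1': (12.0, 3), '50': (8.0, 2)}

averages = {m: s / c for m, (s, c) in sums_counts.items()}
print(averages)     # {'1': 4.0, '50': 4.0}

# Averaging inside the reducer gives a grouping-dependent, wrong result
naive = reduce(lambda a, b: (a + b) / 2, [4.0, 5.0, 3.0])
print(naive)        # 3.75, not 4.0
```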

In [101]:
ratedMovie1 = ratedMovie.mapValues(lambda x: x[0]/x[1]).sortBy(lambda x : x[1], ascending=False)
ratedMovie1.take(10)

[('6835', 5.0),
 ('1151', 5.0),
 ('1631', 5.0),
 ('102217', 5.0),
 ('27523', 5.0),
 ('53', 5.0),
 ('1140', 5.0),
 ('8238', 5.0),
 ('47736', 5.0),
 ('53355', 5.0)]

We now have the top 10 ***movieId*** values with their ***average rating***. Let's attach the ***title*** of each ***movieId***.

### 4.5 Ordered list of movies with names

In [98]:
movieJoin = movies.map(lambda x: (x[0], x[1]))
movieJoin.take(5)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)')]

In [91]:
movieWithAvg1 = movieJoin.join(ratedMovie1)
movieWithAvg1.take(5)

[('4', ('Waiting to Exhale (1995)', 2.357142857142857)),
 ('10', ('GoldenEye (1995)', 3.496212121212121)),
 ('12', ('Dracula: Dead and Loving It (1995)', 2.4210526315789473)),
 ('16', ('Casino (1995)', 3.926829268292683)),
 ('20', ('Money Train (1995)', 2.5))]
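`join` on pair RDDs is an inner join on the key: keys present on only one side are dropped, and the result no longer follows the order produced by `sortBy`, as the output above shows. This is why the joined result is sorted again below. A plain-Python sketch of the inner-join semantics on hypothetical toy data (the output order here simply follows the left list; Spark makes no such ordering promise after the shuffle):

```python
# Hypothetical toy data shaped like movieJoin (movieId, title) and ratedMovie1 (movieId, average)
titles = [("1", "Toy Story (1995)"), ("2", "Jumanji (1995)"), ("3", "Grumpier Old Men (1995)")]
avg_by_movie = [("3", 3.2), ("1", 3.9), ("999", 4.0)]

def inner_join(left, right):
    """Pair up values sharing a key, as (key, (left_value, right_value)), like RDD.join."""
    right_by_key = {}
    for k, v in right:
        right_by_key.setdefault(k, []).append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

print(inner_join(titles, avg_by_movie))
# [('1', ('Toy Story (1995)', 3.9)), ('3', ('Grumpier Old Men (1995)', 3.2))]
# "2" has no average and "999" has no title: both are dropped
```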

In [92]:
movieWithAvg1 = movieWithAvg1.map(lambda x : x[1])
movieWithAvg1.take(5)

[('Waiting to Exhale (1995)', 2.357142857142857),
 ('GoldenEye (1995)', 3.496212121212121),
 ('Dracula: Dead and Loving It (1995)', 2.4210526315789473),
 ('Casino (1995)', 3.926829268292683),
 ('Money Train (1995)', 2.5)]

In [93]:
top10Movies1 = movieWithAvg1.sortBy(lambda x : x[1], ascending=False).take(10)
top10Movies1

[('Lamerica (1994)', 5.0),
 ('What Happened Was... (1994)', 5.0),
 ('Denise Calls Up (1995)', 5.0),
 ('Lesson Faust (1994)', 5.0),
 ('"Sandpiper, The (1965)"', 5.0),
 ('My Man Godfrey (1957)', 5.0),
 ('Black Tar Heroin: The Dark End of the Street (2000)', 5.0),
 ('Slumber Party Massacre II (1987)', 5.0),
 ('Moscow Does Not Believe in Tears (Moskva slezam ne verit) (1979)', 5.0),
 ('Cherish (2002)', 5.0)]

Many movies have a perfect five-star average. Such an average is easy to reach for a movie with only a handful of ratings (a single 5.0 rating is enough), so **the plain average is not a good ranking rule**.

### 4.6 Better ordered list
Let's compute it in another way:
<center>
$\frac{sum(ratings)}{1+numberOfRatings}$
</center>

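This amounts to adding one extra rating of 0 to every movie, which pulls down the average of movies with few ratings much more than that of movies with many ratings.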
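To see how the extra 0 vote changes the ranking, here is the arithmetic on two hypothetical movies: one with a single 5.0 rating, and one with 100 ratings averaging 4.4. The penalized score equals the plain average multiplied by $\frac{n}{n+1}$, which halves it for $n=1$ but barely changes it for $n=100$.

```python
def plain_average(total, n):
    return total / n

def penalized_average(total, n):
    """sum(ratings) / (1 + numberOfRatings): as if every movie had one extra rating of 0."""
    return total / (1 + n)

# Hypothetical movies: (sum of ratings, number of ratings)
single_five = (5.0, 1)    # one rating of 5.0
popular = (440.0, 100)    # 100 ratings averaging 4.4

print(plain_average(*single_five), plain_average(*popular))                  # 5.0 4.4
print(penalized_average(*single_five), round(penalized_average(*popular), 3))  # 2.5 4.356
```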

In [99]:
ratedMovie2 = ratedMovie.mapValues(lambda x: (x[0])/(1+x[1]))

movieWithAvg2 = movieJoin.join(ratedMovie2).map(lambda x: x[1])

top10movies2 = movieWithAvg2.sortBy(lambda x: x[1], ascending=False).take(10)
top10movies2

[('"Shawshank Redemption, The (1994)"', 4.415094339622642),
 ('"Godfather, The (1972)"', 4.266839378238342),
 ('"Streetcar Named Desire, A (1951)"', 4.261904761904762),
 ('Fight Club (1999)', 4.2534246575342465),
 ('"Godfather: Part II, The (1974)"', 4.226923076923077),
 ('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)',
  4.224489795918367),
 ('"Three Billboards Outside Ebbing, Missouri (2017)"', 4.222222222222222),
 ('"Usual Suspects, The (1995)"', 4.217073170731707),
 ('Goodfellas (1990)', 4.216535433070866),
 ('Star Wars: Episode IV - A New Hope (1977)', 4.214285714285714)]

The second rule is a little better, but the model can be improved further with a third rule. The idea is to give more weight to movies that have many ratings than to those that have only a few:
<center>
$\frac{sum(ratings)}{numberOfRatings} \cdot \log(numberOfRatings)$
</center>

In [100]:
import math

ratedMovie3 = ratedMovie.mapValues(lambda x: (x[0]/x[1])*math.log(x[1]))

movieWithAvg3 = movieJoin.join(ratedMovie3).map(lambda x: x[1])

top10movies3 = movieWithAvg3.sortBy(lambda x: x[1], ascending=False).take(10)
top10movies3

[('"Shawshank Redemption, The (1994)"', 25.506303124680446),
 ('Forrest Gump (1994)', 24.135559630846686),
 ('Pulp Fiction (1994)', 24.035971735394476),
 ('"Matrix, The (1999)"', 23.593497870526754),
 ('"Silence of the Lambs, The (1991)"', 23.43310709209536),
 ('Star Wars: Episode IV - A New Hope (1977)', 23.37860964684444),
 ('Fight Club (1999)', 23.007601610036865),
 ("Schindler's List (1993)", 22.788076383338726),
 ('Star Wars: Episode V - The Empire Strikes Back (1980)', 22.561506207236786),
 ('"Godfather, The (1972)"', 22.549726244087907)]
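A few hypothetical values make the behaviour of the third rule concrete. `math.log` is the natural logarithm and $\log(1) = 0$, so a movie with a single rating always scores 0. The weight also grows with the number of ratings fast enough that, in this sketch, a movie averaging 3.5 over 100 ratings outranks one averaging 4.0 over 10 ratings.

```python
import math

def log_weighted_score(total, n):
    """(sum(ratings) / numberOfRatings) * log(numberOfRatings), natural log as in math.log."""
    return (total / n) * math.log(n)

# Hypothetical movies: (sum of ratings, number of ratings)
print(log_weighted_score(5.0, 1))                  # 0.0, since log(1) = 0
print(round(log_weighted_score(40.0, 10), 2))      # 9.21  (average 4.0, 10 ratings)
print(round(log_weighted_score(350.0, 100), 2))    # 16.12 (average 3.5, 100 ratings)
```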

## 5. Movie recommendation

Our goal here is to make recommendations for **userId number 1**.

To start, we compute the set of movies that user 1 has rated.

In [None]:
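# userId and movieId are strings after parseCSV, hence the comparison with "1"
moviesUser1 = set(ratings.filter(lambda x: x[0] == "1").map(lambda x: x[1]).collect())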
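One pitfall in this step: after `parseCSV`, every field is a string, so the user id has to be compared with `"1"`, not with the integer `1` (which would match no row). A plain-Python sketch of the same selection on toy rows in the `ratings` layout; the three user-1 rows are copied from the `ratings.take(5)` output above, and the user-2 row is made up:

```python
# Toy rows in the `ratings` layout: [userId, movieId, rating, timestamp]
rows = [
    ["1", "1", "4.0", "964982703"],
    ["1", "3", "4.0", "964981247"],
    ["2", "318", "3.0", "1445714835"],  # hypothetical row for another user
    ["1", "6", "4.0", "964982224"],
]

# Compare with the string "1": every parsed field is a str
movies_user1 = {movie_id for user_id, movie_id, _rating, _ts in rows if user_id == "1"}
print(sorted(movies_user1, key=int))  # ['1', '3', '6']
```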