# Implementation of PLSI algorithm using Spark 

In [2]:
import pandas as pd
import time
import numpy as np
import matplotlib.pyplot as plt
import findspark
findspark.init("/Users/gaelleguillou/Desktop/Database management/spark-2.2.3-bin-hadoop2.7")
import pyspark
from numpy import random
import math

In [3]:
from pyspark import sql

## 1. Prepare the environment

### Import the data

We will implement the PLSI algorithm on the Movielens dataset. To simplify our task, we will start by implementing the PLSI algorithm on a reduced version of the Movielens dataset ("ratings_short"), which contains 100 836 observations. 

In [4]:
ratings_data = pd.read_csv("ratings.csv")
ratings_data.head()

Unnamed: 0,user_id,user_username,movie_id,rating
0,2,William,1768,1
1,3,James,615,3
2,7,Joseph,82,3
3,7,Joseph,532,3
4,8,Thomas,698,3


**Description of the dataset :** 
- userId, to characterize the users 
- movieId, to characterize the movies
- rating : the rating of the user to the corresponding movie. Ratings are going from 1 to 5 but here we will only use the seen / not seen information to provide movie recommendations.  
- timestamp : we will not be using this column

More information about the movies are available in the "movies.csv" dataset : the movieId gives us access to the corresponding movie information such as the title and the genres. 

In [5]:
movies = pd.read_csv("movies.csv")
movies.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [21]:
movies.shape

(9742, 3)

### Create a Spark environment

!sudo hostname

In [6]:
sc = pyspark.SparkContext().getOrCreate()

To be able to implement the PLSI algorithm in Spark, we will need to transform the dataset into an RDD, and then perform pyspark operations on it. 

In [7]:
rdd = sc.textFile("ratings.csv")

#Remove header line 
header = rdd.first()
rdd = rdd.filter(lambda x: x != header)

rdd.collect()[0:10]

['2,William,1768,1',
 '3,James,615,3',
 '7,Joseph,82,3',
 '7,Joseph,532,3',
 '8,Thomas,698,3',
 '10,Robert,1693,3',
 '11,Edward,615,1',
 '18,David,1,3',
 '18,David,28,3',
 '18,David,1596,5']

In [8]:
rdd.count()

295

## 2. Probabilistic Latent Semantic Indexing algorithm (PLSI)

The PLSI algorithm that we will implement here is based on Das description of the Google News recommendation system. The algorithm is based on the following model : 
- u (users) and s (movies) are random variables 
- The relationship between users and movies is learned by modeling the joint distribution of users and items as a mixture distribution 
- To capture this relationship, we introduce a hidden variable z (latent variable), that kind of represents user communities (same preferences) and movie communities (sames genres). 

All in all, we try to compute the following probability for each (user, movie) couple : p(s|u) = sum(p(s|z)p(z|u)), which is the probability for a given user to see a given movie. This is obtained by summing for each community the probability for a movie s to be seen given a community z times the probability to be in the community z given a user u. 

### Going through the algorithm : main steps

**INITIALISATION**

**E-STEP - Compute q( z | (u,s) ) : the probability that the (user, movie) couple belongs to the class z**
This step is first initialized at random :
- To each couple (u,s), assign each possible community 
- Ex with number of classes = 2 : the lines (Marie, Star Wars) and (Gaëlle, Matrix) will give (Marie, Star Wars, 1), (Marie, Star Wars, 2), (Gaëlle, Matrix, 1), (Gaëlle, Matrix, 2)
- To each line, assign a random probability. This random probability corresponds to q*( z | (u,s) ). For example if I have (Marie, Star Wars, 1, 0.3), then the probability that the couple (Marie, Star Wars) is in class 1 is 0.3. 

LogLik = 0

**ITERATION**

**M-STEP - Compute p(s|z) and p(z|u) based on q( z | (u,s) )**
- Compute p(s | z) :  sum the probas associated to every couple (s,z) and divide it by the sum of probas associated to this z
- Compute p(z | u) : sum the probas associated to every couple (u,z) and divide by the sum of probas associated to this u

**E-STEP - Compute new q( z | (u,s) ) = p(s|z)p(z|u) / ∑p(s|z)p(z|u)**
- For each (u,s,z), compute p(s | z) * p(z | u)
- For each (u,s), compute ∑ p(s | z)* p(z | u) (summing over z)     ***(this corresponds to p(s|u))***
- For each (u,s,z), compute p(s|z)p(z|u) / ∑p(s|z)p(z|u)             ***(this corresponds to the new q( z | (u,s) )***
	    
**Update LogLik** = sum( log( ∑ p(s | z) * p(z | u))) = sum( log (p(s | u))

**Iterate again until LogLik converges** : this means that it has reached its maximum and we have found the best estimation of p(z | u) and p(s | z).
	    
**We can now predict the probability that Gaëlle will watch Star Wars** :
p(Star Wars | Gaëlle) = p( 1 | Gaëlle) * p(Star Wars |1) + p(2 | Gaëlle) * p(Star Wars | 2)

### Implementing the algorithm 

Keep only (user, movie) information : 

In [9]:
rdd = rdd.map(lambda line : line.split(',')).map(lambda line : line[0] + ',' + line[2])

#### Initialisation of q : (first E-Step)

To each couple (u,s), assign each possible community z : 

In [10]:
nb_z = 3 #number of classes
classes = sc.parallelize(range(nb_z))
classes.collect()
rdd = rdd.cartesian(classes)
rdd = rdd.distinct()

In [11]:
ordered_rdd = rdd.map(lambda x: (x[0].split(','), x[1])).sortBy(lambda x : (x[0][0], x[0][1], x[1]))
ordered_rdd.count()

882

To each line, assign a random probability :

In [12]:
proba0 = np.random.rand(int(ordered_rdd.count()/nb_z), nb_z)
random_p = (proba0 / np.reshape(proba0.sum(1), (int(ordered_rdd.count()/nb_z), 1))).flatten()
random_p = list(random_p)

In [13]:
q = ordered_rdd.map(lambda x : (x, random_p.pop(0))) 

In [14]:
q.count()

882

#### One iteration step 

M-STEP - Compute p(s|z) and p(z|u) based on q( z | (u,s) )

Compute p(s | z) : sum the probas associated to every couple (s,z) and divide it by the sum of probas associated to this z

In [15]:
SZ_probas = q.map(lambda x: ((x[0][0][1], x[0][1]), x[1]))

In [16]:
Nsz = SZ_probas.reduceByKey(lambda x,y: x + y)

In [17]:
Z_probas = q.map(lambda x: (x[0][1], x[1]))

In [18]:
Nz = Z_probas.reduceByKey(lambda x,y: x+y)

In [19]:
Nsz = Nsz.map(lambda x : (x[0][1], (x[0][0], x[1])))

In [20]:
Psz = Nsz.join(Nz)

In [21]:
Psz = Psz.map(lambda x : ((x[1][0][0], x[0]), x[1][0][1] / x[1][1]))

Compute p(z | u) : sum the probas associated to every couple (u,z) and divide by the sum of probas associated to this u

In [22]:
ZU_probas = q.map(lambda x: ((x[0][0][0], x[0][1]), x[1]))

In [23]:
Nzu = ZU_probas.reduceByKey(lambda x,y: x + y)

In [24]:
U_probas = q.map(lambda x: (x[0][0][0], x[1]))

In [25]:
Nu = U_probas.reduceByKey(lambda x,y: x+y)

In [26]:
Nzu = Nzu.map(lambda x : (x[0][0], (x[0][1], x[1])))

In [27]:
Pzu = Nzu.join(Nu)

In [28]:
Pzu = Pzu.map(lambda x : ((x[1][0][0], x[0]), x[1][0][1] / x[1][1]))

E-STEP - Compute new q( z | (u,s) ) = p(s|z)p(z|u) / ∑p(s|z)p(z|u)

For each (u,s,z), compute p(s | z) * p(z | u)

In [29]:
q_int = q.map(lambda x : ((x[0][1], x[0][0][0]), (x[0][0][1], x[0][1])))

In [30]:
q_int2 = q_int.join(Pzu)

In [31]:
q_int3 = q_int2.map(lambda x : (x[1][0], (x[0], x[1][1])))

In [32]:
PzuPsz = q_int3.join(Psz)

In [33]:
PzuPsz = PzuPsz.map(lambda x: ((x[1][0][0][1], x[0][0]), (x[0][1], x[1][0][1]*x[1][1])))

For each (u,s), compute ∑ p(s | z)* p(z | u) (summing over z) (this corresponds to p(s|u))

In [34]:
SumPzuPsz = PzuPsz.map(lambda x : (x[0], x[1][1])).reduceByKey(lambda x,y : x+y)

For each (u,s,z), compute p(s|z)p(z|u) / ∑p(s|z)p(z|u) (this corresponds to the new q( z | (u,s) )

In [35]:
q1 = PzuPsz.join(SumPzuPsz)

In [36]:
q1 = q1.map(lambda x : ((x[0], x[1][0][0]), x[1][0][1]/x[1][1]))

Update LogLik = sum( log( ∑ p(s | z) * p(z | u))) = sum( log (p(s | u))

In [37]:
log = SumPzuPsz.map(lambda x : np.log(x[1]))
LogLik = log.reduce(lambda x,y : x+y)

End of the iteration step 

### Full Algorithm 

In [38]:
#Open rdd

rdd = sc.textFile("ratings.csv")

#Remove header line 
header = rdd.first()
rdd = rdd.filter(lambda x: x != header)

#Keep only (user, movie) information
rdd = rdd.map(lambda line : line.split(',')).map(lambda line : line[0] + ',' + line[2])

#Initialize number of classes and number of iteratins
nb_z = 3 
nb_iterations = 5

#Compute the cartesian product of the (user, movie) couples with the 3 classes
classes = sc.parallelize(range(nb_z))
classes.collect()
rdd = rdd.cartesian(classes)
rdd = rdd.distinct()

## Initialize q0 ##

#Order rdd by user, movie, class
ordered_rdd = rdd.map(lambda x: (x[0].split(','), x[1])).sortBy(lambda x : (x[0][0], x[0][1], x[1])) 

#Create a vector of probabilities that sum to 1 every three probas
proba0 = np.random.rand(int(ordered_rdd.count()/nb_z), nb_z)
random_p = list((proba0 / np.reshape(proba0.sum(1), (int(ordered_rdd.count()/nb_z), 1))).flatten())

#Assign a probability to each triplet (user, movie, class)
q = ordered_rdd.map(lambda x : (x, random_p.pop(0))) 

#Create an empty list to keep track of the LogLikelihood
LogLik = []

###### Run the EM algorithm on nb_iterations #####

for i in range(nb_iterations) : 
    
    start = time.time()
    
    #### M-STEP - Compute p(s|z) and p(z|u) based on q( z | (u,s) ) ####
    
    ## Compute p(s | z) : sum the probas associated to every couple (s,z) and divide it by the sum of probas associated to this z ##
    
    #Keep the probabilities of all the (movie, class) couples 
    SZ_probas = q.map(lambda x: ((x[0][0][1], x[0][1]), x[1]))
    
    #Sum the probabilities for the same (movie, class) couples
    Nsz = SZ_probas.reduceByKey(lambda x,y: x + y)
    
    #Keep the probabilities associated to each class in the rdd and sum the probabilities by class
    Z_probas = q.map(lambda x: (x[0][1], x[1]))
    Nz = Z_probas.reduceByKey(lambda x,y: x+y)
    
    #Divide the probability of the (movie, class) couple by the probability of the class
    Nsz = Nsz.map(lambda x : (x[0][1], (x[0][0], x[1])))
    Psz = Nsz.join(Nz)
    Psz = Psz.map(lambda x : ((x[1][0][0], x[0]), x[1][0][1] / x[1][1])) #This gives us p(s | u)
    
    ## Compute p(z | u) : sum the probas associated to every couple (u,z) and divide by the sum of probas associated to this u ##
    
    #Same idea : Keep the probabilities of all the (class, user) couples and sum them by couple
    ZU_probas = q.map(lambda x: ((x[0][0][0], x[0][1]), x[1]))
    Nzu = ZU_probas.reduceByKey(lambda x,y: x + y)
    
    # Keep the probabilities associated to each user in the rdd and sum the probabilities by user
    U_probas = q.map(lambda x: (x[0][0][0], x[1]))
    Nu = U_probas.reduceByKey(lambda x,y: x+y)
    
    #Divide the probability of the (class, user) couple by the probability of the user
    Nzu = Nzu.map(lambda x : (x[0][0], (x[0][1], x[1])))
    Pzu = Nzu.join(Nu)
    Pzu = Pzu.map(lambda x : ((x[1][0][0], x[0]), x[1][0][1] / x[1][1])) #This gives us p(u | z)
    
    ### E-STEP - Compute new q( z | (u,s) ) = p(s|z)p(z|u) / ∑p(s|z)p(z|u) ###
    
    ## For each (u,s,z), compute p(s | z) * p(z | u) ##
    
    #Here we want to join Pzu and Psz : to each triplet (u,s,z), we want to associate p(z|u) and p(s|z) (computed above)
    #We create couples (z,u) and (s,z) for each triplet (u,s,z) and change their places to make the join with Pzu and Psz possible
    
    q_int = q.map(lambda x : ((x[0][1], x[0][0][0]), (x[0][0][1], x[0][1])))
    q_int2 = q_int.join(Pzu)
    q_int3 = q_int2.map(lambda x : (x[1][0], (x[0], x[1][1])))
    PzuPsz = q_int3.join(Psz)
    
    #We now multiply p(z|u) and p(s|z) to obtain p(s|u)
    PzuPsz = PzuPsz.map(lambda x: ((x[1][0][0][1], x[0][0]), (x[0][1], x[1][0][1]*x[1][1])))
    
    ## For each (u,s), we compute ∑ p(s | z)* p(z | u) (summing over z) (this corresponds to p(s|u)) ##
    SumPzuPsz = PzuPsz.map(lambda x : (x[0], x[1][1])).reduceByKey(lambda x,y : x+y)
    
    #Update LogLikelihood
    log = SumPzuPsz.map(lambda x : np.log(x[1]))
    N = SumPzuPsz.count()
    L = log.reduce(lambda x,y : x+y)
    print(L/N)
    LogLik.append(L/N)
    
    #For each (u,s,z), compute p(s|z)p(z|u) / ∑p(s|z)p(z|u) (this corresponds to the new q( z | (u,s) )
    q = PzuPsz.join(SumPzuPsz)
    q = q.map(lambda x : ((x[0], x[1][0][0]), x[1][0][1]/x[1][1]))    
    
    #Persist q
    q = q.persist()
    
    end = time.time()

    print("Iteration "+str(i)+" completed in "+str(end-start))

-3.7013337967183704
Iteration 0 completed in 5.745774745941162
-3.636919997518941
Iteration 1 completed in 34.87203788757324
-3.518349313395475
Iteration 2 completed in 249.09931588172913
-3.3443403214993337
Iteration 3 completed in 6572.328587055206


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15354 in stage 301.0 failed 1 times, most recent failure: Lost task 15354.0 in stage 301.0 (TID 300096, localhost, executor driver): java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1340)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1575)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1563)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1562)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1562)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1790)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1745)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1734)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:944)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:943)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:511)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1340)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


### Plot the LogLikelihood

In [None]:
fig = plt.figure()
fig.suptitle('Log-Likelihood', fontsize=20, fontweight='bold')
ax = fig.add_subplot(111)
ax.set_xlabel('Number of iterations')
ax.set_ylabel('Log-Likelihood')
ax.plot(LogLik)
plt.show()

## 3. Build the Recommendation System

In [None]:
rdd = sc.textFile("ratings.csv")

#Remove header line 
header = rdd.first()
rdd = rdd.filter(lambda x: x != header)
rdd = rdd.map(lambda line : line.split(','))
rdd = rdd.map(lambda line : (line[0], line[2]))

Let us build an rdd with every possible combination of (user, movie) so that we can compute the probability that a user has been or will be watching the movie. In order to do so, we will use the hidden variables Z (the classes) that will help us in the computation of the probability. Thus, we need to build an rdd with every possible combination of ((user, movie), class) : 

In [None]:
users = rdd.map(lambda x : x[0])
movies = rdd.map(lambda x : x[1])
classes = sc.parallelize(range(nb_z))

In [None]:
data = users.cartesian(movies)
data = data.cartesian(classes).map(lambda line : (line[0][0], line[0][1], line[1]))
data = data.distinct()

We can now compute the probability that a user has been or will be watching a movie using the hidden variables : : p(movie | user) = p(movie | class) * p(class | movie) : 

In [None]:
ordered_data = data.sortBy(lambda x : (x[0], x[1], x[2]))

In [None]:
couples = ordered_data.map(lambda x : ((x[2], x[0]), (x[1], x[2])))

In [None]:
probas = couples.join(Pzu).map(lambda x : (x[1][0], (x[0], x[1][1])))

In [None]:
probas = probas.join(Psz)

In [None]:
Psu = probas.map(lambda x : (x[1][0][0][1], x[0][0], x[1][0][1]*x[1][1]))

In [None]:
probs = Psu.map(lambda x : x[2])

Let us plot the distribution of the probabilities to see what's going on : 

In [None]:
plt.hist(probs.collect())

Given the above plot, we decide to set the treshold at 0.01. If the probability of the movie given the user is higher than 0.01, we can advise this movie to the user. 

In [None]:
def prediction(rdd, threshold):
    return(rdd.map(lambda x : (x[0],x[1], x[2] >=threshold)))

Here is the final result of our prediction : 'False' indicates that there is very little chance that the user sees that movie, 'True' indicates that the user should see that movie. 

In [None]:
result = prediction(Psu, 0.01)
result.collect()[0:10]