# Développement d'un algorithme d'évaluation des films en Spark

# Objectif du Projet
Il s'agit de développer en Spark une méthode de descente de gradient, dans le but de résoudre un problème de filtrage collaboratif, et de la comparer avec une méthode de la librairie MLIB. Ce Notebook a pour but le développement et la validation de l'approche, avant intégration et exploitation dans le cadre de l'infrastructure développée dans le projet. Pour information, de nombreuses versions de ce problème existent sur le web.

# Position du problème
Nous avons à notre disposition un RDD "ratings" du type (userID, movieID, rating). Les données sont fournies par le fichier `ratings.dat`, stockées  au format ci-joint :
```
UserID::MovieID::Rating::Timestamp
```

Ce RDD peut être stocké dans une matrice $R$ où l'on trouve "rating" à l'intersection de la ligne "userID" et de la colonne "movieID".
Si la matrice $R$ est de taille $m \times  n$, nous cherchons $P \in R^{m,k}$ et $Q \in R^{n,k}$ telles que $R \approx \hat{R} = PQ^T$.
Pour cela on considère le problème
$$ \min_{P,Q} \sum_{i,j : r_{ij} \text{existe}}  \ell_{i,j}(R,P,Q), $$
où
$$  \ell_{i,j}(R,P,Q)= \left(r_{ij} - q_{j}^{\top}p_{i}\right)^2 + \lambda(|| p_{i} ||^{2}_2 + || q_{j} ||^2_2 )  $$ et $(p_i)_{1\leq i\leq m}$ et $(q_j)_{1\leq j\leq n}$ sont les lignes des matrices $P$ et $Q$ respectivement. Le paramètre $\lambda\geq 0$ est un paramètre de régularisation.

Le problème que nous résolvons ici est un problème dit de "filtrage collaboratif", qui permet d'apporter une solution possible du  problème Netflix. Les données sont issues de la base de données  "The MoviLens Datasets" :

F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4: 19:1–19:19


In [1]:
# Librairies
import numpy as np
from scipy import sparse
import findspark
findspark.init()
# Environnement Spark 
from pyspark import SparkContext, SparkConf



conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Matrix Factorization")

sc = SparkContext(conf = conf)


#### Création du RDD et premières statistiques sur le jeu de données.

In [2]:
# Répertoire contenant le jeu de données
movieLensHomeDir="data/"

# ratings est un RDD du type (userID, movieID, rating)
def parseRating(line):
    fields = line.split('::')
    return int(fields[0]), int(fields[1]), float(fields[2])

ratingsRDD = sc.textFile(movieLensHomeDir + "ratings.dat").map(parseRating).setName("ratings").cache()

# Calcul du nombre de ratings
numRatings = ratingsRDD.count()
# Calcul du nombre d'utilisateurs distincts
numUsers = ratingsRDD.map(lambda r: r[0]).distinct().count()
# Calcul du nombre de films distincts
numMovies = ratingsRDD.map(lambda r: r[1]).distinct().count()
print("We have %d ratings from %d users on %d movies.\n" % (numRatings, numUsers, numMovies))

# Dimensions de la matrice R
M = ratingsRDD.map(lambda r: r[0]).max()
N = ratingsRDD.map(lambda r: r[1]).max()
matrixSparsity = float(numRatings)/float(M*N)
print("We have %d users, %d movies and the rating matrix has %f percent of non-zero value.\n" % (M, N, 100*matrixSparsity))

We have 1000209 ratings from 6040 users on 3706 movies.

We have 6040 users, 3952 movies and the rating matrix has 4.190221 percent of non-zero value.



Nous allons utiliser la routine ALS.train() de la librairie  [MLLib](http://spark.apache.org/docs/latest/ml-guide.html) et en évaluer la performance par un calcul de " Mean Squared Error" du  rating de prédiction.

In [3]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Construction du modèle de recommendations depuis l'approche "Alternating Least Squares"
rank = 10
numIterations = 10

# Paramètres de la méthode Alternating Least Squares (ALS)
# ratings – RDD de Rating ou tuple (userID, productID, rating).
# rank – Rang de la matrice modèle.
# iterations – Nombre d'itérations. (default: 10)
# lambda_ – Paramètre de régularisation. (default: 0.01)

# Build the recommendation model using ALS
model = ALS.train(ratingsRDD, rank, iterations=numIterations, lambda_=0.02)

# Evaluation du modèle sur le jeu de données complet

# Evaluate the model on rating data
testdata = ratingsRDD.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratingsRDD.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 0.5857052377695804


#  Algorithmes de descente de gradient

Le but de cette section est  
1. de calculer le gradient de la fonction,
2. d'implémenter une méthode de gradient,
3. de mesurer la précision de cette méthode

__Etapes :__

> Séparer le jeu de données en un jeu d'apprentissage (70%) et un jeu de test, en utilisant la fonction randomsplit ( http://spark.apache.org/docs/2.0.0/api/python/pyspark.html )

> Compléter la routine ci-dessous qui retourne le "rating" prédit. Créer un RDD contenant `(i,j,true rating,predicted rating)`. 

> Compléter la routine qui calcule le Mean Square Error (MSE) sur le jeu de données.

> Tester ensuite la routine de MSE en vous donnant les matrices $P$ et $Q$ aléatoires (utiliser np.random.rand(M,K)) et calculer quelques "ratings" prédits. 



In [6]:
# Séparation du jeu de données en un jeu d'apprentissage et un jeu de test
# Taille du jeu d'apprentissage (en %) 
learningWeight = 0.7


# Création des RDD "apprentissage" et "test" depuis la fonction randomsplit
trainRDD, testRDD = ratingsRDD.randomSplit([learningWeight, 1 - learningWeight], seed = None)

# Calcul du rating prédit.
def predictedRating(x, P, Q):
    """ 
    This function computes predicted rating
    Args:
        x: tuple (UserID, MovieID, Rating)
        P: user's features matrix (M by K)
        Q: item's features matrix (N by K)
    Returns:
        predicted rating: l 
    """
    
    return (x[0], x[1], x[2], np.dot(P[x[0] - 1,:], Q[x[1] - 1,:].T))

# Calcul de l'erreur MSE 
def computeMSE(rdd, P, Q):
    """ 
    This function computes Mean Square Error (MSE)
    Args:
        rdd: RDD(UserID, MovieID, Rating)
        P: user's features matrix (M by K)
        Q: item's features matrix (N by K)
    Returns:
        mse: mean square error 
    """ 
    
    r = rdd.collect()
    s = sum((r[i][2] - predictedRating(r[i], P, Q)[3])**2  for i in range(len(r)))
    return  s/len(r)
    

In [7]:
# Tailles des jeux de données d'apprentissage et de tests.
print("Size of the training dataset:", trainRDD.count())
print("Size of the testing dataset:", testRDD.count())


# Création de matrices aléatoires de dimension (M,K) et (N,K)
K = 20 
P = np.random.rand(M,K)
Q = np.random.rand(N,K)

# Calcul et affichage de l'erreur MSE pour ces matrices aléatoires
MSE = computeMSE(ratingsRDD, P, Q)
print("\nMSE (training set) = ", MSE)

# Affichage de 10 ratings prédits depuis ces matrices
print("\n(userID, movieID, rating, predicted)")
r = ratingsRDD.collect()

for x in np.random.randint(len(r), size=10) :
    print(predictedRating(r[x], P, Q))

Size of the training dataset: 699826
Size of the testing dataset: 300383

MSE (training set) =  4.299108202618736

(userID, movieID, rating, predicted)
(3391, 1013, 4.0, 3.633423293885936)
(881, 2662, 4.0, 4.146602176713707)
(5074, 3910, 5.0, 4.005480729203342)
(4518, 1179, 5.0, 4.30121301431469)
(5348, 1407, 2.0, 7.508451078752509)
(3209, 459, 1.0, 7.262100510203043)
(1930, 1030, 2.0, 5.0715055970209715)
(5509, 3329, 4.0, 3.9851207076896475)
(309, 1393, 5.0, 6.9267892549309895)
(1449, 1225, 4.0, 4.935409028817727)


Etapes :

> Donner la formule des dérivées des fonctions $\ell_{i,j}$ selon $p_t$ et $q_s$ avec $1\leq t\leq m$ et $1\leq s\leq n$.

> Implanter de l'algorithme de gradient sur l'ensemble d'apprentissage. Prendre un pas égal à $\gamma=0.001$ et arrêter sur un nombre maximum d'itérations. 

> Commenter les tracés de convergence et des indicateurs de qualité de la prévision en fonction de la dimension latente (rang de $P$ et $Q$).

In [10]:
# Algorithem de descente de gradient pour la factorisation de matrices
def GD(trainRDD, K=10, MAXITER=50, GAMMA=0.001, LAMBDA=0.05):
    # Construction de la matrice R (creuse)
    row=[]
    col=[]
    data=[]
    for part in trainRDD.collect():
        row.append(part[0]-1)
        col.append(part[1]-1)
        data.append(part[2])
        
    R=sparse.csr_matrix((data, (row, col)))        
        
    # Initialisation aléatoire des matrices P et Q
    M,N = R.shape
    P = np.random.rand(M,K)
    Q = np.random.rand(N,K)
    
    # Calcul de l'erreur MSE initiale
    mse=[]
    mse_tmp = computeMSE(trainRDD, P, Q)
    mse.append([0, mse_tmp])
    print("epoch: ", str(0), " - MSE: ", str(mse_tmp))
    
    # Boucle
    nonzero = R.nonzero()
    nbNonZero = R.nonzero()[0].size
    I,J = nonzero[0], nonzero[1]
    for epoch in range(MAXITER):
        for i,j in zip(I,J):
            # Mise à jour de P[i,:] et Q[j,:] par descente de gradient à pas fixe
            e = R[i, j] - np.dot(P[i, :], Q[j, :])
            P[i,:] += GAMMA*(e*Q[j,:] - LAMBDA*P[i,:])
            Q[j,:] += GAMMA*(e*P[i,:] - LAMBDA*Q[j,:])
        # Calcul de l'erreur MSE courante, et sauvegarde dans le tableau mse
        mse.append(computeMSE(trainRDD,P,Q))
        print("epoch: ", str(epoch + 1), " - MSE: ", str(computeMSE(trainRDD, P, Q)))
        
    return P, Q, mse

In [11]:
# Calcul de P, Q et de la mse
P,Q,mse = GD(trainRDD, K=10, MAXITER=10, GAMMA=0.001, LAMBDA=0.05)

epoch:  0  - MSE:  2.8940199397204034
epoch:  1  - MSE:  1.1875481871249656
epoch:  2  - MSE:  1.0046827594235856
epoch:  3  - MSE:  0.9359094153553321
epoch:  4  - MSE:  0.9002072712122479
epoch:  5  - MSE:  0.8783146379057778
epoch:  6  - MSE:  0.8635142951450294
epoch:  7  - MSE:  0.8528508575340871
epoch:  8  - MSE:  0.8448163598670041
epoch:  9  - MSE:  0.8385579085883652
epoch:  10  - MSE:  0.83355513844838


In [12]:
# Calcul de P, Q et de la mse
P,Q,mse = GD(trainRDD, K=10, MAXITER=10, GAMMA=0.001, LAMBDA=0.05)

epoch:  0  - MSE:  2.886611085371281
epoch:  1  - MSE:  1.1914870810127822
epoch:  2  - MSE:  1.007219637277599
epoch:  3  - MSE:  0.9376796467490484
epoch:  4  - MSE:  0.9014635467400778
epoch:  5  - MSE:  0.8792571184762279
epoch:  6  - MSE:  0.8642697516050447
epoch:  7  - MSE:  0.8534935725376351
epoch:  8  - MSE:  0.8453883301475035
epoch:  9  - MSE:  0.8390824476873907
epoch:  10  - MSE:  0.8340451310459949


In [13]:
import matplotlib.pyplot as plt 


# Affichage de l'erreur MSE
print('mse = ', computeMSE(testRDD,P,Q))

mse =  0.8642137295025326


Etapes :

> Calculer les ratings prédits par la solution de la méthode du gradient dans un RDD

> Comparer sur le jeu de test les valeurs prédites aux ratings sur 10 échantillons aléatoires.

In [15]:
# Calcul et affichage des ratings prédits
for i in range(10):
    print(predictedRating(testRDD.collect()[i],P,Q))

(1, 661, 3.0, 3.1784125336845226)
(1, 3408, 4.0, 4.180097884321104)
(1, 2804, 5.0, 4.16397014133272)
(1, 1035, 5.0, 3.912823569007833)
(1, 3105, 5.0, 3.80419119407417)
(1, 2340, 3.0, 3.745289332217483)
(1, 48, 5.0, 2.9360476554646455)
(1, 2294, 4.0, 3.6971769947017745)
(1, 1566, 4.0, 3.8001037448664943)
(1, 783, 4.0, 3.062943162016468)
