# Système de Recommandation avec Spark

## Objectif
Il s'agit de développer en Spark une méthode de gradient, dans le but de résoudre un problème de filtrage collaboratif, et de la comparer avec une méthode de la librairie MLlib à savoir ALS.

## Position du problème
Nous avons à notre disposition un RDD "ratings" du type (userID, movieID, rating). Les données sont fournies par le fichier `ratings.dat`, stockées  au format ci-joint :
```
UserID::MovieID::Rating::Timestamp
```

Ce RDD peut être stocké dans une matrice $R$ où l'on trouve "rating" à l'intersection de la ligne "userID" et de la colonne "movieID".
Si la matrice $R$ est de taille $m \times  n$, nous cherchons $P \in R^{m,k}$ et $Q \in R^{n,k}$ telles que $R \approx \hat{R} = PQ^T$.
Pour cela on considère le problème
$$ \min_{P,Q} \sum_{i,j : r_{ij} \text{existe}}  \ell_{i,j}(R,P,Q), $$
où
$$  \ell_{i,j}(R,P,Q)= \left(r_{ij} - q_{j}^{\top}p_{i}\right)^2 + \lambda(|| p_{i} ||^{2}_2 + || q_{j} ||^2_2 )  $$ et $(p_i)_{1\leq i\leq m}$ et $(q_j)_{1\leq j\leq n}$ sont les lignes des matrices $P$ et $Q$ respectivement. Le paramètre $\lambda\geq 0$ est un paramètre de régularisation.

Le problème que nous résolvons ici est un problème dit de "filtrage collaboratif", qui permet d'apporter une solution possible du  problème Netflix. Les données sont issues de la base de données  "The MoviLens Datasets" :

F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems

In [1]:
# Librairies
import numpy as np
import pandas as pd
from scipy import sparse

In [2]:
# Environnement Spark 
from pyspark import SparkContext, SparkConf
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/Karim/anaconda3/python'
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Matrix Factorization")
sc = SparkContext(conf = conf)

###### Création du RDD et premières statistiques sur le jeu de données

In [3]:
# Répertoire contenant le jeu de données
movieLensHomeDir="data/"

# ratings est un RDD du type (userID, movieID, rating)
def parseRating(line):
    fields = line.split('::')
    return int(fields[0]), int(fields[1]), float(fields[2])

ratingsRDD = sc.textFile(movieLensHomeDir + "ratings.dat").map(parseRating).setName("ratings").cache()

# Calcul du nombre de ratings
numRatings = ratingsRDD.count()
# Calcul du nombre d'utilisateurs distincts
numUsers = ratingsRDD.map(lambda r: r[0]).distinct().count()
# Calcul du nombre de films distincts
numMovies = ratingsRDD.map(lambda r: r[1]).distinct().count()
print("We have %d ratings from %d users on %d movies.\n" % (numRatings, numUsers, numMovies))

# Dimensions de la matrice R
M = ratingsRDD.map(lambda r: r[0]).max()
N = ratingsRDD.map(lambda r: r[1]).max()
matrixSparsity = float(numRatings)/float(M*N)
print("We have %d users, %d movies and the rating matrix has %f percent of non-zero value.\n" % (M, N, 100*matrixSparsity))

We have 1000209 ratings from 6040 users on 3706 movies.

We have 6040 users, 3952 movies and the rating matrix has 4.190221 percent of non-zero value.



###### Nous allons utiliser ALS de la librairie MLlib et en évaluer la performance par un calcul de " Mean Squared Error" du rating de prédiction.

In [4]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Construction du modèle de recommendation en utilisant ALS
rank = 10
numIterations = 10
model = ALS.train(ratingsRDD, rank, iterations=numIterations, lambda_=0.02)

# Evaluation du modèle
testdata = ratingsRDD.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratingsRDD.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 0.5854552189084702


##  Algorithme de descente de gradient
Le but de cette section est de calculer le gradient de la fonction, implémenter une méthode de gradient et de de mesurer la précision de cette méthode

In [5]:
# Séparation du jeu de données en un jeu d'apprentissage et un jeu de test
# Taille du jeu d'apprentissage (en %) 
learningWeight = 0.7
# Création des RDD "apprentissage" et "test" depuis la fonction randomsplit
trainRDD, testRDD = ratingsRDD.randomSplit([learningWeight, 1-learningWeight], seed=None)

# Calcul du rating prédit
def predictedRating(t, P, Q):
    """ 
    This function computes predicted rating
    Args:
        t: tuple (UserID, MovieID, Rating)
        P: user's features matrix (M by K)
        Q: item's features matrix (N by K)
    Returns:
        predicted rating: l 
    """
    user=t[0]-1
    movie=t[1]-1
    l = np.dot(P[user,:],Q[movie,:])

    return l

# Calcul de l'erreur MSE 
def computeMSE(rdd, P, Q):
    """ 
    This function computes Mean Square Error (MSE)
    Args:
        rdd: RDD(UserID, MovieID, Rating)
        P: user's features matrix (M by K)
        Q: item's features matrix (N by K)
    Returns:
        mse: mean square error 
    """ 
    #predictions=rdd.map(lambda r: ((r[0], r[1]), predictedRating((r[0], r[1], r[2]), P, Q)))
    #ratesAndPreds = rdd.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
    #mse = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    #return mse
    r = rdd.collect()
    se = []
    for i in range(0,len(r)):
        t = r[i]
        l = predictedRating(t, P, Q)
        se.append((r[i][2]-l)**2)
        
    return(np.mean(se))

###### Implantation de l'algorithme de gradient sur l'ensemble d'apprentissage. 
Nous prenons un pas égal à  𝛾=0.001

In [6]:
# Algorithem de descente de gradient pour la factorisation de matrices
def GD(trainRDD, K=10, MAXITER=50, GAMMA=0.001, LAMBDA=0.05):
    # Construction de la matrice R (creuse)
    row=[]
    col=[]
    data=[]
    for part in trainRDD.collect():
        row.append(part[0]-1)
        col.append(part[1]-1)
        data.append(part[2])
    R=sparse.csr_matrix((data, (row, col)))
    
    # Initialisation aléatoire des matrices P et Q
    M,N = R.shape
    P = np.random.rand(M,K)
    Q = np.random.rand(N,K)
    
    # Calcul de l'erreur MSE initiale
    mse=[]
    mse_tmp = computeMSE(trainRDD, P, Q)
    mse.append([0, mse_tmp])
    print("epoch: ", str(0), " - MSE: ", str(mse_tmp))
    
    # Boucle
    nonzero = R.nonzero()
    nbNonZero = R.nonzero()[0].size
    I,J = nonzero[0], nonzero[1]
    for epoch in range(MAXITER):
        for i,j in zip(I,J):
            # Mise à jour de P[i,:] et Q[j,:] par descente de gradient à pas fixe
            P[i,:] = P[i,:] + GAMMA*2*(R[i,j]-np.dot(P[i,:],Q[j,:]))*Q[j,:]
            Q[j,:] = Q[j,:] + GAMMA*2*(R[i,j]-np.dot(P[i,:],Q[j,:]))*P[i,:]
        # Calcul de l'erreur MSE courante, et sauvegarde dans le tableau mse 
        mse.append(computeMSE(trainRDD,P,Q))
        print("epoch: ", str(epoch+1), " - MSE: ", str(computeMSE(trainRDD,P,Q)))
    return P, Q, mse

In [7]:
# Calcul de P, Q et de mse
P,Q,mse = GD(trainRDD, K=10, MAXITER=10, GAMMA=0.001, LAMBDA=0.05)

epoch:  0  - MSE:  2.8398459846747173
epoch:  1  - MSE:  1.0572495483968163
epoch:  2  - MSE:  0.9065576231330671
epoch:  3  - MSE:  0.8686814573419376
epoch:  4  - MSE:  0.8497500389954725
epoch:  5  - MSE:  0.8380263414322584
epoch:  6  - MSE:  0.8298921814985142
epoch:  7  - MSE:  0.8238132387579948
epoch:  8  - MSE:  0.8190146215334111
epoch:  9  - MSE:  0.8150585503110095
epoch:  10  - MSE:  0.8116772082087109


In [8]:
# Calcul des ratings prédits
predictions=testRDD.map(lambda r: predictedRating((r[0], r[1], r[2]), P, Q)).collect()

In [9]:
# Evaluation du modèle par MSE
print('mse = ', computeMSE(testRDD,P,Q))

mse =  0.8579773963394961


In [10]:
# Comparaison sur le jeu de test les valeurs prédites aux ratings sur 10 échantillons
df = pd.DataFrame(list(zip(testRDD.map(lambda r: r[2]).collect()[0:10], predictions[0:10])), columns = ['Ratings réels','Ratings prédits'])
df

Unnamed: 0,Ratings réels,Ratings prédits
0,5.0,4.607427
1,3.0,4.262196
2,5.0,4.308922
3,5.0,4.260373
4,4.0,3.739415
5,5.0,3.963769
6,5.0,4.763576
7,3.0,3.11827
8,5.0,2.959716
9,4.0,3.094439
