# Factorisation de matrice en parallèle : implémentation de FPSGD sur GPU

#### Damien BABET, Julie DJIRIGUIAN
#### Eléments logiciels pour le traitement des données massives

Ce projet repose initialement sur l'article "A Fast Parallel Stochastic Gradient Method for Matrix Factorization in Shared Memory Systems".

In [1]:
import random
import numpy as np
import pandas as pd
import pycuda.driver as cuda
import pycuda.autoinit
from pycuda import compiler, gpuarray, tools
from pycuda.compiler import SourceModule
from scipy import sparse as sp

### Import de données réelles pour exécuter l'algorithme

Les données utilisées sont issues du site https://grouplens.org/datasets/movielens/. Il fournit 20 millions de notes relatives à 27 000 films recueillies auprès de plus de 138 000 utilisateurs. Face à une tel volume d'observations, nous utilisons les fonctionnalités de pyspark, et notamment le module sql.

In [2]:
notes = sc.textFile("ratings.csv") # Chargement du fichier des notes
notes = notes.map(lambda ligne: ligne.split(',')) # Découpage des lignes en indiquant le séparateur
notes = notes.map(lambda champs: (champs[0], champs[1], champs[2])) # On garde que les trois premières valeurs (idUser, idFilm, Note)
notesSansTitre = notes.filter(lambda row: row[0]!='' and row[0]!='userId') # Suppression de la ligne des titres
notesSansTitre = notesSansTitre.map(lambda row: [float(row[0]), float(row[1]), float(row[2])]) # Format des valeurs (entiers pour les identifiants et float pour la note)
df= sqlContext.createDataFrame(notesSansTitre, ['userId', 'movieId', 'rating'])

In [3]:
# Nombre d'utilisateurs ayant noté au moins un film :
df.dropDuplicates(['userId']).select('userId').count()

138493

In [4]:
# Nombre de films ayant été notés au moins une fois :
df.dropDuplicates(['movieId']).select('movieId').count()

26744

On récupère le nombre total d'utilisateurs et de films dans la base du site MovieLens. Le nombre total d'identifiants distincts de films diffèrent du nombre évalué précédemment car certains films ne sont pas notés. Ceci contribue à rendre la matrice R plus sparse.

In [5]:
nbUsers=int(df.agg({"userId": "max"}).collect()[0]['max(userId)'])
nbFilms=int(df.agg({"movieId": "max"}).collect()[0]['max(movieId)'])

In [6]:
# Aperçu de la base de données :
df.describe().show()

+-------+-----------------+------------------+------------------+
|summary|           userId|           movieId|            rating|
+-------+-----------------+------------------+------------------+
|  count|         20000263|          20000263|          20000263|
|   mean|69045.87258292554| 9041.567330339605|3.5255285642993797|
| stddev|40038.62665316243|19789.477445413035|1.0519889192942433|
|    min|              1.0|               1.0|               0.5|
|    max|         138493.0|          131262.0|               5.0|
+-------+-----------------+------------------+------------------+



Plus de 20 millions de notes sont disponibles. Elles sont comprises entre 0,5 et 5 et la note moyenne est de 3,5.

##### Initialisation des paramètres

In [7]:
nbBloc=10 # Nombre de blocs
Knum=3 # Nombre de variables qui résument l'information dans P et Q

Des valeurs nulles sont ajoutées afin d'obtenir des P et Q avec des tailles divisibles par le nombre de blocs.

In [8]:
AjoutUsersFictifs=int(np.ceil(np.true_divide(nbUsers,nbBloc))*nbBloc-nbUsers)
AjoutFilmsFictifs=int(np.ceil(np.true_divide(nbFilms,nbBloc))*nbBloc-nbFilms)

In [9]:
p=np.random.uniform(0,1,(((nbUsers+AjoutUsersFictifs)*Knum)))
q=np.random.uniform(0,1,(((nbFilms+AjoutFilmsFictifs)*Knum)))

tP=(nbUsers+AjoutUsersFictifs)/nbBloc
tQ=(nbFilms+AjoutFilmsFictifs)/nbBloc

p2=p.reshape((nbUsers+AjoutUsersFictifs,Knum))
q2=q.reshape((Knum,nbFilms+AjoutFilmsFictifs))

tp=int((nbUsers+AjoutUsersFictifs)/nbBloc)
tq=int((nbFilms+AjoutFilmsFictifs)/nbBloc)

In [10]:
# Ajout les indices relatifs au bloc ligne et au bloc colonne.
changedTypedf = df.withColumn("numBlocLigne", (df["userId"].cast("double")/tp).cast("integer"))
changedTypedf = changedTypedf.withColumn("numBlocColonne", (df["movieId"].cast("double")/tq).cast("integer"))

In [11]:
# Aperçu de la table
changedTypedf.show(n=5)

+------+-------+------+------------+--------------+
|userId|movieId|rating|numBlocLigne|numBlocColonne|
+------+-------+------+------------+--------------+
|   1.0|    2.0|   3.5|           0|             0|
|   1.0|   29.0|   3.5|           0|             0|
|   1.0|   32.0|   3.5|           0|             0|
|   1.0|   47.0|   3.5|           0|             0|
|   1.0|   50.0|   3.5|           0|             0|
+------+-------+------+------------+--------------+
only showing top 5 rows



In [12]:
# On calcule le nombre d'obersations au sein de chaque bloc.
v= changedTypedf.groupBy(['numBlocLigne', 'numBlocColonne']).count().collect()
v=sqlContext.createDataFrame(v).toPandas()
v.columns=["numBlocLigne","numBlocColonne","rating"]

In [13]:
v.describe()

Unnamed: 0,numBlocLigne,numBlocColonne,rating
count,100.0,100.0,100.0
mean,4.5,4.5,200002.6
std,2.886751,2.886751,524623.3
min,0.0,0.0,413.0
25%,2.0,2.0,7296.0
50%,4.5,4.5,28857.5
75%,7.0,7.0,51835.0
max,9.0,9.0,1813876.0


In [14]:
changedTypedf=changedTypedf.orderBy(["userId", "movieId"])

### Création d'une matrice sparse pour tester l'algorithme de factorisation matricielle

##### Initialisation des paramètres

Nous créons tout d'abord une matrice fictive pour tester notre algorithme sur un petit jeu de données. On initialise les paramètres.

In [15]:
nbUsers=9
nbFilms=9
nbBloc=3
Knum=1
nbIter=3
gamma=0.001
lambdaP=0.1
lambdaQ=0.1

##### Initialisation des matrices P et Q

On initialise l'algorithme avec des P et Q aléatoires et éloignes des vraies valeurs. 
Des valeurs nulles sont ajoutées afin d'obtenir des P et Q avec des tailles divisibles par le nombre de blocs.

In [16]:
AjoutUsersFictifs=int(np.ceil(np.true_divide(nbUsers,nbBloc))*nbBloc-nbUsers)
AjoutFilmsFictifs=int(np.ceil(np.true_divide(nbFilms,nbBloc))*nbBloc-nbFilms)

p=np.random.uniform(9,10,(((nbUsers+AjoutUsersFictifs)*Knum)))
q=np.random.uniform(9,10,(((nbFilms+AjoutFilmsFictifs)*Knum)))

tp=int((nbUsers+AjoutUsersFictifs)/nbBloc) # Nombre d'utilisateurs par bloc
tq=int((nbFilms+AjoutFilmsFictifs)/nbBloc) # Nomre de films par bloc

In [17]:
r_sparse = sp.rand(nbUsers+AjoutUsersFictifs,nbFilms+AjoutFilmsFictifs,0.6)
r_data=np.transpose(np.array([r_sparse.row, r_sparse.col, r_sparse.data]))
df=pd.DataFrame(r_data, columns=("i","j","value"))
df["numBlocLigne"] =df.apply(lambda row : int(row[0]/tp), axis = 1 )
df["numBlocColonne"] =df.apply(lambda row : int(row[1]/tq), axis = 1 )

In [18]:
v = pd.DataFrame(df.groupby(["numBlocLigne", "numBlocColonne"]).count())
v = v.reset_index()[["numBlocLigne","numBlocColonne","value"]]

### Implémentation de l'algorithme sous python sans parallélisation

Pour exécuter l'algorithme sans parallélisation, nous effectuons une première boucle sur les blocs. Sur chaque bloc, nous  parcourons chaque observation et nous mettons à jour la valeur de P et Q selon la formule issue de la minimisation.

In [19]:
def erreur(x,y,z):
    error = 0
    for i in (0,len(z)-1):
        eij = z[i,2]-x[z[i,0]]*y[z[i,1]]
        error += eij*eij
    print(error)

erreur(p,q,r_data)
    
p_start = p
q_start = q

for l in range(nbIter):
    
    #Choisir une permutation des colonnes
    L=np.array(range(nbBloc))
    random.shuffle(L)
    
    # Mise en forme de R selon l'ordre de la permutation et création de v_permut 
    # (vecteur des coordonnees des debuts de blocks sur R)
    r_permut=np.array(df[(df["numBlocLigne"]==0) & (df["numBlocColonne"]==L[0])][[0,1,2]]).flatten()
    v_permut=np.array(v[(v["numBlocLigne"]==0) & (v["numBlocColonne"]==L[0])][[2]]).flatten()
    for i in range(1,nbBloc) :
        r_permut=np.append(r_permut,np.array(df[(df["numBlocLigne"]==i) & (df["numBlocColonne"]==L[i])][[0,1,2]]).flatten()).flatten()
        v_permut=np.append(v_permut,np.array(v[(v["numBlocLigne"]==i) & (v["numBlocColonne"]==L[i])][[2]]).flatten()).flatten()
    v_permut_cum=np.append(1,np.cumsum(v_permut))
    
    # Execution du kernel
    K = Knum
    for t in range(1,nbBloc):
        idx = t
        rstart = v_permut_cum[idx]-1
        rend = v_permut_cum[idx+1]-1
        p_temp = 0
        q_temp = 0
        for n in (0,rend-rstart):
            i = r_permut[3 * (n+rstart)]
            j = r_permut[(3 * (n+rstart)) + 1]
            rij = r_permut[(3 * (n+rstart)) + 2]
            pqij = 0
            for k in (0,K-1):
                pqij += p[(i*K)+k] * q[(j*K)+k]
            eij = rij - pqij
            for k in (0,K-1):
                p_temp = gamma * eij * q[(j*K)+k] - gamma * lambdaP * p[(i*K)+k]
                q_temp = gamma * eij * p[(i*K)+k] - gamma * lambdaQ * q[(j*K)+k]
                if (q[(j*K)+k] + q_temp) > 0:
                    q[(j*K)+k] += q_temp
                if (p[(i*K)+k] + p_temp) > 0:
                    p[(i*K)+k] += p_temp

14804.4852094




### Ecriture de la fonction (kernel) en C appliquée à chaque bloc. Cette fonction est appliquée à chaque thread.

In [None]:
mod = SourceModule("""
    __global__ void block_update(float *p, float *q, float *r, float *v)
    {
      int K = 1;
      float gamma = 0.5;
      float lambdaP = 0.1;
      float lambdaQ = 0.1;
      int idx = threadIdx.x;
      int rstart = v[idx];
      int rend = v[idx+1];
      float pqij = 0;
      float p_temp = 0;
      float q_temp = 0;
      for (int n = 0; n < rend-rstart; ++n){
        int i = r[3 * (n+rstart)];
        int j = r[(3 * (n+rstart)) + 1];
        float rij = r[(3 * (n+rstart)) + 2];
        pqij = 0;
        for (int k = 0; k < K; ++k){
            pqij += p[(i*K)+k] * q[(j*K)+k];}
        float eij = rij - pqij;
        for (int k = 0; k < K; ++k){
            p_temp =  gamma * eij * q[(j*K)+k] - gamma * lambdaP * p[(i*K)+k];
            q_temp = gamma * eij * p[(i*K)+k] - gamma * lambdaQ * q[(j*K)+k];
            if ((q[(j*K)+k] + q_temp) > 0)
                q[(j*K)+k] += q_temp;
            if ((p[(i*K)+k] + p_temp) > 0)
                p[(i*K)+k] += p_temp;}
      }
    }
    """)

In [None]:
func = mod.get_function("block_update")

### Exécution du kernel (scheduler)

In [None]:
for l in range(1,nbIter):
    #Choisir une permutation des colonnes
    L=np.array(range(nbBloc))
    random.shuffle(L)
    
    # Mise en forme de R selon l'ordre de la permutation et création de v_permut (vecteur des coordonnees des debuts de blocks sur R)
    r_permut=np.array(changedTypedf.where((col('numBlocLigne')==0) & (col('numBlocColonne')==int(L[0]))).toPandas()[[3,4,2]]).flatten()
    v_permut=np.array(v[(v["numBlocLigne"]==0) & (v["numBlocColonne"]==L[0])][[2]]).flatten()
    for i in range(1,nbBloc) :
        r_permut=np.append(r_permut,np.array(df[(df["numBlocLigne"]==i) & (df["numBlocColonne"]==L[i])][[0,1,2]]).flatten()).flatten()
        v_permut=np.append(v_permut,np.array(v[(v["numBlocLigne"]==i) & (v["numBlocColonne"]==L[i])][[2]]).flatten()).flatten()
    v_permut_cum=np.append(0,np.cumsum(v_permut))
    
    # Execution du kernel
    func(cuda.InOut(p), cuda.InOut(q), cuda.InOut(r_permut), cuda.InOut(v_permut_cum), block=(4,4,1))
    print(p)
    
print(p)
print(q)

erreur(p,q,r_data)

Notre boucle ne fonctionne pas en raison semble-t-il du v_permut. 
En fait, nous avons initialement testé notre kernel sur quelques données. Ce programme de test correspond au programme libmf_cuda_demo.py disponible sur notre Github. De plus, les lignes de code au sein de la boucle fonctionnent en dehors de celle-ci. L'exécution de cette boucle qui constitue la dernière étape à la parallélisation de l'algorithme n'a pu être résolue.