## Factorisation de matrices par descente de gradient: Parallélisation et optimisation

### Introduction

L'objectif de ce projet est de paralléliser la factorisation de matrices en se basant sur l'approche SGD proposée dans le papier CuMF_SGD: Parallelized Stochastic Gradient Descent for Matrix Factorization on GPUs [1].

On utilisera le dataset Netflix, un benchmark standard de la factorisation de matrices. Ce dataset est fourni sous forme d'une matrice sparse (i,j,R[i,j]) où i est l'identifiant d'un utilisateur, j est l'identifiant d'un film et R[i,j] est l'avis (R comme rating) de l'utilisateur i sur le film j.

On commence par charger les données. On peut les garder sous leur format sparse original ou on peut les transformer en une matrice R complète (ayant pour lignes les identifiants des utilisateur et en colonnes ceux des films).

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pycuda.autoinit
import pycuda.driver as cuda
from pycuda.compiler import SourceModule
from pycuda import gpuarray
from IPython.display import clear_output
import cupy as cp
from time import time
from numba import jit
np.random.seed(0)
cp.random.seed(0)

In [2]:
# useful to capture C stdout and print it in Jupyter
%load_ext wurlitzer

In [3]:
def load_data(mode = "sparse", n=-1, file = "netflix_test.txt", shuffle=True):
    """
    Load the data
    mode: 'sparse' for sample format (i, j, R[i,j]), 'full' for full R matrix
    n: number of rows to load from file; set negative to load all the data
    shuffle: Randomly shuffle the samples
    """
    data_sep = " " if file=="netflix_test.txt" else ","
    # Load data from file
    data = np.genfromtxt("netflix_test.txt", delimiter=data_sep, dtype="int32")
    data = data[:,[0,1,3]]
    # set number of rows to load
    if n<=0 or n>len(data):
        n = len(data)
    # shuffle
    if shuffle:
        np.random.shuffle(data)
    # Convert to the defined mode
    if mode=="sparse":
        R = data[:n,:]
    elif mode=="full":
        data = data[:n,:]
        rows, row_pos = np.unique(data[:, 0], return_inverse=True)
        cols, col_pos = np.unique(data[:, 1], return_inverse=True)
        R = np.zeros((len(rows), len(cols)), dtype="uint8")
        R[row_pos, col_pos] = data[:, 2]
    else:
        print("mode should be 'sparse' or 'full'")
        return(-1)
    return R

Aperçu des données en mode sparse:

In [4]:
R = load_data("sparse"); R

array([[313306,  11077,      4],
       [336800,   4588,      2],
       [177586,   8782,      5],
       ...,
       [ 69205,   9913,      3],
       [144193,    660,      4],
       [364936,   1975,      4]], dtype=int32)

In [5]:
print(f"R a les dimensions {R.shape} et occupe {R.nbytes//1024//1024} MB en mémoire")

R a les dimensions (1408395, 3) et occupe 16 MB en mémoire


Aperçu des données en mode complet:

In [6]:
R = load_data("full"); R

array([[0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0]], dtype=uint8)

In [7]:
print(f"R a les dimensions {R.shape} et occupe {R.nbytes//1024//1024} MB en mémoire")

R a les dimensions (462858, 16938) et occupe 7476 MB en mémoire


Le mode sparse occupe beaucoup moins de mémoire, donc on le privilégie pour la suite. Mais d'abord nous fournissons un petit exemple avec une matrice complète pour mieux visualiser la factorisation de matrice.

In [9]:
R = load_data("full", 5)
R[0,0] = 2; R[4,0]=3  # R is too sparse, we add a few items to make it more meaningful
R

array([[2, 0, 0, 0, 5],
       [2, 0, 0, 0, 0],
       [0, 0, 0, 4, 0],
       [0, 1, 0, 0, 0],
       [3, 0, 3, 0, 0]], dtype=uint8)

On implémente une version basique de la factorisation de matrices avec descente de gradient. L'objectif est de calculer deux matrices P et Q dont le produit est approximativement égal à R sur ses éléments non nuls. Cela permet aussi de donner une estimation des éléments nuls (manquants) de R. Pour cette raison cette technique est fréquemment utilisée dans les systèmes de recommandation.

Dans toute la suite, les hyperparamètres alpha (learning rate) et beta (régularisation) ont été optimisés manuellement pour donner des résultats optimaux pour chaque algorithme. Le paramètre k (dimension des facteurs représentant les utilisateurs et les films) est fixé à 128 pour une comparaison plus pertinente avec la méthode CuMF de l'article [1] qui nécessite k=128.

In [36]:
def rmse_full_matrix(R, R_pred):
    """
    Calculate the root mean squared error between two matrices on non-zero elements
    """
    i_s, j_s = R.nonzero()
    sse = np.sum((R[i_s, j_s] - R_pred[i_s, j_s])**2)
    return np.sqrt(sse/len(i_s))

def sgd_iteration(P, Q, samples, alpha, beta):
    """
    SGD iteration over the provided samples from R
    """
    for i, j, r in samples:
        # Compute prediction and error
        prediction = P[i, :].dot(Q[j, :].T)
        e = (r - prediction)

        # Update user and item feature matrices
        P[i, :] += alpha * (e * Q[j, :] - beta * P[i,:])
        Q[j, :] += alpha * (e * P[i, :] - beta * Q[j,:])
    return(P,Q)

def mf_sgd_full_matrix(R, k, alpha, beta, n_iter, verbose = True):
    """
    Matrix factorization
    - R : matrix to factorize
    - k : latent dimension
    - alpha : learning rate
    - beta : regularization parameter
    - n_iter : number of SGD iterations
    """
    # Initialize user and item feature matrices
    m,n = R.shape
    P = 1./k * np.random.normal(size=(m,k))
    Q = 1./k * np.random.normal(size=(n,k))

    # Create a list of training samples
    samples = [(i, j, R[i,j]) for i in range(m) for j in range(n) if R[i, j] > 0]

    # Perform stochastic gradient descent for number of iterations
    for i in range(n_iter):
        P,Q = sgd_iteration(P, Q, samples, alpha, beta)
        if verbose and ((i+1)%(n_iter//10) == 0):
            print(f"Iteration: {i+1}: RMSE = {rmse_full_matrix(R, P.dot(Q.T)):.5f}")
    return(P,Q)

In [11]:
start = time()
P,Q = mf_sgd_full_matrix(R, k=2, alpha=0.01, beta=0.0001, n_iter=1000)
end = time()
print(f"Temps d'exécution: {(end-start):.3f} s")

Iteration: 100: RMSE = 0.29382
Iteration: 200: RMSE = 0.09416
Iteration: 300: RMSE = 0.03717
Iteration: 400: RMSE = 0.01514
Iteration: 500: RMSE = 0.00621
Iteration: 600: RMSE = 0.00258
Iteration: 700: RMSE = 0.00111
Iteration: 800: RMSE = 0.00052
Iteration: 900: RMSE = 0.00029
Iteration: 1000: RMSE = 0.00020
Temps d'exécution: 0.072 s


In [12]:
P

array([[ 1.69867332,  0.16426039],
       [ 1.59920733,  0.33895919],
       [-1.63958472,  1.18567952],
       [-1.21138918,  0.28725432],
       [ 2.00630722,  1.20073611]])

In [13]:
P.dot(Q.T)

array([[ 2.00032158, -1.10017641,  0.83007038, -2.49560702,  4.99978396],
       [ 1.99984396, -0.9376086 ,  1.15121634, -2.13130822,  4.82492808],
       [-1.08000033,  1.77769254,  1.89539962,  3.99990004, -3.96596913],
       [-1.17057102,  0.99991389,  0.21928431,  2.25837799, -3.30684811],
       [ 2.99972706, -0.76334935,  2.99995274, -1.75594684,  6.54925722]])

Sur une petite matrice on constate que cet algorithme de factorisation SGD permet d'obtenir rapidement une factorisation avec un taux d'erreur inférieur à 0.1%. Réessayons avec une taille plus élevée:

In [37]:
R = load_data("full", 100000)
R.shape

(91130, 9813)

In [38]:
start = time()
P,Q = mf_sgd_full_matrix(R, k=128, alpha=0.025, beta=0.2, n_iter=50)
end = time()
print(f"Temps d'exécution: {(end-start):.3f} s")

Iteration: 5: RMSE = 2.75844
Iteration: 10: RMSE = 1.88797
Iteration: 15: RMSE = 1.37002
Iteration: 20: RMSE = 1.03478
Iteration: 25: RMSE = 0.79666
Iteration: 30: RMSE = 0.61906
Iteration: 35: RMSE = 0.49134
Iteration: 40: RMSE = 0.40592
Iteration: 45: RMSE = 0.35006
Iteration: 50: RMSE = 0.31246
Temps d'exécution: 831.640 s


Le résultat n'est pas le même. Calculons le nombre d'updates/s : On appelle un update (ou pas de gradient) la lecture d'un échantillon (i,j,R[i,j]) de R et la mise à jour des matrices P et Q à partir de cet unique échantillon. D'abord réexécutons le code sans calcul intermédiaire de RMSE pour obtenir une valeur plus fiable du temps d'exécution :

In [39]:
%%timeit
P,Q = mf_sgd_full_matrix(R, k=128, alpha=0.025, beta=0.2, n_iter=50, verbose=False)

13min 19s ± 4.72 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


Ici on a $n_{iterations} \times n_{samples} = 50 \times 100\,000 = 5\,000\,000$ updates de gradient en 799s, ce qui revient à 6258 updates/s.

### Format sparse

Une première optimisation consiste à utiliser une matrice R sous format sparse, ce qui réduit son empreinte mémoire et le temps de récupération des échantillons pour la descente de gradient. Pour optimiser l'empreinte mémoire de P et Q, on applique également une bijection sur les identifiants des utilisateurs afin d'obtenir des identifiants allant de 0 à $n_{utilisateurs}$ et de même pour les identifiants des films. Cela évite les gaps dans les identifiants et donc d'avoir des features inutilisés dans P et Q.

In [4]:
def remap_ids(R):
    R[:,0] = np.unique(R[:,0],return_inverse = True)[1]
    R[:,1] = np.unique(R[:,1],return_inverse = True)[1]
    return R

In [5]:
R = load_data("sparse", 100000)
R = remap_ids(R); R

array([[61493,  6340,     4],
       [66136,  2810,     2],
       [34656,  5160,     5],
       ...,
       [57710,   503,     4],
       [77190,   538,     4],
       [ 7831,   132,     5]], dtype=int32)

In [41]:
def rmse(R, P, Q):
    """
    Calculate the root mean square error between two matrices on non-zero elements
    """
    sse = 0 # sum of squared errors
    for i, j, r in R:
        prediction = P[i, :].dot(Q[j, :].T)
        sse += (r - prediction)**2
    return np.sqrt(sse/len(R))

def mf_sgd(R, k, alpha, beta, n_iter, verbose = True):
    """
    Matrix factorization
    - R : matrix to factorize
    - k : latent dimension
    - alpha : learning rate
    - beta : regularization parameter
    - n_iter : number of SGD iterations
    """
    # Initialize user and item feature matrices
    m,n = int(1+np.max(R[:,0])), int(1+np.max(R[:,1]))
    P = 1./k * np.random.normal(size=(m,k))
    Q = 1./k * np.random.normal(size=(n,k))

    # Perform stochastic gradient descent for number of iterations
    for i in range(n_iter):
        P,Q = sgd_iteration(P, Q, R, alpha, beta)
        if verbose and ((i+1)%(n_iter//10) == 0):
            print(f"Iteration: {i+1}: RMSE = {rmse(R, P, Q):.5f}")
    return(P,Q)

In [42]:
start = time()
P,Q = mf_sgd(R, k=128, alpha=0.025, beta=0.2, n_iter=50)
end = time()
print(f"Temps d'exécution: {(end-start):.3f} s")

Iteration: 5: RMSE = 2.77869
Iteration: 10: RMSE = 1.89984
Iteration: 15: RMSE = 1.37656
Iteration: 20: RMSE = 1.03226
Iteration: 25: RMSE = 0.79341
Iteration: 30: RMSE = 0.61939
Iteration: 35: RMSE = 0.49561
Iteration: 40: RMSE = 0.41257
Iteration: 45: RMSE = 0.35712
Iteration: 50: RMSE = 0.31903
Temps d'exécution: 55.213 s


In [43]:
%%timeit
P,Q = mf_sgd(R, k=128, alpha=0.025, beta=0.2, n_iter=50, verbose=False)

52.2 s ± 444 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Ici on a 5 000 000 updates comme avant mais en 52.2 s, soit 95785 updates/s.

### Numba

Dans cette partie on utilise Numba en mode Just-in-time pour accélérer le code. Numba utilise le compilateur LLVM pour compiler nos fonctions en code machine qui est appelé à chaque exécution. Numba est compatible avec la majorité des objets et des fonctions de Numpy et permet de paralléliser leur exécution si cela est possible.

In [6]:
@jit(nopython=True)
def rmse(R, P, Q):
    """
    Calculate root mean square error between two matrices on non-zero elements
    """
    sse = 0 # sum of squared errors
    for i, j, r in R:
        prediction = P[i, :].dot(Q[j, :].T)
        sse += (r - prediction)**2
    return np.sqrt(sse/len(R))

@jit(nopython=True, parallel=True)
def sgd_iteration(P, Q, R, alpha, beta):
    """
    SGD step for matrix factorization
    """
    for i, j, r in R:
        # Compute prediction and error
        prediction = P[i, :].dot(Q[j, :].T)
        e = (r - prediction)

        # Update user and item latent feature matrices
        P[i, :] += alpha * (e * Q[j, :] - beta * P[i,:])
        Q[j, :] += alpha * (e * P[i, :] - beta * Q[j,:])
    return(P,Q)

@jit(nopython=True, parallel=True)
def mf_sgd(R, k, alpha, beta, n_iter, verbose = True):
    """
    Matrix factorization
    - R : matrix to factorize
    - k : latent dimension
    - alpha : learning rate
    - beta : regularization parameter
    - n_iter : number of SGD iterations
    """
    # Initialize user and item latent feature matrice
    m,n = (1+np.max(R[:,0])), (1+np.max(R[:,1]))
    P = 1./k * np.array([np.random.normal() for i in range(m*k)]).reshape(m,k)
    Q = 1./k * np.array([np.random.normal() for i in range(n*k)]).reshape(n,k)

    # Perform stochastic gradient descent for number of iterations
    for i in range(n_iter):
        P,Q = sgd_iteration(P, Q, R, alpha, beta)
        if verbose and ((i+1)%(n_iter//10) == 0):
            print("Iteration: ",i+1,": RMSE = ",rmse(R, P, Q))
    return(P,Q)

In [33]:
start = time()
P,Q = mf_sgd(R, k=128, alpha=0.025, beta=0.2, n_iter=50)
end = time()
print(f"Temps d'exécution: {(end-start):.3f} s")

Iteration:  5 : RMSE =  2.7807619324090216
Iteration:  10 : RMSE =  1.9105106239856737
Iteration:  15 : RMSE =  1.3850826465886725
Iteration:  20 : RMSE =  1.042221850082097
Iteration:  25 : RMSE =  0.8058066044542925
Iteration:  30 : RMSE =  0.634046730075755
Iteration:  35 : RMSE =  0.5128457058783094
Iteration:  40 : RMSE =  0.43180818421416756
Iteration:  45 : RMSE =  0.3778885842010513
Iteration:  50 : RMSE =  0.34063762536269115
Temps d'exécution: 30.410 s


In [35]:
%%timeit
P,Q = mf_sgd(R, k=128, alpha=0.025, beta=0.2, n_iter=50, verbose=False)

32.3 s ± 2.24 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [55]:
start = time()
P,Q = mf_sgd(R, k=128, alpha=0.005, beta=0.001, n_iter=500)
end = time()
print(f"Temps d'exécution: {(end-start):.3f} s")

Iteration:  50 : RMSE =  1.8298425738661261
Iteration:  100 : RMSE =  0.9457960716981967
Iteration:  150 : RMSE =  0.5016145296472588
Iteration:  200 : RMSE =  0.2676194944897138
Iteration:  250 : RMSE =  0.15124073111221237
Iteration:  300 : RMSE =  0.09561694689326262
Iteration:  350 : RMSE =  0.06477346322931872
Iteration:  400 : RMSE =  0.04814136577128693
Iteration:  450 : RMSE =  0.03965336796342305
Iteration:  500 : RMSE =  0.033144101548701385
Temps d'exécution: 344.880 s


Avec Numba on obtient un temps d'exécution moyen de 32.3s pour 5 000 000 updates, soit 154 800 updates/s.

L'utilisation de Numba a fourni ici un speed-up de 62% par rapport au code Python précédent.

### Parallélisation et approche Hogwild!

Dans la partie suivante on se propose de paralléliser la descente de gradient sur GPU. On utilise l'approche Hogwild![2] qui est l'une des approches les plus populaires pour paralléliser la descente de gradient, non pas seulement de par sa simplicité mais aussi par sa rapidité. Cette approche suppose que le problème d'optimisation est sparse, c'est-à-dire que la plupart des updates de gradient ne modifient qu'un sous-ensemble de paramètres de taille largement inférieure au nombre de threads qui s'exécutent en parallèle. Dans ce cas, la probabilité de collision est faible, et même en cas de collision la perte de précision résultante est négligeable.

On précise qu'ici, on parle de collision (ou race condition) lorsqu'un thread (1) et un thread (2) traitent en parallèle deux observations ayant la même ligne i ou la colonne j dans la matrice R, et dans ce cas ils modifient le même facteur P[i] ou Q[j] (supposons que c'est P[i]), et si le thread (1) effectue sa modification de P[i] entre la lecture du thread (2) de P[i] et la modification de (2) de P[i], alors la modification apportée par le thread (1) sera écrasée par celle du thread (2) et on perd un update.

La notion de collision étant précisée, on voit que pour les problèmes sparses la probabilité de collision est faible et son impact négligeable, d'où l'idée de Hogwild! consistant simplement à paralléliser les updates de gradient entre les threads en traitant une observation aléatoire par thread, sans tenir compte des potentielles collisions. Cela évite le besoin d'un scheduler pour choisir l'observation à traiter par chaque thread de manière à éviter les collisions (dans ce cas le scheduler sera le bottleneck et ralentira l'exécution selon l'article [1]), et évite le besoin de synchroniser les threads ou de verrouiller des accès mémoire. Cette approche est donc optimale de point de vue performance mais au coût d'une perte de précision inversement proportionnelle à la sparsité du problème (perte de précision nulle dans le cas théorique d'une matrice infiniment sparse, mais élevée si le nombre de threads concurrents est supérieur au nombre d'observations indépendantes, c'est-à-dire ne partageant pas une même ligne ou colonne dans R).

Dans notre cas la matrice R est très sparse avec seulement 0.01% de ses éléments étant non nuls (c'est généralement le cas des problèmes de recommandation), ce qui en fait un problème parfaitement adapté à l'approche Hogwild!. On adopte donc cette méthode, qu'on optimisera ensuite comme proposé dans [1] en traitant les données par batch.

In [64]:
# Sparsity of R: number of non-zero entries / total number of entries
len(R)/((1+np.max(R[:,0]))*(1+np.max(R[:,1])))

0.00011295908037319723

### PyCuda

Dans cette partie on utilise PyCuda pour paralléliser la factorisation de matrices avec descente de gradient sur un nombre paramétrable de threads et de blocs.

On applique d'abord l'approche Hogwild! standard où à chaque itération, chaque thread lit une observation aléatoire de R et effectue une mise à jour de gradient sur P et Q.

In [55]:
R = load_data("sparse", 100000)
R = remap_ids(R); R

array([[26918,  3614,     4],
       [30028,  3686,     2],
       [47327,  9157,     4],
       ...,
       [77065,  3781,     4],
       [38373,  1160,     4],
       [75980,  8320,     4]], dtype=int32)

In [230]:
pycudakernel = SourceModule("""
#include <stdio.h>
__global__ void pycuda_mf_kernel(float *P, float *Q, float alpha, float beta, int k, int n_samples, float *samples, int n_iter,
int n_threads_per_block,int n_blocks, int *rndm)
{
    // variable initialization
    int sample, iter;
    int thread_id_within_block = threadIdx.x;
    int block_id = blockIdx.x;
    int global_thread_id = block_id*n_threads_per_block + thread_id_within_block; // bijection that associates a unique id to every (thread,block) couple
    // we iterate n_iter times
    for ( iter = 0; iter < n_iter; iter++) {
        // every iteration, we pick a random sample from R and perform an gradient update
        sample = rndm[global_thread_id*n_iter+iter]; // get a new random number every iteration for every thread
        // read sample (i,j,R[i,j])
        int i = samples[sample*3];
        int j = samples[sample*3+1];
        float r = samples[sample*3+2];
        // calculate prediction and error
        float pred = 0;
        int t;
        for( t = 0; t < k; t = t + 1 ){
        pred =pred+(P[i*k+t]*Q[j*k+t]);
        }
        float e = r - pred;
        // update P[i] and Q[j]
        for( t = 0; t < k; t = t + 1 ){
            P[i*k+t] = alpha * e * Q[j*k+t] + (1 - alpha * beta) * P[i*k+t];
            Q[j*k+t] = alpha * e * P[i*k+t] + (1 - alpha * beta) * Q[j*k+t];
        }
    }
}
""")
pycuda_mf_kernel = pycudakernel.get_function("pycuda_mf_kernel")

In [231]:
def mf_sgd_pycuda(R, k, alpha, beta, n_iter, n_threads_per_block = 32, n_blocks = 1):
    """
    Matrix factorization
    - R : matrix to factorize
    - k : latent dimension
    - alpha : learning rate
    - beta : regularization parameter
    - n_iter : number of SGD iterations
    - n_threads_per_block : number of threads per block, each thread will run the kernel code
    - n_blocks : number of blocks (groups of threads)
    """
    # Copy the needed arrays to GPU memory
    m = 1 + np.max(R[:,0])
    n = 1 + np.max(R[:,1])
    P = gpuarray.to_gpu((1./k * np.random.normal(size = (m,k))).astype("float32"))
    Q = gpuarray.to_gpu((1./k * np.random.normal(size = (n,k))).astype("float32"))
    R_gpu = gpuarray.to_gpu(R.flatten().astype("float32"))
    rndm = gpuarray.to_gpu(np.random.randint(0,len(R),n_iter*n_blocks*n_threads_per_block, dtype=np.int32)) # an array of random numbers used to pick random samples
    # Perform stochastic gradient descent on GPU
    pycuda_mf_kernel(P, Q, np.float32(alpha), np.float32(beta), np.int32(k), np.int32(len(R)),
                       R_gpu, np.int32(n_iter), np.int32(n_threads_per_block), np.int32(n_blocks), rndm,
                       grid=(n_blocks,1), block=(n_threads_per_block,1,1))            
    return(P,Q)

In [82]:
start = time()
P,Q = mf_sgd_pycuda(R, k=128, alpha=0.021, beta=0.05, n_iter=5, n_threads_per_block=1000, n_blocks=1000)
end = time()
print(f"Temps d'exécution: {(end-start):.3f} s\nRMSE :")
rmse(R, P.get(), Q.get())

Temps d'exécution: 0.968 s
RMSE :


0.2910410943173563

In [77]:
%%timeit
P,Q = mf_sgd_pycuda(R, k=128, alpha=0.021, beta=0.05, n_iter=5, n_threads_per_block=1000, n_blocks=1000)

963 ms ± 2.98 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


De même que les exemples précédents, on a fait ici 5 000 000 updates de gradient (5 itérations $\times$ 1000 blocks $\times$ 1000 threads par block) mais cette fois-ci distribués sur 1 000 000 threads, ce qui est largement suffisant pour exploiter la capacité maximale de parallélisation d'un GPU moderne (dans notre cas, on utilise une RTX 3060 dont la capacité de threads concurrents est 28 SM * 32 warps/SM * 32 threads/warp = 28672, mais cette limite est théorique puisqu'une partie des threads peut être indisponible ou occupée par un autre processus).

Le temps d'exécution moyen vaut 963ms, on a donc atteint 5 192 107 updates/s, soit un speed up de 5420 % (54.2x) par rapport au code Python non parallélisé.

On peut également atteindre des valeurs plus faibles du RMSE en augmentant le nombre d'itérations (tout en conservant un temps d'exécution faible) :

In [98]:
start = time()
P,Q = mf_sgd_pycuda(R, k=128, alpha=0.012, beta=0.002, n_iter=100, n_threads_per_block=1000, n_blocks=1000)
end = time()
print(f"Temps d'exécution: {(end-start):.3f} s\nRMSE :")
rmse(R, P.get(), Q.get())

Temps d'exécution: 14.080 s
RMSE :


0.005696545176249839

Ici l'overhead d'initialisation devient négligeable et le speed-up par rapport au code Python devient de 74x

Ces résultats sont satisfaisants, mais on peut les améliorer en appliquant l'approche Batch Hogwild! proposée dans l'article [1]. L'idée est de choisir la dimension latente k multiple de 32 (ici 128) et une taille de bloc multiple de 32 threads (chaque groupe de 32 threads est appelé warp) puis à chaque itération, au lieu de traiter une observation aléatoire de R par thread, on traite séquentiellement 128 observations consécutives par warp, et sur chaque warp on parallélise le calcul du gradient et de la nouvelle valeur de P[i] et Q[j] sur les 32 threads du warp. Chaque warp traite donc en parallèle une première observation, puis une deuxième, .., puis une 128ème. Ensuite ce processus est répété n_iter fois.

D'après l'article, l'avantage principal de cette approche est l'utilisation plus efficace du cache, avec la mise en cache des 128 triplets consécutifs (i,j,R[i,j]) pour qu'ils soient traités plus rapidement par les 32 threads du warp. D'après nos tests, un autre avantage du traitement des données par batch est qu'on simule une moyennisation du gradient sur le batch ce qui réduit le risque de divergence du gradient. Concrètement, sur les approches précédentes il faut choisir un learning rate alpha de l'ordre de 0.01 sinon le gradient diverge et un overflow se produit sur les valeurs de P et Q, mais avec Batch Hogwild! on peut choisir un learning rate de l'ordre de 0.1 sans observer d'overflow, ce qui accélère la descente de gradient et permet d'obtenir un faible taux d'erreur avec moins d'updates.

L'article propose également plusieurs optimisations pour accélérer l'algorithme comme l'utilisation du type half (nombre décimal à 16 bits), et l'instruction \_shfl\_down\_sync  pour transmettre les valeurs P[i,u]Q[j,u] du registre d'un thread à l'autre afin de calculer le produit scalaire P[i].Q[j] sans recourir à la mémoire partagée du GPU (plus lente).

Ci-dessous nous proposons une adaptation PyCuda du code Cuda-C [3] fourni par les auteurs de l'article.

In [38]:
pycudakernel = SourceModule("""
#include <cuda_fp16.h>
extern "C"{
__global__ void pycuda_mf_kernel(half *P, half *Q, float alpha, float beta, int k, int n_samples, int *R, int n_iter, int *rndm)
{
    int thread_id_within_warp = threadIdx.x%32;
    int warp_id = (blockDim.x/32)*blockIdx.x + threadIdx.x/32; // unique combination of block id and warp id within block
    int start_id, warp_sample;
    for (int iter = 0; iter < n_iter; iter++) {
        // set start id (first sample to process) for each warp
        // first thread of the warp chooses a random sample and transmits it to the others
        if(thread_id_within_warp == 0)
        {
            start_id = rndm[warp_id*n_iter+iter]; // random int from uniform[O,n_samples] for every warp every iteration
        }
        start_id = __shfl_sync(0xffffffff, start_id, 0); // fast thread-to-thread transmission within warp
        for (warp_sample = 0; warp_sample < k; warp_sample++) {
            int offset = (start_id + warp_sample)%n_samples;
            int i = __ldg(&R[3*offset]);
            int j = __ldg(&R[3*offset+1]);
            float r = __ldg(&R[3*offset+2]);
            
            // read P[i] and Q[j] into register
            // there are 128 entries to read, each thread reads 4
            int base_p = i*k;
            int base_q = j*k;
            float tmp_p1 = __half2float(P[base_p + thread_id_within_warp]);
            float tmp_q1 = __half2float(Q[base_q + thread_id_within_warp]);

            float tmp_p2 = __half2float(P[base_p + thread_id_within_warp + 32]);
            float tmp_q2 = __half2float(Q[base_q + thread_id_within_warp + 32]);

            float tmp_p3 = __half2float(P[base_p + thread_id_within_warp + 64]);
            float tmp_q3 = __half2float(Q[base_q + thread_id_within_warp + 64]);

            float tmp_p4 = __half2float(P[base_p + thread_id_within_warp + 96]);
            float tmp_q4 = __half2float(Q[base_q + thread_id_within_warp + 96]);
            
            // calculate dot product on these 4 entries
            float tmp_product = tmp_p1*tmp_q1 + tmp_p2*tmp_q2 + tmp_p3*tmp_q3 + tmp_p4*tmp_q4;
            
            // transmit to other threads and get their products
            tmp_product += __shfl_down_sync(0xffffffff, tmp_product, 16);
            tmp_product += __shfl_down_sync(0xffffffff, tmp_product, 8);
            tmp_product += __shfl_down_sync(0xffffffff, tmp_product, 4);
            tmp_product += __shfl_down_sync(0xffffffff, tmp_product, 2);
            tmp_product += __shfl_down_sync(0xffffffff, tmp_product, 1);
            tmp_product = __shfl_sync(0xffffffff, tmp_product, 0);

            float err = r - tmp_product;
            
            // we need to update k=128 factors in P[i] and Q[j]: each of the 32 threads of the warp updates 4 factors
            P[base_p + thread_id_within_warp + 0] = __float2half(tmp_p1 + alpha*(err*tmp_q1 - beta*tmp_p1));
            Q[base_q + thread_id_within_warp + 0] = __float2half(tmp_q1 + alpha*(err*tmp_p1 - beta*tmp_q1));

            P[base_p + thread_id_within_warp + 32] = __float2half(tmp_p2 + alpha*(err*tmp_q2 - beta*tmp_p2));
            Q[base_q + thread_id_within_warp + 32] = __float2half(tmp_q2 + alpha*(err*tmp_p2 - beta*tmp_q2));

            P[base_p + thread_id_within_warp + 64] = __float2half(tmp_p3 + alpha*(err*tmp_q3 - beta*tmp_p3));
            Q[base_q + thread_id_within_warp + 64] = __float2half(tmp_q3 + alpha*(err*tmp_p3 - beta*tmp_q3));

            P[base_p + thread_id_within_warp + 96] = __float2half(tmp_p4 + alpha*(err*tmp_q4 - beta*tmp_p4));
            Q[base_q + thread_id_within_warp + 96] = __float2half(tmp_q4 + alpha*(err*tmp_p4 - beta*tmp_q4));
        }
    }
}}
  """, no_extern_c=True)
pycuda_mf_kernel = pycudakernel.get_function("pycuda_mf_kernel")

In [37]:
def mf_sgd_pycuda(R, k, alpha, beta, n_iter, n_threads_per_block, n_blocks ):
    """
    Matrix factorization
    - R : matrix to factorize
    - k : latent dimension
    - alpha : learning rate
    - beta : regularization parameter
    - n_iter : number of SGD iterations
    - n_threads_per_block : number of threads per block, each thread will run the kernel code
    - n_blocks : number of blocks (groups of threads)
    """
    # Copy the needed arrays to GPU memory
    m = 1 + np.max(R[:,0])
    n = 1 + np.max(R[:,1])
    P = gpuarray.to_gpu((1./k * np.random.normal(size = (m,k))).astype("float16"))
    Q = gpuarray.to_gpu((1./k * np.random.normal(size = (n,k))).astype("float16"))
    R_gpu = gpuarray.to_gpu(R.flatten().astype("int32"))
    rndm = gpuarray.to_gpu(np.random.randint(0,len(R),n_iter*n_blocks*n_threads_per_block//32, dtype=np.int32))
    # Perform stochastic gradient descent on GPU
    pycuda_mf_kernel(P, Q, np.float32(alpha), np.float32(beta), np.int32(k), np.int32(len(R)),
                       R_gpu, np.int32(n_iter), rndm,
                       grid=(n_blocks,1), block=(n_threads_per_block,1,1))
    return(P,Q)

In [185]:
start = time()
P,Q = mf_sgd_pycuda(R, k=128, alpha=0.13, beta=0.0001, n_iter=2, n_threads_per_block=1024, n_blocks=610)
end = time()
print(f"Temps d'exécution: {(end-start):.3f} s\nRMSE :")
rmse(R, P.get().astype("float32"), Q.get().astype("float32"))

Temps d'exécution: 0.274 s
RMSE :


0.001062024815507108

In [165]:
%%timeit
P,Q = mf_sgd_pycuda(R, k=128, alpha=0.13, beta=0.0001, n_iter=2, n_threads_per_block=1024, n_blocks=610)

270 ms ± 1.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Dans l'exemple précédent on a choisi n_threads_per_block = 1024, n_blocks = 610 et n_iter = 2 pour avoir n_updates =  n_iter $\times$ n_blocks $\times$ 128 $\times$ n_threads_per_block/32 = 4 997 120 updates, en ligne avec les méthodes précédentes. On obtient 18,3 millions updates/s, soit 3.5x de plus que Hogwild! standard, mais on peut atteindre un meilleur speed-up en augmentant le nombre d'itérations, car ici le code s'exécute si rapidement (270 ms) que l'overhead pour initialiser P et Q et transmettre les données en mémoire GPU n'est plus négligeable. On augmente donc le nombre d'itérations.

In [211]:
start = time()
P,Q = mf_sgd_pycuda(R, k=128, alpha=0.12, beta=0.0001, n_iter=1000, n_threads_per_block=1024, n_blocks=1024)
end = time()
print(f"Temps d'exécution: {(end-start):.3f} s\nRMSE :")
rmse(R, P.get().astype("float32"), Q.get().astype("float32"))

Temps d'exécution: 8.270 s
RMSE :


0.0003738241412920352

In [188]:
%%timeit
P,Q = mf_sgd_pycuda(R, k=128, alpha=0.12, beta=0.0001, n_iter=1000, n_threads_per_block=1024, n_blocks=1024)

8.12 s ± 5.13 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Ici on effectue 1000 $\times$ 1024 $\times$ 128 $\times$ 1024 / 32 = 4 194 304 000 updates, soit 516 539 901 updates/s ce qui est très satisfaisant et représente un speed-up de 5392x par rapport à l'implémentation séquentielle Python (sparse).

On s'approche de la limite de notre GPU qui est de 12.7 TFLOPS (qui serait équivalent à 516M updates/s si on suppose que chaque update utilise l'équivalent de 24586 opérations en virgule flottante, ce qui est peut-être un seul ordre de grandeur supérieur aux opérations qu'on effectue ici pour k = 128).

## CuPy

CuPy est un module Python qui propose des objets (principalement arrays) similaires à Numpy mais dont les méthodes sont parallélisées sur GPU avec CUDA. Dans notre cas, remplacer Numpy par CuPy dans notre code Python séquentiel n'a pas amélioré son temps d'exécution (car les boucles for de Python sont lentes et non parallélisées), mais CuPy offre également la possibilité d'écrire des kernels CUDA tout comme PyCuda. Pour comparer PyCuda et CuPy, on teste donc notre code précédent (batch Hogwild!) sur CuPy.

In [130]:
cupy_mf_kernel = cp.RawKernel("""
#include <cuda_fp16.h>
extern "C"
__global__ void cupy_mf_kernel(half *P, half *Q, float alpha, float beta, int k, int n_samples, int *R, int n_iter, int *rndm)
{
    int thread_id_within_warp = threadIdx.x%32;
    int warp_id = (blockDim.x/32)*blockIdx.x + threadIdx.x/32; // unique combination of block id and warp id within block
    int start_id, warp_sample;
    for (int iter = 0; iter < n_iter; iter++) {
        // set start id (first sample to process) for each warp
        // first thread of the warp chooses a random sample and transmits it to the others
        if(thread_id_within_warp == 0)
        {
            start_id = rndm[warp_id*n_iter+iter]; // random int from uniform[O,n_samples] for every warp every iteration
        }
        start_id = __shfl_sync(0xffffffff, start_id, 0); // fast thread-to-thread transmission within warp
        for (warp_sample = 0; warp_sample < k; warp_sample++) {
            int offset = (start_id + warp_sample)%n_samples;
            int i = __ldg(&R[3*offset]);
            int j = __ldg(&R[3*offset+1]);
            float r = __ldg(&R[3*offset+2]);
            
            // read P[i] and Q[j] into register
            // there are 128 entries to read, each thread reads 4
            int base_p = i*k;
            int base_q = j*k;
            float tmp_p1 = __half2float(P[base_p + thread_id_within_warp]);
            float tmp_q1 = __half2float(Q[base_q + thread_id_within_warp]);

            float tmp_p2 = __half2float(P[base_p + thread_id_within_warp + 32]);
            float tmp_q2 = __half2float(Q[base_q + thread_id_within_warp + 32]);

            float tmp_p3 = __half2float(P[base_p + thread_id_within_warp + 64]);
            float tmp_q3 = __half2float(Q[base_q + thread_id_within_warp + 64]);

            float tmp_p4 = __half2float(P[base_p + thread_id_within_warp + 96]);
            float tmp_q4 = __half2float(Q[base_q + thread_id_within_warp + 96]);
            
            // calculate dot product on these 4 entries
            float tmp_product = tmp_p1*tmp_q1 + tmp_p2*tmp_q2 + tmp_p3*tmp_q3 + tmp_p4*tmp_q4;
            
            // transmit to other threads and get their products
            tmp_product += __shfl_down_sync(0xffffffff, tmp_product, 16);
            tmp_product += __shfl_down_sync(0xffffffff, tmp_product, 8);
            tmp_product += __shfl_down_sync(0xffffffff, tmp_product, 4);
            tmp_product += __shfl_down_sync(0xffffffff, tmp_product, 2);
            tmp_product += __shfl_down_sync(0xffffffff, tmp_product, 1);
            tmp_product = __shfl_sync(0xffffffff, tmp_product, 0);

            float err = r - tmp_product;
            
            // we need to update k=128 factors in P[i] and Q[j]: each of the 32 threads of the warp updates 4 factors
            P[base_p + thread_id_within_warp + 0] = __float2half(tmp_p1 + alpha*(err*tmp_q1 - beta*tmp_p1));
            Q[base_q + thread_id_within_warp + 0] = __float2half(tmp_q1 + alpha*(err*tmp_p1 - beta*tmp_q1));

            P[base_p + thread_id_within_warp + 32] = __float2half(tmp_p2 + alpha*(err*tmp_q2 - beta*tmp_p2));
            Q[base_q + thread_id_within_warp + 32] = __float2half(tmp_q2 + alpha*(err*tmp_p2 - beta*tmp_q2));

            P[base_p + thread_id_within_warp + 64] = __float2half(tmp_p3 + alpha*(err*tmp_q3 - beta*tmp_p3));
            Q[base_q + thread_id_within_warp + 64] = __float2half(tmp_q3 + alpha*(err*tmp_p3 - beta*tmp_q3));

            P[base_p + thread_id_within_warp + 96] = __float2half(tmp_p4 + alpha*(err*tmp_q4 - beta*tmp_p4));
            Q[base_q + thread_id_within_warp + 96] = __float2half(tmp_q4 + alpha*(err*tmp_p4 - beta*tmp_q4));
        }
    }
}
  """, "cupy_mf_kernel")

In [184]:
def mf_sgd_cupy(R, k, alpha, beta, n_iter, n_threads_per_block, n_blocks):
    """
    Matrix factorization
    - R : matrix to factorize
    - k : latent dimension
    - alpha : learning rate
    - beta : regularization parameter
    - n_iter : number of SGD iterations
    - n_threads_per_block : number of threads per block, each thread will run the kernel code
    - n_blocks : number of blocks (groups of threads)
    """
    # Initialize user and item latent feature matrice
    m = 1 + np.max(R[:,0])
    n = 1 + np.max(R[:,1])
    P = 1./k * cp.random.rand(m, k).astype(cp.half)
    Q = 1./k * cp.random.rand(n, k).astype(cp.half)
    R_gpu = cp.asarray(R.flatten(), dtype=cp.int32)
    rndm = cp.random.randint(0,len(R),n_iter*n_blocks*n_threads_per_block//32, dtype=cp.int32)
    # Perform stochastic gradient descent on GPU
    cupy_mf_kernel((n_blocks,), (n_threads_per_block,), (P, Q, np.float32(alpha), np.float32(beta), np.int32(k), np.int32(len(R)), R_gpu, np.int32(n_iter), rndm))
    return(P,Q)

Premier test avec 4 997 120 updates de gradient :

In [188]:
start = time()
P,Q = mf_sgd_cupy(R, k=128, alpha=0.13, beta=0.0001, n_iter=2, n_threads_per_block=1024, n_blocks=610)
print(P[0,0]) # CuPy doesn't run the kernel code until you print the result
end = time()
clear_output()
print(f"Temps d'exécution: {(end-start):.3f} s\nRMSE :")
rmse(R, P.get().astype("float32"), Q.get().astype("float32"))

Temps d'exécution: 0.014 s
RMSE :


0.0026886741770251704

In [157]:
%%timeit
P,Q = mf_sgd_cupy(R, k=128, alpha=0.13, beta=0.0001, n_iter=2, n_threads_per_block=1024, n_blocks=610)
print(P[0,0])
clear_output(wait=False)

12.2 ms ± 41.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


Deuxième test avec 4 194 304 000 updates:

In [220]:
start = time()
P,Q = mf_sgd_cupy(R, k=128, alpha=0.12, beta=0.0001, n_iter=1000, n_threads_per_block=1024, n_blocks=1024)
print(P[0,0]) # CuPy doesn't run the kernel code until you print the result
end = time()
clear_output()
print(f"Temps d'exécution: {(end-start):.3f} s\nRMSE :")
rmse(R, P.get().astype("float32"), Q.get().astype("float32"))

Temps d'exécution: 7.810 s
RMSE :


0.0005486438305574281

In [219]:
%%timeit
P,Q = mf_sgd_cupy(R, k=128, alpha=0.12, beta=0.0001, n_iter=1000, n_threads_per_block=1024, n_blocks=1024)
print(P[0,0])
clear_output(wait=False)

7.64 s ± 36.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


On constate que CuPy garde un nombre d'updates/s élevé dans les deux tests (respectivement 410 M et 549 M) ce qui signifie qu'il encourt un overhead d'exécution significativement plus faible que PyCuda. Sur le deuxième test on obtient un speed-up de 6% par rapport à PyCuda et de 5732x par rapport à l'implémentation Python séquentielle.

Curieusement, les paramètres alpha et beta optimaux pour PyCuda ne correspondent pas exactement aux paramètres optimaux pour CuPy bien que le code du kernel soit identique, et on obtient une erreur légèrement plus élevée avec CuPy:

In [227]:
P,Q = mf_sgd_cupy(R, k=128, alpha=0.16, beta=0.00001, n_iter=1000, n_threads_per_block=1024, n_blocks=1024)
print(P[0,0])
clear_output()
print("RMSE :")
rmse(R, P.get().astype("float32"), Q.get().astype("float32"))

RMSE :


0.00041243791095700767

### Conclusion

Implémentation sparse séquentielle Python (référence): 95785 updates/s.

Numba : Speed-up de 1.62 (62% plus rapide)

PyCuda Hogwild! standard: Speed-up de 74

PyCuda Batch Hogwild! optimisé : Speed-up de 5392

CuPy Batch Hogwild! optimisé : Speed-up de 5732

Remarque: résultats susceptibles de varier en fonction de la machine utilisée

### Références
[1] Xiaolong Xie, Wei Tan, Liana Fong, Yun Liang: CuMF_SGD: Parallelized Stochastic Gradient Descent for Matrix Factorization on GPUs: https://arxiv.org/abs/1610.05838

[2] Feng Niu, Benjamin Recht, Christopher Re, Stephen J. Wright: HOGWILD!: A Lock-Free Approach to Parallelizing Stochastic Gradient Descent: https://arxiv.org/abs/1106.5730

[3] GitHub CuMF SGD https://github.com/cuMF/cumf_sgd