<Center><h3> Part 2 : Parallelized Stochastic Gradient Descent

### Import Section 

In [1]:
import sys
import math
from time import time
import random
import csv
import numpy
from pyspark import SparkContext
from scipy import sparse
import numpy as np
from functions import *
%matplotlib inline

### Initialize the used parameters

In [24]:
rho = 0.2
C = 6 # Number of factors
nbr_iter = 50 # number of iterations
block_number = 5 # number of blocks to take from the matrix

** Non-parallelized SGD Algorithm **

- Pick $j$ uniformly at random in $\{1, \ldots, n\}$
- $\forall t = 1 \ldots T :$

$$
w_{t+1} \gets w_t - \eta \nabla f_j(w_t)
$$

where $\eta$ is the step size .
To improve the performance of the algorithm we take $\eta$ = $\frac{\eta_0}{\sqrt{t+1}}$

In [14]:
def SGD(R, Q, P, mask, Ni, Nj, blockRange):
    """
    This function is an implementation of the SGD algorithm described above.
    Input : R, Q, P, mask, Ni, Nj, blockRange
    Output : Q, P, n, blockRange
    """
    
    global rho
    eta = .01 # first step size
    R_new = R.nonzero()
    n = R_new[0].size
    
    for i in range(n):
        
        j = random.randint(0, n-1) # Pick randomly an element j
        row, col = R_new[0][j], R_new[1][j] # retrieve the row and column of the random j
        
        # take a small blocks from R, mask, Q and P
        Ri = R[row,col] 
        maski = mask[row,col]
        Qi = Q[row,:]
        Pi = P[:,col]
        
        # compute the gradient of Qi and Pi
        _, grad_Q = objective_Q(Pi, Qi, Ri, maski, rho/Ni[row])
        _, grad_P = objective_P(Pi, Qi, Ri, maski, rho/Nj[col])
        eta = eta * (1 + i) ** (- 0.5)
        
        # update the blocks of P and Q
        Q[row,:] = Qi - eta * grad_Q
        P[:,col] = Pi - eta * grad_P
        #print(np.linalg.norm(Q[row,:]))
        
    return (Q, P, n, blockRange)

### Parallelized SGD Algorithm

In [25]:
def Parallelized_SGD(R, mask):
    """
    This function performs the Parallelized SGD algorithm
    Input : R, mask
    Output : Q, P
    """
    
    global nbr_iter, block_number, C
    
    Q = numpy.random.random_sample((R.shape[0], C))
    P = numpy.random.random_sample((C, R.shape[1]))
    block_i = (R.shape[0]/block_number, R.shape[1]/block_number)
    
    
    rowRangeList = [[k*block_i[0],(k+1)*block_i[0]] for k in range(block_number)]
    colRangeList = [[k*block_i[1],(k+1)*block_i[1]] for k in range(block_number)]

    rowRangeList[-1][1] += R.shape[0]%block_number
    colRangeList[-1][1] += R.shape[1]%block_number


    for iter_ in range(nbr_iter):
        if iter_ % 10 == 0:
            print("... iteration %s"%(iter_))
        
        for epoch in range(block_number):
            grid = []
            
            for block in range(block_number):
                rowRange = [int(rowRangeList[block][0]), int(rowRangeList[block][1])]
                colRange = [int(colRangeList[block][0]), int(colRangeList[block][1])]
                
                Rn = R[rowRange[0]:rowRange[1], colRange[0]:colRange[1]]
                maskn = mask[rowRange[0]:rowRange[1], colRange[0]:colRange[1]]
                Qn = Q[rowRange[0]:rowRange[1],:]
                Pn = P[:,colRange[0]:colRange[1]]
                
                Ni = {}
                for i in range(rowRange[0],rowRange[1]):
                    Ni[int(i-int(rowRange[0]))] = R[i,:].nonzero()[0].size
                    
                Nj = {}
                for i in range(colRange[0],colRange[1]):
                    Nj[i-colRange[0]] = R[:,i].nonzero()[0].size 
                    
                if (Rn.nonzero()[0].size != 0):
                    grid.append([Rn, Qn, Pn, maskn, Ni, Nj, (rowRange, colRange)])
                    
                    
                    
            rdd = sc.parallelize(grid, block_number).\
                        map(lambda x: SGD(x[0],x[1],x[2],x[3],x[4],x[5],x[6])).collect()
                
                
            for elem in rdd:
                rowRange,colRange = elem[3]
                Q[rowRange[0]:rowRange[1],:] = elem[0]
                P[:,colRange[0]:colRange[1]] = elem[1]

            colRangeList.insert(0,colRangeList.pop())
            
            
            
    return Q,P

In [26]:
def outputMatrix(A, path):
    """
    This function outputs a matrix to a csv file
    """
    f = open(path, 'w', 100)
    rows= A.shape[0]
    cols = A.shape[1]
    for row in range(rows):
        for col in range(cols):  
            if col == cols-1:
                f.write(str(A[row,col])) 
            else:
                f.write(str(A[row,col]) + ",")
        f.write("\n")

    f.flush()
    f.close()

In [27]:
def load_data(filename="u.data" , small_data=True):
    """
    This function returns :
        R : the matrix user-item containing the ratings
        mask : matrix is equal to 1 if a score existes and 0 otherwise
    """
    data = np.loadtxt(filename, dtype=int)
    
    R = sparse.csr_matrix((data[:, 2], (data[:, 0]-1, data[:, 1]-1)),dtype=float)
    mask = sparse.csr_matrix((np.ones(data[:, 2].shape),(data[:, 0]-1, data[:, 1]-1)), dtype=bool )

    # take a small part of the whole data for testing 
    if small_data is True:
        R = (R[0:100, 0:100].copy())
        mask = (mask[0:100, 0:100].copy())
        
        
    return R.toarray(), mask.toarray()

### Testing our algorithm

In [28]:
global R, P, Q
spark = SparkContext.getOrCreate()
output_Q = "Q.csv"
output_P = "P.csv"

#load data
R, mask = load_data(filename="u.data" , small_data=True)
t = time()
print("Start Process ....")
Q, P = Parallelized_SGD(R, mask)
print("Process finished in %s s"%(time()-t))
# Wrtie the obtained Matrices to csv file
outputMatrix(P,output_P)
outputMatrix(Q,output_Q)

Start Process ....
... iteration 0
... iteration 10
... iteration 20
... iteration 30
... iteration 40
Process finished in 1227.4811024665833 s


In [29]:
R_result = Q.dot(P)
print(R_result)
print(R)

[[ 5.24681419  3.44999746  2.93827376 ...,  4.93948659  4.20981051
   5.90208862]
 [ 3.45890691  2.88852161  1.94166001 ...,  3.45883726  2.87506586
   3.92243295]
 [ 3.28104943  2.11192302  2.53980984 ...,  3.09046697  2.66709054
   3.67397603]
 ..., 
 [ 2.79261176  2.11598226  2.5189713  ...,  2.36535702  1.94035837
   2.67286888]
 [ 2.86420341  1.8404591   2.10479273 ...,  2.83349656  2.49764757
   3.71285494]
 [ 2.82295066  2.46283972  2.08517971 ...,  2.90593869  2.04198094
   3.28897811]]
[[ 5.  3.  4. ...,  4.  3.  5.]
 [ 4.  0.  0. ...,  0.  0.  5.]
 [ 0.  0.  0. ...,  0.  0.  0.]
 ..., 
 [ 0.  0.  0. ...,  0.  0.  0.]
 [ 4.  0.  3. ...,  5.  0.  5.]
 [ 0.  0.  0. ...,  0.  0.  0.]]


We can see that the algorithm works relatively well since non zero values in R are very similar to those in QP at the same positions.  

In [42]:
relative_error = np.linalg.norm(mask*(R - np.dot(Q, P))) / np.linalg.norm(R) * 100
print("Relative error : %f"%relative_error)

Relative error : 26.429458


### Recommendation

In order to find the movie that we'll recommand to the user n° u, we have to consider the decomposition $ R = QP$.

In fact, by multiplying the $u^{th}$ row of Q by the matrix P, we obtain a vector r of size $ I$ x $ 1$ that contains all the estimated ratings of movies for this user u.   

Now, we only need to consider the highest score and take its index (which correspond to the recommanded movie), however we shoudn't forget that we have to avoid recommanding a movie that the user had already seen, that's why we need to multiply our vector r by the opposite of the mask. 


** What recommanding to the user n° 45 ?** 

In [40]:
u = 45
r = np.dot(Q[u,:], P)

r = r * (1 - mask[u,:])

movie_index = np.argmax(r)
print("We recommand to the user n° %s the movie n°  %d"%(u,movie_index))
print ("The rating of this movie is : %0.3f "%r[movie_index])

We recommand to the user n° 45 the movie n°  55
The rating of this movie is : 3.447 
