# DATA 612 Final project
### Ananya
### Introduction : 
This notebook spark implementation demonstrates movie recommendation using Stochastic Gradient Descent algorithm, which is another way to recommend a product or an item.
Dataset is taken from the Movielens website : https://grouplens.org/datasets/movielens/.



### what is sgd

1.loss function to be optimised 

2.weak learner to make prediction

3.additive model to add weak learner to minimize the loss function 

In [133]:
# importing required library 
import sys
import math
from time import time
import random
import csv
import numpy
from pyspark import SparkContext
from scipy import sparse
import numpy as np
from functions import *
%matplotlib inline
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_squared_error
from math import sqrt

### This algorithm requires initialization of the parameters

1."rho" is the decay factor or the exponentially weighted average over the square of the gradients.

2."decay" decays the learning rate over time, so we can move even closer to the local minimum in the end of training.

In [134]:
rho = 0.2
C = 6 # Number of factors
number_of_iter = 50 # number of iterations
block_number = 5 # number of blocks to take from the matrix

## This function is an implementation of the SGD algorithm.
This functions takes as an input a matrix R which is a sparse matrix, the mask matrix, and (Q, P), which are the factorisation of R.
loss function to minimze the distance between R and QP
Input : R, Q, P, mask, Ni, Nj, blockRange 
Output : Q, P, n, blockRange

In [135]:
def primary_SGD(R, Q, P, mask, no_row, no_column, blockRange):
  
    global rho
    eta = .01 # first step size
    R_new = R.nonzero()
    n = R_new[0].size
    
    for i in range(n):
        
        j = random.randint(0, n-1) # Pick randomly an element j
        row, col = R_new[0][j], R_new[1][j] # retrieve the row and column of the random j
        
        # take a small blocks from R, mask, Q and P
        Ri = R[row,col] 
        maski = mask[row,col]
        Qi = Q[row,:]
        Pi = P[:,col]
        
        # compute the gradient of Qi and Pi
        _, grad_Q = objective_Q(Pi, Qi, Ri, maski, rho/no_row[row])
        _, grad_P = objective_P(Pi, Qi, Ri, maski, rho/no_column[col])
        eta = eta * (1 + i) ** (- 0.5)
        
        # update the blocks of P and Q
        Q[row,:] = Qi - eta * grad_Q
        P[:,col] = Pi - eta * grad_P
        
        
    return (Q, P, n, blockRange)

### Parallelized SGD Algorithm


In [136]:
def Parallelized_SGD(R, mask):
    
    
    global nbr_iter, block_number, C
    
    Q = numpy.random.random_sample((R.shape[0], C))
    P = numpy.random.random_sample((C, R.shape[1]))
    block_i = (R.shape[0]/block_number, R.shape[1]/block_number)
    
    
    rowRangeList = [[k*block_i[0],(k+1)*block_i[0]] for k in range(block_number)]
    colRangeList = [[k*block_i[1],(k+1)*block_i[1]] for k in range(block_number)]

    rowRangeList[-1][1] += R.shape[0]%block_number
    colRangeList[-1][1] += R.shape[1]%block_number


    for iter_ in range(number_of_iter):
        if iter_ % 10 == 0:
            print("... iteration %s"%(iter_))
        
        for epoch in range(block_number):
            grid = []
            
            for block in range(block_number):
                rowRange = [int(rowRangeList[block][0]), int(rowRangeList[block][1])]
                colRange = [int(colRangeList[block][0]), int(colRangeList[block][1])]
                
                Rn = R[rowRange[0]:rowRange[1], colRange[0]:colRange[1]]
                maskn = mask[rowRange[0]:rowRange[1], colRange[0]:colRange[1]]
                Qn = Q[rowRange[0]:rowRange[1],:]
                Pn = P[:,colRange[0]:colRange[1]]
                
                no_row = {}
                for i in range(rowRange[0],rowRange[1]):
                    no_row[int(i-int(rowRange[0]))] = R[i,:].nonzero()[0].size
                    
                no_column = {}
                for i in range(colRange[0],colRange[1]):
                    no_column[i-colRange[0]] = R[:,i].nonzero()[0].size 
                    
                if (Rn.nonzero()[0].size != 0):
                    grid.append([Rn, Qn, Pn, maskn, no_row, no_column, (rowRange, colRange)])
                    
                    
                    
            rdd = sc.parallelize(grid, block_number).\
                        map(lambda x: primary_SGD(x[0],x[1],x[2],x[3],x[4],x[5],x[6])).collect()
                
                
            for elem in rdd:
                rowRange,colRange = elem[3]
                Q[rowRange[0]:rowRange[1],:] = elem[0]
                P[:,colRange[0]:colRange[1]] = elem[1]

            colRangeList.insert(0,colRangeList.pop())
            
            
            
    return Q,P

## This function loads the input data

In [161]:
def load_data(filename="u.data" , small_data=True):
    
    data = np.loadtxt(filename, dtype=int)
    R = sparse.csr_matrix((data[:, 2], (data[:, 0]-1, data[:, 1]-1)),dtype=float)
    mask = sparse.csr_matrix((np.ones(data[:, 2].shape),(data[:, 0]-1, data[:, 1]-1)), dtype=bool )
    
    if small_data is True:
        R = (R[0:100, 0:100].copy())
        mask = (mask[0:100, 0:100].copy())
        
        
    return R.toarray(), mask.toarray()

##  This function returns :
        R : the matrix user-item containing the ratings
        mask : matrix is equal to 1 if a score existes and 0 otherwise

### Using Spark 2.3, this algorithm can be tested; SparkContext has been used to initialize the spark session

In [176]:
global R, P, Q
spark = SparkContext.getOrCreate()
R, mask = load_data(filename="u.data" , small_data=True)
Q, P = Parallelized_SGD(R, mask)
rdd1 = spark.parallelize(P)
df = rdd1.map(lambda x: x.tolist()).toDF(["column"])
df.write.csv("outP.csv")
df.write.csv("outQ.csv")


... iteration 0
... iteration 10
... iteration 20
... iteration 30
... iteration 40


In [177]:
predicted_rating = Q.dot(P)
print(predicted_rating)


[[4.80600252 3.22284498 1.5172746  ... 4.60971192 4.24265673 5.05350699]
 [3.31203728 2.94637395 0.89839108 ... 3.51413321 2.97226978 3.91307384]
 [2.88087961 2.10883337 1.03392691 ... 2.68440966 2.37356204 3.05356331]
 ...
 [3.60906029 2.41714227 1.16892304 ... 3.36043894 3.06578557 3.81572065]
 [3.86087734 2.48683463 1.25639892 ... 3.89525626 3.17046293 4.30298572]
 [3.01404063 1.81685796 0.90624342 ... 3.02373322 2.6698992  3.23433818]]


We can see that the algorithm works relatively well since non zero values in R are very similar to those in QP at the same positions.  

In [178]:
relative_error = np.linalg.norm(mask*(R - np.dot(Q, P))) / np.linalg.norm(R) * 100
print("Relative error : %f"%relative_error)

Relative error : 26.503981


In [179]:
mean_squared_error(R, predicted_rating, multioutput='raw_values')

array([ 6.88218286,  5.9023236 ,  1.54953101,  6.12422833,  4.93079942,
        5.94736754,  5.70524693,  7.40245003,  7.49506169,  4.48415911,
        7.24541625,  9.01598888,  6.59359918,  5.74966485,  8.27202452,
        4.91816549,  6.24388099,  5.08070523,  7.35784124,  6.10113013,
        4.2225028 ,  9.82495594,  9.41721332,  6.83417951,  5.98669841,
        5.18852988,  4.9094934 ,  7.3550913 ,  2.6789372 ,  7.90164413,
        6.796131  ,  8.73526922,  3.43627418,  1.93866597,  5.8973791 ,
        6.17257601,  3.24291813,  4.73460774,  5.42392883,  5.34247088,
        5.04654079,  7.372278  ,  6.11757673,  8.57369832,  7.69972424,
       11.46237997,  4.96680887,  8.20788964,  8.01276108,  5.53815808,
        6.70658272,  7.97710276,  6.06108316,  5.1474098 ,  9.79355861,
        7.16778199,  3.6479794 ,  6.44111422, 10.20204468,  6.84419061,
        5.85149581,  6.86685578,  6.4027803 ,  8.6940243 ,  5.82537558,
        6.30819354,  4.29829092,  4.49677041,  7.05938314,  4.60

### Recommendation

In order to find the movie that we'll recommand to the user n° u, we have to consider the decomposition $ R = QP$.

In fact, by multiplying the $u^{th}$ row of Q by the matrix P, we obtain a vector r of size $ I$ x $ 1$ that contains all the estimated ratings of movies for this user u.   

Now, we only need to consider the highest score and take its index (which correspond to the recommanded movie), however we shoudn't forget that we have to avoid recommanding a movie that the user had already seen, that's why we need to multiply our vector r by the opposite of the mask. 


** Let us now recommend a user 45 ** 

In [180]:
u = 2
r = np.dot(Q[u,:], P)

r = r * (1 - mask[u,:])

movie_index = np.argmax(r)
print("We recommand to the user  :", u ,"movie :" ,movie_index)
print ("The rating of this movie is : " ,r[movie_index])

We recommand to the user  : 2 movie : 63
The rating of this movie is :  3.1739291676863988


In [181]:
u = 44
r = np.dot(Q[u,:], P)

r = r * (1 - mask[u,:])

movie_index = np.argmax(r)
print("We recommand to the user  :", u ," movie :" ,movie_index)
print ("The rating of this movie is : " ,r[movie_index])

We recommand to the user  : 44  movie : 63
The rating of this movie is :  3.127435836829044


### Summary : Spark helped achieve parallel distribution while processing the datasets. The parallel STochastic Gradoent Descent algorithm distributed the computed matrix over many spark workers (block number) and then aggregate the result. The number of iterations can be customized and can help improve the performance. It is recommended that a very large set be leveraged in a cloud or any cluster setting environment in order to get a better recommendation result.