<small><i>This notebook was created by Franck Iutzeler, Jerome Malick and Yann Vernaz (2016).</i></small>
<!-- Credit (images) Jeffrey Keating Thompson. -->

<center><img src="UGA.png" width="30%" height="30%"></center>
<center><h3>Master of Science in Industrial and Applied Mathematics (MSIAM)</h3></center>
<hr>
<center><h1>Convex and distributed optimization</h1></center>
<center><h2>Part III - Recommender Systems (3h + 3h home work)</h2></center>

# Outline

In this Lab, we will investigate some gradient-based algorithms on the well-known matrix factorization problem, which is the most prominent approach for building _Recommender Systems_.

Our goal is to implement Large-Scale Matrix Factorization with Distributed Stochastic Gradient Descent in Spark.

# Problem Formulation

The problem of matrix factorization for collaborative filtering has attracted much attention, especially after the [Netflix prize](https://datajobs.com/data-science-repo/Recommender-Systems-%5BNetflix%5D.pdf). The premise behind this approach is to approximate a large rating matrix $R$ by the product of two low-dimensional factor matrices $P$ and $Q$, i.e. $R \approx \hat{R} = PQ^\top$, which model users and items, respectively, in some latent space. More precisely, $R$ has dimension $m \times n$, where $m$ and $n$ are respectively the number of users and the number of items, both large; $P$ has size $m \times k$ and contains user information in a latent space of size $k \ll m,n$, while $Q$ has size $n \times k$ and contains item information in the same latent space. Typical values for $m, n$ are $10^6$, while $k$ is in the tens.
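To make the dimensions concrete, here is a small NumPy sketch with arbitrary toy sizes (the values of `m`, `n`, `k` and the random factors are illustrative only). Storing $p_i$ and $q_j$ as rows of $P$ and $Q$, the prediction matrix is $PQ^\top$ and each entry is the inner product $p_i^\top q_j$.

```python
import numpy as np

# Toy sizes, for illustration only (real data: m, n ~ 10^6, k ~ tens)
m, n, k = 6, 4, 2
rng = np.random.default_rng(0)

P = rng.standard_normal((m, k))  # row i is the user factor p_i
Q = rng.standard_normal((n, k))  # row j is the item factor q_j

R_hat = P @ Q.T                  # predicted rating matrix, shape (m, n)

# A single predicted rating is the inner product of a user row and an item row
i, j = 3, 1
assert np.isclose(R_hat[i, j], P[i] @ Q[j])
print(R_hat.shape)
```

With this layout the factors hold $(m+n)k$ numbers instead of the $mn$ entries of $R$, which is what makes the approach tractable when $m$ and $n$ are large.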

For a pair of user and item $(u_i,i_j)$ for which a rating $r_{ij}$ exists, a common approach is based on the minimization of the $\ell_2$-regularized quadratic error:
$$  \ell_{u_i,i_j}(P,Q)= \left(r_{ij} - p_{i}^{\top}q_{j}\right)^2 + \lambda(|| p_{i} ||^{2} + || q_{j} ||^2 )  $$
where $p_i$ is the column vector formed by the $i$-th row of $P$, $q_j$ is the column vector formed by the $j$-th row of $Q$, and $\lambda\geq 0$ is a regularization parameter. The whole matrix factorization problem thus reads
$$ \min_{P,Q} \sum_{i,j \,:\, r_{ij} \text{ exists}}  \ell_{u_i,i_j}(P,Q). $$
Note that the error $\ell_{u_i,i_j}(P,Q)$ depends on $P$ and $Q$ only through $p_{i}$ and $q_{j}$; however, item $i_j$ may also be rated by user $u_{i'}$, so that the optimal factor $q_{j}$ depends on both $p_{i}$ and $p_{i'}$.
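As a concrete check of the objective, here is a minimal NumPy sketch that evaluates $\ell_{u_i,i_j}$ and its sum over the observed pairs only, on a hypothetical toy set of four ratings (indices are 0-based; the function names are illustrative). Item 0 is rated by users 0 and 1, so $q_0$ appears in two terms of the sum: this is the coupling described above.

```python
import numpy as np

def pair_loss(r_ij, p_i, q_j, lam):
    """Regularized squared error for one observed rating r_ij."""
    err = r_ij - p_i @ q_j
    return err ** 2 + lam * (p_i @ p_i + q_j @ q_j)

def objective(ratings, P, Q, lam):
    """Sum of pair losses over the observed (i, j, r_ij) triples only (0-based i, j)."""
    return sum(pair_loss(r, P[i], Q[j], lam) for i, j, r in ratings)

# Hypothetical toy data: 3 users, 2 items, 4 observed ratings
ratings = [(0, 0, 5.0), (1, 0, 3.0), (1, 1, 4.0), (2, 1, 1.0)]
P = np.ones((3, 2))
Q = np.ones((2, 2))
print(objective(ratings, P, Q, lam=0.1))
```

Unobserved entries of $R$ never enter the sum; they are not treated as zeros.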

In [1]:
# set up spark environment (Using Spark Local Mode)
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

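# A long heartbeat interval avoids executor heartbeat timeouts during long jobs; if your Spark
# version refuses it, spark.network.timeout must also be raised above this value.
# In local mode the executors run inside the driver JVM, so the available memory is governed
# by spark.driver.memory rather than spark.executor.memory.
conf = SparkConf().set("spark.executor.heartbeatInterval", "100000s").set("spark.executor.memory", "10000m")
conf.setMaster("local[*]")  # local mode, using all available cores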
conf.setAppName("MSIAM part III - Matrix Factorization")

sc = SparkContext(conf = conf)

Recall that you can monitor the Spark jobs through the Spark web UI by opening http://localhost:4040 in a web browser.

We build on the first lab: we reuse the MovieLens dataset and the RDD routines written there.

In [2]:
def parseRating(line):
    fields = line.split('::')
    return int(fields[0]), int(fields[1]), float(fields[2])

# path to MovieLens dataset
movieLensHomeDir="../data/movielens/medium/"

# ratings is an RDD of (userID, movieID, rating)
ratingsRDD = sc.textFile(movieLensHomeDir + "ratings.dat").map(parseRating).setName("ratings").cache()

numRatings = ratingsRDD.count()
numUsers = ratingsRDD.map(lambda r: r[0]).distinct().count()
numMovies = ratingsRDD.map(lambda r: r[1]).distinct().count()
print("We have %d ratings from %d users on %d movies.\n" % (numRatings, numUsers, numMovies))

M = ratingsRDD.map(lambda r: r[0]).max()   # largest user ID (IDs start at 1)
N = ratingsRDD.map(lambda r: r[1]).max()   # largest movie ID (IDs start at 1)
# despite its name, matrixSparsity is the fraction of observed (non-zero) entries of the M x N rating matrix
matrixSparsity = float(numRatings)/float(M*N)
print("We have %d users, %d movies and the rating matrix has %f percent of non-zero values.\n" % \
                                          (M, N, 100*matrixSparsity))

We have 1000209 ratings from 6040 users on 3706 movies.

We have 6040 users, 3952 movies and the rating matrix has 4.190221 percent of non-zero values.



#  Gradient Descent Algorithms

The goal here is to 
1. Compute gradients of the loss functions.
2. Implement gradient algorithms.
3. Observe the prediction accuracy of the developed methods.

__Question 1__

> Split (randomly) the dataset into a training and a testing sample. We learn on 70% (for example) of the ratings and test on the rest.

> Define a routine that returns the predicted ratings from the factor matrices. Form an RDD with the elements `(i,j,true rating,predicted rating)`. 

> Define a routine that returns the Mean Squared Error (MSE).


In [3]:
from operator import add
import numpy as np

def split_dataset(dataset, train_fraction, seed=None):
    # each rating is assigned at random to the training or the testing sample
    learn, test = dataset.randomSplit([train_fraction, 1 - train_fraction], seed=seed)
    return (learn, test)

def predict_ratings(true_ratings, P, Q):
    # returns an RDD of (i, j, true rating, predicted rating); IDs start at 1, rows of P and Q at 0
    return true_ratings.map(lambda x: (x[0], x[1], x[2], float(np.dot(P[x[0] - 1], Q[x[1] - 1]))))

def calculate_MSE(rdd):
    # rdd elements are (i, j, true rating, predicted rating)
    mse = rdd.map(lambda x: (x[2] - x[3]) ** 2).mean()
    return mse

__Question 2__

> Derive the update rules for gradient descent. 

> Implement a (full) gradient algorithm in `Python` on the training set.  Take a step size (learning rate) $\gamma=0.001$ and stop after a specified number of iterations. Investigate the latent space size (e.g. $K=2,5,10,50$).

> Provide plots and explanations for your experiments. 

> Try to parallelize it so that the code can be run using `PySpark`. What do you conclude?
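One way to write the derivation for the loss $\ell_{u_i,i_j}$ defined above, with $\Omega$ denoting the set of observed pairs $(i,j)$ (notation introduced here). Differentiating one term gives

$$ \nabla_{p_i}\ell_{u_i,i_j} = -2\left(r_{ij} - p_i^\top q_j\right) q_j + 2\lambda p_i, \qquad \nabla_{q_j}\ell_{u_i,i_j} = -2\left(r_{ij} - p_i^\top q_j\right) p_i + 2\lambda q_j, $$

and the term has zero gradient with respect to every other row of $P$ and $Q$. Summing over $\Omega$, one full gradient step with step size $\gamma$ reads

$$ p_i \leftarrow p_i - \gamma \sum_{j\,:\,(i,j)\in\Omega} \left[-2\left(r_{ij} - p_i^\top q_j\right) q_j + 2\lambda p_i\right], \qquad q_j \leftarrow q_j - \gamma \sum_{i\,:\,(i,j)\in\Omega} \left[-2\left(r_{ij} - p_i^\top q_j\right) p_i + 2\lambda q_j\right], $$

where all gradients are evaluated at the current $(P,Q)$ before any row is updated. If the regularization is weighted by $\lambda/2$ instead of $\lambda$, the terms $2\lambda p_i$ and $2\lambda q_j$ become $\lambda p_i$ and $\lambda q_j$.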

Whereas the full gradient algorithm uses, at every iteration, the gradient of the loss summed over all observed ratings, Stochastic Gradient Descent (SGD) computes each update from a single rating or a small batch of ratings, i.e. from a noisy but unbiased estimate of the average gradient. In SGD, the learning rate $\gamma$ is typically much smaller than the corresponding learning rate in (full) batch gradient descent because the updates have much higher variance.
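A minimal single-machine sketch of SGD for the factorization, assuming the loss $\left(r_{ij} - p_i^\top q_j\right)^2 + \lambda(\|p_i\|^2 + \|q_j\|^2)$ of the problem formulation, 0-based indices and synthetic low-rank data (all names, sizes and constants below are hypothetical choices, not the notebook's). Each step touches only one row of $P$ and one row of $Q$.

```python
import numpy as np

def sgd_epoch(ratings, P, Q, gamma, lam, rng):
    """One pass of SGD over the observed ratings in random order (P, Q updated in place).

    ratings: float array of shape (nnz, 3) with rows (i, j, r_ij), 0-based i and j.
    """
    for idx in rng.permutation(len(ratings)):
        i, j, r = int(ratings[idx, 0]), int(ratings[idx, 1]), ratings[idx, 2]
        err = r - P[i] @ Q[j]
        grad_p = -2 * err * Q[j] + 2 * lam * P[i]   # computed with the old q_j
        grad_q = -2 * err * P[i] + 2 * lam * Q[j]   # computed with the old p_i
        P[i] -= gamma * grad_p
        Q[j] -= gamma * grad_q

def mse(ratings, P, Q):
    """Mean squared error on the given ratings."""
    i, j = ratings[:, 0].astype(int), ratings[:, 1].astype(int)
    pred = np.einsum("ik,ik->i", P[i], Q[j])
    return np.mean((ratings[:, 2] - pred) ** 2)

# Synthetic rank-3 data (hypothetical): 30 users, 20 items, about half the entries observed
rng = np.random.default_rng(0)
m, n, k = 30, 20, 3
R_true = rng.standard_normal((m, k)) @ rng.standard_normal((n, k)).T
ii, jj = np.nonzero(rng.random((m, n)) < 0.5)
ratings = np.column_stack([ii, jj, R_true[ii, jj]])

# small random initialization, then repeated passes over the data
P = 0.1 * rng.standard_normal((m, k))
Q = 0.1 * rng.standard_normal((n, k))
mse_start = mse(ratings, P, Q)
for epoch in range(200):
    sgd_epoch(ratings, P, Q, gamma=0.01, lam=0.001, rng=rng)
print(mse_start, mse(ratings, P, Q))
```

Here one epoch costs one pass over the data, just like one full gradient iteration, which makes "MSE versus full data passes" a fair axis for comparing the two methods.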

In [16]:
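def sqr_loss(y_ij, p_i, q_j, lambda_1, lambda_2):
    # the regularization is weighted by lambda/2 here (the text uses lambda), which is why
    # the gradients below contain lambda * p_i rather than 2 * lambda * p_i
    se = (y_ij - np.dot(p_i, q_j)) ** 2
    res = se + lambda_1 / 2 * np.linalg.norm(p_i) ** 2 + lambda_2 / 2 * np.linalg.norm(q_j) ** 2
    return res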

def grad_p_i(y_ij, p_i, q_j, lambda_1):
    res = - 2 * q_j * (y_ij - np.dot(p_i, q_j)) + lambda_1 * p_i
    return res

def grad_q_j(y_ij, p_i, q_j, lambda_2):
    res = - 2 * p_i * (y_ij - np.dot(p_i, q_j)) + lambda_2 * q_j
    return res

def grad_algo(trainRDD, testRDD, N, M, K, gamma, max_iter, lambda_1, lambda_2, verbose = True, seed = 0):
    """ Run the full gradient algorithm and return P and Q such that the rating matrix X ~ P * Q^T
    Args:
        trainRDD: RDD of (userID, movieID, rating) used for learning
        testRDD: RDD of (userID, movieID, rating) used for evaluation
        N: number of movies (largest movie ID)
        M: number of users (largest user ID)
        K: latent feature space size
        gamma: gradient step size
        max_iter: number of gradient iterations before stop
        lambda_1, lambda_2: regularization constants
        verbose: if info messages should be printed
        seed: seed of the random initialization
    Returns:
        (P, Q): factor matrices with sizes M x K and N x K respectively
        ll_tab: mean regularized training loss before the first iteration and after each iteration
    """
    # Random (not constant) initialization: with P = Q = ones, all K latent
    # coordinates receive identical gradients and remain equal forever
    rng = np.random.RandomState(seed)
    P = rng.rand(M, K) / np.sqrt(K)
    Q = rng.rand(N, K) / np.sqrt(K)

    def reg_loss():
        # mean regularized loss over the training ratings, with P and Q read at call time
        return trainRDD.map(lambda ex: sqr_loss(ex[2], P[ex[0] - 1], Q[ex[1] - 1], lambda_1, lambda_2)).mean()

    if verbose:
        print('start grad_algo with gamma = %f, max_iter = %d' % (gamma, max_iter))
    ll_tab = [reg_loss()]
    for it in range(max_iter):
        # per-rating gradients, all evaluated at the current (P, Q)
        grads = trainRDD.map(lambda ex: (ex[0], # i
                                         ex[1], # j
                                         grad_p_i(ex[2], P[ex[0] - 1], Q[ex[1] - 1], lambda_1),
                                         grad_q_j(ex[2], P[ex[0] - 1], Q[ex[1] - 1], lambda_2))).cache()
        # average gradient per user and per movie; reduceByKey avoids building the groups in memory
        grads_p = grads.map(lambda x: (x[0], (x[2], 1))).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\
                       .mapValues(lambda s: s[0] / s[1]).collect()
        grads_q = grads.map(lambda x: (x[1], (x[3], 1))).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\
                       .mapValues(lambda s: s[0] / s[1]).collect()
        grads.unpersist()

        # update on the driver only once both gradients are collected; otherwise a
        # recomputation of grads for Q would already see the updated P
        for i, grad_val in grads_p:
            P[i - 1] -= gamma * grad_val
        for j, grad_val in grads_q:
            Q[j - 1] -= gamma * grad_val

        ll_tab.append(reg_loss())
        if verbose:
            train_mse = calculate_MSE(predict_ratings(trainRDD, P, Q))
            test_mse = calculate_MSE(predict_ratings(testRDD, P, Q))
            print('iter[%d] ll_reg = %f, train_mse = %f, test_mse = %f' % (it, ll_tab[-1], train_mse, test_mse))
    if verbose:
        print('done')
    return P, Q, ll_tab

In [17]:
import matplotlib.pyplot as plt
%matplotlib inline

(train, test) = split_dataset(ratingsRDD, 0.7)

gamma = 0.001
max_iter = 100
K = 2 # 5, 10, 50
lambda_1, lambda_2 = 0.01, 0.01

(P, Q, f_tab) = grad_algo(train, test, N, M, K, gamma, max_iter, lambda_1, lambda_2)

plt.figure()
plt.plot(range(max_iter+1), f_tab, color="black", linewidth=1.0, linestyle="-")
plt.xlim(0, max_iter+1)
plt.xlabel('Number of iterations')
plt.ylabel('Functional value')
plt.show()

start grad_algo with gamma = 0.001000, max_iter = 100


timeout: timed out

__Question 3__
> Implement stochastic gradient descent algorithm for Matrix Factorization.

> Provide plots and explanations for your experiments.

> Compare and discuss the results with the (full) gradient algorithm in terms of MSE versus full data passes.

> Discuss the step-size choice for SGD (e.g. constant vs. 1/`nb_iter`).
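To illustrate the step-size question on the simplest possible case, here is a toy one-dimensional problem, not the matrix factorization itself (the values of `mu`, the number of samples and the constant step are arbitrary): minimize $\mathbb{E}\left[(x-\xi)^2\right]/2$ with $\xi \sim \mathcal{N}(\mu, 1)$, whose minimizer is $\mu$. With $\gamma_t = 1/t$, the SGD iterate is exactly the running mean of the samples seen so far, so its error keeps shrinking; with a constant step, the iterate is an exponentially weighted average of the most recent samples and keeps fluctuating around $\mu$ with a spread that does not decrease.

```python
import numpy as np

# Toy problem (illustrative): minimize E[(x - xi)^2] / 2 with xi ~ N(mu, 1); the minimizer is mu
rng = np.random.default_rng(0)
mu = 3.0
xi = mu + rng.standard_normal(10_000)

def sgd_1d(samples, step):
    """Plain SGD on x, one sample per iteration; step(t) is the step size at iteration t >= 1."""
    x = 0.0
    for t, s in enumerate(samples, start=1):
        x -= step(t) * (x - s)   # (x - s) is the stochastic gradient of (x - s)^2 / 2
    return x

x_const = sgd_1d(xi, lambda t: 0.1)      # constant step size
x_decr = sgd_1d(xi, lambda t: 1.0 / t)   # decreasing step size 1/t
print(abs(x_const - mu), abs(x_decr - mu))
```

A constant step moves quickly towards the solution but stalls in a noise-dominated region; a decreasing step converges but can become too small early on, which is why scaled variants such as $\gamma_0/(1+t/t_0)$ are also used in practice.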

Now we will implement Large-Scale Matrix Factorization with Distributed Stochastic Gradient Descent (DSGD) in Spark. 
The algorithm is described in the following article: <br/><br/>
_Gemulla, R., Nijkamp, E., Haas, P. J., & Sismanis, Y. (2011). Large-scale matrix factorization with distributed stochastic gradient descent. In Proceedings of the 17th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining (KDD '11). ACM, New York, USA._<br/><br/>
The paper proposes a solution to matrix factorization posed as the minimization of a sum of local losses. At each iteration, the rating matrix is divided into strata, and each stratum is made of blocks on which sequential stochastic gradient descent is run in parallel. DSGD is a fully distributed algorithm: both the data matrix $R$ and the factor matrices $P$ and $Q$ can be split and distributed across multiple workers, and while a stratum is being processed the workers do not need to communicate, because its blocks touch disjoint rows of $P$ and of $Q$; communication is only needed between strata, to move the factor blocks. Hence, it is a good match for an implementation in a distributed in-memory data processing system like Spark. 
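The stratification idea can be sketched on a single machine with NumPy. This is a sequential simulation, neither the paper's implementation nor a Spark version: the contiguous blocking of IDs, the cyclic choice of strata (block $(b, (b+s) \bmod d)$ for stratum $s$), the synthetic data and all function names are assumptions of this sketch. The loss is the one of the problem formulation, with 0-based indices.

```python
import numpy as np

def stratum_blocks(d, s):
    """Blocks (row block b, column block (b + s) mod d), b = 0..d-1, forming stratum s."""
    return [(b, (b + s) % d) for b in range(d)]

def sgd_on_block(block, P, Q, gamma, lam, rng):
    """Sequential SGD over the ratings (i, j, r_ij) of one block; updates P and Q in place."""
    for idx in rng.permutation(len(block)):
        i, j, r = int(block[idx, 0]), int(block[idx, 1]), block[idx, 2]
        err = r - P[i] @ Q[j]
        grad_p = -2 * err * Q[j] + 2 * lam * P[i]
        grad_q = -2 * err * P[i] + 2 * lam * Q[j]
        P[i] -= gamma * grad_p
        Q[j] -= gamma * grad_q

def dsgd_epoch(ratings, P, Q, d, gamma, lam, rng):
    """One DSGD epoch simulated on one machine: d strata of d blocks each."""
    row_blk = ratings[:, 0].astype(int) * d // P.shape[0]   # row block of each rating
    col_blk = ratings[:, 1].astype(int) * d // Q.shape[0]   # column block of each rating
    for s in rng.permutation(d):                            # strata visited in random order
        for rb, cb in stratum_blocks(d, s):
            # The d blocks of a stratum touch disjoint rows of P and of Q, so a
            # distributed version could hand each block to a different worker.
            in_block = (row_blk == rb) & (col_blk == cb)
            sgd_on_block(ratings[in_block], P, Q, gamma, lam, rng)

# Synthetic rank-3 data (hypothetical): 30 users, 20 items, d = 3 blocks per dimension
rng = np.random.default_rng(1)
m, n, k, d = 30, 20, 3, 3
R_true = rng.standard_normal((m, k)) @ rng.standard_normal((n, k)).T
ii, jj = np.nonzero(rng.random((m, n)) < 0.5)
ratings = np.column_stack([ii, jj, R_true[ii, jj]])

def mse(P, Q):
    """Mean squared error on the synthetic ratings."""
    pred = np.einsum("ik,ik->i", P[ii], Q[jj])
    return np.mean((ratings[:, 2] - pred) ** 2)

P = 0.1 * rng.standard_normal((m, k))
Q = 0.1 * rng.standard_normal((n, k))
mse_start = mse(P, Q)
for epoch in range(200):
    dsgd_epoch(ratings, P, Q, d, gamma=0.01, lam=0.001, rng=rng)
print(mse_start, mse(P, Q))
```

Every block belongs to exactly one stratum, so one epoch still visits each rating once. In a Spark version, the inner loop over the blocks of a stratum is the part that would run in parallel.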

__Question 4__

> Implement a `PySpark` version of DSGD.

> Test with different numbers of cores on a local machine (1 core, 2 cores, 4 cores). Run the ALS method already implemented in MLlib as a reference for comparison.