In [2]:
import numpy as np 
from tqdm import tqdm_notebook as tqdm

- R : 평점 행렬
- k : User Latent와 item Latent 차원의 수 
- learning_rate : 학습률
- reg_param : Weight의 Regularization 값
- epochs : 전체 학습 횟수
- verbose = 학습 과정을 출력할지 여부( True : 10번마다 cost 출력, False : cost 출력 x )

In [9]:
class MatrixFactorization():
    def __init__(self, R, k, learning_rate, reg_param, epochs, verbose = False):
        """
        :param R: rating matrix
        :param k: latent parameter
        :param learning_rate: alpha on weight update
        :param reg_param: beta on weight update
        :param epochs: training epochs
        :param verbose: print status
        """

        self._R = R 
        self._num_users, self._num_items = R.shape
        self._k = k 
        self._learning_rate = learning_rate 
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose 

    def fit(self):
        """
        training Matrix Factorization : Update matrix latent weight and bias 
        """

        # init latent features
        self._P = np.random.normal(size = (self._num_users, self._k))
        self._Q = np.random.normal(size = (self._num_items, self._k))

        # init biases
        self._b_P = np.zeros(self._num_users)
        self._b_Q = np.zeros(self._num_items)
        self._b = np.mean(self._R[np.where(self._R != 0)])
        # 각 유저마다 평점을 평가할 때 후한지, 짜게 주는지에 따라 bias가 발생할 수 있기 때문에
        # 해당 유저의 평균을 빼줌으로써 bias를 제거할 수 있다. 

        # train while epochs
        self._training_process = []
        for epoch in range(self._epochs):
            xi, yi = self._R.nonzero()
            for i, j in zip(xi, yi) :
                self.gradient_descent(i, j, self._R[i,j])
            cost = self.cost()
            self._training_process.append((epoch, cost))

            if self._verbose == True and ((epoch + 1) % 10 == 0) :
                print("Iteration : %d ; cost = %.4f" %(epoch + 1, cost))


    def gradient_descent(self, i, j, rating):
        
        # get error
        prediction = self.get_prediction(i, j)
        error = rating - prediction

        # update biases
        self._b_P[i] += self._learning_rate * (error - self._reg_param * self._b_P[i])
        self._b_Q[j] += self._learning_rate * (error - self._reg_param * self._b_Q[j])

        # update latent feature
        dp, dq = self.gradient(error, i, j)
        self._P[i, :] += self._learning_rate * dp
        self._Q[j, :] += self._learning_rate * dq 


    def cost(self):
        xi, yi = self._R.nonzero()

        cost = 0

        for x, y in zip(xi, yi):
            cost += pow(self._R[x, y] - self.get_prediction(x, y), 2)
        return np.sqrt(cost / len(xi))


    def gradient(self, error, i, j):
        dp = (error * self._Q[j, :]) - (self._reg_param * self._P[i, :])
        dq = (error * self._P[i, :]) - (self._reg_param * self._Q[j, :])
        return dp, dq

    
    def get_prediction(self, i, j):
        return self._b + self._b_P[i] + self._b_Q[j] + self._P[i, :].dot(self._Q[j, :].T)

    
    def get_complete_matrix(self):
        return self._b + self.b_P[:, np.newaxis] + self._b_Q[np.newaxis, :] + self._P.dot(self._Q.T)


if __name__ == '__main__' :
    R = np.array([
        [1, 0, 0, 1, 3],
        [2, 0, 3, 1, 1],
        [1, 2, 0, 5, 0],
        [1, 0, 0, 4, 4],
        [2, 1, 5, 4, 0],
        [5, 1, 5, 4, 0],
        [0, 0, 0, 1, 0]
    ])


In [10]:
%%time 

factorizer = MatrixFactorization(R, k=3, learning_rate=0.01, reg_param=0.01, epochs=100, verbose=True)
factorizer.fit()

Iteration : 10 ; cost = 1.0807
Iteration : 20 ; cost = 0.6732
Iteration : 30 ; cost = 0.4633
Iteration : 40 ; cost = 0.3551
Iteration : 50 ; cost = 0.2964
Iteration : 60 ; cost = 0.2596
Iteration : 70 ; cost = 0.2320
Iteration : 80 ; cost = 0.2086
Iteration : 90 ; cost = 0.1874
Iteration : 100 ; cost = 0.1679
Wall time: 111 ms
