In [18]:
import numpy as np
from tqdm import tqdm_notebook as tqdm

class MatrixFactorization():
    def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False):
        """
        :param R: rating matrix
        :param K: latent parameter
        :param learning_rate: alpha on weight update
        :param reg_param: beta on weight update
        :param epochs: training epochs
        :param verbose: print status
        """
        self._R = R
        self._num_users, self._num_items = R.shape
        self._k = k
        self._learning_rate = learning_rate
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose
        
    def fit(self):
        """
        training Matrix Factorization: Update matrix latent weight and bias

        self._b에 대한 설명
        - global bias: input R에서 평가가 매겨진 rating의 평균값을 global bias로 사용
        - 정규화 기능. 최종 rating에 음수가 들어가는 대신 latent feature에 음수가 포함되도록 해줌

        :return: training_process
        """

        # init latent features
        self._P = np.random.normal(size=(self._num_users, self._k)) # 유저 latent space
        self._Q = np.random.normal(size=(self._num_items, self._k)) # 아이템 latent space

        # init biases
        self._b_P = np.zeros(self._num_users)
        self._b_Q = np.zeros(self._num_items)
        self._b = np.mean(self._R[np.where(self._R != 0)])

        # train while epochs
        self._training_process = []
        for epoch in range(self._epochs):
            # rating이 존재하는 index를 기준으로 training
            xi, yi = self._R.nonzero() # 0이 아닌부분의 x,y값
            for i, j in zip(xi, yi):
                self.gradient_descent(i, j, self._R[i, j])
            cost = self.cost()
            self._training_process.append((epoch, cost))

            # print status
            if self._verbose == True and ((epoch + 1) % 10 == 0):
                print('Iteration: %d ; cost = %.4f'%(epoch+1, cost))
                
    def cost(self):
        """
        compute root mean square error
        :return: rmse cost
        """
        
        # xi, yi: R[xi, yi]는 nonzero인 value를 의미
        # 참고: http://codepractice.tistory.com/90
        xi, yi = self._R.nonzero()
        # predicted = self.get_complete_matrix()
        cost = 0
        for x, y in zip(xi, yi):
            cost += pow(self._R[x, y] - self.get_prediction(x, y), 2)
        return np.sqrt(cost/len(xi))
    
    def gradient(self, error, i, j):
        """
        gradient of latent feature for GD
        
        :param error: rating - prediction error
        :param i: user index
        :param j: item index
        :return: gradient of latent feature tuple
        """
        
        dp = (error * self._Q[j, :]) - (self._reg_param * self._P[i, :])
        dq = (error * self._P[i, :]) - (self._reg_param * self._Q[j, :])
        return dp, dq
    
    def gradient_descent(self, i, j, rating):
        """
        gradient descent function
        - 행렬의 원소위치(i,j)와 평점 값(self._R[i,j])을 받음. 
        - prediction = self.get_prediction(i,j)은 user latent matrix와
          item latent matrix의 곱을 통해 평점 행렬 값을 생성하는 부분
        - self._P[i,:].dot(self._Q[j,:].T)에서 user latent P와 item latent Q가
          곱해져서 평점을 계산하고 bias를 없애기 위해 전체 평균(self._b),
          user의 평균 평점(self._b_P[i]), item의 평균 평점(self._b_Q[j])을
          더해줌으로써 값을 생성
        
        :param i: user index of matrix
        :param j: item index of matrix
        :param rating: rating of (i,j)
        """
        
        # get error
        prediction = self.get_prediction(i, j)
        error = rating - prediction
        
        # update biases
        self._b_P[i] += self._learning_rate * (error - self._reg_param * self._b_P[i])
        self._b_Q[j] += self._learning_rate * (error - self._reg_param * self._b_Q[j])
        
        # update latent feature
        dp, dq = self.gradient(error, i, j)
        self._P[i, :] += self._learning_rate * dp
        self._Q[j, :] += self._learning_rate * dq
        
    def get_prediction(self, i, j):
        """
        get predicted rating: user_i, item_j
        :return: prediction of r_ij
        """
        return self._b + self._b_P[i] + self._b_Q[j] + self._P[i, :].dot(self._Q[j, :].T)
    
    def get_complete_matrix(self):
        """
        computer complet matrix PXQ + P.bias + Q.bias + global bias
        - PXQ 행렬에 b_P[:, np.newaxis]를 더하는 것은 각 열마다 bias를 더해주는 것
        - b_Q[np.newaxis:, ]를 더하는 것은 각 행마다 bias를 더해주는 것
        - b를 더하는 것은 각 element마다 bias를 더해주는 것
        
        - newaxis: 차원을 추가해줌. 1차원인 latent들로 2차원의 R에 행/열 단위 연산을 해주기 위해 차원을 추가해주는 것
        
        :return: complete matrix R^
        """
        return self._b + self._b_P[:, np.newaxis] + self._b_Q[np.newaxis:, ] + self._P.dot(self._Q.T)

- R: 평점 행렬
- k: User Latent와 item Latent의 차원 수. 데이터가 많다면 보통 20 사용
- learning_rate: 학습률
- reg_param: weight의 regularization 값
- epochs: 전체 학습 횟수
- verbose: 학습 과정을 출력할지 여부 (True: 10번마다 cost 출력)

In [21]:
R = np.array([
    [1, 0, 0, 1, 3],
    [2, 0, 3, 1, 1],
    [1, 2, 0, 5, 0],
    [1, 0, 0, 4, 4],
    [2, 1, 5, 4, 0],
    [5, 1, 5, 4, 0],
    [0, 0, 0, 1, 0]
])

factorizer = MatrixFactorization(R, k=3, learning_rate=0.01, reg_param=0.01,
                                 epochs=100, verbose=True)
factorizer.fit()

Iteration: 10 ; cost = 0.9782
Iteration: 20 ; cost = 0.6789
Iteration: 30 ; cost = 0.5260
Iteration: 40 ; cost = 0.4222
Iteration: 50 ; cost = 0.3417
Iteration: 60 ; cost = 0.2760
Iteration: 70 ; cost = 0.2218
Iteration: 80 ; cost = 0.1774
Iteration: 90 ; cost = 0.1417
Iteration: 100 ; cost = 0.1135
