<a href="https://colab.research.google.com/github/dlskawns/RecSys_and_Retrieval_Study/blob/main/matrix_factorization(banila).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np

class MatrixFactorization():
  def __init__(self, R, k, learning_rate, reg_param, epochs, verbose = False):
    """
    param R: rating matrix
    param k: latent parameter
    param learning_rate: alpha on weight update -> 학습률 파라미터
    param reg_param: beta on weight update -> 가중치 기울기 파라미터
    param epochs: training epochs
    param verbose: print status
    """

    self._R = R
    self._k = k
    self._learning_rate = learning_rate
    self._reg_param = reg_param
    self._epochs = epochs
    self._verbose = verbose

  def fit(self):
    """
    training Matrix Factorization : Update matrix latent weight and bias
    매트릭스의 잠재 가중치 및 편향을 학습하는 함수
    self._b에 대한 설명
    - global bias: input R에서 평가가 매겨진 rating의 평균값을 global bias로 사용 -> 매트릭스 전체의 편향을 평균으로 내는 방법
    - 정규화 기능. 최종 rating에 음수가 들어가는 것 대신 latent feature에 음수가 포함되도록 해줌.
    return: training_process
    """
    self._num_users = self._R.shape[0]
    self._num_items = self._R.shape[1]
    # init latent features  # 잠재벡터 가중치 초기화 설정
    self._P = np.random.normal(size = (self._num_users, self._k))
    self._Q = np.random.normal(size = (self._num_items, self._k))
    
    # init biases # 잠재벡터 바이어스 초기화 설정
    self._b_P = np.zeros(self._num_users)  # 유저 수 차원의 영벡터 생성
    self._b_Q = np.zeros(self._num_items)  # 아이템 수 차원의 영벡터 생성
    self._b = np.mean(self._R[np.where(self._R != 0)]) 

    # train while epochs
    self._training_process = []           # training 현황 리스트 생성
    for epoch in range(self._epochs):     # epoch 수 만큼 반복

      # rating이 존재하는 index 기준으로 training 진행
      for i in range(self._num_users):    # i 는 순차적으로 users 의미
        for j in range(self._num_items):  # j 는 user i에 대해 순차적으로 items 의미
          if self._R[i, j] > 0:           # 해당 유저 i에 대한 item j가 0이상-> 평가가 되었다면,
            self.gradient_descent(i, j, self._R[i, j])  # gradient_descent 함수로 i와 j에 대한 경사하강을 진행
      cost = self.cost()                  # cost 함수를 사용하며 결과값을 각 epoch 별로 변수에 저장
      self._training_process.append((epoch, cost))  # training 현황 리스트에 epoch과 cost를 튜플 형태로 저장

      # print status 프린트 할때 경과 지켜보기
      if self._verbose == True and ((epoch + 1) % 10 == 0):   
        print('iteration: %d ; cost = %.4f' % (epoch + 1, cost))

  def cost(self):
    """
    비용함수로써 RMSE를 기준으로 하여 로스를 계산하는 함수
    return rmse cost
    """
    # xi, yi: R[xi, yi]는 nonzero인 value를 의미
    xi, yi = self._R.nonzero()  # 기존 점수행렬의 0이 아닌 index의 열벡터를 가져간다.
    predicted = self.get_complete_matrix()  # get_complete_matrix 함수를 사용해 predicted 변수에 넣는다.
    cost = 0  # cost 초기화
    for x, y in zip(xi, yi):  # 0이 아닌 (유저, 아이템) index들을 각각 x, y로 가져간다.
      cost += pow(self._R[x, y ] - predicted[x,y], 2) # 실제 행렬중 0이 아닌 index와 이것을 예측한 행렬의 점수의 차이를 구한 뒤, 제곱하여 cost에 더해준다.
    return np.sqrt((cost) / len(xi)) #RMSE 이므로 위 cost들의 합의 평균(MSE)에 루트를 씌워주어 cost 계산을 완료한다.


  def gradient(self, error, i, j):
    """
    gradient of latent feature for GD -> 잠재요인의 경사하강을 위한 기울기 파악함수(미분)

    param error: raing - prediction error
    param i: user index
    param j: item index
    return: gradient of latent feature tuple
    """

    dp = (error * self._Q[j, :]) - (self._reg_param * self._P[i, :])
    dq = (error * self._P[i, :]) - (self._reg_param * self._Q[j, :])
    return dp, dq

  def gradient_descent(self, i, j, rating):
    """
    경사하강 함수

    i: user index of matrix
    j: item index of matrix
    rating: rating of (i,j)
    """
    # get error
    prediction = self.get_prediction(i,j)
    error = rating - prediction

    # update biases 오차에서 바이어스를 제한 값을 학습률만큼 곱해준 뒤 이전 바이어스에 더해준다.
    self._b_P[i] += self._learning_rate *(error - self._reg_param *self._b_P[i])  
    self._b_Q[j] += self._learning_rate *(error- self._reg_param *self._b_Q[j])

    # update latent feature
    dp, dq = self.gradient(error, i, j)
    self._P[i, :] += self._learning_rate * dp # 미분된 가중치 기울기를 학습률과 곱해준 뒤 더해준다.
    self._Q[j, :] += self._learning_rate * dq

  def get_prediction(self, i, j):
    """
    get predicted rating: user_i, item_j
    return prediction of r_ij
    """
    return self._b +self._b_P[i] + self._b_Q[j] + self._P[i, :].dot(self._Q[j, :].T)

  def get_complete_matrix(self):
    """
    PXQ 매트릭스를 만들고, P.bias와 Q.bais, global bias를 더해준다.
    - PXQ 행렬에 b_P[:, np.newaxis]를 더하는 것은 각 열마다 bias를 더해주는 것 # np.newaxis는 한차원 추가되는것
    - b_Q[np.newaxis",]를 더하는 것은 각 행마다 bias를 더해주는 것
    - b를 더하는 것은 element마다 bias를 더해주는 것
    return: complete matrix R^
    """
    return self._b +self._b_P[:, np.newaxis] +self._b_Q[np.newaxis,] + self._P.dot(self._Q.T)
  
  def print_result(self):
    """
    print fit result
    """

    print("User Latent P")
    print(self._P)
    print("Item Latent Q")
    print(self._Q.T)
    print("P x Q")
    print(self._P.dot(self._Q.T))
    print('bias')
    print(self._b)
    print('User Latent bias')
    print(self._b_P)
    print('Item Latent bias')
    print(self._b_Q)
    print('Final R Matrix')
    print(self.get_complete_matrix())
    print('Final RMSE')
    print(self._training_process[self._epochs-1][1])

  def recsys(self, user_id):
    """
    선택 유저(user_id)가 안본 상품 중 가장 높은 스코어로 파악되는 것 추천하는 함수
    """
    completed_matrix = self.get_complete_matrix()
    non_arr = np.where(self._R[user_id]==0)[0]
    print(non_arr)
    recommend_lst = completed_matrix[user_id][non_arr].argsort()[-5:]
    return recommend_lst
    

In [None]:
# rating matrix - User X Item : (10 X 10)
R = np.array([
        [1, 0, 0, 1, 3],
        [2, 0, 3, 1, 1],
        [1, 2, 0, 5, 0],
        [1, 0, 0, 4, 4],
        [2, 1, 5, 4, 0],
        [5, 1, 5, 4, 0],
        [0, 0, 0, 1, 0]])


In [None]:
# 임의의 난수 생성 -> 10명의 유저 20개의 아이템 
np.random_seed = 24
user_rating = np.random.randint(0, 6, size = (10,120))

In [None]:
user_rating

array([[3, 2, 2, ..., 3, 1, 5],
       [1, 4, 3, ..., 0, 1, 1],
       [5, 3, 1, ..., 0, 5, 2],
       ...,
       [0, 0, 3, ..., 1, 5, 4],
       [2, 4, 3, ..., 4, 0, 3],
       [3, 0, 3, ..., 2, 1, 0]])

In [None]:
factorizer = MatrixFactorization(user_rating, k=8, learning_rate=0.01, reg_param=0.01, epochs=250, verbose=True)

In [None]:
factorizer.fit()
factorizer.recsys(0)

iteration: 10 ; cost = 1.2617
iteration: 20 ; cost = 1.2031
iteration: 30 ; cost = 1.1371
iteration: 40 ; cost = 1.0586
iteration: 50 ; cost = 0.9734
iteration: 60 ; cost = 0.8891
iteration: 70 ; cost = 0.8108
iteration: 80 ; cost = 0.7413
iteration: 90 ; cost = 0.6809
iteration: 100 ; cost = 0.6290
iteration: 110 ; cost = 0.5839
iteration: 120 ; cost = 0.5436
iteration: 130 ; cost = 0.5062
iteration: 140 ; cost = 0.4703
iteration: 150 ; cost = 0.4356
iteration: 160 ; cost = 0.4028
iteration: 170 ; cost = 0.3729
iteration: 180 ; cost = 0.3469
iteration: 190 ; cost = 0.3251
iteration: 200 ; cost = 0.3074
iteration: 210 ; cost = 0.2931
iteration: 220 ; cost = 0.2816
iteration: 230 ; cost = 0.2723
iteration: 240 ; cost = 0.2647
iteration: 250 ; cost = 0.2584
[  5   9  12  13  23  28  30  40  42  50  73  75  79  81  84  87  88  89
  92  93  94  95  96 104 115]


array([10,  8,  1, 22,  6])