In [42]:
import tensorflow as tf
import numpy as np
from tqdm import tqdm_notebook as tqdm

class MatrixFactorization():
    def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False):
        '''
        R : Rating Matrix  
        k : Latent size
        learning_rate : learning_rate
        reg_param : regulazation parameter
        '''
        self._R = R
        self._num_users, self._num_items = R.shape
        self._k = k
        self._learning_rate = learning_rate
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose
        self.build_graph()

    def build_graph(self):
        '''
        U : User Matrix 
        I : Item Matrix 
        bu : User Bias
        bi : Item Bias
        bg : global bias
        '''
        self.U = tf.Variable(tf.random.normal([self._num_users, self._k]), dtype=tf.float32)
        self.I = tf.Variable(tf.random.normal([self._num_items, self._k]), dtype=tf.float32)
        self.bu = tf.Variable(tf.zeros([self._num_users]), dtype=tf.float32)
        self.bi = tf.Variable(tf.zeros([self._num_items]), dtype=tf.float32)
        self.bg = tf.constant([np.mean(self._R[np.where(self._R != 0)])],dtype=tf.float32)


    def loss(self, i, j):
        '''
        Cost(i, j) = (r(i, j) - ri_hat(i, j))**2 + lambda * (||bias_u|| + ||bias_i||)
        '''
        self.error = (self._R[i, j] - tf.tensordot(self.U[i] , tf.transpose(self.I[j]), axes=1) - self.bu[i] - self.bi[j] - self.bg) ** 2
        self.error += self._reg_param * (tf.reduce_sum(self.bu) + tf.reduce_sum(self.bi))
        return self.error
    
    def gradientdescent(self, i, j):
        with tf.GradientTape() as tape:
            loss_value = self.loss(i, j)
        return tape.gradient(loss_value, [self.U, self.I, self.bu, self.bi])
            
    def fit(self):
        self.build_graph()
        self.training_process = []
        xi, yi = self._R.nonzero()
        for epoch in tqdm(range(self._epochs)):
            '''
            calculate only non zero values
            '''
            for i, j in zip(xi, yi):
                if self._R[i, j] > 0:
                    du, di, dbu, dbi = self.gradientdescent(i, j)
                    self.bu[i].assign((self.bu[i] - self._learning_rate * dbu)[0])
                    self.bi[j].assign((self.bi[j] - self._learning_rate * dbi)[0])
                    self.U[i].assign((self.U[i] - self._learning_rate * du)[0])
                    self.I[j].assign((self.I[j] - self._learning_rate * di)[0])
                else:
                    continue
            if (epoch + 1) % 10 == 0:
                print(self.cost())
            
    def get_complete_matrix(self):
        return tf.tensordot(self.U , tf.transpose(self.I), axes=1) + tf.expand_dims(model.bi, 0) + tf.expand_dims(model.bu, 1) + self.bg
        
    def cost(self):
        xi, yi = self._R.nonzero()
        predicted = self.get_complete_matrix()
        cost = 0
        for x, y in zip(xi, yi):
            cost += pow(self._R[x, y] - predicted[x, y], 2)
        return np.sqrt(cost / len(xi))
    
if __name__ == "__main__": # 이름을 main으로 설정해서 여기 안에서 테스트 하는 용도임. 이렇게 하면 다른 곳에 불러올 때 이 안은 사용 안함

    R = np.array([
        [1, 0, 0, 1, 3],
        [2, 0, 3, 1, 1],
        [1, 2, 0, 5, 0],
        [1, 0, 0, 4, 4],
        [2, 1, 5, 4, 0],
        [5, 1, 5, 4, 0],
        [0, 0, 0, 1, 0],
    ])
    model = MatrixFactorization(R, k=3, learning_rate=0.01, reg_param=0.01, epochs=100, verbose=True)
    model.fit()

HBox(children=(IntProgress(value=0), HTML(value='')))

2.6470656
2.5508668
2.5278397
2.5204988
2.5175366
2.5162337
2.5157301
2.5156796
2.5159123
2.5163348

