In [1]:
import numpy as np
import data

In [2]:
# Bind the train/test rating matrices exposed by the local `data` module;
# both are handed to MatrixFactorization further down in this notebook.
train, test = data.train, data.test

In [3]:
class MatrixFactorization():
    """
    Biased matrix factorization trained with per-rating gradient descent.

    A rating for user u and item i is predicted as
        global_bias + user_bias[u] + item_bias[i] + P[u] . Q[i]
    where P and Q hold k latent features per user / item.
    A value of 0 in the train or test matrix is treated as "no rating".
    """

    def __init__(self, train, test, k, learning_rate, reg_param, epochs, verbose = False):
        """
        param train : training rating matrix, shape (num_users, num_items); 0 means no rating
        param test : held-out rating matrix, indexed with the same (user, item) coordinates as train
        param k : number of latent features
        param learning_rate : alpha on weight update
        param reg_param : regularization parameter
        param epochs : training epochs
        param verbose : print status every 10th epoch
        """

        self._R = train
        self._test = test
        self._num_users, self._num_items = train.shape
        self._k = k
        self._learning_rate = learning_rate
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose

    def fit(self):
        """
        training Matrix Factorization : update matrix latent weight and bias

        Re-initialises all parameters, then runs `epochs` passes over the
        observed training ratings. After every pass the (epoch, train RMSE,
        test RMSE) triple is appended to self._training_process.
        """

        # init latent features (drawn from NumPy's global RNG)
        self._P = np.random.normal(size=(self._num_users, self._k))
        self._Q = np.random.normal(size=(self._num_items, self._k))

        # init biases
        self._b_P = np.zeros(self._num_users)
        self._b_Q = np.zeros(self._num_items)
        # global bias = mean over the non-zero ratings; fall back to 0.0 when
        # there are none so the model does not start out as NaN
        rated = self._R[np.where(self._R != 0)]
        self._b = np.mean(rated) if rated.size else 0.0

        # The set of cells to train on never changes between epochs, so find it
        # once instead of rescanning the whole matrix on every pass.
        # np.nonzero yields indices in row-major order, i.e. the same visiting
        # order as a nested "for u ... for i ..." loop.
        rows, cols = np.nonzero(self._R > 0)
        observed = [(u, i, self._R[u, i]) for u, i in zip(rows, cols)]

        # train while epochs
        self._training_process = []
        for epoch in range(self._epochs):

            # train only on the indices where a rating exists
            for u, i, rating in observed:
                self.gradient_descent(u, i, rating)

            train_cost, test_cost = self.cost()
            self._training_process.append((epoch, train_cost, test_cost))

            if self._verbose and ((epoch + 1) % 10 == 0):
                print("Iteration : %d, train_cost = %.4f, test_cost = %.4f" % (epoch+1, train_cost, test_cost))

    def cost(self):
        """
        compute RMSE

        return : (train RMSE, test RMSE), each taken over the non-zero entries
                 of the respective matrix; NaN for a matrix without any
                 non-zero entry
        """
        predicted = self.get_complete_matrix()
        return self._rmse(self._R, predicted), self._rmse(self._test, predicted)

    @staticmethod
    def _rmse(ratings, predicted):
        """
        root mean squared error between `ratings` and `predicted`, restricted
        to the non-zero entries of `ratings`
        """
        rows, cols = ratings.nonzero()  # indices of the non-zero values
        if len(rows) == 0:
            # nothing to evaluate: report NaN instead of dividing by zero
            return float("nan")
        errors = ratings[rows, cols] - predicted[rows, cols]
        return np.sqrt(np.mean(np.square(errors)))

    def gradient(self, error, u, i):
        """
        gradient of latent feature for GD
        param error : rating - prediction error
        param u : user index
        param i : item index
        return : (dp, dq) update directions for P[u] and Q[i]
        """

        dp = (error * self._Q[i, :]) - (self._reg_param * self._P[u, :])
        dq = (error * self._P[u, :]) - (self._reg_param * self._Q[i, :])

        return dp, dq

    def gradient_descent(self, u, i, rating):
        """
        gradient descent function : one update step for a single rating
        param u : user index
        param i : item index
        param rating : rating of (u, i)
        """

        prediction = self.get_prediction(u, i)
        error = rating - prediction

        self._b_P[u] += self._learning_rate * (error - self._reg_param * self._b_P[u])
        self._b_Q[i] += self._learning_rate * (error - self._reg_param * self._b_Q[i])

        # both directions are computed before either factor row is modified
        dp, dq = self.gradient(error, u, i)
        self._P[u, :] += self._learning_rate * dp
        self._Q[i, :] += self._learning_rate * dq

    def get_prediction(self, u, i):
        """
        get predicted rating by user u on item i
        """
        return self._b + self._b_P[u] + self._b_Q[i] + self._P[u, :].dot(self._Q[i, :])

    def get_complete_matrix(self):
        """
        compute complete matrix : predicted rating for every (user, item) pair
        """
        return self._b + self._b_P[:, np.newaxis] + self._b_Q[np.newaxis, :] + self._P.dot(self._Q.T)

    def print_results(self):
        """
        print fit results : the full predicted matrix and the final test RMSE
        """

        print("Final R matrix:")
        print(self.get_complete_matrix())
        print("Final RMSE:")
        if self._training_process:
            # last recorded epoch; index 2 of each record is the test RMSE
            final_test_cost = self._training_process[-1][2]
        else:
            # no epoch was run (epochs == 0): evaluate the current parameters
            final_test_cost = self.cost()[1]
        print(final_test_cost)

In [4]:
# Seed NumPy's global RNG first: fit() draws the initial latent factors from it,
# so a fixed seed makes the run repeatable.
np.random.seed(7)

factorizer = MatrixFactorization(
    train,
    test,
    k=40,
    learning_rate=0.01,
    reg_param=0.01,
    epochs=300,
    verbose=True,  # prints train/test cost every 10th epoch
)
factorizer.fit()
factorizer.print_results()

Iteration : 10, train_cost = 0.7035, test_cost = 1.6871
Iteration : 20, train_cost = 0.6352, test_cost = 1.6354
Iteration : 30, train_cost = 0.5849, test_cost = 1.6110
Iteration : 40, train_cost = 0.5438, test_cost = 1.5922
Iteration : 50, train_cost = 0.5102, test_cost = 1.5758
Iteration : 60, train_cost = 0.4826, test_cost = 1.5611
Iteration : 70, train_cost = 0.4598, test_cost = 1.5478
Iteration : 80, train_cost = 0.4407, test_cost = 1.5358
Iteration : 90, train_cost = 0.4246, test_cost = 1.5248
Iteration : 100, train_cost = 0.4108, test_cost = 1.5146
Iteration : 110, train_cost = 0.3989, test_cost = 1.5051
Iteration : 120, train_cost = 0.3885, test_cost = 1.4961
Iteration : 130, train_cost = 0.3793, test_cost = 1.4876
Iteration : 140, train_cost = 0.3712, test_cost = 1.4795
Iteration : 150, train_cost = 0.3639, test_cost = 1.4718
Iteration : 160, train_cost = 0.3573, test_cost = 1.4645
Iteration : 170, train_cost = 0.3513, test_cost = 1.4574
Iteration : 180, train_cost = 0.3459, te