In [1]:
import numpy as np
from tqdm import tqdm_notebook as tqdm

class AlternatingLeastSqeares():
    """Factorise a rating matrix with alternating least squares (ALS).

    ``R`` (users x items) is approximated by ``users @ items.T`` where each
    factor has ``k`` columns.  Entries of ``R`` equal to zero are treated as
    unobserved: they receive zero weight in every least-squares solve and are
    skipped by :meth:`cost`.  Observed entries are weighted by their own
    rating value (the weight matrix is ``diag(R[i])`` / ``diag(R[:, j])``).

    The misspelled class name is kept so existing callers continue to work.

    Parameters
    ----------
    R : array-like, shape (num_users, num_items)
        Rating matrix; ``0`` marks a missing rating.
    k : int
        Number of latent features.
    reg_param : float
        L2 regularisation strength added to the diagonal of each normal
        equation.
    epochs : int
        Number of full user-sweep + item-sweep iterations.
    verbose : bool, optional
        When truthy, print the cost every 10th epoch.
    seed : int or None, optional
        Seed for the random factor initialisation.  ``None`` (default) draws
        from NumPy's global random state exactly as before, so
        ``np.random.seed`` set by the caller is still honoured.
    """

    def __init__(self, R, k, reg_param, epochs, verbose=False, seed=None):
        self._R = np.asarray(R)
        self._num_users, self._num_items = self._R.shape
        self._k = k
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose
        self._seed = seed

    def fit(self):
        """Initialise both factor matrices randomly and run ``epochs`` ALS sweeps.

        After the call ``_training_process`` holds one ``(epoch, cost)`` tuple
        per epoch, ``_user_error`` the cost measured after the last user sweep
        and ``_item_error`` the cost measured after the last item sweep.
        """
        # init latent features
        if self._seed is None:
            normal = np.random.normal
        else:
            # A fresh generator per call makes repeated fit() calls identical.
            normal = np.random.RandomState(self._seed).normal
        self._users = normal(size=(self._num_users, self._k))
        self._items = normal(size=(self._num_items, self._k))

        # train while epochs
        self._training_process = []
        self._user_error = 0
        self._item_error = 0
        for epoch in range(self._epochs):
            # How ALS differs from SGD: one factor matrix is held fixed while
            # every row of the other is solved for in closed form; then the
            # roles are swapped.
            for i, Ri in enumerate(self._R):
                self._users[i] = self.user_latent(i, Ri)
            # Evaluate once per sweep: only the value after the final row was
            # ever retained, so per-row evaluation was wasted work.
            self._user_error = self.cost()

            for j, Rj in enumerate(self._R.T):
                self._items[j] = self.item_latent(j, Rj)
            cost = self.cost()
            self._item_error = cost

            self._training_process.append((epoch, cost))

            # print status
            if self._verbose and (epoch + 1) % 10 == 0:
                print(f'Iteration: {epoch + 1}, cost: {cost:.4f}')

    def cost(self):
        """Return the RMSE between ``R`` and its reconstruction on non-zero cells.

        Returns ``0.0`` when ``R`` has no non-zero entry (previously this
        raised ``ZeroDivisionError``).
        """
        xi, yi = self._R.nonzero()
        if len(xi) == 0:
            return 0.0
        # Row-wise dot products give the prediction for each observed cell
        # without materialising the full users x items matrix.
        predictions = np.sum(self._users[xi] * self._items[yi], axis=1)
        residuals = self._R[xi, yi] - predictions
        return np.sqrt(np.mean(residuals ** 2))

    def user_latent(self, i, Ri):
        """Solve the regularised weighted least-squares problem for user ``i``.

        Solves ``(Y^T W Y + reg * I) u = Y^T W r`` with ``Y = _items``,
        ``W = diag(Ri)`` and ``r = R[i]``, and returns ``u`` (length ``k``).
        """
        weights = np.asarray(Ri)
        # Scaling the rows of Y by the weights equals diag(Ri) @ Y but avoids
        # building an items x items matrix.
        weighted_items = self._items * weights[:, np.newaxis]
        lhs = self._items.T @ weighted_items + self._reg_param * np.eye(self._k)
        rhs = self._items.T @ (weights * self._R[i])
        return np.linalg.solve(lhs, rhs)

    def item_latent(self, j, Rj):
        """Solve the regularised weighted least-squares problem for item ``j``.

        Solves ``(X^T W X + reg * I) v = X^T W r`` with ``X = _users``,
        ``W = diag(Rj)`` and ``r = R[:, j]``, and returns ``v`` (length ``k``).
        """
        weights = np.asarray(Rj)
        # Same row-scaling trick as user_latent: equals diag(Rj) @ X.
        weighted_users = self._users * weights[:, np.newaxis]
        lhs = self._users.T @ weighted_users + self._reg_param * np.eye(self._k)
        rhs = self._users.T @ (weights * self._R[:, j])
        return np.linalg.solve(lhs, rhs)

    def get_prediction(self, i, j):
        """Return the predicted rating of user ``i`` for item ``j``."""
        return self._users[i, :].dot(self._items[j, :])

    def get_complete_matrix(self):
        """Return the full reconstructed matrix ``users @ items.T``."""
        return self._users.dot(self._items.T)

In [2]:
# Toy rating matrix: one row per user, one column per item.
# A 0 means "no rating" and is ignored when the cost is computed.
R = np.array(
    [
        [1, 0, 0, 1, 3],  # user 0
        [2, 0, 3, 1, 1],  # user 1
        [1, 2, 0, 5, 0],  # user 2
        [1, 0, 0, 4, 4],  # user 3
        [2, 1, 5, 4, 0],  # user 4
        [5, 1, 5, 4, 0],  # user 5
        [0, 0, 0, 1, 0],  # user 6
    ]
)

In [3]:
# Seed NumPy's global RNG so the random factor initialisation inside fit(),
# and therefore the printed costs, are identical on every Restart & Run All.
np.random.seed(42)
als = AlternatingLeastSqeares(R=R, reg_param=0.01, epochs=100, verbose=True, k=3)
als.fit()

Iteration: 10, cost: 0.0349
Iteration: 20, cost: 0.0250
Iteration: 30, cost: 0.0206
Iteration: 40, cost: 0.0180
Iteration: 50, cost: 0.0162
Iteration: 60, cost: 0.0150
Iteration: 70, cost: 0.0140
Iteration: 80, cost: 0.0133
Iteration: 90, cost: 0.0127
Iteration: 100, cost: 0.0122


In [4]:
# Dense reconstruction users @ items.T; it also contains values for the cells that are 0 in R.
als.get_complete_matrix()

array([[ 1.01289772, -0.99091381, -0.90728696,  0.99644965,  2.99564975],
       [ 2.01468948,  1.22790536,  2.99175864,  1.01181884,  0.9898667 ],
       [ 1.00680657,  1.98944532,  7.38367513,  4.99793635, -1.39564315],
       [ 1.01458954, -1.54507718,  0.47204169,  4.0008713 ,  4.00126824],
       [ 1.99365205,  1.04234013,  5.00015147,  4.00352397,  1.39604419],
       [ 4.9955311 ,  0.98317156,  5.00025739,  3.99776856,  5.69978799],
       [ 0.36442219,  0.23031643,  1.18760949,  0.99433437,  0.2071837 ]])

```python
def fit(self):
    """Initialise both factor matrices randomly and run ``_epochs`` ALS sweeps.

    Fills ``_training_process`` with one ``(epoch, cost)`` tuple per epoch and
    leaves the cost measured after the last user sweep / item sweep in
    ``_user_error`` / ``_item_error``.
    """
    # init latent features
    self._users = np.random.normal(size=(self._num_users, self._k))
    self._items = np.random.normal(size=(self._num_items, self._k))

    # train while epochs
    self._training_process = []
    self._user_error = 0
    self._item_error = 0
    for epoch in range(self._epochs):
        # How ALS differs from SGD: one factor matrix is held fixed while
        # every row of the other is solved for in closed form; then the
        # roles are swapped.
        for i, Ri in enumerate(self._R):
            self._users[i] = self.user_latent(i, Ri)
        # Evaluate once per sweep: only the value after the final row was
        # ever retained, so per-row evaluation was wasted work.
        self._user_error = self.cost()

        for j, Rj in enumerate(self._R.T):
            self._items[j] = self.item_latent(j, Rj)
        cost = self.cost()
        self._item_error = cost

        self._training_process.append((epoch, cost))

        # print status
        if self._verbose and (epoch + 1) % 10 == 0:
            print(f'Iteration: {epoch + 1}, cost: {cost:.4f}')
```

```python
def user_latent(self, i, Ri):
    """Solve the regularised weighted least-squares problem for user ``i``.

    Solves ``(Y^T W Y + reg * I) u = Y^T W r`` with ``Y = self._items``,
    ``W = diag(Ri)`` and ``r = self._R[i]``, and returns ``u`` (length ``k``).
    Zero entries of ``Ri`` therefore contribute nothing to the fit.
    """
    weights = np.asarray(Ri)
    # Scaling the rows of Y by the weights equals diag(Ri) @ Y but avoids
    # building an items x items matrix.
    weighted_items = self._items * weights[:, np.newaxis]
    lhs = self._items.T @ weighted_items + self._reg_param * np.eye(self._k)
    rhs = self._items.T @ (weights * self._R[i])
    du = np.linalg.solve(lhs, rhs)
    return du

def item_latent(self, j, Rj):
    """Solve the regularised weighted least-squares problem for item ``j``.

    Solves ``(X^T W X + reg * I) v = X^T W r`` with ``X = self._users``,
    ``W = diag(Rj)`` and ``r = self._R[:, j]``, and returns ``v`` (length ``k``).
    Zero entries of ``Rj`` therefore contribute nothing to the fit.
    """
    weights = np.asarray(Rj)
    # Scaling the rows of X by the weights equals diag(Rj) @ X but avoids
    # building a users x users matrix.
    weighted_users = self._users * weights[:, np.newaxis]
    lhs = self._users.T @ weighted_users + self._reg_param * np.eye(self._k)
    rhs = self._users.T @ (weights * self._R[:, j])
    di = np.linalg.solve(lhs, rhs)
    # The solution must be returned; fit() assigns it to self._items[j].
    return di
```