In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
class User_Item_Rating_Matrix():
    def __init__(self, ratings_data):
        # receives a matrix with rows as reviews and columns as movies and ratings as values
        # turns it into a user-item matrix
        self.ratings_data = ratings_data
        self.user_item_matrix = self.create_user_item_matrix()

    def create_user_item_matrix(self):
        # create a user-item matrix
        user_item_matrix = self.ratings_data.pivot(index='userId', columns='movieId', values='rating')
        return user_item_matrix

In [None]:
class ALS():
    def __init__(self, n_factors = 10 , n_iterations = 10, reg = 0.1, alpha = 40, seed = 0):
        self.n_factors = n_factors
        self.n_iterations = n_iterations
        self.reg = reg
        self.alpha = alpha

        self.seed = seed

    def fit(self, R):
        np.random.seed(self.seed)
        if np.isnan(R).any():
            raise ValueError("R has NaN values")
        if (isinstance(R, pd.DataFrame)):
            R = R.values
        self.n_users, self.n_items = R.shape
        self.item_factors = np.random.normal(size = (self.n_items, self.n_factors))
        self.user_factors = np.random.normal(size = (self.n_users, self.n_factors))

        self.user_biases = np.random.normal(size = self.n_users)
        self.item_biases = np.random.normal(size = self.n_items)

        self.global_bias = np.mean(R[np.where(~np.isnan(R))])

        self.R = R
        self.R_bar = self.R - self.global_bias

        self.train()

    def train(self):
        for i in range(self.n_iterations):
            self.update_user_factors()
            self.update_item_factors()
            self.update_user_biases()
            self.update_item_biases()

    def update_user_factors(self):
        for u in range(self.n_users):
            self.user_factors[u] = self.update_user_factor(u)
    
    def update_user_factor(self, u):
        I_u = np.where(~np.isnan(self.R[u]))[0]
        A = np.dot(self.item_factors[I_u].T, self.item_factors[I_u]) + self.reg * np.eye(self.n_factors)
        V = np.dot(self.item_factors[I_u].T, self.R_bar[u, I_u])
        return np.linalg.solve(A, V)
    
    def update_item_factors(self):
        for i in range(self.n_items):
            self.item_factors[i] = self.update_item_factor(i)
    
    def update_item_factor(self, i):
        U_i = np.where(~np.isnan(self.R[:, i]))[0]
        A = np.dot(self.user_factors[U_i].T, self.user_factors[U_i]) + self.reg * np.eye(self.n_factors)
        V = np.dot(self.user_factors[U_i].T, self.R_bar[U_i, i])
        return np.linalg.solve(A, V)
    
    def update_user_biases(self):
        for u in range(self.n_users):
            self.user_biases[u] = self.update_user_bias(u)

    def update_user_bias(self, u):
        I_u = np.where(~np.isnan(self.R[u]))[0]
        return np.mean(self.R[u, I_u] - np.dot(self.item_factors[I_u], self.user_factors[u]) - self.item_biases[I_u] - self.global_bias)
    
    def update_item_biases(self):
        for i in range(self.n_items):
            self.item_biases[i] = self.update_item_bias(i)
    
    def update_item_bias(self, i):
        U_i = np.where(~np.isnan(self.R[:, i]))[0]
        return np.mean(self.R[U_i, i] - np.dot(self.user_factors[U_i], self.item_factors[i]) - self.user_biases[U_i] - self.global_bias)
        
    def evaluate(self, test):
        if (isinstance(test, pd.DataFrame)):
            test = test.values
        n_test = test.shape[0]
        rmse = 0
        for i in range(n_test):
            u, i, r = test[i]
            rmse += (r - self.predict(u, i))**2
        return np.sqrt(rmse/n_test)
    
    def predict(self, u, i):
        return self.global_bias + self.user_biases[u] + self.item_biases[i] + np.dot(self.user_factors[u], self.item_factors[i])
    
    def recommend(self, user, n_items):
        predictions = np.array([self.predict(user, i) for i in range(self.n_items)])
        return np.argsort(predictions)[::-1][:n_items]
    
    def add_user(self, user_ratings):
        # user_ratings is a list of tuples (item, rating), item should match the row index of R
        self.user_factors = np.vstack((self.user_factors, np.random.normal(size = self.n_factors)))
        self.user_biases = np.append(self.user_biases, np.random.normal())
        self.n_users += 1

        self.R = np.vstack((self.R, np.zeros(self.n_items)))

        for i, r in user_ratings:
            if i in range(self.n_items):
                self.R[self.n_users - 1, i] = r
            else:
                self.add_item()
                self.R[self.n_users - 1, -1] = r

        self.global_bias = np.mean(self.R[np.where(~np.isnan(self.R))])
        
        self.R_bar = self.R - self.global_bias

        self.train()


    def add_item(self, item):
        self.item_factors = np.vstack((self.item_factors, np.random.normal(size = self.n_factors)))
        self.item_biases = np.append(self.item_biases, np.random.normal())
        self.n_items += 1
        
        self.R = np.hstack((self.R, np.zeros((self.n_users, 1))))
    


In [4]:
import numpy as np

class BiasedALS:
    def __init__(self, n_factors=10, n_iterations=200, reg=0.01):
        self.n_factors = n_factors
        self.n_iterations = n_iterations
        self.reg = reg

    def fit(self, R):
        self.n_users, self.n_items = R.shape
        self.user_factors = np.random.random((self.n_users, self.n_factors))
        self.item_factors = np.random.random((self.n_items, self.n_factors))
        self.user_bias = np.zeros(self.n_users)
        self.item_bias = np.zeros(self.n_items)
        self.global_mean = np.mean(R[np.where(R != 0)])

        for _ in range(self.n_iterations):
            for u in range(self.n_users):
                self.user_factors[u], self.user_bias[u] = self.solve(R[u, :], self.item_factors, self.user_bias[u], self.reg, self.n_factors)
            for i in range(self.n_items):
                self.item_factors[i], self.item_bias[i] = self.solve(R[:, i], self.user_factors, self.item_bias[i], self.reg, self.n_factors)

    def solve(self, v, U, bias, reg, n_factors):
        A = np.dot(U.T, U) + np.eye(n_factors) * reg
        b = np.dot(U.T, v - bias)
        x = np.linalg.solve(A, b)
        new_bias = np.mean(v - np.dot(U, x))
        return x, new_bias

    def predict(self):
        return self.global_mean + self.user_bias[:, np.newaxis] + self.item_bias[np.newaxis, :] + self.user_factors.dot(self.item_factors.T)

# Assume R is your ratings matrix
R = np.array([[5, 3, 0, 1], [4, 0, 0, 1], [1, 1, 0, 5], [1, 0, 0, 4], [0, 1, 5, 4]])
model = BiasedALS()
model.fit(R)
print(model.predict())  # Predict the entire matrix

[[ 4.55584204  2.56049837 -0.43680917  0.57116085]
 [ 3.55264703 -0.42543924 -0.43760768  0.57036478]
 [ 0.54234495  0.54954489 -0.45040555  4.54810458]
 [ 0.59010563 -0.39720032 -0.40247381  3.59733308]
 [-0.44345883  0.56348753  4.55256812  3.56625155]]
