### Building a Recommender System with Scikit-learn
This notebook focuses on implementing a custom recommender system. The primary objective is to build a generalized model capable of recommending items based on the training data provided. We will leverage the scikit-learn API to compare and evaluate different models to determine the most effective approach for our recommendation task. For more detailed information on using the scikit-learn API, visit the [scikit-learn developer guide.](https://scikit-learn.org/stable/developers/develop.html)

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.sparse import random

## Implementation recommender system

### Scikit-Learn API

The [Scikit-Learn User Manual](https://scikit-learn.org/stable/developers/develop.html) provides detailed guidelines on how to construct a basic estimator. To ensure compatibility with Scikit-Learn and its suite of estimators, we adhere strictly to these guidelines as outlined in the documentation.


### Cost Function Used

\begin{align}
J(U, M, \mathbf{b_u}, \mathbf{b_m}) &= \frac{1}{| \mathbb{Z} |} \Bigg(\sum_{u, i \in \mathbb{Z}} \frac{1}{2}(r^{(u,i)} - \hat{r}^{(u,i)})^2 + \sum_{u, i \in \mathbb{Z}} \Omega^{(u, i)} \Bigg)
\end{align}

\begin{align}
\hat{r}^{(u,i)} &= \mathbf{u}^{(u)T} \mathbf{m}^{(i)} + b_u^{(u)} + b_m^{(i)} \\
\Omega^{(u, i)} &= \frac{\lambda}{2} \Bigg( \lVert \mathbf{u}^{(u)} \rVert^2_2 +  \lVert \mathbf{m}^{(i)} \rVert^2_2 + b_u^{(u)2} + b_m^{(i)2} \Bigg)
\end{align}

Here, \( U \) represents the users, and \( M \) represents the items. The cost function is only applied to the entries in \( r \) where a rating exists.

$\mathbb{Z}$ is the set of known/non-zero entries in the rating matrix \( R \) (i.e., all combinations of \( (u, i) \) that occur). $\mathbf{u}^{(u)T}$ is the \( u \)-th row vector of \( U \), and $\mathbf{m}^{(i)}$ is the \( i \)-th column vector of $\mathbf{M}^{T}$. $\lambda$ is the regularization factor. The global average rating \( r \) is computed on \( R \), and then subtracted from all ratings. Therefore, the optimization is performed on $\R_{\text{center}}$ = R - r. The squared \( L_2 \) norm is defined as: $\|z\|_2^2 = \sum_i (z(i))^2$.


### Optimization with Stochastic Gradient Descent (SGD)
The cost function is optimized alternately, first for \( \mathbf{u}^{(u)T} \) and then for \( \mathbf{m}^{(i)} \) (and the biases), while keeping the other variable constant. This leads to a least squares optimization problem. With SGD, the cost function (and its gradients) is approximated using a per-sample loss \( L \):

With \( M \) fixed:
\begin{align}
L(U, r^{(u,i)}) &= \frac{1}{2} (r^{(u,i)} - \hat{r}^{(u,i)})^2 + \Omega^{(u, i)} \\
J(U) &= \mathbb{E}_{u,i \sim \mathbb{Z}} L(U, r^{(u,i)}) = \frac{1}{| \mathbb{Z} |} \sum_{u, i \in \mathbb{Z}}  L(U, r^{(u,i)})
\end{align}

With \( U \) fixed:
\begin{align}
L(M, r^{(u,i)}) &= \frac{1}{2} (r^{(u,i)} - \hat{r}^{(u,i)})^2 + \Omega^{(u, i)} \\
J(M) &= \mathbb{E}_{u,i \sim \mathbb{Z}} L(M, r^{(u,i)}) = \frac{1}{| \mathbb{Z} |} \sum_{u, i \in \mathbb{Z}}  L(M, r^{(u,i)})
\end{align}

### Algorithm
We optimize the cost function for each entry in the rating matrix \( R \) using the following algorithm, where \( \eta \) is the learning rate and \( t \) indexes the iteration step.

For all \( u,i \in \mathbb{Z} \) do:
\begin{align}
\mathbf{u}^{(u)}_{t+1} &= \mathbf{u}^{(u)}_{t} - \eta \frac{\partial L(U, r^{(u,i)})}{\partial \mathbf{u}^{(u)}_{t}} \\
\mathbf{m}^{(i)}_{t+1} &= \mathbf{m}^{(i)}_{t} - \eta \frac{\partial L(M, r^{(u,i)})}{\partial \mathbf{m}^{(i)}_{t}} \\
b_{m, t+1}^{(i)} &= b_{m, t}^{(i)} - \eta \frac{\partial L(M, r^{(u,i)})}{\partial b_{m, t}^{(i)}} \\
b_{u, t+1}^{(u)} &= b_{u, t}^{(u)} - \eta \frac{\partial L(U, r^{(u,i)})}{\partial b_{u, t}^{(u)}} \\
\end{align}



In [3]:
from sklearn.base import BaseEstimator
import numpy as np
from tqdm.notebook import tqdm

class RecommenderModel(BaseEstimator):

    def __init__(
        self,
        alpha: float=0, 
        lr:float=0.1,
        num_components: int=10,
        num_epochs: int=10000,
        random_seed: int=12345,
        batch_size = 10,
        epsilon=1e-10
    ):
        
        """
        Args:
            alpha: regularization constant
            lr: learning rate constant
            num_components: dimensionaliy of each user / item vector
            num_epochs: number of passes over each entry in the Rating matrix
            random_seed: random seed for reproducibility
        """

        self.alpha = alpha
        self.lr = lr
        self.num_components = num_components
        self.num_epochs = num_epochs
        self.random_seed = random_seed
        self.batch_size = batch_size
        self.epsilon = epsilon
    

    def __init_variables__(self, X:np.ndarray) -> None:
        
        self.default_rng = np.random.default_rng(self.random_seed)
        self.num_user, self.num_items = X.shape
        self.user_matrix = self.default_rng.normal(0, 0.1, (self.num_user, self.num_components))
        self.item_matrix = self.default_rng.normal(0, 1, (self.num_items, self.num_components))
        self.item_bias = np.zeros((self.num_items, 1))
        self.user_bias = np.zeros((self.num_user, 1))
        self.rating = self.bool_matrix(X)
        self.num_ratings = np.sum(self.rating)
        self.global_avg = np.sum(X) / (self.num_ratings if self.num_ratings > 0 else 1)

        # for plotting
        self.learning_curve = []
        self.cost_curve = []
        self.stop_step = 0

   
    def fit(self, X: np.ndarray):
        """Fit the model.
        
            Args:
                X (num_users, num_items) - The Rating Matrix
    
            Returns:
                Self
        """
        self.__init_variables__(X)
        user_indices, item_indices = np.where(self.rating != 0)
        indices = np.arange(len(user_indices))
        for epoch in range(self.num_epochs):
            prediction = self.predict()
            self.cost_curve.append(self.calc_cost(X, prediction))
            np.random.shuffle(indices)
            for index in range(len(indices)):
                user = user_indices[index]
                item = item_indices[index]

                actual_rating = X[user, item]
                prediction = np.dot(self.user_matrix[user].T, self.item_matrix[item]) + self.item_bias[item] +  self.user_bias[user]
                error = actual_rating - prediction
    
                dj_du, dj_dm, dj_dbu, dj_dbi = self.calc_gradients(error=error, m_item=item, u_user=user)

                self.user_matrix[user] -= self.lr * dj_du
                self.item_matrix[item] -= self.lr * dj_dm
                self.user_bias[user] -= self.lr * dj_dbu
                self.item_bias[item] -= self.lr * dj_dbi

                gradient_norm = (
                    np.linalg.norm(dj_du) + 
                    np.linalg.norm(dj_dm) + 
                    np.linalg.norm(dj_dbu) + 
                    np.linalg.norm(dj_dbi)
                )
                
                self.learning_curve.append(gradient_norm)

                if gradient_norm < self.epsilon:
                    self.stop_step = epoch
                    self.last_gradient = self.learning_curve[-1]
                    return self

        return self
    
    def print_eps_condition(self):
        if(self.stop_step != 0):
            print(f"The epsilon criterion was met in step: {self.stop_step}")
            print(f"The value of the last gradient is: {self.last_gradient}")
        else: 
            print("The stopping criterion was not met")
            print(f"The value of the last gradient is: {self.learning_curve[-1]}")
    
    def calc_cost(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """ Cost function
            Args:
                y_true: Ratings Matrix [num_users, num_items]
                y_pred: Predicted Ratings Matrix [num_users, num_items]
            Returns:
                cost for a predicion
        """

        omega = self.alpha / 2 * (np.sum(self.user_matrix**2) + np.sum(self.item_matrix**2) + np.sum(self.item_bias**2) + np.sum(self.user_bias**2))
        residual = y_true - y_pred * self.rating 
        cost = (np.sum(residual**2) / 2 + omega) / self.num_ratings
        return cost
    
    def calc_gradients(
            self,
            error: np.ndarray,
            m_item: np.ndarray,
            u_user: np.ndarray) -> (np.ndarray, np.ndarray, np.ndarray, np.ndarray):
        """ Calculate Gradients
            Args:
                error: Matrix with size [batch_size, batch_size]
                m_item: weights for chosen items [batch_size, num_components]
                u_user: weights for chosen users [batch_size, num_components]
                bias_user: bias for chosen user [batch_size]
                bias_item: bias for chosen item [batch_size]
            Returns:
                Tuple (dJ_dUser, dJ_dItem, dJ_dUserBias, dJ_dItemBias)
                
                with shapes:
                
                ([batch_size, num_components], [batch_size, num_components], [batch_size], [batch_size])
        """

        user, item = self.user_matrix[u_user], self.item_matrix[m_item]
        dj_du = np.multiply(error, -item) + self.alpha * user
        dj_dm = np.multiply(error, -user) + self.alpha * item

        dj_dbu = -error + self.alpha * self.user_bias[u_user]
        dj_dbi = -error + self.alpha * self.item_bias[m_item]
        return dj_du, dj_dm, dj_dbu, dj_dbi



    def predict(self) -> np.ndarray:
        """ Calculate Approximation of Rating Matrix
            Returns:
                Approximation of Rating Matrix [num_users, num_items]
        """
        prediction = np.matmul(self.user_matrix, self.item_matrix.T)
        prediction += self.user_bias
        prediction += self.item_bias.T
        return prediction
    
    def score(self, X: np.ndarray) -> float:
        """ Mean Squared Error between X and Approximation
            Args:
                X: Ratings Matrix [num_users, num_items]
            Returns:
                MSE (float)
        """
        rating = self.bool_matrix(X)
        error = ((self.predict() - X) * rating)**2
        score = np.sum(error) / np.sum(rating)
        return score

    def score_mae(self, X: np.ndarray) -> float:
        """ Mean Absolute Error between X and Approximation
            Args:
                X: Ratings Matrix [num_users, num_items]
            Returns:
                MAE (float)
        """
        rating = self.bool_matrix(X)
        error = np.abs(((self.predict() - X) * rating))
        score = np.sum(error) / np.sum(rating)
        return score
    

    def bool_matrix(self, X :np.ndarray) -> np.ndarray:
        '''
        Creates a boolean matrix 1 if the given argument has a rating else 0
        Args:
            X: Rating matrix 
        '''
        rating_matrix = np.where(X != 0, 1, 0)
        return rating_matrix



## Testing the model

In [4]:
import numpy as np

def print_result(test_name, passed, expected, actual):
    status = "Passed" if passed else "Failed"
    print(f"{status} test: {test_name}")
    print(f"----> Expected: {expected}")
    print(f"----> Actual: {actual}")

def test_ones_matrix():
    # Create a ones matrix
    X = np.ones((5, 5))
    
    # Initialize the model
    model = RecommenderModel(alpha=0, lr=0.01, num_components=2, num_epochs=50, random_seed=123)
    
    # Fit the model
    model.fit(X)
    
    # Make predictions
    predictions = model.predict()
    
    try:
        np.testing.assert_allclose(predictions, 1, atol=0.1)
        print_result("test_ones_matrix", True, np.ones((5, 5)), predictions)
    except AssertionError as e:
        print_result("test_ones_matrix", False, np.ones((5, 5)), predictions)


def test_twos_matrix():
    X = np.ones((5, 5)) + 1
    
    # Initialize the model
    model = RecommenderModel(alpha=0, lr=0.01, num_components=2, num_epochs=50, random_seed=123)
    
    # Fit the model
    model.fit(X)
    
    # Make predictions
    predictions = model.predict()
    
    try:
        np.testing.assert_allclose(predictions, 2, atol=0.1)
        print_result("test_twos_matrix", True, np.ones((5, 5)) + 1, predictions)
    except AssertionError as e:
        print_result("test_twos_matrix", False, np.ones((5, 5)) + 1, predictions)


def test_random_matrix():
    X = np.array([
        [4, 1, 4, 2],
        [3, 5, 2, 2],
        [2, 3, 1, 4],
        [2, 3, 2, 2]
    ])
    
    # Initialize the model
    model = RecommenderModel(alpha=0, lr=0.1, num_components=4, num_epochs=100, random_seed=123)
    
    # Fit the model
    model.fit(X)
    
    # Make predictions
    predictions = model.predict()
    
    # The predictions should be close to the input, since there are no missing (zero) entries
    try:
        np.testing.assert_allclose(predictions, X, atol=0.1)
        print_result("test_random_matrix", True, X, predictions)
    except AssertionError as e:
        print_result("test_random_matrix", False, X, predictions)

        
def test_sparse_matrix():
    # Create a 5x6 matrix with random values and some zeros
    X = np.array([
        [5, 3, 0, 1, 2, 0],
        [4, 0, 3, 0, 0, 2],
        [0, 1, 5, 0, 3, 4],
        [2, 0, 0, 4, 1, 0],
        [0, 4, 2, 0, 0, 5]
    ])
    
    # Initialize the model
    model = RecommenderModel(alpha=0, lr=0.1, num_components=4, num_epochs=100, random_seed=123)
    
    # Fit the model
    model.fit(X)
    
    # Make predictions
    predictions = model.predict()
    
    try:
        # Check that non-zero entries are close to the input values
        non_zero_indices = np.nonzero(X)
        np.testing.assert_allclose(predictions[non_zero_indices], X[non_zero_indices], atol=1.0)
        print_result("test_sparse_matrix", True, X[non_zero_indices], predictions[non_zero_indices])
    except:
        print_result("test_sparse_matrix", False, X[non_zero_indices], predictions[non_zero_indices])
    
    try:
        # Check that zero entries in the original matrix are now non-zero in the predictions
        zero_indices = np.where(X == 0)
        assert np.all(predictions[zero_indices] >= 0.0)
        print_result("test_sparse_matrix", True, "all larger than 0.0" , predictions[zero_indices])
    except AssertionError as e:
        print_result("test_sparse_matrix", False, "all larger than 0.0", predictions[zero_indices])


def test_random_sparse_matrix():
    # Create a random sparse matrix with more zeros
    X = np.array([
        [0, 0, 3, 0],
        [4, 0, 0, 2],
        [0, 5, 0, 0],
        [1, 0, 0, 0]
    ])
    
    # Initialize the model
    model = RecommenderModel(alpha=0, lr=0.1, num_components=2, num_epochs=100, random_seed=123)
    
    # Fit the model
    model.fit(X)
    
    # Make predictions
    predictions = model.predict()
    
    # The non-zero entries should be close to the input values
    try:
        non_zero_indices = np.nonzero(X)
        np.testing.assert_allclose(predictions[non_zero_indices], X[non_zero_indices], atol=1.0)
        print_result("test_random_sparse_matrix", True, X[non_zero_indices], predictions[non_zero_indices])
    except AssertionError as e:
        print_result("test_random_sparse_matrix", False, X[non_zero_indices], predictions[non_zero_indices])

for test in [
    test_ones_matrix, 
    test_twos_matrix, 
    test_random_matrix, 
    test_sparse_matrix, 
    test_random_sparse_matrix, 
]:
    try:
        test()
    except Exception as e:
        print(f"Error during testing - test: {test} error: {e}")

Passed test: test_ones_matrix
----> Expected: [[1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]]
----> Actual: [[1.04241708 0.99754131 0.97477828 0.98976791 0.98844079]
 [0.96051725 1.01274295 1.0034863  1.0273135  0.93996663]
 [0.95719652 1.01027234 1.0099667  1.02264772 0.95703827]
 [1.00096004 0.99983107 0.9982038  0.99805037 0.99629228]
 [1.02090523 1.00228524 0.97943205 1.00156645 0.96745133]]
Passed test: test_twos_matrix
----> Expected: [[2. 2. 2. 2. 2.]
 [2. 2. 2. 2. 2.]
 [2. 2. 2. 2. 2.]
 [2. 2. 2. 2. 2.]
 [2. 2. 2. 2. 2.]]
----> Actual: [[2.04227915 2.00084606 1.97277732 1.99453585 1.97593792]
 [1.96185144 2.01639516 2.00302742 2.03263752 1.93195392]
 [1.95763157 2.01397925 2.00864229 2.02842542 1.94594784]
 [1.99990123 2.00354796 1.99512691 2.00418237 1.97948204]
 [2.02058948 2.00663573 1.97716368 2.00838941 1.95199586]]
Passed test: test_random_matrix
----> Expected: [[4 1 4 2]
 [3 5 2 2]
 [2 3 1 4]
 [2 3 2 2]]
----> Actual: [[3.99999