# Implementation

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Load the Data

We'll use the MovieLens 100k dataset, which contains 100,000 movie ratings from 943 users on 1,682 movies.

In [None]:
# Download movie ratings data
!wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
# Unzip it
!unzip ml-100k.zip
# Remove the .zip file
!rm ml-100k.zip

In [None]:
# Load the ratings data
ratings = pd.read_csv(
    "ml-100k/u.data",
    sep="\t",  # Tab-separated data
    names=["user_id", "movie_id", "rating", "timestamp"],  # Column names
    usecols=["user_id", "movie_id", "rating"],  # We don't need the timestamp column
    low_memory=False
)
ratings

# Train-Test Split

We'll split the data into training and test sets (80/20 split) while ensuring each user has data in both sets.

In [None]:
test_perc = 0.2

# Initialize the train and test dataframes
train_set, test_set = pd.DataFrame(), pd.DataFrame()

# Split data for each user
for user_id in ratings.user_id.unique():
    # Select only samples of the current user and shuffle
    user_df = ratings[ratings.user_id == user_id].sample(
        frac=1,
        random_state=42
    )
    
    n_entries = len(user_df)
    n_test = int(round(test_perc * n_entries))
    
    test_set = pd.concat((test_set, user_df.tail(n_test)))
    train_set = pd.concat((train_set, user_df.head(n_entries - n_test)))

# Shuffle the final datasets
train_set = train_set.sample(frac=1).reset_index(drop=True)
test_set = test_set.sample(frac=1).reset_index(drop=True)

train_set.shape, test_set.shape

# Compute Problem Dimensions

Determine the number of unique users and movies in the dataset.

In [None]:
n_users = ratings.user_id.unique().shape[0]
n_movies = ratings.movie_id.unique().shape[0]
n_users, n_movies

# Model: Matrix Factorization

The `H` class implements the matrix factorization model. It initializes two matrices:
- `P`: User latent feature matrix (n_users × r)
- `Q`: Movie latent feature matrix (n_movies × r)

where `r` is the number of latent factors.

In [None]:
class H:
    """
    Matrix Factorization model for collaborative filtering.
    
    This class represents a recommender system that factorizes the user-item rating
    matrix into two lower-dimensional matrices: P (user features) and Q (item features).
    
    Parameters
    ----------
    n : int
        Number of users
    m : int
        Number of movies/items
    r : int
        Number of latent factors (rank of the factorization)
    
    Attributes
    ----------
    P : numpy.ndarray
        User feature matrix of shape (n, r)
    Q : numpy.ndarray
        Movie feature matrix of shape (m, r)
    """
    
    def __init__(self, n, m, r):
        """Initialize the model with random feature matrices."""
        np.random.seed(42)
        self.P = np.random.rand(n, r)
        self.Q = np.random.rand(m, r)
    
    def predict(self):
        """
        Generate predicted ratings for all user-movie pairs.
        
        Returns
        -------
        numpy.ndarray
            Predicted rating matrix of shape (n_users, n_movies)
        """
        return self.P @ self.Q.T

# Loss Function: Mean Squared Error

The MSE function computes the mean squared error between predicted and actual ratings.

In [None]:
def mse(ratings, R_hat):
    """
    Calculate Mean Squared Error for the given ratings.
    
    Parameters
    ----------
    ratings : pandas.DataFrame
        DataFrame containing 'user_id', 'movie_id', and 'rating' columns
    R_hat : numpy.ndarray
        Predicted rating matrix of shape (n_users, n_movies)
    
    Returns
    -------
    float
        Mean squared error between predicted and actual ratings
    """
    # Extract predictions for the given user-movie pairs
    predictions = R_hat[ratings['user_id'] - 1, ratings['movie_id'] - 1]
    
    # Get actual ratings
    actual = ratings['rating']
    
    # Return mean of squared differences
    return np.mean((predictions - actual) ** 2)

In [None]:
# Initialize the model and compute initial MSE
h = H(n_users, n_movies, 5)
mse(train_set, h.predict())

# Training: Gradient Descent

We'll train the model using gradient descent to minimize the MSE.

## Gradient Derivation

For the loss function $L = (x^T y - a)^2$, the gradients are:
- $\nabla_x L = 2(x^T y - a) \cdot y$
- $\nabla_y L = 2(x^T y - a) \cdot x$

We can verify this mathematically:

In [None]:
import sympy as sp

# Define symbols
n = 3
a = sp.Symbol('a')
x = sp.Matrix(sp.symbols(f'x0:{n}'))
y = sp.Matrix(sp.symbols(f'y0:{n}'))

# Define loss function
f = (x.dot(y) - a)**2

# Compute gradients
grad_x = sp.Matrix([sp.diff(f, xi) for xi in x])
grad_y = sp.Matrix([sp.diff(f, yi) for yi in y])

print("∇x f =")
sp.pprint(grad_x)
print("\n∇y f =")
sp.pprint(grad_y)

In [None]:
# Simpler verification with scalar variables
x, y, a = sp.symbols('x y a', real=True)
f = (x * y - a)**2

grad_x = sp.diff(f, x)
grad_y = sp.diff(f, y)

sp.pprint(grad_x)
sp.pprint(grad_y)

## Gradient Update Function

This function implements one epoch of stochastic gradient descent.

In [None]:
def update_gradient(h, ratings, lr):
    """
    Perform one epoch of stochastic gradient descent to update P and Q.
    
    For each rating in the dataset, compute the prediction error and update
    the corresponding user and movie feature vectors in the direction that
    reduces the error.
    
    Parameters
    ----------
    h : H
        The matrix factorization model instance
    ratings : pandas.DataFrame
        DataFrame containing 'user_id', 'movie_id', and 'rating' columns
    lr : float
        Learning rate for gradient descent
    """
    for _, user_id, movie_id, rating in ratings.itertuples():
        u = user_id - 1  # Convert to 0-indexed
        i = movie_id - 1  # Convert to 0-indexed
        
        # 1. Compute prediction
        prediction = h.Q[i] @ h.P[u]
        
        # 2. Compute error
        error = prediction - rating
        
        # 3. Update feature vectors
        # Store current P[u] before updating (needed for Q update)
        current_P = h.P[u].copy()
        h.P[u] = h.P[u] - lr * (h.Q[i] * error * 2)
        h.Q[i] = h.Q[i] - lr * (current_P * error * 2)

## Training Loop

Train the model for 100 epochs and track the training loss.

In [None]:
# Training hyperparameters
epochs = 100
lr = 0.001

In [None]:
# Train the model
mse_history = []
for i in range(epochs):
    update_gradient(h, train_set, lr)
    value = mse(train_set, h.predict())
    mse_history.append(value)

In [None]:
# Visualize training progress
plt.plot(mse_history)
plt.title("Training Loss (MSE) over Epochs")
plt.xlabel("Epoch")
plt.ylabel("Mean Squared Error")
plt.grid(True, alpha=0.3)
plt.show()

# Evaluation on Test Set

Now let's evaluate how well our trained model performs on unseen data.

In [None]:
# Compute test set MSE
test_mse = mse(test_set, h.predict())
print(f"Test Set MSE: {test_mse:.4f}")

# Error Analysis

Let's analyze the distribution of prediction errors.

In [None]:
# Compute absolute error for each test rating
errors = []
for _, user_id, movie_id, rating in test_set.itertuples():
    predicted = round(h.predict()[user_id - 1, movie_id - 1])
    errors.append(abs(predicted - rating))

In [None]:
# Statistical summary of errors
error_series = pd.Series(errors)
error_series.describe()

In [None]:
# Visualize error distribution
error_series.plot(kind='hist', bins=10)
plt.title("Distribution of Prediction Errors")
plt.xlabel("Absolute Error")
plt.ylabel("Frequency")
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
# Count of each error value
error_series.value_counts().sort_index()

In [None]:
# Check the range of predicted ratings
predicted_ratings = np.unique(np.round(h.predict()))
print(f"Unique predicted ratings: {predicted_ratings}")

# Movie Metadata Analysis (Optional)

For further exploration, we can load movie metadata to understand which types of movies our model performs best on.

In [None]:
# Column names for the movie metadata file
columns = [
    "movie_id", "title", "release_date", "video_release_date", "IMDb_URL",
    "unknown", "Action", "Adventure", "Animation", "Children's", "Comedy", "Crime",
    "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery",
    "Romance", "Sci-Fi", "Thriller", "War", "Western"
]

# Load movie metadata
df_items = pd.read_csv(
    "ml-100k/u.item",
    sep="|",
    names=columns,
    encoding="ISO-8859-1"
)

print(df_items.head())

## Conclusion

This notebook demonstrates a complete implementation of matrix factorization for collaborative filtering from scratch. The model successfully learns latent representations of users and movies that can predict ratings with reasonable accuracy (MSE ≈ 0.91 on the test set).

Key takeaways:
- Matrix factorization is a powerful technique for recommender systems
- Gradient descent can effectively optimize the factorization
- The model captures latent patterns in user preferences without explicit content information