<a href="https://colab.research.google.com/github/MehrdadDastouri/matrix_factorization_recommender/blob/main/matrix_factorization_recommender.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split

# Generate synthetic user-item rating data
def generate_data(num_users=10, num_items=15, sparsity=0.7, seed=42):
    """
    Generate a synthetic user-item rating matrix with sparsity.

    Every cell first receives an integer rating drawn uniformly from 1 to 5.
    Each cell is then independently blanked (set to 0) with probability
    `sparsity`, so a value of 0 means "no rating".

    Args:
    - num_users: Number of users (rows of the matrix).
    - num_items: Number of items (columns of the matrix).
    - sparsity: Proportion of missing ratings (0.0 keeps every rating,
      1.0 blanks every rating).
    - seed: Value passed to np.random.seed before sampling. The default (42)
      reproduces the matrix this function has always returned; pass another
      integer for a different but still reproducible matrix, or None to
      reseed NumPy from fresh entropy.

    Returns:
    - ratings: User-item rating matrix (float numpy array of shape
      (num_users, num_items)).

    Note:
    - This seeds NumPy's *global* random state, so np.random calls made
      after this function returns are affected as well.
    """
    # Seeding the global RNG (rather than using a private generator) is kept
    # on purpose: code that later samples via np.random stays reproducible.
    np.random.seed(seed)
    ratings = np.random.randint(1, 6, size=(num_users, num_items)).astype(float)  # Ratings between 1 and 5
    mask = np.random.rand(*ratings.shape) < sparsity  # True where the rating is dropped
    ratings[mask] = 0  # 0 encodes a missing rating
    return ratings

# Build the synthetic rating matrix that the rest of the script works on
num_users, num_items, sparsity = 10, 15, 0.7
ratings = generate_data(num_users=num_users, num_items=num_items, sparsity=sparsity)
print("User-Item Rating Matrix:", ratings, sep="\n")

# Float32 tensor copy of the full rating matrix for use with PyTorch
ratings_tensor = torch.tensor(ratings, dtype=torch.float32)

# Split data into train and test sets
def train_test_split_matrix(ratings, test_ratio=0.2):
    """
    Hold out part of each user's observed ratings for evaluation.

    For every row (user), int(test_ratio * number of non-zero ratings)
    entries are sampled without replacement with np.random.choice. Those
    entries are copied into the test matrix and zeroed in the train matrix.
    The input matrix itself is not modified.

    Args:
    - ratings: User-item rating matrix (numpy array, 0 = missing rating).
    - test_ratio: Proportion of each user's ratings to include in the test set.

    Returns:
    - train: Copy of `ratings` with the held-out entries set to 0.
    - test: Matrix of the same shape holding only the held-out entries.
    """
    test = np.zeros_like(ratings)
    train = ratings.copy()
    for row, user_ratings in enumerate(ratings):
        rated_items = np.flatnonzero(user_ratings)
        holdout_count = int(len(rated_items) * test_ratio)
        # Sample for every user, even when holdout_count is 0, so the global
        # RNG advances the same way regardless of how many ratings a user has.
        held_out = np.random.choice(rated_items, size=holdout_count, replace=False)
        test[row, held_out] = user_ratings[held_out]
        train[row, held_out] = 0
    return train, test

# Hold out up to 20% of each user's observed ratings, then mirror both
# matrices as float32 tensors for PyTorch
train_ratings, test_ratings = train_test_split_matrix(ratings, 0.2)
train_ratings_tensor, test_ratings_tensor = (
    torch.tensor(matrix, dtype=torch.float32)
    for matrix in (train_ratings, test_ratings)
)

# Define the Matrix Factorization model
class MatrixFactorization(nn.Module):
    """
    Matrix-factorization rating model.

    Each user and each item owns a learnable vector of length `latent_dim`;
    the predicted rating for a (user, item) pair is the dot product of the
    two vectors.

    Args:
    - num_users: Number of distinct user ids (valid ids are 0..num_users-1).
    - num_items: Number of distinct item ids (valid ids are 0..num_items-1).
    - latent_dim: Length of every user/item factor vector.
    """

    def __init__(self, num_users, num_items, latent_dim):
        super().__init__()
        self.user_factors = nn.Embedding(num_users, latent_dim)  # User latent factors
        self.item_factors = nn.Embedding(num_items, latent_dim)  # Item latent factors

    def forward(self, user, item):
        """
        Predict ratings for paired user/item indices.

        Args:
        - user: Integer tensor of user ids (any shape, including a scalar).
        - item: Integer tensor of item ids with the same shape as `user`.

        Returns:
        - Tensor with the same shape as `user`, holding one predicted rating
          per (user, item) pair.
        """
        user_embedding = self.user_factors(user)
        item_embedding = self.item_factors(item)
        # Reduce over the latent dimension, which is always the last axis of
        # an embedding lookup. A fixed axis of 1 is only correct for 1-D
        # index batches.
        return (user_embedding * item_embedding).sum(dim=-1)

# Initialize the model
latent_dim = 5
model = MatrixFactorization(num_users, num_items, latent_dim)
criterion = nn.MSELoss()  # Loss function
optimizer = optim.Adam(model.parameters(), lr=0.01)  # Optimizer

# Prepare training data: only observed (non-zero) cells contribute to the loss.
# The flat target vector gets its own name so the 2-D `train_ratings` matrix
# is not overwritten and can still be printed at the end.
train_indices = torch.nonzero(train_ratings_tensor > 0)  # (row, col) of each observed rating
train_users = train_indices[:, 0]
train_items = train_indices[:, 1]
train_targets = train_ratings_tensor[train_users, train_items]

# Training loop (full batch: every observed training rating in each step)
epochs = 100
train_losses = []
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()

    # Forward pass
    predictions = model(train_users, train_items)
    loss = criterion(predictions, train_targets)
    train_losses.append(loss.item())

    # Backward pass and optimization
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

# Evaluate the model on the held-out ratings
model.eval()
test_indices = torch.nonzero(test_ratings_tensor > 0)
test_users = test_indices[:, 0]
test_items = test_indices[:, 1]
test_targets = test_ratings_tensor[test_users, test_items]

with torch.no_grad():  # No gradients are needed for evaluation
    test_predictions = model(test_users, test_items)
    test_loss = criterion(test_predictions, test_targets)
print(f"Test Loss (MSE): {test_loss.item():.4f}")

# Predict every (user, item) cell in one batched forward pass instead of one
# model call per cell. The index vectors enumerate the matrix in row-major
# order, so the flat output reshapes directly into (num_users, num_items).
with torch.no_grad():
    all_users = torch.arange(num_users).repeat_interleave(num_items)
    all_items = torch.arange(num_items).repeat(num_users)
    predicted_ratings_tensor = model(all_users, all_items).reshape(num_users, num_items)

print("\nOriginal Ratings:")
print(ratings)
print("\nTrain Ratings:")
print(train_ratings)

# The predictions were computed under no_grad, so they convert to NumPy directly
print("\nPredicted Ratings:")
print(predicted_ratings_tensor.numpy())

User-Item Rating Matrix:
[[0. 5. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 5. 0. 1. 0. 3. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 4. 0. 4. 2. 0.]
 [0. 0. 5. 0. 0. 4. 4. 0. 0. 3. 0. 0. 2. 0. 2.]
 [0. 0. 0. 0. 0. 4. 0. 0. 4. 4. 0. 5. 0. 0. 5.]
 [2. 1. 0. 4. 4. 5. 1. 0. 0. 0. 1. 1. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 2. 2. 0. 0. 0. 0. 0. 0.]
 [5. 0. 4. 0. 0. 0. 0. 5. 0. 0. 0. 2. 0. 1. 0.]
 [0. 4. 5. 3. 0. 0. 0. 4. 0. 0. 0. 0. 0. 0. 0.]
 [3. 4. 0. 0. 0. 0. 0. 0. 0. 4. 0. 4. 0. 0. 0.]]
Epoch [10/100], Loss: 13.3372
Epoch [20/100], Loss: 10.9463
Epoch [30/100], Loss: 9.0327
Epoch [40/100], Loss: 7.4371
Epoch [50/100], Loss: 6.0584
Epoch [60/100], Loss: 4.8510
Epoch [70/100], Loss: 3.8064
Epoch [80/100], Loss: 2.9344
Epoch [90/100], Loss: 2.2377
Epoch [100/100], Loss: 1.6939
Test Loss (MSE): 14.4191

Original Ratings:
[[0. 5. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 5. 0. 1. 0. 3. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 4. 0. 4. 2. 0.]
 [0. 0. 5. 0.