In [None]:
# Code Cell 1: Load Data
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, coo_matrix
import scipy.sparse
import random


train_matrix_path = "../Data/train_matrix.npz"
test_matrix_path = "../Data/test_matrix.npz"
unique_ids_path = "../Data/unique.npz"

train = scipy.sparse.load_npz(train_matrix_path)
test = scipy.sparse.load_npz(test_matrix_path)

unique_user_ids = np.load(unique_ids_path, allow_pickle=True)["users"]
unique_item_ids = np.load(unique_ids_path, allow_pickle=True)["items"]

In [2]:
num_users, num_items = train.shape

print(f"Train matrix shape: {train.shape}")
print(f"Test matrix shape: {test.shape}")
print(f"Number of unique users: {num_users}")
print(f"Number of unique items: {num_items}")

Train matrix shape: (64078, 144804)
Test matrix shape: (64078, 144804)
Number of unique users: 64078
Number of unique items: 144804


In [None]:
k = 100
learning_rate = 0.01
lambda_reg = 0.8
num_epochs = 10
log_step = 1

print(
    f"Hyperparameters: k={k}, learning_rate={learning_rate}, lambda_reg={lambda_reg}, num_epochs={num_epochs}, log_step={log_step}"
)

Hyperparameters: k=100, learning_rate=0.01, lambda_reg=0.8, num_epochs=10, log_step=1


In [54]:
P = np.random.rand(num_users, k) * 0.005
Q = np.random.rand(k, num_items) * 0.005


user_bias = np.zeros(num_users)
item_bias = np.zeros(num_items)

global_average = np.mean(train.data)

print(f"Initialized P shape: {P.shape}")
print(f"Initialized Q shape: {Q.shape}")
print(f"Initialized user_bias shape: {user_bias.shape}")
print(f"Initialized item_bias shape: {item_bias.shape}")
print(f"Calculated global average: {global_average:.4f}")

Initialized P shape: (64078, 100)
Initialized Q shape: (100, 144804)
Initialized user_bias shape: (64078,)
Initialized item_bias shape: (144804,)
Calculated global average: 7.6299


In [None]:
def calculate_predicted_rating(
    user_idx, item_idx, P, Q, user_bias, item_bias, global_average
):
    """
    Calculates the predicted rating for a single user-item pair.

    Args:
        user_idx (int): Mapped index of the user.
        item_idx (int): Mapped index of the item.
        P (np.ndarray): User latent feature matrix (num_users, k).
        Q (np.ndarray): Item latent feature matrix (k, num_items).
        user_bias (np.ndarray): User bias vector (num_users,).
        item_bias (np.ndarray): Item bias vector (num_items,).
        global_average (float): Global average rating.

    Returns:
        float: The predicted rating.
    """
    # TODO
    predicted_rating = (
        global_average
        + user_bias[user_idx]
        + item_bias[item_idx]
        + np.dot(P[user_idx, :], Q[:, item_idx])
    )

    return predicted_rating


def calculate_error(actual_rating, predicted_rating):
    """
    Calculates the prediction error for a single rating.

    Args:
        actual_rating (float): The actual observed rating.
        predicted_rating (float): The predicted rating.

    Returns:
        float: The prediction error (actual - predicted).
    """
    # TODO
    error = actual_rating - predicted_rating

    return error


def update_biases(
    user_idx, item_idx, error, user_bias, item_bias, learning_rate, lambda_reg
):
    """
    Updates the user and item biases using the calculated error and SGD rules.

    Args:
        user_idx (int): Mapped index of the user.
        item_idx (int): Mapped index of the item.
        error (float): The prediction error for the user-item pair.
        user_bias (np.ndarray): User bias vector (num_users,).
        item_bias (np.ndarray): Item bias vector (num_items,).
        learning_rate (float): The learning rate for SGD.
        lambda_reg (float): The regularization parameter.
    """

    # TODO
    user_bias[user_idx] += learning_rate * (error - lambda_reg * user_bias[user_idx])
    item_bias[item_idx] += learning_rate * (error - lambda_reg * item_bias[item_idx])


def update_latent_features(
    user_idx, item_idx, error, P, Q, learning_rate, lambda_reg, k
):
    """
    Updates the user and item latent features using the calculated error and SGD rules.

    Args:
        user_idx (int): Mapped index of the user.
        item_idx (int): Mapped index of the item.
        error (float): The prediction error for the user-item pair.
        P (np.ndarray): User latent feature matrix (num_users, k).
        Q (np.ndarray): Item latent feature matrix (k, num_items).
        learning_rate (float): The learning rate for SGD.
        lambda_reg (float): The regularization parameter.
        k (int): The number of latent features.
    """

    # TODO
    for f in range(k):
        p_uf = P[user_idx, f]
        q_fi = Q[f, item_idx]

        P[user_idx, f] += learning_rate * (error * q_fi - lambda_reg * p_uf)
        Q[f, item_idx] += learning_rate * (error * p_uf - lambda_reg * q_fi)


print("Helper functions defined. Now implement the logic inside them!")

Helper functions defined. Now implement the logic inside them!


In [None]:
train_rows = train.row
train_cols = train.col
train_data = train.data
num_train_samples = len(train_data)

print("Starting training...")

for epoch in range(num_epochs):
    shuffled_indices = list(range(num_train_samples))
    random.shuffle(shuffled_indices)

    for idx in shuffled_indices:
        u = train_rows[idx]
        i = train_cols[idx]
        r_ui = train_data[idx]

        predicted_r_ui = calculate_predicted_rating(
            u, i, P, Q, user_bias, item_bias, global_average
        )

        error = calculate_error(r_ui, predicted_r_ui)

        update_biases(u, i, error, user_bias, item_bias, learning_rate, lambda_reg)

        update_latent_features(u, i, error, P, Q, learning_rate, lambda_reg, k)

    if (epoch + 1) % log_step == 0:
        train_sse = 0  # Sum of Squared Errors for the current epoch
        # Iterate through all training samples again to calculate the total error
        for idx in range(num_train_samples):
            u = train_rows[idx]
            i = train_cols[idx]
            r_ui = train_data[idx]
            # Calculate predicted rating using the *current* parameters
            predicted_r_ui = calculate_predicted_rating(
                u, i, P, Q, user_bias, item_bias, global_average
            )
            train_sse += (r_ui - predicted_r_ui) ** 2

        train_rmse = np.sqrt(train_sse / num_train_samples)
        test_rmse = calculate_rmse(P, Q, user_bias, item_bias, global_average, test)
        print(
            f"Epoch {epoch+1}/{num_epochs}, Train RMSE: {train_rmse:.4f}, Test RMSE: {test_rmse:.4f}"
        )


print("\nTraining finished.")

Starting training...
Epoch 1/10, Train RMSE: 1.6958, Test RMSE: 1.7181
Epoch 2/10, Train RMSE: 1.6507, Test RMSE: 1.6950
Epoch 3/10, Train RMSE: 1.6188, Test RMSE: 1.6814
Epoch 4/10, Train RMSE: 1.5934, Test RMSE: 1.6728
Epoch 5/10, Train RMSE: 1.5726, Test RMSE: 1.6672


KeyboardInterrupt: 

In [None]:
def calculate_rmse(P, Q, user_bias, item_bias, global_average, test):
    """
    Calculates the Root Mean Squared Error (RMSE) on a sparse matrix.

    Args:
        P (np.ndarray): Trained user latent feature matrix.
        Q (np.ndarray): Trained item latent feature matrix.
        user_bias (np.ndarray): Trained user bias vector.
        item_bias (np.ndarray): Trained item bias vector.
        global_average (float): Calculated global average rating.
        test (matrix): The sparse matrix to evaluate on (e.g., test set).

    Returns:
        float: The calculated RMSE.
    """

    test_rows = test.row
    test_cols = test.col
    test_data = test.data
    num_test_samples = len(test_data)

    if num_test_samples == 0:
        return 0.0  # Avoid division by zero

    sse = 0  # Sum of Squared Errors
    for idx in range(num_test_samples):
        u = test_rows[idx]
        i = test_cols[idx]
        r_ui = test_data[idx]

        # Predict rating
        predicted_r_ui = (
            global_average + user_bias[u] + item_bias[i] + np.dot(P[u, :], Q[:, i])
        )

        sse += (r_ui - predicted_r_ui) ** 2

    rmse = np.sqrt(sse / num_test_samples)
    return rmse


# Calculate RMSE on the test set
test_rmse = calculate_rmse(P, Q, user_bias, item_bias, global_average, test)
print(f"\nTest RMSE: {test_rmse:.4f}")


Test RMSE: 1.8369


In [None]:
user_idx = unique_user_ids[6]
item_idx = 5

try:
    mapped_user_idx = np.where(unique_user_ids == user_idx)[0][0]
    mapped_item_idx = unique_item_ids[item_idx]  # Get original item ID for mapped index
    # Now find the mapped index for this original item ID
    mapped_item_idx_for_prediction = np.where(unique_item_ids == mapped_item_idx)[0][0]

    predicted_rating_example = calculate_predicted_rating(
        mapped_user_idx,
        mapped_item_idx_for_prediction,
        P,
        Q,
        user_bias,
        item_bias,
        global_average,
    )
    print(
        f"\nPredicted rating for original user ID {user_idx} (mapped index {mapped_user_idx}) and original item ID {item_idx} (mapped index {mapped_item_idx_for_prediction}): {predicted_rating_example:.4f}"
    )

except IndexError:
    print(
        "\nCould not find mapped index for example prediction. Ensure original IDs exist in unique IDs."
    )

In [None]:
# Code Cell 6: Recommendation Function
def get_recommendations(
    original_user_id,
    n,
    P,
    Q,
    user_bias,
    item_bias,
    global_average,
    train_matrix,
    unique_user_ids,
    unique_item_ids,
):
    """
    Generates top N recommendations for a given user.

    Args:
        original_user_id: The original ID of the user.
        n (int): The number of recommendations to generate.
        P (np.ndarray): Trained user latent feature matrix (num_users, k).
        Q (np.ndarray): Trained item latent feature matrix (k, num_items).
        user_bias (np.ndarray): Trained user bias vector (num_users,).
        item_bias (np.ndarray): Trained item bias vector (num_items,).
        global_average (float): Calculated global average rating.
        train_matrix (matrix): The sparse training interaction matrix.
        unique_user_ids (np.ndarray): Array mapping mapped user indices to original IDs.
        unique_item_ids (np.ndarray): Array mapping mapped item indices to original IDs.

    Returns:n_matrix
        list: A list of tuples (original_item_id, predicted_rating) for the top N recommendations.
              Returns an empty list if the user ID is not found.
    """
    # Find the mapped index for the original user ID
    try:
        user_idx = np.where(unique_user_ids == original_user_id)[0][0]
    except IndexError:
        print(
            f"Warning: Original user ID {original_user_id} not found in unique user IDs."
        )
        return []  # Return empty list if user not found

    num_items = Q.shape[1]
    all_item_indices = np.arange(num_items)

    # Find items the user has already rated in the training set
    user_row_sparse = train_matrix.getrow(user_idx)
    rated_item_indices = user_row_sparse.indices

    # Find items the user has NOT rated
    unrated_item_indices = np.setdiff1d(all_item_indices, rated_item_indices)

    user_latent_features = P[user_idx, :]  # Shape (k,)
    unrated_item_latent_features = Q[
        :, unrated_item_indices
    ]  # Shape (k, num_unrated_items)

    # Calculate dot products: result is (num_unrated_items,)
    latent_predictions = (
        user_latent_features @ unrated_item_latent_features
    )  # Using @ for matrix multiplication (1xk @ kxN -> 1xN)

    predicted_ratings_for_user = (
        global_average
        + user_bias[user_idx]
        + item_bias[unrated_item_indices]
        + latent_predictions
    )

    # Get the indices of the top N predicted ratings within the unrated_item_indices array
    top_n_indices_in_unrated = np.argsort(predicted_ratings_for_user)[::-1][:n]

    # Get the mapped item indices of the top N recommended items
    recommended_mapped_item_indices = unrated_item_indices[top_n_indices_in_unrated]

    # Get the original item IDs and their predicted ratings
    recommended_items_with_ratings = [
        (unique_item_ids[item_idx], predicted_ratings_for_user[i])
        for i, item_idx in enumerate(recommended_mapped_item_indices)
    ]  # Use index 'i' from top_n_indices_in_unrated to get the correct predicted rating

    return recommended_items_with_ratings

In [None]:
original_user_id_to_recommend = unique_user_ids[
    6
]  # Choose an original user ID that exists in unique_user_ids
top_n_recommendations = 5

recommendations = get_recommendations(
    original_user_id_to_recommend,
    top_n_recommendations,
    P,
    Q,
    user_bias,
    item_bias,
    global_average,
    train,
    unique_user_ids,
    unique_item_ids,
)

if recommendations:
    print(
        f"\nTop {top_n_recommendations} recommendations for original user ID {original_user_id_to_recommend}:"
    )
    for item_id, predicted_rating in recommendations:
        print(f"  Item ID: {item_id}, Predicted Rating: {predicted_rating:.4f}")

In [None]:
original_user_id_to_recommend = (
    12  # Choose an original user ID that exists in unique_user_ids
)
top_n_recommendations = 5

recommendations = get_recommendations(
    original_user_id_to_recommend,
    top_n_recommendations,
    P,
    Q,
    user_bias,
    item_bias,
    global_average,
    train,
    unique_user_ids,
    unique_item_ids,
)

if recommendations:
    print(
        f"\nTop {top_n_recommendations} recommendations for original user ID {original_user_id_to_recommend}:"
    )
    for item_id, predicted_rating in recommendations:
        print(f"  Item ID: {item_id}, Predicted Rating: {predicted_rating:.4f}")

In [None]:
eval_users_idx = [
    8,
    9,
    12,
    14,
    16,
    17,
    19,
    22,
    26,
    32,
    39,
    42,
    44,
    51,
    53,
    56,
    64,
    67,
    69,
    70,
]
eval_top_n_recommendations = 10

eval_recommendations = [
    get_recommendations(
        user_idx,
        eval_top_n_recommendations,
        P,
        Q,
        user_bias,
        item_bias,
        global_average,
        train,
        unique_user_ids,
        unique_item_ids,
    )
    for user_idx in eval_users_idx
]