In [1]:
import pandas as pd
from scipy.sparse import csr_matrix
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import mean_squared_error
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split

In [2]:
data_used = 0.01   # fraction of movies (matrix rows) kept for the experiment
test_size = .3     # fraction of the sampled movies held out for testing
random_seed = 42   # shared seed so sampling and splitting are reproducible

# Load the datasets
ratings = pd.read_csv('dataset/ratings.csv')
movies = pd.read_csv('dataset/movies.csv')

# Encode the raw IDs as categoricals once, so the integer codes used to build
# the matrix and the code -> ID lookup tables come from the same encoding.
movie_categories = ratings['movieId'].astype('category')
user_categories = ratings['userId'].astype('category')

# Create a sparse matrix: one row per movie, one column per user
row = movie_categories.cat.codes
col = user_categories.cat.codes
data = ratings['rating']
movie_user_sparse = csr_matrix((data, (row, col)))

# Map back indices to movie IDs and user IDs for later lookup.
# These maps are keyed by row/column of `movie_user_sparse`; row i of the
# sampled matrix below corresponds to movie_id_map[sampled_indices[i]].
movie_id_map = dict(enumerate(movie_categories.cat.categories))
user_id_map = dict(enumerate(user_categories.cat.categories))

# Sample a `data_used` fraction of the movies (1% with the default above).
# A seeded generator keeps the sampled subset identical across kernel restarts.
num_movies = movie_user_sparse.shape[0]
rng = np.random.default_rng(random_seed)
sampled_indices = rng.choice(num_movies, size=int(data_used * num_movies), replace=False)
sampled_sparse_matrix = movie_user_sparse[sampled_indices]

# Split the sampled movies into training and testing rows (70% / 30% by default)
train_data, test_data = train_test_split(sampled_sparse_matrix, test_size=test_size, random_state=random_seed)

In [None]:
nearestNeighbors = 3

# Build a cosine-distance nearest-neighbour index over the training rows.
# `fit` returns the estimator itself, so construction and fitting are chained.
knn = NearestNeighbors(
    n_neighbors=nearestNeighbors,
    metric='cosine',
    algorithm='auto',
    n_jobs=-1,
).fit(train_data)

# Define a prediction function
def predict_rating(movie_idx, user_idx, train_sparse_matrix, test_sparse_matrix):
    """Predict one user's rating for a test movie from similar training movies.

    The module-level ``knn`` model is queried with the test movie's row to
    find its ``nearestNeighbors`` closest training rows. The prediction is
    the similarity-weighted average of the ratings the user gave to those
    neighbours, where similarity is ``1 - distance``.

    Parameters
    ----------
    movie_idx : int
        Row index of the target movie in ``test_sparse_matrix``.
    user_idx : int
        Column index of the user in both matrices.
    train_sparse_matrix : scipy.sparse matrix
        Matrix whose rows are indexed by the neighbour indices returned by
        ``knn`` (i.e. the matrix the model was fitted on).
    test_sparse_matrix : scipy.sparse matrix
        Matrix holding the row of the movie to predict for.

    Returns
    -------
    float
        The weighted-average rating, or ``np.nan`` when the user has rated
        none of the neighbours or the usable weights do not sum to a
        positive value.
    """
    distances, indices = knn.kneighbors(test_sparse_matrix[movie_idx].reshape(1, -1), n_neighbors=nearestNeighbors)
    # Look up the ratings this user gave to each neighbouring training row
    neighbor_ratings = train_sparse_matrix[indices.flatten(), user_idx].toarray().flatten()

    # A zero in the sparse matrix means "not rated", not "rated 0"; averaging
    # those entries would drag every prediction toward zero, so only the
    # neighbours the user actually rated take part in the average.
    rated = neighbor_ratings > 0
    weights = (1 - distances.flatten())[rated]
    weight_sum = np.sum(weights)
    if weight_sum <= 0:
        return np.nan

    # Calculate the weighted average of the rated neighbours' ratings
    return np.dot(weights, neighbor_ratings[rated]) / weight_sum

In [4]:
# Evaluate the model on the test set
test_ratings = []
predicted_ratings = []

for movie_idx in range(test_data.shape[0]):
    # Walk only the stored entries of this movie's row instead of densifying
    # the row and scanning every user column.
    movie_row = test_data[movie_idx].tocoo()
    for user_idx, actual_rating in zip(movie_row.col, movie_row.data):
        if actual_rating > 0:  # Only predict for non-zero ratings
            predicted_rating = predict_rating(movie_idx, int(user_idx), train_data, test_data)
            # predict_rating reports "no prediction" as NaN, which an
            # `is not None` check lets through; NaN must be filtered out
            # explicitly or mean_squared_error rejects the input.
            if predicted_rating is not None and not np.isnan(predicted_rating):
                test_ratings.append(actual_rating)
                predicted_ratings.append(predicted_rating)

# Calculate the loss (Mean Squared Error)
if test_ratings:
    mse = mean_squared_error(test_ratings, predicted_ratings)
    print(f"Mean Squared Error on the test set: {mse}")
else:
    # mean_squared_error raises on empty input; report the situation instead
    mse = np.nan
    print("No predictions could be made on the test set; MSE is undefined.")

KeyboardInterrupt: 