In [1]:
import pandas as pd
from google.colab import drive

drive.mount('/content/drive', force_remount=True)

file_path = '/content/drive/MyDrive/spotusers.csv'
spotusers = pd.read_csv(file_path)

print(spotusers.head())

Mounted at /content/drive
   663821     s948047
0  663821   s34945401
1  663821   s84097505
2  663821   s79213851
3  663821   s87544655
4  663821  s125969381


In [2]:
pip install annoy

Collecting annoy
  Downloading annoy-1.17.3.tar.gz (647 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/647.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m645.1/647.5 kB[0m [31m48.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m647.5/647.5 kB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: annoy
  Building wheel for annoy (setup.py) ... [?25l[?25hdone
  Created wheel for annoy: filename=annoy-1.17.3-cp311-cp311-linux_x86_64.whl size=553319 sha256=dfbc9662b0b96d206fa222b96fd45522bb02752738e54de215919283df37d3e3
  Stored in directory: /root/.cache/pip/wheels/33/e5/58/0a3e34b92bedf09b4c57e37a63ff395ade6f6c1099ba59877c
Successfully built annoy
Installing collected packages: annoy
Successfully installed annoy-1.17.3


In [3]:
from annoy import AnnoyIndex

In [4]:

spotusers.rename(columns={
    '663821': 'user_id',
    's948047': 'movie_id'
}, inplace=True)


spotusers_cleaned = spotusers.drop_duplicates()

unique_users = spotusers_cleaned['user_id'].nunique()
unique_movies = spotusers_cleaned['movie_id'].nunique()
avg_movies_per_user = spotusers_cleaned.groupby('user_id')['movie_id'].count().mean()


print("Number of unique users:", unique_users)
print("Number of unique movies:", unique_movies)
print("Average number of movies per user:", avg_movies_per_user)

Number of unique users: 9192
Number of unique movies: 335218
Average number of movies per user: 100.0001087902524


# Implement a Custom Recommendation Algorithm

In [5]:
import numpy as np
from scipy.sparse import coo_matrix, csr_matrix


def create_sparse_matrix(data):
    row = data['user_id'].astype('category').cat.codes
    col = data['movie_id'].astype('category').cat.codes
    values = np.ones(len(data))
    sparse_matrix = coo_matrix((values, (row, col))).tocsr()


    user_id_mapping = dict(enumerate(data['user_id'].astype('category').cat.categories))
    movie_id_mapping = dict(enumerate(data['movie_id'].astype('category').cat.categories))

    return sparse_matrix, user_id_mapping, movie_id_mapping


def calculate_cosine_similarity_sparse(matrix):
    """Compute cosine similarity for a sparse matrix."""
    row_norms = np.sqrt(matrix.multiply(matrix).sum(axis=1))
    row_norms[row_norms == 0] = 1  # Avoid division by zero
    normalized_matrix = matrix.multiply(1 / row_norms)
    similarity_matrix = normalized_matrix @ normalized_matrix.T
    return similarity_matrix


def get_top_k_similar_users(user_id, user_id_mapping, user_similarity, k=5):
    user_index = list(user_id_mapping.keys())[list(user_id_mapping.values()).index(user_id)]
    similarities = user_similarity[user_index].toarray().flatten()
    similar_user_indices = np.argsort(-similarities)[1:k+1]
    return [user_id_mapping[i] for i in similar_user_indices]

def recommend_movies(user_id, user_item_matrix, user_id_mapping, movie_id_mapping, user_similarity, k=5, limit=10):
    similar_users = get_top_k_similar_users(user_id, user_id_mapping, user_similarity, k)
    target_user_index = list(user_id_mapping.keys())[list(user_id_mapping.values()).index(user_id)]
    watched_movies = set(user_item_matrix[target_user_index].indices)
    movie_counts = {}
    for similar_user in similar_users:
        similar_user_index = list(user_id_mapping.keys())[list(user_id_mapping.values()).index(similar_user)]
        similar_watched = set(user_item_matrix[similar_user_index].indices)
        for movie_idx in similar_watched - watched_movies:
            movie_counts[movie_idx] = movie_counts.get(movie_idx, 0) + 1

    sorted_recommendations = sorted(movie_counts, key=movie_counts.get, reverse=True)
    return [movie_id_mapping[movie_idx] for movie_idx in sorted_recommendations[:limit]]



user_item_matrix_sparse, user_id_mapping, movie_id_mapping = create_sparse_matrix(spotusers_cleaned)


user_similarity_sparse = calculate_cosine_similarity_sparse(user_item_matrix_sparse)


example_user_id = list(user_id_mapping.values())[0]
recommendations = recommend_movies(
    user_id=example_user_id,
    user_item_matrix=user_item_matrix_sparse,
    user_id_mapping=user_id_mapping,
    movie_id_mapping=movie_id_mapping,
    user_similarity=user_similarity_sparse,
    k=5,
    limit=10
)

print(f"Top {len(recommendations)} recommendations for user {example_user_id}:")
for idx, movie_id in enumerate(recommendations, 1):
    print(f"{idx}. {movie_id}")


Top 10 recommendations for user 78443:
1. s119437608
2. s119302924
3. s124996692
4. s102035745
5. s140883491
6. s129011934
7. s133940962
8. s112662368
9. s132193346
10. s98169664


# Knn-algorithm from library

In [6]:
from sklearn.neighbors import NearestNeighbors


user_item_matrix_sparse, user_id_mapping, movie_id_mapping = create_sparse_matrix(spotusers_cleaned)


model = NearestNeighbors(metric='cosine', algorithm='brute')
model.fit(user_item_matrix_sparse)


def get_top_k_similar_users_nn(user_id, model, user_id_mapping, k=5):
    user_index = list(user_id_mapping.keys())[list(user_id_mapping.values()).index(user_id)]  # Map user_id to row index
    distances, indices = model.kneighbors(user_item_matrix_sparse[user_index], n_neighbors=k+1)  # +1 to exclude self
    similar_user_indices = indices.flatten()[1:]  # Exclude the user itself
    return [user_id_mapping[i] for i in similar_user_indices]


def recommend_movies_nn(user_id, model, user_item_matrix, user_id_mapping, movie_id_mapping, k=5, limit=10):
    similar_users = get_top_k_similar_users_nn(user_id, model, user_id_mapping, k)
    target_user_index = list(user_id_mapping.keys())[list(user_id_mapping.values()).index(user_id)]
    watched_movies = set(user_item_matrix[target_user_index].indices)
    movie_counts = {}
    for similar_user in similar_users:
        similar_user_index = list(user_id_mapping.keys())[list(user_id_mapping.values()).index(similar_user)]
        similar_watched = set(user_item_matrix[similar_user_index].indices)
        for movie_idx in similar_watched - watched_movies:
            movie_counts[movie_idx] = movie_counts.get(movie_idx, 0) + 1

    sorted_recommendations = sorted(movie_counts, key=movie_counts.get, reverse=True)
    return [movie_id_mapping[movie_idx] for movie_idx in sorted_recommendations[:limit]]


example_user_id = list(user_id_mapping.values())[0]
recommendations_nn = recommend_movies_nn(
    user_id=example_user_id,
    model=model,
    user_item_matrix=user_item_matrix_sparse,
    user_id_mapping=user_id_mapping,
    movie_id_mapping=movie_id_mapping,
    k=5,
    limit=10
)

print(f"Top {len(recommendations_nn)} recommendations for user {example_user_id}:")
for idx, movie_id in enumerate(recommendations_nn, 1):
    print(f"{idx}. {movie_id}")



Top 10 recommendations for user 78443:
1. s124996690
2. s119437608
3. s115868048
4. s124996692
5. s140883491
6. s129011934
7. s133940962
8. s98169664
9. s132193346
10. s142706538


# Time-Comparison

In [7]:
import time
from sklearn.metrics import precision_score, recall_score


ground_truth_movies = {'s124996692', 's129011934', 's98169664'}


def calculate_metrics(recommended_movies, ground_truth):
    recommended_set = set(recommended_movies)
    precision = len(recommended_set & ground_truth) / len(recommended_set) if recommended_set else 0
    recall = len(recommended_set & ground_truth) / len(ground_truth) if ground_truth else 0
    return precision, recall


start_time = time.time()
recommendations_custom = recommend_movies(
    user_id=example_user_id,
    user_item_matrix=user_item_matrix_sparse,
    user_id_mapping=user_id_mapping,
    movie_id_mapping=movie_id_mapping,
    user_similarity=user_similarity_sparse,
    k=5,
    limit=10
)
custom_time = time.time() - start_time
custom_precision, custom_recall = calculate_metrics(recommendations_custom, ground_truth_movies)


start_time = time.time()
recommendations_nn = recommend_movies_nn(
    user_id=example_user_id,
    model=model,
    user_item_matrix=user_item_matrix_sparse,
    user_id_mapping=user_id_mapping,
    movie_id_mapping=movie_id_mapping,
    k=5,
    limit=10
)
nn_time = time.time() - start_time
nn_precision, nn_recall = calculate_metrics(recommendations_nn, ground_truth_movies)

print("Custom Algorithm:")
print(f"Execution Time: {custom_time:.4f} seconds")
print(f"Precision@10: {custom_precision:.2f}")
print(f"Recall@10: {custom_recall:.2f}\n")

print("NearestNeighbors Algorithm:")
print(f"Execution Time: {nn_time:.4f} seconds")
print(f"Precision@10: {nn_precision:.2f}")
print(f"Recall@10: {nn_recall:.2f}")

Custom Algorithm:
Execution Time: 0.0075 seconds
Precision@10: 0.30
Recall@10: 1.00

NearestNeighbors Algorithm:
Execution Time: 0.1893 seconds
Precision@10: 0.30
Recall@10: 1.00


# Interface creation

In [None]:
# Terminal Interface
def terminal_interface():
    print("Welcome to the Recommendation System!")
    while True:
        user_id = input("Enter your user ID (or type 'exit' to quit): ")

        if user_id.lower() == 'exit':
            print("Goodbye!")
            break

        try:
            user_id = int(user_id)
            if user_id not in user_id_mapping.values():
                raise ValueError("User ID not found in the dataset.")


            recommendations = recommend_movies(
                user_id=user_id,
                user_item_matrix=user_item_matrix_sparse,
                user_id_mapping=user_id_mapping,
                movie_id_mapping=movie_id_mapping,
                user_similarity=user_similarity_sparse,
                k=5,
                limit=10
            )

            print(f"\nTop {len(recommendations)} recommendations for user {user_id}:")
            for idx, movie_id in enumerate(recommendations, 1):
                print(f"{idx}. {movie_id}")
        except ValueError as e:
            print(f"Error: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")

user_item_matrix_sparse, user_id_mapping, movie_id_mapping = create_sparse_matrix(spotusers_cleaned)
user_similarity_sparse = calculate_cosine_similarity_sparse(user_item_matrix_sparse)

# Run the terminal interface
terminal_interface()

Welcome to the Recommendation System!


# **Annoy Algorithm**

In [None]:
import numpy as np
from annoy import AnnoyIndex

user_item_matrix_sparse, user_id_mapping, movie_id_mapping = create_sparse_matrix(spotusers_cleaned)

def build_annoy_index(user_item_matrix, n_trees=2):
    """
    Construire un index Annoy à partir de la matrice utilisateur-item.
    :param user_item_matrix: Matrice utilisateur-item au format CSR.
    :param n_trees: Nombre d'arbres à construire dans Annoy.
    :return: AnnoyIndex, mapping utilisateur.
    """
    num_users, num_items = user_item_matrix.shape
    annoy_index = AnnoyIndex(num_items, 'angular')  # Distance angulaire pour les vecteurs

    for user_index in range(num_users):
        vector = user_item_matrix[user_index].toarray().flatten()
        annoy_index.add_item(user_index, vector)

    annoy_index.build(n_trees)
    return annoy_index


def find_similar_users_annoy(user_id, user_id_mapping, annoy_index, k=5):
    """
    Trouver les utilisateurs les plus similaires avec Annoy.
    :param user_id: Identifiant de l'utilisateur cible.
    :param user_id_mapping: Mapping des IDs d'utilisateurs.
    :param annoy_index: Index Annoy.
    :param k: Nombre de voisins à retourner.
    :return: Liste des IDs d'utilisateurs similaires.
    """
    user_index = list(user_id_mapping.keys())[list(user_id_mapping.values()).index(user_id)]
    similar_user_indices = annoy_index.get_nns_by_item(user_index, k+1)[1:]  # Exclure l'utilisateur lui-même
    return [user_id_mapping[i] for i in similar_user_indices]


def recommend_movies_annoy(user_id, user_item_matrix, user_id_mapping, movie_id_mapping, annoy_index, k=5, limit=10):
    """
    Recommander des films pour un utilisateur basé sur Annoy.
    :param user_id: ID de l'utilisateur cible.
    :param user_item_matrix: Matrice utilisateur-item.
    :param user_id_mapping: Mapping des IDs d'utilisateurs.
    :param movie_id_mapping: Mapping des IDs de films.
    :param annoy_index: Index Annoy.
    :param k: Nombre d'utilisateurs similaires.
    :param limit: Nombre maximum de films à recommander.
    :return: Liste des IDs de films recommandés.
    """
    similar_users = find_similar_users_annoy(user_id, user_id_mapping, annoy_index, k)
    target_user_index = list(user_id_mapping.keys())[list(user_id_mapping.values()).index(user_id)]
    watched_movies = set(user_item_matrix[target_user_index].indices)
    movie_counts = {}
    for similar_user in similar_users:
        similar_user_index = list(user_id_mapping.keys())[list(user_id_mapping.values()).index(similar_user)]
        similar_watched = set(user_item_matrix[similar_user_index].indices)
        for movie_idx in similar_watched - watched_movies:
            movie_counts[movie_idx] = movie_counts.get(movie_idx, 0) + 1

    sorted_recommendations = sorted(movie_counts, key=movie_counts.get, reverse=True)
    return [movie_id_mapping[movie_idx] for movie_idx in sorted_recommendations[:limit]]


annoy_index = build_annoy_index(user_item_matrix_sparse)


example_user_id = list(user_id_mapping.values())[0]
annoy_recommendations = recommend_movies_annoy(
    user_id=example_user_id,
    user_item_matrix=user_item_matrix_sparse,
    user_id_mapping=user_id_mapping,
    movie_id_mapping=movie_id_mapping,
    annoy_index=annoy_index,
    k=5,
    limit=10
)

print(f"Top {len(annoy_recommendations)} recommandations basées sur Annoy pour l'utilisateur {example_user_id}:")
for idx, movie_id in enumerate(annoy_recommendations, 1):
    print(f"{idx}. {movie_id}")



In [None]:
import numpy as np
from scipy.sparse import csr_matrix


data = [
    (0, 0),
    (0, 1),
    (1, 0),
    (1, 2),
    (2, 1),
    (2, 2),
]


num_users = 3
num_items = 3


rows, cols = zip(*data)
values = np.ones(len(data))


user_item_matrix_sparse = csr_matrix((values, (rows, cols)), shape=(num_users, num_items))


user_id_mapping = {i: f"user_{i}" for i in range(num_users)}
movie_id_mapping = {i: f"movie_{i}" for i in range(num_items)}


annoy_index = build_annoy_index(user_item_matrix_sparse)

example_user_id = list(user_id_mapping.values())[0]
annoy_recommendations = recommend_movies_annoy(
    user_id=example_user_id,
    user_item_matrix=user_item_matrix_sparse,
    user_id_mapping=user_id_mapping,
    movie_id_mapping=movie_id_mapping,
    annoy_index=annoy_index,
    k=2,
    limit=2
)

print(f"Top {len(annoy_recommendations)} recommandations basées sur Annoy pour l'utilisateur {example_user_id}:")
for idx, movie_id in enumerate(annoy_recommendations, 1):
    print(f"{idx}. {movie_id}")
