In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.metrics import mean_absolute_error, mean_squared_error, roc_auc_score, root_mean_squared_error
import numpy as np
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import apriori, association_rules


In [None]:
# Load the ratings file: tab-separated, column names supplied explicitly.
url = "../datasets/ml-100k/u.data"
columns = ['user_id', 'movie_id', 'rating', 'timestamp']
# The timestamp column is discarded immediately; only user, movie and rating are kept.
df = pd.read_csv(url, sep='\t', names=columns).drop(columns='timestamp')

In [None]:
# Shared pivot layout: one row per user, one column per movie, cells hold the rating.
pivot_spec = dict(index='user_id', columns='movie_id', values='rating')

# User x movie matrix over the full dataset; missing ratings are stored as 0.
ratings_matrix = df.pivot_table(**pivot_spec).fillna(0)

# Hold out 20% of the individual ratings for evaluation (fixed seed),
# then build the same user x movie matrix from the training ratings only.
train_data, test_data = train_test_split(df, test_size=0.2, random_state=42)
train_matrix = train_data.pivot_table(**pivot_spec).fillna(0)

In [None]:
# Pairwise cosine similarity between the users' training rating vectors,
# wrapped in a DataFrame labelled by user id on both axes.
user_ids = train_matrix.index
user_similarity = cosine_similarity(train_matrix)
user_similarity_df = pd.DataFrame(data=user_similarity, index=user_ids, columns=user_ids)

def recommend_movies(user_id, num_recommendations=5):
    """Recommend unseen movies to a user from similarity-weighted neighbour ratings.

    Every other user's training rating vector is weighted by that user's
    cosine similarity to ``user_id`` and the weighted vectors are summed,
    giving one score per movie column of ``train_matrix``.

    Args:
        user_id: Label of the target user; must be present in
            ``user_similarity_df`` / ``train_matrix``.
        num_recommendations: Maximum number of movie ids to return.

    Returns:
        pandas Index of movie ids (columns of ``train_matrix``), best score
        first. Movies the user already rated in the training matrix are never
        returned, so the result can be shorter than ``num_recommendations``.

    Raises:
        KeyError: If ``user_id`` is not a label of the similarity matrix.
    """
    # Drop the target user by label: relying on "first row after sorting" can
    # discard a different user when similarities tie.
    neighbour_sims = user_similarity_df[user_id].drop(labels=user_id)

    # One matrix product replaces the per-neighbour Python loop.
    neighbour_ratings = train_matrix.loc[neighbour_sims.index].to_numpy()
    weighted_ratings = neighbour_sims.to_numpy() @ neighbour_ratings

    # Push already-rated movies to the bottom and filter them out; setting them
    # to 0 would still let them be returned when few unseen movies score > 0.
    already_rated = train_matrix.loc[user_id].to_numpy() > 0
    weighted_ratings[already_rated] = -np.inf
    ranked = np.argsort(weighted_ratings)[::-1]
    ranked = ranked[~already_rated[ranked]][:num_recommendations]

    return train_matrix.columns[ranked]

In [None]:
# Show the titles of the movies recommended to user 7.
rec = recommend_movies(7)

movies = pd.read_csv(
    "../datasets/ml-100k/u.item",
    sep="|",
    encoding="latin-1",
    names=[
        "movie_id", "title", "release_date", "video_release_date", "IMDb_URL",
        "unknown", "Action", "Adventure", "Animation", "Children's", "Comedy",
        "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror",
        "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western",
    ],
)

# `movies` carries pandas' default positional index, not the movie ids, so
# `movies.loc[movie_id, "title"]` would read the wrong row (or raise KeyError).
# Look the titles up through the movie_id column instead.
film = movies.set_index("movie_id").loc[rec, "title"].tolist()
film

# Metriche di valutazione su un sottoinsieme del test split

In [None]:
# Rating cut-off used to binarise ratings for evaluation.
# NOTE(review): `threshold` is reassigned to 3.5 in a later cell before the
# binarisation code in this notebook reads it — confirm which value is intended.
threshold = 3 

def predict_rating(user_id, movie_id):
    """Predict a user's rating for a movie as a similarity-weighted average.

    The prediction is the mean of the training ratings that other users gave
    to ``movie_id``, weighted by each rater's cosine similarity to ``user_id``.
    Users who did not rate the movie (stored as 0 in ``train_matrix``) are
    ignored.

    Args:
        user_id: Label of the target user.
        movie_id: Label of the movie to score.

    Returns:
        The weighted average rating, or 0 when no prediction can be made:
        the movie or the user is absent from the training data, nobody else
        rated the movie, or all raters have zero similarity to the user.
    """
    # Cold-start guard: unknown movies and unknown users both yield 0 instead
    # of an unknown user raising KeyError.
    if movie_id not in train_matrix.columns or user_id not in user_similarity_df.columns:
        return 0

    # Exclude the target user by label; no sorting is needed for an average.
    neighbour_sims = user_similarity_df[user_id].drop(labels=user_id)
    neighbour_ratings = train_matrix.loc[neighbour_sims.index, movie_id].to_numpy()

    # Keep only neighbours that actually rated this movie.
    has_rated = neighbour_ratings > 0
    weights = neighbour_sims.to_numpy()[has_rated]

    den = weights.sum()
    if den == 0:
        return 0
    num = (weights * neighbour_ratings[has_rated]).sum()
    return num / den

In [None]:
def precision_at_k(ranked_list, ground_truth, k):
    """Precision@k: share of the first ``k`` ranked items that are relevant.

    Args:
        ranked_list: Items ordered from best to worst.
        ground_truth: Collection of relevant items.
        k: Cut-off rank; also the denominator, even if fewer items are ranked.

    Returns:
        Number of distinct relevant items among the first ``k``, divided by ``k``.
    """
    relevant = set(ground_truth)
    hits = relevant.intersection(ranked_list[:k])
    return len(hits) / k

def recall_at_k(ranked_list, ground_truth, k):
    """Recall@k: share of the relevant items found among the first ``k``.

    Args:
        ranked_list: Items ordered from best to worst.
        ground_truth: Collection of relevant items; duplicates count once.
        k: Cut-off rank.

    Returns:
        Number of distinct relevant items in the first ``k`` divided by the
        number of distinct relevant items; 0.0 when ``ground_truth`` is empty.
    """
    relevant = set(ground_truth)
    # No relevant items: return 0.0 (as ndcg_at_k does) instead of dividing by zero.
    if not relevant:
        return 0.0
    hits = relevant.intersection(ranked_list[:k])
    # Divide by the distinct count so a repeated ground-truth entry cannot
    # make a perfect ranking score below 1.
    return len(hits) / len(relevant)

def ndcg_at_k(ranked_list, ground_truth, k):
    """NDCG@k with binary relevance (1 = relevant, 0 = not relevant).

    Args:
        ranked_list: Items ordered from best to worst.
        ground_truth: Collection of relevant items; duplicates count once.
        k: Cut-off rank.

    Returns:
        DCG of the first ``k`` ranked items divided by the ideal DCG, where the
        ideal ranking places ``min(number of relevant items, k)`` relevant
        items first. Returns 0.0 when the ideal DCG is 0 (no relevant items
        or ``k <= 0``).
    """
    # A set gives O(1) membership tests and keeps repeated ground-truth entries
    # from inflating the ideal DCG.
    relevant = set(ground_truth)
    idcg = sum(1.0 / np.log2(rank + 2) for rank in range(min(len(relevant), k)))
    if not idcg > 0:
        return 0.0
    dcg = 0.0
    for rank, item in enumerate(ranked_list[:k]):
        if item in relevant:
            dcg += 1.0 / np.log2(rank + 2)
    return dcg / idcg

In [None]:
threshold = 3.5
K = 10

# Evaluate on a fixed-seed random sample of 1000 held-out ratings.
test_sample = test_data.sample(1000, random_state=1)

# One (true rating, predicted rating) pair per sampled row, in sample order.
rating_pairs = [
    (sample_row['rating'], predict_rating(sample_row['user_id'], sample_row['movie_id']))
    for _, sample_row in test_sample.iterrows()
]

# Continuous targets and predictions.
y_true_cont = [actual for actual, _ in rating_pairs]
y_pred_cont = [estimate for _, estimate in rating_pairs]

# Binary labels: 1 when the rating reaches the threshold, otherwise 0.
y_true_bin = [int(actual >= threshold) for actual in y_true_cont]
y_pred_bin = [int(estimate >= threshold) for estimate in y_pred_cont]

In [None]:
# Error of the predicted ratings against the true ratings of the sample.
mse = mean_squared_error(y_true_cont, y_pred_cont)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_true_cont, y_pred_cont)

print(f"MAE:  {mae:.4f}")
print(f"RMSE: {rmse:.4f}")

In [None]:
# Classification metrics on the thresholded (0/1) ratings of the sample.
binary_labels = (y_true_bin, y_pred_bin)
# ROC-AUC ranks by the continuous predicted rating rather than the 0/1 label.
roc_auc = roc_auc_score(y_true_bin, y_pred_cont)
recall = recall_score(*binary_labels, zero_division=0)
precision = precision_score(*binary_labels, zero_division=0)
accuracy = accuracy_score(*binary_labels)

print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"ROC‑AUC:   {roc_auc:.4f}")

In [None]:
# Per-user top-N ranking metrics on the sampled test ratings.
from collections import defaultdict

# Relevant items per user: sampled test ratings at or above the threshold.
true_items = defaultdict(list)
liked = test_sample[test_sample['rating'] >= threshold]
for user, movie in zip(liked['user_id'], liked['movie_id']):
    true_items[user].append(movie)

# Score every movie of the sample for every user of the sample and rank
# each user's candidates by predicted rating, highest first.
candidate_movies = test_sample['movie_id'].unique()
pred_scores = defaultdict(list)
for u in test_sample['user_id'].unique():
    scored = [(m, predict_rating(u, m)) for m in candidate_movies]
    pred_scores[u] = sorted(scored, key=lambda pair: pair[1], reverse=True)

# Per-user metric values; users without any relevant item are skipped.
prec_k, rec_k, ndcg_k = [], [], []
for user, ranking in pred_scores.items():
    gt = true_items.get(user, [])
    if gt:
        ranked_list = [movie for movie, _ in ranking]
        prec_k.append(precision_at_k(ranked_list, gt, K))
        rec_k.append(recall_at_k(ranked_list, gt, K))
        ndcg_k.append(ndcg_at_k(ranked_list, gt, K))

print(f"Precision@{K}: {np.mean(prec_k):.4f}")
print(f"Recall@{K}:    {np.mean(rec_k):.4f}")
print(f"NDCG@{K}:      {np.mean(ndcg_k):.4f}")

# Metriche di valutazione su tutto il test split

In [None]:
# Per-user top-N ranking metrics on the whole test split.

from collections import defaultdict

# Relevant items per user: test ratings at or above the threshold.
true_items = defaultdict(list)
positive_rows = test_data.loc[test_data['rating'] >= threshold, ['user_id', 'movie_id']]
for user, movie in positive_rows.itertuples(index=False, name=None):
    true_items[user].append(movie)

# Candidates are all movies that occur in the test split; every test user gets
# a prediction for each of them, ranked by predicted rating in descending order.
all_test_movies = test_data['movie_id'].unique()
pred_scores = defaultdict(list)
for u in test_data['user_id'].unique():
    pred_scores[u] = sorted(
        ((m, predict_rating(u, m)) for m in all_test_movies),
        key=lambda scored: scored[1],
        reverse=True,
    )

# Collect the metrics per user, skipping users with no relevant item.
prec_k, rec_k, ndcg_k = [], [], []
for user, ranking in pred_scores.items():
    gt = true_items.get(user, [])
    if len(gt) == 0:
        continue
    ranked_list = [movie for movie, _score in ranking]
    prec_k.append(precision_at_k(ranked_list, gt, K))
    rec_k.append(recall_at_k(ranked_list, gt, K))
    ndcg_k.append(ndcg_at_k(ranked_list, gt, K))

print(f"Precision@{K}: {np.mean(prec_k):.4f}")
print(f"Recall@{K}:    {np.mean(rec_k):.4f}")
print(f"NDCG@{K}:      {np.mean(ndcg_k):.4f}")

In [None]:
# MAE and RMSE over the whole test split.

y_true = test_data['rating'].to_numpy()
# One prediction per test row, in row order.
y_pred = np.array([
    predict_rating(user, movie)
    for user, movie in zip(test_data['user_id'], test_data['movie_id'])
])

mse = mean_squared_error(y_true, y_pred)
mae = mean_absolute_error(y_true, y_pred)
rmse = np.sqrt(mse)
print(f"MAE: {mae:.4f}")
print(f"RMSE: {rmse:.4f}")

In [None]:
# Initial evaluation on a fixed-seed random sample of the test split:
# classification metrics on thresholded ratings plus MAE / RMSE on raw ratings.
y_true_binary = []
y_pred_binary = []
y_true_continuous = []
y_pred_continuous = []

# Cap the sample size at the size of the test split so sampling cannot fail
# when fewer than 10000 test rows are available.
test_sample = test_data.sample(min(10000, len(test_data)), random_state=1)

for _, row in test_sample.iterrows():
    true_rating = row['rating']
    pred_rating = predict_rating(row['user_id'], row['movie_id'])

    y_true_continuous.append(true_rating)
    y_pred_continuous.append(pred_rating)

    # A rating counts as positive when it reaches the threshold.
    actual = 1 if true_rating >= threshold else 0
    predicted = 1 if pred_rating >= threshold else 0

    y_true_binary.append(actual)
    y_pred_binary.append(predicted)

# Accuracy, precision and recall on the binary labels.
accuracy = accuracy_score(y_true_binary, y_pred_binary)
precision = precision_score(y_true_binary, y_pred_binary, zero_division=0)
recall = recall_score(y_true_binary, y_pred_binary, zero_division=0)

# MAE and RMSE on the continuous ratings.
mae = mean_absolute_error(y_true_continuous, y_pred_continuous)
# The original line had an unbalanced ")" that made the whole cell a SyntaxError.
rmse = root_mean_squared_error(y_true_continuous, y_pred_continuous)

print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"MAE:       {mae:.4f}")
print(f"RMSE:      {rmse:.4f}")