In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Carregamento dos dados

In [None]:
movies = pd.read_csv('./dataset/movies_sample.csv')
ratings = pd.read_csv('./dataset/ratings_sample.csv')
df = ratings[['userId', 'movieId', 'rating']]
df = df.merge(movies[['movieId', 'title']])
df

In [None]:
user_ids = df['userId'].unique()
item_ids = df['movieId'].unique()

map_users = {user: idx for idx, user in enumerate(user_ids)}
map_items = {item: idx for idx, item in enumerate(item_ids)}

df['userId'] = df['userId'].map(map_users)
df['movieId'] = df['movieId'].map(map_items)


original_id_to_title = movies.set_index('movieId')['title'].to_dict()
map_title = {new_idx: original_id_to_title.get(old_id) for old_id, new_idx in map_items.items()}

In [None]:
from sklearn.model_selection import train_test_split
train, test = train_test_split(df, test_size=.2, random_state=2)

# Stats
n_users = len(map_users)
n_items = len(map_items)


print(f"Dados carregados: {len(df)} linhas.")
print(f"Usuários únicos: {n_users}")
print(f"Filmes únicos: {n_items}")

# Implementação dos algoritmos de recomendação
## Mais Populares
Recomenda os itens mais popupares ainda não vistos

In [None]:
pop_counts = train.groupby('movieId').size().reset_index(name='count')
pop_counts = pop_counts.sort_values('count', ascending=False)

top_candidates = pop_counts['movieId'].values[:100]
top_candidates_scores = pop_counts.set_index('movieId')['count'].to_dict()

# Indexar histórico do usuário para verificação rápida 
user_seen_train = train.groupby('userId')['movieId'].apply(set).to_dict()

# Gerar recomendações para usuários únicos do teste
test_users = test['userId'].unique()
N = 10
recs_list = []

print(f"Gerando recomendações para {len(test_users)} usuários do teste...")

for user in test_users:
    seen = user_seen_train.get(user, set())
    user_recs = []
    
    for item in top_candidates:
        if item not in seen:
            user_recs.append(item)
            if len(user_recs) == N:
                break
    
    # Armazenar no formato para análise
    for item in user_recs:
        recs_list.append([user, item, top_candidates_scores[item]])

# Criar DataFrame final de recomendações
df_recs_pop = pd.DataFrame(recs_list, columns=['userId', 'movieId', 'pop_score'])

# Adicionar Títulos para visualização
df_recs_pop['title'] = df_recs_pop['movieId'].map(map_title)

print("Recomendações geradas com sucesso.")
df_recs_pop.head()

## UserKNN
Recomenda baseado na preferência de usuários similares

In [None]:
MAX = 10000

In [None]:
from scipy.sparse import csr_matrix
from sklearn.neighbors import NearestNeighbors

# Criação da matriz esparsa
R_sparse = csr_matrix(
    (train['rating'].values, (train['userId'].values, train['movieId'].values)),
    shape=(n_users, n_items)
)

user_sums = np.array(R_sparse.sum(axis=1)).flatten()
user_counts = np.diff(R_sparse.indptr)
# Evita divisão por zero
user_means = np.zeros(n_users)
mask = user_counts > 0
user_means[mask] = user_sums[mask] / user_counts[mask]

# Criar Matriz Centralizada (R - Média)
R_centered = R_sparse.copy()
# Subtrai a média do usuário de cada uma de suas avaliações não nulas
R_centered.data -= np.repeat(user_means, user_counts)

# Treinar Modelo na Matriz Centralizada
model_knn = NearestNeighbors(metric='cosine', algorithm='brute', n_neighbors=50, n_jobs=-1)
model_knn.fit(R_centered)

In [None]:
# Gerar recomendações
def get_user_knn_recs(model, sparse_matrix, target_users, n_recs=10):
    recs_list = []
    
    # Limitar para os MAX vizinhos
    users_to_predict = target_users[:MAX] 

    # Encontrar vizinhos para o lote de usuários
    # distances = 1 - similaridade (cosseno)
    distances, indices = model.kneighbors(sparse_matrix[users_to_predict])
    
    for i, user_idx in enumerate(users_to_predict):
        neighbor_indices = indices[i]
        neighbor_distances = distances[i]
        
        # Similaridade = 1 - distância
        similarities = 1 - neighbor_distances
        
        # Obter notas dos vizinhos
        # neighbor_ratings shape: (k, n_items)
        neighbor_ratings = sparse_matrix[neighbor_indices].toarray()
        
        #Predição de nota
        # Score = Soma(Nota * Similaridade) / Soma(Similaridades)
        # Ajuste dimensional para multiplicar (k, 1) * (k, n_items)
        sim_col = similarities.reshape(-1, 1)
        
        weighted_sum = np.sum(neighbor_ratings * sim_col, axis=0)
        
        rated_mask = (neighbor_ratings > 0).astype(float)
        sum_of_weights = np.dot(similarities, rated_mask) + 1e-9 # Evitar div/0
        
        predicted_scores = weighted_sum / sum_of_weights
        
        # Filtrar itens já vistos
        user_row = sparse_matrix[user_idx].indices
        predicted_scores[user_row] = -np.inf
        
        # Pegar Top N
        top_indices = np.argpartition(predicted_scores, -n_recs)[-n_recs:]
        top_indices = top_indices[np.argsort(predicted_scores[top_indices])[::-1]]
        
        for item_idx in top_indices:
            score = predicted_scores[item_idx]
            if score > 0 and score != -np.inf: # Garantir que é recomendação válida
                recs_list.append([user_idx, item_idx, score])

    return pd.DataFrame(recs_list, columns=['userId', 'movieId', 'knn_score'])

# Executando
unique_test_users = test['userId'].unique()
df_recs_knn = get_user_knn_recs(model_knn, R_sparse, unique_test_users, n_recs=10)

# Adicionando títulos
df_recs_knn['title'] = df_recs_knn['movieId'].map(map_title)

print("UserKNN finalizado.")
df_recs_knn.head()

## BPR
Recomenda com base de interações implicitas, nesse caso consideraremos avaliações a cima de 3 como positivo e abaixo disso ou nao avaliado como negativo

In [None]:
import implicit
import scipy.sparse as sparse

# Filtrar apenas interações positivas (>= 3) do treino
train_pos = train[train['rating'] >= 3].copy()

# Criar matriz esparsa UserxItem
sparse_user_item = sparse.csr_matrix(
    (np.ones(len(train_pos)), (train_pos['userId'], train_pos['movieId'])), 
    shape=(n_users, n_items)
)

print(f"Matriz esparsa criada: {sparse_user_item.shape} com {sparse_user_item.nnz} interações positivas.")

# Configurar e Treinar o Modelo
model_implicit = implicit.bpr.BayesianPersonalizedRanking(
    factors=64, 
    learning_rate=0.01, 
    regularization=0.01, 
    iterations=50,
    random_state=42
)

# Treinando
model_implicit.fit(sparse_user_item)

# Gerar Recomendações

recs_list_bpr = []
unique_test_users = test['userId'].unique()

users_to_rec = unique_test_users 

ids, scores = model_implicit.recommend(
    users_to_rec, 
    sparse_user_item[users_to_rec], 
    N=10, 
    filter_already_liked_items=True
)

# Formatar a saída para o seu DataFrame padrão
for i, user_id in enumerate(users_to_rec):
    for j in range(len(ids[i])):
        item_id = ids[i][j]
        score = scores[i][j]
        recs_list_bpr.append([user_id, item_id, score])

df_recs_bpr = pd.DataFrame(recs_list_bpr, columns=['userId', 'movieId', 'bpr_score'])
df_recs_bpr['title'] = df_recs_bpr['movieId'].map(map_title)

df_recs_bpr.head()


# Análise dos dados obtidos

## Preparação dos metadados

In [None]:
movies_df = pd.read_csv('./dataset/movies_sample.csv')
movies_df['movieId'] = movies_df['movieId'].map(map_items) # Usar o mesmo ID interno
movies_df = movies_df.dropna(subset=['movieId']) # Remover itens que não entraram no map

# Separar gêneros 
# dict: item_id -> lista de generos
item_genres = {}
for idx, row in movies_df.iterrows():
    if row['genres'] != '(no genres listed)':
        item_genres[int(row['movieId'])] = row['genres'].split('|')
    else:
        item_genres[int(row['movieId'])] = []

In [None]:
def get_catalog_coverage(recs_df, n_items):
    """Calcula a porcentagem de itens únicos recomendados."""
    unique_recs = recs_df['movieId'].nunique()
    coverage = (unique_recs / n_items) * 100
    return coverage

def get_gini_coefficient(recs_df, n_items):
    """
    Calcula o Coeficiente de Gini da distribuição de recomendações.
    Gini = 1: Apenas 1 item é recomendado para todos (Desigualdade Total).
    Gini = 0: Todos os itens são recomendados igualmente (Igualdade Total).
    """
    # Contagem de quantas vezes cada item foi recomendado
    item_counts = recs_df['movieId'].value_counts().reindex(np.arange(n_items), fill_value=0).values
    item_counts = np.sort(item_counts)
    
    # Cálculo do Gini
    n = len(item_counts)
    index = np.arange(1, n + 1)
    gini = ((2 * np.sum(index * item_counts)) / (n * np.sum(item_counts))) - ((n + 1) / n)
    return gini

def calculate_shannon_entropy(genre_list):
    """Calcula a diversidade (entropia) de uma lista de gêneros."""
    if not genre_list:
        return 0.0
    counts = pd.Series(genre_list).value_counts(normalize=True) # Probabilidades
    entropy = -np.sum(counts * np.log2(counts))
    return entropy

def analyze_filter_bubble(recs_df, train_df, target_users, item_genres_dict):
    """
    Compara a entropia do histórico vs. recomendações.
    Retorna a média de entropia do histórico e a média da recomendação.
    """
    user_hist_entropy = []
    user_recs_entropy = []
    
    # Agrupar histórico por usuário para acesso rápido
    train_grouped = train_df[train_df['userId'].isin(target_users)].groupby('userId')['movieId'].apply(list)
    recs_grouped = recs_df.groupby('userId')['movieId'].apply(list)
    
    for user in target_users:
        # Entropia do Histórico
        if user in train_grouped:
            hist_items = train_grouped[user]
            hist_genres = [g for item in hist_items for g in item_genres_dict.get(item, [])]
            user_hist_entropy.append(calculate_shannon_entropy(hist_genres))
        
        # Entropia das Recomendações
        if user in recs_grouped:
            recs_items = recs_grouped[user]
            recs_genres = [g for item in recs_items for g in item_genres_dict.get(item, [])]
            user_recs_entropy.append(calculate_shannon_entropy(recs_genres))
            
    return np.mean(user_hist_entropy), np.mean(user_recs_entropy)

In [None]:
# Lista dos dataframes e nomes
models = [
    ('Most Popular', df_recs_pop),
    ('UserKNN', df_recs_knn),
    ('BPR-MF', df_recs_bpr)
]

# Variáveis para armazenar resultados
results = []
target_users_analysis = df_recs_bpr['userId'].unique() # Usar mesmos usuários para todos

print(f"{'Modelo':<15} | {'Cob. (%)':<10} | {'Gini (Viés)':<12} | {'Ent. Hist':<10} | {'Ent. Recs':<10} | {'Delta Entropia':<15}")
print("-" * 80)

for name, df in models:
    # Cobertura
    cov = get_catalog_coverage(df, n_items)
    
    # Gini
    gini = get_gini_coefficient(df, n_items)
    
    # Bolha de Filtro 
    df_filtered = df[df['userId'].isin(target_users_analysis)]
    ent_hist, ent_recs = analyze_filter_bubble(df_filtered, train, target_users_analysis, item_genres)
    
    # Delta: Se negativo, reduziu a diversidade (Bolha). Se positivo, expandiu.
    delta = ent_recs - ent_hist
    
    results.append({
        'Modelo': name, 'Cobertura': cov, 'Gini': gini, 
        'Entropia_Hist': ent_hist, 'Entropia_Recs': ent_recs
    })
    
    print(f"{name:<15} | {cov:>9.2f}% | {gini:>11.4f}  | {ent_hist:>9.2f}  | {ent_recs:>9.2f}  | {delta:>14.2f}")

In [None]:
def plot_long_tail_density_linear(models_data, train_df, n_items):
    # 1. Definir Ranking de Popularidade Real (Treino)
    item_popularity = train_df['movieId'].value_counts().reindex(np.arange(n_items), fill_value=0)
    sorted_items = item_popularity.sort_values(ascending=False).index
    
    plt.figure(figsize=(14, 7))
    
    colors = {'Most Popular': '#d62728', 'UserKNN': '#1f77b4', 'BPR-MF': '#2ca02c'}
    
    # Janelas de suavização
    smoothing_windows = {'Most Popular': 20, 'UserKNN': 200, 'BPR-MF': 200}
    
    for name, df in models_data:
        # Contagem bruta
        rec_counts = df['movieId'].value_counts().reindex(sorted_items, fill_value=0)

        # Normalização: densidade
        rec_density = rec_counts / rec_counts.sum()

        # Suavização
        window = smoothing_windows.get(name, 100)
        smooth_curve = rec_density.rolling(window=window, min_periods=1, center=True).mean()
        
        x_axis = np.arange(len(smooth_curve))
        
        # Plot
        plt.plot(x_axis, smooth_curve, label=name, color=colors[name], linewidth=2.5)

        # Sombreamento
        plt.fill_between(x_axis, 0, smooth_curve, color=colors[name], alpha=0.1)

        # Marcador do ponto onde o modelo chega a 99.9% da massa
        threshold = 0.999
        cumsum_curve = smooth_curve.cumsum()
        last_idx = np.argmax(cumsum_curve >= threshold)


    plt.title('Fenômeno da Cauda Longa nas recomendações', fontsize=14)
    plt.xlabel('Itens', fontsize=12)
    plt.ylabel('Popularidade', fontsize=12)

    plt.xlim(0, n_items)
    plt.ylim(bottom=0, top=max(smooth_curve.max() for _, df in models_data) * 1.1)

    plt.legend(fontsize=12, loc='upper right')
    plt.grid(True, ls="--", alpha=0.3)
    plt.tight_layout()
    plt.show()

real_n_items = len(map_items)
plot_long_tail_density_linear(models, train, real_n_items)


In [None]:
labels = ["Most Popular", "UserKNN", "BPR-MF"]
values = [11.99, 77.46, 36.45]
colors = ["red", "blue", "green"]

plt.figure(figsize=(8, 5))
plt.bar(labels, values, color=colors)

plt.title("Catalog Coverage por Modelo")
plt.ylabel("Catalog Coverage (%)")
plt.xlabel("Modelos")

plt.tight_layout()
plt.show()
