In [108]:
%matplotlib inline


In [109]:
import json
import random
import re
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from annoy import AnnoyIndex
from sklearn.cluster import KMeans
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import (
    balanced_accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    silhouette_score,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler

try:
    from sentence_transformers import SentenceTransformer
    HAS_SENTENCE_TRANSFORMERS = True
except ImportError:
    HAS_SENTENCE_TRANSFORMERS = False
USE_SENTENCE_TRANSFORMERS = False  # Defina True para habilitar embeddings BERT (mais lento)

DATA_DIR = Path("data")
ARTIFACT_DIR = Path("poc_artifacts")
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)

RNG_SEED = 42
np.random.seed(RNG_SEED)
random.seed(RNG_SEED)

# Ajuste opcional para acelerar testes; use None para todo o dataset
SAMPLE_LIMIT = None

# Número de vizinhos utilizado na votação do Annoy
MODEL_SAMPLE_SIZE = 12000
ANNOY_TOP_K = 15


In [110]:
def normalize_name(value: str) -> str:
    if not isinstance(value, str):
        return ""
    name = value.lower()
    name = re.sub(r"\(.*?\)|\[.*?]", "", name)
    name = re.sub(r"[^a-z0-9\s]", " ", name)
    return re.sub(r"\s+", " ", name).strip()


def load_raw_sources(data_dir: Path) -> tuple[pd.DataFrame, pd.DataFrame]:
    steam_path = data_dir / "steam-games-complete-dataset.csv"
    amazon_path = data_dir / "meta_Video_Games.json"
    if not steam_path.exists():
        raise FileNotFoundError(f"Steam dataset not found: {steam_path}")
    if not amazon_path.exists():
        raise FileNotFoundError(f"Amazon dataset not found: {amazon_path}")
    steam_df = pd.read_csv(steam_path)
    amazon_df = pd.read_json(amazon_path, lines=True, convert_dates=False)
    return steam_df, amazon_df


def join_sources(steam_df: pd.DataFrame, amazon_df: pd.DataFrame) -> pd.DataFrame:
    steam = steam_df.copy()
    amazon = amazon_df.copy()

    steam["normalized_name"] = steam["name"].apply(normalize_name)
    amazon["normalized_name"] = amazon["title"].apply(normalize_name)

    for col in ["name", "url", "genre", "popular_tags", "reviews", "image_url"]:
        if col not in steam.columns:
            steam[col] = "Unknown"

    steam = steam[[
        "normalized_name",
        "name",
        "url",
        "genre",
        "popular_tags",
        "reviews",
        "image_url",
    ]].dropna(subset=["normalized_name"])
    steam = steam[steam["normalized_name"] != ""]

    amazon = amazon[["normalized_name", "title", "category"]].dropna(subset=["normalized_name"])
    amazon = amazon[amazon["normalized_name"] != ""]

    merged = pd.merge(steam, amazon, on="normalized_name", how="outer", suffixes=("_steam", "_amazon"))
    merged["title"] = merged["title"].fillna(merged["name"]).fillna("Unknown")
    merged["name"] = merged["name"].fillna("Unknown")
    merged["genre"] = merged["genre"].fillna("Unknown")
    merged["popular_tags"] = merged["popular_tags"].fillna("[]")
    merged["category"] = merged["category"].fillna("[]")
    merged["url"] = merged["url"].fillna("Unknown")
    merged["reviews"] = merged["reviews"].fillna("Unknown")
    merged["image_url"] = merged["image_url"].fillna("Unknown")
    merged["tags"] = merged["popular_tags"]
    merged["categoria"] = merged["category"]
    merged = merged.drop_duplicates(subset=["normalized_name"]).reset_index(drop=True)
    return merged


GENRE_MAPPING: Dict[str, List[str]] = {
    "Action": ["Action", "Shooter", "Fighting", "Beat em up"],
    "Adventure": ["Adventure", "Point & Click"],
    "RPG": ["RPG", "JRPG", "CRPG", "Roguelike"],
    "Strategy": ["Strategy", "RTS", "Turn-Based Strategy", "Grand Strategy"],
    "Simulation": ["Simulation", "City Builder", "Management"],
    "Sports": ["Sports", "Racing", "Football", "Basketball"],
    "Indie": ["Indie", "Casual"],
}


def clean_genre(genre_str: str) -> str:
    if not isinstance(genre_str, str) or genre_str == "Unknown":
        return "Other"
    for main_genre, variants in GENRE_MAPPING.items():
        for variant in variants:
            if variant.lower() in genre_str.lower():
                return main_genre
    return "Other"


def parse_json_field_improved(field) -> List[str]:
    if field is None or (isinstance(field, float) and np.isnan(field)):
        return []
    if isinstance(field, list):
        data = field
    elif isinstance(field, str):
        if field.strip().lower() == "unknown":
            return []
        try:
            loaded = json.loads(field)
            data = loaded if isinstance(loaded, list) else [loaded]
        except json.JSONDecodeError:
            data = [field]
    else:
        data = [field]

    cleaned: List[str] = []
    for item in data:
        if not isinstance(item, str):
            item = str(item)
        stripped = item.strip()
        if not stripped:
            continue
        clean_item = re.sub(r"[^a-zA-Z0-9\s]", "", stripped).strip().lower()
        if len(clean_item) > 2:
            cleaned.append(clean_item)
    return cleaned


def create_game_description(row: pd.Series) -> str:
    title = str(row.get("title", "")) if row.get("title", "") != "Unknown" else ""
    genre = str(row.get("clean_genre", "")) if row.get("clean_genre", "") != "Unknown" else ""
    tags = " ".join(row.get("clean_tags", []))
    categoria = " ".join(row.get("clean_categoria", []))
    description = f"Title: {title}. Genre: {genre}. Tags: {tags}. Category: {categoria}."
    return description.strip()


MAIN_GENRES_ONEHOT = ["Action", "Adventure", "Strategy", "RPG", "Simulation", "Sports", "Indie", "Other"]
MAIN_GENRES_LABEL = ["Action", "Adventure", "Strategy", "RPG", "Simulation"]


def get_main_genre(genre_str: str) -> str:
    if not isinstance(genre_str, str) or genre_str == "Unknown":
        return "Other"
    for genre in MAIN_GENRES_LABEL:
        if genre.lower() in genre_str.lower():
            return genre
    return "Other"


def prepare_games_dataframe(steam_df: pd.DataFrame, amazon_df: pd.DataFrame) -> pd.DataFrame:
    merged = join_sources(steam_df, amazon_df)
    games = merged.copy()
    games["clean_genre"] = games["genre"].apply(clean_genre)
    games["clean_tags"] = games["tags"].apply(parse_json_field_improved)
    games["clean_categoria"] = games["categoria"].apply(parse_json_field_improved)
    games["description"] = games.apply(create_game_description, axis=1)
    games["title"] = games["title"].replace("Unknown", np.nan).fillna(games["name"]).fillna("Unknown")
    for col in ["title", "name", "url", "reviews", "genre", "image_url"]:
        games[col] = games[col].astype(str)
    games.reset_index(drop=True, inplace=True)
    return games


In [111]:
@dataclass
class ModelMetrics:
    name: str
    balanced_accuracy: float
    precision: float
    recall: float
    f1: float
    silhouette: Optional[float]
    confusion: np.ndarray
    label_names: List[str]
    extras: Optional[Dict[str, float]] = None


def build_feature_matrices(games_df: pd.DataFrame) -> Dict[str, np.ndarray]:
    annoy_corpus = (
        games_df["title"].fillna("").astype(str)
        + " "
        + games_df["genre"].fillna("").astype(str)
        + " "
        + games_df["clean_tags"].apply(lambda tags: " ".join(tags)).astype(str)
        + " "
        + games_df["clean_categoria"].apply(lambda cats: " ".join(cats)).astype(str)
    )
    annoy_vectorizer = TfidfVectorizer(stop_words="english", max_features=1000, min_df=5)
    annoy_matrix = annoy_vectorizer.fit_transform(annoy_corpus).toarray().astype(np.float32)

    descriptions = games_df["description"].fillna("").tolist()
    if HAS_SENTENCE_TRANSFORMERS and USE_SENTENCE_TRANSFORMERS:
        embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
        text_embeddings = embedding_model.encode(descriptions, show_progress_bar=False)
    else:
        tfidf_opt = TfidfVectorizer(
            stop_words="english",
            max_features=400,
            min_df=2,
            max_df=0.8,
            ngram_range=(1, 2),
        )
        text_embeddings = tfidf_opt.fit_transform(descriptions).toarray()

    numeric_features = pd.DataFrame(
        {
            "title_len": games_df["title"].astype(str).str.len(),
            "has_url": (games_df["url"].astype(str) != "Unknown").astype(int),
            "tags_count": games_df["clean_tags"].apply(len),
            "categoria_count": games_df["clean_categoria"].apply(len),
        }
    )
    for genre in MAIN_GENRES_ONEHOT:
        numeric_features[f"is_{genre.lower()}"] = (games_df["clean_genre"] == genre).astype(int)

    scaler = StandardScaler()
    numeric_scaled = scaler.fit_transform(numeric_features.fillna(0))

    combined_features = np.hstack([
        text_embeddings.astype(np.float32),
        numeric_scaled.astype(np.float32),
    ])
    return {
        "annoy_matrix": annoy_matrix,
        "combined_features": combined_features,
    }


def compute_classification_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]:
    return {
        "balanced_accuracy": balanced_accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, average="macro", zero_division=0),
        "recall": recall_score(y_true, y_pred, average="macro", zero_division=0),
        "f1": f1_score(y_true, y_pred, average="macro", zero_division=0),
    }


def evaluate_kmeans(features: np.ndarray, labels: np.ndarray, label_names: List[str], eval_indices: np.ndarray) -> ModelMetrics:
    sample_size = min(2000, len(features))
    sample_idx = np.random.choice(len(features), sample_size, replace=False)
    sample_features = features[sample_idx]

    best_score = -1.0
    best_k = 8
    for k in range(6, 16):
        temp_model = KMeans(n_clusters=k, random_state=RNG_SEED, n_init=5)
        clusters = temp_model.fit_predict(sample_features)
        if len(set(clusters)) <= 1:
            continue
        score = silhouette_score(sample_features, clusters)
        if score > best_score:
            best_score = score
            best_k = k

    final_model = KMeans(n_clusters=best_k, random_state=RNG_SEED, n_init=5)
    full_clusters = final_model.fit_predict(features)
    sil_sample_size = min(1000, len(features))
    sil_idx = np.random.choice(len(features), sil_sample_size, replace=False)
    if len(set(full_clusters)) > 1:
        silhouette = silhouette_score(features[sil_idx], full_clusters[sil_idx])
    else:
        silhouette = None

    eval_features = features[eval_indices]
    eval_labels = labels[eval_indices]
    X_train, X_test, y_train, y_test = train_test_split(
        eval_features,
        eval_labels,
        test_size=0.3,
        random_state=RNG_SEED,
    )

    eval_model = KMeans(n_clusters=best_k, random_state=RNG_SEED, n_init=5)
    train_clusters = eval_model.fit_predict(X_train)
    cluster_to_label: Dict[int, int] = {}
    for cluster_id in range(best_k):
        mask = train_clusters == cluster_id
        if not np.any(mask):
            continue
        majority = Counter(y_train[mask]).most_common(1)[0][0]
        cluster_to_label[cluster_id] = majority

    default_label = Counter(y_train).most_common(1)[0][0]
    test_clusters = eval_model.predict(X_test)
    predictions = np.array([cluster_to_label.get(cid, default_label) for cid in test_clusters])

    metrics = compute_classification_metrics(y_test, predictions)
    cm = confusion_matrix(y_test, predictions, labels=list(range(len(label_names))))

    return ModelMetrics(
        name=f"KMeans(k={best_k})",
        balanced_accuracy=metrics["balanced_accuracy"],
        precision=metrics["precision"],
        recall=metrics["recall"],
        f1=metrics["f1"],
        silhouette=silhouette,
        confusion=cm,
        label_names=label_names,
        extras={"best_k": float(best_k)},
    )


def evaluate_annoy(
    annoy_matrix: np.ndarray,
    labels: np.ndarray,
    label_names: List[str],
    eval_indices: np.ndarray,
    top_k: int = ANNOY_TOP_K,
) -> ModelMetrics:
    dims = annoy_matrix.shape[1]
    annoy_index = AnnoyIndex(dims, metric="angular")
    for idx, vector in enumerate(annoy_matrix):
        annoy_index.add_item(idx, vector.tolist())
    annoy_index.build(50)

    fallback_label = Counter(labels).most_common(1)[0][0]
    predictions: List[int] = []
    for idx in eval_indices:
        neighbors = annoy_index.get_nns_by_item(idx, top_k + 1, include_distances=False)
        neighbors = [n for n in neighbors if n != idx][:top_k]
        if not neighbors:
            predictions.append(fallback_label)
            continue
        neighbor_labels = labels[neighbors]
        majority = Counter(neighbor_labels).most_common(1)[0][0]
        predictions.append(majority)

    y_true = labels[eval_indices]
    predictions_arr = np.array(predictions)
    metrics = compute_classification_metrics(y_true, predictions_arr)
    cm = confusion_matrix(y_true, predictions_arr, labels=list(range(len(label_names))))

    return ModelMetrics(
        name=f"Annoy(top_k={top_k})",
        balanced_accuracy=metrics["balanced_accuracy"],
        precision=metrics["precision"],
        recall=metrics["recall"],
        f1=metrics["f1"],
        silhouette=None,
        confusion=cm,
        label_names=label_names,
        extras={"top_k": float(top_k)},
    )


def plot_metric_comparison(models: Iterable[ModelMetrics], output_path: Path) -> None:
    metrics = ["balanced_accuracy", "precision", "recall", "f1"]
    labels = [m.name for m in models]
    values = {metric: [getattr(m, metric) for m in models] for metric in metrics}

    x = np.arange(len(labels))
    width = 0.2

    plt.figure(figsize=(10, 6))
    for idx, metric in enumerate(metrics):
        plt.bar(x + idx * width, values[metric], width=width, label=metric.replace("_", " ").title())

    plt.xticks(x + width * (len(metrics) - 1) / 2, labels, rotation=20)
    plt.ylim(0, 1.0)
    plt.ylabel("Score")
    plt.title("Modelos vs. Métricas (macro)")
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_path, dpi=160)
    plt.close()


def plot_confusion(cm: np.ndarray, label_names: List[str], title: str, output_path: Path) -> None:
    plt.figure(figsize=(9, 7))
    plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
    plt.title(title)
    plt.colorbar()

    tick_marks = np.arange(len(label_names))
    plt.xticks(tick_marks, label_names, rotation=45, ha="right")
    plt.yticks(tick_marks, label_names)

    thresh = cm.max() / 2.0 if cm.size else 0
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            value = cm[i, j]
            plt.text(
                j,
                i,
                f"{value}",
                horizontalalignment="center",
                color="white" if value > thresh else "black",
            )

    plt.ylabel("True label")
    plt.xlabel("Predicted label")
    plt.tight_layout()
    plt.savefig(output_path, dpi=160)
    plt.close()


def write_metrics(metric: ModelMetrics) -> None:
    output = {
        "model": metric.name,
        "balanced_accuracy": metric.balanced_accuracy,
        "precision_macro": metric.precision,
        "recall_macro": metric.recall,
        "f1_macro": metric.f1,
        "silhouette": metric.silhouette,
        "labels": metric.label_names,
        "confusion_matrix": metric.confusion.tolist(),
        "extras": metric.extras or {},
    }
    safe_name = metric.name.replace(" ", "_").replace("(", "").replace(")", "").replace("=", "")
    path = ARTIFACT_DIR / f"metrics_{safe_name}.json"
    with path.open("w", encoding="utf-8") as fp:
        json.dump(output, fp, indent=2)


In [112]:
steam_df, amazon_df = load_raw_sources(DATA_DIR)
games_df = prepare_games_dataframe(steam_df, amazon_df)

if SAMPLE_LIMIT is not None and SAMPLE_LIMIT < len(games_df):
    games_df = games_df.sample(SAMPLE_LIMIT, random_state=RNG_SEED).reset_index(drop=True)

feature_data = build_feature_matrices(games_df)
annoy_matrix = feature_data["annoy_matrix"]
combined_features = feature_data["combined_features"]

label_encoder = LabelEncoder()
genre_labels = games_df["genre"].apply(get_main_genre)
genre_numeric = label_encoder.fit_transform(genre_labels)
label_names = label_encoder.inverse_transform(np.arange(len(label_encoder.classes_))).tolist()

model_sample_size = len(games_df) if MODEL_SAMPLE_SIZE is None else min(int(MODEL_SAMPLE_SIZE), len(games_df))

sample_indices = np.random.choice(len(games_df), model_sample_size, replace=False)
sample_features = combined_features[sample_indices]
sample_annoy_matrix = annoy_matrix[sample_indices]
sample_labels = genre_numeric[sample_indices]

baseline_eval_size = min(1500, len(sample_features))
baseline_eval_indices = np.random.choice(len(sample_features), baseline_eval_size, replace=False)

kmeans_metrics = evaluate_kmeans(sample_features, sample_labels, label_names, baseline_eval_indices)
annoy_metrics = evaluate_annoy(sample_annoy_matrix, sample_labels, label_names, baseline_eval_indices, top_k=ANNOY_TOP_K)

metrics_list = [kmeans_metrics, annoy_metrics]

plot_metric_comparison(metrics_list, ARTIFACT_DIR / "metrics_comparison.png")
plot_confusion(
    kmeans_metrics.confusion,
    label_names,
    f"Matriz de Confusao - {kmeans_metrics.name}",
    ARTIFACT_DIR / "confusion_kmeans.png",
)
plot_confusion(
    annoy_metrics.confusion,
    label_names,
    f"Matriz de Confusao - {annoy_metrics.name}",
    ARTIFACT_DIR / "confusion_annoy.png",
)

for metric in metrics_list:
    write_metrics(metric)

metrics_table = pd.DataFrame(
    [
        {
            "model": m.name,
            "balanced_accuracy": m.balanced_accuracy,
            "precision_macro": m.precision,
            "recall_macro": m.recall,
            "f1_macro": m.f1,
            "silhouette": m.silhouette,
            **(m.extras or {}),
        }
        for m in metrics_list
    ]
).set_index("model")
display(metrics_table)
print(f"Artefatos salvos em: {ARTIFACT_DIR.resolve()}")
print(f"Total de jogos considerados: {len(games_df)}")
print(f"Tamanho da amostra nos modelos: {len(sample_features)}")
print(f"Sentence-Transformers habilitado: {HAS_SENTENCE_TRANSFORMERS and USE_SENTENCE_TRANSFORMERS}")


Unnamed: 0_level_0,balanced_accuracy,precision_macro,recall_macro,f1_macro,silhouette,best_k,top_k
model,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
KMeans(k=8),0.940222,0.861111,0.940222,0.871932,0.634086,8.0,
Annoy(top_k=15),0.849357,0.931599,0.849357,0.882125,,,15.0


Artefatos salvos em: /home/douglas/Documents/pi_big_data/recommender-service/etl/poc_artifacts
Total de jogos considerados: 103400
Tamanho da amostra nos modelos: 12000
Sentence-Transformers habilitado: False


In [113]:
# Agglomerative & Spectral Clustering evaluation
from sklearn.cluster import AgglomerativeClustering, SpectralClustering

columns_order = [
    "balanced_accuracy",
    "precision_macro",
    "recall_macro",
    "f1_macro",
    "silhouette",
    "best_k",
    "top_k",
    "n_neighbors",
    "linkage",
    "affinity",
    "metric",
]

AGGLO_SAMPLE_SIZE = 4000
SPECTRAL_SAMPLE_SIZE = 3000

def _sanitize_model_name(name: str) -> str:
    return (
        name.replace(" ", "_")
        .replace("(", "")
        .replace(")", "")
        .replace("=", "")
        .replace(",", "")
    )

# Garantir que amostras existem (útil ao executar células isoladas)
if 'sample_features' not in locals() or 'sample_labels' not in locals():
    model_sample_size = len(games_df) if MODEL_SAMPLE_SIZE is None else min(int(MODEL_SAMPLE_SIZE), len(games_df))
    sample_indices = np.random.choice(len(games_df), model_sample_size, replace=False)
    sample_features = combined_features[sample_indices]
    sample_annoy_matrix = annoy_matrix[sample_indices]
    sample_labels = genre_numeric[sample_indices]

def evaluate_agglomerative(
    features: np.ndarray,
    labels: np.ndarray,
    label_names: List[str],
    linkage: str = "ward",
) -> ModelMetrics:
    if len(features) < 2:
        raise ValueError("É necessário pelo menos 2 jogos para Agglomerative Clustering.")

    train_size = min(AGGLO_SAMPLE_SIZE, len(features))
    train_idx = np.random.choice(len(features), train_size, replace=False)
    train_features = features[train_idx]
    train_labels = labels[train_idx]

    candidate_ks = [k for k in range(6, 16) if k < len(train_features)]
    if not candidate_ks:
        candidate_ks = [max(2, len(train_features) - 1)]

    best_score = -1.0
    best_k = candidate_ks[0]
    for k in candidate_ks:
        try:
            model = AgglomerativeClustering(n_clusters=k, linkage=linkage)
            clusters = model.fit_predict(train_features)
        except Exception:
            continue
        if len(set(clusters)) <= 1:
            continue
        score = silhouette_score(train_features, clusters)
        if score > best_score:
            best_score = score
            best_k = k

    final_model = AgglomerativeClustering(n_clusters=best_k, linkage=linkage)
    cluster_labels = final_model.fit_predict(train_features)

    fallback_label = Counter(train_labels).most_common(1)[0][0]
    cluster_to_label: Dict[int, int] = {}
    for cluster_id in np.unique(cluster_labels):
        mask = cluster_labels == cluster_id
        if not np.any(mask):
            continue
        majority = Counter(train_labels[mask]).most_common(1)[0][0]
        cluster_to_label[cluster_id] = majority

    predictions = np.array([cluster_to_label.get(cid, fallback_label) for cid in cluster_labels])
    metrics = compute_classification_metrics(train_labels, predictions)

    silhouette = None
    if len(set(cluster_labels)) > 1:
        silhouette = silhouette_score(train_features, cluster_labels)

    cm = confusion_matrix(train_labels, predictions, labels=list(range(len(label_names))))

    return ModelMetrics(
        name=f"Agglomerative(k={best_k})",
        balanced_accuracy=metrics["balanced_accuracy"],
        precision=metrics["precision"],
        recall=metrics["recall"],
        f1=metrics["f1"],
        silhouette=silhouette,
        confusion=cm,
        label_names=label_names,
        extras={"best_k": float(best_k), "linkage": linkage},
    )

def evaluate_spectral(
    features: np.ndarray,
    labels: np.ndarray,
    label_names: List[str],
    affinity: str = "nearest_neighbors",
    n_neighbors: int = SPECTRAL_SAMPLE_SIZE // 6,
) -> ModelMetrics:
    if len(features) < 2:
        raise ValueError("É necessário pelo menos 2 jogos para Spectral Clustering.")

    train_size = min(SPECTRAL_SAMPLE_SIZE, len(features))
    train_idx = np.random.choice(len(features), train_size, replace=False)
    train_features = features[train_idx]
    train_labels = labels[train_idx]

    nn_sample = max(1, min(n_neighbors, train_features.shape[0] - 1))

    candidate_ks = [k for k in range(6, 16) if k < len(train_features)]
    if not candidate_ks:
        candidate_ks = [max(2, len(train_features) - 1)]

    best_score = -1.0
    best_k = candidate_ks[0]
    for k in candidate_ks:
        try:
            model = SpectralClustering(
                n_clusters=k,
                random_state=RNG_SEED,
                assign_labels="kmeans",
                affinity=affinity,
                n_neighbors=nn_sample,
            )
            clusters = model.fit_predict(train_features)
        except Exception:
            continue
        if len(set(clusters)) <= 1:
            continue
        score = silhouette_score(train_features, clusters)
        if score > best_score:
            best_score = score
            best_k = k

    nn_full = max(1, min(n_neighbors, train_features.shape[0] - 1))

    final_model = SpectralClustering(
        n_clusters=best_k,
        random_state=RNG_SEED,
        assign_labels="kmeans",
        affinity=affinity,
        n_neighbors=nn_full,
    )
    cluster_labels = final_model.fit_predict(train_features)

    fallback_label = Counter(train_labels).most_common(1)[0][0]
    cluster_to_label: Dict[int, int] = {}
    for cluster_id in np.unique(cluster_labels):
        mask = cluster_labels == cluster_id
        if not np.any(mask):
            continue
        majority = Counter(train_labels[mask]).most_common(1)[0][0]
        cluster_to_label[cluster_id] = majority

    predictions = np.array([cluster_to_label.get(cid, fallback_label) for cid in cluster_labels])
    metrics = compute_classification_metrics(train_labels, predictions)

    silhouette = None
    if len(set(cluster_labels)) > 1:
        silhouette = silhouette_score(train_features, cluster_labels)

    cm = confusion_matrix(train_labels, predictions, labels=list(range(len(label_names))))

    return ModelMetrics(
        name=f"Spectral(k={best_k})",
        balanced_accuracy=metrics["balanced_accuracy"],
        precision=metrics["precision"],
        recall=metrics["recall"],
        f1=metrics["f1"],
        silhouette=silhouette,
        confusion=cm,
        label_names=label_names,
        extras={"best_k": float(best_k), "n_neighbors": float(nn_full), "affinity": affinity},
    )

agglo_metrics = evaluate_agglomerative(sample_features, sample_labels, label_names, linkage="ward")
spectral_metrics = evaluate_spectral(sample_features, sample_labels, label_names, affinity="nearest_neighbors", n_neighbors=20)
additional_metrics = [agglo_metrics, spectral_metrics]
metrics_list.extend(additional_metrics)

for metric in additional_metrics:
    safe_name = _sanitize_model_name(metric.name)
    plot_confusion(
        metric.confusion,
        label_names,
        f"Matriz de Confusao - {metric.name}",
        ARTIFACT_DIR / f"confusion_{safe_name}.png",
    )
    write_metrics(metric)

agglo_spectral_table = pd.DataFrame(
    [
        {
            "model": m.name,
            "balanced_accuracy": m.balanced_accuracy,
            "precision_macro": m.precision,
            "recall_macro": m.recall,
            "f1_macro": m.f1,
            "silhouette": m.silhouette,
            **(m.extras or {}),
        }
        for m in additional_metrics
    ]
).set_index("model").reindex(columns=columns_order)

display(agglo_spectral_table)




Unnamed: 0_level_0,balanced_accuracy,precision_macro,recall_macro,f1_macro,silhouette,best_k,top_k,n_neighbors,linkage,affinity,metric
model,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
Agglomerative(k=8),0.810749,0.789562,0.810749,0.796063,0.646912,8.0,,,ward,,
Spectral(k=14),0.691612,0.682334,0.691612,0.66996,0.193411,14.0,,20.0,,nearest_neighbors,


In [114]:
# Brute-force Nearest Neighbors evaluation
from sklearn.neighbors import NearestNeighbors

NEAREST_TOP_K = ANNOY_TOP_K

if 'sample_features' not in locals() or 'sample_labels' not in locals():
    model_sample_size = len(games_df) if MODEL_SAMPLE_SIZE is None else min(int(MODEL_SAMPLE_SIZE), len(games_df))
    sample_indices = np.random.choice(len(games_df), model_sample_size, replace=False)
    sample_features = combined_features[sample_indices]
    sample_annoy_matrix = annoy_matrix[sample_indices]
    sample_labels = genre_numeric[sample_indices]

def evaluate_exact_neighbors(
    features: np.ndarray,
    labels: np.ndarray,
    label_names: List[str],
    eval_indices: np.ndarray,
    top_k: int = NEAREST_TOP_K,
    metric: str = "cosine",
) -> ModelMetrics:
    if len(features) < 2:
        raise ValueError("É necessário pelo menos 2 jogos para Nearest Neighbors.")

    n_neighbors = min(top_k + 1, len(features))
    nn_model = NearestNeighbors(n_neighbors=n_neighbors, metric=metric, algorithm="auto")
    nn_model.fit(features)

    fallback_label = Counter(labels).most_common(1)[0][0]
    predictions: List[int] = []
    for idx in eval_indices:
        distances, neighbors = nn_model.kneighbors(features[idx][None, :], n_neighbors=n_neighbors)
        neighbors = [n for n in neighbors[0] if n != idx][:top_k]
        if not neighbors:
            predictions.append(fallback_label)
            continue
        neighbor_labels = labels[neighbors]
        majority = Counter(neighbor_labels).most_common(1)[0][0]
        predictions.append(majority)

    y_true = labels[eval_indices]
    predictions_arr = np.array(predictions)
    metrics = compute_classification_metrics(y_true, predictions_arr)
    cm = confusion_matrix(y_true, predictions_arr, labels=list(range(len(label_names))))

    return ModelMetrics(
        name=f"NearestNeighbors(top_k={top_k})",
        balanced_accuracy=metrics["balanced_accuracy"],
        precision=metrics["precision"],
        recall=metrics["recall"],
        f1=metrics["f1"],
        silhouette=None,
        confusion=cm,
        label_names=label_names,
        extras={"top_k": float(top_k), "metric": metric},
    )

nn_eval_size = min(1200, len(sample_features))
nn_eval_indices = np.random.choice(len(sample_features), nn_eval_size, replace=False)

nearest_metrics = evaluate_exact_neighbors(sample_features, sample_labels, label_names, nn_eval_indices)
metrics_list.append(nearest_metrics)

plot_confusion(
    nearest_metrics.confusion,
    label_names,
    f"Matriz de Confusao - {nearest_metrics.name}",
    ARTIFACT_DIR / f"confusion_{_sanitize_model_name(nearest_metrics.name)}.png",
)
write_metrics(nearest_metrics)

nearest_table = pd.DataFrame([
    {
        "model": nearest_metrics.name,
        "balanced_accuracy": nearest_metrics.balanced_accuracy,
        "precision_macro": nearest_metrics.precision,
        "recall_macro": nearest_metrics.recall,
        "f1_macro": nearest_metrics.f1,
        "silhouette": nearest_metrics.silhouette,
        **(nearest_metrics.extras or {}),
    }
]).set_index("model").reindex(columns=columns_order)

display(nearest_table)


Unnamed: 0_level_0,balanced_accuracy,precision_macro,recall_macro,f1_macro,silhouette,best_k,top_k,n_neighbors,linkage,affinity,metric
model,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
NearestNeighbors(top_k=15),0.909476,0.902963,0.909476,0.905517,,,15.0,,,,cosine
