In [None]:
# [1] Importação da Base Local
# - Carrega o dataset diretamente do arquivo local dataset.csv no diretório do projeto.

import os, glob, warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display

from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
    silhouette_score,
    roc_curve
)
from sklearn.metrics import pairwise_distances_argmin_min
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier


# Notebook-wide configuration: fixed seed, silenced warnings, seaborn grid theme.
SEED = 42
warnings.filterwarnings('ignore')
np.random.seed(SEED)
sns.set(style='whitegrid')

# Load the local dataset.
# Prefer the absolute path inside the current working directory; otherwise keep the
# bare file name so the existence check below reports the missing file.
_cwd_candidate = os.path.join(os.getcwd(), 'dataset.csv')
if os.path.exists(_cwd_candidate):
    data_file = _cwd_candidate
else:
    data_file = 'dataset.csv'

if not os.path.exists(data_file):
    raise FileNotFoundError('Arquivo dataset.csv não encontrado no diretório do projeto.')

loaded_name = os.path.basename(data_file)
print(f'Arquivo carregado: {loaded_name}')
df = pd.read_csv(data_file)
print('Formato inicial:', df.shape)

# [2] Preprocessing and train/test split
# Choose the binary classification target, in order of preference:
#   1. 'explicit' as-is;
#   2. 'track_genre' binarised as "most frequent genre vs. all others";
#   3. 'popularity' binarised as "at or above the median".
# `leak_cols` records the column the target was derived from, so it can be removed
# from the feature matrix (otherwise the model could read the label off a feature).
target_col = None
leak_cols = []
if 'explicit' in df.columns:
    target_col = 'explicit'
elif 'track_genre' in df.columns:
    top_class = df['track_genre'].value_counts().idxmax()
    df['target_binary'] = (df['track_genre'] == top_class).astype(int)
    target_col = 'target_binary'
    leak_cols.append('track_genre')
elif 'popularity' in df.columns:
    median_pop = df['popularity'].median()
    df['target_binary'] = (df['popularity'] >= median_pop).astype(int)
    target_col = 'target_binary'
    leak_cols.append('popularity')
else:
    raise ValueError('Não foi possível identificar uma coluna alvo adequada (explicit/track_genre/popularity).')

# Features are the numeric columns only; the target is read from the full frame so
# that a non-numeric (e.g. boolean) target column works as well.
num_df = df.select_dtypes(include=[np.number]).copy()
y = df[target_col].astype(int)

# Drop the target, the column(s) it was derived from, and identifier-like columns.
# 'Unnamed: 0' is the name pandas assigns to a header-less index column in a CSV.
id_like_cols = ['id', 'track_id', 'song_id', 'Unnamed: 0']
cols_to_drop = [c for c in [target_col, *leak_cols, *id_like_cols] if c in num_df.columns]
num_df = num_df.drop(columns=cols_to_drop)

# Stratified split, done BEFORE imputation so the test rows do not influence the
# statistics used to fill missing values.
X_train_df, X_test_df, y_train, y_test = train_test_split(
    num_df, y.values, test_size=0.2, random_state=SEED, stratify=y
)

# Impute missing values with medians estimated on the training split only.
train_medians = X_train_df.median(numeric_only=True)
X_train = X_train_df.fillna(train_medians).values
X_test = X_test_df.fillna(train_medians).values
print('Treino:', X_train.shape, 'Teste:', X_test.shape)

# Standardisation (needed by SVM and K-Means); the scaler is fitted on train only.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# [3] K-Means clustering (elbow method and silhouette)
# K-Means is fitted on the TRAINING split only. For every candidate K the inertia
# and the silhouette score of the training assignment are recorded.
k_values = list(range(2, 13))
inertias = []
silhouettes = []
used_kmeans_on_train = False
for k in k_values:
    kmeans_tmp = KMeans(n_clusters=k, random_state=SEED, n_init=10)
    kmeans_tmp.fit(X_train_scaled)
    used_kmeans_on_train = True
    inertias.append(kmeans_tmp.inertia_)
    try:
        sil = silhouette_score(X_train_scaled, kmeans_tmp.labels_)
    except Exception as exc:
        # Best effort: keep sweeping K, but report why this K has no score
        # instead of silently recording NaN.
        print(f'Silhueta indisponível para K={k}: {exc}')
        sil = np.nan
    silhouettes.append(sil)

# Optimal K
# - Silhouette: K with the highest score (NaN entries are treated as -inf).
sil_vals = np.array(silhouettes, dtype=float)
sil_safe = np.where(np.isnan(sil_vals), -np.inf, sil_vals)
best_sil_idx = int(np.argmax(sil_safe))
best_sil_k = k_values[best_sil_idx]

# - Elbow: K with the largest vertical gap between the straight line joining the
#   first and last (K, inertia) points and the inertia curve itself.
#   Dedicated names are used here so the target series `y` is not overwritten.
k_arr = np.array(k_values, dtype=float)
inertia_arr = np.array(inertias, dtype=float)
k_first, inertia_first = k_arr[0], inertia_arr[0]
k_last, inertia_last = k_arr[-1], inertia_arr[-1]
y_hat = inertia_first + (inertia_last - inertia_first) * (k_arr - k_first) / (k_last - k_first)
dist = y_hat - inertia_arr  # the curve lies below the line; a larger gap suggests the elbow
elbow_k = int(k_arr[int(np.argmax(dist))])

# Reference K used downstream: the one with the best silhouette.
optimal_k = best_sil_k
print(f'K ótimo (silhueta): {best_sil_k}')
print(f'K de cotovelo (heurística): {elbow_k}')
print(f'K selecionado para referência: {optimal_k}')

# Side-by-side diagnostic plots: inertia (left) and silhouette (right).
fig, ax = plt.subplots(1, 2, figsize=(12, 4))
ax[0].plot(k_values, inertias, marker='o')
ax[0].axvline(elbow_k, color='green', linestyle=':', label='cotovelo')
ax[0].set_title('Método do Cotovelo (Inertia)')
ax[0].set_xlabel('K')
ax[0].set_ylabel('Inertia')
ax[0].legend()
ax[1].plot(k_values, silhouettes, marker='o', color='orange')
ax[1].axvline(best_sil_k, color='purple', linestyle=':', label='melhor silhueta')
ax[1].set_title('Índice de Silhueta')
ax[1].set_xlabel('K')
ax[1].set_ylabel('Silhouette')
ax[1].legend()
plt.tight_layout()
plt.show()

# [4] Distance-to-nearest-centroid feature
def build_distance_feature_for_k(Xtr_scaled, Xte_scaled, k):
    """Append the distance to the nearest K-Means centroid as one extra column.

    K-Means with ``k`` clusters is fitted on ``Xtr_scaled`` only. For each row of
    both matrices, the distance to its closest centroid is computed and stacked as
    a new last column.

    Returns:
        A ``(Xtr_new, Xte_new)`` tuple with one more column than the inputs.
    """
    centroids = KMeans(n_clusters=k, random_state=SEED, n_init=10).fit(Xtr_scaled).cluster_centers_

    augmented = []
    for block in (Xtr_scaled, Xte_scaled):
        # pairwise_distances_argmin_min returns (indices, distances); only the
        # distances are needed here.
        nearest_dist = pairwise_distances_argmin_min(block, centroids)[1]
        augmented.append(np.hstack([block, nearest_dist.reshape(-1, 1)]))

    return augmented[0], augmented[1]

# [5] SVM and Random Forest models
# Candidate classifiers, keyed by the name used in the results table.
# All SVMs share C, probability estimates and the seed; only the kernel settings vary.
_svc_shared = {'C': 1.0, 'probability': True, 'random_state': SEED}
models = {}
models['svm_linear'] = SVC(kernel='linear', **_svc_shared)
models['svm_poly'] = SVC(kernel='poly', degree=3, gamma='scale', **_svc_shared)
models['svm_rbf'] = SVC(kernel='rbf', gamma='scale', **_svc_shared)
models['rf_base'] = RandomForestClassifier(n_estimators=300, max_depth=None, random_state=SEED)
models['rf_depth10'] = RandomForestClassifier(n_estimators=400, max_depth=10, random_state=SEED)

# [6] Training, evaluation and metric collection
def eval_metrics(y_true, y_pred, y_proba):
    """Compute the binary classification metrics reported in the results table.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        y_proba: Scores for the positive class, or ``None`` when the estimator
            cannot produce them.

    Returns:
        ``(accuracy, precision, recall, f1, auc)``. Precision, recall and F1 are
        computed with ``zero_division=0``. ``auc`` is ``np.nan`` when ``y_proba``
        is ``None`` or when ``roc_auc_score`` rejects the inputs with a
        ``ValueError``.
    """
    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, zero_division=0)
    rec = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)

    # No scores available: report NaN explicitly rather than relying on
    # roc_auc_score failing on a None argument.
    if y_proba is None:
        return acc, prec, rec, f1, np.nan

    try:
        auc = roc_auc_score(y_true, y_proba)
    except ValueError:
        auc = np.nan
    return acc, prec, rec, f1, auc

results = []

# One sweep covers both settings: `None` is the baseline (no distance feature),
# followed by every candidate K with the distance feature appended. Rows are
# appended in the same order as running the baseline loop and then the per-K loop.
_metric_names = ('accuracy', 'precision', 'recall', 'f1', 'auc')
for k in [None, *k_values]:
    has_feature = k is not None
    if has_feature:
        Xtr_k, Xte_k = build_distance_feature_for_k(X_train_scaled, X_test_scaled, k)
    else:
        Xtr_k, Xte_k = X_train_scaled, X_test_scaled

    for mname, clf in models.items():
        clf.fit(Xtr_k, y_train)
        y_pred = clf.predict(Xte_k)
        # Unlikely fallback: every SVC above is built with probability=True.
        y_proba = clf.predict_proba(Xte_k)[:, 1] if hasattr(clf, 'predict_proba') else None
        scores = eval_metrics(y_test, y_pred, y_proba)

        row = {'model': mname, 'with_cluster_feature': has_feature, 'k': k}
        row.update(zip(_metric_names, scores))
        results.append(row)

res_df = pd.DataFrame(results)
print('Resumo de resultados (top 10 por F1):')
_summary = res_df.sort_values(['with_cluster_feature', 'f1'], ascending=[True, False])
display(_summary.head(10))

# [7] Grid search for SVM (baseline and K=optimal_k)
# Stratified, shuffled 5-fold splitter shared by the grid searches below.
cv = StratifiedKFold(shuffle=True, n_splits=5, random_state=SEED)
# Train/test matrices expanded with the distance feature for the selected K.
_expanded_pair = build_distance_feature_for_k(X_train_scaled, X_test_scaled, optimal_k)
Xtr_opt, Xte_opt = _expanded_pair

def run_gs_and_eval(base_name, est, grid, Xtr, Xte, ytr, yte, with_feat, k_val):
    """Grid-search ``est`` over ``grid`` and score the refitted winner on the test data.

    The search optimises F1 using the module-level ``cv`` splitter. The best
    estimator is evaluated on ``(Xte, yte)`` and one row, named
    ``base_name + '_gs'``, is appended to the module-level ``results`` list.

    Returns:
        The fitted ``GridSearchCV`` object.
    """
    search = GridSearchCV(est, grid, scoring='f1', cv=cv, n_jobs=None, refit=True)
    search.fit(Xtr, ytr)
    winner = search.best_estimator_

    test_pred = winner.predict(Xte)
    if hasattr(winner, 'predict_proba'):
        test_proba = winner.predict_proba(Xte)[:, 1]
    else:
        test_proba = None
    scores = eval_metrics(yte, test_pred, test_proba)

    row = {'model': base_name + '_gs', 'with_cluster_feature': with_feat, 'k': k_val}
    row.update(zip(('accuracy', 'precision', 'recall', 'f1', 'auc'), scores))
    row['grid_search'] = True
    row['best_params'] = str(search.best_params_)
    results.append(row)
    return search

# Hyper-parameter grids, one per kernel
grid_linear = {'C': [0.1, 1, 10, 100]}
grid_poly = {'C': [0.1, 1, 10], 'degree': [2, 3, 4], 'gamma': ['scale', 'auto']}
grid_rbf = {'C': [0.1, 1, 10], 'gamma': ['scale', 0.01, 0.1, 1]}

# (result name, kernel, grid) for each SVM variant.
_svm_searches = (
    ('svm_linear', 'linear', grid_linear),
    ('svm_poly', 'poly', grid_poly),
    ('svm_rbf', 'rbf', grid_rbf),
)
# Baseline matrices first (no distance feature), then the matrices expanded with
# the distance feature for the selected K.
_svm_datasets = (
    (X_train_scaled, X_test_scaled, False, None),
    (Xtr_opt, Xte_opt, True, optimal_k),
)
for _Xtr_gs, _Xte_gs, _uses_feature, _k_used in _svm_datasets:
    for _label, _kernel, _grid in _svm_searches:
        run_gs_and_eval(
            _label,
            SVC(kernel=_kernel, probability=True, random_state=SEED),
            _grid,
            _Xtr_gs, _Xte_gs, y_train, y_test, _uses_feature, _k_used,
        )

res_df = pd.DataFrame(results)
print('Resultados com GridSearch (SVM) — resumo por F1:')
_gs_rows = res_df[res_df.get('grid_search', False) == True]
_gs_sorted = _gs_rows.sort_values(['with_cluster_feature', 'f1'], ascending=[True, False])
display(_gs_sorted.groupby(['model', 'with_cluster_feature']).head(1))

# [7] Comparative analysis and charts
# [7.1] Grid search for Random Forest (baseline and K=optimal_k)
# Search space for the Random Forest grid search.
rf_grid = dict(
    n_estimators=[200, 400, 800],
    max_depth=[None, 10, 20],
    min_samples_split=[2, 5, 10],
    min_samples_leaf=[1, 2, 4],
    max_features=['sqrt', 'log2', None],
)

def run_gs_rf_and_eval(base_name, grid, Xtr, Xte, ytr, yte, with_feat, k_val):
    """Grid-search a seeded Random Forest and score the winner on the test data.

    Delegates to ``run_gs_and_eval`` with a fresh
    ``RandomForestClassifier(random_state=SEED)``. The row appended to ``results``
    is named ``base_name + '_rf_gs'``.

    Returns:
        The fitted ``GridSearchCV`` object.
    """
    forest = RandomForestClassifier(random_state=SEED)
    # run_gs_and_eval appends '_gs' to the name it receives.
    return run_gs_and_eval(base_name + '_rf', forest, grid, Xtr, Xte, ytr, yte, with_feat, k_val)

# Random Forest grid search: baseline matrices first, then the matrices expanded
# with the distance feature for the selected K.
_rf_datasets = (
    (X_train_scaled, X_test_scaled, False, None),
    (Xtr_opt, Xte_opt, True, optimal_k),
)
for _Xtr_rf, _Xte_rf, _uses_feature, _k_used in _rf_datasets:
    run_gs_rf_and_eval('rf', rf_grid, _Xtr_rf, _Xte_rf, y_train, y_test, _uses_feature, _k_used)

res_df = pd.DataFrame(results)
print('Resultados com GridSearch (Random Forest) — resumo por F1:')
_is_grid_search = res_df.get('grid_search', False) == True
_is_rf = res_df['model'].str.contains('rf_gs')
_rf_sorted = res_df[_is_grid_search & _is_rf].sort_values(
    ['with_cluster_feature', 'f1'], ascending=[True, False]
)
display(_rf_sorted.groupby(['model', 'with_cluster_feature']).head(1))

# [7.2] ROC curves (baseline vs. expanded with the selected K)
def plot_roc_for_pair(estimator_baseline, estimator_expanded, Xb, Xe, ytr, yte, title,
                      *, Xb_test=None, Xe_train=None):
    """Fit a baseline and an expanded estimator and plot both test ROC curves.

    Args:
        estimator_baseline: Unfitted estimator trained on ``Xb``.
        estimator_expanded: Unfitted estimator trained on ``Xe_train``.
        Xb: Baseline TRAINING matrix.
        Xe: Expanded TEST matrix, used to score the expanded estimator.
        ytr: Training labels (shared by both estimators).
        yte: Test labels (shared by both estimators).
        title: Figure title; also used in the printed summary line.
        Xb_test: Baseline TEST matrix. When omitted, the previous rule applies:
            the module-level ``X_test_scaled`` if ``Xb`` is ``X_train_scaled``,
            otherwise ``Xe``.
        Xe_train: Expanded TRAINING matrix. When omitted, ``Xe`` is used for
            fitting (previous behaviour), which only works if ``Xe`` has as many
            rows as ``ytr``.
    """
    # Backward-compatible defaults for callers that do not pass the new arguments.
    if Xb_test is None:
        Xb_test = X_test_scaled if Xb is X_train_scaled else Xe
    if Xe_train is None:
        Xe_train = Xe

    # Baseline: fit on the baseline train matrix, score the baseline test matrix.
    estimator_baseline.fit(Xb, ytr)
    proba_b = estimator_baseline.predict_proba(Xb_test)[:, 1]
    fpr_b, tpr_b, _ = roc_curve(yte, proba_b)
    auc_b = roc_auc_score(yte, proba_b)

    # Expanded: fit on the expanded TRAIN matrix, score the expanded TEST matrix.
    estimator_expanded.fit(Xe_train, ytr)
    proba_e = estimator_expanded.predict_proba(Xe)[:, 1]
    fpr_e, tpr_e, _ = roc_curve(yte, proba_e)
    auc_e = roc_auc_score(yte, proba_e)

    plt.figure(figsize=(5.5, 4))
    plt.plot(fpr_b, tpr_b, label=f'baseline (AUC={auc_b:.3f})')
    plt.plot(fpr_e, tpr_e, label=f'expandido (AUC={auc_e:.3f})')
    plt.plot([0, 1], [0, 1], 'k--', alpha=0.5)  # chance diagonal
    plt.xlabel('FPR')
    plt.ylabel('TPR')
    plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.show()
    print(f'ROC AUC — {title}: baseline={auc_b:.3f}, expandido={auc_e:.3f}')

# Estimator pairs for SVM RBF and Random Forest. The expanded estimator must be
# trained on Xtr_opt (train rows) and scored on Xte_opt (test rows).
svm_rbf_base = SVC(kernel='rbf', C=1.0, gamma='scale', probability=True, random_state=SEED)
svm_rbf_exp  = SVC(kernel='rbf', C=1.0, gamma='scale', probability=True, random_state=SEED)
plot_roc_for_pair(svm_rbf_base, svm_rbf_exp, X_train_scaled, Xte_opt, y_train, y_test,
                  'ROC — SVM RBF (baseline vs expandido)',
                  Xb_test=X_test_scaled, Xe_train=Xtr_opt)

rf_base = RandomForestClassifier(random_state=SEED)
rf_exp  = RandomForestClassifier(random_state=SEED)
plot_roc_for_pair(rf_base, rf_exp, X_train_scaled, Xte_opt, y_train, y_test,
                  'ROC — Random Forest (baseline vs expandido)',
                  Xb_test=X_test_scaled, Xe_train=Xtr_opt)

metrics_to_plot = ['accuracy', 'precision', 'recall', 'f1', 'auc']
unique_models = list(models.keys())

# One figure per model: each metric as a function of K (with the distance feature),
# with the model's baseline (no distance feature) drawn as a horizontal line.
for mname in unique_models:
    same_model = res_df['model'] == mname
    base_row = res_df[same_model & (~res_df['with_cluster_feature'])].iloc[0]
    # The per-K rows do not depend on the metric, so they are selected once per model.
    dfk = res_df[same_model & (res_df['with_cluster_feature'])].sort_values('k')

    n_metrics = len(metrics_to_plot)
    fig, axes = plt.subplots(1, n_metrics, figsize=(4*n_metrics, 3), sharex=True)
    fig.suptitle(f'Comparativo por K — {mname}')
    # plt.subplots returns a bare Axes when there is a single panel.
    panels = axes if n_metrics > 1 else [axes]
    for j, metric in enumerate(metrics_to_plot):
        ax = panels[j]
        ax.plot(dfk['k'], dfk[metric], marker='o', label='com dist')
        ax.axhline(base_row[metric], color='red', linestyle='--', label='baseline')
        ax.set_title(metric.upper())
        ax.set_xlabel('K')
        ax.set_ylabel(metric)
        if j == 0:
            ax.legend()
    plt.tight_layout()
    plt.show()

# Best per-K result of each model (by F1) and its gain over that model's baseline.
best_by_model = []
for mname in unique_models:
    same_model = res_df['model'] == mname
    base = res_df[same_model & (~res_df['with_cluster_feature'])].iloc[0]
    with_feat = res_df[same_model & (res_df['with_cluster_feature'])]
    if with_feat.empty:
        continue
    best_row = with_feat.sort_values('f1', ascending=False).iloc[0]
    entry = {'model': mname, 'best_k': int(best_row['k'])}
    entry.update({f'best_{m}': best_row[m] for m in metrics_to_plot})
    entry.update({f'gain_{m}': best_row[m] - base[m] for m in metrics_to_plot})
    best_by_model.append(entry)

best_df = pd.DataFrame(best_by_model)
print('Melhor resultado por modelo (ordenado por ganho de F1):')
display(best_df.sort_values('gain_f1', ascending=False))


Arquivo carregado: dataset.csv
Formato inicial: (114000, 21)
Treino: (91200, 15) Teste: (22800, 15)
