### Import Libraries

In [None]:
# --- Configuração de Ambiente e Hardware ---
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

# Bibliotecas RAPIDS (GPU)
import cupy as cp
from cuml.pipeline import Pipeline as cumlPipeline
from cuml.preprocessing import StandardScaler as cumlStandardScaler
from cuml.neighbors import KNeighborsClassifier as cumlKNN
from cuml.svm import SVC as cumlSVC
from cuml.ensemble import RandomForestClassifier as cumlRF
import xgboost as xgb
from cuml.metrics import accuracy_score

# Bibliotecas Scikit-learn (CPU & Utilities)
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import GroupKFold
from sklearn.metrics import classification_report, precision_recall_fscore_support, confusion_matrix
from sklearn.base import clone 

# Bibliotecas de Processamento de Áudio e Dados
import numpy as np
import pandas as pd
import librosa
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import time
import subprocess
import io
from joblib import Parallel, delayed
import psutil
import gc

# Bibliotecas TensorFlow
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, regularizers
from tensorflow.keras.callbacks import EarlyStopping

# TensorFlow GPU setup: request on-demand memory growth on every visible GPU
# so TF does not reserve all VRAM up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    # Iterating an empty list is a no-op, so no separate "any GPU?" check is needed.
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as err:
    # Report the failure and keep going with TensorFlow's default allocation.
    print(err)

print("Ambiente configurado.")

### Audio Preprocessing

In [None]:
def calculate_stats(features_matrix):
    """Summarize a feature matrix with four statistics reduced over axis 1.

    The output is laid out statistic by statistic: all means first, then all
    standard deviations, then all skewness values, then all kurtosis values
    (scipy's default settings for both).

    Returns None when the matrix has no elements.
    """
    if not features_matrix.size:
        return None

    reducers = (np.mean, np.std, stats.skew, stats.kurtosis)
    per_statistic = [reduce_fn(features_matrix, axis=1) for reduce_fn in reducers]
    return np.concatenate(per_statistic)

def extract_features_windowed(file_path, window_size=3.0, overlap=0.5, sr=22050):
    """Extract one statistical feature vector per sliding window of an audio file.

    The file is loaded as mono at `sr` and cut into windows of `window_size`
    seconds, consecutive windows sharing the fraction `overlap`. Each window
    is described by `calculate_stats` applied, in this order, to: spectral
    centroid, spectral bandwidth, spectral rolloff, RMS, zero-crossing rate,
    spectral contrast, chroma (CQT, CENS, STFT), MFCC, MFCC delta,
    MFCC delta-delta and tonnetz.

    Args:
        file_path: Path of the audio file to load.
        window_size: Window length in seconds.
        overlap: Fraction of a window shared with the next one.
        sr: Sample rate used for loading and for every frequency-dependent
            feature.

    Returns:
        A list with one feature list per window, or None when the audio is
        shorter than one window or any step raises (the error is printed).
    """
    # DSP parameters
    N_FFT = 2048
    HOP_LENGTH = 512
    N_MFCC = 20
    N_CHROMA = 12
    N_CONTRAST = 7

    all_window_features = []

    try:
        y, _ = librosa.load(file_path, sr=sr, mono=True)

        samples_per_window = int(window_size * sr)
        hop_size = int(samples_per_window * (1.0 - overlap))

        if len(y) < samples_per_window:
            return None

        # Frames shaped [n_windows, samples_per_window]
        y_frames = librosa.util.frame(
            y, frame_length=samples_per_window, hop_length=hop_size, axis=0
        )

        for y_window in y_frames:
            # Shared bases: magnitude STFT, its power, and magnitude CQT
            stft = np.abs(librosa.stft(y_window, n_fft=N_FFT, hop_length=HOP_LENGTH))
            power = stft ** 2
            cqt = np.abs(librosa.cqt(y_window, sr=sr, hop_length=HOP_LENGTH))

            chroma_cqt = librosa.feature.chroma_cqt(C=cqt, n_chroma=N_CHROMA)

            mel = librosa.feature.melspectrogram(S=power, sr=sr)
            mfcc = librosa.feature.mfcc(S=librosa.power_to_db(mel), n_mfcc=N_MFCC)

            # `sr` is passed to every spectrogram-based feature: with `S=` given
            # librosa cannot infer the rate and would assume its 22050 Hz default.
            frame_features = [
                # 1. Spectral centroid, bandwidth, rolloff, RMS, ZCR
                librosa.feature.spectral_centroid(S=stft, sr=sr),
                librosa.feature.spectral_bandwidth(S=stft, sr=sr),
                librosa.feature.spectral_rolloff(S=stft, sr=sr),
                librosa.feature.rms(S=stft),
                librosa.feature.zero_crossing_rate(
                    y_window, frame_length=N_FFT, hop_length=HOP_LENGTH
                ),
                # 2. Spectral contrast
                librosa.feature.spectral_contrast(S=stft, sr=sr, n_bands=N_CONTRAST - 1),
                # 3. Chroma (CQT, CENS, STFT)
                chroma_cqt,
                librosa.feature.chroma_cens(C=cqt, n_chroma=N_CHROMA),
                librosa.feature.chroma_stft(S=power, sr=sr, n_chroma=N_CHROMA),
                # 4. MFCC + deltas
                mfcc,
                librosa.feature.delta(mfcc),
                librosa.feature.delta(mfcc, order=2),
                # 5. Tonnetz from the CQT chroma computed above. tonnetz ignores
                # `y` when `chroma` is given, so no harmonic separation is run.
                librosa.feature.tonnetz(sr=sr, chroma=chroma_cqt),
            ]

            window_vector = []
            for feat in frame_features:
                # calculate_stats returns None for an empty matrix; extending with
                # None raises TypeError, so the handler below drops the file
                # instead of emitting a shorter, misaligned vector.
                window_vector.extend(calculate_stats(feat))

            all_window_features.append(window_vector)

    except Exception as e:
        # Best-effort worker: report the failing file and let the caller skip it.
        print(f"Erro em {file_path}: {e}")
        return None

    return all_window_features

def process_gtzan_file(genre, filename, dataset_path, params):
    """Extract windowed features for one file and tag every window.

    Worker used for parallel processing.

    Args:
        genre: Genre sub-directory name; also used as the label.
        filename: Audio file name inside the genre directory.
        dataset_path: Root directory containing the genre sub-directories.
        params: Dict with the keys 'window_size', 'overlap' and 'sr'.

    Returns:
        A tuple (features, labels, groups) with one entry per window, or None
        when extraction failed or produced no windows.
    """
    file_path = os.path.join(dataset_path, genre, filename)

    # Group ID = file name without its extension, plus the genre. Only the
    # extension is stripped: for names such as "blues.00042.wav", cutting at the
    # first dot would leave just "blues" and give every file of the genre the
    # same group ID.
    track_id = os.path.splitext(filename)[0] + "_" + genre

    features = extract_features_windowed(
        file_path,
        window_size=params['window_size'],
        overlap=params['overlap'],
        sr=params['sr']
    )

    if not features:
        return None

    # Label and group ID are repeated once per extracted window
    labels = [genre] * len(features)
    groups = [track_id] * len(features)

    return features, labels, groups

In [None]:
# --- Parameters and execution ---
GTZAN_DIR = '../gtzan_dataset/genres_original'  # Adjust to your local path
OUTPUT_FILE_X = '../preprocessed_features/gtzan_X_3s_50overlap.npy'
OUTPUT_FILE_y = '../preprocessed_features/gtzan_y_3s_50overlap.npy'
OUTPUT_FILE_groups = '../preprocessed_features/gtzan_groups_3s_50overlap.npy'
OUTPUT_FILE_classes = '../preprocessed_features/gtzan_classes.npy'

# 3 s windows with 50% overlap
extraction_params = {'window_size': 3.0, 'overlap': 0.5, 'sr': 22050}

# The cache is usable only if every file is present; otherwise re-extract
# rather than fail halfway through loading.
cache_files = (OUTPUT_FILE_X, OUTPUT_FILE_y, OUTPUT_FILE_groups, OUTPUT_FILE_classes)

if not all(os.path.exists(path) for path in cache_files):
    print("Iniciando extração de features do GTZAN...")

    # List (genre, file) pairs. Sorted because os.listdir order is unspecified,
    # which would otherwise make the row order differ between machines.
    genres = sorted(
        d for d in os.listdir(GTZAN_DIR) if os.path.isdir(os.path.join(GTZAN_DIR, d))
    )
    tasks = []
    for genre in genres:
        genre_dir = os.path.join(GTZAN_DIR, genre)
        files = sorted(f for f in os.listdir(genre_dir) if f.endswith('.wav'))
        for f in files:
            tasks.append((genre, f))

    print(f"Total de arquivos encontrados: {len(tasks)}")

    # Parallelism: leave one physical core free. psutil may return None when
    # the physical core count cannot be determined, so fall back to 1.
    physical_cores = psutil.cpu_count(logical=False) or 1
    n_cores = max(1, physical_cores - 1)
    print(f"Usando {n_cores} núcleos.")

    results = Parallel(n_jobs=n_cores, verbose=0)(
        delayed(process_gtzan_file)(
            g, f, GTZAN_DIR, extraction_params
        ) for g, f in tqdm(tasks, desc="Extraindo")
    )

    # Consolidate per-file results into flat, window-level lists
    X_list, y_list, groups_list = [], [], []
    for res in results:
        if res is None:
            continue  # file failed or produced no windows
        feat, lbl, grp = res
        X_list.extend(feat)
        y_list.extend(lbl)
        groups_list.extend(grp)

    X_np = np.array(X_list, dtype=np.float32)

    # Encode genre names as integer labels
    le = LabelEncoder()
    y_encoded = le.fit_transform(y_list).astype(np.int32)
    groups_np = np.array(groups_list)  # string group IDs
    # Defined here as well so both branches leave the same names behind
    class_names = le.classes_

    # Replace NaN / +-inf with zeros
    X_np = np.nan_to_num(X_np, nan=0.0, posinf=0.0, neginf=0.0)

    # Save features, labels, groups and the class names for later reference
    os.makedirs(os.path.dirname(OUTPUT_FILE_X), exist_ok=True)
    np.save(OUTPUT_FILE_X, X_np)
    np.save(OUTPUT_FILE_y, y_encoded)
    np.save(OUTPUT_FILE_groups, groups_np)
    np.save(OUTPUT_FILE_classes, class_names)

    print(f"Extração concluída. Shape X: {X_np.shape}")

else:
    print("Carregando arquivos pré-processados...")
    X_np = np.load(OUTPUT_FILE_X)
    y_encoded = np.load(OUTPUT_FILE_y)
    groups_np = np.load(OUTPUT_FILE_groups)
    class_names = np.load(OUTPUT_FILE_classes, allow_pickle=True)
    print(f"Dados carregados. Shape X: {X_np.shape}")

### Treino dos Modelos

In [None]:
# Pipeline definitions

# KNN on standardized features
pipe_knn = cumlPipeline(steps=[
    ('scaler', StandardScaler()),
    ('knn', cumlKNN(n_neighbors=10)),
])

# RBF-kernel SVM on standardized features
pipe_svm = cumlPipeline(steps=[
    ('scaler', StandardScaler()),
    ('svm', cumlSVC(kernel='rbf', C=0.75, gamma='scale')),
])

# Random forest (no scaling step)
rf_params = {
    'n_estimators': 300,
    'max_depth': 7,
    'min_samples_leaf': 5,
    'max_features': 0.6,
}
pipe_rf = cumlPipeline(steps=[('rf', cumlRF(**rf_params))])

# XGBoost, tuned for a smaller dataset
xgb_params = {
    'n_estimators': 300,
    'max_depth': 3,
    'learning_rate': 0.03,
    'gamma': 5.0,             # aggressive tree pruning
    'reg_alpha': 1.0,
    'reg_lambda': 1.0,        # L2 regularization
    'subsample': 0.7,         # 70% of the rows per tree
    'colsample_bytree': 0.7,  # 70% of the features per tree
    'tree_method': 'hist',
    'device': "cuda",
    'random_state': 42,
    'objective': 'multi:softmax',
    'num_class': len(np.unique(y_encoded)),
}
model_xgb = xgb.XGBClassifier(**xgb_params)

# Registry of everything to evaluate; the Keras entry is only a marker string.
models = {}
models["KNN (GPU)"] = pipe_knn
models["SVM (GPU)"] = pipe_svm
models["Random Forest (GPU)"] = pipe_rf
models["XGBoost (GPU)"] = model_xgb
models["MLP (Keras)"] = "keras_placeholder"

def build_mlp(input_dim, num_classes):
    """Build and compile the MLP classifier.

    Three hidden blocks (128, 64 and 32 units), each Dense -> BatchNormalization
    -> ELU with He-normal initialization and L2 weight regularization; the first
    two blocks are followed by dropout. The output is a softmax layer with
    `num_classes` units. The model is compiled with Adam (learning rate 0.0005),
    sparse categorical cross-entropy and accuracy.
    """
    # Fairly strong regularization
    weight_decay = 0.02
    drop_prob = 0.6

    # (units, whether the block ends with dropout)
    hidden_blocks = ((128, True), (64, True), (32, False))

    stack = [layers.Input(shape=(input_dim,))]
    for units, with_dropout in hidden_blocks:
        stack.append(
            layers.Dense(
                units,
                kernel_initializer='he_normal',
                kernel_regularizer=regularizers.l2(weight_decay),
            )
        )
        stack.append(layers.BatchNormalization())
        stack.append(layers.ELU())
        if with_dropout:
            stack.append(layers.Dropout(drop_prob))

    # Output layer
    stack.append(
        layers.Dense(num_classes, activation='softmax', kernel_initializer='glorot_uniform')
    )

    network = keras.Sequential(stack)
    network.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return network

# Cross-validation with GroupKFold: all windows of one group stay on the same
# side of the split, which prevents leakage between windows.
gkf = GroupKFold(n_splits=10)
results_track_level = {}
n_classes = len(np.unique(y_encoded))

for name, model in models.items():
    print(f"\nTreinando {name}...")
    fold_accs = []

    for fold, (train_idx, test_idx) in enumerate(gkf.split(X_np, y_encoded, groups=groups_np)):
        X_train, y_train = X_np[train_idx], y_encoded[train_idx]
        X_test, y_test = X_np[test_idx], y_encoded[test_idx]

        # Group IDs of the test windows, used for per-track voting below
        groups_test = groups_np[test_idx]

        if name == "MLP (Keras)":
            # Early-stopping validation set: hold out about 10% of the training
            # groups. Keras' `validation_split` takes the *last* rows before
            # shuffling; the rows here are ordered by genre, so it would validate
            # on essentially a single genre.
            groups_train = groups_np[train_idx]
            train_tracks = np.unique(groups_train)
            rng = np.random.default_rng(42 + fold)
            n_val_tracks = max(1, int(round(0.1 * len(train_tracks))))
            val_tracks = rng.choice(train_tracks, size=n_val_tracks, replace=False)
            val_mask = np.isin(groups_train, val_tracks)

            # Manual scaling for Keras, fitted on the rows actually trained on
            scl = StandardScaler()
            X_fit_s = scl.fit_transform(X_train[~val_mask])
            X_val_s = scl.transform(X_train[val_mask])
            X_test_s = scl.transform(X_test)

            keras_m = build_mlp(X_train.shape[1], n_classes)
            cb = [EarlyStopping(patience=10, restore_best_weights=True, monitor='val_loss')]
            keras_m.fit(
                X_fit_s, y_train[~val_mask],
                epochs=100, batch_size=64,
                validation_data=(X_val_s, y_train[val_mask]),
                callbacks=cb, verbose=0,
            )

            # Probabilities -> class indices
            preds_window = np.argmax(keras_m.predict(X_test_s, verbose=0), axis=1)
            tf.keras.backend.clear_session()

        else:
            # Fresh, unfitted copy for every fold so no fitted state carries over
            current_model = clone(model)
            current_model.fit(X_train, y_train)
            preds_window = current_model.predict(X_test)
            # Device arrays expose .get(); bring them back to host memory
            if hasattr(preds_window, 'get'):
                preds_window = preds_window.get()

        # --- Majority vote per track (track-level accuracy) ---
        df_pred = pd.DataFrame({'group': groups_test, 'pred': preds_window, 'true': y_test})
        per_track = df_pred.groupby('group')

        # Series.mode() returns the most frequent values sorted ascending, so
        # .iloc[0] resolves ties to the smallest label regardless of SciPy version.
        track_preds = per_track['pred'].agg(lambda votes: votes.mode().iloc[0])
        track_true = per_track['true'].first()

        # Both series are indexed by the same sorted group keys, so they align
        # positionally; a plain NumPy comparison keeps this small step on the CPU.
        acc = float((track_preds.to_numpy() == track_true.to_numpy()).mean())
        fold_accs.append(acc)

    results_track_level[name] = fold_accs
    print(f"Média Acurácia (Track-Level): {np.mean(fold_accs):.4f}")

# Final summary
print("\n=== Resultados Finais (10-Fold Group CV) ===")
for model_name, accs in results_track_level.items():
    print(f"{model_name}: {np.mean(accs)*100:.2f}% (+/- {np.std(accs)*100:.2f}%)")