In [None]:
# Torch imports
import torch
import torchaudio
from torch import nn
from torch.utils.data import Dataset
import torch.nn.functional as F
import torchvision.models as models

# Metrics and visualization
import time
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Models and feature extractor
from transformers import Wav2Vec2Processor, Wav2Vec2Model

# Others
import os
from tqdm import tqdm
from random import randint
import numpy as np
import re


## Datasets

In [None]:
class CustomSpeechCommands(Dataset):
    """Subset of torchaudio's SPEECHCOMMANDS restricted to the files named in a list file.

    Indexing returns ``(waveform, sample_rate, label, speaker_id, utterance_number)``
    with the waveform padded or truncated to ``target_len`` samples. The class can
    also extract features for every selected sample and save them to disk.
    """

    def __init__(self, root, files_list, download=True, target_len=16000, mode="mfcc", cnn_model=None):
        """
        Args:
            root: root directory handed to ``torchaudio.datasets.SPEECHCOMMANDS``.
            files_list: path to a text file with one audio path per line; only
                those files are kept.
            download: forwarded to ``SPEECHCOMMANDS``.
            target_len: number of samples every waveform is padded/truncated to.
            mode: 'mfcc', 'mfcc_delta', 'mfcc_delta_delta', 'cnn' or 'wav2vec2'.
            cnn_model: pretrained or custom CNN used for extraction in 'cnn' mode.
        """
        self.target_len = target_len
        self.mode = mode
        self.cnn_model = cnn_model
        self.dataset = torchaudio.datasets.SPEECHCOMMANDS(root=root, download=download)
        self.indices = None
        self.splitter(files_list, root)

    def splitter(self, files_list, root):
        """Populate ``self.indices`` with the dataset positions listed in ``files_list``.

        Every entry of the underlying dataset's ``_walker`` list is rewritten as a
        '/'-separated path relative to ``<root>/SpeechCommands/speech_commands_v0.02``
        and kept when that path appears in the list file.
        """
        with open(files_list, 'r') as f:
            self.file_paths = [line.strip() for line in f]

        base_dir = os.path.join(root, "SpeechCommands", "speech_commands_v0.02")
        self.all_paths = [
            os.path.relpath(item, start=base_dir).replace("\\", "/")
            for item in tqdm(self.dataset._walker, desc=f"Splitting {files_list}")
        ]

        # Set membership keeps the filter linear; testing against the list
        # would cost one full scan of file_paths per dataset entry.
        wanted = set(self.file_paths)
        self.indices = [i for i, path in enumerate(self.all_paths) if path in wanted]
        print(f"Archivos encontrados: {len(self.indices)} / {len(self.file_paths)}")

    def pad_waveform(self, waveform):
        """Return ``waveform`` with its last dimension forced to ``target_len``.

        Shorter inputs are zero-padded on the right, longer ones are truncated.
        Works for any number of leading dimensions (including a bare 1-D signal).
        """
        length = waveform.shape[-1]
        if length < self.target_len:
            waveform = F.pad(waveform, (0, self.target_len - length))
        elif length > self.target_len:
            # Ellipsis instead of [:, :n] so 1-D waveforms are truncated too.
            waveform = waveform[..., :self.target_len]
        return waveform

    def extract_feature_single(self, waveform, sample_rate, feature_extractor=None, processor=None, device="cuda"):
        """
        Extract the features of ONE sample according to ``self.mode``.

        Args:
            waveform: audio tensor; padded/truncated to ``target_len`` first.
            sample_rate: sampling rate, used by the 'cnn' and 'wav2vec2' modes.
            feature_extractor: callable applied to the waveform in the MFCC modes,
                or called with the processor's outputs in 'wav2vec2' mode. It is
                moved to ``device`` when given.
            processor: callable that prepares the model inputs in 'wav2vec2' mode.
            device: device on which the computation runs.

        Returns:
            The feature tensor, moved back to the CPU.

        Raises:
            ValueError: if ``self.mode`` is not one of the supported modes.
        """
        waveform = self.pad_waveform(waveform).to(device)

        if feature_extractor is not None:
            feature_extractor = feature_extractor.to(device)

        # NOTE(review): the squeeze(0)/transpose(0, 1) calls below presumably turn a
        # (1, n_coeffs, frames) output into (frames, n_coeffs) — confirm against the
        # extractor actually passed in.

        # --- MFCC ---
        if self.mode == "mfcc":
            feat = feature_extractor(waveform).squeeze(0).cpu().transpose(0, 1)

        # --- MFCC + Delta ---
        elif self.mode == "mfcc_delta":
            base = feature_extractor(waveform)
            delta = torchaudio.functional.compute_deltas(base)
            feat = torch.cat([base, delta], dim=1).squeeze(0).cpu().transpose(0, 1)

        # --- MFCC + Delta + Delta-Delta ---
        elif self.mode == "mfcc_delta_delta":
            base = feature_extractor(waveform)
            delta = torchaudio.functional.compute_deltas(base)
            delta2 = torchaudio.functional.compute_deltas(delta)
            feat = torch.cat([base, delta, delta2], dim=1).squeeze(0).cpu().transpose(0, 1)

        # --- CNN ---
        elif self.mode == "cnn":
            # A mel spectrogram of the waveform is fed to self.cnn_model.
            spec_transform = torchaudio.transforms.MelSpectrogram(sample_rate=sample_rate).to(device)
            spec = spec_transform(waveform).unsqueeze(0)
            with torch.no_grad():
                embedding = self.cnn_model(spec.to(device)).cpu().squeeze()
            feat = embedding

        # --- Wav2Vec2 ---
        elif self.mode == "wav2vec2":
            waveform = waveform.squeeze(0)
            inputs = processor(
                waveform,
                sampling_rate=sample_rate,
                return_tensors="pt",
                padding=True
            ).to(device)
            with torch.no_grad():
                outputs = feature_extractor(**inputs)
            feat = outputs.last_hidden_state.squeeze(0).cpu()

        else:
            raise ValueError(f"Modo de extracción '{self.mode}' no soportado.")

        return feat

    def extract_features(self, feature_extractor=None, processor=None, device="cuda"):
        """Extract features for every selected sample.

        Returns:
            ``(features, labels)``: the per-sample features stacked into one tensor
            (so they must all share a shape) and the list of dataset labels.
        """
        features, labels = [], []

        with torch.no_grad():
            for idx in tqdm(self.indices, desc=f"Extrayendo features ({self.mode})"):
                waveform, sample_rate, label, _, _ = self.dataset[idx]
                feat = self.extract_feature_single(
                    waveform, sample_rate, feature_extractor, processor, device
                )
                features.append(feat)
                labels.append(label)

        features = torch.stack(features)
        print(f"Features tensor: {features.shape}")

        return features, labels

    def save_features(self, feature_extractor=None, save_path=None, processor=None, device="cuda"):
        """Extract all features and save ``{"features", "labels"}`` to ``save_path``.

        Best effort: any exception raised while extracting or saving is caught
        and reported with a printed message instead of being propagated.
        """
        print(f"Guardando features ({self.mode}) en {save_path}")
        try:
            features, labels = self.extract_features(feature_extractor, processor, device)
            torch.save({"features": features, "labels": labels}, save_path)
            print(f"Features guardadas correctamente en {save_path}")
            print(f"Clases finales: {set(labels)}")
        except Exception as e:
            print(f"Error al guardar features en {save_path}: {e}")

    def __len__(self):
        """Number of samples selected by the list file."""
        return len(self.indices)

    def __getitem__(self, idx):
        """Return the ``idx``-th selected sample with its waveform padded/truncated."""
        original_idx = self.indices[idx]
        waveform, sample_rate, label, speaker_id, utterance_number = self.dataset[original_idx]
        waveform = self.pad_waveform(waveform)
        return waveform, sample_rate, label, speaker_id, utterance_number

class FeaturesDataset(Dataset):
    """Dataset backed by a ``.pt`` file that stores precomputed features and labels."""

    def __init__(self, features_path):
        """
        Load a .pt file containing previously saved 'features' and 'labels'.

        features_path: path to the .pt file (for example 'data/train.pt')
        """
        payload = torch.load(features_path)
        self.features = payload["features"]
        self.labels = payload["labels"]

        # Class ids follow the sorted order of the distinct labels in this file.
        self.idx_to_label = dict(enumerate(sorted(set(self.labels))))
        self.label_to_idx = {name: index for index, name in self.idx_to_label.items()}
        encoded = [self.label_to_idx[name] for name in self.labels]
        self.numeric_labels = torch.tensor(encoded)

        print(f"Dataset cargado desde {features_path}")
        print(f" - {len(self.features)} ejemplos")
        print(f" - {len(self.label_to_idx)} clases")

    def __len__(self):
        """Number of stored examples."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the ``(feature, numeric_label)`` pair at position ``idx``."""
        return self.features[idx], self.numeric_labels[idx]


## Models

In [None]:
class RNNModel(nn.Module):
    """Recurrent classifier: an RNN/LSTM/GRU encoder followed by a linear head."""

    def __init__(
        self,
        rnn_type,
        n_input_channels,
        hidd_size=256,
        out_features = 35,
        num_layers=1,
    ):
        """
        Pass rnn_type="RNN" for a vanilla RNN (built bidirectional),
        rnn_type="LSTM" for an LSTM and rnn_type="GRU" for a GRU.

        Args:
            rnn_type: "RNN", "LSTM" or "GRU".
            n_input_channels: size of each input time step.
            hidd_size: hidden size of the recurrent layer.
            out_features: number of output classes.
            num_layers: number of stacked recurrent layers.

        Raises:
            ValueError: if ``rnn_type`` is not supported.
        """
        super().__init__()

        self.rnn_type = rnn_type

        if rnn_type == "GRU":
            self.rnn_layer = nn.GRU(n_input_channels, hidd_size, batch_first=True, num_layers=num_layers)

        elif rnn_type == "LSTM":
            self.rnn_layer = nn.LSTM(n_input_channels, hidd_size, batch_first=True, num_layers=num_layers)

        elif rnn_type == "RNN":
            self.rnn_layer = nn.RNN(n_input_channels, hidd_size, batch_first=True, num_layers=num_layers, bidirectional=True)

        else:
            raise ValueError(f"rnn_type {rnn_type} not supported.")

        # A bidirectional encoder yields one final state per direction; the head
        # receives both of them concatenated, hence the doubled input width.
        n_directions = 2 if self.rnn_layer.bidirectional else 1
        self.net = nn.Sequential(
            nn.Linear(hidd_size * n_directions, out_features),
        )

        self.flatten_layer = nn.Flatten()

    def forward(self, x):
        """Classify ``x`` from the final hidden state(s) of the last recurrent layer.

        The recurrent layers are built with ``batch_first=True``, so ``x`` is
        expected batch-first.
        """
        _, state = self.rnn_layer(x)
        # LSTM returns (h_n, c_n); GRU and RNN return h_n directly.
        h = state[0] if self.rnn_type == "LSTM" else state

        if self.rnn_layer.bidirectional:
            # h_n stacks directions per layer: the last two entries are the
            # forward and backward states of the top layer.
            out = torch.cat([h[-2], h[-1]], dim=-1)
        else:
            out = h[-1]

        return self.net(out)

class TCNNModel(nn.Module):
    """T-CNN (Temporal Convolutional Neural Network) classifier.

    Three Conv1d + BatchNorm + ReLU stages (the first two followed by max
    pooling), global average pooling over time and a linear output layer.
    """

    def __init__(self, n_input_channels, hidd_size=64, out_features=35):
        """
        Args:
            n_input_channels (int): Input channels (e.g., 13 for MFCC)
            hidd_size (int): Base number of channels of the convolutional layers
            out_features (int): Number of output classes (e.g., 35)
        """
        super().__init__()

        narrow, medium, wide = hidd_size, hidd_size * 2, hidd_size * 4

        # Channel widths go n_input_channels -> hidd_size -> 2x -> 4x; each pooled
        # stage halves the sequence length.
        self.conv_block1 = self._conv_stage(n_input_channels, narrow, kernel_size=5, padding=2, pool=True)
        self.conv_block2 = self._conv_stage(narrow, medium, kernel_size=3, padding=1, pool=True)
        self.conv_block3 = self._conv_stage(medium, wide, kernel_size=3, padding=1, pool=False)

        # Collapses whatever sequence length is left down to 1.
        self.global_pool = nn.AdaptiveAvgPool1d(1)
        self.flatten = nn.Flatten()

        # (B, 4 * hidd_size) -> (B, out_features)
        self.fc = nn.Linear(wide, out_features)

    @staticmethod
    def _conv_stage(in_channels, out_channels, kernel_size, padding, pool):
        """Build Conv1d -> BatchNorm1d -> ReLU, optionally followed by MaxPool1d(2)."""
        layers = [
            nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, padding=padding),
            nn.BatchNorm1d(out_channels),
            nn.ReLU(),
        ]
        if pool:
            layers.append(nn.MaxPool1d(kernel_size=2))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class scores for ``x``.

        The last two axes of ``x`` are swapped first because nn.Conv1d expects
        (Batch, Channels, SeqLen).
        """
        x = x.permute(0, 2, 1)

        for stage in (self.conv_block1, self.conv_block2, self.conv_block3):
            x = stage(x)

        pooled = self.flatten(self.global_pool(x))

        # Classify
        return self.fc(pooled)

class CNN1DModel(nn.Module):
    """Conv1d front-end whose output sequence is classified by an ``RNNModel``."""

    def __init__(
        self,
        hidd_size=256,
        in_channels = 13,
        out_channels = 64,
    ):
        """
        Args:
            hidd_size: hidden size handed to the recurrent model.
            in_channels: channels of the input features.
            out_channels: channels produced by the convolution, which are also
                the recurrent model's input size.
        """
        super().__init__()

        front_end = [
            nn.Conv1d(in_channels, out_channels, kernel_size=3, padding='same'),
            nn.ReLU(),
            nn.MaxPool1d(2),
        ]
        self.conv_blocks = nn.Sequential(*front_end)

        self.rnn_layer = RNNModel(
            rnn_type="RNN",
            n_input_channels=out_channels,
            hidd_size=hidd_size,
        )

    def forward(self, x):
        """Convolve over the sequence axis, then run the recurrent classifier."""
        # Conv1d wants channels before the sequence axis; swap, convolve, swap back.
        channels_first = x.permute(0, 2, 1)
        convolved = self.conv_blocks(channels_first)
        sequence_first = convolved.permute(0, 2, 1)
        return self.rnn_layer(sequence_first)

## Trainers

In [None]:
def train_step(x_batch, y_batch, model, optimizer, criterion, use_gpu):
    """Run a single optimisation step on one batch.

    Args:
        x_batch: model inputs.
        y_batch: targets handed to ``criterion``.
        model: model being trained.
        optimizer: optimizer that updates the model parameters.
        criterion: loss function called as ``criterion(prediction, y_batch)``.
        use_gpu: not read by this function.

    Returns:
        ``(prediction, loss)`` for the batch, computed before the update.
    """
    # Start from clean gradients for this batch
    optimizer.zero_grad()

    # Forward pass and loss
    logits = model(x_batch)
    batch_loss = criterion(logits, y_batch)

    # Backward pass and parameter update
    batch_loss.backward()
    optimizer.step()

    return logits, batch_loss


def evaluate(val_loader, model, criterion, use_gpu):
    """Compute accuracy and mean loss of ``model`` over ``val_loader``.

    Gradients are disabled inside the function, so callers do not need to wrap
    the call in ``torch.no_grad()`` themselves. The model's train/eval mode is
    left untouched.

    Args:
        val_loader: iterable of ``(x, y)`` batches.
        model: model to evaluate.
        criterion: loss function called as ``criterion(prediction, y)``.
        use_gpu: when true, each batch is moved to CUDA before the forward pass.

    Returns:
        ``(val_acc, val_loss)``: fraction of correct predictions and the
        per-sample average loss.
    """
    cumulative_loss = 0
    cumulative_predictions = 0
    data_count = 0

    # Evaluation never backpropagates; skipping graph construction saves memory.
    with torch.no_grad():
        for x_val, y_val in val_loader:
            if use_gpu:
                x_val = x_val.cuda()
                y_val = y_val.cuda()

            y_predicted = model(x_val)

            loss = criterion(y_predicted, y_val)

            class_prediction = torch.argmax(y_predicted, dim=1).long()

            cumulative_predictions += (y_val == class_prediction).sum().item()
            # The loss is weighted by the batch size so the final value is a
            # per-sample average even when the last batch is smaller.
            cumulative_loss += loss.item() * y_val.shape[0]
            data_count += y_val.shape[0]

    val_acc = cumulative_predictions / data_count
    val_loss = cumulative_loss / data_count

    return val_acc, val_loss


def train_model(
    model,
    train_dataset,
    val_dataset,
    epochs,
    criterion,
    batch_size,
    lr,
    n_evaluations_per_epoch=6,
    use_gpu=False,
):
    """Train ``model`` with Adam and evaluate it on ``val_dataset`` after every epoch.

    Args:
        model: model to train; moved to CUDA when ``use_gpu`` is true and moved
            back to the CPU before returning.
        train_dataset: dataset of ``(x, y)`` pairs used for training (shuffled).
        val_dataset: dataset of ``(x, y)`` pairs used for validation.
        epochs: number of passes over the training data.
        criterion: loss function.
        batch_size: batch size of both data loaders.
        lr: Adam learning rate.
        n_evaluations_per_epoch: roughly how many times per epoch the running
            training loss/accuracy are printed.
        use_gpu: whether to train on CUDA.

    Returns:
        ``(curves, total_time)``: a dict with per-epoch "train_acc", "val_acc",
        "train_loss" and "val_loss" lists, and the elapsed time in seconds.
    """
    if use_gpu:
        model.cuda()

    # Data loaders
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=use_gpu)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False, pin_memory=use_gpu)

    # Optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, betas=(0.9, 0.98), eps=1e-9)

    # Lists that store the training curves
    curves = {
        "train_acc": [],
        "val_acc": [],
        "train_loss": [],
        "val_loss": [],
    }

    t0 = time.perf_counter()

    iteration = 0

    n_batches = len(train_loader)
    print(n_batches)

    # Number of batches between progress reports. Clamped to at least 1 so that
    # having fewer batches than requested reports does not produce a modulo by
    # zero; a non-positive request disables the reports.
    if n_evaluations_per_epoch > 0:
        log_every = max(1, n_batches // n_evaluations_per_epoch)
    else:
        log_every = None

    for epoch in range(epochs):
        print(f"\rEpoch {epoch + 1}/{epochs}")
        cumulative_train_loss = 0
        cumulative_train_corrects = 0
        examples_count = 0

        # Model training
        model.train()
        for i, (x_batch, y_batch) in enumerate(train_loader):
            if use_gpu:
                x_batch = x_batch.cuda()
                y_batch = y_batch.cuda()

            y_predicted, loss = train_step(x_batch, y_batch, model, optimizer, criterion, use_gpu)

            cumulative_train_loss += loss.item() * x_batch.shape[0]
            examples_count += y_batch.shape[0]

            # Count the correct predictions
            class_prediction = torch.argmax(y_predicted, dim=1).long()
            cumulative_train_corrects += (y_batch == class_prediction).sum().item()

            if log_every is not None and i > 0 and i % log_every == 0:
                train_loss = cumulative_train_loss / examples_count
                train_acc = cumulative_train_corrects / examples_count

                print(f"Iteration {iteration} - Batch {i}/{len(train_loader)} - Train loss: {train_loss}, Train acc: {train_acc}")

            iteration += 1

        model.eval()
        with torch.no_grad():
            val_acc, val_loss = evaluate(val_loader, model, criterion, use_gpu)

        print(f"Val loss: {val_loss}, Val acc: {val_acc}")

        train_loss = cumulative_train_loss / examples_count
        train_acc = cumulative_train_corrects / examples_count

        curves["train_acc"].append(train_acc)
        curves["val_acc"].append(val_acc)
        curves["train_loss"].append(train_loss)
        curves["val_loss"].append(val_loss)

    print()
    total_time = time.perf_counter() - t0
    print(f"Tiempo total de entrenamiento: {total_time:.4f} [s]")

    model.cpu()

    return curves, total_time

def show_curves(all_curves, suptitle=''):
    """Plot mean ± std training curves over several runs and save them as a PDF.

    Args:
        all_curves: list of curve dicts (one per run) with the keys
            "train_loss", "val_loss", "train_acc" and "val_acc", each holding
            one value per epoch.
        suptitle: figure title; also used as the file name, the figure being
            written to ``img/<suptitle>.pdf``.
    """
    metric_keys = list(all_curves[0].keys())
    final_curve_means = {k: np.mean([c[k] for c in all_curves], axis=0) for k in metric_keys}
    final_curve_stds = {k: np.std([c[k] for c in all_curves], axis=0) for k in metric_keys}

    fig, ax = plt.subplots(1, 2, figsize=(13, 5))
    fig.set_facecolor('white')

    epochs = np.arange(len(final_curve_means["val_loss"])) + 1

    # One panel per metric: (axis, key suffix, y label, title)
    panels = (
        (ax[0], 'loss', 'Loss', 'Loss evolution during training'),
        (ax[1], 'acc', 'Accuracy', 'Accuracy evolution during training'),
    )
    # Validation is drawn before training in both the lines and the bands.
    splits = (('val', 'validation'), ('train', 'training'))

    for axis, suffix, ylabel, title in panels:
        for prefix, legend_name in splits:
            axis.plot(epochs, final_curve_means[f'{prefix}_{suffix}'], label=legend_name)
        for prefix, _ in splits:
            mean = final_curve_means[f'{prefix}_{suffix}']
            std = final_curve_stds[f'{prefix}_{suffix}']
            axis.fill_between(epochs, y1=mean - std, y2=mean + std, alpha=.5)
        axis.set_xlabel('Epoch')
        axis.set_ylabel(ylabel)
        axis.set_title(title)
        axis.legend()

    fig.suptitle(suptitle, fontsize=16, weight="bold")

    # ==== Save and close ====
    # The output directory may not exist yet on a fresh checkout.
    os.makedirs('img', exist_ok=True)
    filepath = os.path.join('img', f'{suptitle}.pdf')
    fig.savefig(filepath, bbox_inches='tight', format='pdf')
    plt.close(fig)

def get_metrics_and_confusion_matrix(models, dataset, name=''):
    """Save the mean and std confusion matrices of several models as one PDF.

    Every model is moved to the CPU, put in eval mode and run once over
    ``dataset``. The row-normalised confusion matrices are averaged across
    models and the figure is written to ``img/conf_mat_<name>.pdf``; its title
    reports the mean ± std accuracy.

    Args:
        models: iterable of models to evaluate.
        dataset: dataset of ``(x, y)`` pairs with integer class targets.
        name: identifier used in the figure title and the file name.
    """
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=min(16, len(dataset)))

    # === True labels ===
    y_true = torch.cat([y for _, y in dataloader])
    n_classes = len(torch.unique(y_true))

    # === Tick labels ===
    if hasattr(dataset, 'idx_to_label'):
        labels = [dataset.idx_to_label[i] for i in range(n_classes)]
    elif hasattr(dataset, 'labels'):
        # NOTE(review): presumably a list of class names; if it holds one label
        # per sample the tick labels will not line up — confirm against callers.
        labels = dataset.labels
    else:
        labels = [str(i) for i in range(n_classes)]

    # === Confusion matrix and accuracy per model ===
    # A single forward pass per model feeds both metrics; the loader is not
    # shuffled, so predictions stay aligned with y_true.
    cms = []
    accs = []
    for model in models:
        model.cpu()
        model.eval()
        with torch.no_grad():
            y_pred = torch.cat([model(x).argmax(dim=1) for x, _ in dataloader])

        cms.append(confusion_matrix(y_true, y_pred, labels=range(n_classes), normalize='true'))
        accs.append(accuracy_score(y_true, y_pred))

    cms = np.stack(cms)
    cm_mean = cms.mean(axis=0)
    cm_std = cms.std(axis=0)

    acc_mean = np.mean(accs) * 100
    acc_std = np.std(accs) * 100

    # === Combined figure ===
    os.makedirs('img', exist_ok=True)
    fig, axs = plt.subplots(1, 2, figsize=(14, 6))

    # Left panel: means; right panel: standard deviations.
    panels = (
        (axs[0], cm_mean, plt.cm.Blues, 'Mean Confusion Matrix'),
        (axs[1], cm_std, plt.cm.Oranges, 'Standard Deviation'),
    )
    for axis, matrix, cmap, title in panels:
        image = axis.imshow(matrix, interpolation='nearest', cmap=cmap)
        axis.set_title(title)
        axis.set_xlabel('Predicted label')
        axis.set_ylabel('True label')
        axis.set_xticks(np.arange(n_classes))
        axis.set_yticks(np.arange(n_classes))
        axis.set_xticklabels(labels, rotation=45, ha="right", rotation_mode="anchor")
        axis.set_yticklabels(labels)
        fig.colorbar(image, ax=axis, fraction=0.046, pad=0.04)

    # --- Overall title ---
    fig.suptitle(rf'{name}, mean acc = {acc_mean:.2f} ± {acc_std:.2f}%', fontsize=12)
    plt.tight_layout(rect=[0, 0, 1, 0.96])

    filepath = os.path.join('img', f'conf_mat_{name}.pdf')
    plt.savefig(filepath, bbox_inches='tight')
    plt.close(fig)

    print(f"Combined confusion matrix (mean + std) saved to {filepath}")


def evaluate_models_metrics(models, dataloader, criterion, use_gpu=True):
    """
    Evaluate several models and aggregate their classification metrics.

    Args:
        models: iterable of models to evaluate; each is put in eval mode.
        dataloader: iterable of ``(X, y)`` batches.
        criterion: accepted for backward compatibility; no loss is computed
            because none was ever returned.
        use_gpu: when true, models and inputs are moved to CUDA.

    Returns:
        ``(metrics_mean, metrics_std, all_metrics)``: dicts keyed by "accuracy",
        "recall", "precision" and "f1" holding the mean across models, the
        standard deviation across models and the per-model values. Recall,
        precision and F1 are macro-averaged.
    """
    # Per-model results
    all_metrics = {
        "accuracy": [],
        "recall": [],
        "precision": [],
        "f1": [],
    }

    for model in models:
        model.eval()
        if use_gpu:
            model.cuda()

        y_true = []
        y_pred = []

        with torch.no_grad():
            for X, y in dataloader:
                if use_gpu:
                    X = X.cuda(non_blocking=True)

                preds = model(X).argmax(dim=1)
                y_true.extend(y.cpu().numpy())
                y_pred.extend(preds.cpu().numpy())

        # Metrics
        all_metrics["accuracy"].append(accuracy_score(y_true, y_pred))
        all_metrics["recall"].append(recall_score(y_true, y_pred, average='macro', zero_division='warn'))
        all_metrics["precision"].append(precision_score(y_true, y_pred, average='macro', zero_division='warn'))
        all_metrics["f1"].append(f1_score(y_true, y_pred, average='macro', zero_division='warn'))

        # With use_gpu the model is already on CUDA and stays there.
        if not use_gpu:
            model.cpu()

    # Mean and standard deviation across models
    metrics_mean = {k: np.mean(v) for k, v in all_metrics.items()}
    metrics_std = {k: np.std(v) for k, v in all_metrics.items()}

    return metrics_mean, metrics_std, all_metrics


## Visualization

In [None]:
def plot_waveform(wf, sample_rate, label="", figname=None):
    """
    Show the waveform (left) and the MFCCs (right) of an audio signal.

    Parameters:
        wf (Tensor or array-like): audio signal, [1, N] or [N]
        sample_rate (int): sampling rate (Hz)
        label (str): optional label for the title
        figname (str): name of the figure file; when given the figure is saved
            to ``img/<figname>.pdf``, otherwise it is only shown
    """
    # Accept array-likes too: the MFCC transform below needs a tensor.
    if not isinstance(wf, torch.Tensor):
        wf = torch.as_tensor(wf, dtype=torch.float32)
    wf = wf.squeeze().cpu()

    # === MFCC transform ===
    mfcc_transform = torchaudio.transforms.MFCC(
        sample_rate=sample_rate,
        n_mfcc=13,
        melkwargs={"n_fft": 320, "hop_length": 160, "n_mels": 23},
        log_mels=True
    )
    mfcc = mfcc_transform(wf.unsqueeze(0)).squeeze().cpu().numpy()  # [n_mfcc, time]

    # === Figure with 2 subplots ===
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    # NOTE(review): called after the axes are created, so it may only affect
    # figures created later — confirm whether it should precede plt.subplots.
    sns.set_style("whitegrid")

    # --- Waveform ---
    # Named time_axis so the imported `time` module is not shadowed.
    time_axis = torch.arange(0, len(wf)) / sample_rate
    axes[0].plot(time_axis, wf.numpy(), color="steelblue", linewidth=1.0)
    axes[0].set_title("Waveform", fontsize=12)
    axes[0].set_xlabel("Tiempo [s]")
    axes[0].set_ylabel("Amplitud")

    # --- MFCC ---
    sns.heatmap(mfcc, ax=axes[1], cmap="viridis", cbar=True)
    axes[1].set_title("MFCCs", fontsize=12)
    axes[1].set_xlabel("Tiempo (frames)")
    axes[1].set_ylabel("Coeficiente MFCC")

    fig.suptitle(f"Audio: {label}", fontsize=14, y=1.02)
    plt.tight_layout()

    # === Save or show ===
    if figname:
        # The output directory may not exist yet.
        os.makedirs('img', exist_ok=True)
        name = os.path.join('img', f'{figname}.pdf')
        plt.savefig(name, bbox_inches="tight")
        print(f"Figura guardada en {name}")
    else:
        plt.show()

    plt.close(fig)


## Feature extraction

In [None]:
# --- Base configuration ---
ROOT_DIR = "data"
SAVE_DIR = os.path.join(ROOT_DIR, "features")
device = "cuda" if torch.cuda.is_available() else "cpu"
os.makedirs(SAVE_DIR, exist_ok=True)

# --- MFCC transform shared by the three MFCC-based modes ---
mfcc = torchaudio.transforms.MFCC(
    sample_rate=16000,
    n_mfcc=13,
    melkwargs={"n_fft": 320, "hop_length": 160, "n_mels": 23},
)

# --- Wav2Vec2 is initialised only once ---
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
wav2vec2 = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base").to(device)

# --- Extraction mode -> feature extractor ---
modes = dict.fromkeys(("mfcc", "mfcc_delta", "mfcc_delta_delta"), mfcc)
modes["wav2vec2"] = wav2vec2

# --- Process the train, val and test splits ---
for split in ("train", "val", "test"):
    list_path = os.path.join(ROOT_DIR, f"{split}_list.txt")

    for mode, extractor in modes.items():
        save_path = os.path.join(SAVE_DIR, f"{split}_{mode}.pt")

        # Features already on disk are not recomputed.
        if os.path.isfile(save_path):
            print(f"{save_path} ya existe, saltando...")
            continue

        print(f"\nExtrayendo {mode} para {split}...")

        dataset = CustomSpeechCommands(ROOT_DIR, list_path, mode=mode)
        # Only the wav2vec2 mode needs the processor.
        dataset.save_features(
            feature_extractor=extractor,
            processor=processor if mode == "wav2vec2" else None,
            device=device,
            save_path=save_path,
        )

print("\nExtracción de features completada.")


In [None]:
SAVE_DIR = os.path.join(ROOT_DIR, 'petes')
# torch.save does not create missing directories, and save_features only prints
# the resulting error, so the folder has to exist beforehand.
os.makedirs(SAVE_DIR, exist_ok=True)

# --- Extract MFCCs with several hop lengths for the train, val and test splits ---
for split in ["train", "val", "test"]:
    list_path = os.path.join(ROOT_DIR, f"{split}_list.txt")

    # Built lazily and reused for every hop length: the file selection depends
    # only on the split, not on the transform.
    dataset = None

    # NOTE(review): 54 stands out among 320/160/32/16 — confirm it is not meant to be 64.
    for hl in [320, 160, 54, 32, 16]:
        save_path = os.path.join(SAVE_DIR, f"{split}_{hl}_mfcc.pt")
        if os.path.isfile(save_path):
            print(f"{save_path} ya existe, saltando...")
            continue

        mfcc_transform = torchaudio.transforms.MFCC(
            sample_rate=16000,
            n_mfcc=13,
            log_mels=True,
            melkwargs={"n_fft": 320, "hop_length": hl, "n_mels": 23},
        )
        print(f"\nExtrayendo mfcc (hop_length={hl}) para {split}...")

        if dataset is None:
            dataset = CustomSpeechCommands(ROOT_DIR, list_path, mode='mfcc')

        dataset.save_features(
            feature_extractor=mfcc_transform,
            device=device,
            save_path=save_path,
        )

print("\nExtracción de features completada.")

## Training

In [None]:
# --- T-CNN experiment: F1-score as a function of the MFCC sequence length ---
ROOT_DIR = "data/petes"
SAVE_DIR = ROOT_DIR
# Fall back to the CPU when CUDA is unavailable instead of failing at model.cuda().
device = "cuda" if torch.cuda.is_available() else "cpu"

lr = 5e-4
batch_size = 32
criterion = nn.CrossEntropyLoss()
n_trains = 5
epochs = 20
use_gpu = device == "cuda"

# Figures are written to img/; make sure it exists before the first save.
os.makedirs("img", exist_ok=True)

# Hop lengths are recovered from the names of the saved training feature files.
# The dot is escaped and the whole name must match so look-alike files are ignored.
pattern = re.compile(r"train_(\d+)_mfcc\.pt")
hop_lengths = sorted(
    int(match.group(1)) for match in map(pattern.fullmatch, os.listdir(SAVE_DIR)) if match
)

f1_scores = []
f1_stds = []
seq_lengths = []

for hop_length in hop_lengths:
    # assumes 16000-sample clips framed as 1 + N // hop — TODO confirm against the extraction settings
    seq_len = 1 + 16000 // hop_length
    seq_lengths.append(seq_len)

    print(f"\n--- Hop length {hop_length} -> Secuencia {seq_len} frames ---")

    train_dataset = FeaturesDataset(os.path.join(SAVE_DIR, f"train_{hop_length}_mfcc.pt"))
    val_dataset   = FeaturesDataset(os.path.join(SAVE_DIR, f"val_{hop_length}_mfcc.pt"))
    test_dataset  = FeaturesDataset(os.path.join(SAVE_DIR, f"test_{hop_length}_mfcc.pt"))

    # NOTE(review): this rebinds `models`, hiding the `torchvision.models` import
    # made at the top of the file — rename if that module is needed later.
    models = []
    curves = []

    for k in range(n_trains):
        print(f"Entrenamiento {k+1}/{n_trains}")

        # Fresh T-CNN for every repetition
        model = TCNNModel(n_input_channels=13, hidd_size=64, out_features=35)

        curve, _ = train_model(
            model,
            train_dataset,
            val_dataset,
            epochs,
            criterion,
            batch_size,
            lr,
            n_evaluations_per_epoch=3,
            use_gpu=use_gpu,
        )

        curves.append(curve)
        models.append(model)

    show_curves(curves, suptitle=f"TCNN_seq{seq_len}")

    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    metrics_mean, metrics_std, _ = evaluate_models_metrics(models, test_loader, criterion, use_gpu=use_gpu)

    f1_scores.append(metrics_mean["f1"])
    f1_stds.append(metrics_std["f1"])

    print(f"F1 TCNN_seq{seq_len}: {metrics_mean['f1']:.3f} ± {metrics_std['f1']:.3f}")

    get_metrics_and_confusion_matrix(models, test_dataset, name=f"TCNN_seq{seq_len}")

# --- F1 versus sequence length ---
plt.figure(figsize=(8, 5))
plt.errorbar(seq_lengths, f1_scores, yerr=f1_stds, label="TCNN", marker="o", capsize=4)
plt.xlabel("Cantidad de frames en la secuencia (MFCC)")
plt.ylabel("F1-score promedio (± std)")
plt.title("F1-score TCNN según longitud de secuencia")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig("img/f1_vs_length_tcnn.pdf", bbox_inches="tight")
plt.show()


In [None]:
# --- GRU/LSTM experiment: F1-score as a function of the hidden size ---
ROOT_DIR = "data"
SAVE_DIR = ROOT_DIR
# Fall back to the CPU when CUDA is unavailable instead of failing at model.cuda().
device = "cuda" if torch.cuda.is_available() else "cpu"

lr = 5e-4
batch_size = 32
criterion = nn.CrossEntropyLoss()
n_trains = 5
epochs = 20
use_gpu = device == "cuda"

neurons_on_hidd_layer = [256, 128, 64, 32, 16, 8]
architectures = ["GRU", "LSTM"]

# Figures are written to img/; make sure it exists before the first save.
os.makedirs("img", exist_ok=True)

# NOTE(review): the extraction cells save to data/features/<split>_<mode>.pt;
# confirm that data/train.pt, data/val.pt and data/test.pt are the intended inputs.
train_dataset = FeaturesDataset(os.path.join(SAVE_DIR, "train.pt"))
val_dataset   = FeaturesDataset(os.path.join(SAVE_DIR, "val.pt"))
test_dataset  = FeaturesDataset(os.path.join(SAVE_DIR, "test.pt"))

# Results per architecture, one entry per hidden size
f1_scores = {arch: [] for arch in architectures}
f1_stds   = {arch: [] for arch in architectures}

for arch in architectures:
    print(f"\n======= Entrenando modelos tipo {arch} =======")

    for hidd_size in neurons_on_hidd_layer:
        print(f"\n--- Modelo con hidd_size = {hidd_size} ---")

        # NOTE(review): this rebinds `models`, hiding the `torchvision.models`
        # import made at the top of the file — rename if that module is needed later.
        models = []
        curves = []

        for k in range(n_trains):
            print(f"Entrenamiento {k+1}/{n_trains}")
            model = RNNModel(rnn_type=arch, n_input_channels=13, hidd_size=hidd_size)

            curve, _ = train_model(
                model,
                train_dataset,
                val_dataset,
                epochs,
                criterion,
                batch_size,
                lr,
                n_evaluations_per_epoch=3,
                use_gpu=use_gpu,
            )

            curves.append(curve)
            models.append(model)

        # Average curves over the repetitions
        show_curves(curves, suptitle=f"{arch}_h{hidd_size}")

        # Test metrics
        test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
        metrics_mean, metrics_std, _ = evaluate_models_metrics(models, test_loader, criterion, use_gpu=use_gpu)

        f1_scores[arch].append(metrics_mean["f1"])
        f1_stds[arch].append(metrics_std["f1"])

        print(f"F1 {arch}_h{hidd_size}: {metrics_mean['f1']:.3f} ± {metrics_std['f1']:.3f}")

        # Average confusion matrix
        get_metrics_and_confusion_matrix(models, test_dataset, name=f"{arch}_h{hidd_size}")

# --- F1 versus number of hidden units ---
plt.figure(figsize=(8, 5))
for arch in architectures:
    plt.errorbar(neurons_on_hidd_layer, f1_scores[arch], yerr=f1_stds[arch],
                 label=arch, marker="o", capsize=4)

plt.xlabel("Número de neuronas ocultas (hidd_size)")
plt.ylabel("F1-score promedio (± std)")
plt.title("F1-score según tamaño de capa oculta")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig("img/f1_vs_hidd_size.pdf", bbox_inches="tight")
plt.show()
