### Importación de librerías

In [1]:
import os
import torch
import torchaudio
import tarfile
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
import IPython
import gc
import torch.nn.functional as F
import torchaudio.transforms as tt
import matplotlib
import matplotlib.pyplot as plt
import wandb
import random
import torch.optim as optim
import torchaudio.transforms as T

from torch import nn, optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset, Dataset
from typing import Dict  
from sklearn.metrics import accuracy_score, confusion_matrix
from torch.utils.data import Dataset
from torchaudio.datasets import GTZAN
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
import collections
from torchaudio.transforms import MelSpectrogram, AmplitudeToDB
from torchaudio.sox_effects import apply_effects_tensor
from typing import Optional

%matplotlib inline

In [None]:
# Before running experiments: export the repository/branch labels as environment
# variables (presumably picked up by W&B to tag runs -- TODO confirm).
os.environ['WANDB_REPOSITORY'] = "TP3_TD6"
os.environ['WANDB_BRANCH'] = "main"

# Log in with the key stored in WANDB_API_KEY; raises KeyError if it is not set.
wandb.login(key = os.environ["WANDB_API_KEY"])


In [3]:
# Sample rate every waveform is resampled to; also the default rate handed to
# the SoX-based transforms below.
samplerate = 22050

### Clases

In [4]:
# Audio normalisation
class NormalizeAudio(nn.Module):
    """Shift and scale a tensor with fixed statistics.

    Args:
        mean: Value subtracted from every element. Defaults to 0.0.
        std: Value each shifted element is divided by. Defaults to 1.0.
    """

    def __init__(self, mean=0.0, std=1.0):
        super().__init__()
        self.std = std
        self.mean = mean

    def forward(self, waveform):
        """Return ``(waveform - mean) / std``."""
        centred = waveform - self.mean
        return centred / self.std

# Time-stretch transform applied to the waveform through SoX
class TimeStretchWaveform(nn.Module):
    """Run the SoX ``tempo`` effect over a waveform.

    Args:
        rate: Factor passed to the ``tempo`` effect. Defaults to 1.2.
    """

    def __init__(self, rate: float = 1.2):
        super().__init__()
        self.rate = rate

    def forward(self, waveform: torch.Tensor, sample_rate: int = samplerate) -> torch.Tensor:
        """Return ``waveform`` after the ``tempo`` effect.

        Args:
            waveform: Audio tensor handed to ``apply_effects_tensor``.
            sample_rate: Sample rate of ``waveform``; defaults to the module-level
                ``samplerate``.

        Returns:
            The processed waveform; the sample rate SoX reports back is discarded.
        """
        sox_chain = [['tempo', f'{self.rate}']]
        stretched, _reported_rate = apply_effects_tensor(waveform, sample_rate, sox_chain)
        return stretched

# Pitch-shift transform applied to the waveform through SoX
class PitchShiftWaveform(nn.Module):
    """Run the SoX ``pitch`` effect over a waveform.

    Args:
        n_steps: Shift in steps; each step is sent to SoX as 100 cents.
            Defaults to 2.
    """

    def __init__(self, n_steps: int = 2):
        super().__init__()
        self.n_steps = n_steps

    def forward(self, waveform: torch.Tensor, sample_rate: int = samplerate) -> torch.Tensor:
        """Return ``waveform`` after the ``pitch`` and ``rate`` effects.

        Args:
            waveform: Audio tensor handed to ``apply_effects_tensor``.
            sample_rate: Sample rate of ``waveform``; defaults to the module-level
                ``samplerate``.

        Returns:
            The processed waveform; the sample rate SoX reports back is discarded.
        """
        cents = self.n_steps * 100  # one step == 100 cents
        sox_chain = [
            ['pitch', f'{cents}'],
            ['rate', f'{sample_rate}'],  # keep the original sample rate
        ]
        shifted, _reported_rate = apply_effects_tensor(waveform, sample_rate, sox_chain)
        return shifted

# Optional: resample when needed
class ResampleWaveform(nn.Module):
    """Thin wrapper around ``torchaudio.transforms.Resample``.

    Args:
        orig_freq: Sample rate of the incoming waveform.
        new_freq: Sample rate of the returned waveform.
    """

    def __init__(self, orig_freq: int, new_freq: int):
        super().__init__()
        resampler = torchaudio.transforms.Resample(orig_freq=orig_freq, new_freq=new_freq)
        self.resample = resampler

    def forward(self, waveform: torch.Tensor) -> torch.Tensor:
        """Return ``waveform`` passed through the stored resampler."""
        resampled = self.resample(waveform)
        return resampled

class AddNoise(nn.Module):
    """Add zero-mean Gaussian noise scaled by ``noise_level``.

    Args:
        noise_level: Multiplier applied to standard-normal noise. Defaults to 0.005.
    """

    def __init__(self, noise_level=0.005):
        super().__init__()
        self.noise_level = noise_level

    def forward(self, spectrogram: torch.Tensor) -> torch.Tensor:
        """Return ``spectrogram`` plus noise of the same shape, dtype and device."""
        return spectrogram + self.noise_level * torch.randn_like(spectrogram)

# Dataset
class MusicDataset(Dataset):
    """Audio clips stored as ``<root>/<genre>/<file>.wav``.

    Every sub-directory of ``root`` is one class. Classes and the files inside
    each class are sorted, so sample indices are reproducible: two instances
    built from the same directory enumerate the clips in the same order (the
    stratified split relies on this when it reuses indices across instances).

    Args:
        root: Directory holding one folder per genre. Defaults to './genres_5sec'.
        transform: Optional callable applied to the mono, resampled waveform
            before it is cropped/padded to a fixed length.

    Attributes:
        classes: Sorted list of genre names; the position is the class index.
        files: List of ``(genre, filename)`` tuples, one per ``.wav`` file.
    """

    def __init__(self, root='./genres_5sec', transform=None):
        super().__init__()
        self.root = root
        self.transform = transform
        # Sort so that class indices do not depend on os.listdir ordering.
        self.classes = sorted(self.get_classes(self.root))
        self._class_to_idx = {name: idx for idx, name in enumerate(self.classes)}
        # Sort the files as well: os.listdir gives no ordering guarantee.
        self.files = [
            (genre, fname)
            for genre in self.classes
            for fname in sorted(os.listdir(os.path.join(root, genre)))
            if fname.endswith('.wav')
        ]

    def __len__(self):
        """Return the number of ``.wav`` files found."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(waveform, class_idx)``.

        The waveform is averaged to a single channel, resampled to the
        module-level ``samplerate`` when the file uses another rate, passed
        through ``transform`` (if any) and finally cropped or zero-padded along
        dimension 1 to ``samplerate * 5`` samples.
        """
        genre, fname = self.files[idx]
        fpath = os.path.join(self.root, genre, fname)
        class_idx = self._class_to_idx[genre]
        waveform, sample_rate = torchaudio.load(fpath)

        # Down-mix to mono when the file has several channels
        if waveform.size(0) > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)

        # Resample when the file rate differs from the target rate
        if sample_rate != samplerate:
            resample_transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=samplerate)
            waveform = resample_transform(waveform)

        if self.transform is not None:
            waveform = self.transform(waveform)

        # Force a fixed length of 5 seconds (transforms may change the length)
        fixed_length = samplerate * 5
        n_samples = waveform.size(1)
        if n_samples > fixed_length:
            waveform = waveform[:, :fixed_length]
        elif n_samples < fixed_length:
            waveform = torch.nn.functional.pad(waveform, (0, fixed_length - n_samples))

        return waveform, class_idx

    def get_classes(self, root):
        """Return the names of the sub-directories of ``root`` (unsorted)."""
        return [
            entry for entry in os.listdir(root)
            if os.path.isdir(os.path.join(root, entry))
        ]

# Early Stopping
class EarlyStopping:
    """Flag when the validation loss has stopped improving.

    Call the instance once per epoch with the validation loss. ``early_stop``
    becomes True after ``patience`` calls in which the loss was greater than
    ``best_loss - min_delta``; any other call records a new best loss and
    resets the counter.

    Args:
        patience: Number of non-improving calls tolerated. Defaults to 7.
        min_delta: Margin by which the loss must beat the best one. Defaults to 0.
    """

    def __init__(self, patience=7, min_delta=0):
        self.patience, self.min_delta = patience, min_delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        # The first call only records the baseline.
        if self.best_loss is None:
            self.best_loss = val_loss
            return

        threshold = self.best_loss - self.min_delta
        if not (val_loss > threshold):
            # Improvement: remember it and start counting again.
            self.best_loss = val_loss
            self.counter = 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

# Models
class MLP(nn.Module):
    """Fully connected network with two hidden ReLU layers.

    Args:
        n_input: Number of input features.
        nodes: Width of both hidden layers. Defaults to 128.
        n_output: Number of classes. Defaults to 10.
    """

    def __init__(self, n_input, nodes=128, n_output=10):
        super().__init__()
        self.fc1 = nn.Linear(n_input, nodes)
        self.fc2 = nn.Linear(nodes, nodes)
        self.fc3 = nn.Linear(nodes, n_output)

    def forward(self, x):
        """Return log-probabilities over dimension 1."""
        for hidden in (self.fc1, self.fc2):
            x = F.relu(hidden(x))
        logits = self.fc3(x)
        return F.log_softmax(logits, dim=1)

class MLP2(nn.Module):
    """Fully connected network with a configurable number of hidden layers.

    Args:
        n_input: Number of features after flattening.
        nodes: Width of every hidden layer. Defaults to 128.
        n_output: Number of classes. Defaults to 10.
        hidden_layers: Number of ``Linear + ReLU`` blocks. Defaults to 3.
    """

    def __init__(self, n_input, nodes=128, n_output=10, hidden_layers=3):
        super().__init__()

        # The first block consumes the raw features, the others the hidden width.
        fan_ins = ([n_input] + [nodes] * hidden_layers)[:hidden_layers]
        self.dense_layers = nn.ModuleList(
            [nn.Sequential(nn.Linear(fan_in, nodes), nn.ReLU()) for fan_in in fan_ins]
        )

        # Output layer
        self.output = nn.Linear(nodes, n_output)

    def forward(self, x):
        """Flatten each sample, run the hidden blocks and return log-probabilities."""
        features = x.view(x.size(0), -1)
        for block in self.dense_layers:
            features = block(features)
        logits = self.output(features)
        return F.log_softmax(logits, dim=1)
    
class CEL(nn.Module):
    """Convolutional classifier: conv blocks (ReLU) followed by dense layers.

    Each conv block is ``Conv2d(3x3, padding=1) -> BatchNorm2d -> ReLU ->
    MaxPool2d(2) -> Dropout2d``. The first block outputs ``nodes`` channels and
    every later block doubles the channel count. The dense head is sized for a
    single-channel square input of side ``n_input``.

    Args:
        n_input: Height and width of the (square) input.
        nodes: Output channels of the first conv block and width of the hidden
            dense layers. Defaults to 128.
        n_output: Number of classes. Defaults to 10.
        hidden_layers: Number of conv blocks. Defaults to 3.
        dropout_prob: Dropout probability in conv and dense blocks. Defaults to 0.3.
        dense_layers: Number of dense layers; the last one maps to ``n_output``.
            Defaults to 2.
    """

    def __init__(self, n_input, nodes=128, n_output=10, hidden_layers=3, dropout_prob=0.3, dense_layers=2):
        super(CEL, self).__init__()

        # Convolutional layers
        self.conv_layers = nn.ModuleList()
        in_channels = 1
        out_channels = nodes

        for _ in range(hidden_layers):
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2),
                nn.Dropout2d(dropout_prob)
            )
            self.conv_layers.append(conv_block)
            in_channels = out_channels
            out_channels = out_channels * 2

        # Size after the convolutions: `in_channels` now holds the channel count
        # of the last block (channels double per block, so it is not `nodes`),
        # and every 2x2 max-pool halves each spatial side (floor division).
        spatial = n_input // (2 ** hidden_layers)
        self.flatten_size = in_channels * spatial * spatial

        # Dense layers
        self.dense_layers = nn.ModuleList()
        in_features = self.flatten_size
        out_features = nodes

        for i in range(dense_layers):
            is_last = i == dense_layers - 1
            if is_last:
                out_features = n_output

            # The last layer emits raw scores: no activation, no dropout.
            dense_block = nn.Sequential(
                nn.Linear(in_features, out_features),
                nn.Identity() if is_last else nn.ReLU(),
                nn.Identity() if is_last else nn.Dropout(dropout_prob)
            )
            self.dense_layers.append(dense_block)
            in_features = out_features

    def forward(self, x):
        """Return log-probabilities for a ``(batch, 1, n_input, n_input)`` input."""
        # Apply convolutional layers
        for conv_block in self.conv_layers:
            x = conv_block(x)

        # Flatten
        x = x.view(x.size(0), -1)

        # Apply dense layers
        for dense_block in self.dense_layers:
            x = dense_block(x)

        return F.log_softmax(x, dim=1)
    
class CEL2(nn.Module):
    """Convolutional classifier: conv blocks (LeakyReLU) followed by dense layers.

    Each conv block is ``Conv2d(3x3, padding=1) -> BatchNorm2d ->
    LeakyReLU(0.1) -> MaxPool2d(2) -> Dropout2d``. The first block outputs
    ``nodes`` channels and every later block doubles the channel count. The
    dense head is sized for a single-channel square input of side ``n_input``.

    Args:
        n_input: Height and width of the (square) input.
        nodes: Output channels of the first conv block and width of the hidden
            dense layers. Defaults to 128.
        n_output: Number of classes. Defaults to 10.
        hidden_layers: Number of conv blocks. Defaults to 3.
        dropout_prob: Dropout probability in conv and dense blocks. Defaults to 0.3.
        dense_layers: Number of dense layers; the last one maps to ``n_output``.
            Defaults to 2.
    """

    def __init__(self, n_input, nodes=128, n_output=10, hidden_layers=3, dropout_prob=0.3, dense_layers=2):
        super(CEL2, self).__init__()

        # Convolutional layers
        self.conv_layers = nn.ModuleList()
        in_channels = 1
        out_channels = nodes

        for _ in range(hidden_layers):
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.LeakyReLU(negative_slope=0.1),
                nn.MaxPool2d(kernel_size=2),
                nn.Dropout2d(dropout_prob)
            )
            self.conv_layers.append(conv_block)
            in_channels = out_channels
            out_channels = out_channels * 2

        # Size after the convolutions: `in_channels` now holds the channel count
        # of the last block (channels double per block, so it is not `nodes`),
        # and every 2x2 max-pool halves each spatial side (floor division).
        spatial = n_input // (2 ** hidden_layers)
        self.flatten_size = in_channels * spatial * spatial

        # Dense layers
        self.dense_layers = nn.ModuleList()
        in_features = self.flatten_size
        out_features = nodes

        for i in range(dense_layers):
            is_last = i == dense_layers - 1
            if is_last:
                out_features = n_output

            # The last layer emits raw scores: no activation, no dropout.
            dense_block = nn.Sequential(
                nn.Linear(in_features, out_features),
                nn.Identity() if is_last else nn.LeakyReLU(negative_slope=0.1),
                nn.Identity() if is_last else nn.Dropout(dropout_prob)
            )
            self.dense_layers.append(dense_block)
            in_features = out_features

    def forward(self, x):
        """Return log-probabilities for a ``(batch, 1, n_input, n_input)`` input."""
        # Apply convolutional layers
        for conv_block in self.conv_layers:
            x = conv_block(x)

        # Flatten
        x = x.view(x.size(0), -1)

        # Apply dense layers
        for dense_block in self.dense_layers:
            x = dense_block(x)

        return F.log_softmax(x, dim=1)

class CEL3(nn.Module):
    """Convolutional classifier: conv blocks (ELU) followed by dense layers.

    Each conv block is ``Conv2d(3x3, padding=1) -> BatchNorm2d -> ELU ->
    MaxPool2d(2) -> Dropout2d``. The first block outputs ``nodes`` channels and
    every later block doubles the channel count. The dense head is sized for a
    single-channel square input of side ``n_input``.

    Args:
        n_input: Height and width of the (square) input.
        nodes: Output channels of the first conv block and width of the hidden
            dense layers. Defaults to 128.
        n_output: Number of classes. Defaults to 10.
        hidden_layers: Number of conv blocks. Defaults to 3.
        dropout_prob: Dropout probability in conv and dense blocks. Defaults to 0.3.
        dense_layers: Number of dense layers; the last one maps to ``n_output``.
            Defaults to 2.
    """

    def __init__(self, n_input, nodes=128, n_output=10, hidden_layers=3, dropout_prob=0.3, dense_layers=2):
        super(CEL3, self).__init__()

        # Convolutional layers
        self.conv_layers = nn.ModuleList()
        in_channels = 1
        out_channels = nodes

        for _ in range(hidden_layers):
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ELU(),
                nn.MaxPool2d(kernel_size=2),
                nn.Dropout2d(dropout_prob)
            )
            self.conv_layers.append(conv_block)
            in_channels = out_channels
            out_channels = out_channels * 2

        # Size after the convolutions: `in_channels` now holds the channel count
        # of the last block (channels double per block, so it is not `nodes`),
        # and every 2x2 max-pool halves each spatial side (floor division).
        spatial = n_input // (2 ** hidden_layers)
        self.flatten_size = in_channels * spatial * spatial

        # Dense layers
        self.dense_layers = nn.ModuleList()
        in_features = self.flatten_size
        out_features = nodes

        for i in range(dense_layers):
            is_last = i == dense_layers - 1
            if is_last:
                out_features = n_output

            # The last layer emits raw scores: no activation, no dropout.
            dense_block = nn.Sequential(
                nn.Linear(in_features, out_features),
                nn.Identity() if is_last else nn.ELU(),
                nn.Identity() if is_last else nn.Dropout(dropout_prob)
            )
            self.dense_layers.append(dense_block)
            in_features = out_features

    def forward(self, x):
        """Return log-probabilities for a ``(batch, 1, n_input, n_input)`` input."""
        # Apply convolutional layers
        for conv_block in self.conv_layers:
            x = conv_block(x)

        # Flatten
        x = x.view(x.size(0), -1)

        # Apply dense layers
        for dense_block in self.dense_layers:
            x = dense_block(x)

        return F.log_softmax(x, dim=1)


### Funciones

In [5]:
# Genre parsing
def parse_genres(fname):
    """
    Extract the genre name from a file path.

    The genre is the part of the last '/'-separated component that precedes
    its first '.'.

    Args:
        fname (str): Path of the file.

    Returns:
        str: Genre name taken from the file name.
    """
    basename = fname.rpartition('/')[2]
    genre, _, _ = basename.partition('.')
    return genre

# List the classes
def classes(root = './genres_5sec'):
    """
    List the genre classes found in a directory.

    A class is any entry of ``root`` that is itself a directory.

    Args:
        root (str, optional): Directory holding one folder per genre. Defaults to './genres_5sec'.

    Returns:
        list: Class (genre) names, in the order ``os.listdir`` reports them.
    """
    return [entry for entry in os.listdir(root) if os.path.isdir(f'{root}/{entry}')]

# Plot waveforms
def plot_waveforms(dataset, num_samples=5, figsize=(15, 10)):
    """
    Plot several randomly chosen waveforms, each titled with its genre.

    Args:
        dataset: Dataset yielding ``(waveform, label)`` pairs and exposing a
            ``classes`` list that maps labels to genre names.
        num_samples (int, optional): Number of waveforms to show; clamped to the
            dataset size. Defaults to 5.
        figsize (tuple, optional): Figure size (width, height). Defaults to (15, 10).
    """
    # random.sample cannot draw more items than the dataset holds.
    num_samples = min(num_samples, len(dataset))

    fig, axes = plt.subplots(num_samples, 1, figsize=figsize)
    fig.suptitle('Waveforms de diferentes géneros musicales', fontsize=16)

    # With a single row plt.subplots returns one Axes instead of an array.
    if num_samples == 1:
        axes = [axes]

    # Random indices without repetition
    indices = random.sample(range(len(dataset)), num_samples)

    for idx, ax in zip(indices, axes):
        waveform, label = dataset[idx]
        genre = dataset.classes[label]

        # The waveform is (channels, samples): draw the whole first channel.
        # Transposing first and then taking row 0 would keep a single sample.
        ax.plot(waveform[0].numpy(), linewidth=1)
        ax.set_title(f'Género: {genre}')
        ax.set_xlabel('Tiempo (muestras)')
        ax.set_ylabel('Amplitud')
        ax.grid(True)

    plt.tight_layout()
    plt.show()

# Compare waveforms of the same genre
def plot_genre_waveforms(dataset, genre_name, num_samples=3, figsize=(15, 8)):
    """
    Plot several randomly chosen waveforms that belong to one genre.

    The whole dataset is iterated once to collect the clips of that genre.

    Args:
        dataset: Dataset yielding ``(waveform, label)`` pairs and exposing a
            ``classes`` list that maps labels to genre names.
        genre_name (str): Genre to display; must be present in ``dataset.classes``.
        num_samples (int, optional): Number of waveforms to show; clamped to the
            number of clips available for the genre. Defaults to 3.
        figsize (tuple, optional): Figure size (width, height). Defaults to (15, 8).

    Raises:
        ValueError: If ``genre_name`` is unknown or has no clips in ``dataset``.
    """
    genre_idx = dataset.classes.index(genre_name)
    genre_samples = [
        (i, wave) for i, (wave, label) in enumerate(dataset) if label == genre_idx
    ]

    # Fail with a clear message instead of an obscure matplotlib error.
    if not genre_samples:
        raise ValueError(f"No samples found for genre {genre_name!r}")

    num_samples = min(num_samples, len(genre_samples))
    selected_samples = random.sample(genre_samples, num_samples)

    fig, axes = plt.subplots(num_samples, 1, figsize=figsize)
    fig.suptitle(f'Waveforms del género: {genre_name}', fontsize=16)

    # With a single row plt.subplots returns one Axes instead of an array.
    if num_samples == 1:
        axes = [axes]

    for ax, (idx, waveform) in zip(axes, selected_samples):
        # The waveform is (channels, samples): draw the whole first channel.
        # Transposing first and then taking row 0 would keep a single sample.
        ax.plot(waveform[0].numpy(), linewidth=1)
        ax.set_title(f'Muestra {idx}')
        ax.set_xlabel('Tiempo (muestras)')
        ax.set_ylabel('Amplitud')
        ax.grid(True)

    plt.tight_layout()
    plt.show()

# Stratified split
def stratify_split(dataset: Dataset, classes: list, transforms: Optional[Dict[str, nn.Module]] = None) -> tuple[Subset, Subset, Subset]:
    """
    Split the dataset into stratified train (80%), validation (10%) and test subsets.

    Every class gets the same quota per subset (subset size // number of
    classes). Samples are visited in dataset order and go to the first subset
    whose quota for their class is not yet full: train, then validation, then
    test. Samples left once all three quotas are full are dropped.

    Each returned ``Subset`` wraps a fresh ``MusicDataset`` built from
    ``dataset.root`` with the matching transform, so the indices are only valid
    if ``MusicDataset`` enumerates the files in the same order every time.

    Args:
        dataset (Dataset): Full dataset; must expose ``root``.
        classes (list): Classes present in the dataset; only its length is used.
        transforms (Dict[str, nn.Module], optional): Transforms keyed by 'train',
            'val' and 'test'. Missing keys (or None) mean no transform.

    Returns:
        tuple[Subset, Subset, Subset]: Train, validation and test subsets.
    """
    # A None default avoids sharing one mutable dict between calls.
    if transforms is None:
        transforms = {}

    NUM_CLASSES = len(classes)
    total_samples = len(dataset)

    # Number of samples in each subset; test takes the remainder
    train_samples = int(total_samples * 0.8)
    val_samples = int(total_samples * 0.1)
    test_samples = total_samples - train_samples - val_samples

    # (quota per class, running count per class, collected indices), in fill order
    buckets = [
        (train_samples // NUM_CLASSES, collections.Counter(), []),
        (val_samples // NUM_CLASSES, collections.Counter(), []),
        (test_samples // NUM_CLASSES, collections.Counter(), []),
    ]

    if isinstance(dataset, MusicDataset):
        # Labels follow from the file list, so no audio has to be decoded.
        class_to_idx = {name: i for i, name in enumerate(dataset.classes)}
        targets = (class_to_idx[genre] for genre, _ in dataset.files)
    else:
        targets = (dataset[i][1] for i in range(len(dataset)))

    for idx, target in enumerate(targets):
        for quota, counter, indices in buckets:
            if counter[target] < quota:
                counter[target] += 1
                indices.append(idx)
                break

    train_indices, val_indices, test_indices = (indices for _, _, indices in buckets)

    train_dataset = Subset(
        MusicDataset(root=dataset.root, transform=transforms.get('train')),
        train_indices
    )

    val_dataset = Subset(
        MusicDataset(root=dataset.root, transform=transforms.get('val')),
        val_indices
    )

    test_dataset = Subset(
        MusicDataset(root=dataset.root, transform=transforms.get('test')),
        test_indices
    )

    return train_dataset, val_dataset, test_dataset

def count_parameters(model):
    """
    Count the trainable parameters of a model.

    Args:
        model: PyTorch model to inspect.

    Returns:
        int: Total number of elements in the parameters that require gradients.
    """
    total = 0
    for tensor in model.parameters():
        if tensor.requires_grad:
            total += tensor.numel()
    return total

# Helper for trying different optimizers
def get_optimizer(optimizer_name, model, lr):
    """
    Build the PyTorch optimizer selected by name.

    Args:
        optimizer_name (str): 'adam', 'adamw' or 'sgd'; any other value falls back to Adam.
        model: PyTorch model whose parameters will be optimized.
        lr (float): Learning rate.

    Returns:
        torch.optim.Optimizer: AdamW (weight_decay=0.01), SGD (momentum=0.9) or Adam.
    """
    params = model.parameters()
    if optimizer_name == 'adamw':
        return torch.optim.AdamW(params, lr=lr, weight_decay=0.01)
    if optimizer_name == 'sgd':
        return torch.optim.SGD(params, lr=lr, momentum=0.9)
    # 'adam' and every unrecognised name share the same default.
    return torch.optim.Adam(params, lr=lr)
    
def get_scheduler(scheduler_name, optimizer, num_epochs, trainloader):
    """
    Build the PyTorch learning-rate scheduler selected by name.

    Stepping differs per scheduler: ``ReduceLROnPlateau.step`` expects the
    monitored metric, and the 'onecycle' schedule is sized as
    ``num_epochs * len(trainloader)`` steps, i.e. one step per batch.

    Args:
        scheduler_name (str): 'plateau', 'cosine', 'step', 'onecycle' or 'warmrestarts'.
        optimizer (torch.optim.Optimizer): Optimizer the scheduler will drive.
        num_epochs (int): Total number of training epochs.
        trainloader (DataLoader): Training loader; only its length is used.

    Returns:
        The selected scheduler, or None when ``scheduler_name`` is not recognised.
    """
    schedulers = torch.optim.lr_scheduler

    if scheduler_name == 'plateau':
        # `verbose` is deliberately not passed: it is deprecated in recent
        # PyTorch releases and its old default was already False.
        return schedulers.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=3
        )
    if scheduler_name == 'cosine':
        return schedulers.CosineAnnealingLR(
            optimizer, T_max=num_epochs, eta_min=1e-6
        )
    if scheduler_name == 'step':
        return schedulers.StepLR(
            optimizer, step_size=30, gamma=0.1
        )
    if scheduler_name == 'onecycle':
        return schedulers.OneCycleLR(
            optimizer,
            max_lr=0.01,
            epochs=num_epochs,
            steps_per_epoch=len(trainloader)
        )
    if scheduler_name == 'warmrestarts':
        return schedulers.CosineAnnealingWarmRestarts(
            optimizer, T_0=10, T_mult=2
        )
    return None

def l1_regularization(model, lambda_l1=0.01):
    """
    Compute the L1 penalty of a model.

    Args:
        model: PyTorch model.
        lambda_l1: L1 regularization factor.

    Returns:
        torch.Tensor: ``lambda_l1`` times the sum of the 1-norms of all parameters.
    """
    # Start from a zero tensor so the result is a tensor even without parameters.
    zero = torch.tensor(0., requires_grad=True)
    penalty = sum((torch.norm(weights, 1) for weights in model.parameters()), zero)
    return lambda_l1 * penalty

def l2_regularization(model, lambda_l2=0.01):
    """
    Compute the L2 penalty of a model.

    The penalty adds up the (unsquared) 2-norm of each parameter tensor.

    Args:
        model: PyTorch model.
        lambda_l2: L2 regularization factor.

    Returns:
        torch.Tensor: ``lambda_l2`` times the sum of the 2-norms of all parameters.
    """
    # Start from a zero tensor so the result is a tensor even without parameters.
    zero = torch.tensor(0., requires_grad=True)
    penalty = sum((torch.norm(weights, 2) for weights in model.parameters()), zero)
    return lambda_l2 * penalty

def train_model(model_name, device, hyperparameter_configs, dataset, train_dataset, val_dataset, n_input, by, 
                scheduler_name='None', optimizer_name='adam', regularization_type=None, lambda_reg=0.01, early_stopping_patience=None):
    """
    Train one model per hyperparameter configuration, logging each run to Weights & Biases.

    Every configuration gets a fresh model, optimizer, scheduler and
    early-stopping tracker. After each epoch the validation loss and accuracy
    are computed and logged. Whenever the validation loss beats the best value
    seen so far (across all configurations of this call) the weights are saved
    to ``best_models/<experiment_name>.pt``.

    Args:
        model_name (str): Model to train: 'MLP', 'MLP2', 'CEL', 'CEL2' or 'CEL3'.
        device (torch.device): Device used for training (CPU or GPU).
        hyperparameter_configs (list of dict): Configurations to try. Each needs
            'epochs', 'learning_rate', 'batch_size' and 'nodes'; every model but
            'MLP' also needs 'hidden_layers'; conv models optionally take
            'dropout_prob' (default 0.3) and 'dense_layers' (default 2).
        dataset: Full dataset; only ``dataset.classes`` is read.
        train_dataset (Subset): Training subset.
        val_dataset (Subset): Validation subset.
        n_input (int): Input size handed to the model constructor.
        by (tuple): ``by[0]`` is used as prefix of the experiment name.
        scheduler_name (str, optional): Scheduler name for ``get_scheduler``. Defaults to 'None' (no scheduler).
        optimizer_name (str, optional): Optimizer name for ``get_optimizer``. Defaults to 'adam'.
        regularization_type: Extra penalty added to the training loss ('l1', 'l2' or None).
        lambda_reg: Factor of the L1/L2 penalty.
        early_stopping_patience: Patience (in epochs) for early stopping; falsy disables it.

    Returns:
        None, or the string 'Especificar modelo a usar' when ``model_name`` is None.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    # Kept for backward compatibility: a missing model name is reported through
    # the return value rather than an exception.
    if model_name is None:
        return 'Especificar modelo a usar'

    conv_models = {'CEL': CEL, 'CEL2': CEL2, 'CEL3': CEL3}
    is_conv = model_name in conv_models
    if not is_conv and model_name not in ('MLP', 'MLP2'):
        raise ValueError(f"Unknown model_name: {model_name!r}")

    def prepare_batch(batch):
        """Shape a batch the way the selected model family expects it."""
        if not is_conv:
            # Dense models take one flat feature vector per sample.
            return batch.reshape(batch.size(0), -1)
        # NOTE(review): conv models need (batch, channels, height, width); a 3-D
        # batch is assumed to lack only the channel axis -- confirm with the data.
        return batch.unsqueeze(1) if batch.dim() == 3 else batch

    lr_schedulers = torch.optim.lr_scheduler

    # Best validation loss across every configuration of this call
    best_valid_loss = float('inf')

    # The models already return log-probabilities; CrossEntropyLoss applies
    # log_softmax again, which leaves log-probabilities unchanged.
    loss_function = nn.CrossEntropyLoss()

    n_classes = len(dataset.classes)
    reg_suffix = f"_reg_{regularization_type}" if regularization_type else ""

    for config in hyperparameter_configs:

        num_epochs = config['epochs']
        dropout_prob = config.get('dropout_prob', 0.3)
        dense_layers = config.get('dense_layers', 2)

        experiment_name = (
            f"{by[0]}_{model_name}_epochs_{num_epochs}_opt_{optimizer_name}"
            f"_scheduler_{scheduler_name}_lr_{config['learning_rate']}_nodes_{config['nodes']}"
        )
        if model_name != 'MLP':
            experiment_name += f"_hl_{config['hidden_layers']}"
        if is_conv:
            experiment_name += f"_dp_{dropout_prob}_dl_{dense_layers}"
        experiment_name += reg_suffix

        wandb.init(
            project='Trabajo_Practico_3_Music_Genre_Classification',
            name=experiment_name,
            reinit=True,
            save_code=True,
            config={
                "architecture": model_name,
                "dataset": "GTZAN",
                "num_classes": n_classes,
                "classes": dataset.classes,
                "sample_rate": samplerate,
                "input_size": n_input,
                **config
            }
        )

        # Close the W&B run even when training raises.
        try:
            # Each configuration gets its own early-stopping state.
            early_stopping = EarlyStopping(patience=early_stopping_patience) if early_stopping_patience else None

            # Data loaders with this configuration's batch size
            trainloader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)
            valloader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False)

            # Model for this configuration
            if is_conv:
                model = conv_models[model_name](
                    n_input=n_input,
                    nodes=config['nodes'],
                    n_output=n_classes,
                    hidden_layers=config['hidden_layers'],
                    dropout_prob=dropout_prob,
                    dense_layers=dense_layers
                )
            elif model_name == 'MLP2':
                model = MLP2(
                    n_input=n_input,
                    nodes=config['nodes'],
                    n_output=n_classes,
                    hidden_layers=config['hidden_layers']
                )
            else:
                model = MLP(
                    n_input=n_input,
                    nodes=config['nodes'],
                    n_output=n_classes
                )
            model = model.to(device)

            optimizer = get_optimizer(optimizer_name, model, config['learning_rate'])

            # get_scheduler returns None for unknown names such as the default 'None'.
            scheduler = get_scheduler(scheduler_name, optimizer, num_epochs, trainloader) if scheduler_name else None
            # OneCycleLR is sized in batches; ReduceLROnPlateau needs the metric.
            steps_per_batch = isinstance(scheduler, lr_schedulers.OneCycleLR)
            needs_metric = isinstance(scheduler, lr_schedulers.ReduceLROnPlateau)

            print(f"\nStarting experiment: {experiment_name}")
            print(model)
            print(f"Number of parameters: {count_parameters(model)}")

            os.makedirs('best_models', exist_ok=True)

            # Training loop
            for epoch in range(num_epochs):
                model.train()
                train_losses = []

                for wav, genre_index in trainloader:
                    wav = prepare_batch(wav.to(device))
                    genre_index = genre_index.to(device)

                    optimizer.zero_grad()
                    out = model(wav)
                    loss = loss_function(out, genre_index)

                    # Optional weight penalty
                    if regularization_type == 'l1':
                        loss = loss + l1_regularization(model, lambda_reg)
                    elif regularization_type == 'l2':
                        loss = loss + l2_regularization(model, lambda_reg)

                    loss.backward()
                    optimizer.step()
                    if steps_per_batch:
                        scheduler.step()
                    train_losses.append(loss.item())

                # Validation
                model.eval()
                valid_losses = []
                correct = 0
                total = 0

                with torch.no_grad():
                    for wav, genre_index in valloader:
                        wav = prepare_batch(wav.to(device))
                        genre_index = genre_index.to(device)

                        out = model(wav)
                        valid_losses.append(loss_function(out, genre_index).item())

                        predicted = out.argmax(dim=1)
                        total += genre_index.size(0)
                        correct += (predicted == genre_index).sum().item()

                train_loss = np.mean(train_losses)
                valid_loss = np.mean(valid_losses)
                accuracy = correct / total if total else 0.0

                # Log to wandb
                wandb.log({
                    "train_loss": train_loss,
                    "valid_loss": valid_loss,
                    "accuracy": accuracy,
                    "epoch": epoch
                })

                print(f'Epoch [{epoch+1}/{num_epochs}] - '
                    f'Train Loss: {train_loss:.4f}, '
                    f'Valid Loss: {valid_loss:.4f}, '
                    f'Accuracy: {accuracy:.4f}')

                if early_stopping:
                    early_stopping(valid_loss)
                    if early_stopping.early_stop:
                        print(f"Early stopping triggered at epoch {epoch+1}")
                        break

                # Epoch-level scheduler update
                if needs_metric:
                    scheduler.step(valid_loss)
                elif scheduler is not None and not steps_per_batch:
                    scheduler.step()

                # Save the best model so far
                if valid_loss < best_valid_loss:
                    best_valid_loss = valid_loss
                    torch.save(model.state_dict(), f'best_models/{experiment_name}.pt')
                    print(f'Saved best model at epoch {epoch+1}')
        finally:
            # Close the wandb run
            wandb.finish()


# TP3: Encodeador de música

In [6]:
# Global seed and default batch size for the loaders created below.
random_seed = 43992294
batch_size = 20

# Seed PyTorch's random number generator (the trailing ';' only hides the cell output).
# NOTE(review): Python's `random` module (used by the plotting helpers) and NumPy
# are not seeded here -- confirm whether that is intended.
torch.manual_seed(random_seed);

if torch.cuda.is_available():
    torch.cuda.manual_seed(random_seed)

In [7]:
# Training transforms for the waveform
waveform_train_transform = transforms.Compose([
    NormalizeAudio(mean=0.0, std=1.0),
    TimeStretchWaveform(rate=1.2),
    PitchShiftWaveform(n_steps=2),
])

# Validation and test transforms for the waveform
waveform_val_test_transform = transforms.Compose([
    NormalizeAudio(mean=0.0, std=1.0),
])

# Training transforms for the spectrogram
# NOTE(review): MusicDataset applies its transform to the loaded waveform, so if
# this pipeline is handed to the dataset the masks run on raw audio rather than
# on a spectrogram -- confirm that is intended.
spectrogram_train_transform = transforms.Compose([
    NormalizeAudio(mean=0.0, std=1.0),
    T.FrequencyMasking(freq_mask_param=30),  
    T.TimeMasking(time_mask_param=40),          
    AddNoise(noise_level=0.005),
])

# Validation and test transforms for the spectrogram
spectrogram_val_test_transform = transforms.Compose([
    NormalizeAudio(mean=0.0, std=1.0),
])

# Per-split transform tables in the layout stratify_split expects
# (keys 'train', 'val' and 'test').
transforms_waveform = {
    'train': waveform_train_transform,
    'val': waveform_val_test_transform,
    'test': waveform_val_test_transform
}

transforms_spectrogram = {
    'train': spectrogram_train_transform,
    'val': spectrogram_val_test_transform,
    'test': spectrogram_val_test_transform
}

In [8]:
# Pick the compute device in order of preference: first CUDA GPU, then Apple
# MPS, then CPU as the fallback.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
elif torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')

## Visualización de los archivos

In [8]:
# Instantiate the music dataset with MusicDataset's default arguments.
dataset = MusicDataset()

In [10]:
# Split the dataset into train / validation / test subsets, passing the
# dataset.classes attribute (rather than a classes function) and the
# spectrogram transform table.
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset, dataset.classes, transforms_spectrogram
)

# Build one DataLoader per split; only the training loader is shuffled.
trainloader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
valloader = DataLoader(dataset=val_dataset, batch_size=batch_size)
testloader = DataLoader(dataset=test_dataset, batch_size=batch_size)

In [None]:
# Show the length of each DataLoader (train, validation, test).
len(trainloader), len(valloader), len(testloader)

In [None]:
# Show the length of each dataset split (train, validation, test).
len(train_dataset), len(val_dataset), len(test_dataset)

In [None]:
# Print the tensor size and label of one hand-picked sample per split.
# The loop ends on the test sample, so `waveform` and `label` stay bound to
# test_dataset[70] for the cells that follow.
for _split, _index in ((train_dataset, 789), (val_dataset, 60), (test_dataset, 70)):
    waveform, label = _split[_index]
    print(
        "shape of waveform {}, sample rate with {}, label is {} ".format(
            waveform.size(), samplerate, label
        )
    )


In [None]:
# Compute a spectrogram of the currently bound waveform using torchaudio's
# default Spectrogram settings, and report its size.
to_spectrogram = tt.Spectrogram()
specgram = to_spectrogram(waveform)
print("shape of spectogram {}".format(specgram.size()))

# Plot log2 of the first entry along dim 0 (presumably the audio channel).
plt.figure(figsize=(20, 5))
log_spec_image = specgram.log2()[0, :, :].numpy()
plt.imshow(log_spec_image, cmap='magma')

In [None]:
# Dump the raw tensor values of the currently bound waveform.
print("Waveform: {}\n".format(waveform))

# Plot the transposed waveform as a NumPy array on a fresh figure.
samples = waveform.t().numpy()
plt.figure()
plt.plot(samples)

In [None]:
# Render an inline audio player for the currently bound waveform, using the
# notebook-wide `samplerate` as the playback rate.
IPython.display.Audio(waveform,rate=samplerate)

## Clasificación **(Prueba de modelos)**

In [9]:
# Rebuild the dataset and split it again, this time with the waveform
# transform table, for the classification experiments below.
dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset, dataset.classes, transforms_waveform
)

In [10]:
# Input size passed to train_model for every experiment below.
# 110250 == 5 * 22050 (the notebook's `samplerate`), i.e. presumably five
# seconds of audio per example — TODO confirm against MusicDataset.
n_input = 110250

## NNs

### MLP

In [11]:
# Hyperparameter grid for the MLP experiments. Each row below is
# (learning_rate, batch_size, nodes, epochs) and becomes one config dict.
# NOTE(review): most train_model calls also pass the epoch count positionally;
# confirm whether the "epochs" key here is still read.
hyperparameter_configs_MLP = [
    {
        "learning_rate": lr,
        "batch_size": bs,
        "nodes": width,
        "epochs": n_epochs,
    }
    for lr, bs, width, n_epochs in (
        (0.001, 32, 64, 100),
        (0.0005, 32, 128, 300),
        (0.001, 16, 32, 300),
    )
]

#### by Waveform

In [None]:
# Release the previous datasets before rebuilding them. The names are popped
# from the notebook namespace instead of using `del`, so re-running this cell
# after an interrupted run does not raise NameError on a missing name.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_waveform
)

by = "waveform"
model_name = "MLP"
# Every other experiment cell passes the epoch count as the second positional
# argument of train_model; the original call here omitted it, which shifted
# `device` into that slot. Use the same epoch sweep as the sibling MLP cell.
epochs = [50, 100, 300]
for num_epochs in epochs:
    train_model(model_name, num_epochs, device, hyperparameter_configs_MLP, dataset, train_dataset, val_dataset, n_input, by)

#### by Spectrogram

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

# NOTE(review): this spectrogram run splits with transforms_waveform, while
# later spectrogram cells use transforms_spectrogram — confirm which is intended.
dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_waveform
)

# NOTE(review): spelled "spectogram" here but "spectrogram" in the CEL2/CEL3
# cells — verify which value train_model expects.
by = "spectogram"
model_name = "MLP"
epochs = [50, 100, 300]
for num_epochs in epochs:
    train_model(model_name, num_epochs, device, hyperparameter_configs_MLP, dataset, train_dataset, val_dataset, n_input, by)


### MLP2

In [23]:
# Hyperparameter grid for the MLP2 experiments (no 'dense_layers' key).
# Each row below is (learning_rate, batch_size, nodes, hidden_layers).
hyperparameter_configs_MLP2 = [
    {
        "learning_rate": lr,
        "batch_size": bs,
        "nodes": width,
        "hidden_layers": depth,
    }
    for lr, bs, width, depth in (
        (0.001, 32, 64, 3),
        (0.0005, 32, 128, 4),
        (0.001, 16, 32, 2),
    )
]

#### by Waveform

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_waveform
)

by = "waveform"
model_name = "MLP2"
epochs = [50, 100, 300]
for num_epochs in epochs:
    train_model(model_name, num_epochs, device, hyperparameter_configs_MLP2, dataset, train_dataset, val_dataset, n_input, by)

#### by Spectrogram

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

# NOTE(review): this spectrogram run splits with transforms_waveform, while
# later spectrogram cells use transforms_spectrogram — confirm which is intended.
dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_waveform
)

# NOTE(review): spelled "spectogram" here but "spectrogram" in the CEL2/CEL3
# cells — verify which value train_model expects.
by = "spectogram"
model_name = "MLP2"
epochs = [50, 100, 300]
for num_epochs in epochs:
    train_model(model_name, num_epochs, device, hyperparameter_configs_MLP2, dataset, train_dataset, val_dataset, n_input, by)


## CNNs

### CEL

In [None]:
# Hyperparameter grid shared by the CEL, CEL2 and CEL3 experiments. Each row is
# (learning_rate, batch_size, nodes, hidden_layers, dropout_prob, dense_layers).
hyperparameter_configs_CEL = [
    {
        "learning_rate": lr,
        "batch_size": bs,
        "nodes": width,
        "hidden_layers": depth,
        "dropout_prob": p_drop,
        "dense_layers": n_dense,
    }
    for lr, bs, width, depth, p_drop, n_dense in (
        (0.001, 32, 64, 3, 0.3, 2),
        (0.0005, 32, 128, 4, 0.4, 2),
        (0.001, 16, 32, 2, 0.2, 3),
    )
]

#### by Waveform

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_waveform
)

by = "waveform"
model_name = "CEL"
epochs = [50, 100, 300]
for num_epochs in epochs:
    train_model(model_name, num_epochs, device, hyperparameter_configs_CEL, dataset, train_dataset, val_dataset, n_input, by)

##### by Optimizer

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_waveform
)

by = "waveform"
model_name = "CEL"
num_epochs = 300
optimizer_names = ['adam', 'sgd', 'rmsprop']
# One full training sweep per optimizer, at a fixed epoch budget.
for optimizer_name in optimizer_names:
    train_model(model_name, num_epochs, device, hyperparameter_configs_CEL, dataset, train_dataset, val_dataset, n_input, by, optimizer_name=optimizer_name)

#### by Spectrogram

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

# NOTE(review): this spectrogram run splits with transforms_waveform, while
# the CEL "by Optimizer" spectrogram cell uses transforms_spectrogram —
# confirm which is intended.
dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_waveform
)

# NOTE(review): spelled "spectogram" here but "spectrogram" in the CEL2/CEL3
# cells — verify which value train_model expects.
by = "spectogram"
model_name = "CEL"
epochs = [50, 100, 300]
for num_epochs in epochs:
    train_model(model_name, num_epochs, device, hyperparameter_configs_CEL, dataset, train_dataset, val_dataset, n_input, by)


##### by Optimizer

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_spectrogram
)

# NOTE(review): spelled "spectogram" here but "spectrogram" in the CEL2/CEL3
# cells — verify which value train_model expects.
by = "spectogram"
model_name = "CEL"
num_epochs = 300
optimizer_names = ['adam', 'sgd', 'rmsprop']
# One full training sweep per optimizer, at a fixed epoch budget.
for optimizer_name in optimizer_names:
    train_model(model_name, num_epochs, device, hyperparameter_configs_CEL, dataset, train_dataset, val_dataset, n_input, by, optimizer_name=optimizer_name)

### CEL2

#### by Waveform

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_waveform
)

by = "waveform"
model_name = "CEL2"
epochs = [50, 100, 300]
for num_epochs in epochs:
    train_model(model_name, num_epochs, device, hyperparameter_configs_CEL, dataset, train_dataset, val_dataset, n_input, by)


#### by Spectrogram

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_spectrogram
)

# NOTE(review): spelled "spectrogram" here but "spectogram" in the MLP, MLP2
# and CEL cells — verify which value train_model expects.
by = "spectrogram"
model_name = "CEL2"
epochs = [50, 100, 300]
for num_epochs in epochs:
    train_model(model_name, num_epochs, device, hyperparameter_configs_CEL, dataset, train_dataset, val_dataset, n_input, by)


### CEL3

#### by Waveform

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_waveform
)

by = "waveform"
model_name = "CEL3"
epochs = [50, 100, 300]
for num_epochs in epochs:
    train_model(model_name, num_epochs, device, hyperparameter_configs_CEL, dataset, train_dataset, val_dataset, n_input, by)


#### by Spectrogram

In [None]:
# Release the previous datasets and loaders before rebuilding them. `del`
# raises NameError when a name is already gone (the loaders are not recreated
# by the experiment cells), so the names are popped from the notebook
# namespace instead.
for _stale in ("dataset", "train_dataset", "val_dataset", "test_dataset",
               "trainloader", "valloader", "testloader"):
    globals().pop(_stale, None)
gc.collect()  # Important: frees the RAM held by the dropped objects
torch.cuda.empty_cache()  # then release the cached CUDA memory

dataset = MusicDataset()
train_dataset, val_dataset, test_dataset = stratify_split(
    dataset,
    dataset.classes,
    transforms_spectrogram
)

# NOTE(review): spelled "spectrogram" here but "spectogram" in the MLP, MLP2
# and CEL cells — verify which value train_model expects.
by = "spectrogram"
model_name = "CEL3"
epochs = [50, 100, 300]
for num_epochs in epochs:
    train_model(model_name, num_epochs, device, hyperparameter_configs_CEL, dataset, train_dataset, val_dataset, n_input, by)
