# Proyecto
## Clasificación de Sonidos Urbanos

**Universidad del Valle de Guatemala**\
**Facultad de Ingeniería**\
**Departamento de Ciencias de la Computación**\
**Deep Learning**

---
### Integrantes:
- Diego Leiva
- Pablo Orellana
- Maria Marta Ramirez
---

## Librerías

In [None]:
# Datos
import pandas as pd
import numpy as np

# Scikit-learn
from sklearn.metrics import classification_report
from sklearn.model_selection import LeaveOneGroupOut

# Torch
import torch
import torchaudio
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader

# Utils
import os
from tqdm import tqdm
import random
import platform

# Advertencias
import warnings
warnings.filterwarnings("ignore") # Ignorar advertencias

## Metadatos

In [None]:
# Root folder containing the per-fold audio sub-folders (fold<N>)
AUDIO_PATH = 'data/UrbanSound8K/audio/'

# Load the dataset index and pull out the columns used below
metadata = pd.read_csv('data/UrbanSound8K/metadata/UrbanSound8K.csv')
files, labels, folds = (
    metadata[column].values
    for column in ('slice_file_name', 'classID', 'fold')
)

# Build the full path of every clip: <AUDIO_PATH>fold<N>/<file name>
paths = [
    os.path.join(f"{AUDIO_PATH}fold{fold_id}", clip_name)
    for fold_id, clip_name in zip(folds, files)
]

## Configuración de PyTorch CUDA

In [None]:
# Seed every random number generator in use so runs are repeatable
_SEED = 42
random.seed(_SEED)
np.random.seed(_SEED)
torch.manual_seed(_SEED)

# Ask cuDNN for deterministic kernels and disable its auto-tuner
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Pick the compute device and a human-readable name for it
_cuda_available = torch.cuda.is_available()
device = torch.device('cuda' if _cuda_available else 'cpu')

if not _cuda_available:
    device_info = platform.processor()
else:
    # Seed the CUDA generators as well (current device and all devices)
    torch.cuda.manual_seed(_SEED)
    torch.cuda.manual_seed_all(_SEED)
    device_info = torch.cuda.get_device_name(0)

print(f"Device: {device_info}")

## Dataset y Dataloader de Audio

In [None]:
class AudioDataset(torch.utils.data.Dataset):
    """Map-style dataset that turns audio files into normalized Mel + MFCC sequences.

    Each item is a dict with:
        'feature_vector': tensor of shape (frames, 168) -- 120 Mel bands
            concatenated with 48 MFCC coefficients for every frame.
        'label': scalar ``torch.long`` tensor holding the class id.

    Args:
        paths: Sequence of audio file paths.
        labels: Sequence of integer class ids aligned with ``paths``.
    """

    # Lower bound applied to the standard deviation during normalization so a
    # silent/constant clip produces zeros instead of NaN (0 / 0).
    _EPS = 1e-8

    def __init__(self, paths, labels):
        self.paths = paths
        self.labels = labels
        # Fixed clip length in samples; every waveform is padded/cropped to it.
        # NOTE(review): the original comment called this "4 seconds", but
        # 160000 samples is ~7.3 s at the 22050 Hz the transforms below assume.
        # The sample rate returned by torchaudio.load is discarded in
        # __getitem__, so the real duration depends on each file -- confirm.
        self.audio_length = 160000

        # Mel spectrogram transform
        self.mel_spec = torchaudio.transforms.MelSpectrogram(
            sample_rate=22050,
            n_fft=2048,
            win_length=1024,
            hop_length=512,
            n_mels=120,
            f_min=0,
            f_max=11025  # sample_rate / 2
        )

        # MFCC transform; its framing parameters are left at the library
        # defaults, so its frame count may differ from the Mel spectrogram's
        # (handled by the interpolation in __getitem__).
        self.mfcc = torchaudio.transforms.MFCC(
            sample_rate=22050,
            n_mfcc=48
        )

    def __len__(self):
        """Return the number of audio files in the dataset."""
        return len(self.paths)

    @staticmethod
    def _fit_length(mono, length):
        """Zero-pad on the right or crop ``mono`` (1, samples) to ``length`` samples."""
        n_samples = mono.size(1)
        if n_samples < length:
            return F.pad(mono, (0, length - n_samples))
        return mono[:, :length]

    @classmethod
    def _standardize(cls, features):
        """Shift to zero mean and scale by the (lower-bounded) standard deviation."""
        return (features - features.mean()) / features.std().clamp_min(cls._EPS)

    def __getitem__(self, idx):
        """Load clip ``idx`` and return its feature dict (see class docstring)."""
        waveform, _ = torchaudio.load(self.paths[idx], normalize=True)

        # Down-mix all channels to a single one, then force a fixed length
        mono = torch.mean(waveform, dim=0, keepdim=True)
        audio_mono = self._fit_length(mono, self.audio_length)

        # Per-clip normalized Mel spectrogram and MFCC, each (1, bins, frames)
        mel_spectrogram_norm = self._standardize(self.mel_spec(audio_mono))
        mfcc_norm = self._standardize(self.mfcc(audio_mono))

        # Resample the MFCC along time so both features share the frame count
        if mfcc_norm.size(2) != mel_spectrogram_norm.size(2):
            mfcc_norm = F.interpolate(mfcc_norm, size=mel_spectrogram_norm.size(2), mode='linear')

        # Stack both features along the bin axis -> (1, 120 + 48, frames)
        feature_vector = torch.cat([mel_spectrogram_norm, mfcc_norm], axis=1)

        return {
            # (frames, 168): time-major layout, one feature row per frame
            'feature_vector': feature_vector[0].permute(1, 0).clone().detach(),
            'label': torch.tensor(self.labels[idx], dtype=torch.long)
        }

In [None]:
def collate_data(data):
    """Merge a list of dataset items into one padded batch.

    Args:
        data: Iterable of dicts exposing the keys "feature_vector" and "label".

    Returns:
        Tuple ``(features, labels)``: the feature sequences zero-padded to a
        common length with the batch dimension first, and the labels stacked
        into a long tensor. Both are moved to the module-level ``device``.
    """
    # Move every sample to the target device
    sequences = [sample["feature_vector"].to(device) for sample in data]
    targets = [sample["label"].to(device) for sample in data]

    # Pad the sequences to equal length and stack the labels as Long
    padded = nn.utils.rnn.pad_sequence(sequences, batch_first=True, padding_value=0.)
    stacked_targets = torch.stack(targets).long()

    return padded, stacked_targets

## Modelo LSTM

In [None]:
class AudioLSTM(nn.Module):
    """LSTM sequence classifier followed by two linear layers.

    Args:
        feature_size: Number of input features per timestep.
        out_features: Number of output classes (size of the logits vector).
        hidden_layers: Hidden state size of the LSTM (despite the name, this is
            a width, not a layer count).
        layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used between LSTM layers and on the LSTM
            output.
    """

    def __init__(self, feature_size, out_features, hidden_layers, layers, dropout):
        super().__init__()
        self.n_hidden = hidden_layers
        self.n_layers = layers
        self.n_feature = feature_size

        # Recurrent layer; inputs are (batch, sequence, features)
        self.lstm = nn.LSTM(
            input_size=self.n_feature,
            hidden_size=self.n_hidden,
            num_layers=self.n_layers,
            dropout=dropout,
            batch_first=True
        )

        # Dropout applied to the LSTM output
        self.dropout = nn.Dropout(dropout)

        # NOTE(review): this activation is never applied in forward(), so fc1
        # and fc2 compose into a single linear map. It is kept (and left
        # unused) to preserve the current outputs -- confirm whether a ReLU
        # between fc1 and fc2 was intended.
        self.relu = nn.ReLU()

        # Fully connected head: hidden -> hidden / 2 -> classes
        self.fc1 = nn.Linear(int(hidden_layers), int(hidden_layers/2))
        self.fc2 = nn.Linear(int(hidden_layers/2), out_features)

    def forward(self, x, hidden):
        """Classify a batch of sequences.

        Args:
            x: Tensor of shape (batch_size, sequence_length, n_features).
            hidden: Tuple ``(h_0, c_0)`` as produced by :meth:`init_hidden`.

        Returns:
            Tuple ``(logits, hidden)`` where ``logits`` has shape
            (batch_size, out_features) and ``hidden`` is the LSTM's final
            ``(h_n, c_n)``.
        """
        l_out, l_hidden = self.lstm(x, hidden)

        # Dropout over the whole output (same random mask as before), then keep
        # only the last timestep *before* the linear layers: fc1 acts on each
        # timestep independently, so running it on the full sequence and
        # slicing afterwards only wasted work.
        last_step = self.dropout(l_out)[:, -1, :]

        out = self.fc1(last_step)
        out = self.fc2(out)

        return out, l_hidden

    def init_hidden(self, batch_size):
        """Return zero-filled ``(h_0, c_0)`` for a batch of ``batch_size`` sequences.

        The tensors are created with the dtype and device of the model's own
        parameters, so the method does not depend on any module-level
        ``device`` variable and always matches wherever the model lives.
        """
        reference = next(self.parameters())
        shape = (self.n_layers, batch_size, self.n_hidden)
        hidden = (
            torch.zeros(shape, dtype=reference.dtype, device=reference.device),
            torch.zeros(shape, dtype=reference.dtype, device=reference.device),
        )
        return hidden

In [None]:
def save_model(state, path):
    """Serialize ``state`` to ``path`` with ``torch.save``.

    Missing parent directories are created first, because ``torch.save`` fails
    when the target folder (e.g. ``models/``) does not exist yet.

    Args:
        state: Object to serialize (typically a dict of state dicts).
        path: Destination file path, or any file-like object accepted by
            ``torch.save`` (passed through untouched).
    """
    if isinstance(path, (str, os.PathLike)):
        parent = os.path.dirname(os.fspath(path))
        if parent:
            os.makedirs(parent, exist_ok=True)
    torch.save(state, path)

### Loop de entrenamiento

In [None]:
def train_model(data_loader, model, epoch, optimizer, device):
    """Run one training epoch over ``data_loader``.

    Args:
        data_loader: Iterable yielding ``(data, target)`` batches.
        model: Model exposing ``init_hidden(batch_size)`` and returning
            ``(output, hidden)`` when called with ``(data, hidden)``.
        epoch: Epoch number, used only in the progress-bar text.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(mean_loss, accuracy, labels, predictions)``: the mean of the
        per-batch losses, the fraction of correct predictions over the whole
        epoch, and the flat lists of true and predicted class ids.
    """
    losses = []
    labels = []
    predictions = []

    # Enable training behaviour (dropout active)
    model.train()

    # The criterion is stateless, so build it once instead of once per batch
    criterion = nn.CrossEntropyLoss()

    loop = tqdm(data_loader)

    for batch_idx, (data, target) in enumerate(loop):
        data = data.to(device)
        target = target.to(device)

        # Reset gradients accumulated by the previous batch
        model.zero_grad()

        # A fresh zero hidden state is used for every batch
        output, _ = model(data, model.init_hidden(data.size(0)))

        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        # Read the scalar once; it is reused for logging below
        loss_value = loss.item()
        losses.append(loss_value)

        # Predicted class = index of the largest logit
        winners = output.argmax(dim=1)

        labels += torch.flatten(target).cpu().tolist()
        predictions += torch.flatten(winners).cpu().tolist()

        batch_accuracy = (winners == target).sum().float() / float(target.size(0))

        loop.set_description(f"TRAIN -> Epoch {epoch} | Batch: {batch_idx}/{len(data_loader)} | Loss: {loss_value:.4f} | Accuracy: {batch_accuracy:.4f}")

    # Epoch-level aggregates
    total_loss = np.mean(losses)
    total_accuracy = np.mean(np.array(labels) == np.array(predictions))

    return total_loss, total_accuracy, labels, predictions

### Loop de Validación

In [None]:
def validate(data_loader, model, device):
    """Evaluate ``model`` on ``data_loader`` and return the overall accuracy.

    Args:
        data_loader: Iterable yielding ``(data, target)`` batches.
        model: Model exposing ``init_hidden(batch_size)`` and returning
            ``(output, hidden)`` when called with ``(data, hidden)``.
        device: Device the batches are moved to.

    Returns:
        Fraction of samples whose predicted class equals the target, computed
        over every batch of the loader.
    """
    # Evaluation behaviour (dropout disabled)
    model.eval()

    predictions = []
    labels = []

    # No gradients are needed for evaluation
    with torch.no_grad():
        loop = tqdm(data_loader)

        for batch_idx, (data, target) in enumerate(loop):
            data = data.to(device)
            target = target.to(device)

            # A fresh zero hidden state is used for every batch
            output, _ = model(data, model.init_hidden(data.size(0)))

            # Predicted class = index of the largest logit
            winners = output.argmax(dim=1)

            # Store plain Python ints (as train_model does) rather than
            # extending the lists with 0-dim tensors
            labels += torch.flatten(target).cpu().tolist()
            predictions += torch.flatten(winners).cpu().tolist()

            batch_accuracy = (winners == target).sum().float() / float(target.size(0))

            loop.set_description(f"VALIDATE -> Batch: {batch_idx}/{len(data_loader)} | Accuracy: {batch_accuracy:.4f}")

    # Accuracy over the whole validation pass
    total_accuracy = np.mean(np.array(labels) == np.array(predictions))

    return total_accuracy


## Configuración inicial

In [None]:
# Training hyper-parameters shared by the cells below
OUT_FEATURE = 10  # number of target classes
EPOCHS = 20       # number of training epochs
PATIENCE = 5      # patience in epochs (labelled as early-stopping patience by the authors)

## Creación de Dataloaders y Datasets para 10-Fold Cross Validation

In [None]:
# Leave-one-group-out cross validation: each predefined fold is held out once
logo = LeaveOneGroupOut()

# Best validation accuracy reached in each fold
fold_accuracies = []

# torch.save fails if the checkpoint folder is missing, so create it up front
os.makedirs('models', exist_ok=True)

# Train one fresh model per held-out fold
for fold_idx, (train_idx, val_idx) in enumerate(logo.split(paths, labels, folds)):
    if fold_idx > 0:
        print(f"\n")

    print("-"*15, f" FOLD {fold_idx+1} ", "-"*15)

    # Split paths and labels into train / validation subsets
    train_paths = [paths[i] for i in train_idx]
    val_paths = [paths[i] for i in val_idx]
    train_labels = [labels[i] for i in train_idx]
    val_labels = [labels[i] for i in val_idx]

    train_dataset = AudioDataset(train_paths, train_labels)
    val_dataset = AudioDataset(val_paths, val_labels)

    # Shuffle the training data every epoch so batches are not always made of
    # the same neighbouring rows of the metadata file; validation order is
    # irrelevant and stays fixed.
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, collate_fn=collate_data)
    val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False, collate_fn=collate_data)

    # Fresh model for this fold (168 = 120 Mel bands + 48 MFCCs per frame)
    model = AudioLSTM(feature_size=168,
                      out_features=OUT_FEATURE,
                      hidden_layers=256,
                      layers=2,
                      dropout=0.3).to(device)

    optimizer = optim.AdamW(model.parameters(), lr=1e-3)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=PATIENCE)

    # Best validation accuracy so far, the epoch that produced it, and the
    # number of consecutive epochs without improvement (for early stopping)
    best_accuracy = 0
    best_epoch = 0
    epochs_without_improvement = 0

    # Training labels / predictions accumulated over every epoch of the fold.
    # NOTE(review): the report printed below therefore describes *training*
    # predictions mixed across all epochs, not the validation fold -- confirm
    # that this is the intended report.
    all_labels = []
    all_predictions = []

    for epoch in range(EPOCHS):
        epoch_train_loss, epoch_train_acc, epoch_labels, epoch_predictions = train_model(train_loader, model, epoch, optimizer, device)

        all_labels.extend(epoch_labels)
        all_predictions.extend(epoch_predictions)

        epoch_val_acc = validate(val_loader, model, device)

        # The scheduler monitors the training accuracy (mode='max')
        scheduler.step(epoch_train_acc)

        if epoch_val_acc > best_accuracy:
            # New best model on the held-out fold: remember it and checkpoint
            best_accuracy = epoch_val_acc
            best_epoch = epoch
            epochs_without_improvement = 0
            save_model({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()}, f'models/lstm_fold{fold_idx + 1}.pth')
        else:
            # Early stopping: PATIENCE is documented as the early-stopping
            # patience, so stop once validation has stalled that many epochs
            epochs_without_improvement += 1
            if epochs_without_improvement >= PATIENCE:
                print(f"Early stopping at epoch {epoch} (best epoch: {best_epoch})")
                break

    fold_accuracies.append(best_accuracy)

    # Both lists already hold plain ints, so they can be passed directly
    report = classification_report(all_labels, all_predictions)
    print(f"Fold {fold_idx + 1} - Classification Report")
    print(report)

In [None]:
# Print the per-fold best accuracies followed by their mean
print("-" * 15, f" FINAL RESULTS ", "-" * 15)
for fold_number, acc in enumerate(fold_accuracies, start=1):
    print(f"   - Fold {fold_number} Accuracy: {acc:.4f}")
print("\n")

# Mean of the per-fold accuracies
average_accuracy = np.mean(fold_accuracies)
print(f"Average Accuracy: {average_accuracy:.4f}")