# Dataset Preparation with Raw Audio Data and EfficientNet

# Imports

In [None]:
# Import libraries
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, random_split, DataLoader
from torchvision.transforms import Resize, InterpolationMode
import torchaudio
import librosa
import cv2
from pathlib import Path
from os import listdir, scandir
from tqdm.notebook import tqdm

# Importaciones adicionales para EfficientNet
from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights
from torchsummary import summary
from torchmetrics import Accuracy, Precision, Recall, F1Score

# Helper functions

In [None]:
def plot_waveform(waveform, sample_rate, title=None):
    """Draw one line plot per channel of ``waveform`` over time.

    Args:
        waveform: Tensor whose ``.numpy()`` result is 2-D; its shape is
            unpacked as ``(num_channels, num_frames)``.
        sample_rate: Number of frames per second, used to build the time axis.
        title: Optional figure title; ``'Waveform'`` is used when falsy.
    """
    samples = waveform.numpy()
    num_channels, num_frames = samples.shape
    seconds = torch.arange(0, num_frames) / sample_rate

    figure, axes = plt.subplots(num_channels, 1, figsize=(12, 4))
    # With a single row, plt.subplots hands back one Axes instead of an array.
    axes_per_channel = [axes] if num_channels == 1 else axes
    label_channels = num_channels > 1

    for channel_idx, axis in enumerate(axes_per_channel):
        axis.plot(seconds, samples[channel_idx], linewidth=1)
        axis.grid(True)
        if label_channels:
            axis.set_ylabel(f'Channel {channel_idx+1}')

    figure.suptitle(title or 'Waveform')
    plt.show()

# Start preparing dataset

In [None]:
workdir_path = Path('../data')
sample_rate = 48000

# Build the (path, label) index. Every sub-directory of the data folder is
# scanned; the class label is the first character of each file name.
# Directories and files are visited in sorted order so the resulting
# DataFrame (and anything derived from its row order) is reproducible.
data = []
class_dirs = sorted(
    entry for entry in workdir_path.iterdir()
    if entry.is_dir() and not entry.name.startswith('.')
)
for path_to_dir in class_dirs:
    for path_to_audio in sorted(path_to_dir.iterdir()):
        # Skip nested folders and files whose name does not start with a
        # decimal digit (e.g. '.DS_Store'); int() would raise on those.
        if not path_to_audio.is_file():
            continue
        label_char = path_to_audio.name[0]
        if not label_char.isdecimal():
            continue

        data.append((path_to_audio, int(label_char)))

audio_df = pd.DataFrame(data, columns=['path_to_audio', 'class'])

audio_df

In [None]:
# Labels are treated as 0-based ids, so the class count is the largest id plus one.
n_classes = 1 + audio_df['class'].max()
print(f'There are {n_classes} classes in the dataset')

# Create dataset with Raw Audio

In [None]:
# Settings for feeding raw audio into EfficientNet.

# Maximum waveform length in samples (1 second at 48 kHz).
max_length = 48000

# EfficientNet expects 3-channel inputs of 224x224.
# NOTE: the names `n_mels` / `time` are only the image height / width here.
n_channels = 3
n_mels = 224
time = 224

# Bicubic resize of the folded waveform to the network's input size.
resize_transform = Resize(
    size=(n_mels, time),
    interpolation=InterpolationMode.BICUBIC,
)

In [None]:
class RawAudioDataset(Dataset):
    """Dataset that turns raw audio files into 3-channel image-like tensors.

    Each item is loaded with ``torchaudio.load``, downmixed to mono, padded or
    cropped to the module-level ``max_length``, peak-normalised, folded into a
    square matrix, resized with the module-level ``resize_transform`` and
    repeated on the channel axis three times.

    Args:
        df: DataFrame whose column 0 holds the audio path and column 1 the
            class id (accessed positionally via ``iloc``).
    """

    def __init__(self, df):
        self.df = df

    def __len__(self):
        """Return the number of rows in the backing DataFrame."""
        return len(self.df)

    @staticmethod
    def _fit_length(waveform, target_length):
        """Zero-pad or crop ``waveform`` (channels, frames) to ``target_length`` frames."""
        n_frames = waveform.size(1)
        if n_frames < target_length:
            # Pad with as many channels as the input has, so concatenation
            # along the frame axis works for any channel count.
            padding = torch.zeros(
                waveform.size(0),
                target_length - n_frames,
                dtype=waveform.dtype,
                device=waveform.device,
            )
            return torch.cat([waveform, padding], dim=1)
        return waveform[:, :target_length]

    @staticmethod
    def _to_square(waveform):
        """Fold ``waveform`` (channels, frames) into (channels, side, side).

        ``side`` is the integer square root of the frame count. Trailing
        frames that do not fit into ``side * side`` are dropped, because a
        plain ``view`` fails whenever the length is not a perfect square
        (48000 samples vs. 219 * 219 = 47961).
        """
        side = int(np.sqrt(waveform.size(1)))
        return waveform[:, :side * side].reshape(waveform.size(0), side, side)

    def __getitem__(self, idx):
        """Return ``(image, class_id)`` for row ``idx``.

        ``image`` has 3 channels and the spatial size produced by
        ``resize_transform``; ``class_id`` is the value stored in column 1.
        """
        audio_sample_path = self.df.iloc[idx, 0]
        class_id = self.df.iloc[idx, 1]

        # Load the audio file.
        # NOTE(review): the file's own sample rate is ignored; this assumes
        # every file is already at the expected rate — confirm.
        waveform, _ = torchaudio.load(audio_sample_path)

        # Downmix multi-channel audio so exactly one channel is folded below
        # and the final repeat yields 3 channels.
        if waveform.size(0) > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Force a fixed length.
        waveform = self._fit_length(waveform, max_length)

        # Peak-normalise; silent clips are left untouched to avoid 0/0.
        peak = waveform.abs().max()
        if peak > 0:
            waveform = waveform / peak

        # Fold the 1-D signal into a 2-D matrix and resize it.
        waveform_2d = self._to_square(waveform)
        resized_waveform = resize_transform(waveform_2d)

        # Repeat the single channel to obtain an RGB-like input.
        resized_waveform = resized_waveform.repeat(3, 1, 1)

        return resized_waveform, class_id

In [None]:
# Build the dataset from the file index.
audio_dataset = RawAudioDataset(df=audio_df)

# Fetch the first item and report its path, tensor shape and label.
sample, label = audio_dataset[0]
print(audio_df.iloc[0, 0])
print(f'sample.shape: {sample.shape}, sample.class: {label}')

# Show the first channel of the resized sample as a heat map.
fig, ax = plt.subplots(figsize=(10, 4))
image = ax.imshow(sample[0].numpy(), aspect='auto')
ax.set_title('Datos Raw redimensionados')
fig.colorbar(image, ax=ax)
fig.tight_layout()
plt.show()

In [None]:
# Split the dataset into training (80%) and validation (remaining) subsets.
train_size = int(0.8 * len(audio_dataset))
val_size = len(audio_dataset) - train_size

# A seeded generator keeps the partition identical across runs, so
# validation metrics from different runs are comparable.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    audio_dataset, [train_size, val_size], generator=split_generator
)

print(f'Training set size: {len(train_dataset)}')
print(f'Validation set size: {len(val_dataset)}')

# Create the data loaders; only the training loader shuffles.
batch_size = 32

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Model Definition

In [None]:
class AudioEfficientNet(nn.Module):
    """EfficientNet-B0 with its final linear layer replaced for ``n_classes`` outputs."""

    def __init__(self, n_classes):
        super().__init__()

        # Start from the default pre-trained EfficientNet-B0 weights.
        backbone = efficientnet_b0(weights=EfficientNet_B0_Weights.DEFAULT)

        # Replace the last classifier layer with one sized for our classes,
        # keeping the same number of input features.
        original_head = backbone.classifier[1]
        backbone.classifier[1] = nn.Linear(original_head.in_features, n_classes)

        self.efficientnet = backbone

    def forward(self, x):
        """Run ``x`` through the wrapped EfficientNet and return its output."""
        return self.efficientnet(x)

In [None]:
# Pick the GPU when one is available, otherwise fall back to the CPU.
cuda_available = torch.cuda.is_available()
device = torch.device('cuda') if cuda_available else torch.device('cpu')
print(device)

# Report the name of the first GPU when running on CUDA.
if cuda_available:
    print(torch.cuda.get_device_name(0))

In [None]:
# Instantiate the network and move it to the selected device.
model = AudioEfficientNet(n_classes=int(n_classes)).to(device)

# Print a layer-by-layer summary for a 3-channel input of the configured size.
input_shape = (3, n_mels, time)
summary(model, input_shape)

In [None]:
# Loss function, optimizer and learning-rate schedule.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

# Halve the learning rate after 5 epochs without a decrease in the
# monitored value (mode='min').
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=5,
)

In [None]:
# Metric objects; all share the task type and class count and live on `device`.
metric_kwargs = {'task': 'multiclass', 'num_classes': int(n_classes)}

accuracy = Accuracy(**metric_kwargs).to(device)
# Precision, recall and F1 are macro-averaged.
precision = Precision(average='macro', **metric_kwargs).to(device)
recall = Recall(average='macro', **metric_kwargs).to(device)
f1 = F1Score(average='macro', **metric_kwargs).to(device)

In [None]:
# Train the model for one epoch.
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Module to train; put into training mode here.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable applied to ``(outputs, labels)``.
        optimizer: Optimizer stepping ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)`` as Python floats: the sample-weighted mean
        loss and the fraction of samples whose arg-max prediction equals the
        label.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.train()
    running_loss = 0.0
    n_correct = 0
    n_samples = 0

    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Statistics. Accuracy is computed directly from the logits
        # (assumes outputs are (batch, n_classes) — matches the multiclass
        # setup used with CrossEntropyLoss) so no shared metric object is
        # mutated during training and the result is a plain float rather
        # than a device tensor.
        n_batch = inputs.size(0)
        running_loss += loss.item() * n_batch
        n_correct += (outputs.argmax(dim=1) == labels).sum().item()
        n_samples += n_batch

    if n_samples == 0:
        raise ValueError('dataloader yielded no samples')

    # Divide by the samples actually seen, which stays correct even when the
    # loader drops an incomplete last batch.
    epoch_loss = running_loss / n_samples
    epoch_acc = n_correct / n_samples

    return epoch_loss, epoch_acc

In [None]:
# Evaluate the model on a data loader.
def evaluate(model, dataloader, criterion, device):
    """Compute loss and classification metrics over ``dataloader``.

    Uses the module-level torchmetrics objects ``accuracy``, ``precision``,
    ``recall`` and ``f1``. Their state is accumulated over the whole loader
    and computed once at the end: averaging per-batch macro scores does not
    give the epoch-level macro score, because a batch may lack some classes.

    Args:
        model: Module to evaluate; put into eval mode here.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable applied to ``(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(loss, accuracy, precision, recall, f1)`` as Python floats.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.eval()
    metrics = (accuracy, precision, recall, f1)

    # Start from a clean state: the metric objects are shared globals and may
    # hold statistics from earlier calls.
    for metric in metrics:
        metric.reset()

    running_loss = 0.0
    n_samples = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Statistics
            n_batch = inputs.size(0)
            running_loss += loss.item() * n_batch
            n_samples += n_batch
            for metric in metrics:
                metric.update(outputs, labels)

    if n_samples == 0:
        raise ValueError('dataloader yielded no samples')

    epoch_loss = running_loss / n_samples
    # .item() turns the (possibly CUDA) scalar tensors into Python floats so
    # the values can be stored and plotted directly.
    epoch_acc, epoch_precision, epoch_recall, epoch_f1 = (
        metric.compute().item() for metric in metrics
    )

    # Leave the shared metrics empty for the next caller.
    for metric in metrics:
        metric.reset()

    return epoch_loss, epoch_acc, epoch_precision, epoch_recall, epoch_f1

In [None]:
# Train the model and record per-epoch metrics.
num_epochs = 10
history = {
    'train_loss': [],
    'train_acc': [],
    'val_loss': [],
    'val_acc': [],
    'val_precision': [],
    'val_recall': [],
    'val_f1': []
}

for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}/{num_epochs}')

    # Train
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)

    # Validate
    val_loss, val_acc, val_precision, val_recall, val_f1 = evaluate(model, val_loader, criterion, device)

    # Update learning rate based on the validation loss
    scheduler.step(val_loss)

    # Save history as plain Python floats. The helpers may hand back 0-dim
    # tensors living on the GPU, which matplotlib cannot plot, so every value
    # is coerced with float() before being stored.
    epoch_values = {
        'train_loss': train_loss,
        'train_acc': train_acc,
        'val_loss': val_loss,
        'val_acc': val_acc,
        'val_precision': val_precision,
        'val_recall': val_recall,
        'val_f1': val_f1,
    }
    for key, value in epoch_values.items():
        history[key].append(float(value))

    # Print metrics
    print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}')
    print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}, Val Precision: {val_precision:.4f}, Val Recall: {val_recall:.4f}, Val F1: {val_f1:.4f}')
    print('-' * 60)

In [None]:
# Plot training vs. validation curves for loss and accuracy, side by side.
plt.figure(figsize=(12, 4))
curve_specs = (
    ('train_loss', 'Train Loss', 'val_loss', 'Val Loss', 'Loss'),
    ('train_acc', 'Train Acc', 'val_acc', 'Val Acc', 'Accuracy'),
)
for position, (train_key, train_label, val_key, val_label, y_label) in enumerate(curve_specs, start=1):
    plt.subplot(1, 2, position)
    plt.plot(history[train_key], label=train_label)
    plt.plot(history[val_key], label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()

# Plot the remaining validation metrics, one panel each.
plt.figure(figsize=(15, 4))
validation_specs = (
    ('val_precision', 'Precision'),
    ('val_recall', 'Recall'),
    ('val_f1', 'F1 Score'),
)
for position, (history_key, metric_name) in enumerate(validation_specs, start=1):
    plt.subplot(1, 3, position)
    plt.plot(history[history_key], label=metric_name)
    plt.xlabel('Epoch')
    plt.ylabel(metric_name)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# Save the trained weights.
model_path = Path('../models/efficientnet_raw.pth')
# torch.save does not create missing directories, so make sure the target
# folder exists instead of failing after the whole training run.
model_path.parent.mkdir(parents=True, exist_ok=True)
torch.save(model.state_dict(), model_path)
print('Modelo guardado correctamente')