In [1]:
import torch
from torch import nn
import numpy as np


import json
import time
from pathlib import Path

from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import ConfusionMatrixDisplay, accuracy_score
import seaborn as sns

# Dataset

In [None]:
if not Path("data_extended.json").is_file():
    !gdown "www.dropbox.com/ ... a_extended.json?dl=1 "

"gdown" no se reconoce como un comando interno o externo,
programa o archivo por lotes ejecutable.


In [None]:
with open("data_extended.json", "r") as f:
    data_json = json.load(f)

In [None]:
divide_into = 4

X = torch.tensor(data_json["mfcc"])
y = torch.tensor(data_json["labels"])

X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.1, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, stratify=y_train, test_size=0.2, random_state=42)

if divide_into is not None:
    X_train = X_train.unsqueeze(1).reshape(X_train.shape[0], divide_into, -1, 32).reshape(X_train.shape[0] * divide_into, -1, 32)
    y_train = np.repeat(y_train, divide_into)
    X_val = X_val.unsqueeze(1).reshape(X_val.shape[0], divide_into, -1, 32).reshape(X_val.shape[0] * divide_into, -1, 32)
    y_val = np.repeat(y_val, divide_into)
    X_test = X_test.unsqueeze(1).reshape(X_test.shape[0], divide_into, -1, 32).reshape(X_test.shape[0] * divide_into, -1, 32)
    y_test = np.repeat(y_test, divide_into)

train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
test_dataset = torch.utils.data.TensorDataset(X_test, y_test)

In [None]:
def show_mfcc(example, example_class):
    fig, ax = plt.subplots(1, 1, figsize=(10, 5))
    sns.heatmap(example.T, ax=ax)
    ax.set_xlabel("Time")
    ax.set_ylabel("Frequency")
    ax.set_title(f"Class: {data_json['mapping'][1:][example_class]}")

# Modelos

In [None]:
class RNNModel(nn.Module):
    def __init__(
        self,
        rnn_type,
        n_input_channels,
        hidd_size=256,
        num_layers=1,
    ):
        """
        Para utilizar una vanilla RNN entregue rnn_type="RNN"
        Para utilizar una LSTM entregue rnn_type="LSTM"
        Para utilizar una GRU entregue rnn_type="GRU"
        """
        super().__init__()

        self.rnn_type = rnn_type

        if rnn_type == "GRU":
            self.rnn_layer = nn.GRU(n_input_channels, hidd_size, batch_first=True, num_layers=num_layers)
        
        elif rnn_type == "LSTM":
            self.rnn_layer = nn.LSTM(n_input_channels, hidd_size, batch_first=True, num_layers=num_layers)

        elif rnn_type == "RNN":
            self.rnn_layer = nn.RNN(n_input_channels, hidd_size, batch_first=True, num_layers=num_layers)

        else:
            raise ValueError(f"rnn_type {rnn_type} not supported.")

        self.net = nn.Sequential(
            nn.Linear(hidd_size, 10),
        )

        self.flatten_layer = nn.Flatten()

    def forward(self, x):
        if self.rnn_type == "GRU":
            out, h = self.rnn_layer(x)

        elif self.rnn_type == "LSTM":
            out, (h, c) = self.rnn_layer(x)

        elif self.rnn_type == "RNN":
            out, h = self.rnn_layer(x)
            
        out = h[-1]

        return self.net(out)

In [None]:
class CNN1DModel(nn.Module):
    def __init__(
        self,
        hidd_size=256,
    ):
        super().__init__()

        self.conv_blocks = nn.Sequential(
            # COMPLETAR CÓDIGO
        )

        self.rnn_layer = RNNModel(
            n_input_channels= , # COMPLETAR CÓDIGO
            rnn_type="LSTM",
        )

    def forward(self, x):
        # COMPLETAR CÓDIGO

# Entrenamiento

In [None]:
def show_curves(all_curves):

    final_curve_means = {k: np.mean([c[k] for c in all_curves], axis=0) for k in all_curves[0].keys()}
    final_curve_stds = {k: np.std([c[k] for c in all_curves], axis=0) for k in all_curves[0].keys()}

    fig, ax = plt.subplots(1, 2, figsize=(13, 5))
    fig.set_facecolor('white')

    epochs = np.arange(len(final_curve_means["val_loss"])) + 1

    ax[0].plot(epochs, final_curve_means['val_loss'], label='validation')
    ax[0].plot(epochs, final_curve_means['train_loss'], label='training')
    ax[0].fill_between(epochs, y1=final_curve_means["val_loss"] - final_curve_stds["val_loss"], y2=final_curve_means["val_loss"] + final_curve_stds["val_loss"], alpha=.5)
    ax[0].fill_between(epochs, y1=final_curve_means["train_loss"] - final_curve_stds["train_loss"], y2=final_curve_means["train_loss"] + final_curve_stds["train_loss"], alpha=.5)
    ax[0].set_xlabel('Epoch')
    ax[0].set_ylabel('Loss')
    ax[0].set_title('Loss evolution during training')
    ax[0].legend()

    ax[1].plot(epochs, final_curve_means['val_acc'], label='validation')
    ax[1].plot(epochs, final_curve_means['train_acc'], label='training')
    ax[1].fill_between(epochs, y1=final_curve_means["val_acc"] - final_curve_stds["val_acc"], y2=final_curve_means["val_acc"] + final_curve_stds["val_acc"], alpha=.5)
    ax[1].fill_between(epochs, y1=final_curve_means["train_acc"] - final_curve_stds["train_acc"], y2=final_curve_means["train_acc"] + final_curve_stds["train_acc"], alpha=.5)
    ax[1].set_xlabel('Epoch')
    ax[1].set_ylabel('Accuracy')
    ax[1].set_title('Accuracy evolution during training')
    ax[1].legend()

    plt.show()

def get_metrics_and_confusion_matrix(model, dataset):
    model.cpu()
    model.eval()
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=min(16, len(dataset)))
    y_true = []
    y_pred = []
    for x, y in dataloader:
        y_true.append(y)
        y_pred.append(model(x).argmax(dim=1))

    y_true = torch.cat(y_true)
    y_pred = torch.cat(y_pred)

    print(f"Accuracy: {accuracy_score(y_true, y_pred) * 100:.2f}%")
    
    ConfusionMatrixDisplay.from_predictions(y_true, y_pred, display_labels=data_json["mapping"][1:], xticks_rotation="vertical")
    plt.show()

In [None]:
def train_step(x_batch, y_batch, model, optimizer, criterion, use_gpu):
    # Predicción
    y_predicted = model(x_batch)

    # Cálculo de loss
    loss = criterion(y_predicted, y_batch)

    # Actualización de parámetros
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return y_predicted, loss


def evaluate(val_loader, model, criterion, use_gpu):
    cumulative_loss = 0
    cumulative_predictions = 0
    data_count = 0

    for x_val, y_val in val_loader:
        if use_gpu:
            x_val = x_val.cuda()
            y_val = y_val.cuda()

        y_predicted = model(x_val)
        
        loss = criterion(y_predicted, y_val)

        class_prediction = torch.argmax(y_predicted, axis=1).long()

        cumulative_predictions += (y_val == class_prediction).sum().item()
        cumulative_loss += loss.item() * y_val.shape[0]
        data_count += y_val.shape[0]

    val_acc = cumulative_predictions / data_count
    val_loss = cumulative_loss / data_count

    return val_acc, val_loss


def train_model(
    model,
    train_dataset,
    val_dataset,
    epochs,
    criterion,
    batch_size,
    lr,
    n_evaluations_per_epoch=6,
    use_gpu=False,
):
    if use_gpu:
        model.cuda()

    # Definición de dataloader
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=use_gpu)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False, pin_memory=use_gpu)

    # Optimizador
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, betas=(0.9, 0.98), eps=1e-9)

    # Listas para guardar curvas de entrenamiento
    curves = {
        "train_acc": [],
        "val_acc": [],
        "train_loss": [],
        "val_loss": [],
    }

    t0 = time.perf_counter()

    iteration = 0

    n_batches = len(train_loader)
    print(n_batches)

    for epoch in range(epochs):
        print(f"\rEpoch {epoch + 1}/{epochs}")
        cumulative_train_loss = 0
        cumulative_train_corrects = 0
        examples_count = 0

        # Entrenamiento del modelo
        model.train()
        for i, (x_batch, y_batch) in enumerate(train_loader):
            if use_gpu:
                x_batch = x_batch.cuda()
                y_batch = y_batch.cuda()

            y_predicted, loss = train_step(x_batch, y_batch, model, optimizer, criterion, use_gpu)

            cumulative_train_loss += loss.item() * x_batch.shape[0]
            examples_count += y_batch.shape[0]

            # Calculamos número de aciertos
            class_prediction = torch.argmax(y_predicted, axis=1).long()
            cumulative_train_corrects += (y_batch == class_prediction).sum().item()

            if (i % (n_batches // n_evaluations_per_epoch) == 0) and (i > 0):
                train_loss = cumulative_train_loss / examples_count
                train_acc = cumulative_train_corrects / examples_count

                print(f"Iteration {iteration} - Batch {i}/{len(train_loader)} - Train loss: {train_loss}, Train acc: {train_acc}")

            iteration += 1

        model.eval()
        with torch.no_grad():
            val_acc, val_loss = evaluate(val_loader, model, criterion, use_gpu)

        print(f"Val loss: {val_loss}, Val acc: {val_acc}")

        train_loss = cumulative_train_loss / examples_count
        train_acc = cumulative_train_corrects / examples_count

        curves["train_acc"].append(train_acc)
        curves["val_acc"].append(val_acc)
        curves["train_loss"].append(train_loss)
        curves["val_loss"].append(val_loss)

    print()
    total_time = time.perf_counter() - t0
    print(f"Tiempo total de entrenamiento: {total_time:.4f} [s]")

    model.cpu()

    return curves, total_time

In [None]:
lr = 5e-4
batch_size = 32
criterion = nn.CrossEntropyLoss()
n_trains = 5

epochs = 40

all_curves = []
times = []

for train_run in range(n_trains):
    model = # INSERTE CÓDIGO

    curves, total_time = train_model(
        model,
        train_dataset,
        val_dataset,
        epochs,
        criterion,
        batch_size,
        lr,
        use_gpu=True,
    )

    all_curves.append(curves)
    times.append(total_time)

print(f"Tiempo de entrenamiento promedio de {n_trains} corridas: {np.mean(times):.2f} +- {np.std(times):.2f} [s]")

show_curves(all_curves)