# Lista 3 - Problema 2 individual - Smile


In [14]:

import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from tqdm.notebook import tqdm

In [12]:
class Smile(Dataset):
    def __init__(self, data, labels):
        self.data = torch.Tensor(data).float()
        self.labels = torch.Tensor(labels).long()
        self.n_classes = len(np.unique(labels))

    def __getitem__(self, index):
        x = self.data[index]
        y = self.labels[index]

        return x, F.one_hot(y, self.n_classes).float()
    
    def __len__(self):
        return len(self.data)
    
# smile_train = torch.utils.data.DataLoader(Smile(X_train, y_train), batch_size=32)

In [15]:
# Funcion para entrenar el modelo
def train_loop(model, train, val, optimizer, patience=5, epochs=100):
    """_Bucle de entrenamiento_

    Args:
        model: red a entrenal
        train: datos de entrenamiento
        val: datos de validacion
        optimizer: optimizador de pytorch, por ejemplo torch.optim.Adam
        patience: numero de epochs sin mejora en el conjunto de validacion
        epochs: numero de epochs

    Returns:
        _type_: _description_
    """
    def epoch_loss(dataset):
        data_loss = 0.0
        for i, (data, labels) in enumerate(dataset):
            inputs = data.to('cuda')
            y = labels.to('cuda')
            outputs = model(inputs)
            loss = F.cross_entropy(outputs, y, reduction="mean")
            data_loss += loss.item()  
        return data_loss / i  
    
    def early_stopping(val_loss, patience=5):
        if len(val_loss) > patience:
            if val_loss[-1] > val_loss[-(patience+1)]:
                return True
    
    hist_loss = {'train': [], 'val': []}
    pbar = tqdm(range(epochs))
    for epoch in pbar:  # bucle para todos los epochs
        for i, (data, labels) in enumerate(train):
            # obtenemos los datos y los subimos a la GPU
            inputs = data.to('cuda')
            y = labels.to('cuda')

            # Reiniciamos los gradientes
            optimizer.zero_grad()

            # Aplicamos los datos al modelo
            outputs = model(inputs)
            # Calculamos la perdida
            loss = F.cross_entropy(outputs, y, reduction="mean")

            # Hacemos el paso hacia atras
            loss.backward()
            optimizer.step()

        # Calculamos la perdida en el conjunto de entrenamiento y validacion
        with torch.no_grad():
            hist_loss['train'].append(epoch_loss(train))
            hist_loss['val'].append(epoch_loss(val))

        # Mostramos la perdida en el conjunto de entrenamiento y validacion
        pbar.set_postfix({'train': hist_loss['train'][-1], 'val': hist_loss['val'][-1]})

        # Si la perdida en el conjunto de validacion no disminuye, paramos el entrenamiento
        if early_stopping(hist_loss['val'], patience):
            break
            
    return hist_loss 

# Para el optimizador podemos usar Adam, le pasaremos el siguiente objeto
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# donde model es el modelo que queremos entrenar
# y lr es la tasa de aprendizaje, 1e-4 es un valor comun, pero podeis probar otros valores

In [16]:
# Clase para definir la arquitectura de la red convolucional
class convolutional(nn.Module):
    def __init__(
        self,
        num_classes=2,
        input_size=32,
        input_channels=3,
        kernel_size=3,
        kernels=[16, 32],
        pooling=nn.MaxPool2d,
        batch_norm=False,
        dropout=0.0,
    ):
        super(convolutional, self).__init__()
        nkernels = [input_channels] + kernels
        padding = (kernel_size-1) // 2
        self.convo = []
        for k in range(1, len(nkernels)):
            self.convo.append(
                nn.Conv2d(
                    nkernels[k - 1],
                    nkernels[k],
                    kernel_size=kernel_size,
                    stride=1,
                    padding=padding,
                )
            )
            self.convo.append(nn.ReLU())
            self.convo.append(pooling(kernel_size=2, stride=2))
            if batch_norm:
                self.convo.append(nn.BatchNorm2d(nkernels[k]))
            if dropout > 0:
                self.convo.append(nn.Dropout(dropout))
        self.convo = nn.Sequential(*self.convo)
        out_size = input_size // (2** len(kernels))
        self.fc = nn.Linear(out_size * out_size * nkernels[-1], num_classes)

    def forward(self, x):
        out = self.convo(x)
        return self.fc(out.view(out.size(0), -1))

In [17]:
def test_model(model, test):
    """_Funcion para obtener las predicciones de un modelo en un conjunto de test_

    Poner el modelo en modo evaluacion antes de llamar a esta funcion
    
    Args:
        model: _modelo entrenado_
        test: _conjunto de test_

    Returns:
        _type_: _etiquetas predichas, etiquetas reales_
    """
    preds = []
    true = []
    for i, (data, labels) in enumerate(test):
        inputs = data.to('cuda')
        outputs = model(inputs)
        preds.append(outputs.detach().cpu().numpy())
        true.append(labels.detach().cpu().numpy())
    return np.argmax(np.concatenate(preds), axis=1), np.argmax(np.concatenate(true), axis=1)