In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [5]:
import torch
from torchvision import transforms

# Exploratory Data Analisis

In [6]:
# Cargar los datos
x_train = np.load('/content/x_train.npy')
x_test = np.load('/content/x_test.npy')
y_train = np.load('/content/y_train.npy')
y_c1_train = np.load('/content/y_c1_train.npy')
y_c2_train = np.load('/content/y_c2_train.npy')


# División conjunto de entrenamiento y validación

A partir del conjunto de datos `x_train`, se realiza la división para crear los conjuntos de entrenamiento y validación, respectivamente.



In [17]:
from sklearn.model_selection import train_test_split

Los parámetros `random_state` y `stratify` en la función `train_test_split` son fundamentales para asegurar una división adecuada de los datos. El parámetro `random_state` establece una semilla para el generador de números aleatorios, lo que garantiza que la división sea reproducible en ejecuciones futuras. Por otro lado, `stratify=y_train` asegura que la proporción de las clases en el conjunto de entrenamiento y el conjunto de validación sea la misma que en el conjunto original, lo que es especialmente importante en problemas de clasificación para mantener la representación adecuada de cada clase.


In [18]:
X_train, X_val, y_train, y_val, y_c1_train, y_c1_val, y_c2_train, y_c2_val = train_test_split(
    x_train, y_train, y_c1_train, y_c2_train,
    test_size=0.2,
    random_state=42,
    stratify=y_train
)

# Creación de Dataset

Se define una clase personalizada `HierarchicalDataset` que extiende la clase `Dataset`. Esta clase se encargará de convertir los datos en tensores. Posteriormente, esta implementación nos permitirá crear los `DataLoaders` necesarios para la manipulación eficiente de los datos durante el entrenamiento.


In [20]:
from torch.utils.data import DataLoader, Dataset

In [81]:
class HierarchicalDataset(Dataset):
    def __init__(self, x_data, y_data, y_c1_data, y_c2_data, transform=None):
        # Aplicar transformaciones si están definidas
        self.x_data = torch.tensor(x_data, dtype=torch.float32).permute(0, 3, 1, 2)  # [N, C, H, W]
        self.y_data = torch.tensor(y_data, dtype=torch.float32)
        self.y_c1_data = torch.tensor(y_c1_data, dtype=torch.float32)
        self.y_c2_data = torch.tensor(y_c2_data, dtype=torch.float32)
        self.transform = transform  # Asegúrate de que esta línea esté presente

    def __len__(self):
        return len(self.x_data)

    def __getitem__(self, idx):
        # Obtener la imagen y las etiquetas
        sample = self.x_data[idx]
        y = self.y_data[idx]
        y_c1 = self.y_c1_data[idx]
        y_c2 = self.y_c2_data[idx]

        # Aplicar transformaciones si existen
        if self.transform:
            sample = self.transform(sample)

        return sample, y, y_c1, y_c2

In [82]:
# Definir las transformaciones
transform = transforms.Compose([
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalización típica
])

# Crear los datasets
train_dataset = HierarchicalDataset(X_train, y_train, y_c1_train, y_c2_train, transform=transform)
val_dataset = HierarchicalDataset(X_val, y_val, y_c1_val, y_c2_val, transform=transform)


In [23]:
print('Tamaño del Dataset de entrenamiento', len(train_dataset))
print('Tamaño del Dataset de validación', len(val_dataset))

Tamaño del Dataset de entrenamiento 40000
Tamaño del Dataset de validación 10000


# Creación DataLoaders

Se utilizan los datasets creados anteriormente para generar los `DataLoaders`, que facilitan la manipulación de los datos en lotes (batches) durante el entrenamiento.

Antes de crear un `DataLoader`, es necesario definir el tamaño de los lotes (`batch_size`), que es un parámetro esencial para su creación.

El parámetro `shuffle`, cuando se establece en `True`, permite que los datos sean barajados antes de ser divididos en los lotes. Esto garantiza que el orden de las muestras cambie en cada época durante el entrenamiento, lo que ayuda a reducir el sesgo. Cabe destacar que `shuffle=True` se utiliza únicamente durante el entrenamiento y no en la fase de validación.


In [84]:
batch_size = 164
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [85]:
print('Tamaño DataLoader de entrenamiento:', len(train_loader))
print('Tamaño DataLoader de validación:', len(val_loader))

Tamaño DataLoader de entrenamiento: 1250
Tamaño DataLoader de validación: 313


Observamos que los tamaños de los `DataLoaders` no coinciden con los tamaños de los `Datasets` creados anteriormente. Esto se debe a la división en lotes que se realiza al crear los `DataLoaders`. Sabemos que el tamaño del conjunto de datos de entrenamiento es de 40,000 y el del conjunto de validación es de 10,000. Por lo tanto, el tamaño del `DataLoader` se calcula **dividiendo el tamaño del conjunto de datos entre el** `batch_size`.


# Transfer Learning y Fine Tuning

***Transfer learning*** es una técnica en deep learning que permite reutilizar un modelo preentrenado del estado del arte en una tarea similar. En lugar de entrenar un modelo desde cero, se aprovechan las características ya aprendidas por el modelo en conjuntos de datos grandes (como *ImageNet*, *Open Images*, *COCO*, y *Places365*), que contienen millones de imágenes. En este proceso, típicamente se reemplazan y ajustan solo las últimas capas densas para adaptarlas a la nueva tarea específica, acelerando el entrenamiento y mejorando la precisión en conjuntos de datos más pequeños.

***Fine tuning***, por otro lado, es una técnica específica dentro del transfer learning que permite ajustar o "afinar" algunas o todas las capas del modelo preentrenado. Esto implica congelar las primeras capas, que han aprendido características generales, y dejar entrenables las capas superiores para adaptar el modelo a una tarea más específica.



# Transfer learning con AlexNet

## Modelo AlexNet

In [41]:
import torch
import torch.nn as nn
from torchvision import models
from sklearn.metrics import precision_recall_fscore_support

In [86]:
alexnet_model = models.alexnet(weights=models.AlexNet_Weights.DEFAULT)

Necesitamos mantener los parámetros ya aprendidos por el modelo intactos, por lo cuál se **congelan** los gradientes mediante  `requires_grad = False` en cada parámetro del modelo.

In [87]:
# Función para congelar las capas del modelo base
def freeze_model(base_model):
    for param in base_model.parameters():
        param.requires_grad = False
    return base_model


In [88]:
alexnet_model = freeze_model(alexnet_model)

## Nuevo modelo con base AlexNet

Definimos un nuevo modelo basado en el modelo de **AlexNet**.


Necesitamos conocer la cantidad de `in_features` de la última capa de **AlexNet** para que la nueva capa densa que añadiremos se conecte correctamente en términos de dimensiones.

In [89]:
# Obtener el número de entradas de la última capa
in_features = alexnet_model.classifier[6].in_features

# Crear un nuevo clasificador
num_classes_y = 100  # Cambia esto según tu problema
num_classes_y_c1 = 8
num_classes_y_c2 = 20

# Reemplazar la última capa de clasificación y añadir nuevas capas
alexnet_model.classifier[6] = nn.Linear(in_features, num_classes_y)
alexnet_model.classifier.add_module('7', nn.Linear(in_features, num_classes_y_c1))
alexnet_model.classifier.add_module('8', nn.Linear(in_features, num_classes_y_c2))

La línea anterior accede a la segunda capa en el módulo `classifier` de EfficientNet. Esta capa es una capa `Linear` (totalmente conectada) que constituye la última capa del modelo original y define el número de entradas `in_features` y salidas `out_features` de la clasificación original en EfficientNet.

In [137]:
class CustomAlexNet(nn.Module):
    def __init__(self, num_fine_classes, num_coarse1_classes, num_coarse2_classes):
        super(CustomAlexNet, self).__init__()

        # Capas de características
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 192, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # Capas de clasificación
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * 4 * 4, 2048),
            nn.BatchNorm1d(2048),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(2048, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(1024, num_fine_classes + num_coarse1_classes + num_coarse2_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)

        # Dividir la salida en tres partes
        fine_output = x[:, :num_fine_classes]
        coarse1_output = x[:, num_fine_classes:num_fine_classes + num_coarse1_classes]
        coarse2_output = x[:, num_fine_classes + num_coarse1_classes:]

        return fine_output, coarse1_output, coarse2_output

In [138]:
# Inicializar el modelo con las cantidades de clases
num_fine_classes = 100
num_coarse1_classes = 8
num_coarse2_classes = 20

model_base_alexnet = CustomAlexNet(num_fine_classes, num_coarse1_classes, num_coarse2_classes)

print(model_base_alexnet)

CustomAlexNet(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 192, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (classifier): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=4096, out_features=2048, bias=True)
    (2): BatchNorm1d(2048, eps=1e-05, momentu

# Entrenamiento

In [92]:
from sklearn.metrics import precision_recall_fscore_support

## Funciones para el entrenamiento

In [139]:
for batch in train_loader:
    inputs, labels_y, labels_y_c1, labels_y_c2 = batch
    print(inputs.shape, inputs.dtype)
    print(labels_y.shape, labels_y.dtype)
    print(labels_y_c1.shape, labels_y_c1.dtype)
    print(labels_y_c2.shape, labels_y_c2.dtype)
    break

torch.Size([32, 3, 32, 32]) torch.float32
torch.Size([32, 100]) torch.float32
torch.Size([32, 8]) torch.float32
torch.Size([32, 20]) torch.float32


In [140]:
# Función de entrenamiento
def training(model, optimizer, num_epochs, train_loader, val_loader, criterion_y, criterion_y_c1, criterion_y_c2, device):
    model.to(device)

    train_losses, val_losses = [], []
    train_accuracies_y, train_accuracies_y_c1, train_accuracies_y_c2 = [], [], []
    val_accuracies_y, val_accuracies_y_c1, val_accuracies_y_c2 = [], [], []

    for epoch in range(num_epochs):
        model.train()  # Modo de entrenamiento
        running_loss = 0.0
        correct_y, correct_y_c1, correct_y_c2 = 0, 0, 0
        total = 0

        for batch in train_loader:
            inputs, labels_y, labels_y_c1, labels_y_c2 = batch
            inputs, labels_y, labels_y_c1, labels_y_c2 = inputs.to(device), labels_y.to(device), labels_y_c1.to(device), labels_y_c2.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)

            # Calcular pérdidas
            loss_y = criterion_y(outputs[0], labels_y)
            loss_y_c1 = criterion_y_c1(outputs[1], labels_y_c1)
            loss_y_c2 = criterion_y_c2(outputs[2], labels_y_c2)
            loss = loss_y + loss_y_c1 + loss_y_c2
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            # Obtener las predicciones
            _, predicted_y = torch.max(outputs[0].data, 1)
            _, predicted_y_c1 = torch.max(outputs[1].data, 1)
            _, predicted_y_c2 = torch.max(outputs[2].data, 1)

            # Convertir etiquetas one-hot a índices de clase
            labels_y_idx = torch.argmax(labels_y, dim=1)
            labels_y_c1_idx = torch.argmax(labels_y_c1, dim=1)
            labels_y_c2_idx = torch.argmax(labels_y_c2, dim=1)

            total += labels_y.size(0)
            correct_y += (predicted_y == labels_y_idx).sum().item()
            correct_y_c1 += (predicted_y_c1 == labels_y_c1_idx).sum().item()
            correct_y_c2 += (predicted_y_c2 == labels_y_c2_idx).sum().item()

        avg_train_loss = running_loss / len(train_loader)
        avg_train_accuracy_y = 100 * correct_y / total
        avg_train_accuracy_y_c1 = 100 * correct_y_c1 / total
        avg_train_accuracy_y_c2 = 100 * correct_y_c2 / total
        train_losses.append(avg_train_loss)
        train_accuracies_y.append(avg_train_accuracy_y)
        train_accuracies_y_c1.append(avg_train_accuracy_y_c1)
        train_accuracies_y_c2.append(avg_train_accuracy_y_c2)

        # Evaluar el modelo en el conjunto de validación
        model.eval()
        val_running_loss = 0.0
        val_correct_y, val_correct_y_c1, val_correct_y_c2 = 0, 0, 0
        val_total = 0

        with torch.no_grad():
            for val_batch in val_loader:
                val_inputs, val_labels_y, val_labels_y_c1, val_labels_y_c2 = val_batch
                val_inputs, val_labels_y, val_labels_y_c1, val_labels_y_c2 = val_inputs.to(device), val_labels_y.to(device), val_labels_y_c1.to(device), val_labels_y_c2.to(device)

                val_outputs = model(val_inputs)
                val_loss_y = criterion_y(val_outputs[0], val_labels_y)
                val_loss_y_c1 = criterion_y_c1(val_outputs[1], val_labels_y_c1)
                val_loss_y_c2 = criterion_y_c2(val_outputs[2], val_labels_y_c2)
                val_loss = val_loss_y + val_loss_y_c1 + val_loss_y_c2
                val_running_loss += val_loss.item()

                # Obtener predicciones
                _, val_predicted_y = torch.max(val_outputs[0], 1)
                _, val_predicted_y_c1 = torch.max(val_outputs[1], 1)
                _, val_predicted_y_c2 = torch.max(val_outputs[2], 1)

                val_labels_y_idx = torch.argmax(val_labels_y, dim=1)
                val_labels_y_c1_idx = torch.argmax(val_labels_y_c1, dim=1)
                val_labels_y_c2_idx = torch.argmax(val_labels_y_c2, dim=1)

                val_total += val_labels_y.size(0)
                val_correct_y += (val_predicted_y == val_labels_y_idx).sum().item()
                val_correct_y_c1 += (val_predicted_y_c1 == val_labels_y_c1_idx).sum().item()
                val_correct_y_c2 += (val_predicted_y_c2 == val_labels_y_c2_idx).sum().item()

        avg_val_loss = val_running_loss / len(val_loader)
        avg_val_accuracy_y = 100 * val_correct_y / val_total
        avg_val_accuracy_y_c1 = 100 * val_correct_y_c1 / val_total
        avg_val_accuracy_y_c2 = 100 * val_correct_y_c2 / val_total
        val_losses.append(avg_val_loss)
        val_accuracies_y.append(avg_val_accuracy_y)
        val_accuracies_y_c1.append(avg_val_accuracy_y_c1)
        val_accuracies_y_c2.append(avg_val_accuracy_y_c2)

        print(f'Epoch [{epoch+1}/{num_epochs}] Training Loss: {avg_train_loss:.4f}, '
              f'Accuracies Y/C1/C2: {avg_train_accuracy_y:.2f}%, {avg_train_accuracy_y_c1:.2f}%, {avg_train_accuracy_y_c2:.2f}% '
              f'| Validation Loss: {avg_val_loss:.4f}, '
              f'Accuracies Y/C1/C2: {avg_val_accuracy_y:.2f}%, {avg_val_accuracy_y_c1:.2f}%, {avg_val_accuracy_y_c2:.2f}%')

    return train_losses, val_losses, train_accuracies_y, train_accuracies_y_c1, train_accuracies_y_c2, val_accuracies_y, val_accuracies_y_c1, val_accuracies_y_c2


In [147]:
# Configuraciones para el entrenamiento
learning_rate = 0.00001
num_epochs = 50

In [148]:

criterion_y = nn.CrossEntropyLoss()
criterion_y_c1 = nn.CrossEntropyLoss(weight=torch.tensor([1.0]*num_classes_y_c1)).to(device)
criterion_y_c2 = nn.CrossEntropyLoss(weight=torch.tensor([1.0]*num_classes_y_c2)).to(device)

optimizer = torch.optim.Adam(model_base_alexnet.parameters(), lr=learning_rate)


In [None]:
# Entrenar el modelo
train_losses, val_losses, train_accuracies_y, train_accuracies_y_c1, train_accuracies_y_c2, val_accuracies_y, val_accuracies_y_c1, val_accuracies_y_c2 = training(
    model_base_alexnet, optimizer, num_epochs, train_loader, val_loader, criterion_y, criterion_y_c1, criterion_y_c2, device
)

Epoch [1/50] Training Loss: 4.7945, Accuracies Y/C1/C2: 35.60%, 72.19%, 55.29% | Validation Loss: 5.0501, Accuracies Y/C1/C2: 35.61%, 68.20%, 52.21%


In [None]:
test_dataset = HierarchicalDataset(x_test, y_train, y_c1_train, y_c2_train, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Cargar el modelo en modo evaluación
model_base_alexnet.eval()

import torch

all_predictions_y = []
all_predictions_c1 = []
all_predictions_c2 = []
all_labels_y = []
all_labels_c1 = []
all_labels_c2 = []

# Desactivar el cálculo de gradientes para la evaluación
with torch.no_grad():
    for images, labels_y, labels_c1, labels_c2 in test_loader:
        images = images.to(device)

        # Hacer la predicción
        outputs = model_base_alexnet(images)

        # Obtener las predicciones para las etiquetas Y, coarse_1 y coarse_2
        _, predicted_y = torch.max(outputs[0], 1)
        _, predicted_c1 = torch.max(outputs[1], 1)
        _, predicted_c2 = torch.max(outputs[2], 1)

        # Guardar las predicciones
        all_predictions_y.append(predicted_y.cpu())
        all_predictions_c1.append(predicted_c1.cpu())
        all_predictions_c2.append(predicted_c2.cpu())

        # Guardar las etiquetas verdaderas
        all_labels_y.append(labels_y.cpu())
        all_labels_c1.append(labels_c1.cpu())
        all_labels_c2.append(labels_c2.cpu())

# Concatenar todas las predicciones y etiquetas
all_predictions_y = torch.cat(all_predictions_y)
all_predictions_c1 = torch.cat(all_predictions_c1)
all_predictions_c2 = torch.cat(all_predictions_c2)

In [128]:
all_labels_y[1]


tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 1.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])

In [None]:
import pandas as pd

# Crear una lista con las predicciones "coarse_1 coarse_2 fine_label"
predictions_list = [
    f"{c1.item()} {c2.item()} {y.item()}"
    for c1, c2, y in zip(all_predictions_c1, all_predictions_c2, all_predictions_y)
]


submission_df = pd.DataFrame({
    "ID": range(len(predictions_list)),
    "Prediction": predictions_list
})


submission_df.to_csv("submit.csv", index=False)
