# Modelo ConvNeXt

En este notebook explicaremos el proceso para construir una red neuronal convolucional *ConvNeXt* a partir de una red neuronal *ResNet*. Nos basaremos en el artículo [A ConvNet for the 2020s](https://arxiv.org/abs/2201.03545), donde fueron introducidas las redes *ConvNeXt*.

El proceso consiste en siete pasos:

1. **Crear un modelo base de ResNet basado en ResNet-50**
2. **Cambiar la proporción de los stages del modelo ResNet**
3. **Cambiar el bloque *Stem* por un bloque *Patchify***
4. **Añadir *depthwise convolution* (*ResNeXt-ify*)**
5. **Cambiar el bloque *Bottleneck* por *Inverted-Bottleneck***
6. **Aumentar el tamaño de los kernels**
7. **Cambios en el *microdiseño***


Para medir los cambios en la exactitud (*accuracy*) del modelo en cada uno de los siete pasos, entrenaremos al correspondiente modelo por 100 épocas utilizando los mismos parámetros de entrenamiento. Repetiremos tres veces cada experimento y reportaremos el promedio de los resultados.

### Preparación de los datos

Entrenaremos al modelo en el conjunto de datos CIFAR-10. Este dataset consiste de 60000 imágenes a color en 10 clases distintas, donde no hay intersección entre las distintas clases. Se puede acceder al dataset mediante las herramientas de la paquetería de pytorch, o también en la página oficial: https://www.cs.toronto.edu/~kriz/cifar.html.

In [1]:
# Importamos las paqueterías necesarias para el notebook
import time
import gc
import copy
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision.ops import StochasticDepth
from torchvision import datasets
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler

# De ser posible utilizaremos GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [15]:
def data_loader(data_dir,
                batch_size,
                random_seed=42,
                valid_size=0.1,
                shuffle=False,
                test=False):
    """
    Función para cargar los datos de CIFAR-10
    """
    
    # Definimos el transform para normalizar los datos con pytorch
    # Los valores fueron obtenidos en el notebook "data_extraction.ipynb"
    normalize = transforms.Normalize(  
        mean=[0.4914, 0.4822, 0.4465],
        std=[0.2023, 0.1994, 0.2010],
    )

    # Definimos el transform para preporcesar los datos
    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        normalize
    ])
    
    # Obtener los datos del conjunto de prueba
    if test:
        dataset = datasets.CIFAR10(
          root=data_dir, train=False,
          download=True, transform=transform_test,
        )

        data_loader = torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, shuffle=shuffle
        )

        return data_loader

    # Cargamos una copia de los datos de entrenamiento
    train_dataset = datasets.CIFAR10(
        root=data_dir, train=True,
        download=True, transform=transform_train,
    )
    
    # Cargamos una copia extra de los datos de entrenamiento para dividirlo después en el conjunto de validación
    valid_dataset = datasets.CIFAR10(
        root=data_dir, train=True,
        download=True, transform=transform_train,
    )
    
    # Separamos los datos de entrenamiento y validación mediante índices
    num_train = len(train_dataset)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))

    if shuffle:
        np.random.seed(42)
        np.random.shuffle(indices)

    train_idx, valid_idx = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)
    
    # Finalmente, definimos los conjuntos de entrenamiento y validación
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler)

    valid_loader = torch.utils.data.DataLoader(
        valid_dataset, batch_size=batch_size, sampler=valid_sampler)

    return (train_loader, valid_loader)


# Aplicamos la función para cargar los datos de CIFAR-10, los guardamos en el directorio actual
train_loader, valid_loader = data_loader(data_dir='./data',
                                         batch_size=64)

test_loader = data_loader(data_dir='./data',
                              batch_size=64,
                              test=True)    
cifar10_classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified


### Función de entrenamiento

Con la siguiente función obtendremos los datos del desempeño de cada modelo a lo largo del entrenamiento. Los parámetros de entrenamiento son los mismo que los utilizados en el artículo donde fueorn introducidas las redes [*ConvNeXt*](https://arxiv.org/abs/2201.03545), con la excepción del número de épocas, el cual no demostró mejorar el desempeño de la red al ser aumentado.

In [3]:
def entrenamiento(model, epocas):
    
    model = model.to(device)
    
    # variables para guardar los resultados
    accuracy_training_epochs = []
    accuracy_validation_epochs = []
    loss_epoch = []
    test_accuracy = []
    best_model = None
    
    # parámetros de entrenamiento
    num_epochs = epocas

    optimizer = optim.AdamW(model.parameters(),
                            lr=0.004,
                            betas=(0.9, 0.999),
                            weight_decay=0.05
                            )
    
    criterion = nn.CrossEntropyLoss()

    lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epocas)

    
    # entrenamiento
    print("Comenzando entrenamiento")
    for epoch in range(num_epochs):
        start_time = time.time()
        for i, (images, labels) in enumerate(train_loader):

            # Mover a los tensores a GPU de ser posible
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Ahorro de memoria
            del images, labels, outputs
            torch.cuda.empty_cache()
            gc.collect()

        loss_epoch.append(loss.item()) # Guardar la información del loss de esta época
        lr_scheduler.step() # Implementación de learning rate decay

        # Medición de la exactitud en el conjunto de validación
        with torch.no_grad():
            correct = 0
            total = 0
            for images, labels in valid_loader:
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                del images, labels, outputs
            val_accuracy = correct/total
            accuracy_validation_epochs.append(val_accuracy)

        # Medición de la exactitud sobre todo el conjunto de entrenamiento
        with torch.no_grad():
            correct = 0
            total = 0
            for images, labels in train_loader:
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                del images, labels, outputs
            train_accuracy = correct/total
            accuracy_training_epochs.append(train_accuracy)

        # Medición de la exactitud en el conjunto de prueba
        with torch.no_grad():
            correct = 0
            total = 0
            for images, labels in test_loader:
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                del images, labels, outputs
            t_acc = correct/total
            test_accuracy.append(t_acc)
            
            # Guardar el modelo en caso de que su accuracy en el conjunto de prueba sea mayor que los anteriores
            if t_acc >= max(test_accuracy):
                best_model = copy.deepcopy(model)


        # Imprimir la pérdida, la exactitud en la validación y la exactitud en los datos de entrenamiento, de esta época.
        print(f"Epoch [{epoch+1}/{num_epochs}], Training accuracy: {round(train_accuracy,3)}, Validation accuracy: {round(val_accuracy,3)}, loss = {round(loss_epoch[-1],3)}")
        print(f"Time spent on epoch {epoch+1}: {round((time.time()-start_time)/60,2)}min")
        
    print("Entrenamiento finalizado")
        
   # regresar el mejor modelo, el accuracy en el entrenamiento, validation y prueba y la pérdida. Info de todas las épocas.     
    return [best_model,
            accuracy_training_epochs,
            accuracy_validation_epochs,
            test_accuracy,
            loss_epoch]  


## Modelo base: ResNet-50

El modelo base es el ResNet-50, el cual fue introducido en el artículo [Deep Residual Learning for Image Recognition](https://arxiv.org/abs/1512.03385).

Debido a las diferencias en tamaño entre CIFAR-10 y el conjunto de datos para el que fue diseñado el modelo ResNet-50 hemos hecho algunas modificaciones a la arquitectura original.

Los detalles de cómo funciona el modelo ResNet-50, los cambios que introdujimos y la explicación del código pueden ser consultados en el notebook de "./ResNet-50.ipynb".

In [4]:
class utilConv(nn.Sequential):
    # groups=1 es la opción por defecto de una capa convolucional en pytorch, la defino para cambiarla más adelante en el notebook.
    def __init__(self, in_features, out_features, kernel_size, stride = 1, norm = nn.BatchNorm2d, act = nn.ReLU, bias=True, groups=1):
        super().__init__(
            nn.Conv2d(in_features, out_features, kernel_size=kernel_size ,padding=kernel_size // 2, stride=stride, bias=bias,groups=groups),
            norm(out_features),
            act()
        )
        
class BottleNeckBlock(nn.Module):
    def __init__(self,in_features, out_features, reduction = 4, stride = 1):
        super().__init__()
        reduced_features = out_features // reduction
        self.block = nn.Sequential(
            # Reducción de canales
            utilConv(in_features, reduced_features, kernel_size=1, stride=stride, bias=False), # el stride puede ser 2 para aplicar downsampling
            # El número de canales se mantiene fijo
            utilConv(reduced_features, reduced_features, kernel_size=3, bias=False),
            # Aumento de canales
            utilConv(reduced_features, out_features, kernel_size=1, bias=False, act=nn.Identity), 
        )
        
        # self.shortcut es utilizado para transformar al input a las dimensiones correctas para poder sumarlo a la salida del bloque
        if in_features != out_features:
            self.shortcut =nn.Sequential(utilConv(in_features, out_features, kernel_size=1, stride=stride, bias=False))
        else:
            self.shortcut = nn.Identity()

        self.act = nn.ReLU()

    def forward(self, x):
        res = x
        x = self.block(x)
        res = self.shortcut(res)
        x += res
        x = self.act(x)
        return x
    
class Stage(nn.Sequential):
    def __init__(self, in_features, out_features, depth, stride = 2):  # in_features y out_features deben ser distintos, sino se aplicará downsampling y el Bottleneck no aplicará la identidad
        super().__init__(
            
            BottleNeckBlock(in_features, out_features, stride=stride), # Aquí se lleva a cabo el downsampling
            *[BottleNeckBlock(out_features, out_features) for _ in range(depth - 1)]
        )
        
        
class Stem(nn.Sequential):
    def __init__(self, in_features, out_features):
        super().__init__(
            utilConv(in_features, out_features, kernel_size=3, stride=1),  # en el caso de ImageNet, el kernel es de tamaño 7
#             nn.MaxPool2d(kernel_size=3, stride=2, padding=1)  
        )
        
class Encoder(nn.Module):
    def __init__(self, in_channels, stem_features, depths, widths):  # 
        super().__init__()
        self.stem = Stem(in_channels, stem_features)

        in_out_widths = list(zip(widths, widths[1:]))

        
        self.stages = nn.ModuleList() # lista de pytorch con los stages
        
        self.stages.append(Stage(stem_features, widths[0], depths[0], stride=1)) # se puede inferir de la figura 1 del artículo que el primer bloque del stage1 tiene stride 1
        
        for (in_features, out_features), depth in zip(in_out_widths, depths[1:]):
            # añadir cada uno de los stages
            self.stages.append(Stage(in_features, out_features, depth))
            

    def forward(self, x):
        x = self.stem(x)
        for stage in self.stages:

            x = stage(x)
        return x
    

    
class Decoder(nn.Module):
    def __init__(self, in_features, n_classes):
        super().__init__()
        self.avg = nn.AdaptiveAvgPool2d((1, 1))
        self.decoder = nn.Linear(in_features, n_classes)

    def forward(self, x):
        x = self.avg(x)
        x = x.view(x.size(0), -1)
        x = self.decoder(x)
        x = F.softmax(x, dim=1)
        return x

Con estos bloques podemos definir nuestro modelo base:

In [5]:
class ResNet(nn.Module):
    
    def __init__(self, in_channels, n_classes, stem_features, depths, widths):
        super().__init__()
        self.encoder = Encoder(in_channels=in_channels, stem_features=stem_features, depths=depths, widths=widths)
        self.decoder = Decoder(widths[-1], n_classes)
        
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

#### Experimentos

In [8]:
# Repetimos 3 veces el experimento 

model1 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[3,4,6,3], widths=[64, 128, 256,512]).to(device)
model2 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[3,4,6,3], widths=[64, 128, 256,512]).to(device)
model3 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[3,4,6,3], widths=[64, 128, 256,512]).to(device)

model1, training1, validation1, test1, loss1 = entrenamiento(model1, 2)
model2, training2, validation2, test2, loss2 = entrenamiento(model2, 100)
model3, training3, validation3, test3, loss3 = entrenamiento(model3, 100)

Epoch [1/2], Training accuracy: 0.299, Validation accuracy: 0.288, loss = 2.249
Time spent on epoch 1: 3.36min
Epoch [2/2], Training accuracy: 0.337, Validation accuracy: 0.335, loss = 1.93
Time spent on epoch 2: 3.38min


KeyboardInterrupt: 

In [None]:
# Guardar resultados
results_dict1 = {"loss": loss1,
    'Train':training1,
     'Validation': validation1,
     "Test":test1}
results_dict2 = {"loss": loss2,
    'Train':training2,
     'Validation': validation2,
     "Test":test2}
results_dict3 = {"loss": loss3,
    'Train':training3,
     'Validation': validation3,
     "Test":test3}

results1_base = pd.DataFrame(results_dict1)
results2_base = pd.DataFrame(results_dict2)
results3_base = pd.DataFrame(results_dict3)

results1_base.to_csv("./results/results_convnext_base_1.csv",index=False)
results2_base.to_csv("./results/results_convnext_base_2.csv",index=False)
results3_base.to_csv("./results/results_convnext_base_3.csv",index=False)

In [None]:
accuracy_base = (results1_base["Test"].max() + results2_base["Test"].max() + results3_base["Test"].max())/3
print(f"Accuracy del modelo base: {accuracy_base}")

## Cambiar la proporción de los stages

El primer cambio consiste en modificar el número de bloques *Bottleneck* que tiene cada *Stage*, originalmente se propone la proporción [3,3,9,3] en vez de [3,4,6,3] como en ResNet-50, sin embargo, para evitar aumentar el número de capas y aun así conservar la proporción [n,n,3n,n], hemos optado por [2,2,6,2].

Para implementarlo solo hay que modificar el parámtro *depths* al crear el modelo.

#### Experimentos

In [19]:
# Repetimos 3 veces el experimento
model1 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)
model2 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)
model3 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)

model1, training1, validation1, test1, loss1 = entrenamiento(model1, 100)
model2, training2, validation2, test2, loss2 = entrenamiento(model2, 100)
model3, training3, validation3, test3, loss3 = entrenamiento(model3, 100)

KeyboardInterrupt: 

In [None]:
# Guardar resultados
results_dict1 = {"loss": loss1,
    'Train':training1,
     'Validation': validation1,
     "Test":test1}
results_dict2 = {"loss": loss2,
    'Train':training2,
     'Validation': validation2,
     "Test":test2}
results_dict3 = {"loss": loss3,
    'Train':training3,
     'Validation': validation3,
     "Test":test3}

results1_change_stage_cr = pd.DataFrame(results_dict1)
results2_change_stage_cr = pd.DataFrame(results_dict2)
results3_change_stage_cr = pd.DataFrame(results_dict3)

results1_change_stage_cr.to_csv("./results/results_convnext_change_stage_cr_1.csv",index=False)
results2_change_stage_cr.to_csv("./results/results_convnext_change_stage_cr_2.csv",index=False)
results3_change_stage_cr.to_csv("./results/results_convnext_change_stage_cr_3.csv",index=False)

In [None]:
accuracy_cr = (results1_change_stage_cr["Test"].max() + results2_change_stage_cr["Test"].max() + results3_change_stage_cr["Test"].max())/3
print(f"Accuracy del modelo al cambiar la proporción de los stages: {accuracy_cr}")

## Cambiar *Stem* por *Patchify*

Las redes ResNet empiezan con un bloque *Stem*, éste se encarga de aplicar un *downsampling* agresivo a las imágenes de entrada. *Patchify*, por otro lado, consiste en dividir en bloques a las imágenes de entrada, mediante capas convolucionales cuyo stride es igual al tamaño del kernel.

En el modelo particular que hemos estado usando optamos por evitar usar *downsampling* en el bloque *Stem* debido a que la dimensión de nuestras imágenes es más pequeña que la de las imágenes de ImageNet para las que fue planteado este diseño. Por ello mismo, hemos optado por utilizar *patchify* de dimensión 1, para evitar reducir el tamaño de las imágenes. Originalmente se utiliza *patchify* de dimensión 4.

Para implementar esto, redefinimos el bloque Stem para que consista de una capa convolucional seguida de una capa de Batch Normalization.

In [20]:
class Stem(nn.Sequential):
    def __init__(self, in_features, out_features, patch_size=1):
        super().__init__(
            nn.Conv2d(in_features, out_features, kernel_size=patch_size, stride=patch_size),
            nn.BatchNorm2d(out_features) 
        )

#### Experimentos

In [21]:
# Repetimos 3 veces el experimento
model1 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)
model2 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)
model3 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)

model1, training1, validation1, test1, loss1 = entrenamiento(model1, 100)
model2, training2, validation2, test2, loss2 = entrenamiento(model2, 100)
model3, training3, validation3, test3, loss3 = entrenamiento(model3, 100)

Epoch [1/100], Training accuracy: 0.196, Validation accuracy: 0.19, loss = 2.384
Time spent on epoch 1: 4.03min


KeyboardInterrupt: 

In [None]:
# Guardar resultados
results_dict1 = {"loss": loss1,
    'Train':training1,
     'Validation': validation1,
     "Test":test1}
results_dict2 = {"loss": loss2,
    'Train':training2,
     'Validation': validation2,
     "Test":test2}
results_dict3 = {"loss": loss3,
    'Train':training3,
     'Validation': validation3,
     "Test":test3}

results1_patchify = pd.DataFrame(results_dict1)
results2_patchify = pd.DataFrame(results_dict2)
results3_patchify = pd.DataFrame(results_dict3)

results1_patchify.to_csv("./results/results_convnext_patchify_1.csv",index=False)
results2_patchify.to_csv("./results/results_convnext_patchify_2.csv",index=False)
results3_patchify.to_csv("./results/results_convnext_patchify_3.csv",index=False)

In [None]:
accuracy_patch = (results1_patchify["Test"].max() + results2_patchify["Test"].max() + results3_patchify["Test"].max())/3
print(f"Accuracy del modelo al aplicar Patchify: {accuracy_patch}")

## Añadir *depthwise convolution* (*ResNeXt-ify*)

Basados en el artículo [Aggregated Residual Transformations for Deep Neural Networks](https://arxiv.org/abs/1611.05431), se propone utilizar *grouped convolution* en la capa convolucional en la que el kernel es de tamaño 3 del bloque Bottleneck . En particular, se utiliza un tipo de *grouped convolution* llamdo *depthwise*, en donde el número de grupos es el mismo que el número de canales de entrada.

Para implementar este cambio basta utilizar el parámetro *groups* al llamar a la capa nn.Conv2d() en cuestión.

In [22]:
class BottleNeckBlock(nn.Module):
    def __init__(self,in_features, out_features, reduction = 4, stride = 1):
        super().__init__()
        reduced_features = out_features // reduction
        self.block = nn.Sequential(
            # Reducción de canales
            utilConv(in_features, reduced_features, kernel_size=1, stride=stride, bias=False),
            # El número de canales se mantiene fijo
            utilConv(reduced_features, reduced_features, kernel_size=3, bias=False, groups=reduced_features), # en esta capa se utiliza grouped convolution
            # Aumento de canales
            utilConv(reduced_features, out_features, kernel_size=1, bias=False, act=nn.Identity), 
        )
        
        # self.shortcut es utilizado para transformar al input a las dimensiones correctas para poder sumarlo a la salida del bloque
        if in_features != out_features:
            self.shortcut =nn.Sequential(utilConv(in_features, out_features, kernel_size=1, stride=stride, bias=False))
        else:
            self.shortcut = nn.Identity()

        self.act = nn.ReLU()

    def forward(self, x):
        res = x
        x = self.block(x)
        res = self.shortcut(res)
        x += res
        x = self.act(x)
        return x

#### Experimentos

In [23]:
# Repetimos 3 veces el experimento
model1 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)
model2 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)
model3 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)

model1, training1, validation1, test1, loss1 = entrenamiento(model1, 100)
model2, training2, validation2, test2, loss2 = entrenamiento(model2, 100)
model3, training3, validation3, test3, loss3 = entrenamiento(model3, 100)

Epoch [1/100], Training accuracy: 0.245, Validation accuracy: 0.246, loss = 2.307
Time spent on epoch 1: 3.39min


KeyboardInterrupt: 

In [None]:
# Guardar resultados
results_dict1 = {"loss": loss1,
    'Train':training1,
     'Validation': validation1,
     "Test":test1}
results_dict2 = {"loss": loss2,
    'Train':training2,
     'Validation': validation2,
     "Test":test2}
results_dict3 = {"loss": loss3,
    'Train':training3,
     'Validation': validation3,
     "Test":test3}

results1_resnextify = pd.DataFrame(results_dict1)
results2_resnextify = pd.DataFrame(results_dict2)
results3_resnextify = pd.DataFrame(results_dict3)

results1_resnextify.to_csv("./results/results_convnext_resnextify_1.csv",index=False)
results2_resnextify.to_csv("./results/results_convnext_resnextify_2.csv",index=False)
results3_resnextify.to_csv("./results/results_convnext_resnextify_3.csv",index=False)

In [None]:
accuracy_resnext = (results1_resnextify["Test"].max() + results2_resnextify["Test"].max() + results3_resnextify["Test"].max())/3
print(f"Accuracy del modelo al aplicar  ResNeXt-ify: {accuracy_resnext}")

## Inverted-Bottleneck

El modelo ResNet-50 utiliza el bloque Bottleneck. El nombre de dicho bloque se debe a que primero reduce el número de canales mediante una convolución con kernel de tamaño 1, después mantiene el número de canales y aplica una convolución con kernel de tamaño 3 y por último aumenta el número de canales al original.

Los transformadores de visión utilizan un sistema opuesto. Primero aumentan el número de canales, después aplican la convolución con kernel de dimensión mayor a 1 y por último reducen el número de canales al original. Este tipo de bloque se llama *Inverted-Bottleneck*, o Bottleneck invertido.

Inspirados en el diseño de los transformadores de visión, se propone utilizar bloques Bottleneck invertidos. La implementación se puede llevar a cabo mediante la modificación del bloque BottleNeck como sigue:

In [24]:
class BottleNeckBlock(nn.Module):
    def __init__(self, in_features, out_features, expansion = 4, stride = 1):
        super().__init__()
        expanded_features = out_features * expansion
        self.block = nn.Sequential(
            # Aumento de canales
            utilConv(in_features, expanded_features, kernel_size=1, stride=stride, bias=False),
            # El número de canales se mantiene fijo (Aquí se aplica la convolución depthwise)
            utilConv(expanded_features, expanded_features, kernel_size=3, bias=False, groups=in_features),
            # Reducción de canales
            utilConv(expanded_features, out_features, kernel_size=1, bias=False, act=nn.Identity)
        )

        # self.shortcut es utilizado para transformar al input a las dimensiones correctas para poder sumarlo a la salida del bloque
        if in_features != out_features:
            self.shortcut =nn.Sequential(utilConv(in_features, out_features, kernel_size=1, stride=stride, bias=False))
        else:
            self.shortcut = nn.Identity()

        self.act = nn.ReLU()

    def forward(self, x):
        res = x
        x = self.block(x)
        res = self.shortcut(res)
        x += res
        x = self.act(x)
        return x

#### Experimentos

In [25]:
# Repetimos 3 veces el experimento
model1 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)
model2 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)
model3 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)

model1, training1, validation1, test1, loss1 = entrenamiento(model1, 100)
model2, training2, validation2, test2, loss2 = entrenamiento(model2, 100)
model3, training3, validation3, test3, loss3 = entrenamiento(model3, 100)

Epoch [1/100], Training accuracy: 0.277, Validation accuracy: 0.277, loss = 2.018
Time spent on epoch 1: 6.54min


KeyboardInterrupt: 

In [None]:
# Guardar resultados
results_dict1 = {"loss": loss1,
    'Train':training1,
     'Validation': validation1,
     "Test":test1}
results_dict2 = {"loss": loss2,
    'Train':training2,
     'Validation': validation2,
     "Test":test2}
results_dict3 = {"loss": loss3,
    'Train':training3,
     'Validation': validation3,
     "Test":test3}

results1_inverted = pd.DataFrame(results_dict1)
results2_inverted = pd.DataFrame(results_dict2)
results3_inverted = pd.DataFrame(results_dict3)

results1_inverted.to_csv("./results/results_convnext_inverted_1.csv",index=False)
results2_inverted.to_csv("./results/results_convnext_inverted_2.csv",index=False)
results3_inverted.to_csv("./results/results_convnext_inverted_3.csv",index=False)

In [None]:
accuracy_inverted = (results1_inverted["Test"].max() + results2_inverted["Test"].max() + results3_inverted["Test"].max())/3
print(f"Accuracy del modelo al invertir el cuello de botella: {accuracy_inverted}")

## Aumentar el tamaño de los kernels

Tomando como inspiración el modelo [Swin Transformers](https://arxiv.org/abs/2103.14030), se propone aumentar el tamaño del kernel de las capas convolucionales en el bloque Bottleneck, de 3x3 a 7x7. Para ello se plantea que es necesario hacer dos cosas:

1. Mover la capa de *depthwise convolution* para que sea la primera del bloque
2. Incrementar el tamaño del kernel de la capa de *depthwise convolution* a 7x7

In [27]:
class BottleNeckBlock(nn.Module):
    def __init__(self, in_features, out_features, expansion = 4, stride = 1,
    ):
        super().__init__()
        expanded_features = out_features * expansion
        self.block = nn.Sequential(
            # El número de canales se mantiene fijo (con grupos depth-wise y kernel más grande)
            utilConv(in_features, in_features, kernel_size=7, stride=stride, bias=False, groups=in_features),
            # Aumento en el número de canales
            utilConv(in_features, expanded_features, kernel_size=1),
            # Reducción de canales
            utilConv(expanded_features, out_features, kernel_size=1, bias=False, act=nn.Identity),
        )
        
        # self.shortcut es utilizado para transformar al input a las dimensiones correctas para poder sumarlo a la salida del bloque
        if in_features != out_features:
            self.shortcut =nn.Sequential(utilConv(in_features, out_features, kernel_size=1, stride=stride, bias=False))
        else:
            self.shortcut = nn.Identity()
        
        self.act = nn.ReLU()

    def forward(self, x):
        res = x
        x = self.block(x)
        res = self.shortcut(res)
        x += res
        x = self.act(x)
        return x

#### Experimentos

In [28]:
# Repetimos 3 veces el experimento
model1 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)
model2 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)
model3 = ResNet(in_channels=3, n_classes = 10, stem_features=64, depths=[2,2,6,2], widths=[64, 128, 256,512]).to(device)

model1, training1, validation1, test1, loss1 = entrenamiento(model1, 100)
model2, training2, validation2, test2, loss2 = entrenamiento(model2, 100)
model3, training3, validation3, test3, loss3 = entrenamiento(model3, 100)

Epoch [1/100], Training accuracy: 0.321, Validation accuracy: 0.32, loss = 2.061
Time spent on epoch 1: 4.73min


KeyboardInterrupt: 

In [None]:
# Guardar resultados
results_dict1 = {"loss": loss1,
    'Train':training1,
     'Validation': validation1,
     "Test":test1}
results_dict2 = {"loss": loss2,
    'Train':training2,
     'Validation': validation2,
     "Test":test2}
results_dict3 = {"loss": loss3,
    'Train':training3,
     'Validation': validation3,
     "Test":test3}

results1_kernel = pd.DataFrame(results_dict1)
results2_kernel = pd.DataFrame(results_dict2)
results3_kernel = pd.DataFrame(results_dict3)

results1_kernel.to_csv("./results/results_convnext_kernel_1.csv",index=False)
results2_kernel.to_csv("./results/results_convnext_kernel_2.csv",index=False)
results3_kernel.to_csv("./results/results_convnext_kernel_3.csv",index=False)

In [None]:
accuracy_kernel = (results1_kernel["Test"].max() + results2_kernel["Test"].max() + results3_kernel["Test"].max())/3
print(f"Accuracy del modelo al aumentar el tamaño de los kernels: {accuracy_kernel}")

## Cambios en el micro-diseño

Por último se proponen los siguientes cambios de menor escala en el diseño de la arquitectura:

1. Cambiar la función de activación ReLU por GELU
2. Disminuir el número de funciones de activación
3. Disminución en el número de capas de normalización
4. Cambiar *batch normalization* por *layer normalization*
5. Separar las capas de *downsampling*
6. Añadir *Stochastic Depth*

Nuevamente, debido a que nuestras imágenes son de dimensión 32x32, en vez de 224x224 como fue propuesto en el artículo, optamos por omitir la primera capa de downsampling.

Implementemos dichos cambios:

In [29]:
class LayerScaler(nn.Module):
    def __init__(self, init_value, dimensions):
        super().__init__()
        self.gamma = nn.Parameter(init_value * torch.ones((dimensions)), 
                                    requires_grad=True)
        
    def forward(self, x):
        return self.gamma[None,...,None,None] * x

class BottleNeckBlock(nn.Module):
    def __init__(self, in_features, out_features, expansion = 4, drop_p = .0, layer_scaler_init_value = 1e-6):
        super().__init__()
        expanded_features = out_features * expansion
        self.block = nn.Sequential(
            # narrow -> wide (with depth-wise and bigger kernel)
            nn.Conv2d(
                in_features, in_features, kernel_size=7, padding=3, bias=False, groups=in_features
            ),
            # GroupNorm with num_groups=1 is the same as LayerNorm but works for 2D data
            nn.GroupNorm(num_groups=1, num_channels=in_features),
            # wide -> wide 
            nn.Conv2d(in_features, expanded_features, kernel_size=1),
            nn.GELU(),
            # wide -> narrow
            nn.Conv2d(expanded_features, out_features, kernel_size=1),
        )
        self.layer_scaler = LayerScaler(layer_scaler_init_value, out_features)
        self.drop_path = StochasticDepth(drop_p, mode="batch")

        
    def forward(self, x):
        res = x
        x = self.block(x)
        x = self.layer_scaler(x)
        x = self.drop_path(x)
        x += res
        return x
    
class Stage(nn.Sequential):
    def __init__(self, in_features, out_features, depth, drop_p = .0):
        
        if in_features != out_features:
            super().__init__(
                # Añadimos la capa de downsampling previo al stage.
                nn.Sequential(
                    nn.GroupNorm(num_groups=1, num_channels=in_features),
                    nn.Conv2d(in_features, out_features, kernel_size=2, stride=2)
                ),
                # Añadimos los stages
                *[BottleNeckBlock(out_features, out_features, drop_p=drop_p) for _ in range(depth)]
            )
        else:
            super().__init__(
                # Con esto garantizamos que no haya downsampling previo al primer stage
                *[BottleNeckBlock(out_features, out_features, drop_p=drop_p) for _ in range(depth)]
            )

#### Construimos el enconder y decoder de ConvNeXt

In [36]:
class ConvNextEncoder(nn.Module):
    def __init__(self, in_channels, stem_features, depths, widths, drop_p = .0):
        super().__init__()
        self.stem = Stem(in_channels, stem_features)

        in_out_widths = list(zip(widths, widths[1:]))
        # create drop paths probabilities (one for each stage)
        drop_probs = [x.item() for x in torch.linspace(0, drop_p, sum(depths))]     
        
        self.stages = nn.ModuleList()
        self.stages.append(Stage(stem_features, widths[0], depths[0], drop_p=drop_probs[0]))
        for (in_features, out_features), depth, drop_p in zip(in_out_widths, depths[1:], drop_probs[1:]):
            self.stages.append(Stage(in_features, out_features, depth, drop_p=drop_p))
        

    def forward(self, x):
        x = self.stem(x)
        for stage in self.stages:
            x = stage(x)
        return x

In [37]:
class ConvNextDecoder(nn.Sequential):
    def __init__(self, num_channels, num_classes = 10):
        super().__init__(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(1),
            nn.LayerNorm(num_channels),
            nn.Linear(num_channels, num_classes)
        )

#### Definición del modelo final Modelo final

In [38]:
class ConvNext(nn.Sequential):
    def __init__(self, in_channels, stem_features, depths, widths, drop_p = .0, num_classes = 10):
        super().__init__()
        self.encoder = ConvNextEncoder(in_channels, stem_features, depths, widths, drop_p)
        self.head = ConvNextDecoder(widths[-1], num_classes)

In [39]:
# Número de parámetros del modelo
model = ConvNext(3,64,[2,2,6,2],[64, 128, 256, 512])
print("Number of parameters: {:,}".format(sum(p.numel() for p in model.parameters())))

Number of parameters: 8,531,978


#### Experimentos

In [None]:
# Repetimos 3 veces el experimento
model1 = ConvNext(3,64,[2,2,6,2],[64, 128, 256, 512]).to(device)
model2 = ConvNext(3,64,[2,2,6,2],[64, 128, 256, 512]).to(device)
model3 = ConvNext(3,64,[2,2,6,2],[64, 128, 256, 512]).to(device)

model1, training1, validation1, test1, loss1 = entrenamiento(model1, 100)
model2, training2, validation2, test2, loss2 = entrenamiento(model2, 100)
model3, training3, validation3, test3, loss3 = entrenamiento(model3, 100)

In [None]:
# Guardar resultados
results_dict1 = {"loss": loss1,
    'Train':training1,
     'Validation': validation1,
     "Test":test1}
results_dict2 = {"loss": loss2,
    'Train':training2,
     'Validation': validation2,
     "Test":test2}
results_dict3 = {"loss": loss3,
    'Train':training3,
     'Validation': validation3,
     "Test":test3}

results1_final = pd.DataFrame(results_dict1)
results2_final = pd.DataFrame(results_dict2)
results3_final = pd.DataFrame(results_dict3)

results1_final.to_csv("./results/results_convnext_final_1.csv",index=False)
results2_final.to_csv("./results/results_convnext_final_2.csv",index=False)
results3_final.to_csv("./results/results_convnext_final_3.csv",index=False)

In [None]:
accuracy_final = (results1_final["Test"].max() + results2_final["Test"].max() + results3_final["Test"].max())/3
print(f"Accuracy del modelo ConvNeXt: {accuracy_final}")