# CNN para clasificación de imágenes con Pytorch

- Veremos cómo construir una CNN para clasificar imágenes en MNIST.

- También tocaremos un poco cómo trabajar con modelos preentrenados (Transfer Learning) como el modelo de [ResNet](https://pytorch.org/vision/stable/models.html#resnet).

In [None]:
# Import libraries
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay

import torch
from torch.utils.data import TensorDataset, DataLoader, SubsetRandomSampler, random_split
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
from torchvision import datasets
from torchvision.transforms import ToTensor, transforms

In [None]:
# Use the CUDA device when torch reports one as available, otherwise the CPU.
# The bare name on the last line makes the notebook display the chosen device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
# Convert images to tensors and normalise them with mean 0.1307 / std 0.3081.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])

# Training split of MNIST; it is divided into train/validation subsets below.
dataset = datasets.MNIST('.', download=True, train=True, transform=transform)

# Shuffle the 60000 sample indices once; the first 55000 go to training and
# the remainder to validation, so the two subsets never overlap.
n_train = 55000
indices = np.arange(60000)
np.random.shuffle(indices)

# shuffle stays False because the sampler already decides the visiting order.
train_loader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=False,
    sampler=SubsetRandomSampler(indices[:n_train]),
)

val_loader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=False,
    sampler=SubsetRandomSampler(indices[n_train:]),
)

In [None]:
# Number of mini-batches produced by the training and validation loaders.
len(train_loader), len(val_loader)

In [None]:
# Shape of the first sample's first element and the value of its second
# element (presumably the image tensor and its label).
dataset[0][0].shape, dataset[0][1]

In [None]:
# Batch size configured on each loader.
train_loader.batch_size, val_loader.batch_size

In [None]:
def _run_epoch(model, criterion, loader, optimizer=None):
    """Run one pass over `loader`, training when an optimizer is supplied.

    With an optimizer the model is put in training mode and its weights are
    updated after every mini-batch; without one the model is put in
    evaluation mode and gradients are disabled. Batches are moved to the
    notebook-level `device`.

    Returns:
        A tuple `(mean_loss, y_true, y_pred)`: the mean of the per-batch
        losses, the true labels and the predicted labels of every sample seen.
    """
    training = optimizer is not None
    model.train(training)
    running_loss = 0.0
    y_true, y_pred = [], []
    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs = inputs.to(device, dtype=torch.float)
            labels = labels.to(device, dtype=torch.long)
            if training:
                optimizer.zero_grad()
            predicted_outputs = model(inputs)
            loss = criterion(predicted_outputs, labels)
            if training:
                loss.backward()
                optimizer.step()
            running_loss += loss.item()
            # The class with the highest score is the prediction.
            predicted = predicted_outputs.argmax(dim=1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
    return running_loss / len(loader), y_true, y_pred


def train_val(model, criterion, optimizer, train_loader, val_loader, num_epochs):
    """Train `model` for `num_epochs` epochs, validating after each one.

    The model is moved to the notebook-level `device`. After every epoch a
    summary line with the training/validation loss and accuracy is printed.

    Args:
        model: Network to optimise.
        criterion: Loss called as `criterion(outputs, labels)`.
        optimizer: Optimizer that updates the model's parameters.
        train_loader: Iterable of `(inputs, labels)` training batches.
        val_loader: Iterable of `(inputs, labels)` validation batches.
        num_epochs: Number of passes over `train_loader`.

    Returns:
        A tuple `(train_loss_history, train_acc_history, val_loss_history,
        val_acc_history, all_labels, all_preds)`. The four histories hold one
        value per epoch. `all_labels` and `all_preds` hold the validation
        labels and predictions of the final epoch only, so a confusion matrix
        built from them describes the trained model rather than a mixture of
        every intermediate epoch.
    """
    train_loss_history, train_acc_history = [], []
    val_loss_history, val_acc_history = [], []
    all_labels, all_preds = [], []
    model = model.to(device=device)
    for epoch in range(num_epochs):
        # Training pass
        train_loss, y_true, y_pred = _run_epoch(model, criterion, train_loader, optimizer)
        train_acc = accuracy_score(y_true, y_pred)
        train_loss_history.append(train_loss)
        train_acc_history.append(train_acc)

        # Validation pass
        val_loss, y_true, y_pred = _run_epoch(model, criterion, val_loader)
        val_acc = accuracy_score(y_true, y_pred)
        val_loss_history.append(val_loss)
        val_acc_history.append(val_acc)

        # Keep only the most recent epoch; earlier epochs come from a less
        # trained model and would distort any later confusion matrix.
        all_labels, all_preds = y_true, y_pred

        print(f'Epoch {epoch+1}/{num_epochs}, train loss: {train_loss:.4f}, train acc: {train_acc:.4f}, val loss: {val_loss:.4f}, val acc: {val_acc:.4f}')

    return train_loss_history, train_acc_history, val_loss_history, val_acc_history, all_labels, all_preds


def plotting_confusion_matrix(all_labels, all_preds, title_name=''):
    """Show the absolute and the row-normalised confusion matrices side by side.

    Tick labels come from the notebook-level `dataset.classes`.

    Args:
        all_labels: True class labels.
        all_preds: Predicted class labels, aligned with `all_labels`.
        title_name: Text appended to the figure title.
    """
    class_names = dataset.classes
    fig, axs = plt.subplots(1, 2, figsize=(14, 6))

    # Left panel: raw counts. Right panel: share of each true class, rounded
    # to three decimals and expressed as a percentage.
    counts = confusion_matrix(all_labels, all_preds)
    percentages = confusion_matrix(all_labels, all_preds, normalize='true').round(3) * 100

    for axis, matrix in zip(axs, (counts, percentages)):
        display = ConfusionMatrixDisplay(matrix, display_labels=class_names)
        display.plot(cmap='Greys', xticks_rotation=25, ax=axis)

    plt.xticks(fontsize=10)
    plt.yticks(fontsize=10)
    for axis, heading in zip(axs, ('Valores absolutos', 'Valores porcentuales')):
        axis.set_title(heading, fontsize=14)
    plt.suptitle(f'Confusion Matrix {title_name}', fontsize=16)
    plt.show()


def plot_loss_and_accuracy(train_loss_history, val_loss_history, train_acc_history, val_acc_history):
    """Plot the loss curves (left) and the accuracy curves (right).

    Args:
        train_loss_history: Training loss values, one per epoch.
        val_loss_history: Validation loss values, one per epoch.
        train_acc_history: Training accuracy values, one per epoch.
        val_acc_history: Validation accuracy values, one per epoch.
    """
    fig, axs = plt.subplots(1, 2, figsize=(10, 4))

    panels = (
        (axs[0], train_loss_history, val_loss_history, 'loss'),
        (axs[1], train_acc_history, val_acc_history, 'acc'),
    )
    for axis, train_values, val_values, metric in panels:
        axis.plot(train_values, label=f'train {metric}')
        axis.plot(val_values, label=f'val {metric}')
        axis.grid()
        # A legend per axes: plt.legend() would only annotate the last one.
        axis.legend()
    plt.show()

## Modelo de clasificación con Capas Fully Connected

In [None]:
class Net(nn.Module):
    """Fully connected classifier with a single hidden ReLU layer.

    Args:
        num_classes: Number of output scores.
        in_features: Length of each flattened input sample.
        hidden_units: Width of the hidden layer.
    """

    def __init__(self, num_classes=10, in_features=28 * 28 * 1, hidden_units=200):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_units)
        self.fc2 = nn.Linear(hidden_units, num_classes)

    def forward(self, x):
        """Return the class scores for `x`.

        Inputs whose last dimension already equals `fc1.in_features` are used
        unchanged. Any other input is flattened from dimension 1 onwards, so
        image batches can be passed without reshaping them first.
        """
        if x.shape[-1] != self.fc1.in_features:
            x = x.flatten(start_dim=1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

In [None]:
# Instantiate the network, the Adam optimizer and the cross-entropy loss
model = Net()
optimizer = optim.Adam(model.parameters(), lr=3e-4)
criterion = nn.CrossEntropyLoss()

# One pass over the training loader
model.train()
for data, target in train_loader:
    # Put each image into a vector
    data = data.view(-1, 28 * 28)
    optimizer.zero_grad()
    # Forward pass
    output = model(data)
    # Compute the loss and its gradients, then update the weights
    loss = criterion(output, target)
    loss.backward()
    optimizer.step()

# Evaluate on the validation loader
model.eval()
total, correct = 0, 0
all_labels, all_preds = [], []
# No gradients are needed for evaluation; disabling them avoids building the
# autograd graph for every batch.
with torch.no_grad():
    for inputs, labels in val_loader:
        # Put each image into a vector
        inputs = inputs.view(-1, 28 * 28 * 1)

        # The class with the highest score is the prediction
        predictions = model(inputs).argmax(dim=1)

        all_labels.extend(labels.numpy())
        all_preds.extend(predictions.numpy())
        total += labels.size(0)
        correct += (predictions == labels).sum().item()
# The samples come from val_loader, so this is reported as validation accuracy.
print('The validation set accuracy of the network is: %d %%' % (100 * correct / total))

In [None]:
# Confusion matrices for the labels and predictions collected in the previous cell.
plotting_confusion_matrix(all_labels, all_preds, title_name='- Fully Connected NN')

## Arquitecturas Bases

In [None]:
class NetCNN(nn.Module):
    """Small CNN: two conv + ReLU + max-pool stages and a linear classifier.

    Both convolutions use 3x3 kernels with padding 1. The classifier takes
    7 * 7 * 10 features, which matches a 1-channel 28x28 input after the two
    2x2 poolings.
    """

    def __init__(self):
        super().__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(1, 5, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(5, 10, kernel_size=3, padding=1)
        # Shared non-linearity and pooling
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier
        self.fc = nn.Linear(in_features=7 * 7 * 10, out_features=10)

    def forward(self, x):
        """Return the 10 class scores for every sample in `x`."""
        # conv -> ReLU -> pool, once per convolutional layer
        for conv in (self.conv1, self.conv2):
            x = self.pool(self.relu(conv(x)))
        # Flatten the feature maps to the width the classifier expects
        flat = x.view(-1, self.fc.in_features)
        return self.fc(flat)

In [None]:
class NetwithoutPadding(nn.Module):
    """CNN with two unpadded 5x5 convolutions and a linear classifier.

    The classifier takes 4 * 4 * 20 features, which matches a 1-channel 28x28
    input after the two unpadded convolutions and the two 2x2 poolings.
    """

    def __init__(self):
        super().__init__()
        # Convolutional layers (padding=0, so each one shrinks the feature map)
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5, padding=0)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5, padding=0)
        # Shared non-linearity and pooling
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier
        self.fc = nn.Linear(in_features=4 * 4 * 20, out_features=10)

    def forward(self, x):
        """Return the 10 class scores for every sample in `x`."""
        # conv -> ReLU -> pool, once per convolutional layer
        for conv in (self.conv1, self.conv2):
            x = self.pool(self.relu(conv(x)))
        # Flatten the feature maps to the width the classifier expects
        flat = x.view(-1, self.fc.in_features)
        return self.fc(flat)

## Entrenar Net without Padding

In [None]:
# Train NetwithoutPadding for 20 epochs with cross-entropy loss and
# SGD (momentum 0.9, weight decay 0.001).
num_epochs = 20
model = NetwithoutPadding()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=1e-2,
    momentum=0.9,
    weight_decay=0.001,
)
(train_loss_history, train_acc_history, val_loss_history, val_acc_history,
 all_labels, all_preds) = train_val(
    model, criterion, optimizer, train_loader, val_loader, num_epochs)

In [None]:
# Loss and accuracy curves of the training run in the previous cell.
plot_loss_and_accuracy(train_loss_history, val_loss_history, train_acc_history, val_acc_history)

In [None]:
# Snapshot of every tensor stored in the model's state dict
params = model.state_dict()

# Shape of each learnable parameter
print('Dimensiones de las capas:')
for layer_name, tensor in model.named_parameters():
    print(f'\t{layer_name}: {tensor.shape}')

# Number of elements in each state-dict entry
print('Cantidad de parámetros:')
for entry_name in params:
    print(f'\t{entry_name}: {params[entry_name].numel()}')

print(f'Modelo: {model}')

## Entrenar NetCNN

In [None]:
# Train NetCNN for 20 epochs with cross-entropy loss and
# SGD (momentum 0.9, weight decay 0.001).
num_epochs = 20
model = NetCNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=1e-2,
    momentum=0.9,
    weight_decay=0.001,
)

(train_loss_history, train_acc_history, val_loss_history, val_acc_history,
 all_labels, all_preds) = train_val(
    model, criterion, optimizer, train_loader, val_loader, num_epochs)

In [None]:
# Loss and accuracy curves of the training run in the previous cell.
plot_loss_and_accuracy(train_loss_history, val_loss_history, train_acc_history, val_acc_history)

In [None]:
# Snapshot of every tensor stored in the model's state dict
params = model.state_dict()

# Shape of each learnable parameter
print('Dimensiones de las capas:')
for layer_name, tensor in model.named_parameters():
    print(f'\t{layer_name}: {tensor.shape}')

# Number of elements in each state-dict entry
print('Cantidad de parámetros:')
for entry_name in params:
    print(f'\t{entry_name}: {params[entry_name].numel()}')

print(f'Modelo: {model}')

## Modificar el modelo

In [None]:
# Replace the layers of the existing model with narrower, unpadded ones.
# The classifier takes 5 * 5 * 8 features, matching a 28x28 input after the
# new 5x5 and 3x3 convolutions and the two 2x2 poolings.
model.conv1 = nn.Conv2d(1, 4, kernel_size=5, padding=0)
model.conv2 = nn.Conv2d(4, 8, kernel_size=3, padding=0)
model.fc = nn.Linear(in_features=5 * 5 * 8, out_features=10)

# A new optimizer is required so that it tracks the replacement parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=1e-2,
    momentum=0.9,
    weight_decay=0.001,
)

(train_loss_history, train_acc_history, val_loss_history, val_acc_history,
 all_labels, all_preds) = train_val(
    model, criterion, optimizer, train_loader, val_loader, num_epochs)

In [None]:
# Confusion matrices for the validation labels/predictions returned by train_val.
plotting_confusion_matrix(all_labels, all_preds, title_name='- NetCNN Modificado')

In [None]:
# Another way to swap layers: assign them by attribute name with setattr.
replacement_layers = (
    ('conv1', nn.Conv2d(1, 4, kernel_size=5, padding=0)),
    ('conv2', nn.Conv2d(4, 8, kernel_size=3, padding=0)),
    ('fc', nn.Linear(in_features=5 * 5 * 8, out_features=10)),
)
for layer_name, layer in replacement_layers:
    setattr(model, layer_name, layer)
print(f'Modelo: {model}')

## Ejemplo de Otra Arquitectura con Batch Normalization

In [None]:
class NetBN(nn.Module):
    """CNN whose two feature stages each end in batch normalization.

    Every stage is Conv2d(5x5, no padding) -> ReLU -> MaxPool2d(2, 2) ->
    BatchNorm2d. The classifier takes 4 * 4 * 20 features, which matches a
    1-channel 28x28 input.
    """

    def __init__(self):
        super().__init__()

        # Feature extractor: one stage per (in_channels, out_channels) pair
        stages = []
        for in_ch, out_ch in ((1, 10), (10, 20)):
            stages += [
                nn.Conv2d(in_ch, out_ch, kernel_size=5, stride=1, padding=0),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2, 2),
                nn.BatchNorm2d(out_ch),
            ]
        self.features = nn.Sequential(*stages)

        # Linear classifier on the flattened feature maps
        self.fc = nn.Linear(4 * 4 * 20, 10)

    def forward(self, x):
        """Return the 10 class scores for every sample in `x`."""
        feature_maps = self.features(x)
        # Collapse channels and spatial dimensions into one feature vector
        flat = feature_maps.view(-1, self.fc.in_features)
        return self.fc(flat)

In [None]:
# Train NetBN for 20 epochs with the Adam optimizer (weight decay 0.001).
num_epochs = 20
model = NetBN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(
    model.parameters(),
    lr=1e-2,
    weight_decay=0.001,
)
(train_loss_history, train_acc_history, val_loss_history, val_acc_history,
 all_labels, all_preds) = train_val(
    model, criterion, optimizer, train_loader, val_loader, num_epochs)

In [None]:
# Loss and accuracy curves of the Adam training run in the previous cell.
plot_loss_and_accuracy(train_loss_history, val_loss_history, train_acc_history, val_acc_history)

In [None]:
# Train a fresh NetBN for 20 epochs with SGD (momentum 0.9, weight decay 0.001).
num_epochs = 20
model = NetBN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=1e-2,
    momentum=0.9,
    weight_decay=0.001,
)
(train_loss_history, train_acc_history, val_loss_history, val_acc_history,
 all_labels, all_preds) = train_val(
    model, criterion, optimizer, train_loader, val_loader, num_epochs)

In [None]:
# Loss and accuracy curves of the SGD training run in the previous cell.
plot_loss_and_accuracy(train_loss_history, val_loss_history, train_acc_history, val_acc_history)

**¿Aplicar batch normalization antes o después de la función de activación?**

Se sugiere:
- Con función de activación ReLU --> aplicar antes
- Con funciones de activación sigmoides (tanh y logística) --> aplicar después

En general, funciona mejor aplicar batch normalization antes de la función de activación.

Sin embargo, siempre es bueno probar ambos órdenes para ver qué funciona mejor.

In [None]:
# Probar aplicar batch normalization antes de la activación ReLU (Concluir)




## Transfer Learning

In [None]:
# Shuffle the 50000 sample indices; the first 45000 go to training and the
# remainder to validation.
indices = np.arange(50000)
np.random.shuffle(indices)
n_train = 45000

# Resize to 256, centre-crop to 224x224, convert to a tensor and normalise.
# Other Normalize settings that can be tried instead:
#   CIFAR-10 statistics: mean (0.491, 0.482, 0.447), std (0.247, 0.243, 0.261)
#   symmetric scaling:   mean (0.5, 0.5, 0.5), std (0.5, 0.5, 0.5)
#   no normalisation:    transforms.ToTensor() only
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# download=True so the cell also works when CIFAR-10 is not on disk yet;
# with download=False the constructor fails if the files are missing.
dataset = datasets.CIFAR10('.', download=True, train=True, transform=transform)

# Training loader (the sampler decides the order, so shuffle stays False)
train_loader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=False,
    sampler=SubsetRandomSampler(indices[:n_train]),
)

# Validation loader
val_loader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=False,
    sampler=SubsetRandomSampler(indices[n_train:]),
)

In [None]:
# Load ResNet-18 together with its pretrained weights. Calling resnet18()
# without `weights` builds a randomly initialised network, which would leave
# the frozen backbone without any learned features to transfer.
model_rn18 = torchvision.models.resnet18(weights=torchvision.models.ResNet18_Weights.DEFAULT)
print(f'Modelo: {model_rn18}')

In [None]:
# List every parameter tensor: position, shape and whether it is trainable.
for position, weight in enumerate(model_rn18.parameters()):
    print(position, weight.shape, weight.requires_grad)

In [None]:
# Demonstrates what prefixing a list with * does in a call: the first print
# receives the list as one argument, the second receives each item separately.
lista_demo = 'conv1 bn1 relu maxpool layer1 layer2 layer3 layer4 avgpool fc'.split()
print(lista_demo, end='\n\n')
print(*lista_demo)

In [None]:
# Select the layers that will not be trained: wrap every child module of
# model_rn18 except the last one in a Sequential container.
# Alternative that keeps all child modules:
# model_fit_rn18 = nn.Sequential(*list(model_rn18.children()))
model_fit_rn18 = nn.Sequential(*list(model_rn18.children())[:-1])  # every child module except the last
model_fit_rn18

In [None]:
# Freeze the whole backbone so its parameters receive no gradient updates.
model_fit_rn18.requires_grad_(False)

# Append a new, trainable output layer sized for our 10 classes.
model_fitted_resnet18 = nn.Sequential(
    model_fit_rn18,
    # Flatten the backbone output into one vector per sample
    nn.Flatten(),
    # 512 is the number of channels produced by the last convolutional block
    nn.Linear(in_features=512, out_features=10, bias=True),
)

In [None]:
# List every parameter tensor: position, shape and whether it is trainable.
for position, weight in enumerate(model_fitted_resnet18.parameters()):
    print(position, weight.shape, weight.requires_grad)

In [None]:
# Train the model for 10 epochs; only parameters with requires_grad=True
# (the new output layer) are updated.
# Alternative optimizer setting: Adam with lr=1e-2 and weight_decay=0.001.
num_epochs = 10
model_tl = model_fitted_resnet18
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    model_tl.parameters(),
    lr=5e-4,
    betas=(0.9, 0.999),
)
(train_loss_history, train_acc_history, val_loss_history, val_acc_history,
 all_labels, all_preds) = train_val(
    model_tl, criterion, optimizer, train_loader, val_loader, num_epochs)


In [None]:
# Confusion matrices for the validation labels/predictions returned by train_val.
plotting_confusion_matrix(all_labels, all_preds, title_name='- TL con ResNet18')


In [None]:
# Continue training the same model for 100 more epochs with a smaller
# learning rate; only parameters with requires_grad=True are updated.
# Alternative optimizer setting: Adam with lr=1e-2 and weight_decay=0.001.
num_epochs = 100
model_tl = model_fitted_resnet18
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    model_tl.parameters(),
    lr=1e-4,
    betas=(0.9, 0.999),
)
(train_loss_history, train_acc_history, val_loss_history, val_acc_history,
 all_labels, all_preds) = train_val(
    model_tl, criterion, optimizer, train_loader, val_loader, num_epochs)

### Otro modelo p/transfer learning

In [None]:
# Try another backbone: VGG-16 loaded together with its pretrained weights.
# Calling vgg16() without `weights` builds a randomly initialised network,
# so freezing it afterwards would leave nothing useful to transfer.
model_fit_vgg16 = torchvision.models.vgg16(weights=torchvision.models.VGG16_Weights.DEFAULT)
model_fit_vgg16

In [None]:
# Display the group of classifier layers.
model_fit_vgg16.classifier

In [None]:
# Display the classifier layer at index 6 (the last one).
model_fit_vgg16.classifier[6]

In [None]:
model_tl = model_fit_vgg16

# Freeze every existing parameter of the network.
for param in model_tl.parameters():
    param.requires_grad = False

# Replace two classifier layers so the network can be retrained for our
# 10 classes. Newly constructed layers are trainable by default, so nothing
# else is needed; assigning `requires_grad` on a module object (rather than
# on its parameters) would have no effect.
n_clases = 10
model_tl.classifier[3] = nn.Linear(4096, 2048)
model_tl.classifier[6] = nn.Linear(2048, n_clases)
model_tl.classifier

In [None]:
# List every parameter tensor: position, shape and whether it is trainable.
for position, weight in enumerate(model_tl.parameters()):
    print(position, weight.shape, weight.requires_grad)

In [None]:
# Resize every image to 128x128 (height x width), convert it to a tensor and
# normalise it with the ImageNet mean and standard deviation.
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Training split of CIFAR-10; it is divided into train/validation subsets below.
dataset = datasets.CIFAR10('.', download=True, train=True, transform=transform)

# Shuffle the 50000 sample indices; the first 45000 go to training and the
# remainder to validation.
n_train = 45000
indices = np.arange(50000)
np.random.shuffle(indices)

# shuffle stays False because the sampler already decides the visiting order.
train_loader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=False,
    sampler=SubsetRandomSampler(indices[:n_train]),
)

val_loader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=False,
    sampler=SubsetRandomSampler(indices[n_train:]),
)

In [None]:
# Train model_tl for 10 epochs with cross-entropy loss and Adam.
num_epochs = 10
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    model_tl.parameters(),
    lr=1e-4,
    betas=(0.9, 0.999),
)

(train_loss_history, train_acc_history, val_loss_history, val_acc_history,
 all_labels, all_preds) = train_val(
    model_tl, criterion, optimizer, train_loader, val_loader, num_epochs)

In [None]:
# Confusion matrices for the validation labels/predictions returned by train_val.
plotting_confusion_matrix(all_labels, all_preds, title_name='- TL con VGG16')

### Para jugar con el modelo

In [None]:
# Modify this group of classifier layers to retrain the model on our data.

# Make the second linear layer trainable. requires_grad_() updates the
# layer's parameters; assigning `requires_grad` on the module object itself
# would only create an unused attribute.
model_tl.classifier[3].requires_grad_(True)

# Replace the last classifier layer with a smaller head to reduce the number
# of parameters. Its input width is read from the layer that feeds it, so the
# head fits whether classifier[3] is the original layer or a replacement.
head_in_features = model_tl.classifier[3].out_features
model_tl.classifier[6] = nn.Sequential(
    nn.Linear(head_in_features, 256),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(256, n_clases))

In [None]:
# Display the classifier layers after the modification above.
model_tl.classifier