# Imports globales y funciones de utilidad

In [1]:
import time
import torch
import itertools
import torchvision
import numpy as np
import torch.nn as nn
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torchvision import transforms
from torchvision.datasets import CIFAR10
from torch.utils.data.dataloader import DataLoader
from sklearn.metrics import accuracy_score, confusion_matrix


device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [2]:
def get_dataloaders(train_transf, test_transf, batch_size):
    train_dataset = CIFAR10("data", train=True, download=True, transform=train_transf)
    test_dataset = CIFAR10("data", train=False, download=True, transform=test_transf)

    train_size = int(0.8 * len(train_dataset))
    valid_size = len(train_dataset) - train_size
    train_dataset, validation_dataset = torch.utils.data.random_split(train_dataset, [train_size, valid_size])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=4)
    valid_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=4)

    return train_loader, valid_loader, test_loader

# Image Augmentation

La primera parte de este laboratorio consiste en expandir un poco el modelo de LeNet para tener más parámetros y luego explorar algunas técnicas de Image Augmentation (https://pytorch.org/vision/stable/transforms.html).

El modelo a implementar es el siguiente:


![Image](https://i.ibb.co/WxGgbmL/Capture.png)


In [3]:
class CustomCNN(nn.Module):
  def __init__(self, in_channels, number_classes):
    # in_channels: int, cantidad de canales de la imagen original
    super(CustomCNN, self).__init__()
    # Su implementacion
    self.conv1 = nn.Conv2d(in_channels, out_channels = 32, kernel_size = 3, padding = 1)
    self.conv2 = nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = 3, padding = 1)
    self.conv3 = nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = 3, padding = 1)
    self.conv4 = nn.Conv2d(in_channels = 64, out_channels = 64, kernel_size = 3, padding = 1)
    self.conv5 = nn.Conv2d(in_channels = 64, out_channels = 120, kernel_size = 3, padding = 1)
    
    self.linear1 = nn.Linear(in_features = 120*4*4, out_features = 512)
    self.linear2 = nn.Linear(in_features = 512, out_features = number_classes)
    
    self.max_pooling = nn.MaxPool2d(kernel_size = 2, stride = 2)
    self.dropout = nn.Dropout(p=0.5)

  def forward(self, x):
    # Su implementacion
    out = F.relu(self.conv1(x))
    out = F.relu(self.conv2(out))
    out = self.max_pooling(out)
    
    out = F.relu(self.conv3(out))
    out = F.relu(self.conv4(out))
    out = self.max_pooling(out)
    
    out = F.relu(self.conv5(out))
    out = self.max_pooling(out)
    
    out = self.dropout(out.flatten(1))
    
    out = F.relu(self.linear1(out))
    out = self.dropout(out)
    out = self.linear2(out)
    return out

## Funciones genericas para entrenar nuestros modelos

Vamos a utilizar las mismas funciones que implementamos en los laboratorios anteriores para entrenar y testear nuestros modelos.

In [4]:
def train_epoch(training_model, loader, criterion, optim):
    training_model.train()
    epoch_loss = 0.0
    all_labels = []
    all_predictions = []
    
    for images, labels in loader:
      all_labels.extend(labels.numpy())  

      optim.zero_grad()

      predictions = training_model(images.to(device))
      all_predictions.extend(torch.argmax(predictions, dim=1).cpu().numpy())

      loss = criterion(predictions, labels.to(device))
      
      loss.backward()
      optim.step()

      epoch_loss += loss.item()

    return epoch_loss / len(loader), accuracy_score(all_labels, all_predictions) * 100


def validation_epoch(val_model, loader, criterion):
    val_model.eval()
    epoch_loss = 0.0
    all_labels = []
    all_predictions = []
    
    with torch.no_grad():
      for images, labels in loader:
        all_labels.extend(labels.numpy())  

        predictions = val_model(images.to(device))
        all_predictions.extend(torch.argmax(predictions, dim=1).cpu().numpy())

        loss = criterion(predictions, labels.to(device))

        epoch_loss += loss.item()

    return epoch_loss / len(loader), accuracy_score(all_labels, all_predictions) * 100
  

def train_model(model, train_loader, test_loader, criterion, optim, number_epochs):
  train_history = []
  test_history = []
  accuracy_history = []

  for epoch in range(number_epochs):
      start_time = time.time()

      train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
      train_history.append(train_loss)
      print("Training epoch {} | Loss {:.6f} | Accuracy {:.2f}% | Time {:.2f} seconds"
            .format(epoch + 1, train_loss, train_acc, time.time() - start_time))

      start_time = time.time()
      test_loss, acc = validation_epoch(model, test_loader, criterion)
      test_history.append(test_loss)
      accuracy_history.append(acc)
      print("Validation epoch {} | Loss {:.6f} | Accuracy {:.2f}% | Time {:.2f} seconds"
            .format(epoch + 1, test_loss, acc, time.time() - start_time))

## Entrenando modelos 

Comenzamos definiendo una seccion de código con valores por defecto de hiperparámetros que vamos a utilizar y luego entrenamos un modelo de la CNN definida anteriormente sin usar augmentation en los datos y otro haciendo uso del mismo.

https://pytorch.org/vision/stable/transforms.html

In [5]:
# Global models config

BATCH_SIZE = 32
LR = 0.001
NUMBER_EPOCHS = 15
criterion = nn.CrossEntropyLoss().to(device)

In [6]:
# Fijamos las semillas siempre para poder comparar.

torch.manual_seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Creamos los dataloaders
test_transform = transforms.Compose([
    transforms.ToTensor()
])

train_transform = transforms.Compose([
    transforms.ToTensor()
])

train_loader, valid_loader, test_loader = get_dataloaders(train_transform, test_transform, BATCH_SIZE)

# Definimos el modelo y el optimizador
modelo_sin_aug = CustomCNN(3,10).to(device)
optimizer = torch.optim.Adam(modelo_sin_aug.parameters(), lr=LR)

# Entrenamos
train_model(modelo_sin_aug, train_loader, valid_loader, criterion, optimizer, NUMBER_EPOCHS)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/cifar-10-python.tar.gz


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

Extracting data/cifar-10-python.tar.gz to data
Files already downloaded and verified
Training epoch 1 | Loss 1.833582 | Accuracy 30.68% | Time 12.57 seconds
Validation epoch 1 | Loss 1.413233 | Accuracy 48.77% | Time 0.92 seconds
Training epoch 2 | Loss 1.367879 | Accuracy 50.41% | Time 9.75 seconds
Validation epoch 2 | Loss 1.250740 | Accuracy 55.31% | Time 0.74 seconds
Training epoch 3 | Loss 1.199229 | Accuracy 57.18% | Time 8.60 seconds
Validation epoch 3 | Loss 1.074248 | Accuracy 61.46% | Time 0.80 seconds
Training epoch 4 | Loss 1.102544 | Accuracy 60.98% | Time 8.57 seconds
Validation epoch 4 | Loss 0.963749 | Accuracy 66.06% | Time 0.73 seconds
Training epoch 5 | Loss 1.024269 | Accuracy 63.68% | Time 8.61 seconds
Validation epoch 5 | Loss 0.968082 | Accuracy 65.50% | Time 0.83 seconds
Training epoch 6 | Loss 0.967637 | Accuracy 65.65% | Time 8.03 seconds
Validation epoch 6 | Loss 0.878411 | Accuracy 68.97% | Time 0.80 seconds
Training epoch 7 | Loss 0.921454 | Accuracy 67.49%

In [19]:
torch.manual_seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Creamos los datasets
test_transform = transforms.Compose([
    transforms.ToTensor()
])

# Definir transormaciones que vamos a aplicar al set de entrenamiento
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.2), 
    transforms.RandomVerticalFlip(p=0.5), 
    transforms.ToTensor()
])
train_loader, valid_loader, test_loader = get_dataloaders(train_transform, test_transform, BATCH_SIZE)


# Crear el modelo, optimizador y entrenarlo
modelo_con_aug = CustomCNN(3,10).to(device)
optimizer = torch.optim.Adam(modelo_con_aug.parameters(), lr=LR)

# Entrenamos
train_model(modelo_con_aug, train_loader, valid_loader, criterion, optimizer, NUMBER_EPOCHS)

Files already downloaded and verified
Files already downloaded and verified
Training epoch 1 | Loss 1.868144 | Accuracy 28.34% | Time 12.48 seconds
Validation epoch 1 | Loss 1.575820 | Accuracy 40.58% | Time 0.73 seconds
Training epoch 2 | Loss 1.577137 | Accuracy 40.97% | Time 12.50 seconds
Validation epoch 2 | Loss 1.423760 | Accuracy 48.15% | Time 0.87 seconds
Training epoch 3 | Loss 1.440973 | Accuracy 46.92% | Time 12.32 seconds
Validation epoch 3 | Loss 1.279700 | Accuracy 53.49% | Time 0.87 seconds
Training epoch 4 | Loss 1.352444 | Accuracy 50.95% | Time 12.49 seconds
Validation epoch 4 | Loss 1.225220 | Accuracy 55.86% | Time 0.90 seconds
Training epoch 5 | Loss 1.287378 | Accuracy 53.40% | Time 12.56 seconds
Validation epoch 5 | Loss 1.202295 | Accuracy 56.84% | Time 0.86 seconds
Training epoch 6 | Loss 1.236828 | Accuracy 55.47% | Time 12.62 seconds
Validation epoch 6 | Loss 1.100359 | Accuracy 60.60% | Time 0.82 seconds
Training epoch 7 | Loss 1.195269 | Accuracy 56.79% | T

## Evaluando los modelos en los datos de test

Vamos a comenzar evaluando en los datos de test normales y luego vamos a aplicar distintas transformaciones (para simular entornos más reales de datos) y vamos a ver la performance y robustez de los modelos que entrenamos anteriormente.

Primero agregar horizontal flip y luego vertical flip, qué pasa con los modelos?

In [20]:
test_transform = transforms.Compose([
  transforms.ToTensor()                              
])

_, _, test_loader = get_dataloaders(None, test_transform, BATCH_SIZE)

# Testear usando las funciones definidas anteriormente
loss, acc = validation_epoch(modelo_con_aug, test_loader, criterion)
print('Test Loss:', loss, '- Test Acc:',acc)

Files already downloaded and verified
Files already downloaded and verified
Test Loss: 0.9976618448004555 - Test Acc: 64.9


In [21]:
test_transform = transforms.Compose([
  # Flip Horizontal y volver a testear
  transforms.RandomHorizontalFlip(p=1), 
  transforms.ToTensor()                              
])


_, _, test_loader = get_dataloaders(None, test_transform, BATCH_SIZE)

# Testear usando las funciones definidas anteriormente
loss, acc = validation_epoch(modelo_con_aug, test_loader, criterion)
print('Test Loss:', loss, '- Test Acc:',acc)

Files already downloaded and verified
Files already downloaded and verified
Test Loss: 0.9954660395844676 - Test Acc: 64.83


In [22]:
test_transform = transforms.Compose([
  transforms.RandomVerticalFlip(p=1), 
  transforms.ToTensor()                              
])


_, _, test_loader = get_dataloaders(None, test_transform, BATCH_SIZE)

# Testear usando las funciones definidas anteriormente
loss, acc = validation_epoch(modelo_con_aug, test_loader, criterion)
print('Test Loss:', loss, '- Test Acc:',acc)

Files already downloaded and verified
Files already downloaded and verified
Test Loss: 0.9866581569654873 - Test Acc: 65.35


In [None]:
# Otros tests que quieran probar...

# DenseNet


![Image](https://miro.medium.com/max/5164/1*_Y7-f9GpV7F93siM1js0cg.jpeg)

Link al paper original: [DenseNets](https://arxiv.org/pdf/1608.06993.pdf)

Algunas consideraciones del paper a tener en cuenta:

1. Batch normalization en los inputs de los bloques densos y las capas de transición.
2. ReLU en todos lados como funcion de activación.
3. El MLP al final de la red cuenta con una capa oculta de 512 neuronas
4. Las activaciones luego del tercer bloque denso tienen tamaño 4*4 (ejercicio, calcular a mano!)


Implementamos DenseNet para resolver el problema de CIFAR10


In [None]:
class DenseBlock(nn.Module):
  def __init__(self, in_channels):
    super(DenseBlock, self).__init__()
    # Su implementacion
    
  def forward(self, x):
    # Su implementacion

In [None]:
class TransitionLayer(nn.Module):
  def __init__(self, in_channels, out_channels):
    super(TransitionLayer, self).__init__()
    self.bn = nn.BatchNorm2d(num_features = in_channels)
    self.conv = nn.Conv2d(in_channels = in_channels, out_channels = out_channels, kernel_size = 1)
    self.pooling = nn.AvgPool2d(kernel_size = 2, stride = 2)

  def forward(self, x):
    out = self.bn(x)
    out = F.relu(self.conv(out))
    out = self.pooling(out)
    return out
    # Su implementacion

In [None]:
class DenseNet(nn.Module):
	def __init__(self, n_classes):
		super(DenseNet, self).__init__()
    # Su implementacion

	def forward(self, x):
    # Su implementacion

In [None]:
torch.manual_seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Creamos los datasets
test_transform = transforms.Compose([
    transforms.ToTensor()
])

train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor()
])

densenet = DenseNet(10).to(device)
optimizer = torch.optim.Adam(densenet.parameters(), lr=LR)

train_loader, valid_loader, test_loader = get_dataloaders(train_transform, test_transform, BATCH_SIZE)

train_model(densenet, train_loader, valid_loader, criterion, optimizer, NUMBER_EPOCHS)

In [None]:
test_transform = transforms.Compose([
  transforms.ToTensor()                              
])

_, _, test_loader = get_dataloaders(None, test_transform, BATCH_SIZE)

test_loss, accuracy = validation_epoch(densenet, test_loader, criterion)
print(f"DenseNet Test set: {test_loss:.6f} Loss. Accuracy {accuracy:.2f}%")