# Importaciones

In [101]:
## Standard libraries
import os
import glob
import numpy as np
import random
import math
import json
from functools import partial
from PIL import Image
from torch.utils.data import DataLoader
import pandas as pd

## Imports for plotting
import matplotlib.pyplot as plt

## PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim

## Torchvision
import torchvision
from torchvision import transforms
import PIL
import torchvision.transforms as T

# PyTorch Lightning
try:
    import pytorch_lightning as pl
except ModuleNotFoundError:
    !pip install --quiet pytorch-lightning>=1.4
    import pytorch_lightning as pl
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint

# Setting the seed
pl.seed_everything(42)

device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

TEST_PATH= "../input/iais22-birds/submission_test/submission_test"
TRAIN_PATH = "../input/iais22-birds/birds/birds"

# PREPARACIÓN DATASET Y DATALOADER DE ENTRENAMIENTO

Mediante esta función hemos calculado la media y la desviación típica de nuestro dataset

In [102]:
def mean_std(loader):
    """Compute the per-channel mean and standard deviation over a whole loader.

    The statistics are accumulated over every batch the loader yields, not
    just the first one, so the result describes the full dataset.

    Inputs:
        loader - Iterable yielding (images, labels) batches, where images is a
                 tensor of shape [B, C, H, W]. The labels are ignored.
    Returns:
        (mean, std) - Two tensors of shape [C]. std is the unbiased
                      (Bessel-corrected) estimate, like torch.Tensor.std.
    Raises:
        ValueError - If the loader yields no pixels at all.
    """
    pixel_count = 0
    channel_sum = 0.0
    channel_sq_sum = 0.0
    out_dtype = None
    for images, _ in loader:
        out_dtype = images.dtype
        # Accumulate in float64 so that summing many batches does not lose precision.
        images = images.double()
        channel_sum = channel_sum + images.sum(dim=(0, 2, 3))
        channel_sq_sum = channel_sq_sum + (images ** 2).sum(dim=(0, 2, 3))
        pixel_count += images.shape[0] * images.shape[2] * images.shape[3]

    if pixel_count == 0:
        raise ValueError("mean_std() received an empty loader")

    mean = channel_sum / pixel_count
    # Var = (sum(x^2) - n * mean^2) / (n - 1); clamp guards against tiny
    # negative values caused by floating-point rounding.
    var = (channel_sq_sum - pixel_count * mean ** 2) / (pixel_count - 1)
    std = var.clamp(min=0).sqrt()

    # Hand back the dtype of the input images when it is a floating type.
    if out_dtype is not None and out_dtype.is_floating_point:
        mean, std = mean.to(out_dtype), std.to(out_dtype)
    return mean, std

Definimos las transformaciones de nuestros datos para hacer más eficiente el entrenamiento

In [103]:
# Shared preprocessing constants: network input resolution and the per-channel
# statistics passed to Normalize.
IMG_SIZE = (32, 32)
DATASET_MEAN = [0.4740, 0.4676, 0.4134]
DATASET_STD = [0.2143, 0.2085, 0.2333]

# Evaluation pipeline: a deterministic resize only. Random crops here would make
# validation/test metrics and the submitted predictions change from run to run.
test_transform = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(DATASET_MEAN, DATASET_STD),
])

# Training pipeline: light augmentation (flip + random resized crop) because the
# network is powerful enough to overfit.
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.8, 1.0), ratio=(0.9, 1.1)),
    transforms.ToTensor(),
    transforms.Normalize(DATASET_MEAN, DATASET_STD),
])


Definimos nuestros datasets y dataloaders, tanto para el entrenamiento como para la validación

In [104]:
# Two views of the same image folder: one with training augmentation, one with
# the evaluation transform.
train_dataset = torchvision.datasets.ImageFolder(root=TRAIN_PATH, transform=train_transform)
val_dataset = torchvision.datasets.ImageFolder(root=TRAIN_PATH, transform=test_transform)

# Draw ONE permutation and share it between both views. Calling random_split
# twice produces two different permutations, so validation images would leak
# into the training subset.
N_TRAIN = 48000
split_generator = torch.Generator().manual_seed(42)
shuffled_indices = torch.randperm(len(train_dataset), generator=split_generator).tolist()
train_dataset_split = torch.utils.data.Subset(train_dataset, shuffled_indices[:N_TRAIN])
val_dataset_split = torch.utils.data.Subset(val_dataset, shuffled_indices[N_TRAIN:])
assert set(train_dataset_split.indices).isdisjoint(val_dataset_split.indices), "train/val overlap"

loader = torch.utils.data.DataLoader(train_dataset_split, batch_size=128, shuffle=True,
                                     drop_last=True, pin_memory=True, num_workers=2)
val_loader = data.DataLoader(val_dataset_split, batch_size=128, shuffle=False,
                             drop_last=False, num_workers=2)

# Sanity check: pull one training batch and show its tensor shapes.
batch = next(iter(loader))
imgs, labels = batch
imgs.shape, labels.shape

# PATCHEMBEDDING

Definimos nuestra función para dividir la imagen en parches

In [105]:
def img_to_patch(x, patch_size, flatten_channels=True):
    """
    Split a batch of images into a sequence of non-overlapping square patches.

    Inputs:
        x - torch.Tensor holding the images, of shape [B, C, H, W]
        patch_size - Number of pixels per side of each patch (integer)
        flatten_channels - If True, each patch is returned flattened into a
                           feature vector instead of as a small image grid.
    Returns:
        Tensor of shape [B, H'*W', C*p_H*p_W] when flatten_channels is True,
        otherwise [B, H'*W', C, p_H, p_W], with H' = H // patch_size and
        W' = W // patch_size.
    """
    batch, channels, height, width = x.shape
    rows, cols = height // patch_size, width // patch_size

    # Expose the patch grid as separate axes, then move the grid axes in front
    # of the channel axis: [B, H', W', C, p_H, p_W].
    grid = x.reshape(batch, channels, rows, patch_size, cols, patch_size)
    grid = grid.permute(0, 2, 4, 1, 3, 5)

    # Collapse the 2-D grid into one sequence axis: [B, H'*W', C, p_H, p_W].
    patches = grid.flatten(1, 2)
    if not flatten_channels:
        return patches
    # Collapse channel and pixel axes into one vector: [B, H'*W', C*p_H*p_W].
    return patches.flatten(2, 4)

Mostramos la información que obtenemos tras realizar el patch embedding: en este caso hemos cogido 4 imágenes de 32x32 píxeles y las dividimos en parches de 4x4 píxeles, lo que da lugar a 64 parches por imagen.

In [106]:
# Take the first few (transformed) training images and cut them into 4x4 patches.
NUM_IMAGES = 4
sample_tensors = [train_dataset[idx][0] for idx in range(NUM_IMAGES)]
BIRD_images = torch.stack(sample_tensors, dim=0)
img_patches = img_to_patch(BIRD_images, patch_size=4, flatten_channels=False)

# One subplot row per image; each row shows that image's patches side by side.
n_rows = BIRD_images.shape[0]
fig, ax = plt.subplots(n_rows, 1, figsize=(14,3))
fig.suptitle("Imágenes como secuencias de entrada de parches")
for i, patches in enumerate(img_patches):
    # make_grid returns [C, H, W]; imshow expects [H, W, C].
    img_grid = torchvision.utils.make_grid(patches, nrow=64, normalize=True, pad_value=0.9).permute(1, 2, 0)
    ax[i].imshow(img_grid)
    ax[i].axis('off')
plt.show()
plt.close()

# ATTENTION

Ahora procedemos a crear el algoritmo de atención; para ello comenzamos implementando el mecanismo descrito en nuestro artículo científico. Aun teniendo creadas tanto nuestra función de producto escalar como la de Atención Multi-Cabeza, no fuimos capaces de utilizarlas debido a un fallo que no logramos comprender. A pesar de esto, conseguimos implementar el bloque de atención aplicando el módulo `nn.MultiheadAttention` de PyTorch.

In [107]:
def scaled_dot_product(q, k, v, mask=None):
    """Scaled dot-product attention.

    Inputs:
        q, k, v - Query, key and value tensors; the last two axes are
                  [SeqLen, Dims], any leading axes are broadcast by matmul.
        mask - Optional tensor broadcastable to the attention logits; positions
               where it equals 0 are excluded from the softmax.
    Returns:
        (values, attention) - The attention-weighted values and the attention
        weights (softmax over the key axis).
    """
    scale = math.sqrt(q.size()[-1])
    scores = torch.matmul(q, k.transpose(-2, -1))
    scores = scores / scale
    if mask is not None:
        # A huge negative logit drives the softmax weight of masked keys to 0.
        scores = scores.masked_fill(mask == 0, -9e15)
    weights = F.softmax(scores, dim=-1)
    return torch.matmul(weights, v), weights

In [108]:
class MultiheadAttention(nn.Module):
    """Multi-head self-attention built on top of scaled_dot_product.

    Inputs (constructor):
        input_dim - Size of the last axis of the input tensor
        embed_dim - Total size of the attention embedding (split across heads)
        num_heads - Number of attention heads; must divide embed_dim
    """

    def __init__(self, input_dim, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0, "La dimensión de embedding debe tener un valor 0 mod num_heads"

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        # A single projection produces Q, K and V for every head at once.
        self.qkv_proj = nn.Linear(input_dim, 3*embed_dim)
        self.o_proj = nn.Linear(embed_dim, embed_dim)

        self._reset_parameters()

    def _reset_parameters(self):
        """Xavier-uniform weights and zero biases for both projections."""
        nn.init.xavier_uniform_(self.qkv_proj.weight)
        self.qkv_proj.bias.data.fill_(0)
        nn.init.xavier_uniform_(self.o_proj.weight)
        self.o_proj.bias.data.fill_(0)

    def forward(self, x, mask=None, return_attention=False):
        """Apply self-attention to x.

        Inputs:
            x - Tensor of shape [Batch, SeqLen, input_dim]
            mask - Optional mask forwarded to scaled_dot_product
            return_attention - If True, also return the attention weights
        Returns:
            Tensor [Batch, SeqLen, embed_dim], plus the attention weights
            [Batch, Head, SeqLen, SeqLen] when return_attention is True.
        """
        # The last axis of x is input_dim, which may differ from embed_dim, so
        # it must not be used to reshape the attention output below.
        batch_size, seq_length, _ = x.size()
        qkv = self.qkv_proj(x)

        # Separate Q, K, V from the linear output
        qkv = qkv.reshape(batch_size, seq_length, self.num_heads, 3*self.head_dim)
        qkv = qkv.permute(0, 2, 1, 3) # [Batch, Head, SeqLen, Dims]
        q, k, v = qkv.chunk(3, dim=-1)

        # Determine value outputs
        values, attention = scaled_dot_product(q, k, v, mask=mask)
        values = values.permute(0, 2, 1, 3) # [Batch, SeqLen, Head, Dims]
        values = values.reshape(batch_size, seq_length, self.embed_dim)
        o = self.o_proj(values)

        if return_attention:
            return o, attention
        return o

In [109]:
class AttentionBlock(nn.Module):
    """Pre-LayerNorm Transformer encoder block: self-attention followed by a
    feed-forward network, each wrapped in a residual connection."""

    def __init__(self, embed_dim, hidden_dim, num_heads, dropout=0.12):
        """
        Inputs:
            embed_dim - Dimensionality of the input and attention vectors
            hidden_dim - Dimensionality of the hidden layer in the feed-forward
                         network (usually 2-4 times larger than embed_dim)
            num_heads - Number of heads used by the multi-head attention
            dropout - Dropout rate applied in the attention module and in the
                      feed-forward network
        """
        super().__init__()

        self.layer_norm_1 = nn.LayerNorm(embed_dim)
        # nn.MultiheadAttention is used with its default layout, i.e. inputs
        # of shape [SeqLen, Batch, embed_dim].
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout)
        self.layer_norm_2 = nn.LayerNorm(embed_dim)

        feed_forward_layers = [
            nn.Linear(embed_dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, embed_dim),
            nn.Dropout(dropout),
        ]
        self.linear = nn.Sequential(*feed_forward_layers)

    def forward(self, x):
        """Return x after the attention and feed-forward residual updates."""
        normed = self.layer_norm_1(x)
        attn_out, _ = self.attn(normed, normed, normed)
        after_attention = x + attn_out
        feed_forward_out = self.linear(self.layer_norm_2(after_attention))
        return after_attention + feed_forward_out

# CREACIÓN DEL MODELO

In [110]:
class VisionTransformer(nn.Module):
    """Vision Transformer classifier operating on flattened image patches."""

    def __init__(self, embed_dim=256, hidden_dim=512, num_channels=3, num_heads=8, num_layers=6, num_classes=400, 
                 patch_size=4, num_patches=64, dropout=0.12):
        """
        Inputs:
            embed_dim - Dimensionality of the Transformer input vectors
            hidden_dim - Dimensionality of the hidden layer in the Transformer's
                         feed-forward networks
            num_channels - Number of input channels (3 for RGB)
            num_heads - Number of heads used in each multi-head attention block
            num_layers - Number of attention blocks stacked in the Transformer
            num_classes - Number of classes to predict
            patch_size - Number of pixels per side of each patch
            num_patches - Maximum number of patches an image can be split into
            dropout - Dropout rate applied in the feed-forward networks and on
                      the input encoding
        """
        super().__init__()

        self.patch_size = patch_size

        # Layers/Networks
        patch_dim = num_channels * (patch_size ** 2)
        self.input_layer = nn.Linear(patch_dim, embed_dim)
        encoder_blocks = [
            AttentionBlock(embed_dim, hidden_dim, num_heads, dropout=dropout)
            for _ in range(num_layers)
        ]
        self.transformer = nn.Sequential(*encoder_blocks)
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, num_classes),
        )
        self.dropout = nn.Dropout(dropout)

        # Parameters/Embeddings: a learnable classification token and one
        # learnable position vector per token (patches + CLS).
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.pos_embedding = nn.Parameter(torch.randn(1, 1 + num_patches, embed_dim))

    def forward(self, x):
        """Map a batch of images [B, C, H, W] to class logits [B, num_classes]."""
        # Cut the images into flattened patches and embed each patch.
        patches = img_to_patch(x, self.patch_size)
        batch_size, patch_count, _ = patches.shape
        tokens = self.input_layer(patches)

        # Prepend the CLS token and add the positional encoding.
        cls_tokens = self.cls_token.repeat(batch_size, 1, 1)
        tokens = torch.cat([cls_tokens, tokens], dim=1)
        tokens = tokens + self.pos_embedding[:, :patch_count + 1]

        # The attention blocks expect [SeqLen, Batch, embed_dim].
        tokens = self.dropout(tokens)
        encoded = self.transformer(tokens.transpose(0, 1))

        # Classify from the CLS token, which sits at sequence position 0.
        return self.mlp_head(encoded[0])

In [111]:
class ViT(pl.LightningModule):
    """Lightning wrapper that trains and evaluates a VisionTransformer."""

    def __init__(self, lr=3e-4):
        """
        Inputs:
            lr - Learning rate handed to the AdamW optimizer
        """
        super().__init__()
        self.save_hyperparameters()
        self.model = VisionTransformer().to(device)
        # NOTE: reads the module-level `loader`, so the training DataLoader
        # must already exist when a ViT is instantiated.
        self.example_input_array = next(iter(loader))[0]

    def forward(self, x):
        """Return the class logits for a batch of images."""
        return self.model(x)

    def configure_optimizers(self):
        """AdamW with the learning rate divided by 10 at epochs 100 and 150."""
        optimizer = optim.AdamW(self.parameters(), lr=self.hparams.lr)
        lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[100,150], gamma=0.1)
        return [optimizer], [lr_scheduler]

    def predict(self, x):
        """Return the predicted class index for each image in x.

        Inference runs in eval mode so that dropout is disabled; without it the
        predictions are perturbed by random dropout masks. The previous
        train/eval mode is restored afterwards.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                y_hat = self(x)
                return torch.argmax(y_hat, dim=1)
        finally:
            self.train(was_training)

    def _calculate_loss(self, batch, mode="train"):
        """Compute cross-entropy loss and accuracy for a batch and log both
        under the names f'{mode}_loss' and f'{mode}_acc'."""
        imgs, labels = batch[0].to(device),batch[1].to(device)
        preds = self.model(imgs)
        loss = F.cross_entropy(preds, labels)
        acc = (preds.argmax(dim=-1) == labels).float().mean()

        self.log(f'{mode}_loss', loss, prog_bar=True)
        self.log(f'{mode}_acc', acc, prog_bar=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Return the training loss for Lightning to backpropagate."""
        loss = self._calculate_loss(batch, mode="train")
        return loss

    def validation_step(self, batch, batch_idx):
        """Log validation loss and accuracy."""
        self._calculate_loss(batch, mode="val")

    def test_step(self, batch, batch_idx):
        """Log test loss and accuracy."""
        self._calculate_loss(batch, mode="test")

# ENTRENAMIENTO Y GUARDADO DEL MODELO


In [112]:
# Instantiate the Lightning module and move it to the selected device.
model = ViT().to(device)

In [113]:
# Re-seed so that training is repeatable regardless of the cells run before.
pl.seed_everything(42)
trainer = pl.Trainer(
    # Pick the accelerator from the detected device; a hard-coded "gpu"
    # fails on machines without CUDA.
    accelerator="gpu" if device.type == "cuda" else "cpu",
    devices=1,
    max_epochs=100,
)
# Optional logging argument that we don't need. It has to be set before fit()
# to have any effect on this run.
trainer.logger._default_hp_metric = None
trainer.fit(model, loader, val_loader)

In [114]:
# Run the test loop over the validation loader; the accuracy is logged by
# ViT.test_step under the key "test_acc".
val_result = trainer.test(model, val_loader, verbose=False)
# Keep only the accuracy of the first (and only) evaluated dataloader.
result = {"val": val_result[0]["test_acc"]}
print(result)

In [115]:
# Snapshot the trained weights.
state_dict = model.state_dict()

# torch.save(object, filename). For the filename, any extension can be used
torch.save(state_dict, "bird_transformer.pth")

# CARGAR MODELO Y REALIZAR PREDICCIONES SOBRE DATOS DE TEST (GUARDADAS EN CSV)

In [117]:
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU can also be loaded on a CPU-only machine.
# NOTE: torch.load unpickles the file; only load checkpoints you trust.
state_dict2 = torch.load("../input/modelos/bird_transformer_bueno (1).pth", map_location=device)

# Rebuild the architecture and copy the saved weights into it.
new_model = ViT()
new_model.load_state_dict(state_dict2)


In [None]:
trainer = pl.Trainer(
    # Pick the accelerator from the detected device; a hard-coded "gpu"
    # fails on machines without CUDA.
    accelerator="gpu" if device.type == "cuda" else "cpu",
    devices=1,
    max_epochs=180,
)
# Optional logging argument that we don't need. It has to be set before fit()
# to have any effect on this run.
trainer.logger._default_hp_metric = None
# Continue training from the loaded weights.
trainer.fit(new_model, loader, val_loader)

In [118]:
class Dataset(torch.utils.data.Dataset):
    """Unlabelled image dataset that loads test images from a list of paths."""

    def __init__(self, list_IDs):
        """
        Inputs:
            list_IDs - Sequence of image file paths
        """
        self.list_IDs = list_IDs

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.list_IDs)

    def __getitem__(self, index):
        """Load image `index`, apply test_transform and return it with a
        leading singleton axis (shape [1, C, H, W])."""
        # Select sample
        ID = self.list_IDs[index]

        # Load data. Convert to RGB so grayscale/RGBA files still yield the 3
        # channels that Normalize expects (ImageFolder does the same for the
        # training images); the `with` block closes the file handle.
        with Image.open(ID) as img:
            X = test_transform(img.convert("RGB")).unsqueeze(0)

        return X

In [119]:
# Start from empty containers for the class names, the test file names and the
# test image paths.
classes=[]
ids=[]
test_image_paths = []

In [120]:
# Class names in sorted order, matching the label indices that ImageFolder
# assigns (it sorts the class directory names).
classes = sorted(os.listdir(TRAIN_PATH))

# Build the path list in one assignment (so re-running the cell does not append
# duplicates) and sort it so the order is reproducible.
test_image_paths = sorted(glob.glob(os.path.join(TEST_PATH, '*')))

# Derive the ids from the very same list so ids and predictions stay aligned;
# os.listdir and glob are not guaranteed to agree on order or hidden files.
ids = [os.path.basename(path) for path in test_image_paths]

test_dataset=Dataset(test_image_paths)
print(classes)

In [126]:
# Loader over the test images in batches of up to 2000; shuffle is left at its
# default, so batches follow the order of test_dataset.
test_loader= torch.utils.data.DataLoader(test_dataset,batch_size=2000, num_workers=2)

In [127]:
# Fetch the first test batch and move it to the selected device.
batch=next(iter(test_loader))
imgs = batch.to(device)
# Drop the singleton axis that Dataset.__getitem__ adds with unsqueeze(0).
test_imgs = torch.squeeze(imgs, 1)
test_imgs.shape

El siguiente paso no se puede ejecutar en Kaggle debido a limitaciones de memoria que no hemos sabido cómo solucionar. Para ejecutar la siguiente celda aconsejamos usar un entorno con suficiente memoria en la GPU; como alternativa, la celda posterior realiza las mismas predicciones imagen a imagen, de una manera menos eficiente.

In [129]:
# Inference only: eval mode disables dropout, and no_grad avoids building the
# autograd graph for the whole batch, which is what exhausts GPU memory.
new_model.eval()
with torch.no_grad():
    outputs = new_model(test_imgs)
_, predicted = torch.max(outputs, 1)

# Convert once to plain ints; iterate over what was actually predicted instead
# of assuming exactly 2000 images.
predicted_indices = predicted.tolist()
pred_classes = [classes[index] for index in predicted_indices]

print('Predicted: ', ' '.join(f'{index:5d}' for index in predicted_indices))

In [144]:
# Eval mode disables dropout so the predictions are deterministic.
new_model.eval()
pred_classes = []
for path in test_image_paths:
    # Convert to RGB (as ImageFolder does for the training images) and close
    # the file handle as soon as the tensor has been built.
    with PIL.Image.open(path) as img:
        img_tensor = test_transform(img.convert("RGB")).unsqueeze(0)
    pred = new_model.predict(img_tensor.to(device))
    # pred holds a single class index; .item() turns it into a plain int.
    pred_classes.append(classes[pred.item()])
print(pred_classes)

In [145]:
# Derive the ids from test_image_paths, the list the predictions were computed
# from, so that row i of 'Id' always matches row i of 'Category'.
ids_replace = []
for path in test_image_paths:
    file_name = os.path.basename(path)
    # Strip only a trailing ".jpg" extension.
    if file_name.endswith('.jpg'):
        file_name = file_name[:-len('.jpg')]
    ids_replace.append(file_name)
dic={'Id':ids_replace,'Category':pred_classes}
df=pd.DataFrame(data=dic)
df

In [146]:
# Write the submission table to CSV without the DataFrame index column.
df.to_csv('submission.csv',index=False)