# **Grupo Transformer 10**
En este notebook vamos a construir dos tipos distintos de redes neuronales con la finalidad de que puedan clasificar especies de aves a partir de imágenes. En primer lugar haremos una red neuronal sencilla y, por otro lado, una con la arquitectura Transformer, de mayor complejidad, la cual debería realizar mejor la tarea.

# Imports

In [1]:
!pip install einops

Collecting einops
  Downloading einops-0.4.1-py3-none-any.whl (28 kB)
Installing collected packages: einops
Successfully installed einops-0.4.1
[0m

In [2]:
from typing import List, Union
from pathlib import Path
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_image
from torchvision import transforms
from PIL import Image
from torchvision import datasets
from torch import nn
from datetime import datetime
from einops import rearrange, repeat
from einops.layers.torch import Rearrange
import random
import copy

## Lectura de Datasets
Hemos utilizado esta clase (*BirdsDataset*) para leer el dataset de submission test, ya que con ella guardamos el nombre del fichero de cada imagen que se lee; a partir de ese nombre obtenemos el id de la imagen para escribir el csv con el que se nos evalúa, el cual debe quedar de esta forma: Id,Category

In [3]:


class BirdsDataset(torch.utils.data.Dataset):
    """Dataset over the ``*/*.jpg`` files below ``path`` that yields file names.

    Each item is ``(image, file_name)`` where ``file_name`` is the name of the
    image file (e.g. ``"123.jpg"``), not a class index. The notebook uses the
    file name to build the ``Id`` column of the submission CSV.

    Args:
        path: Root directory; images are looked up one level down
            (``<path>/<subdir>/<name>.jpg``). May be a ``str`` or a ``Path``.
        transform: Callable applied to the PIL image, or a list/tuple of such
            callables (wrapped in ``transforms.Compose``). The default resizes
            to 64x64, converts to a tensor and normalises each channel with
            mean 0.5 and std 0.5.
    """

    def __init__(
        self,
        path: Union[Path, str],
        transform: Union['Transform', List['Transform']] = transforms.Compose([
            transforms.Resize((64, 64)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        ]),
    ):
        # Glob through the normalised Path so that plain string paths work too.
        self.path = Path(path)
        # Sorted so that the item order does not depend on the filesystem.
        self.labels = sorted(p.name for p in self.path.glob('*'))
        self.images = sorted(self.path.glob('*/*.jpg'))
        # The annotation allows a list of transforms; a list is not callable,
        # so it is composed into a single callable here.
        if isinstance(transform, (list, tuple)):
            transform = transforms.Compose(list(transform))
        self.transform = transform

    def __len__(self) -> int:
        """Return the number of image files found."""
        return len(self.images)

    def __getitem__(self, index: int) -> 'tuple[torch.Tensor, str]':
        """Return ``(transformed_image, file_name)`` for the image at ``index``."""
        image_path = self.images[index]
        label = image_path.name
        # The image must be a PIL image so that ToTensor can convert it.
        # Converting to RGB guarantees the 3 channels the default Normalize
        # expects, and the context manager closes the file handle promptly.
        with Image.open(image_path) as raw_image:
            image = self.transform(raw_image.convert("RGB"))
        return image, label



Para cargar el dataset de entrenamiento hacemos uso de la clase *ImageFolder* que proporciona torchvision, la cual se encarga de leer las imágenes junto con la etiqueta de la especie en forma de número, según la carpeta en la que se encuentre cada imagen. Para pasar de esta etiqueta numérica al nombre de la especie, haremos uso del atributo *classes* de este dataset.

In [4]:
# Training set: ImageFolder labels every image with the index of its folder.
# Images are resized to 64x64 and normalised with mean/std 0.5 per channel.
dataset = datasets.ImageFolder('/kaggle/input/iais22-birds/birds/birds', transform= transforms.Compose([transforms.Resize((64,64)),transforms.ToTensor(),transforms.Normalize(mean=[0.5,0.5,0.5],std=[0.5,0.5,0.5])]) )
# Submission set: BirdsDataset yields (image, file name) with its default transform.
test_data = BirdsDataset(
    path= Path('/kaggle/input/iais22-birds/submission_test'))

También haremos uso de la clase DataLoader, la cual nos facilita introducir datos a la red neuronal posteriormente.

In [5]:
# Number of images per batch, shared by every DataLoader in this notebook.
batch_size = 64

# Only the training data is shuffled; the submission data keeps dataset order.
train_dataloader = DataLoader(dataset,shuffle=True, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

In [6]:
# Print the shape of the first batch of each dataloader as a sanity check.
for X, y in train_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape} {X.dtype}")
    print(f"Shape of y: {y.shape} {y.dtype}")

    break

# For the submission loader only X is printed; y is not inspected here.
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape} {X.dtype}")

    break
    


Shape of X [N, C, H, W]: torch.Size([64, 3, 64, 64]) torch.float32
Shape of y: torch.Size([64]) torch.int64
Shape of X [N, C, H, W]: torch.Size([64, 3, 64, 64]) torch.float32


## Creación de la red neuronal sencilla
Hemos definido una red neuronal que tendrá como entrada 3\*64\*64 valores, que corresponden a los 3 canales (RGB), los 64 píxeles de altura y los 64 de anchura de la imagen que le introduciremos. Como salida tendrá 400 outputs, uno por cada una de las 400 clases entre las que debe clasificar. El índice del mayor de estos outputs será la clase predicha por la red.

In [7]:
# Use the GPU when one is available; note that `device` is a plain string here.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# Definición del modelo
class NeuralNetwork(nn.Module):
    """Fully connected baseline classifier.

    The input is flattened to 3*64*64 features and passed through a stack of
    Linear + ReLU layers that ends in 400 logits, one per class.
    """

    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(3*64*64, 12288),  # 3 channels, 64 pixels high, 64 pixels wide
            nn.ReLU(),
            nn.Linear(12288, 6144),
            nn.ReLU(),
            nn.Linear(6144, 6144),
            nn.ReLU(),
            nn.Linear(6144, 6144),
            nn.ReLU(),
            nn.Linear(6144, 400)  # 400 is the number of classes
        )

    def forward(self, x):
        """Return the class logits for a batch of images."""
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

    def train_one_epoch(self, train_loader, criterion, optimizer, device):
        """Run one optimisation pass over ``train_loader``.

        Args:
            train_loader: Sized iterable of ``(data, target)`` batches.
            criterion: Loss callable taking ``(output, target)``.
            optimizer: Optimizer holding this model's parameters.
            device: Device the batches are moved to; a ``torch.device`` or a
                device string such as ``"cpu"`` / ``"cuda"``. The model is
                expected to already live on this device.

        Returns:
            Tuple ``(mean_loss, mean_accuracy)`` of Python floats, both
            averaged over the batches of the epoch.

        Raises:
            ValueError: If ``train_loader`` contains no batches.
        """
        num_batches = len(train_loader)
        if num_batches == 0:
            raise ValueError("train_loader has no batches to train on")

        epoch_loss = 0.0
        epoch_accuracy = 0.0

        self.train()
        for i, (data, target) in enumerate(train_loader):
            # .to(device) accepts both torch.device objects and strings.
            data, target = data.to(device), target.to(device)

            # Backpropagation
            optimizer.zero_grad()
            output = self(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            # Accumulate plain floats: summing the loss tensor itself would
            # keep every batch's autograd graph alive until the epoch ends.
            batch_loss = loss.item()
            epoch_loss += batch_loss
            # Mean accuracy of this batch, added to the running total.
            epoch_accuracy += (output.argmax(dim=1) == target).float().mean().item()

            if i % 100 == 0:
                print(f"\tBATCH {i+1}/{num_batches} - LOSS: {batch_loss}")

        return epoch_loss / num_batches, epoch_accuracy / num_batches

# Instantiate the baseline network on the selected device and print its layers.
alternativeModel = NeuralNetwork().to(device)
print(alternativeModel)

Using cuda device
NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=12288, out_features=12288, bias=True)
    (1): ReLU()
    (2): Linear(in_features=12288, out_features=6144, bias=True)
    (3): ReLU()
    (4): Linear(in_features=6144, out_features=6144, bias=True)
    (5): ReLU()
    (6): Linear(in_features=6144, out_features=6144, bias=True)
    (7): ReLU()
    (8): Linear(in_features=6144, out_features=400, bias=True)
  )
)


In [8]:
def seed_everything(seed):
    """
    Seed the random number generators used here so that runs are repeatable.

    Seeds Python's ``random``, torch and torch.cuda, then switches cuDNN to
    deterministic mode with benchmarking turned off.

    Arguments:
        seed {int} -- Value handed to every seeding call
    """
    for apply_seed in (random.seed, torch.manual_seed, torch.cuda.manual_seed):
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


# Fix the seed for the cells that run after this one.
seed_everything(1001)

## Entrenamiento
Aquí definimos las funciones necesarias para entrenar el modelo, en las que indicamos qué optimizador y qué función de pérdida utilizaremos. En nuestro caso vamos a utilizar el optimizador Adam, al igual que en el paper *Attention is all you need*, y *cross entropy loss* como función de pérdida.

In [9]:
def fit(model, epochs, device, criterion, optimizer, train_loader):
    """Train ``model`` for ``epochs`` epochs and report the metrics of each one.

    Args:
        model: Object exposing
            ``train_one_epoch(train_loader, criterion, optimizer, device)``
            that returns ``(loss, accuracy)`` for the epoch.
        epochs: Number of epochs to run; ``0`` runs none.
        device: Device forwarded unchanged to ``train_one_epoch``.
        criterion: Loss callable forwarded to ``train_one_epoch``.
        optimizer: Optimizer forwarded to ``train_one_epoch``.
        train_loader: Training batches forwarded to ``train_one_epoch``.

    Returns:
        A list with one dict per epoch, with the keys ``"epoch"``,
        ``"train_loss"`` and ``"train_acc"``. Callers bind the result to
        ``logs``, so the history is returned rather than only printed.
    """
    history = []
    for epoch in range(1, epochs + 1):
        train_loss, train_acc = model.train_one_epoch(
            train_loader, criterion, optimizer, device
        )

        print(f"\n\t[TRAIN] EPOCH {epoch} - LOSS: {train_loss}, ACCURACY: {train_acc}\n")
        history.append(
            {"epoch": epoch, "train_loss": train_loss, "train_acc": train_acc}
        )

    return history


        
    

In [10]:
def runAlternativeModel(epochs):
    """Train the global ``alternativeModel`` for ``epochs`` epochs.

    Uses cross-entropy loss and Adam with a learning rate of 1e-5 on the
    global ``train_dataloader``, printing the start time and the total
    execution time.
    """
    gpu_available = torch.cuda.is_available()
    device = torch.device("cuda" if gpu_available else "cpu")
    alternativeModel.to(device)

    # Loss function and optimizer
    loss_function = nn.CrossEntropyLoss()
    adam = torch.optim.Adam(alternativeModel.parameters(), lr=1.0e-5)

    print("Iniciando entrenamiento")
    started_at = datetime.now()
    print(f"Start Time: {started_at}")

    logs = fit(
        model=alternativeModel,
        epochs=epochs,
        device=device,
        criterion=loss_function,
        optimizer=adam,
        train_loader=train_dataloader,
    )

    elapsed = datetime.now() - started_at
    print(f"Execution time: {elapsed}")

In [11]:
# Number of epochs used to train the baseline network.
epochs = 6
runAlternativeModel(epochs)

Iniciando entrenamiento
Start Time: 2022-06-24 10:50:58.383262
	BATCH 1/913 - LOSS: 5.991030693054199


KeyboardInterrupt: 

## Testing

In [12]:
def test(dataloader, model, loss_fn):
    """Predict a class for every image in ``dataloader`` and write the CSV.

    Writes ``./submission.csv`` with the header ``Id,Category`` followed by
    one ``<id>,<class name>`` row per image.

    Relies on module-level names: ``device`` (where each batch is moved),
    ``train_dataloader`` (its dataset's ``classes`` maps indices to names)
    and ``decoder`` (turns a batch of outputs into names and ids).

    Args:
        dataloader: Yields ``(images, file_names)`` batches.
        model: Model called on each image batch.
        loss_fn: Unused; kept so existing calls keep working.
    """
    size = len(dataloader.dataset)
    print("imagenes para testear: "+str(size))
    classes = train_dataloader.dataset.classes

    model.eval()
    predicted = []
    ids = []
    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device)
            pred = model(X)
            decoder(classes, pred, predicted, y, ids)

    # A single "w" open truncates any previous file, and the context manager
    # closes the handle even if a write fails.
    with open("./submission.csv", "w") as f:
        f.write("Id,Category\n")
        f.writelines(
            f"{image_id},{category}\n"
            for image_id, category in zip(ids, predicted)
        )
           
    
def decoder(classes, listPredicted, predicted, y,ids):
    """Append the class name and image id of each prediction in a batch.

    Args:
        classes: Sequence mapping a class index to its name.
        listPredicted: Iterable with one score vector per image; the index
            of its largest value selects the class.
        predicted: List extended in place with the predicted class names.
        y: File names of the batch, indexed in the same order as
            ``listPredicted``.
        ids: List extended in place with each file name minus its last four
            characters (the ".jpg" suffix).
    """
    for position, scores in enumerate(listPredicted):
        best_class = scores.argmax(0).item()
        predicted.append(classes[best_class])
        file_name = y[position]
        ids.append(file_name[:-4])  # drop the ".jpg" part
        
  
    
    


In [13]:
print("Testing:")
# The evaluation of the baseline model is left disabled in this cell.
#test(test_dataloader, alternativeModel, nn.CrossEntropyLoss())
print("Done!")

Testing:
Done!


# Transformer
Parámetros:

In [14]:
# model specific global variables
LR = 2e-05  # learning rate handed to Adam in _run
N_EPOCHS = 11  # number of training epochs for the ViT
IMG_SIZE = 224  # side length the images are resized to (IMG_SIZE x IMG_SIZE)


## Creación de la red neuronal Transformer
El vision Transformer está definido por las siguientes clases:
* ViT: Modelo completo del vision transformer.
* Encoder: Compuesto por los módulos attention y feed forward. Aplica estos dos módulos de forma secuencial tantas veces como indique la profundidad (*depth*), sumando a la salida de cada módulo su propia entrada (conexión residual).
* PreNorm: Normaliza el input antes de ser introducido en el módulo de self attention y en el de feed forward.
* Attention: Módulo que permite al modelo atender a la información de distintos subespacios.
* Feed forward: Red neuronal que procesa, dentro de cada capa del encoder, la salida del módulo Attention.

In [15]:
class ViT(nn.Module):
    """Vision Transformer image classifier.

    The image is cut into non-overlapping square patches, each patch is
    linearly projected to ``dim`` features, a learnable class token is
    prepended, learnable position embeddings are added, and the sequence goes
    through ``Encoder``. The output at the class-token position feeds a
    LayerNorm + Linear head that produces ``num_classes`` logits.

    Args:
        image_size: Side length of the (square) input images.
        patch_size: Side length of the (square) patches.
        num_classes: Number of output logits.
        dim: Embedding width of every token.
        depth: Number of encoder layers.
        heads: Number of attention heads.
        mlp_dim: Hidden width of the feed-forward blocks.
        channels: Number of image channels.
        dim_head: Width of each attention head.
        dropout: Dropout probability inside the encoder.
        emb_dropout: Dropout probability applied to the embedded tokens.

    Raises:
        ValueError: If ``image_size`` is not a multiple of ``patch_size``.
    """

    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, channels = 3, dim_head = 64, dropout = 0., emb_dropout = 0.):
        super(ViT, self).__init__()
        # Image size (224x224 in this notebook)
        image_height, image_width = (image_size, image_size)
        # Patch size (16x16 in this notebook)
        patch_height, patch_width = (patch_size, patch_size)

        # Fail early with a clear message instead of a rearrange error at
        # forward time when the image cannot be tiled exactly.
        if image_height % patch_height != 0 or image_width % patch_width != 0:
            raise ValueError(
                f"image_size ({image_size}) must be a multiple of patch_size ({patch_size})"
            )

        # Number of patches per image
        num_patches = (image_height // patch_height) * (image_width // patch_width)
        patch_dim = channels * patch_height * patch_width  # e.g. 3*16*16

        # Input batch:  b x c x H x W
        # After Rearrange: b x num_patches x patch_dim, then projected to dim
        self.to_patch_embedding = nn.Sequential(
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1 = patch_height, p2 = patch_width),
            nn.Linear(patch_dim, dim),
        )

        # One position embedding per patch plus one for the class token.
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        self.dropout = nn.Dropout(emb_dropout)

        self.encoder = Encoder(dim, depth, heads, dim_head, mlp_dim, dropout)

        self.to_latent = nn.Identity()

        self.mlp_head = nn.Sequential(
            nn.LayerNorm(dim),
            nn.Linear(dim, num_classes)
        )

    def forward(self, img):
        """Return the class logits for a batch of images."""
        x = self.to_patch_embedding(img)
        b, n, _ = x.shape  # batch size, number of patches, dim
        cls_tokens = repeat(self.cls_token, '1 n d -> b n d', b = b)  # b x 1 x dim

        # Prepend the class token to the patch tokens: b x (n + 1) x dim
        x = torch.cat((cls_tokens, x), dim=1)

        x = x + self.pos_embedding[:, :(n + 1)]
        x = self.dropout(x)

        x = self.encoder(x)

        # Only the class-token position is used for classification.
        x = x[:, 0]

        x = self.to_latent(x)
        return self.mlp_head(x)

    def train_one_epoch(self, train_loader, criterion, optimizer, device):
        """Run one optimisation pass over ``train_loader``.

        Args:
            train_loader: Sized iterable of ``(data, target)`` batches.
            criterion: Loss callable taking ``(output, target)``.
            optimizer: Optimizer holding this model's parameters.
            device: Device the batches are moved to; a ``torch.device`` or a
                device string such as ``"cpu"`` / ``"cuda"``. The model is
                expected to already live on this device.

        Returns:
            Tuple ``(mean_loss, mean_accuracy)`` of Python floats, both
            averaged over the batches of the epoch.

        Raises:
            ValueError: If ``train_loader`` contains no batches.
        """
        num_batches = len(train_loader)
        if num_batches == 0:
            raise ValueError("train_loader has no batches to train on")

        epoch_loss = 0.0
        epoch_accuracy = 0.0

        self.train()
        for i, (data, target) in enumerate(train_loader):
            # .to(device) accepts both torch.device objects and strings.
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = self(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            # Accumulate plain floats: summing the loss tensor itself would
            # keep every batch's autograd graph alive until the epoch ends.
            batch_loss = loss.item()
            epoch_loss += batch_loss
            # Mean accuracy of this batch, added to the running total.
            epoch_accuracy += (output.argmax(dim=1) == target).float().mean().item()

            if i % 100 == 0:
                print(f"\tBATCH {i+1}/{num_batches} - LOSS: {batch_loss}")

        return epoch_loss / num_batches, epoch_accuracy / num_batches
    
class Encoder(nn.Module):
    """Stack of ``depth`` layers, each an attention block then a feed-forward block.

    Both blocks are wrapped in ``PreNorm`` and applied one after the other;
    the input of each block is added back to its output.
    """
    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout = 0.):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.ModuleList([
                PreNorm(dim, Attention(dim, heads = heads, dim_head = dim_head, dropout = dropout)),
                PreNorm(dim, FeedForward(dim, mlp_dim, dropout = dropout)),
            ])
            for _ in range(depth)
        ])
    def forward(self, x):
        hidden = x
        for attention_block, feed_forward_block in self.layers:
            hidden = attention_block(hidden) + hidden
            hidden = feed_forward_block(hidden) + hidden
        return hidden
    
class PreNorm(nn.Module):
    """Apply LayerNorm over the last ``dim`` features, then call ``fn``."""
    def __init__(self, dim, fn):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn
    def forward(self, x, **kwargs):
        # Extra keyword arguments are handed to the wrapped callable untouched.
        normalized = self.norm(x)
        return self.fn(normalized, **kwargs)
    
class Attention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values come from one bias-free linear projection. The
    head outputs are concatenated and projected back to ``dim``, except when
    there is a single head whose width already equals ``dim``; in that case
    the output projection is the identity.
    """
    def __init__(self, dim, heads = 8, dim_head = 64, dropout = 0.):
        super().__init__()
        self.heads = heads
        self.scale = dim_head ** -0.5 # 1/sqrt(dk)

        self.sm = nn.Softmax(dim = -1)
        self.dropout = nn.Dropout(dropout)

        inner_dim = dim_head *  heads
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias = False)

        single_head_same_width = heads == 1 and dim_head == dim
        if single_head_same_width:
            self.to_out = nn.Identity()
        else:
            self.to_out = nn.Sequential(
                nn.Linear(inner_dim, dim),
                nn.Dropout(dropout)
            )

    def forward(self, x):
        # Split the joint projection into q, k, v and move heads to their own axis.
        q, k, v = (
            rearrange(part, 'b n (h d) -> b h n d', h = self.heads)
            for part in self.to_qkv(x).chunk(3, dim = -1)
        )
        scores = torch.matmul(q, k.transpose(-1, -2)) * self.scale

        weights = self.sm(scores) # softmax(Q*K/sqrt(dk))
        weights = self.dropout(weights)

        mixed = torch.matmul(weights, v) # softmax(Q*K/sqrt(dk))*V
        merged = rearrange(mixed, 'b h n d -> b n (h d)')
        return self.to_out(merged)
class FeedForward(nn.Module):
    """Two-layer MLP: Linear -> GELU -> Dropout -> Linear -> Dropout."""
    def __init__(self, dim, hidden_dim, dropout = 0.):
        super().__init__()
        stages = [
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*stages)
    def forward(self, x):
        # The output has the same trailing width (dim) as the input.
        return self.net(x)



In [18]:
# Vision Transformer used for the submission: IMG_SIZE x IMG_SIZE inputs cut
# into 16x16 patches, 6 encoder layers with 16 heads each, 400 output classes.
model = ViT(
        image_size = IMG_SIZE,
        patch_size = 16,
        num_classes = 400,
        dim = 1024,
        depth = 6,
        heads = 16,
        mlp_dim = 4096,
        dropout = 0.01,
        emb_dropout = 0.01
    )


## Entrenamiento

In [None]:
# create image augmentations
transforms_train = transforms.Compose(
    [
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        #transforms.RandomHorizontalFlip(p=0.3), # Para comparar los modelos en igualdad de condiciones,
        #transforms.RandomVerticalFlip(p=0.3),   # se ha comentado estas transformaciones.
        #transforms.RandomResizedCrop(IMG_SIZE), 
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
    ]
)

transforms_valid = transforms.Compose(
    [
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        #transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
    ]
)

In [None]:
def _run():
    """Train the global ViT ``model`` on the bird images.

    Loads the training folder with ``transforms_train``, then trains for
    ``N_EPOCHS`` epochs with cross-entropy loss and Adam at learning rate
    ``LR``, printing the start time and the total execution time.
    """
    image_folder = datasets.ImageFolder('/kaggle/input/iais22-birds/birds/birds', transform= transforms_train)
    loader = DataLoader(image_folder, shuffle=True, batch_size=batch_size)

    loss_function = nn.CrossEntropyLoss()
    gpu_available = torch.cuda.is_available()
    device = torch.device("cuda" if gpu_available else "cpu")

    model.to(device)

    adam = torch.optim.Adam(model.parameters(), lr=LR)
    print("Iniciando entrenamiento")

    started_at = datetime.now()
    print(f"Start Time: {started_at}")

    logs = fit(
        model=model,
        epochs=N_EPOCHS,
        device=device,
        criterion=loss_function,
        optimizer=adam,
        train_loader=loader,
    )

    elapsed = datetime.now() - started_at
    print(f"Execution time: {elapsed}")


In [None]:
# Launch the ViT training defined in _run.
_run()

## Testing

In [None]:
print("Testing:")
criterion = nn.CrossEntropyLoss()
# Rebuild the submission dataset with the ViT evaluation transform
# (IMG_SIZE x IMG_SIZE) instead of the 64x64 default of BirdsDataset.
test_data = BirdsDataset(
    path= Path('/kaggle/input/iais22-birds/submission_test'),transform=transforms_valid)
test_dataloader = DataLoader(test_data, batch_size=batch_size)
# test() writes the predictions to ./submission.csv.
test(test_dataloader, model, criterion)
print("Done!")

Finalmente guardamos el modelo.

In [None]:
# Save only the model's state_dict (not the whole module object) to disk.
torch.save(model.state_dict(),"./modelo_seleccionado.pth")
