In [1]:
#Imports
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.nn import CrossEntropyLoss

from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
from torchvision.io import read_image
from torchvision.datasets.mnist import MNIST

import numpy as np
import os
import pandas as pd

import matplotlib.pyplot as plt

from collections import Counter

from PIL import Image

In [2]:
#Ahora de verdad. Modúlos
################## Patch Embedding
class PatchEmbed(nn.Module):
    """Divide la imagen en partes y la asocia a una posición
        Parametros
        --------------
        img_size: int
            - El tamaño de la imagen debe de ser un cuadrado para poder dividirlo. 
            - Será necesario que la imagen sea escalada a un cuadrado para conseguirlo.
            
        patch_size: int
            - Tamaño de cada una de las partes en las que se divide la imagen.
            - También deben de ser cuadrados.
            - Debe de cumplir que el tamaño de la imagen sea divisible por el
                tamaño de las partes.
            
        in_chans: int
            - Número de canales de la imagen (color).
            - Por ejemplo, si es en escala de grises debe de ser de valor 1 mientras que de ser
                una imagen RGB deberá tener valor 3. En este caso in_chans es normalmente de valor
                3.
            
        embed_dim: int
            - Como de grande será el "embedding" de una parte de la imagen durante toda la red
                neuronal.
        
        Atributos
        --------------
        n_patches: int
            - Numero de partes (patches) en los que dividimos la imagen.
        
        proj: nn.Conv2d
            - Capa convolucional para dividir la imagen y colocarle su embedding.
    """
    def __init__(self, img_size, patch_size, in_chans=3, embed_dim=768):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size
        #Calculamos el número de partes de la imagen
        assert img_size%patch_size == 0, f"The size {patch_size} for the patches cant divide image size {img_size} into equal patches"
        self.n_patches = (img_size // patch_size)**2
        
        self.proj = nn.Conv2d(
            in_chans,
            embed_dim,
            kernel_size=patch_size,
            stride=patch_size,
            device=device
        )
    
    def forward(self, x):
        """Run foward pass.
        Parametros
        -------------
        x: torch.Tensor --------> Shape `(n_samples, in_chans, img_size, img_size)`.
            - Es un batch de imágenes
            - n_samples == batch_size, El número de ejemplos es el mismo al del tamaño del batch.
            - img_size: Altura y anchura de la imagen, que al ser un cuadrado, es la misma.
            
        Rerturns
        -------------
        torch.Tensor ----------> Shape `(n_samples, n_patches, embed_dim)`.
            - n_patches: parches en los que dividimos la imagen.
        """
        x = self.proj(x) #  (n_samples, embed_dim, n_patches ** 0.5, n_patches ** 0.5) Esto nos da un tensor de 4 dimensiones
        x = x.flatten(2) # (n_samples, embed_dim, n_patches) Lo aplanamos en una sola dimensión
        x = x.transpose(1,2) # (n_samples, n_patches, embed_dim) Adecuamos el tensor
        
        return x

################## Self-Attention Module
class Attention(nn.Module):
    """Attention mechanism
    
    Parameters
    --------------
    dim: int
        - Dimensiones de la entrada y salida por cada token, debemos de hacer que ambos
            valores tengan la misma dimensión.
        
    n_heads: int
        - Número de cabezas de atención.
        - Es necesario para el modelo trasnformer.
    
    qkv_bias: bool
        - Si es verdadero, incluimos el bias a la query, clave y el valor de la proyección.
    
    attn_p: float
        - Probabilidad de pérdida aplicada a la query, clave y los tensores valor.
        
    proj_p: float
        - Probabilidad de pérdida aplicada al tensor de salida.
        
    Attributes
    ---------------
    scale: float
        - Usado para normalizar el producto.
    
    qkv: nn.Linear
        - Proyección linear de la query, clave y valor.
        
    proj: nn.Linear
        - Mapeado linear que toma la secuencia de salida de todas las cabezas de atención y las
            mapea a un nuevo espacio.
    
    att_drop, proj_drop: nn.Dropout
        - Dropout layers
    """
    def __init__(self, dim, n_heads=12, qkv_bias=True, attn_p=0., proj_p=0.):
        super().__init__()
        self.n_heads = n_heads
        self.dim = dim
        self.head_dim = dim // n_heads # El tensor de salida al unir las cabezas debería mantener la dimensión
        self.scale = self.head_dim**-0.5 # Escala sugerida por el documento "Attention is All you Need" para evitar gradientes pequeños.
        
        self.qkv = nn.Linear(dim, dim*3, bias=qkv_bias, device=device) # Linear mapping que acepta una token de relleno y genera una query, key y valor
        self.attn_drop = nn.Dropout(attn_p)
        self.proj = nn.Linear(dim, dim, device=device) # Concatena las cabezas
        self.proj_drop = nn.Dropout(proj_p)
    
    def forward(self, x):
        """Run forward pass -- Obsérvese que ambos tensores tienen la mismsa forma.
        
        Parameters
        ------------
        x: torch.Tensor ------------------> Shape `(n_samples, n_patches + 1, dim)`.
        
        Returns
        ------------
        torch.Tensor --------------------> Shape `(n_samples, n_patches + 1, dim)`.
        """
        n_samples, n_tokens, dim = x.shape
        
        if dim!= self.dim:
            raise ValueError
        
        qkv = self.qkv(x) # (n_samples, n_patches + 1, 3 * dim)
        qkv = qkv.reshape(n_samples, n_tokens, 3, self.n_heads, self.head_dim) # (n_samples, n_patches + 1, 3, n_heads, self.head_dim)
        qkv = qkv.permute(2,0,3,1,4) # (3, n_samples, n_heads, n_patches + 1, head_dim)
        
        q, k, v = qkv[0], qkv[1], qkv[2]
        k_t = k.transpose(-2, -1) # (n_samples, n_heads, head_dim, n_patches + 1)
        dp = (q @ k_t) * self.scale # (n_samples, n_heads, n_patches + 1, n_patches + 1)
        attn = dp.softmax(dim=-1) # (n_samples, n_head, n_patches + 1, n_patches + 1) #Softmax function
        attn = self.attn_drop(attn)
        
        weighted_avg = attn @ v # (n_samples, n_heads, n_patches + 1, head_dim)
        weighted_avg = weighted_avg.transpose(1,2) # (n_samples, n_patches + 1, n_heads, head_dim)
        weighted_avg = weighted_avg.flatten(2) # (n_samples, n_patches + 1, dim)
        x = self.proj(weighted_avg) # (n_samples, n_patches + 1, dim)
        x = self.proj_drop(x) # (n_samples, n_patches + 1, dim)
        
        return x

class MLP(nn.Module):
    """Multi-Layer Perceptron
    
    Parameters
    -------------
    in_features: int
        - Número de inputs.
    
    hidden_features: int
        - Número de nodos en la capa oculta. En nuestro caso tendrá 1 capa oculta.
        
    out_features: int
        - Número de salidas.
    
    p: float:
        - Probabilidad de pérdida.
        
    Attributes
    -------------
    fc: nn.Linear
        - La primera capa linear.
    
    act: nn.GELU
        - Función de activación GELU.
        
    fc2: nn.Linear
        - La segunda capa linear.
    
    drop: nn.Dropout
        - Capa de pérdida.
    """
    def __init__(self, in_features, hidden_features, out_features, p = 0.):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_features, device=device)
        self.act = nn.GELU()
        self.fc2 = nn.Linear(hidden_features, out_features, device=device)
        self.drop = nn.Dropout(p)
        
    def forward(self, x):
        """Run forward pass.
        
        Parameters
        ------------
        x: torch.Tensor ----------> Shape ´(n_samples, n_patches + 1, in_features)´
        
        Returns
        ------------
        torch.Tensor -------------> Shape `(n_samples, n_patches + 1, out_features)`
        """
        x = self.fc1(x) # (n_samples, n_patches + 1, hidden_features)
        x = self.act(x) # (n_samples, n_patches + 1, hidden_features)
        x = self.drop(x) # (n_samples, n_patches + 1, hidden_features)
        x = self.fc2(x) # (n_samples, n_patches + 1, hidden_features)
        x = self.drop(x) # (n_samples, n_patches + 1, hidden_features)
        
        return x

class Block(nn.Module):
    """Transformer block
    
    Parameters
    -----------
    dim: int
        - Dimensiónd el embedding
    
    n_heads: int
        - Número de cabezas de atención
    
    mlp_ratio: float
        - Determina el tamaño oculto de la dimensión de el módulo MLP con respecto a dim
    
    qkv_bias: bool
        - Si es verdadero entonces incluimos el bias a las proyecciones de query, clave y valor.
    
    p, attn_p: float
        - Probabilidad de pérdida
    
    Attributes
    -----------
    norm1, norm2: LayerNorm
        - Módulo LayerNorm
        
    attn: Attention
        - Módulo Attention
    
    mlp: MLP
        - Módulo MLP
    """
    def __init__(self, dim, n_heads, mlp_ratio=4.0, qkv_bias = True, p = 0., attn_p = 0.):
        super().__init__()
        self.norm1 = nn.LayerNorm(dim, eps = 1e-6, device=device)
        self.attn = Attention(dim, n_heads=n_heads,qkv_bias=qkv_bias, attn_p=attn_p, proj_p=p)
        self.norm2 = nn.LayerNorm(dim, eps=1e-6, device=device)
        hidden_features = int(dim*mlp_ratio)
        self.mlp = MLP(in_features=dim, hidden_features = hidden_features, out_features = dim)
        
    def forward(self, x):
        """Run forward pass.
        
        Parameters
        -----------
        x: torch.Tensor --------> Shape `(n_samples,n_patches+1,dim)`
        
        Returns
        -----------
        torch.Tensor ----------> Shape`(n_samples, n_patches + 1, dim)`
        """
        x = x + self.attn(self.norm1(x)) # Le sumamos un bloque residual
        x = x + self.mlp(self.norm2(x))
        
        return x

class  VisionTransformer(nn.Module):
    """Simplified implementation of the Vision transformer.
    
    Parameters
    -----------
    img_size: int
        - Altura y anchura de la imagen, que deben de ser iguales.
    
    patch_size: int
        - Altura y anchura de las partes (tokens) en las que dividimos la imagen, de nuevo deben ser iguales.
    
    in_chans: int
        - Número de canales para el input.
        
    n_classes: int
        - Número de clases.
        
    embed_dim: int
        - Dimensionalidad de los embeddings para cada token o parte de la imagen.
        
    deph: int
        - Número de bloques del transformer.
    
    n_heads: int
        - Número de cabezas de atención.
    
    mlp_ratio: float
        - Determina la dimensión oculta para el módulo MLP
        
    qkv_bias: bool
        - Si es true, entonces incluímos el bias a las proyecciones de la query, clave y valor.
        
    p, attn_p: float
        - Probabilidad de pérdida.
        
    Attributes
    -----------
    patch_embed: PatchEmbed
        - Instancia de PatchEmbed
        - Es la primera capa de nuestra red.
    
    cls_token: nn.Parameter
        - Parámetro con posibilidad de aprendizaje que representa la primera token en la secuencia.
        - Tiene tantos elementos como el tamaño de `embed_dim`.
    
    pos_emb: nn.Parameter
        - Envoltura posicional de la token cls y todas las partes
        - Dónde exactamente está colocada esa token en la imagen.
        - Tiene tantos elementos como `(n_patches + 1) * embed_dim`
        
    pos_drop: nn.Dropout
        - Capa de pérdida.
        
    blocks: nn.ModuleList
        - Lista de módulos `Block`.
        
    norm: nn.LayerNorm
        - Normalización de las capas.
    """
    def __init__(self, img_size = 384, patch_size=16, in_chans=3, n_classes=1000, embed_dim=768, deph=12,
                n_heads=12,mlp_ratio=4., qkv_bias=True, p=0., attn_p = 0.,):
        super().__init__()
        
        self.patch_embed = PatchEmbed(img_size=img_size, patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim)
        self.cls_token = nn.Parameter(torch.zeros(1,1,embed_dim))
        self.pos_embed = nn.Parameter(torch.zeros(1,1+self.patch_embed.n_patches, embed_dim)).to(device)
        self.pos_drop = nn.Dropout(p=p)
        # Enconder del transformer, aunque cada bloque tenga los mismos parámetros, sus parámetros de aprendizaje serán diferentes.
        self.blocks = nn.ModuleList([Block(dim=embed_dim, n_heads=n_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, p=p, attn_p=attn_p) for _ in range(deph)])
        # Capa de normalización y Linear Mapping
        self.norm = nn.LayerNorm(embed_dim, eps=1e-6, device=device)
        self.head = nn.Linear(embed_dim, n_classes, device=device)
    
    def forward(self, x):
        """Run the forward pass
        
        Parametros
        -------------
        x: torch.Tensor ----------> Shape `(n_samples, in_chans, img_size, img_size)`.
            - Un batch de imágenes.
        
        Returns
        -------------
        logits: torch.Tensor
            - Logits over all the classes -> `(n_samples, n_classes)`
        
        """
       
        n_samples = x.shape[0]
        x = self.patch_embed(x) # Tomamos las imágenes de imput y las transformamos en patch embeddings
        cls_token = self.cls_token.expand(n_samples, -1,-1).to(device) # (n_samples, 1, embed_dim)
        x = torch.cat((cls_token, x), dim=1).to(device) # (n_samples, 1 + n_patches, embed_dim)
        x = x + self.pos_embed # (n_samples, 1 + n_patches, embed_dim), Añadimos los embeddings posicionales que aprendimos
        x = self.pos_drop(x) # Aplicamos la pérdida
        
        # Definimos todos los bloques de nuestro codificador.
        for block in self.blocks:
            x = block(x)
        
        # Normalizamos las capas
        x = self.norm(x)
        
        cls_token_final = x[:,0] # Solo seleccionamos el embedding de clase
        x = self.head(cls_token_final)
        
        # Esperamos que este embedding de `significado` a la imagen completa.
        return x
    
    

In [14]:
def main(model = None, testing = False):
    
    transform = transforms.Compose([transforms.Resize(108),
                                 transforms.CenterCrop(108),
                                 transforms.ToTensor()])
    dataset = datasets.ImageFolder('../input/transformer1dataset/birds/birds', transform=transform)

    train_size = int(0.8 * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
    
    train_loader = DataLoader(train_dataset, shuffle=False, batch_size=16)
    test_loader = DataLoader(test_dataset, shuffle=False, batch_size=16)
    
    #Shared parameters
    N_EPOCHS = 5 # Number of epochs
    criterion = CrossEntropyLoss()
    if(model == None):
        #Model and training options
        model = VisionTransformer(img_size=108, patch_size=12,n_classes=400,deph=6,n_heads=6
                                ,mlp_ratio=8.0)
        LR = 0.01 #Learning rate
    
        #Training
        optimizer = Adam(model.parameters(), lr = LR)
        train(N_EPOCHS, train_loader,criterion, optimizer, model)
        PATH = './selected_model.pth'
        torch.save(model.state_dict(), PATH)
        
    if(testing == True):
        #for t in range(N_EPOCHS):
        #    print(f"Epoch {t+1}\n-----------------------------------")
        #    test(test_loader, model, criterion)
        check_accuracy(test_loader, model)
        print ("Testing done!")  

In [23]:
def train(n_epochs, trainloader,criterion, optimizer, net):
    for epoch in range(n_epochs):  # loop over the dataset multiple times

        running_loss = 0.0
        for i, data in enumerate(trainloader):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = dataq
            inputs, labels = inputs.to(device), labels.to(device)
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 500 == 499:    # print every 500 mini-batches
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
                running_loss = 0.0

    print('Finished Training')

def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.to(device)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    
def check_accuracy(loader, model):
    num_correct = 0
    num_samples = 0
    model.eval()

    for x, y in loader:
        x = x.to(device=device)
        y = y.to(device=device)

        scores = model(x)
        _, predictions = scores.max(1)
        num_correct += (predictions == y).sum()
        num_samples += predictions.size(0)
    print(f'Got {num_correct} / {num_samples} with accuracy  \ {float(num_correct)/float(num_samples)*100:.2f}')
    model.train()

In [None]:
#Training
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")
main(testing=True)

In [24]:
#Testing
device = "cuda" if torch.cuda.is_available() else "cpu"
model = VisionTransformer(img_size=108, patch_size=9,n_classes=400,deph=6,n_heads=6)
pre_trained_path="../input/modeltransformer2/selected_model.pth"
state_dict = torch.load(pre_trained_path)
model.load_state_dict(state_dict)
model.eval()
print(f'model {pre_trained_path} loaded')
main(model=model, testing=True)

model ../input/modeltransformer2/selected_model.pth loaded
Got 117 / 11678 with accuracy  \ 1.00
Testing done!


In [38]:
# Test just one bird
device = "cuda" if torch.cuda.is_available() else "cpu"
transform = transforms.Compose([transforms.Resize(108),
                                 transforms.CenterCrop(108)])
dataset = datasets.ImageFolder('../input/transformer1dataset/birds/birds', transform=transform)
model = VisionTransformer(img_size=108, patch_size=12,n_classes=400,deph=6,n_heads=6)
img = Image.open("../input/transformer1dataset/birds/birds/RED FODY/001.jpg")
img_normalized = transform(img)

img = (np.array(img_normalized) / 36)-1 # En rango -1, 1. Para el "36" se divide el tamaño de la imagen entre 3
print(img.shape)
inp = torch.from_numpy(img).permute(2,0,1).unsqueeze(0).to(torch.float32).to(device)

print(inp.shape)
logits = model(inp)
probs = torch.nn.functional.softmax(logits, dim=1)

k = 30
top_probs, top_ixs = probs[0].topk(k)

dict = dataset.class_to_idx
key_list = list(dict.keys())
val_list = list(dict.values())


for i, (ix_, prob_) in enumerate(zip(top_ixs, top_probs)):
    ix = ix_.item()
    prob = prob_.item()
    position = val_list.index(ix)
    cls = key_list[position].strip()
    print(f"{i}: {cls:<45} --- {prob:.4f}")

(108, 108, 3)
torch.Size([1, 3, 108, 108])
0: SATYR TRAGOPAN                                --- 0.0107
1: CRANE HAWK                                    --- 0.0085
2: CAPE LONGCLAW                                 --- 0.0075
3: HAWAIIAN GOOSE                                --- 0.0072
4: BANDED PITA                                   --- 0.0072
5: ROYAL FLYCATCHER                              --- 0.0069
6: BAIKAL TEAL                                   --- 0.0067
7: STRAWBERRY FINCH                              --- 0.0064
8: PURPLE FINCH                                  --- 0.0062
9: SCARLET MACAW                                 --- 0.0059
10: MASKED BOOBY                                  --- 0.0057
11: WATTLED LAPWING                               --- 0.0054
12: RED BROWED FINCH                              --- 0.0054
13: BOBOLINK                                      --- 0.0054
14: JANDAYA PARAKEET                              --- 0.0052
15: GOLDEN CHEEKED WARBLER                        --

In [46]:
# Test and write submission_test.csv
import csv
import os
from natsort import natsorted
from torch.utils.data import Dataset

class NoClassDataset(Dataset):
    def __init__(self, main_dir, transform):
        self.main_dir = main_dir
        self.transform = transform
        all_imgs = os.listdir(main_dir)
        self.total_imgs = natsorted(all_imgs)

    def __len__(self):
        return len(self.total_imgs)

    def __getitem__(self, idx):
        img_loc = os.path.join(self.main_dir, self.total_imgs[idx])
        image = Image.open(img_loc).convert("RGB")
        tensor_image = self.transform(image)
        tensor_image = tensor_image.to(device)
        return tensor_image
    
    def getFileName(self, idx):
        img_loc = os.path.join(self.main_dir, self.total_imgs[idx])
        filename = os.path.basename(img_loc)
        return filename

#Creamos el csv

with open('submission.csv', 'w') as file:
    data = ["Id", "Category"]
    writer = csv.writer(file)
    writer.writerow(data)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    transform = transforms.Compose([transforms.Resize(108),
                                     transforms.CenterCrop(108),transforms.ToTensor()])
    dataset = datasets.ImageFolder('../input/transformer1dataset/birds/birds', transform=transform)
    model = VisionTransformer(img_size=108, patch_size=12,n_classes=400,deph=6,n_heads=6)

    submissions = NoClassDataset('../input/transformer1dataset/submission_test/submission_test', transform=transform)
    submissions_loader = DataLoader(submissions , batch_size=16, shuffle=False)

    dict = dataset.class_to_idx
    key_list = list(dict.keys())
    val_list = list(dict.values())
    
    for epoch in range(16): #Necesario para iterar las 2000, La longitud del dataset es batch_size x num_epochs
        for idx, img_normalized in enumerate(submissions_loader):
            logits = model(img_normalized)
            probs = torch.nn.functional.softmax(logits, dim=1)


            k = 1
            top_prob, top_ix = probs[0].topk(k)
            ix = top_ix.item()
            prob = top_prob.item()
            position = val_list.index(top_ix)
            cls = key_list[position].strip()
            #import pdb; pdb.set_trace()
            row = [submissions.getFileName(idx), cls]
            writer.writerow(row)