<h1 align="center">Deep Learning - Master in Deep Learning of UPM</h1>

**IMPORTANTE**

Antes de empezar debemos instalar PyTorch Lightning, por defecto, esto valdría:

In [None]:
!pip install pytorch-lightning

Además, si te encuentras ejecutando este código en Google Collab, lo mejor será que montes tu drive para tener acceso a los datos:

In [None]:
from google.colab import drive
drive.mount('/content/drive')

<h1 align="center">Transformers!</h1>

En esta sesión práctica diseccionaremos un transformer:
- MHSDPA: MultiHead Scaled Dot product attention
- RoPE: Rotary Positional Embeddings
- El bloque transformer
- Un transformer completo para visión


## Carga del dataset
Vamos a visitar un viejo conocido: MNIST. Esta vez será algo diferente... veamos como cargarlo.

In [None]:
import datetime

import torch
import torch.nn as nn

from torchvision.datasets import MNIST
from torchvision import transforms
from torch.utils.data import DataLoader, random_split
from torchvision.transforms import ToTensor
from torchvision import datasets, transforms

import pytorch_lightning as pl
import torchmetrics
from pytorch_lightning import seed_everything

import numpy as np

import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

import matplotlib.pyplot as plt

from einops.layers.torch import Rearrange

seed_everything(42)

Vamos a crear el directamente el modulo de datos. El dataset venía descrito en `datasets.MNIST` por lo que no vamos a declarar un Dataset.

In [None]:
class MNISTDataModule(pl.LightningDataModule):
    def __init__(self, batch_size=64):
        super().__init__()
        self.batch_size = batch_size
        # Transformaciones
        self.train_transform = transforms.Compose([
            transforms.RandAugment(num_ops=3, magnitude=1),
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])
        self.val_test_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])

    def prepare_data(self):
        datasets.MNIST(root="data", train=True, download=True)
        datasets.MNIST(root="data", train=False, download=True)

    def setup(self, stage=None):
        if stage in (None, "fit"):
            mnist_full = datasets.MNIST(root="data", train=True, transform=self.val_test_transform)
            self.train_dataset, self.val_dataset = random_split(
                mnist_full,
                [55000, 5000],
                generator=torch.Generator().manual_seed(42)
            )
            self.train_dataset.dataset.transform = self.train_transform

        if stage == "test" or stage is None:
            self.test_dataset = datasets.MNIST(root="data", train=False, transform=self.val_test_transform)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size)

    def test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size)

## Programando un bloque transformer

En torch ya vienen descritos los transformers en `nn.Transformer`, `nn.TransformerEncoder` y `nn.TransformerDecoder`. Pero nosotros vamos a hacer una capa transformer de cero en este bloque. Sigue atentamente el ejemplo!

In [None]:
class MultiHeadScaledDotProductAttention(nn.Module):
    """
    Modulo de atencion multicabezal
    hidden_dim[int]: tamaño de la representación
    num_heads[int]: número de cabezas de atención
    """
    def __init__(self, hidden_dim, num_heads):
        super(MultiHeadScaledDotProductAttention, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads

        self.head_dim = hidden_dim // num_heads # Debe ser divisible

        self.W_q = nn.Linear(hidden_dim, hidden_dim)
        self.W_k = nn.Linear(hidden_dim, hidden_dim)
        self.W_v = nn.Linear(hidden_dim, hidden_dim)

        self.fc_out = nn.Linear(hidden_dim, hidden_dim)

    def forward(self, q, k, v):
        # q, k, v > [batch_size, seq_len, hidden_dim]
        batch_size, seq_len, hidden_dim = q.shape

        # Proyecciones lineales
        q = self.W_q(q)  # [batch_size, seq_len, hidden_dim]
        k = self.W_k(k)  # [batch_size, seq_len, hidden_dim]
        v = self.W_v(v)  # [batch_size, seq_len, hidden_dim]

        # Separacion en cabezales
        q = q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)  # [batch_size, num_heads, seq_len, head_dim]
        k = k.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)  # [batch_size, num_heads, seq_len, head_dim]
        v = v.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)  # [batch_size, num_heads, seq_len, head_dim]

        # Scaled dot-product attention
        scores = torch.matmul(q, k.transpose(-2, -1)) / (self.head_dim**1/2)  # [batch_size, num_heads, seq_len, seq_len]

        attention_weights = torch.nn.functional.softmax(scores, dim=-1)  # [batch_size, num_heads, seq_len, seq_len]
        context_vector = torch.matmul(attention_weights, v)  # [batch_size, num_heads, seq_len, head_dim]

        # concatenar cabezas (recuperar la forma original)
        context_vector = context_vector.transpose(1, 2).contiguous().view(batch_size, seq_len, hidden_dim)  # [batch_size, seq_len, hidden_dim]

        # Final linear layer
        output = self.fc_out(context_vector)  # [batch_size, seq_len, hidden_dim]

        return output, attention_weights

Vamos a programar una versión básica del bloque transformer de cero!

In [None]:
class TransformerBlock(nn.Module):
    """
    Transformer Block
    hidden_dim [int]: size of the representation
    num_heads [int]: number of attention heads
    dropout_prob [float]: dropout probability
    """
    def __init__(self, hidden_dim, num_heads, dropout_prob=0.0):
        super(TransformerBlock, self).__init__()
        self.mhsdpa = MultiHeadScaledDotProductAttention(hidden_dim, num_heads)
        self.norm1 = nn.LayerNorm(hidden_dim)
        self.norm2 = nn.LayerNorm(hidden_dim)
        self.dropout1 = nn.Dropout(dropout_prob)
        self.dropout2 = nn.Dropout(dropout_prob)
        self.feed_forward = nn.Sequential(
            nn.Linear(hidden_dim, 4 * hidden_dim),
            nn.GELU(),
            nn.Linear(4 * hidden_dim, hidden_dim)
        )

    def forward(self, x):
        # x > [batch_size, seq_len, hidden_dim]
        x = self.norm1(x)
        attention_output, _ = self.mhsdpa(x, x, x)
        x = x + self.dropout1(attention_output)
        x = self.norm2(x)
        feed_forward_output = self.feed_forward(x)
        x = x + self.dropout2(feed_forward_output)
        return x

Y con esto ya está el bloque básico programado! Pero aun falta trabajo para conseguir montar todo...

## RoPE
Vamos a ver los embeddings posicionales. RoPE es complicado, sigue estos pasos.
1. Computar los embeddings posicionales. Esto es algo fijo! Viene dado por el tamaño de la secuencia y el tamaño del encoder.
2. Usa los embeddings posicionales para computar los embeddings rotados.

In [None]:
def get_positional_embeddings(seq_len, dim, base=10000):
    # Esta es la frecuencia inversa que usa llama por defecto
    inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float() / dim))
    # Secuencia de elementos, se codifica con floats de 0 a seq_len-1
    pos = torch.arange(seq_len, dtype=torch.float).unsqueeze(1)
    # Einstein sumation! Nueva funcion, muy util!
    # https://pytorch.org/docs/stable/generated/torch.einsum.html
    sinusoid_inp = torch.einsum("ik,j->ij", pos, inv_freq)

    # Calculo de el seno y coseno de los embeddings, calculamos
    # rotaciones geométricas
    embeddings = torch.cat((sinusoid_inp.sin(), sinusoid_inp.cos()), dim=-1)
    return embeddings

def apply_RoPE(x, positional_embeddings):
    seq_len, dim = x.shape
    # Otra einstein summation, esta vez su proposito es rotar x (bnd), mediante
    # positional embeddings (nd) para obtener la rotacion (bnd)
    x_rotated = torch.einsum("bnd,nd->bnd", x, positional_embeddings)
    return x_rotated

Ahora con esas funciones es sencillo declarar un módulo RoPE para nuestro transformer.

In [None]:
class RotaryPositionEmbedding(nn.Module):
    def __init__(self, max_seq_len, dim, base=10000):
        super(RotaryPositionEmbedding, self).__init__()
        self.dim = dim
        self.base = base
        self.max_seq_len = max_seq_len
        self.embeddings = self.get_positional_embeddings()

    def get_positional_embeddings(self):
        inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2).float() / self.dim))
        pos = torch.arange(self.max_seq_len, dtype=torch.float).unsqueeze(1)
        sinusoid_inp = torch.einsum("ik,j->ij", pos, inv_freq)
        embeddings = torch.cat((sinusoid_inp.sin(), sinusoid_inp.cos()), dim=-1)
        return embeddings

    def forward(self, x):
        # x: [batch_size, seq_len, dim]
        _, seq_len, _ = x.shape
        x_rotated = torch.einsum("bnd,nd->bnd", x, self.embeddings[:seq_len].to(x.device))
        return x_rotated

Finalmente vamos a mejorar nuestro bloque transformer con esta nueva operacion

In [None]:
class TransformerBlockPlus(nn.Module):
    """
    Transformer Block
    hidden_dim [int]: size of the representation
    num_heads [int]: number of attention heads
    dropout_prob [float]: dropout probability
    """
    def __init__(self, hidden_dim, num_heads, dropout_prob=0.0):
        super(TransformerBlockPlus, self).__init__()
        self.mhsdpa = MultiHeadScaledDotProductAttention(hidden_dim, num_heads)
        self.rope = RotaryPositionEmbedding(512, hidden_dim) # 512 es un tamaño arbitrario!
        self.norm1 = nn.LayerNorm(hidden_dim)
        self.norm2 = nn.LayerNorm(hidden_dim)
        self.dropout1 = nn.Dropout(dropout_prob)
        self.dropout2 = nn.Dropout(dropout_prob)
        self.feed_forward = nn.Sequential(
            nn.Linear(hidden_dim, 4 * hidden_dim),
            nn.GELU(),
            nn.Linear(4 * hidden_dim, hidden_dim)
        )

    def forward(self, x):
        # x > [batch_size, seq_len, hidden_dim]
        x = self.norm1(x)
        qx, kx = self.rope(x), self.rope(x) # Nuevo!
        attention_output, _ = self.mhsdpa(qx, kx, x)
        x = x + self.dropout1(attention_output)
        x = self.norm2(x)
        feed_forward_output = self.feed_forward(x)
        x = x + self.dropout2(feed_forward_output)
        return x

## Montaje del transformer
Nuestro transformer va a recibir imágenes completas que debemos transformar en parches transformados (transformadas linealmente). Vamos a usar una capa totalmente lineal para este propósito.

Hay que trocear las imagenes. Usaremos EINOPS: https://einops.rocks/1-einops-basics/

Vamos a transformar la imagen con dimensiones [Batch; Color; Altura; Anchura] → [Batch; $(Altura*Anchura)/(AlturaReducida*AnchuraReducida)$; $AlturaReducida*AnchuraReducida*Color$]


In [None]:
class TransformerEncoder(nn.Module):
    """
    LSTM Regressor model
    h[int]: altura de imagen troceada
    w[int]: anchura de imagen troceada
    c[int]: número de canales de la imagen
    hidden_size[int]: tamaño de las capas ocultas de la RNN
    p_drop[float]: probabilidad de dropout
    output_size[int]: tamaño de la salida de la red (n_classes)
    """
    def __init__(self, h=7, w=7, # Siempre qe la imagen sea divisible por h y w!
                 c=1,
                 hidden_size=64,
                 p_drop=0.0,
                 output_size=1,
                 ):
        super(TransformerEncoder, self).__init__()
        self.linproj = nn.Linear(h*w*c, hidden_size)
        self.block1 = TransformerBlockPlus(hidden_size, 4, p_drop)
        self.block2 = TransformerBlockPlus(hidden_size, 4, p_drop)
        self.fc = nn.Linear(hidden_size, output_size)
        '''
        Esto quiere decir
        Dimension b = batch
        Dimension c = colores
        Dimension (h i) = h*i es la altura original, h es nuestra reducida
         se infiere i a partir del resto de informacion
        Dimension (w j) = w*j es la altura original, w es nuestra reducida
         Se infiere j a partir del resto de informacion
        ------
        Son transformadas a
        Dimension b = batch
        Dimension (i j) = i*j es el tamaño de la secuencia (16 en caso MNIST)
        Dimension (c h w) = c*h*w son los features planos de la imagen (7*7 en caso de MNIST)
        '''
        self.crop = Rearrange('b c (h i) (w j) -> b (i j) (c h w)', h=h, w=w)

    def forward(self, x):
        # x[batch_size; color_channel; realh; realw]
        # Queremos transformarlo en
        # x[batch_size; seq_len; h*w]
        x = self.crop(x)
        x = self.linproj(x)
        x = self.block1(x)
        x = self.block2(x).mean(1) # Mean pooling de todos los embeddings
        return self.fc(x) #out[batch_size; output_size]

## Entrenamiento
Vamos a hacer el entrenamiento, definiendo el modulo de lighting y el resto de detalles faltantes para completar el proceso.

In [None]:
class MNISTClassifier(pl.LightningModule):
    def __init__(self, model, classes=10, learning_rate=1e-3):
        super().__init__()
        self.save_hyperparameters() # guardamos la configuración de hiperparámetros
        self.learning_rate = learning_rate
        self.model = model
        self.criterion = nn.CrossEntropyLoss()
        self.acc = torchmetrics.Accuracy('multiclass', num_classes=classes)

    def forward(self, x):
        return self.model(x)

    def compute_batch(self, batch, split='train'):
        inputs, targets = batch
        preds = self(inputs)
        targets = targets.view(-1)

        loss = self.criterion(preds, targets)
        self.log_dict(
            {
                f'{split}_loss': loss,
                f'{split}_acc': self.acc(preds, targets),
            },
            on_epoch=True, prog_bar=True)

        return loss

    def training_step(self, batch, batch_idx):
        return self.compute_batch(batch, 'train')

    def validation_step(self, batch, batch_idx):
        return self.compute_batch(batch, 'val')

    def test_step(self, batch, batch_idx):
        return self.compute_batch(batch, 'test')

    def configure_optimizers(self):
        return torch.optim.AdamW(self.parameters(), lr=self.learning_rate) # self.parameters() son los parámetros del modelo

### Bucle de entrenamiento

In [None]:
# Parámetros
SAVE_DIR = f'lightning_logs/mnistformer/{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}'
w = 7
h = 7
batch_size = 8
hidden_size = 16
learning_rate = 1e-3
p_drop = 0.2
labels = 10

# DataModule
data_module = MNISTDataModule(batch_size=batch_size)

# Model
transformer = TransformerEncoder(h=h,
                                 w=w, # Siempre qe la imagen sea divisible por h y w!
                                 c=1,
                                 hidden_size=hidden_size,
                                 p_drop=p_drop,
                                 output_size=labels,
                                )

# LightningModule
module = MNISTClassifier(transformer, learning_rate=learning_rate, classes=labels)

# Callbacks
early_stopping_callback = pl.callbacks.EarlyStopping(
    monitor='val_loss', # monitorizamos la pérdida en el conjunto de validación
    mode='min',
    patience=5, # número de epochs sin mejora antes de parar
    verbose=False, # si queremos que muestre mensajes del estado del early stopping
)
model_checkpoint_callback = pl.callbacks.ModelCheckpoint(
    monitor='val_loss', # monitorizamos la pérdida en el conjunto de validación
    mode='min', # queremos minimizar la pérdida
    save_top_k=1, # guardamos solo el mejor modelo
    dirpath=SAVE_DIR, # directorio donde se guardan los modelos
    filename=f'best_model' # nombre del archivo
)

callbacks = [early_stopping_callback, model_checkpoint_callback]

# Loggers
csv_logger = pl.loggers.CSVLogger(
    save_dir=SAVE_DIR,
    name='metrics',
    version=None
)

loggers = [csv_logger] # se pueden poner varios loggers (mirar documentación)

# Trainer
trainer = pl.Trainer(max_epochs=50, accelerator='gpu', callbacks=callbacks, logger=loggers)

trainer.fit(module, data_module)
results = trainer.test(module, data_module)