# 05 - Tranformers desde cero

![transformers](../images/transformers.webp)

Para construir nuestro modelo Transformer, seguiremos estos pasos:

1. Importar las librerías y módulos necesarios
2. Definir los componentes básicos: Multi-Atención, redes feed-forward en función de la posición, codificación posicional.
3. Construir las capas de codificación y descodificación
4. Combinar las capas de codificación y descodificación para crear el modelo Transformer completo.
5. Preparación de datos de muestra
6. Entrenar el modelo

# 1 - Importar las librerías

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

import math

from tqdm.notebook import tqdm

# 2.1 - Multi-Atención

![transformers_2](../images/transformers_2.png)

El mecanismo de atención multicabezal calcula la atención entre cada par de posiciones de una secuencia. Consta de varias "cabezas de atención" que captan distintos aspectos de la secuencia de entrada.

In [2]:
class MultiHeadAttention(nn.Module):
    
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        
        
    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        attn_probs = torch.softmax(attn_scores, dim=-1)
        output = torch.matmul(attn_probs, V)
        return output
    
        
    def split_heads(self, x):
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
        
        
    def combine_heads(self, x):  
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        
        
    def forward(self, Q, K, V, mask=None):
        
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        output = self.W_o(self.combine_heads(attn_output))
        return output

El código de MultiHeadAttention inicializa el módulo con parámetros de entrada y capas de transformación lineal. Calcula las puntuaciones de atención, remodela el tensor de entrada en múltiples cabezas y combina las salidas de atención de todas las cabezas. El método feed-forward calcula la auto-atención de varias cabezas, lo que permite al modelo centrarse en algunos aspectos diferentes de la secuencia de entrada.

# 2.2 - Redes Feed-Forward en función de la posición

![transformers_exp_8](../images/transformers_exp_8.png)

In [3]:
class PositionWiseFeedForward(nn.Module):
    
    def __init__(self, d_model, d_ff):
        
        super(PositionWiseFeedForward, self).__init__()
        
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

La clase PositionWiseFeedForward extiende nn.Module de PyTorch e implementa una red feed-forward de posición. La clase se inicializa con dos capas de transformación lineal y una función de activación ReLU. El método forward aplica estas transformaciones y la función de activación secuencialmente para calcular la salida. Este proceso permite al modelo tener en cuenta la posición de los elementos de entrada al realizar predicciones.

# 2.3 - Positional Encoding

![transformers_3](../images/transformers_3.png)

La codificación posicional se utiliza para inyectar la información de posición de cada token (palabra) en la secuencia de entrada. Utiliza funciones seno y coseno de distintas frecuencias para generar la codificación posicional.

In [4]:
class PositionalEncoding(nn.Module):
    
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        self.register_buffer('pe', pe.unsqueeze(0))
        
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

La clase PositionalEncoding se inicializa con los parámetros de entrada d_model y max_seq_length, creando un tensor para almacenar valores de codificación posicional. La clase calcula valores de seno y coseno para índices pares e impares, respectivamente, basándose en el factor de escala div_term. El método forward calcula la codificación posicional añadiendo los valores de codificación posicional almacenados al tensor de entrada, permitiendo al modelo capturar la información de posición de la secuencia de entrada.

Ahora, construiremos las capas Encoder y Decoder.

# 3.1 - Encoder

![transformers_4](../images/transformers_4.webp)

Una capa encoder consta de una capa de multi-atención, una capa feed-forward de posición y dos capas de normalización.

In [5]:
class EncoderLayer(nn.Module):
    
    def __init__(self, d_model, num_heads, d_ff, dropout):
        
        super(EncoderLayer, self).__init__()
        
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

La clase EncoderLayer se inicializa con parámetros de entrada y componentes, incluyendo un módulo MultiHeadAttention, un módulo PositionWiseFeedForward, dos módulos de normalización de capas y una capa de dropout. Los métodos feed-forward calculan la salida de la capa codificadora aplicando auto-atención, añadiendo la salida de atención al tensor de entrada y normalizando el resultado. A continuación, calcula la salida feed-forward en función de la posición, la combina con la salida de auto-atención normalizada y normaliza el resultado final antes de devolver el tensor procesado.

# 3.2 - Decoder

![transformers_5](../images/transformers_5.webp)

Una capa decoder consta de dos capas de multi-atención, una capa feed-forward de posición y tres capas de normalización.

In [6]:
class DecoderLayer(nn.Module):
    
    def __init__(self, d_model, num_heads, d_ff, dropout):
        
        super(DecoderLayer, self).__init__()
        
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, enc_output, src_mask, tgt_mask):
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

La clase DecoderLayer se inicializa con parámetros de entrada y componentes como los módulos MultiHeadAttention para auto-atención enmascarada y atención cruzada, un módulo PositionWiseFeedForward, tres módulos de normalización de capas y una capa de dropout.

El método forward calcula la salida de la capa decodificadora realizando los siguientes pasos:

1. Calcular la salida de auto-atención enmascarada y añadirla al tensor de entrada, seguido de un dropout y normalización de la capa.
2. Calcular la salida de atención cruzada entre las salidas del decoder y del encoder y añadirla a la salida de auto-atención enmascarada normalizada, seguida de un dropout y una normalización de la capa.
3. Calcular la salida feed-forward en función de la posición y combinarla con la salida normalizada de atención cruzada, seguida de un dropout y normalización de capa.
4. Devuelve el tensor procesado.

Estas operaciones permiten al decodificador generar secuencias objetivo basadas en la entrada y la salida del codificador.

Ahora, combinemos las capas Encoder y Decoder para crear el modelo Transformer completo.

# 4 - Transformer

![transformers_6](../images/transformers_6.webp)


Todo junto.

In [7]:
class Transformer(nn.Module):
    
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, 
                 num_heads, num_layers, d_ff, max_seq_length, dropout):
        
        super(Transformer, self).__init__()
        
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
        

    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    
    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output)
        return output

La clase Transformer combina los módulos definidos anteriormente para crear un modelo Transformer completo. Durante la inicialización, el módulo Transformer configura los parámetros de entrada e inicializa varios componentes, incluyendo capas de embebido para secuencias de origen y destino, un módulo PositionalEncoding, módulos EncoderLayer y DecoderLayer para crear capas apiladas, una capa lineal para proyectar la salida del decoder y una capa de dropout.

El método generate_mask crea máscaras binarias para las secuencias de origen y destino con el fin de ignorar los tokens de relleno y evitar que el decoder atienda a tokens futuros. El método forward calcula la salida del modelo Transformer a través de los siguientes pasos:

1. Generar máscaras de origen y destino utilizando el método generate_mask.
2. Calcular los embebidos de origen y destino, y aplicar la codificación posicional y el dropout.
3. Procesar la secuencia de origen a través de capas codificadoras, actualizando el tensor enc_output.
4. Procesar la secuencia de destino a través de capas decodificadoras, utilizando enc_output y máscaras, y actualizando el tensor dec_output.
5. Aplicar la capa de proyección lineal a la salida del decodificador, obteniendo logits de salida.

Estos pasos permiten al modelo Transformer procesar secuencias de entrada y generar secuencias de salida basándose en la funcionalidad combinada de sus componentes.

# 5 - Datos de muestra

En este ejemplo, crearemos un conjunto de datos de juguete con fines de demostración. En la práctica, utilizaremos un conjunto de datos mayor, preprocesaremos el texto y crearemos correspondencias de vocabulario para las lenguas de origen y de destino.

In [8]:
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 100
dropout = 0.1

# Generate random sample data
src_data = torch.randint(1, src_vocab_size, (64, max_seq_length))  # (batch_size, seq_length)
tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length))  # (batch_size, seq_length)

In [11]:
src_data

tensor([[4913, 3021, 1639,  ..., 3907, 1389, 1234],
        [1757, 2977, 4557,  ...,  157, 3450,  389],
        [ 669, 2238, 4430,  ..., 4702,  906, 3132],
        ...,
        [4185, 2745, 2587,  ..., 2658, 1461, 2338],
        [1628,  341, 4858,  ..., 1719,  445,  113],
        [1657,  976, 3032,  ..., 4686, 1200, 2217]])

# 6 - Entrenamiento

Ahora entrenaremos el modelo utilizando los datos de muestra. En la práctica, utilizaríamos un conjunto de datos mayor y lo dividiríamos en conjuntos de entrenamiento y validación.


Podemos usar esta forma para construir un Transformer simple desde cero en Pytorch. Todos los grandes modelos lingüísticos utilizan estos bloques Transformer codificadores o decodificadores para el entrenamiento. Por lo tanto, entender la red que lo empezó todo es extremadamente importante. 

In [9]:
transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, 
                          num_layers, d_ff, max_seq_length, dropout)

In [10]:
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

transformer.train()

for epoch in tqdm(range(10)):
    optimizer.zero_grad()
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_data[:, 1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch: {epoch+1}, Loss: {loss.item()}")

  0%|          | 0/10 [00:00<?, ?it/s]

Epoch: 1, Loss: 8.688794136047363
Epoch: 2, Loss: 8.55346965789795
Epoch: 3, Loss: 8.48188591003418
Epoch: 4, Loss: 8.424513816833496
Epoch: 5, Loss: 8.364824295043945
Epoch: 6, Loss: 8.294319152832031
Epoch: 7, Loss: 8.209874153137207
Epoch: 8, Loss: 8.130683898925781
Epoch: 9, Loss: 8.047629356384277
Epoch: 10, Loss: 7.961203098297119
