<a href="https://colab.research.google.com/github/LeninGF/CoursesNotes/blob/main/InteligenciaArtificalGenerativa/Problems/transformers/EjercicioTransformerEncoderDecoder-IAG-2024B.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformer Encoder Decoder



Coder: Lenin G. Falconí



Course: Special Topics (Artificial Intelligence)



Date: 2024-12-11

Building an encoder-decoder transformer requires cross-attention to connect the encoder layers to the decoder layers. The main components are:

1. Embedding layer
2. Positional encoding
3. Stack of encoder layers
4. Cross-attention
5. Stack of decoder layers
6. The output, which is a sequence-to-sequence prediction

In [38]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy

### MultiHead attention
The MultiHeadAttention class encapsulates the multi-head attention mechanism commonly used in transformer models. It splits the input into multiple attention heads, applies attention within each head, and then combines the results. Because each head can learn to attend to different relationships in the input, this improves the expressive power of the model.

`scaled_dot_product_attention`: the attention scores are the dot product of the queries (Q) and keys (K), divided by the square root of the key dimension (d_k).

`attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)`

`split_heads`: reshapes the input x from (batch_size, seq_length, d_model) into (batch_size, num_heads, seq_length, d_k), so that all attention heads are processed in parallel.

`combine_heads`: combines the results back into a single tensor of shape (batch_size, seq_length, d_model)

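`forward`: applies the linear projections W_q, W_k, and W_v, splits the results into heads, runs scaled dot-product attention, recombines the heads, and applies the output projection W_o.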

In [39]:
class MultiHeadAttention(nn.Module):
  """
  d_model: Dimensionality of the input.
  num_heads: The number of attention heads to split the input into.
  d_model is divisible by num_heads

  """
  def __init__(self, d_model, num_heads):
    super(MultiHeadAttention, self).__init__()
    # Ensure that the model dimension (d_model) is divisible by the number of heads
    assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

    # Initialize dimensions
    self.d_model = d_model # Model's dimension
    self.num_heads = num_heads # Number of attention heads
    self.d_k = d_model // num_heads # Dimension of each head's key, query, and value

    # Linear layers for transforming inputs
    self.W_q = nn.Linear(d_model, d_model) # Query transformation
    self.W_k = nn.Linear(d_model, d_model) # Key transformation
    self.W_v = nn.Linear(d_model, d_model) # Value transformation
    self.W_o = nn.Linear(d_model, d_model) # Output transformation

  def scaled_dot_product_attention(self, Q, K, V, mask=None):
    # Calculate attention scores
    attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

    # Apply mask if provided (useful for preventing attention to certain parts like padding)
    if mask is not None:
        attn_scores = attn_scores.masked_fill(mask == 0, -1e9)

    # Softmax is applied to obtain attention probabilities
    attn_probs = torch.softmax(attn_scores, dim=-1)

    # Multiply by values to obtain the final output
    output = torch.matmul(attn_probs, V)
    return output

  def split_heads(self, x):
    # Reshape the input to have num_heads for multi-head attention
    batch_size, seq_length, d_model = x.size()
    return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

  def combine_heads(self, x):
    # Combine the multiple heads back to original shape
    batch_size, _, seq_length, d_k = x.size()
    return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

  def forward(self, Q, K, V, mask=None):
    # Apply linear transformations and split heads
    Q = self.split_heads(self.W_q(Q))
    K = self.split_heads(self.W_k(K))
    V = self.split_heads(self.W_v(V))

    # Perform scaled dot-product attention
    attn_output = self.scaled_dot_product_attention(Q, K, V, mask)

    # Combine heads and apply output transformation
    output = self.W_o(self.combine_heads(attn_output))
    return output
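
As a rough illustration of the shape bookkeeping described above, the following standalone sketch repeats the split, attend, and combine steps on a random tensor. It leaves out the learned projections (W_q, W_k, W_v, W_o) and the mask, and the small dimensions are arbitrary values chosen only for this example.

```python
import math
import torch

torch.manual_seed(0)

# Arbitrary small sizes, chosen only for this illustration
batch_size, seq_length, d_model, num_heads = 2, 5, 16, 4
d_k = d_model // num_heads  # per-head dimension

def split_heads(x):
    # (batch, seq, d_model) -> (batch, heads, seq, d_k)
    b, s, _ = x.size()
    return x.view(b, s, num_heads, d_k).transpose(1, 2)

def combine_heads(x):
    # (batch, heads, seq, d_k) -> (batch, seq, d_model)
    b, _, s, _ = x.size()
    return x.transpose(1, 2).contiguous().view(b, s, num_heads * d_k)

x = torch.randn(batch_size, seq_length, d_model)
Q = K = V = split_heads(x)  # no learned projections in this sketch

# Scaled dot-product attention, computed independently for every head
attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
attn_probs = torch.softmax(attn_scores, dim=-1)
output = combine_heads(torch.matmul(attn_probs, V))

print(Q.shape)           # torch.Size([2, 4, 5, 4])
print(attn_probs.shape)  # torch.Size([2, 4, 5, 5])
print(output.shape)      # torch.Size([2, 5, 16])
```

Each row of `attn_probs` sums to 1, and `combine_heads(split_heads(x))` returns `x` unchanged, which is a quick way to check that the two reshapes are inverses.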

### Position Wise Feed Forward
The PositionWiseFeedForward class defines a position-wise feed-forward network made of two linear layers with a ReLU activation in between. In a transformer, this network is applied to each position separately and identically. It further transforms the features produced by the attention mechanism, acting as an additional processing step on the attention outputs.

In [40]:
class PositionWiseFeedForward(nn.Module):
  """
  d_model: Dimensionality of the input.
  d_ff: Dimensionality of the inner layer in the feed-forward network.
  """

  def __init__(self, d_model, d_ff):
    super(PositionWiseFeedForward, self).__init__()
    self.fc1 = nn.Linear(d_model, d_ff)
    self.fc2 = nn.Linear(d_ff, d_model)
    self.relu = nn.ReLU()

  def forward(self, x):
    return self.fc2(self.relu(self.fc1(x)))
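
To make "applied to each position separately and identically" concrete, the sketch below compares running a feed-forward network on a whole sequence with running it one position at a time. It uses `nn.Sequential` as a stand-in for the class above, and the sizes are arbitrary.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

d_model, d_ff = 8, 32  # arbitrary small sizes for the illustration

# Stand-in for PositionWiseFeedForward: Linear -> ReLU -> Linear
ffn = nn.Sequential(nn.Linear(d_model, d_ff), nn.ReLU(), nn.Linear(d_ff, d_model))

x = torch.randn(2, 5, d_model)  # (batch, seq_length, d_model)

# Apply to the whole sequence at once
full = ffn(x)

# Apply to each position independently and stack the results
per_position = torch.stack([ffn(x[:, t, :]) for t in range(x.size(1))], dim=1)

# The two results should agree up to floating-point tolerance
print(torch.allclose(full, per_position, atol=1e-5))
```

Because the same weights are used at every position, no information moves between positions in this block; mixing across positions is left to the attention layers.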

### Positional Encoding
The PositionalEncoding class adds information about the position of tokens within the sequence. Self-attention by itself does not depend on the order of the tokens, so without this step the model would have no way to tell positions apart. The sinusoidal functions are chosen so that the model can easily learn to attend to relative positions, as they produce a unique and smooth encoding for each position in the sequence.

- `max_seq_length`: the maximum sequence length for which positional encodings are pre-computed.
- `pe`: a tensor of zeros of shape (max_seq_length, d_model) that is filled with the positional encodings.
- `position`: a column tensor containing the index of each position in the sequence.
- `div_term`: the per-dimension frequencies 10000^(-2i/d_model), computed through `exp` and `log`, that multiply the position indices.

The sine function fills the even columns of `pe` and the cosine function fills the odd columns.

In [41]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]
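
As a quick sanity check of the construction above, this sketch rebuilds the table for a small `d_model` and compares one entry against the closed form sin(pos / 10000^(2i/d_model)) and cos(pos / 10000^(2i/d_model)). The sizes and the chosen `pos` and `i` are arbitrary.

```python
import math
import torch

d_model, max_seq_length = 8, 10  # arbitrary small sizes for the illustration

position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

pe = torch.zeros(max_seq_length, d_model)
pe[:, 0::2] = torch.sin(position * div_term)  # even columns
pe[:, 1::2] = torch.cos(position * div_term)  # odd columns

# Closed-form value for position 3 and frequency index i = 2 (columns 4 and 5)
pos, i = 3, 2
angle = pos / (10000 ** (2 * i / d_model))
print(pe[pos, 2 * i].item(), math.sin(angle))
print(pe[pos, 2 * i + 1].item(), math.cos(angle))

# Position 0 is sin(0) = 0 on even columns and cos(0) = 1 on odd columns
print(pe[0])
```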

###  Encoder Layer

The EncoderLayer class defines a single layer of the transformer's encoder. It encapsulates a multi-head self-attention mechanism followed by position-wise feed-forward neural network, with residual connections, layer normalization, and dropout applied as appropriate. These components together allow the encoder to capture complex relationships in the input data and transform them into a useful representation for downstream tasks. Typically, multiple such encoder layers are stacked to form the complete encoder part of a transformer model.

In [42]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x
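
The residual-then-normalize pattern used in `forward` can be tried in isolation. The sketch below is only an illustration: it swaps in PyTorch's built-in `nn.MultiheadAttention` for the custom class, and note that its `key_padding_mask` uses the opposite convention (True marks positions to ignore) from the `mask == 0` check used in this notebook.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

d_model, num_heads = 16, 4  # arbitrary small sizes for the illustration

# Built-in attention used as a stand-in for the notebook's MultiHeadAttention
attn = nn.MultiheadAttention(d_model, num_heads, batch_first=True)
norm1 = nn.LayerNorm(d_model)
dropout = nn.Dropout(0.1)

x = torch.randn(2, 5, d_model)  # (batch, seq_length, d_model)

# True marks padded key positions that should be ignored
key_padding_mask = torch.tensor([[False, False, False, True, True],
                                 [False, False, False, False, False]])

attn_output, _ = attn(x, x, x, key_padding_mask=key_padding_mask)

# Residual connection followed by layer normalization (post-norm)
y = norm1(x + dropout(attn_output))

print(y.shape)                       # torch.Size([2, 5, 16])
print(y.mean(dim=-1).abs().max())    # close to 0 after LayerNorm
```

The sublayer keeps the (batch, seq_length, d_model) shape, which is what allows several encoder layers to be stacked.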

### Encoder Transformer

In [43]:
class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, dropout, num_classes, max_sequence_length):
        super(TransformerEncoder, self).__init__()
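        # Linear projection: expects float inputs of shape (batch, seq, vocab_size), e.g. one-hot vectors, not integer token ids
        self.embedding = nn.Linear(vocab_size, d_model)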
        self.positional_encoding = PositionalEncoding(d_model, max_sequence_length)
        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, x, mask=None):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.encoder_layers:
            x = layer(x, mask)
        x = x.mean(dim=1)  # Global average pooling
        x = self.fc(x)
        return x


### Decoder Layer

The DecoderLayer class defines a single layer of the transformer's decoder. It consists of a multi-head self-attention mechanism, a multi-head cross-attention mechanism (that attends to the encoder's output), a position-wise feed-forward neural network, and the corresponding residual connections, layer normalization, and dropout layers. This combination enables the decoder to generate meaningful outputs based on the encoder's representations, taking into account both the target sequence and the source sequence. As with the encoder, multiple decoder layers are typically stacked to form the complete decoder part of a transformer model.

In [44]:
import torch
import torch.nn as nn
import torch.optim as optim

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x
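
The cross-attention call takes its queries from the decoder state and its keys and values from the encoder output. A small sketch of the resulting shapes follows, again using the built-in `nn.MultiheadAttention` as a stand-in for the custom class, with arbitrary sequence lengths.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

d_model, num_heads = 16, 4  # arbitrary small sizes for the illustration

# Built-in attention used as a stand-in for the notebook's cross_attn module
cross_attn = nn.MultiheadAttention(d_model, num_heads, batch_first=True)

enc_output = torch.randn(2, 7, d_model)  # source length 7
dec_state = torch.randn(2, 5, d_model)   # target length 5

# Queries come from the decoder; keys and values come from the encoder
out, weights = cross_attn(dec_state, enc_output, enc_output)

print(out.shape)      # torch.Size([2, 5, 16])
print(weights.shape)  # torch.Size([2, 5, 7])
```

The output has the target length, while each target position holds a distribution over the 7 source positions. This is also why the decoder passes `src_mask` (not `tgt_mask`) to the cross-attention call.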



### Sequential Transformer

In [45]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

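    def generate_mask(self, src, tgt):
        # Token id 0 is treated as padding in both sequences
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)  # (batch, 1, 1, src_len)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)  # (batch, 1, tgt_len, 1)
        seq_length = tgt.size(1)
        # Lower-triangular "no-peek" mask, created on the same device as tgt
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length, device=tgt.device), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask  # (batch, 1, tgt_len, tgt_len)
        return src_mask, tgt_mask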

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output)
        return output
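
To see what the masks built by `generate_mask` look like, the sketch below reproduces the same logic as a free function on a tiny example. The `pad_id` parameter is a name introduced here for illustration; the class above hardcodes 0.

```python
import torch

def generate_mask(src, tgt, pad_id=0):
    # pad_id is a hypothetical parameter; the notebook hardcodes 0
    src_mask = (src != pad_id).unsqueeze(1).unsqueeze(2)  # (batch, 1, 1, src_len)
    tgt_mask = (tgt != pad_id).unsqueeze(1).unsqueeze(3)  # (batch, 1, tgt_len, 1)
    seq_length = tgt.size(1)
    # Lower-triangular matrix: position t may attend to positions <= t
    causal = torch.tril(torch.ones(1, seq_length, seq_length, device=tgt.device)).bool()
    return src_mask, tgt_mask & causal

src = torch.tensor([[5, 6, 7, 0]])  # last source token is padding
tgt = torch.tensor([[8, 9, 0]])     # last target token is padding

src_mask, tgt_mask = generate_mask(src, tgt)

print(src_mask.shape)  # torch.Size([1, 1, 1, 4])
print(tgt_mask.shape)  # torch.Size([1, 1, 3, 3])
print(tgt_mask[0, 0].int())
# tensor([[1, 0, 0],
#         [1, 1, 0],
#         [0, 0, 0]], dtype=torch.int32)
```

Two details are worth noticing. The target padding is applied to query rows (the last row is all zeros) rather than to key columns, and any real token with id 0 is treated as padding, which matters for the synthetic data used later because its source sequence starts at 0.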



## Test with Random Data

A dataset that generates synthetic data is declared to evaluate the model's performance on classification.

In [46]:
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
vocab_size = 1000
sequence_length = 64
dropout = 0.1
batch_size = 16

In [47]:
# Instantiate the model
model = Transformer(vocab_size, vocab_size, d_model, num_heads, num_layers, d_ff, sequence_length, dropout) # Pass sequence_length and dropout while creating the instance
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [48]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')


Using device: cuda


Generating an example sequence dataset

In [49]:
import numpy as np

# Create a simple sequence dataset
def create_dataset(sequence_length, vocab_size, batch_size):
    x = np.arange(vocab_size)
    y = np.roll(x, -1)  # Shifted sequence
    x = np.tile(x, (batch_size, 1))
    y = np.tile(y, (batch_size, 1))
    return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

# Dummy dataset
src, tgt = create_dataset(sequence_length, vocab_size, batch_size)
src = src[:, :sequence_length]
tgt = tgt[:, :sequence_length]

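# Causal (square subsequent) masks, not padding masks. The Transformer class above builds its own masks in generate_mask, so these are not used by model(src, tgt)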
src_mask = torch.nn.Transformer.generate_square_subsequent_mask(sequence_length).to(device)  # Call from torch.nn.Transformer
tgt_mask = torch.nn.Transformer.generate_square_subsequent_mask(sequence_length).to(device)  # Call from torch.nn.Transformer


In [50]:
src.shape, tgt.shape, src_mask.shape, tgt_mask.shape

(torch.Size([16, 64]),
 torch.Size([16, 64]),
 torch.Size([64, 64]),
 torch.Size([64, 64]))

In [51]:
src

tensor([[ 0,  1,  2,  ..., 61, 62, 63],
        [ 0,  1,  2,  ..., 61, 62, 63],
        [ 0,  1,  2,  ..., 61, 62, 63],
        ...,
        [ 0,  1,  2,  ..., 61, 62, 63],
        [ 0,  1,  2,  ..., 61, 62, 63],
        [ 0,  1,  2,  ..., 61, 62, 63]])

In [52]:
tgt

tensor([[ 1,  2,  3,  ..., 62, 63, 64],
        [ 1,  2,  3,  ..., 62, 63, 64],
        [ 1,  2,  3,  ..., 62, 63, 64],
        ...,
        [ 1,  2,  3,  ..., 62, 63, 64],
        [ 1,  2,  3,  ..., 62, 63, 64],
        [ 1,  2,  3,  ..., 62, 63, 64]])

## Train Loop

Building a training loop

In [54]:
def train_model(model, src, tgt, src_mask, tgt_mask, criterion, optimizer, epochs=10):
    # src_mask and tgt_mask are accepted but unused: the model builds its own masks
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        # NOTE: tgt is both the decoder input and the loss target, so each position
        # can attend to the very token it is asked to predict
        output = model(src, tgt)
        # reshape (rather than view) also handles non-contiguous tensors
        loss = criterion(output.reshape(-1, vocab_size), tgt.reshape(-1))
        loss.backward()
        optimizer.step()
        print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item()}')

# Train the model
train_model(model, src, tgt, src_mask, tgt_mask, criterion, optimizer, epochs=20)

Epoch 1/20, Loss: 1.4571903944015503
Epoch 2/20, Loss: 1.1055076122283936
Epoch 3/20, Loss: 0.8289587497711182
Epoch 4/20, Loss: 0.6056163311004639
Epoch 5/20, Loss: 0.4524783492088318
Epoch 6/20, Loss: 0.33423587679862976
Epoch 7/20, Loss: 0.2503746449947357
Epoch 8/20, Loss: 0.19388259947299957
Epoch 9/20, Loss: 0.15040621161460876
Epoch 10/20, Loss: 0.12003596127033234
Epoch 11/20, Loss: 0.0983632430434227
Epoch 12/20, Loss: 0.08239667117595673
Epoch 13/20, Loss: 0.07036348432302475
Epoch 14/20, Loss: 0.06080608442425728
Epoch 15/20, Loss: 0.053940299898386
Epoch 16/20, Loss: 0.04725867509841919
Epoch 17/20, Loss: 0.04256439208984375
Epoch 18/20, Loss: 0.038610804826021194
Epoch 19/20, Loss: 0.035773251205682755
Epoch 20/20, Loss: 0.03295738995075226
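
In the training loop above, the decoder reads `tgt` and the loss is also computed against `tgt`. Since the causal mask includes the diagonal, each position can see the token it has to predict, which may explain why the loss falls quickly. A common alternative is to shift the decoder input one step to the right. The sketch below shows only the tensor manipulation; `bos_id` is a hypothetical begin-of-sequence token id that this notebook does not define.

```python
import torch

bos_id = 999  # hypothetical begin-of-sequence id, not part of the notebook

tgt = torch.tensor([[1, 2, 3, 4],
                    [1, 2, 3, 4]])

# Prepend BOS and drop the last token: this is what the decoder reads
bos = torch.full((tgt.size(0), 1), bos_id, dtype=tgt.dtype)
decoder_input = torch.cat([bos, tgt[:, :-1]], dim=1)

# The loss is still computed against the unshifted target
labels = tgt

print(decoder_input)
# tensor([[999,   1,   2,   3],
#         [999,   1,   2,   3]])
print(labels)
# tensor([[1, 2, 3, 4],
#         [1, 2, 3, 4]])
```

With this layout, the decoder at position t has only seen the labels up to position t-1, so the training setup matches what happens during step-by-step generation.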


## Evaluation Loop
Building a loop to evaluate performance

In [57]:
def evaluate_model(model, src, tgt, src_mask, tgt_mask, criterion):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        output = model(src, tgt)
        loss = criterion(output.reshape(-1, vocab_size), tgt.reshape(-1))
        total_loss += loss.item()
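    # NOTE: criterion already averages over all tokens, so dividing by the batch
    # size (src.size(0)) reports a value batch_size times smaller than the mean loss
    avg_loss = total_loss / src.size(0)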
    perplexity = torch.exp(torch.tensor(avg_loss))
    return avg_loss, perplexity

# Evaluate the model
avg_loss, perplexity = evaluate_model(model, src, tgt, src_mask, tgt_mask, criterion)
print(f"Average Loss: {avg_loss:.4f}, Perplexity: {perplexity:.4f}")



Average Loss: 0.0010, Perplexity: 1.0010
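
Perplexity is the exponential of the mean per-token cross-entropy. As a reference point, the sketch below evaluates a predictor that assigns equal probability to every token; its perplexity should come out close to the vocabulary size. The logits and labels are synthetic values made up for this check, not outputs of the model above.

```python
import math
import torch
import torch.nn as nn

torch.manual_seed(0)

vocab_size = 1000
num_tokens = 16 * 64  # batch_size * sequence_length, as in the notebook

# Default reduction="mean" already averages the loss over all tokens
criterion = nn.CrossEntropyLoss()

# All-zero logits give a uniform distribution over the vocabulary
logits = torch.zeros(num_tokens, vocab_size)
labels = torch.randint(0, vocab_size, (num_tokens,))

loss = criterion(logits, labels)
perplexity = torch.exp(loss)

print(loss.item(), math.log(vocab_size))  # both approximately 6.9078
print(perplexity.item())                  # approximately 1000
```

Because the criterion returns a per-token mean, exponentiating it directly gives the perplexity for a single batch; an extra division by the batch size is not needed.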


In [62]:
# Generate a prediction for the src sequence with greedy decoding.
# Assumes model, src, and sequence_length are defined as in the cells above.

def predict_sequence(model, src, max_len):
    model.eval()
    with torch.no_grad():
        predictions = []
        # Seed the decoder with the first token of src (here id 0, which generate_mask treats as padding)
        input_seq = src[:, 0].unsqueeze(1)
        for _ in range(max_len):
            output = model(src, input_seq)
            # Greedy choice: most likely token at the last position
            predicted_token = output[:, -1, :].argmax(dim=-1)
            predictions.append(predicted_token)
            input_seq = torch.cat([input_seq, predicted_token.unsqueeze(1)], dim=1)
        return torch.stack(predictions, dim=1)

predicted_sequence = predict_sequence(model, src, sequence_length)

# Print the first 100 results for the first sequence in the batch.
print(predicted_sequence[0, :100])

tensor([3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
        3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
        3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3])


In [63]:
tgt[:100]

tensor([[ 1,  2,  3,  ..., 62, 63, 64],
        [ 1,  2,  3,  ..., 62, 63, 64],
        [ 1,  2,  3,  ..., 62, 63, 64],
        ...,
        [ 1,  2,  3,  ..., 62, 63, 64],
        [ 1,  2,  3,  ..., 62, 63, 64],
        [ 1,  2,  3,  ..., 62, 63, 64]])

In [59]:
!pip install torchinfo

Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl.metadata (21 kB)
Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


In [60]:
from torchinfo import summary
summary(model)

Layer (type:depth-idx)                        Param #
Transformer                                   --
├─Embedding: 1-1                              512,000
├─Embedding: 1-2                              512,000
├─PositionalEncoding: 1-3                     --
├─ModuleList: 1-4                             --
│    └─EncoderLayer: 2-1                      --
│    │    └─MultiHeadAttention: 3-1           1,050,624
│    │    └─PositionWiseFeedForward: 3-2      2,099,712
│    │    └─LayerNorm: 3-3                    1,024
│    │    └─LayerNorm: 3-4                    1,024
│    │    └─Dropout: 3-5                      --
│    └─EncoderLayer: 2-2                      --
│    │    └─MultiHeadAttention: 3-6           1,050,624
│    │    └─PositionWiseFeedForward: 3-7      2,099,712
│    │    └─LayerNorm: 3-8                    1,024
│    │    └─LayerNorm: 3-9                    1,024
│    │    └─Dropout: 3-10                     --
│    └─EncoderLayer: 2-3                      --
│    │    └─Mu

## References
- https://www.datacamp.com/tutorial/building-a-transformer-with-py-torch
- https://campus.datacamp.com/es/courses/introduction-to-llms-in-python/building-a-transformer-architecture?ex=15
