Building a Transformer with PyTorch

Setting up PyTorch

Before building a Transformer, set up the working environment. PyTorch (stable version 2.0.1 at the time of writing) can be installed with either the pip or the conda package manager.

For pip, use the command:

In [None]:
pip3 install torch torchvision torchaudio

For conda, use the command:

In [None]:
conda install pytorch torchvision torchaudio pytorch-cuda=11.7 -c pytorch -c nvidia

Building the Transformer Model with PyTorch

To build the Transformer model, the following steps are necessary:

1. Importing the libraries and modules
2. Defining the basic building blocks: Multi-Head Attention, Position-wise Feed-Forward Networks, Positional Encoding
3. Building the Encoder block
4. Building the Decoder block
5. Combining the Encoder and Decoder layers to create the complete Transformer network

1. Importing the necessary libraries and modules

We’ll start with importing the PyTorch library for core functionality, the neural network module for creating neural networks, the optimization module for training networks, and the data utility functions for handling data. Additionally, we’ll import the standard Python math module for mathematical operations and the copy module for creating copies of complex objects.

These tools set the foundation for defining the model's architecture, managing data, and establishing the training process.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy

2. Defining the basic building blocks: Multi-Head Attention, Position-wise Feed-Forward Networks, Positional Encoding

Multi-Head Attention

The Multi-Head Attention mechanism computes the attention between each pair of positions in a sequence. It consists of multiple “attention heads” that capture different aspects of the input sequence.

To learn more about Multi-Head Attention, see the Attention mechanisms section of the Large Language Models (LLMs) Concepts course.
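Before the full class, the core computation can be tried on its own. The following is a minimal sketch of scaled dot-product attention on toy tensors; the tensor sizes, the seed, and the standalone function form are assumptions made for illustration.

```python
import math
import torch

def scaled_dot_product_attention(Q, K, V, mask=None):
    # Similarity of every query with every key, scaled by sqrt(d_k)
    scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(Q.size(-1))
    if mask is not None:
        # Positions where mask == 0 get a large negative score
        scores = scores.masked_fill(mask == 0, -1e9)
    weights = torch.softmax(scores, dim=-1)
    return torch.matmul(weights, V), weights

torch.manual_seed(0)
Q = torch.randn(1, 3, 4)  # (batch, seq_length, d_k); toy sizes
K = torch.randn(1, 3, 4)
V = torch.randn(1, 3, 4)
mask = torch.tensor([[[1, 1, 0]]])  # hide the last key position from every query

output, weights = scaled_dot_product_attention(Q, K, V, mask)
print(weights.sum(dim=-1))  # each row of attention weights sums to 1
print(weights[..., -1])     # the masked key receives (numerically) zero weight
```

Each query ends up with a probability distribution over the keys, and the output is the corresponding weighted average of the values.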

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Ensure that the model dimension (d_model) is divisible by the number of heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        # Initialize dimensions
        self.d_model = d_model # Model's dimension
        self.num_heads = num_heads # Number of attention heads
        self.d_k = d_model // num_heads # Dimension of each head's key, query, and value

        # Linear layers for transforming inputs
        self.W_q = nn.Linear(d_model, d_model) # Query transformation
        self.W_k = nn.Linear(d_model, d_model) # Key transformation
        self.W_v = nn.Linear(d_model, d_model) # Value transformation
        self.W_o = nn.Linear(d_model, d_model) # Output transformation

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # Calculate attention scores
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # Apply mask if provided (useful for preventing attention to certain parts like padding)
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)

        # Softmax is applied to obtain attention probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)

        # Multiply by values to obtain the final output
        output = torch.matmul(attn_probs, V)
        return output

    def split_heads(self, x):
        # Reshape the input to have num_heads for multi-head attention
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        # Combine the multiple heads back to original shape
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        # Apply linear transformations and split heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))

        # Perform scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)

        # Combine heads and apply output transformation
        output = self.W_o(self.combine_heads(attn_output))
        return output
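
The reshaping done by split_heads and combine_heads in the class above can be checked in isolation. This sketch uses toy sizes chosen for illustration and confirms that combining the heads restores the original tensor.

```python
import torch

batch_size, seq_length, d_model, num_heads = 2, 5, 8, 2  # toy sizes
d_k = d_model // num_heads

x = torch.randn(batch_size, seq_length, d_model)

# split_heads: (batch, seq, d_model) -> (batch, heads, seq, d_k)
heads = x.view(batch_size, seq_length, num_heads, d_k).transpose(1, 2)

# combine_heads: (batch, heads, seq, d_k) -> (batch, seq, d_model)
restored = heads.transpose(1, 2).contiguous().view(batch_size, seq_length, d_model)

print(heads.shape)
print(torch.equal(restored, x))
```

The call to contiguous() is needed because transpose returns a non-contiguous view, which view cannot reshape directly.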


In [None]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))
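
"Position-wise" means the same two linear layers are applied to every position independently. The sketch below, which rebuilds the network with nn.Sequential and toy sizes purely for illustration, checks that processing one position alone gives the same result as processing the whole sequence.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
d_model, d_ff = 8, 32  # toy sizes
ffn = nn.Sequential(nn.Linear(d_model, d_ff), nn.ReLU(), nn.Linear(d_ff, d_model))

x = torch.randn(2, 5, d_model)  # (batch, seq_length, d_model)
full = ffn(x)                   # whole sequence at once

# Applying the network to position 3 alone gives the same values
single = ffn(x[:, 3, :])
print(torch.allclose(full[:, 3, :], single, atol=1e-5))
```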


In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]
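
The values stored in pe can be reproduced by hand. This pure-Python sketch follows the same formula as the class above for a single position; the helper name positional_encoding_row is hypothetical, and an even d_model is assumed, as in the class.

```python
import math

def positional_encoding_row(pos, d_model):
    """Sinusoidal encoding for one position (assumes an even d_model)."""
    row = []
    for i in range(0, d_model, 2):
        angle = pos * math.exp(-i * math.log(10000.0) / d_model)
        row.append(math.sin(angle))  # even feature index
        row.append(math.cos(angle))  # odd feature index
    return row

row0 = positional_encoding_row(0, 4)
row1 = positional_encoding_row(1, 4)
print(row0)
print(row1)
```

Position 0 always encodes to alternating 0 and 1, and later feature pairs vary more slowly with position because their frequency is lower.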


In [None]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x
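
Both sub-layers in the encoder layer above follow the same pattern: add the sub-layer output to its input (a residual connection), then apply layer normalization. A small sketch of that step, with a random tensor standing in for the attention or feed-forward output and toy sizes assumed:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
d_model = 8  # toy size
norm = nn.LayerNorm(d_model)

x = torch.randn(2, 5, d_model)             # sub-layer input
sublayer_out = torch.randn(2, 5, d_model)  # stand-in for attention or feed-forward output

y = norm(x + sublayer_out)  # residual connection followed by layer normalization

# Each position is normalized over its d_model features
print(y.mean(dim=-1).abs().max())
print(y.var(dim=-1, unbiased=False).mean())
```

With the default initialization of nn.LayerNorm, every position comes out with approximately zero mean and unit variance across its features.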



In [None]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x


In [None]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        # Padding masks: token id 0 is treated as padding
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        # Causal ("no-peek") mask, created on the same device as the inputs
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length, device=tgt.device), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output)
        return output

Class Definition:

class Transformer(nn.Module):

Initialization:

def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):

The constructor takes the following parameters:

src_vocab_size: Source vocabulary size.
tgt_vocab_size: Target vocabulary size.
d_model: The dimensionality of the model's embeddings.
num_heads: Number of attention heads in the multi-head attention mechanism.
num_layers: Number of layers for both the encoder and the decoder.
d_ff: Dimensionality of the inner layer in the feed-forward network.
max_seq_length: Maximum sequence length for positional encoding.
dropout: Dropout rate for regularization.

It defines the following components:

self.encoder_embedding: Embedding layer for the source sequence.
self.decoder_embedding: Embedding layer for the target sequence.
self.positional_encoding: Positional encoding component.
self.encoder_layers: A list of encoder layers.
self.decoder_layers: A list of decoder layers.
self.fc: Final fully connected (linear) layer mapping to target vocabulary size.
self.dropout: Dropout layer.
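The layer lists are wrapped in nn.ModuleList rather than kept as plain Python lists so that their parameters are registered with the model. A small sketch of the difference, using two hypothetical toy classes:

```python
import torch.nn as nn

class WithModuleList(nn.Module):  # hypothetical example class
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([nn.Linear(4, 4) for _ in range(3)])

class WithPlainList(nn.Module):  # hypothetical example class
    def __init__(self):
        super().__init__()
        self.layers = [nn.Linear(4, 4) for _ in range(3)]  # not registered

registered = sum(p.numel() for p in WithModuleList().parameters())
unregistered = sum(p.numel() for p in WithPlainList().parameters())
print(registered, unregistered)
```

Layers held in a plain list are invisible to parameters(), so an optimizer built from model.parameters() would never update them.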
Generate Mask Method:

def generate_mask(self, src, tgt):

This method creates masks for the source and target sequences so that padding tokens (token id 0) are ignored and, for the target sequence, each position can attend only to itself and earlier positions, which keeps future tokens hidden during training.
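To see what the masks look like, here is a standalone sketch of the same logic on a tiny example. The function form, the pad_id parameter, and the token ids are assumptions made for illustration; torch.tril is used here as an equivalent way to build the lower-triangular causal mask.

```python
import torch

def generate_mask(src, tgt, pad_id=0):
    # Padding mask for the source: (batch, 1, 1, src_len)
    src_mask = (src != pad_id).unsqueeze(1).unsqueeze(2)
    # Padding mask for the target: (batch, 1, tgt_len, 1)
    tgt_pad_mask = (tgt != pad_id).unsqueeze(1).unsqueeze(3)
    tgt_len = tgt.size(1)
    # Lower-triangular matrix: position i may attend to positions <= i
    causal = torch.tril(torch.ones(1, tgt_len, tgt_len, device=tgt.device)).bool()
    return src_mask, tgt_pad_mask & causal

src = torch.tensor([[5, 6, 7, 0]])  # last source token is padding
tgt = torch.tensor([[1, 8, 9]])
src_mask, tgt_mask = generate_mask(src, tgt)
print(src_mask.shape, tgt_mask.shape)
print(tgt_mask[0, 0].int())
```

The extra singleton dimensions let the masks broadcast over the attention heads and, for the source mask, over all query positions.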

Forward Method:

def forward(self, src, tgt):

This method defines the forward pass for the Transformer, taking source and target sequences and producing the output predictions.

Input Embedding and Positional Encoding: The source and target sequences are first embedded using their respective embedding layers, summed with their positional encodings, and passed through dropout.
Encoder Layers: The embedded source sequence is passed through the encoder layers in order, with the final encoder output representing the processed source sequence.
Decoder Layers: The embedded target sequence and the encoder's output are passed through the decoder layers, resulting in the decoder's output.
Final Linear Layer: The decoder's output is mapped to unnormalized scores (logits) over the target vocabulary using a fully connected (linear) layer.

Output:

The final output is a tensor representing the model's predictions for the target sequence.
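The output has shape (batch_size, target_length, tgt_vocab_size) and holds unnormalized scores. As a rough sketch, with a random tensor standing in for the model output and toy sizes assumed, the scores can be turned into probabilities or predicted token ids like this:

```python
import torch

torch.manual_seed(0)
batch_size, tgt_len, tgt_vocab_size = 2, 4, 10  # toy sizes
logits = torch.randn(batch_size, tgt_len, tgt_vocab_size)  # stand-in for model output

probs = torch.softmax(logits, dim=-1)  # distribution over the vocabulary per position
predicted_ids = logits.argmax(dim=-1)  # most likely token id per position
print(predicted_ids.shape)
```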

Summary:

The Transformer class brings together the various components of a Transformer model, including the embeddings, positional encoding, encoder layers, and decoder layers. It provides a convenient interface for training and inference, encapsulating the complexities of multi-head attention, feed-forward networks, and layer normalization.

This implementation follows the standard Transformer architecture, making it suitable for sequence-to-sequence tasks like machine translation, text summarization, etc. The inclusion of masking ensures that the model adheres to the causal dependencies within sequences, ignoring padding tokens and preventing information leakage from future tokens.

These sequential steps empower the Transformer model to efficiently process input sequences and produce corresponding output sequences.

Training the PyTorch Transformer Model

Sample data preparation

For illustrative purposes, a dummy dataset will be crafted in this example. However, in a practical scenario, a more substantial dataset would be employed, and the process would involve text preprocessing along with the creation of vocabulary mappings for both the source and target languages.

In [None]:
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 100
dropout = 0.1

transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

# Generate random sample data
src_data = torch.randint(1, src_vocab_size, (64, max_seq_length))  # (batch_size, seq_length)
tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length))  # (batch_size, seq_length)

Hyperparameters:

These values define the architecture and behavior of the transformer model:

src_vocab_size, tgt_vocab_size: Vocabulary sizes for source and target sequences, both set to 5000.
d_model: Dimensionality of the model's embeddings, set to 512.
num_heads: Number of attention heads in the multi-head attention mechanism, set to 8.
num_layers: Number of layers for both the encoder and the decoder, set to 6.
d_ff: Dimensionality of the inner layer in the feed-forward network, set to 2048.
max_seq_length: Maximum sequence length for positional encoding, set to 100.
dropout: Dropout rate for regularization, set to 0.1.
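To get a feel for what these values imply, the per-head dimension and the number of trainable parameters can be worked out with plain arithmetic. This is a back-of-the-envelope sketch based on the layers defined in this tutorial (every nn.Linear has a bias, every nn.LayerNorm has a scale and a shift, and the positional encoding is a buffer with no parameters); the helper linear_params is hypothetical.

```python
d_model, num_heads, num_layers, d_ff = 512, 8, 6, 2048
src_vocab_size = tgt_vocab_size = 5000

def linear_params(n_in, n_out):
    return n_in * n_out + n_out  # weights plus biases

d_k = d_model // num_heads                       # per-head dimension
attention = 4 * linear_params(d_model, d_model)  # W_q, W_k, W_v, W_o
feed_forward = linear_params(d_model, d_ff) + linear_params(d_ff, d_model)
layer_norm = 2 * d_model                         # scale and shift

encoder_layer = attention + feed_forward + 2 * layer_norm
decoder_layer = 2 * attention + feed_forward + 3 * layer_norm
embeddings = (src_vocab_size + tgt_vocab_size) * d_model
output_layer = linear_params(d_model, tgt_vocab_size)

total = num_layers * (encoder_layer + decoder_layer) + embeddings + output_layer
print(d_k, total)
```

The same total can be compared against sum(p.numel() for p in transformer.parameters()) once the model is instantiated.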
Creating a Transformer Instance:

transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

This line creates an instance of the Transformer class, initializing it with the given hyperparameters. The instance will have the architecture and behavior defined by these hyperparameters.

Generating Random Sample Data:

The following lines generate random source and target sequences:

src_data: Random integers from 1 to src_vocab_size - 1 (the upper bound of torch.randint is exclusive), representing a batch of source sequences with shape (64, max_seq_length).
tgt_data: Random integers from 1 to tgt_vocab_size - 1, representing a batch of target sequences with shape (64, max_seq_length).
Token id 0 is never generated because it is reserved for padding. These random sequences can be used as inputs to the transformer model, simulating a batch of data with 64 examples and sequences of length 100.
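The bounds of torch.randint are easy to check on a smaller example; the sizes and seed here are arbitrary toy values.

```python
import torch

torch.manual_seed(0)
vocab_size, batch_size, seq_length = 50, 4, 10  # toy sizes
tokens = torch.randint(1, vocab_size, (batch_size, seq_length))

print(tokens.shape)
print(tokens.min().item() >= 1, tokens.max().item() < vocab_size)
```

The lower bound is inclusive and the upper bound exclusive, so every id is a valid index into an embedding table with vocab_size rows.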
Summary:

The code snippet demonstrates how to initialize a transformer model and generate random source and target sequences that can be fed into the model. The chosen hyperparameters determine the specific structure and properties of the transformer. This setup could be part of a larger script where the model is trained and evaluated on actual sequence-to-sequence tasks, such as machine translation or text summarization.

Training the Model

Next, the model will be trained utilizing the aforementioned sample data. However, in a real-world scenario, a significantly larger dataset would be employed, which would typically be partitioned into distinct sets for training and validation purposes.

In [None]:
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

transformer.train()

for epoch in range(100):
    optimizer.zero_grad()
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_data[:, 1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch: {epoch+1}, Loss: {loss.item()}")

Loss Function and Optimizer:

criterion = nn.CrossEntropyLoss(ignore_index=0): Defines the loss function as cross-entropy loss. The ignore_index argument is set to 0, meaning the loss will not consider targets with an index of 0 (typically reserved for padding tokens).
optimizer = optim.Adam(...): Defines the optimizer as Adam with a learning rate of 0.0001 and specific beta values.
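The effect of ignore_index can be seen on a handful of positions. In this sketch, with toy logits and targets made up for illustration, the loss with padded targets matches the loss computed on the non-padding positions alone.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
logits = torch.randn(4, 6)            # 4 positions, vocabulary of 6
targets = torch.tensor([2, 5, 0, 0])  # last two positions are padding (index 0)

criterion = nn.CrossEntropyLoss(ignore_index=0)
loss_with_padding = criterion(logits, targets)

# Same loss computed on the non-padding positions only
loss_without_padding = nn.CrossEntropyLoss()(logits[:2], targets[:2])
print(torch.isclose(loss_with_padding, loss_without_padding))
```

With the default mean reduction, the average is taken over the non-ignored targets only, so padding does not dilute the loss.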
Model Training Mode:

transformer.train(): Sets the transformer model to training mode, enabling behaviors like dropout that only apply during training.
Training Loop:

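The code snippet trains the model for 100 epochs using a typical training loop. Because the entire sample dataset fits in a single batch, each epoch here amounts to one optimizer step: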

for epoch in range(100): Iterates over 100 training epochs.
optimizer.zero_grad(): Clears the gradients from the previous iteration.
output = transformer(src_data, tgt_data[:, :-1]): Passes the source data and the target data (excluding the last token in each sequence) through the transformer. This is common in sequence-to-sequence tasks where the target is shifted by one token.
loss = criterion(...): Computes the loss between the model's predictions and the target data (excluding the first token in each sequence). The loss is calculated by reshaping the data into one-dimensional tensors and using the cross-entropy loss function.
loss.backward(): Computes the gradients of the loss with respect to the model's parameters.
optimizer.step(): Updates the model's parameters using the computed gradients.
print(f"Epoch: {epoch+1}, Loss: {loss.item()}"): Prints the current epoch number and the loss value for that epoch.
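The one-token shift between decoder input and labels is easiest to see on a single short sequence. The token ids below are hypothetical, with 1 standing for a start-of-sequence marker and 2 for an end-of-sequence marker.

```python
# Hypothetical token ids: 1 = start-of-sequence, 2 = end-of-sequence
target = [1, 37, 52, 18, 2]

decoder_input = target[:-1]  # what the decoder sees
labels = target[1:]          # what it should predict at each step

for seen, expected in zip(decoder_input, labels):
    print(f"after token {seen} -> predict {expected}")
```

Combined with the causal mask, this means the prediction at each position depends only on the tokens up to and including that position.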
Summary:

This code snippet trains the transformer model on randomly generated source and target sequences for 100 epochs. It uses the Adam optimizer and the cross-entropy loss function. The loss is printed for each epoch, allowing you to monitor the training progress. In a real-world scenario, you would replace the random source and target sequences with actual data from your task, such as machine translation.

Transformer Model Performance Evaluation

After training the model, its performance can be evaluated on a validation dataset or test dataset. The following is an example of how this could be done:

In [None]:
transformer.eval()

# Generate random sample validation data
val_src_data = torch.randint(1, src_vocab_size, (64, max_seq_length))  # (batch_size, seq_length)
val_tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length))  # (batch_size, seq_length)

with torch.no_grad():
    val_output = transformer(val_src_data, val_tgt_data[:, :-1])
    val_loss = criterion(val_output.contiguous().view(-1, tgt_vocab_size), val_tgt_data[:, 1:].contiguous().view(-1))
    print(f"Validation Loss: {val_loss.item()}")

Evaluation Mode:

transformer.eval(): Puts the transformer model in evaluation mode. This is important because it turns off certain behaviors like dropout that are only used during training.
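The difference between the two modes can be observed directly on a dropout layer. A minimal sketch, with the dropout probability and input chosen for illustration:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
dropout = nn.Dropout(p=0.5)
x = torch.ones(1000)

dropout.train()
train_out = dropout(x)  # about half the entries zeroed, the rest scaled by 1 / (1 - p)

dropout.eval()
eval_out = dropout(x)   # identity in evaluation mode

print((train_out == 0).float().mean().item())
print(torch.equal(eval_out, x))
```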
Generate Random Validation Data:

val_src_data: Random integers between 1 and src_vocab_size, representing a batch of validation source sequences with shape (64, max_seq_length).
val_tgt_data: Random integers between 1 and tgt_vocab_size, representing a batch of validation target sequences with shape (64, max_seq_length).
Validation Loop:

with torch.no_grad(): Disables gradient computation, as we don't need to compute gradients during validation. This can reduce memory consumption and speed up computations.
val_output = transformer(val_src_data, val_tgt_data[:, :-1]): Passes the validation source data and the validation target data (excluding the last token in each sequence) through the transformer.
val_loss = criterion(...): Computes the loss between the model's predictions and the validation target data (excluding the first token in each sequence). The loss is calculated by reshaping the data into one-dimensional tensors and using the previously defined cross-entropy loss function.
print(f"Validation Loss: {val_loss.item()}"): Prints the validation loss value.
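What torch.no_grad() changes can be seen on a single layer. In this sketch (a toy nn.Linear stands in for the model), the outputs are numerically identical, but only the one computed outside the context is attached to the autograd graph.

```python
import torch
import torch.nn as nn

layer = nn.Linear(4, 2)
x = torch.randn(3, 4)

tracked = layer(x)        # recorded in the autograd graph
with torch.no_grad():
    untracked = layer(x)  # same values, no graph

print(tracked.requires_grad, untracked.requires_grad)
```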
Summary:

This code snippet evaluates the transformer model on a randomly generated validation dataset, computes the validation loss, and prints it. With random data the number itself is not meaningful, since there is no pattern for the model to generalize. In a real-world scenario, the random validation data should be replaced with actual validation data from the task you are working on; the validation loss then indicates how well the model performs on unseen data, which is a critical measure of its generalization ability.
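As a rough sketch of what that replacement could look like, the torch.utils.data utilities imported at the start can hold paired sequences, split them into training and validation sets, and serve them in batches. The tensors, split sizes, and batch size below are placeholders, not values from this tutorial.

```python
import torch
import torch.utils.data as data

torch.manual_seed(0)
# Stand-in tensors; a real task would supply tokenized sentence pairs
src = torch.randint(1, 50, (100, 12))
tgt = torch.randint(1, 50, (100, 12))

dataset = data.TensorDataset(src, tgt)
train_set, val_set = data.random_split(dataset, [80, 20])

train_loader = data.DataLoader(train_set, batch_size=16, shuffle=True)
val_loader = data.DataLoader(val_set, batch_size=16)

src_batch, tgt_batch = next(iter(train_loader))
print(len(train_set), len(val_set), src_batch.shape)
```

The training and validation loops shown earlier would then iterate over train_loader and val_loader instead of using a single fixed batch.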