## Basic Transformer Model

This is the code for a basic transformer model and generated sequences to avoid the need for a dataset. In the file 'Analysis' is an analysis of how the training loss changes as training goes on. This is a very simple exercise I used to better understand the Transformer architecture and how parameteres affect performance. 

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy

In [2]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Ensure that the model dimension (d_model) is divisible by the number of heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        # Initialize dimensions
        self.d_model = d_model # Model's dimension
        self.num_heads = num_heads # Number of attention heads
        self.d_k = d_model // num_heads # Dimension of each head's key, query, and value

        # Linear layers for transforming inputs
        self.W_q = nn.Linear(d_model, d_model) # Query transformation
        self.W_k = nn.Linear(d_model, d_model) # Key transformation
        self.W_v = nn.Linear(d_model, d_model) # Value transformation
        self.W_o = nn.Linear(d_model, d_model) # Output transformation

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # Calculate attention scores
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # Apply mask if provided (useful for preventing attention to certain parts like padding)
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)

        # Softmax is applied to obtain attention probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)

        # Multiply by values to obtain the final output
        output = torch.matmul(attn_probs, V)
        return output

    def split_heads(self, x):
        # Reshape the input to have num_heads for multi-head attention
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        # Combine the multiple heads back to original shape
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        # Apply linear transformations and split heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))

        # Perform scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)

        # Combine heads and apply output transformation
        output = self.W_o(self.combine_heads(attn_output))
        return output

In [3]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

In [4]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

In [5]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

In [6]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output)
        return output

In [7]:
# Define the PositionalEncoding class
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        # Create a matrix of positional encodings
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Apply positional encoding formula
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # Register the positional encodings as a buffer
        pe = pe.unsqueeze(0)  # Add batch dimension
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Add positional encodings to the input
        return x + self.pe[:, :x.size(1)]

In this block, I changed the vocabulary size, number of layers and heads, model dimensions and drop out to see what effect it had on the model performance. I have my basic analysis of how each parameter changes the learning rate in the 'analysis of results' file in the repository. 

The parameters have been manually optimised for low training time. 

In [10]:
import torch
import random

# Assuming you have a 'Transformer' class and necessary helper modules (like PositionalEncoding) defined elsewhere.

# Hyperparameters (these define the size and structure of your Transformer model)
src_vocab_size = 50  # Size of the source vocabulary (tested values 5000, 1000, 500, 100, 50, 20, 15)
tgt_vocab_size = 50  # Size of the target vocabulary (tested values 5000, 1000, 500, 100, 50, 20, 15)
d_model = 256         # Dimensionality of the embedding layer and the internal representations of the Transformer
                    #(tested values 1024, 512, 256, 64)
num_heads = 8        # Number of attention heads in the multi-head attention mechanism (tested values 16, 8, 4, 2)
num_layers = 2      # Number of encoder (and potentially decoder) layers in the Transformer (tested values 2, 4, 6, 8)
d_ff = 2048         # Dimensionality of the feed-forward network within each Transformer layer 
max_seq_length = 100  # Maximum length of the input and output sequences that the model can handle
dropout = 0.1       # Dropout probability for regularization (tested values 0.1, 0.1, 0.8)

# Instantiate the Transformer model
# This creates an instance of your Transformer class with the specified hyperparameters.
# It initializes all the layers and parameters of the model.
transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

# --- Process to Create Arithmetic Expression Data ---
batch_size = 64  # Number of arithmetic expressions to generate in this batch
max_len = max_seq_length  # Using the defined max sequence length for padding

def generate_arithmetic_expressions(num_samples, max_length):
    """
    Generates a list of simple arithmetic expressions and their results.

    Args:
        num_samples (int): The number of expression-result pairs to generate.
        max_length (int): The maximum length of the expression and result sequences (for padding).

    Returns:
        list: A list of tuples, where each tuple contains:
              - a list of tokens representing the arithmetic expression
              - a list of tokens representing the result
    """
    data = []
    operators = ["+", "-"]
    digits = "0123456789"
    for _ in range(num_samples):
        num1 = random.randint(1, 9)
        num2 = random.randint(1, 9)
        op = random.choice(operators)
        expression = f"{num1} {op} {num2}"
        try:
            result = str(eval(expression))
            src_sequence = list(expression)  # Split the expression into characters (tokens)
            tgt_sequence = list(result)      # Split the result into characters (tokens)

            # Pad sequences to max_length with a '<pad>' token (assuming you'll handle this in your vocabulary)
            src_sequence = src_sequence + ['<pad>'] * (max_length - len(src_sequence))
            tgt_sequence = tgt_sequence + ['<pad>'] * (max_length - len(tgt_sequence))

            data.append((src_sequence, tgt_sequence))
        except:
            continue  # Skip if there's an error in evaluation (shouldn't happen with simple +/-)
    return data

# Generate the arithmetic dataset
arithmetic_data = generate_arithmetic_expressions(batch_size, max_len)

# --- Process to Convert Text Data to Numerical Tensors ---
# You'll need to have a vocabulary (mapping from tokens to numbers) for both the source and target.
# Assuming you have 'src_vocab' and 'tgt_vocab' dictionaries created elsewhere.

def numericalize_sequence(sequence, vocab, max_length):
    """
    Converts a sequence of tokens into a list of numerical indices based on the vocabulary.
    Pads the sequence to the maximum length.

    Args:
        sequence (list): A list of tokens (e.g., characters).
        vocab (dict): A dictionary mapping tokens to their numerical indices.
        max_length (int): The maximum length of the sequence.

    Returns:
        torch.Tensor: A 1D tensor of numerical indices.
    """
    numericalized = [vocab.get(token, vocab.get('<unk>', 0)) for token in sequence] # Use <unk> if token not in vocab
    padded = numericalized + [vocab.get('<pad>', 0)] * (max_length - len(numericalized))
    return torch.tensor(padded)

# Assuming you have built your source and target vocabularies ('src_vocab' and 'tgt_vocab')
# based on the characters present in your arithmetic expressions and results.

# Example vocabulary creation (you might have a more sophisticated way of doing this)
all_chars = set()
for src, tgt in arithmetic_data:
    all_chars.update(src)
    all_chars.update(tgt)
all_chars.add('<pad>')
all_chars = sorted(list(all_chars))
char_to_index = {char: i for i, char in enumerate(all_chars)}
src_vocab = char_to_index
tgt_vocab = char_to_index # In this simple case, source and target vocab can be the same

# Convert the generated arithmetic data into numerical tensors
src_data = torch.stack([numericalize_sequence(item[0], src_vocab, max_len) for item in arithmetic_data])
tgt_data = torch.stack([numericalize_sequence(item[1], tgt_vocab, max_len) for item in arithmetic_data])

# Now 'src_data' and 'tgt_data' are PyTorch tensors containing the numerical representations
# of your arithmetic expressions and their corresponding results, ready to be used for training.

print("Shape of source data:", src_data.shape)
print("Shape of target data:", tgt_data.shape)
print("Example source sequence (numerical):", src_data[0])
print("Example target sequence (numerical):", tgt_data[0])

Shape of source data: torch.Size([64, 100])
Shape of target data: torch.Size([64, 100])
Example source sequence (numerical): tensor([ 5,  0,  2,  0,  8, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 13, 13, 13, 13, 13, 13])
Example target sequence (numerical): tensor([ 2,  6, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 1

In [11]:
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

transformer.train()

for epoch in range(100):
    optimizer.zero_grad()
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_data[:, 1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch: {epoch+1}, Loss: {loss.item()}")

Epoch: 1, Loss: 4.397154331207275
Epoch: 2, Loss: 2.347404956817627
Epoch: 3, Loss: 0.925678551197052
Epoch: 4, Loss: 0.3321765661239624
Epoch: 5, Loss: 0.15180957317352295
Epoch: 6, Loss: 0.09386774152517319
Epoch: 7, Loss: 0.07153841853141785
Epoch: 8, Loss: 0.06143736466765404
Epoch: 9, Loss: 0.0559372715651989
Epoch: 10, Loss: 0.0526890754699707
Epoch: 11, Loss: 0.05106659606099129
Epoch: 12, Loss: 0.04930248484015465
Epoch: 13, Loss: 0.04863334074616432
Epoch: 14, Loss: 0.04794232174754143


KeyboardInterrupt: 

In [10]:
import torch

# Ensure your Transformer model is in evaluation mode
transformer.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transformer.to(device)

# Reuse the data generation function to create a validation set
def generate_arithmetic_expressions(num_samples=32, max_length=max_seq_length): # Smaller validation set
    data = []
    operators = ["+", "-"]
    digits = "0123456789"
    for _ in range(num_samples):
        num1 = random.randint(1, 9)
        num2 = random.randint(1, 9)
        op = random.choice(operators)
        expression = f"{num1} {op} {num2}"
        try:
            result = str(eval(expression))
            src_sequence = list(expression)
            tgt_sequence = list(result)
            src_sequence = src_sequence + ['<pad>'] * (max_length - len(src_sequence))
            tgt_sequence = tgt_sequence + ['<pad>'] * (max_length - len(tgt_sequence))
            data.append((src_sequence, tgt_sequence))
        except:
            continue
    return data

# Reuse the numericalization function
def numericalize_sequence(sequence, vocab, max_length):
    numericalized = [vocab.get(token, vocab.get('<unk>', 0)) for token in sequence]
    padded = numericalized + [vocab.get('<pad>', 0)] * (max_length - len(numericalized))
    return torch.tensor(padded).unsqueeze(0).to(device) # Add batch dimension

# Generate the validation dataset
validation_data = generate_arithmetic_expressions(num_samples=32)

# Evaluation loop
correct_predictions = 0
total_samples = len(validation_data)

with torch.no_grad(): # Disable gradient calculation during evaluation
    for src_text, true_tgt_text in validation_data:
        src_tensor = numericalize_sequence(src_text, src_vocab, max_seq_length)

        # **CORRECTION STARTS HERE**
        # Create a target tensor for inference (start with a <start> token)
        # Replace '<start>' with your actual start token if different
        tgt_tensor = torch.tensor([tgt_vocab.get('<start>', 0)]).unsqueeze(0).to(device)

        # Inference loop to generate the target sequence
        for _ in range(max_seq_length):  # Limit the generation length
            output = transformer(src_tensor, tgt_tensor)  # Pass both src and tgt
            # Get the last predicted token
            predicted_index = torch.argmax(output[:, -1, :], dim=-1)
            # Append the predicted token to the target sequence
            tgt_tensor = torch.cat([tgt_tensor, predicted_index.unsqueeze(0)], dim=1)
            # Stop if the predicted token is the <end> token
            # Replace '<end>' with your actual end token if different
            if predicted_index.item() == tgt_vocab.get('<end>', 0):
                break
        # **CORRECTION ENDS HERE**

        # Get the predicted sequence (argmax over the vocabulary dimension)
        predicted_indices = torch.argmax(output, dim=-1).squeeze(0).cpu().numpy()

        # Convert predicted indices back to tokens
        index_to_tgt_char = {i: char for char, i in tgt_vocab.items()}
        predicted_tokens = [index_to_tgt_char.get(idx, '<unk>') for idx in predicted_indices]

        # Stop prediction at the first padding token
        if '<pad>' in predicted_tokens:
            predicted_tokens = predicted_tokens[:predicted_tokens.index('<pad>')]

        # Stop true target at the first padding token
        if '<pad>' in true_tgt_text:
            true_tgt_text = true_tgt_text[:true_tgt_text.index('<pad>')]

        predicted_result = "".join(predicted_tokens)
        true_result = "".join(true_tgt_text)

        print(f"Input: {''.join(src_text).replace('<pad>', '')}, Predicted: {predicted_result}, True: {true_result}")

        if predicted_result == true_result:
            correct_predictions += 1

accuracy = correct_predictions / total_samples
print(f"\nValidation Accuracy: {accuracy * 100:.2f}%")

Input: 1 - 7, Predicted: , True: -6
Input: 2 + 6, Predicted: , True: 8
Input: 5 + 9, Predicted: , True: 14
Input: 9 + 9, Predicted: , True: 18
Input: 9 + 4, Predicted: , True: 13
Input: 2 - 5, Predicted: , True: -3
Input: 4 - 2, Predicted: , True: 2
Input: 9 + 6, Predicted: , True: 15
Input: 7 + 7, Predicted: , True: 14
Input: 4 - 3, Predicted: , True: 1
Input: 5 - 6, Predicted: , True: -1
Input: 2 + 9, Predicted: , True: 11
Input: 2 + 6, Predicted: , True: 8
Input: 9 - 3, Predicted: , True: 6
Input: 4 + 6, Predicted: , True: 10
Input: 7 - 2, Predicted: , True: 5
Input: 9 - 2, Predicted: , True: 7
Input: 4 + 4, Predicted: , True: 8
Input: 6 - 9, Predicted: , True: -3
Input: 9 + 8, Predicted: , True: 17


KeyboardInterrupt: 