# In this Notebook I am making a transformer from scratch which I learned through DataCamp Course. I will learn different modules and functionality of each line of code about Transformer architecture as I go on writing this. I will build it in a modular and hierarchical way by exploring the architecture component-wise along with their functionality with respect to the paper "Attention Is All You Need."

![Transformer](https://aiml.com/wp-content/uploads/2023/09/Annotated-Transformers-Architecture.png)

I learnt the theoretical part and also the modular codes about going on to make a transformer architecture but I couldn't resist my urge to try it out on a simple use case and what it would look like. So this is a simple sequence to sequence translation model.
Feel free to check out.

In [28]:
#Importing all the necessary libraries and modules
import torch.optim as optim
import torch.nn.functional as F
import torch.nn as nn
import math

In [29]:
#Lets consider a sample dataset consisting English to French translation:
data = [
    ("I am a student", "Je suis étudiant"),
    ("He is a teacher", "Il est professeur"),
    ("She likes apples", "Elle aime les pommes"),
    ("We are friends", "Nous sommes amis"),
    ("They are playing", "Ils jouent"),
]

In [31]:
#preprocessing data through word to index and index to word
def build_vocab(sentences):
    word2idx = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
    idx = 3
    for sentence in sentences:
        for word in sentence.lower().split():
            if word not in word2idx:
                word2idx[word] = idx
                idx += 1
    return word2idx

# Separate source and target sentences
src_sentences = [pair[0] for pair in data]
tgt_sentences = [pair[1] for pair in data]

src_vocab = build_vocab(src_sentences)
tgt_vocab = build_vocab(tgt_sentences)

src_idx2word = {i: w for w, i in src_vocab.items()}
tgt_idx2word = {i: w for w, i in tgt_vocab.items()}

In [30]:
#encoding
def encode(sentence, vocab, max_len):
    tokens = sentence.lower().split()
    tokens = ['<sos>'] + tokens + ['<eos>']
    token_ids = [vocab[token] for token in tokens]
    token_ids += [vocab['<pad>']] * (max_len - len(token_ids))
    return token_ids

max_src_len = max(len(s.split()) for s in src_sentences) + 2
max_tgt_len = max(len(s.split()) for s in tgt_sentences) + 2

encoded_data = [
    (encode(src, src_vocab, max_src_len), encode(tgt, tgt_vocab, max_tgt_len))
    for src, tgt in data
]

In [32]:
#Using the Dataset and DataLoader class for batch processing
from torch.utils.data import Dataset

class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data  # list of (src_tensor, tgt_tensor)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        src, tgt = self.data[idx]
        return torch.tensor(src, dtype=torch.long), torch.tensor(tgt, dtype=torch.long)


In [7]:
from torch.utils.data import DataLoader

# Create dataset instance
dataset = TranslationDataset(encoded_data)

# Create DataLoader
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)


In [33]:
#Checking the data dimensions for validation of source and target
for batch_src, batch_tgt in dataloader:
    print("Source batch shape:", batch_src.shape)
    print("Target batch shape:", batch_tgt.shape)
    print("Source example:", batch_src[0])
    print("Target example:", batch_tgt[0])
    break


Source batch shape: torch.Size([2, 6])
Target batch shape: torch.Size([2, 6])
Source example: tensor([ 1, 16, 14, 17,  2,  0])
Target example: tensor([ 1, 16, 17,  2,  0,  0])


In [36]:
#one of the most important parts in transformers
def create_mask(seq, pad_idx): #create_mask function creates a padding mask that is used to prevent the model from attending to padding tokens in the input sequence.#
    return (seq != pad_idx).unsqueeze(1).unsqueeze(2) #shape of (batch_size, 1, seq_len, seq_len) which is suitable for the attention mechanism.
def generate_square_subsequent_mask(size): #This function generates a causal mask (also called a look-ahead mask) for the decoder, which is used to prevent a token from attending to future tokens during training.#
    return torch.tril(torch.ones(size, size)).bool()  #This prevents "peeking" at future tokens in sequence-to-sequence models, such as the transformer decoder.
def combine_masks(pad_mask, causal_mask):  #This function combines the padding mask and causal mask to create a final mask that can be used in the attention mechanism.#
    return pad_mask & causal_mask.unsqueeze(0).unsqueeze(1)  #pad_mask & causal_mask performs an element-wise logical AND operation,
        #combining both masks. The result is a mask where both the padding positions and the future positions are masked out (set to False), 
        #ensuring that the model cannot attend to padded tokens or future tokens.


In [37]:
#Ofcourse the most important part the Transformer architecture from scratch
class InputEmbeddings(nn.Module):
    def __init__(self, vocab_size: int, d_model: int) -> None:
        super().__init__()
        # Set the model dimensionality and vocabulary size
        self.d_model = d_model
        self.vocab_size = vocab_size
        # Instantiate the embedding layer
        self.embedding = nn.Embedding(vocab_size,d_model)

    def forward(self, x):
        # Return the embeddings multiplied by the square root of d_model
        return self.embedding(x)*math.sqrt(self.d_model)
        
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super().__init__()
        # Create a matrix of zeros of dimensions max_seq_length by d_model
        pe = torch.zeros(max_seq_length,d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        
        # Perform the sine and cosine calculations
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Ensure pe isn't a learnable parameter during training
        self.register_buffer('pe', pe.unsqueeze(0))
        
    def forward(self, x):
        # Add the positional embeddings to the token embeddings
        return x + self.pe[:, :x.size(1)]
        
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0 #d_model must be divisible by num_heads
        self.num_heads = num_heads
        self.d_model = d_model
        self.head_dim = d_model // num_heads
        self.query_linear = nn.Linear(d_model, d_model, bias=False)
        self.key_linear = nn.Linear(d_model, d_model, bias=False)
        self.value_linear = nn.Linear(d_model, d_model, bias=False)
        self.output_linear = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        seq_length = x.size(1)
        # Split the input embeddings and permute
        x = x.reshape(batch_size, seq_length, self.num_heads, self.head_dim)
        print(x.shape)
        return x.permute(0, 2, 1, 3)

    def compute_attention(self, query, key, value, mask=None):
        # Compute scaled dot-product attention
        scores = torch.matmul(query, key.transpose(-2, -1)) / (self.head_dim ** 0.5)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attention_weights = F.softmax(scores, dim=-1)
        return torch.matmul(attention_weights, value)

    def combine_heads(self, x, batch_size):
        seq_length = x.size(1)
        # Combine heads back to (batch_size, seq_length, d_model)
        x = x.permute(0, 2, 1, 3).contiguous()
        return x.view(batch_size, -1, self.d_model)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)

        # Build the forward pass
        query = self.split_heads(self.query_linear(query), batch_size)
        key = self.split_heads(self.key_linear(key), batch_size)
        value = self.split_heads(self.value_linear(value), batch_size)
        
        attention_weights = self.compute_attention(query, key, value, mask)
        output = self.combine_heads(attention_weights, batch_size)
        return self.output_linear(output)  

class FeedForwardSubLayer(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        # Define the layers and activation
        self.fc1 = nn.Linear(d_model,d_ff)
        self.fc2 = nn.Linear(d_ff,d_model)
        self.relu = nn.ReLU()
    def forward(self, x):
        # Pass the input through the layers and activation
        return self.fc2(self.relu(self.fc1(x)))
    
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        # Instantiate the layers
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ff_sublayer = FeedForwardSubLayer(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, src_mask):
        # Complete the forward method
        attn_output = self.self_attn(x,x,x,src_mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.ff_sublayer(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_length):
        super().__init__()
        # Define the embedding, positional encoding, and encoder layers
        self.embedding = InputEmbeddings(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
        self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

    def forward(self, x, src_mask):
        # Perform the forward pass through the layers
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x, src_mask)
        return x

class ClassifierHead(nn.Module):
    def __init__(self, d_model, num_classes):
        super().__init__()
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, x):
        logits = self.fc(x)
        return F.log_softmax(logits, dim=-1)

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        # Define cross-attention and a third layer normalization
        self.cross_attn = MultiHeadAttention(d_model,num_heads)
        self.ff_sublayer = FeedForwardSubLayer(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, y, tgt_mask, cross_mask):
        self_attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attn_output))
        # Complete the forward pass
        cross_attn_output = self.cross_attn(x,y,y,cross_mask)
        x = self.norm2(x + self.dropout(cross_attn_output))
        ff_output = self.ff_sublayer(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x
        
class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_length):
        super(TransformerDecoder, self).__init__()
        self.embedding = InputEmbeddings(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
        # Define the list of decoder layers and linear layer
        self.layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        # Define a linear layer to project hidden states to likelihoods
        self.fc = nn.Linear(d_model,vocab_size)
  
    def forward(self, tgt, memory, tgt_mask, cross_mask):
        x = self.embedding(tgt)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x, memory, tgt_mask, cross_mask)
        x = self.fc(x)
        return F.log_softmax(x, dim=-1)

        
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super().__init__()
        self.encoder = TransformerEncoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_length)
        self.decoder = TransformerDecoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_length)
    def forward(self, src, tgt, src_mask, tgt_mask, cross_mask):
     encoder_output = self.encoder(src, src_mask)
     decoder_output = self.decoder(tgt, encoder_output, tgt_mask, cross_mask)
     return decoder_output


In [38]:
#Model instantiation part and training loop along with the evaluation call 
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

d_model = 128
num_heads = 8
num_layers = 2
d_ff = 512
dropout = 0.1

model = Transformer(
    vocab_size=len(tgt_vocab),  # Assuming shared vocab size for simplicity
    d_model=d_model,
    num_heads=num_heads,
    num_layers=num_layers,
    d_ff=d_ff,
    max_seq_length=max_tgt_len,
    dropout=dropout
).to(device)

# Defining criterion and optimizer NLLLoss because i use log_softmax
criterion = nn.NLLLoss(ignore_index=tgt_vocab['<pad>'])  # <pad> tokens shouldn't affect the loss
optimizer = optim.Adam(model.parameters(), lr=1e-3)

EPOCHS = 10

for epoch in range(EPOCHS):  # Customize number of epochs
    total_loss = 0
    model.train()

    for src_seq, tgt_seq in encoded_data:
        src_seq = torch.tensor(src_seq).unsqueeze(0).to(device)
        tgt_seq = torch.tensor(tgt_seq).unsqueeze(0).to(device)

        # Prepare inputs and targets
        tgt_input = tgt_seq[:, :-1]  # everything except last token
        tgt_output = tgt_seq[:, 1:]  # everything except first token

        # Masks
        src_mask = create_mask(src_seq, src_vocab['<pad>']).to(device)
        tgt_mask = create_mask(tgt_input, tgt_vocab['<pad>']).to(device)
        cross_mask = src_mask  # For simplicity

        # Forward pass
        output = model(src_seq, tgt_input, src_mask, tgt_mask, cross_mask)

        # Reshape for loss: (batch_size * seq_len, vocab_size)
        output = output.reshape(-1, output.shape[-1])
        tgt_output = tgt_output.reshape(-1)

        loss = criterion(output, tgt_output)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {total_loss:.4f}")
    
    evaluate_model(model, encoded_data, src_vocab, tgt_vocab, max_src_len, max_tgt_len)

torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 5, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([

In [39]:
# Validation loop
def evaluate_model(model, data, src_vocab, tgt_vocab, max_src_len, max_tgt_len):
    model.eval()
    with torch.no_grad():
        total_loss = 0
        for src_seq, tgt_seq in data:
            src_seq = torch.tensor(src_seq).unsqueeze(0).to(device)
            tgt_seq = torch.tensor(tgt_seq).unsqueeze(0).to(device)

            tgt_input = tgt_seq[:, :-1]  # everything except last token
            tgt_output = tgt_seq[:, 1:]  # everything except first token

            src_mask = create_mask(src_seq, src_vocab['<pad>']).to(device)
            tgt_mask = create_mask(tgt_input, tgt_vocab['<pad>']).to(device)
            cross_mask = src_mask  # For simplicity

            output = model(src_seq, tgt_input, src_mask, tgt_mask, cross_mask)

            output = output.reshape(-1, output.shape[-1])
            tgt_output = tgt_output.reshape(-1)

            loss = criterion(output, tgt_output)
            total_loss += loss.item()
        
        avg_loss = total_loss / len(data)
        print(f"Validation Loss: {avg_loss:.4f}")
        return avg_loss



In [40]:

# Function to predict translations
def translate(model, sentence, src_vocab, tgt_vocab, max_src_len, max_tgt_len):
    model.eval()
    
    # Tokenize and encode source sentence
    src_tokens = encode(sentence, src_vocab, max_src_len)
    src_tensor = torch.tensor(src_tokens).unsqueeze(0).to(device)

    # Generate mask for source sentence
    src_mask = create_mask(src_tensor, src_vocab['<pad>']).to(device)

    # Initialize target sequence with <sos> token
    tgt_input = torch.tensor([tgt_vocab['<sos>']]).unsqueeze(0).to(device)
    tgt_mask = create_mask(tgt_input, tgt_vocab['<pad>']).to(device)
    cross_mask = src_mask  

    translated_tokens = []

    # Generate translation step by step
    for _ in range(max_tgt_len - 1):
        output = model(src_tensor, tgt_input, src_mask, tgt_mask, cross_mask)
        
        # Getting the most likely token from the output
        next_token = output.argmax(dim=-1)[:, -1].item()

        # If the model generates <eos> token, stop the translation
        if next_token == tgt_vocab['<eos>']:
            break

        translated_tokens.append(next_token)

        # Append the predicted token to the target input for the next iteration
        tgt_input = torch.cat([tgt_input, torch.tensor([[next_token]]).to(device)], dim=1)

        # Update mask
        tgt_mask = create_mask(tgt_input, tgt_vocab['<pad>']).to(device)

    # Convert token IDs back to words
    translated_sentence = ' '.join([tgt_idx2word[token] for token in translated_tokens])
    return translated_sentence

# Testing the model by translating with an example sentence
test_sentence = "They are playing"
translation = translate(model, test_sentence, src_vocab, tgt_vocab, max_src_len, max_tgt_len)
print(f"Source: {test_sentence}")
print(f"Translation: {translation}")

torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 1, 8, 16])
torch.Size([1, 1, 8, 16])
torch.Size([1, 1, 8, 16])
torch.Size([1, 1, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 1, 8, 16])
torch.Size([1, 1, 8, 16])
torch.Size([1, 1, 8, 16])
torch.Size([1, 1, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 2, 8, 16])
torch.Size([1, 2, 8, 16])
torch.Size([1, 2, 8, 16])
torch.Size([1, 2, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 2, 8, 16])
torch.Size([1, 2, 8, 16])
torch.Size([1, 2, 8, 16])
torch.Size([1, 2, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([1, 6, 8, 16])
torch.Size([

The dataset is very small and limited so even though the translation output is correct there is so much room for improvement and cases where i can evaluate the output using other metrics and also monitor the training and evaluation for overfitting and so on.
This was just for my own learning.