<div style="border: 2px solid blue; padding: 5px; color: red; text-align: center; font-size: 24px;">
 Build a transformer from scratch
</div>


The Transformer architecture was originally designed for sequence-to-sequence (seq2seq) tasks and consists of an encoder part and a decoder part. The encoder and decoder blocks were later adapted as standalone models. The main types of models are:

1. **Encoder-only**: These models convert an input sequence of text into a numerical representation that is well suited for tasks like text classification, e.g. `BERT, DistilBERT`
2. **Decoder-only**: These models are well suited for text-generation tasks, e.g. `GPT models`
3. **Encoder-Decoder**: These models are well suited for machine translation and summarization tasks, e.g. `BART, T5 models`

In this notebook, we are going to build an `Encoder-only` model from scratch:
1. Multi-head attention
2. Feed-Forward Layer
3. Positional Embeddings

On top of the encoder, we will add a classifier layer to train a classification model.

# Libraries

In [2]:
import torch
import torch.nn.functional as F
from math import sqrt 
import torch.nn as nn
import numpy as np

# 1. Multi-Head Attention

<img src="images/multi-attention.PNG" />

In [3]:
def scaled_dot_product_attention(query, key, value):
    """Compute softmax(Q K^T / sqrt(d_k)) V for batched 3-D tensors.

    Args:
        query: Tensor whose last dimension is the per-head feature size.
        key: Tensor batch-multiplied (transposed on dims 1 and 2) with ``query``.
        value: Tensor that is averaged using the attention weights.

    Returns:
        The attention-weighted combination of ``value``.
    """
    # Scaling by sqrt of the feature size keeps the logits in a range where
    # the softmax is not saturated.
    scale = sqrt(query.size(-1))
    similarity = torch.bmm(query, key.transpose(1, 2)) / scale
    # Normalise over the key axis so each query's weights sum to one.
    attn_probs = similarity.softmax(dim=-1)
    return torch.bmm(attn_probs, value)

In [4]:

class AttentionHead(nn.Module):
    """One self-attention head: three linear projections followed by
    scaled dot-product attention.

    Args:
        embed_dim: Size of the last dimension of the incoming hidden state.
        head_dim: Output size of each of the query/key/value projections.
    """

    def __init__(self, embed_dim, head_dim):
        super().__init__()
        # Query, key and value each get their own learned projection.
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_state):
        """Project ``hidden_state`` to queries, keys and values and attend."""
        # Self-attention: all three projections read the same input.
        queries = self.q(hidden_state)
        keys = self.k(hidden_state)
        values = self.v(hidden_state)
        return scaled_dot_product_attention(queries, keys, values)

class MultiHeadAttention(nn.Module):
    """Multi-head self-attention built from independent ``AttentionHead`` modules.

    Each head projects the input to ``hidden_size // num_attention_heads``
    features. The head outputs are concatenated along the last dimension and
    passed through a final linear layer of size ``hidden_size``.

    Args:
        config: Object exposing ``hidden_size`` and ``num_attention_heads``.

    Raises:
        ValueError: If ``num_attention_heads`` is not positive or does not
            divide ``hidden_size`` evenly.
    """

    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        # Floor division would silently drop features: the concatenated heads
        # would then be narrower than ``embed_dim`` and ``output_linear`` would
        # only fail later, inside ``forward``. Fail fast with a clear message.
        if num_heads <= 0 or embed_dim % num_heads != 0:
            raise ValueError(
                f"hidden_size ({embed_dim}) must be a positive multiple of "
                f"num_attention_heads ({num_heads})"
            )
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        """Run every head on ``hidden_state`` and mix the concatenated result."""
        # Concatenating on the last axis restores the full embedding width.
        head_outputs = [head(hidden_state) for head in self.heads]
        merged = torch.cat(head_outputs, dim=-1)
        return self.output_linear(merged)

# 2. Feed-Forward Layer


In [5]:
class FeedForward(nn.Module):
    """Position-wise feed-forward block: Linear -> GELU -> Linear -> Dropout.

    Args:
        config: Object exposing ``hidden_size``, ``intermediate_size`` and
            ``hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        hidden, inner = config.hidden_size, config.intermediate_size
        # Expand to the intermediate width, then project back.
        self.linear_1 = nn.Linear(hidden, inner)
        self.linear_2 = nn.Linear(inner, hidden)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        """Apply the feed-forward transformation to ``x``."""
        expanded = self.gelu(self.linear_1(x))
        projected = self.linear_2(expanded)
        return self.dropout(projected)

# 3. Normalization Layer


<img src="images/normalization.PNG" />


In [6]:
class TransformerEncoderLayer(nn.Module):
    """Encoder block in which layer normalization is applied before each
    sub-layer (attention, then feed-forward), with a skip connection around
    both.

    Args:
        config: Object exposing ``hidden_size`` plus whatever
            ``MultiHeadAttention`` and ``FeedForward`` read from it.
    """

    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        # Sub-layer 1: normalize, attend, add back to the un-normalized input.
        attended = self.attention(self.layer_norm_1(x))
        x = x + attended
        # Sub-layer 2: normalize, feed-forward, add back again.
        transformed = self.feed_forward(self.layer_norm_2(x))
        return x + transformed

# 4. Positional Embeddings

In [7]:
class Embeddings(nn.Module):
    """Sum of learned token and position embeddings, followed by layer
    normalization and dropout.

    Args:
        config: Object exposing ``vocab_size``, ``hidden_size``,
            ``max_position_embeddings`` and ``hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size,
                                             config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                                config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        # Use the configured rate; a bare nn.Dropout() would default to p=0.5.
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, input_ids):
        """Embed ``input_ids``.

        Args:
            input_ids: Integer tensor; dimension 1 is taken as the sequence
                length.

        Returns:
            Tensor of embeddings with a trailing ``hidden_size`` dimension.
        """
        # Create position IDs 0..seq_length-1 on the same device as the input
        # so the lookup also works when the model lives on an accelerator.
        seq_length = input_ids.size(1)
        position_ids = torch.arange(
            seq_length, dtype=torch.long, device=input_ids.device
        ).unsqueeze(0)
        # Create token and position embeddings
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine token and position embeddings (position part broadcasts
        # over the leading dimension).
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

# 5. Transformer Encoder Layer

In [8]:
class TransformerEncoder(nn.Module):
    """Embedding layer followed by a stack of ``TransformerEncoderLayer`` blocks.

    Args:
        config: Object exposing ``num_hidden_layers`` plus whatever
            ``Embeddings`` and ``TransformerEncoderLayer`` read from it.
    """

    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        blocks = []
        for _ in range(config.num_hidden_layers):
            blocks.append(TransformerEncoderLayer(config))
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        """Embed ``x`` and pass the result through every encoder layer in order."""
        hidden = self.embeddings(x)
        for block in self.layers:
            hidden = block(hidden)
        return hidden

# 6. Application on a classification case

For this application, we are going to use the configuration of the BERT model. On top of the encoder module, we will add a classification layer.

In [42]:
class TransformerForSequenceClassification(nn.Module):
    """Encoder with a linear classification head on the first position's
    hidden state.

    Args:
        config: Object exposing ``hidden_size`` and ``hidden_dropout_prob``
            plus whatever ``TransformerEncoder`` reads from it.
        n_labels: Number of output classes of the classifier.
    """

    def __init__(self, config, n_labels):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, n_labels)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, input_ids, labels=None):
        """Classify ``input_ids``.

        Returns:
            ``logits`` when ``labels`` is None, otherwise the tuple
            ``(loss, logits)`` with the cross-entropy loss first.
        """
        # Position 0 holds the hidden state of the [CLS] token.
        cls_state = self.encoder(input_ids)[:, 0, :]
        logits = self.classifier(self.dropout(cls_state))
        if labels is None:
            return logits
        # Labels supplied: also compute the training loss.
        return self.loss_fn(logits, labels), logits
     

In [55]:
from transformers import AutoTokenizer, Trainer, TrainingArguments, AutoConfig
from datasets import load_dataset

In [57]:
# Checkpoint name shared by the configuration and the tokenizer below.
model_ckpt = "bert-base-uncased"
# Only the checkpoint's configuration is fetched here; the custom model built
# later in this notebook is constructed from this config object.
config = AutoConfig.from_pretrained(model_ckpt)
# Tokenizer matching the same checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

In [33]:
# Load the "emotion" dataset
dataset = load_dataset("emotion")
# NOTE(review): batch_size is not referenced by the cells visible here; the
# TrainingArguments cell hard-codes 64 instead. Confirm which value is intended.
batch_size = 32
# Number of distinct label values in the training split; used to size the
# classifier when the model is instantiated.
n_labels = len(np.unique(dataset['train']['label']))
print(n_labels)

6


In [50]:
# Initialize the classifier from the loaded config, with one output per label
# value found in the training split.
tranformer_classif_model = TransformerForSequenceClassification(config, n_labels=n_labels)

In [None]:
# Tokenization helper applied to the dataset in batches
def tokenize_function(examples):
    """Tokenize the ``'text'`` field of ``examples`` with the module-level
    ``tokenizer``, padding to the maximum length and truncating longer inputs.
    """
    texts = examples['text']
    return tokenizer(texts, padding='max_length', truncation=True)

# Run the tokenizer over every split of the dataset
tokenized_datasets = dataset.map(tokenize_function, batched=True)

In [54]:
# Define training arguments
# NOTE(review): newer transformers releases appear to rename
# `evaluation_strategy` to `eval_strategy`. Confirm against the installed version.
training_args = TrainingArguments(
    output_dir='./results',          # Directory passed to the Trainer for its outputs
    evaluation_strategy="epoch",     # Evaluate at the end of every epoch
    learning_rate=2e-5,              # Learning rate
    per_device_train_batch_size=64,  # Per-device batch size for training
    per_device_eval_batch_size=64,   # Per-device batch size for evaluation
    num_train_epochs=1,              # Number of training epochs
    weight_decay=0.01,               # Strength of weight decay
)


In [16]:
# Define the Trainer around the custom model; its forward returns
# (loss, logits) when labels are supplied and logits alone otherwise.
trainer = Trainer(
    model=tranformer_classif_model,                         # The model to train
    args=training_args,                  # Training arguments defined above
    train_dataset=tokenized_datasets['train'],   # Tokenized 'train' split
    eval_dataset=tokenized_datasets['test'],     # Tokenized 'test' split, used for evaluation
)

In [None]:
# Train the classifier using the Trainer configured above
trainer.train()