<a href="https://colab.research.google.com/github/Anushka108/Transformers-in-NLP/blob/main/Transformers_in_NLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformers in a Nutshell
An educational but usable example of a character-level GPT-2 transformer language model.

In [None]:
# Install the third-party packages imported below: `wget` and
# `pytorch-lightning` (-q keeps pip's output quiet).
!pip install -q wget pytorch-lightning

In [None]:
import os
import sys
import wget
from tqdm import tqdm

# for dataset
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader

# for model
import math
import torch
from torch import nn
import torch.nn.functional as F
from torchmetrics.functional import accuracy
import pytorch_lightning as pl

# Model

* vocab_size: The size of the vocabulary. This should be set later based on the dataset.
* max_seq_len: The maximum sequence length for the input text.
* emb_size: The size of the embeddings.
* num_blocks: The number of transformer blocks (layers).
* num_heads: The number of attention heads in each transformer block.
* fc_hidden_dim: The hidden dimension size of the feed-forward network within each transformer block, typically 4 times the embedding size.


In [None]:
class GPT2Config:
    """
    'gpt2-mini' config from minGPT

    Every setting is a class attribute holding its default value. Any of them
    can be overridden per instance with keyword arguments, e.g.
    ``GPT2Config(num_blocks=2, batch_size=64)``; unknown keywords are stored
    as extra attributes.
    """
    # data
    default_data_url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'

    # model
    vocab_size = None              # unknown until the dataset is built; assign before creating the model
    max_seq_len = 128              # maximum sequence length fed to the model
    emb_size = 192                 # embedding width
    num_blocks = 6                 # number of transformer blocks (layers)
    num_heads = 6                  # attention heads per block
    fc_hidden_dim = 4 * emb_size   # feed-forward hidden width (default: 4 x emb_size)

    # regularization
    attn_dropout_p = 0.1  # Dropout probability for the attention layers.
    res_dropout_p = 0.1   # Dropout probability for the residual connections.
    emb_dropout_p = 0.1   # Dropout probability for the embeddings.

    # training
    max_learning_rate = 2.5e-4
    batch_size = 512
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    def __init__(self, **kwargs):
        """
        Store any overrides / extra config args as instance attributes.

        ``fc_hidden_dim`` is derived from ``emb_size``. The class-level default
        is evaluated only once, so when ``emb_size`` is overridden without an
        explicit ``fc_hidden_dim`` the hidden width is recomputed here as
        ``4 * emb_size`` instead of silently keeping the stale default.
        """
        for k, v in kwargs.items():
            setattr(self, k, v)

        if 'emb_size' in kwargs and 'fc_hidden_dim' not in kwargs:
            self.fc_hidden_dim = 4 * self.emb_size

In [None]:
class CharDataset(Dataset):
    """
    A toy character-level dataset for charGPT, modified from the minGPT repo.

    Item ``idx`` is a pair ``(x, y)`` of ``torch.long`` tensors, each of
    length ``config.max_seq_len``: ``x`` encodes the characters starting at
    offset ``idx`` and ``y`` is the same window shifted one character to the
    right (the next-character targets).
    """

    def __init__(self, config, data=None):
        """
        Args:
            config: object exposing ``max_seq_len`` and, when ``data`` is
                omitted, ``default_data_url``.
            data: the full training text. When ``None`` the text is
                downloaded from ``config.default_data_url`` with ``wget``.
        """
        self.config = config
        if data is None:
            filename = wget.download(config.default_data_url)
            # The context manager closes the handle; the explicit encoding
            # keeps the result independent of the platform default.
            with open(filename, 'r', encoding='utf-8') as f:
                data = f.read()

        chars = sorted(set(data))
        data_size, vocab_size = len(data), len(chars)
        print('data has %d characters, %d unique.' % (data_size, vocab_size))

        # stoi: maps each character to a unique integer (character to index).
        self.stoi = {ch: i for i, ch in enumerate(chars)}
        # itos: maps each integer back to its character (index to character).
        self.itos = {i: ch for i, ch in enumerate(chars)}
        self.vocab_size = vocab_size
        self.data = data

    def __len__(self):
        # Each item needs max_seq_len + 1 characters. Clamp at zero so that a
        # text that is too short yields an empty dataset instead of a negative
        # length (which makes len() raise).
        return max(0, len(self.data) - self.config.max_seq_len)

    def __getitem__(self, idx):
        # grab a chunk of (max_seq_len + 1) characters from the data
        chunk = self.data[idx:idx + self.config.max_seq_len + 1]
        # encode every character to an integer
        dix = [self.stoi[s] for s in chunk]
        # inputs are all but the last character, targets all but the first
        x = torch.tensor(dix[:-1], dtype=torch.long)
        y = torch.tensor(dix[1:], dtype=torch.long)

        return x, y

In [None]:
class CausalMultiHeadAttention(nn.Module):
    """
    Multi-head self-attention in which a position may only attend to itself
    and to earlier positions.

    Input and output are both ``(batch_size, seq_len, emb_size)``.
    """

    def __init__(self, config):
        super().__init__()
        # the embedding is sliced evenly across the heads
        assert config.emb_size % config.num_heads == 0

        width = config.emb_size

        # bias-free projections producing queries, keys and values
        self.W_Q = nn.Linear(width, width, bias=False)
        self.W_K = nn.Linear(width, width, bias=False)
        self.W_V = nn.Linear(width, width, bias=False)
        # bias-free projection applied once the heads are joined again
        self.res_proj = nn.Linear(width, width, bias=False)

        # regularization
        self.attn_dropout = nn.Dropout(config.attn_dropout_p)
        self.res_dropout = nn.Dropout(config.res_dropout_p)

        # lower-triangular matrix of ones: entry (i, j) is 1 when position i
        # is allowed to look at position j
        causal = torch.tril(torch.ones(config.max_seq_len, config.max_seq_len))
        self.register_buffer('mask', causal)

        self.num_heads = config.num_heads

    def forward(self, x):
        n_batch, n_tokens, width = x.size()
        per_head = width // self.num_heads

        def split_heads(projected):
            # (batch, seq, emb) -> (batch, seq, heads, head_dim) -> (batch, heads, seq, head_dim)
            return projected.reshape(n_batch, n_tokens, self.num_heads, per_head).transpose(1, 2)

        queries = split_heads(self.W_Q(x))
        keys = split_heads(self.W_K(x))
        values = split_heads(self.W_V(x))

        # scaled dot-product scores: (batch, heads, seq, seq)
        scale = 1.0 / math.sqrt(per_head)
        logits = torch.matmul(queries, keys.transpose(-2, -1)) * scale

        # future positions get -inf so that softmax gives them zero weight
        hidden_positions = self.mask[:n_tokens, :n_tokens] == 0
        logits = logits.masked_fill(hidden_positions, float('-inf'))

        weights = self.attn_dropout(F.softmax(logits, dim=-1))

        # weighted sum of the values: (batch, heads, seq, head_dim)
        mixed = torch.matmul(weights, values)

        # put the heads back side by side: (batch, seq, emb)
        joined = mixed.transpose(1, 2).reshape(n_batch, n_tokens, width)

        return self.res_dropout(self.res_proj(joined))

In [None]:
class MLP(nn.Module):
    """
    Feed-forward network of a transformer block:
    linear (emb_size -> fc_hidden_dim), GELU, linear (fc_hidden_dim -> emb_size), dropout.
    """

    def __init__(self, config):
        super().__init__()
        width, inner = config.emb_size, config.fc_hidden_dim
        # expand to the hidden width
        self.hidden = nn.Linear(width, inner)
        # Gaussian Error Linear Unit non-linearity
        self.gelu = nn.GELU()
        # contract back to the embedding width
        self.res_proj = nn.Linear(inner, width)
        # regularization on the block output
        self.res_dropout = nn.Dropout(config.res_dropout_p)

    def forward(self, x):
        activated = self.gelu(self.hidden(x))
        return self.res_dropout(self.res_proj(activated))

In [None]:
class TransformerBlock(nn.Module):
    """
    One pre-LayerNorm transformer block: causal self-attention followed by a
    feed-forward network, each wrapped in a residual connection.

    Input and output have the same shape.
    """

    def __init__(self, config):
        super().__init__()
        # Layer normalization applied to the input of the attention branch
        self.ln1 = nn.LayerNorm(config.emb_size)
        # Causal Multi-Head Attention mechanism
        self.attn = CausalMultiHeadAttention(config)
        # Layer normalization applied to the input of the feed-forward branch
        self.ln2 = nn.LayerNorm(config.emb_size)
        # Multi-Layer Perceptron (MLP) for the feedforward network
        self.mlp = MLP(config)

    def forward(self, x):
        # Only the branch input is normalized. The residual stream `x` itself
        # must stay un-normalized, otherwise every block would discard the
        # scale of what the previous blocks accumulated.
        x = x + self.attn(self.ln1(x))
        x = x + self.mlp(self.ln2(x))

        return x

In [None]:
class GPT2(nn.Module):
    """
    GPT-2 style language model.

    ``forward`` takes a ``(batch_size, seq_len)`` tensor of token indices and
    returns ``(batch_size, seq_len, vocab_size)`` un-normalized scores for the
    token following each position.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        # Token embedding layer
        self.tok_emb = nn.Embedding(config.vocab_size, config.emb_size)
        # Positional embedding layer
        self.pos_emb = nn.Embedding(config.max_seq_len, config.emb_size)
        # Dropout layer for embeddings
        self.emb_dropout = nn.Dropout(config.emb_dropout_p)

        # Transformer blocks
        self.blocks = nn.Sequential(*[TransformerBlock(config) for _ in range(config.num_blocks)])
        # Layer normalization after transformer blocks
        self.ln = nn.LayerNorm(config.emb_size)
        # Final linear layer for prediction
        self.head = nn.Linear(config.emb_size, config.vocab_size, bias=False)

        # Positional indices 0..max_seq_len-1. These are constants, so they
        # are a buffer, not a parameter: they still follow the module across
        # devices and keep the same state_dict key, but are no longer handed
        # to the optimizer.
        self.register_buffer('pos_idxs', torch.arange(0, config.max_seq_len))

        # Initialize weights
        self.apply(self._init_weights)
        # Must run after the generic init above, which would otherwise
        # overwrite the scaled residual projections.
        self._init_residual_projections()

    def forward(self, x):
        _, seq_len = x.size()
        if seq_len > self.config.max_seq_len:
            raise ValueError(
                f'sequence length {seq_len} exceeds max_seq_len {self.config.max_seq_len}'
            )

        # Token embeddings
        tok_embs = self.tok_emb(x)
        # Positional embeddings (broadcast over the batch dimension)
        pos_embs = self.pos_emb(self.pos_idxs[:seq_len])

        # Combine token and positional embeddings, apply dropout
        seq = self.emb_dropout(tok_embs + pos_embs)

        # Transformer blocks
        seq = self.blocks(seq)

        # Layer normalization
        seq = self.ln(seq)

        # Final linear layer for prediction
        out = self.head(seq)

        return out

    def _init_weights(self, module):
        """Per-module initialisation; invoked once per sub-module by ``apply``."""
        if isinstance(module, nn.Linear):
            # Initialize weights for linear layers
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            # Initialize weights for embedding layers
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
        elif isinstance(module, nn.LayerNorm):
            # Initialize weights for layer normalization
            torch.nn.init.zeros_(module.bias)
            torch.nn.init.ones_(module.weight)

    def _init_residual_projections(self):
        """Re-draw every ``res_proj.weight`` with std 0.02 / sqrt(2 * num_blocks)."""
        # Done once for the whole model, not once per module visited by
        # `apply`, which would redraw the same weights many times over.
        for name, param in self.named_parameters():
            if name.endswith('res_proj.weight'):
                torch.nn.init.normal_(param, mean=0.0, std=0.02 / math.sqrt(2 * self.config.num_blocks))

In [None]:
class GPT2LitModel(pl.LightningModule):
    """Lightning wrapper that trains the wrapped model on next-token prediction."""

    def __init__(self, model, config):
        super().__init__()
        # the GPT-2 network being trained
        self.model = model
        # configuration object; max_learning_rate is read from it
        self.config = config

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        inputs, targets = batch
        logits = self.model(inputs)

        # cross-entropy wants one row of scores per target, so the leading
        # dimensions are flattened together
        n_classes = logits.size(-1)
        loss = F.cross_entropy(logits.reshape(-1, n_classes), targets.reshape(-1))

        self.log('train_loss', loss)
        return loss

    def configure_optimizers(self):
        # plain Adam at the configured learning rate
        return torch.optim.Adam(self.parameters(), lr=self.config.max_learning_rate)

In [None]:
# wiring everything up to start training
config = GPT2Config()
dataset = CharDataset(config)
# the vocabulary size is only known once the text has been read
config.vocab_size = dataset.vocab_size

train_loader = DataLoader(dataset, num_workers=4, batch_size=config.batch_size, shuffle=True)

model = GPT2(config)
lit_model = GPT2LitModel(model, config)
# Follow config.device instead of hard-coding "gpu", so the notebook also
# runs on a CPU-only runtime.
accelerator = "gpu" if config.device == "cuda" else "cpu"
trainer = pl.Trainer(accelerator=accelerator, devices=1, max_epochs=10)

In [None]:
# run training: fit the Lightning module on the batches from train_loader
trainer.fit(lit_model, train_loader)

In [None]:
# save our trained model so we can use it later
model_save_name = 'shakespeareGPT.pt'
# '/kaggle/working' only exists on Kaggle; anywhere else (Colab, a local
# machine) fall back to the current working directory.
save_dir = '/kaggle/working' if os.path.isdir('/kaggle/working') else os.getcwd()
path = os.path.join(save_dir, model_save_name)
torch.save(lit_model.state_dict(), path)

In [None]:
# simple helper function to prompt model and get readable result
@torch.no_grad()
def get_predictions(model, prompt, max_seq_len=128, stoi=None, itos=None):
    """
    Sample a continuation of ``prompt`` from ``model``, one character at a time.

    Args:
        model: module mapping a ``(1, seq_len)`` tensor of token indices to
            ``(1, seq_len, vocab_size)`` scores.
        prompt: non-empty starting text; every character must be a key of ``stoi``.
        max_seq_len: total length (prompt included) at which sampling stops.
            A prompt that is already this long is returned unchanged.
        stoi: character -> index mapping. Defaults to the global ``dataset.stoi``.
        itos: index -> character mapping. Defaults to the global ``dataset.itos``.

    Returns:
        The prompt followed by the sampled characters.

    Raises:
        ValueError: if ``prompt`` is empty.
        KeyError: if ``prompt`` contains a character missing from ``stoi``.
    """
    if stoi is None:
        stoi = dataset.stoi
    if itos is None:
        itos = dataset.itos
    if not prompt:
        raise ValueError('prompt must contain at least one character')

    # Build the input on the device the model lives on, so that a model left
    # on the GPU after training does not hit a device mismatch.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')
    tokens = torch.tensor([[stoi[ch] for ch in prompt]], dtype=torch.long, device=device)

    # Dropout must be off while sampling; the previous mode is restored afterwards.
    was_training = model.training
    model.eval()
    try:
        while tokens.size(1) < max_seq_len:
            # only the scores at the last position predict the next character
            logits = model(tokens)[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            tokens = torch.cat((tokens, next_token), dim=1)
    finally:
        model.train(was_training)

    out_str = ''.join(itos[int(i)] for i in tokens[0].tolist())

    return out_str

In [None]:
# loading saved model to use for inference
model_save_name = 'shakespeareGPT.pt'
# '/kaggle/working' only exists on Kaggle; anywhere else (Colab, a local
# machine) look in the current working directory.
save_dir = '/kaggle/working' if os.path.isdir('/kaggle/working') else os.getcwd()
path = os.path.join(save_dir, model_save_name)
# map_location='cpu' lets a checkpoint written from a GPU run be read on a
# CPU-only machine; load_state_dict then copies the values into the module's
# existing tensors, whichever device they are on.
lit_model.load_state_dict(torch.load(path, map_location='cpu'))

In [None]:
prompt = 'Who art thou?' # put your prompt here!
# sample a continuation of the prompt from the model and show it
preds_str = get_predictions(lit_model, prompt)
print(preds_str)