# Building a Transformer From Scratch

### Authors:
 - Carla Ellefsen
 - Brendan McKinley
 - Diya Vinod
 - Bingshen Lu
 - Michael Ivanitskiy

In [5]:
from dataclasses import dataclass
from pathlib import Path

import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from jaxtyping import Float, Int
from torch.utils.data import DataLoader
from tqdm import tqdm
from typing import Optional, Tuple, List
import matplotlib.pyplot as plt
import re
from tokenizers import Tokenizer, models, pre_tokenizers, decoders, trainers

ModuleNotFoundError: No module named 'torch'

## Transformer

In [None]:
@dataclass
class GPTConfig:
    """Hyperparameters that fix the shape of the GPT-style model.

    The defaults are test values: too small for a real language model, but
    big enough to exercise the code.
    """

    d_vocab: int = 10_000  # vocabulary size
    d_model: int = 128  # model (embedding) dimension
    d_mlp: int = 512  # hidden dimension of the MLP (feed-forward) layer
    n_heads: int = 4  # attention heads per layer
    d_head: int = 32  # dimension of each attention head
    n_layers: int = 6  # number of transformer layers
    act_fn: type[nn.Module] = nn.ReLU  # activation module class

    @property
    def n_params(self) -> tuple[int]:
        """Estimate the parameter count.

        Returns:
            A one-element tuple holding the estimate. The embedding matrix is
            counted once, and each layer contributes its MLP weights and
            biases plus four ``d_model x d_head`` matrices (Q, K, O, V) per
            head.
        """
        embedding = self.d_vocab * self.d_model
        mlp_weights = self.d_model * self.d_mlp * 2
        mlp_biases = self.d_model + self.d_mlp
        attention = self.n_heads * (4 * self.d_model * self.d_head)
        per_layer = mlp_weights + mlp_biases + attention
        # NOTE: the value is wrapped in a tuple, matching the declared type.
        return (embedding + per_layer * self.n_layers,)

### Attention Head class implements a single attention head for the transformer model

In [None]:
class AttentionHead(nn.Module):
    """A single causal (masked) self-attention head.

    The head projects its input to ``d_head``-dimensional queries, keys and
    values, computes scaled dot-product attention in which a position may
    only attend to itself and earlier positions, and projects the result
    back to ``d_model``.
    """

    def __init__(self, cfg: GPTConfig):
        """Create the query/key/value/output projections described by ``cfg``.

        Args:
            cfg: configuration providing ``d_vocab``, ``d_model`` and ``d_head``.
        """
        print("Attention Head Constructor...")
        super().__init__()
        # Not used by forward(); kept so existing attribute access keeps working.
        self.relu = nn.ReLU()
        self.d_vocab = cfg.d_vocab
        self.d_model = cfg.d_model
        self.d_head = cfg.d_head
        self.wq = nn.Linear(self.d_model, self.d_head)  # query projection
        self.wk = nn.Linear(self.d_model, self.d_head)  # key projection
        self.wv = nn.Linear(self.d_model, self.d_head)  # value projection
        self.wo = nn.Linear(self.d_head, self.d_model)  # output projection

    def forward(self,
                x: Float[torch.Tensor, "... n_context d_model"]) -> Float[torch.Tensor, "... n_context d_model"]:
        """Apply causal self-attention to ``x``.

        Args:
            x: activations whose last two dimensions are
                ``(n_context, d_model)``; any leading batch dimensions are
                broadcast over.

        Returns:
            A tensor with the same shape as ``x``.
        """
        # The sequence axis is second-to-last, so the mask has the right size
        # for both unbatched (n, d) and batched (b, n, d) input.
        n_context = x.shape[-2]

        queries = self.wq(x)
        keys = self.wk(x)
        values = self.wv(x)

        # Scale by sqrt(d_head) so the score magnitude does not grow with the
        # head dimension.
        scores = (queries @ keys.transpose(-2, -1)) / (self.d_head ** 0.5)

        # True strictly above the diagonal marks "future" positions. The mask
        # is created on x's device so CPU and GPU inputs both work.
        future = torch.triu(
            torch.ones(n_context, n_context, dtype=torch.bool, device=x.device),
            diagonal=1,
        )
        attention = F.softmax(scores.masked_fill(future, float("-inf")), dim=-1)

        # Attention rows sum to 1, so projecting after mixing equals mixing the
        # projected values, while multiplying the smaller d_head-wide matrix.
        return self.wo(attention @ values)

NameError: name 'nn' is not defined

### Combine multiple attention heads

In [7]:
class MultiHeadedAttention(nn.Module):
    """Run ``cfg.n_heads`` attention heads on the same input and merge them.

    The per-head outputs are summed element-wise (not concatenated) and the
    sum is passed through one final ``d_model -> d_model`` linear layer.
    """

    def __init__(self, cfg: GPTConfig):
        """Build the heads and the final linear layer from ``cfg``."""
        print("MultiHeadedAttention Constructor...")
        super().__init__()
        self.n_heads = cfg.n_heads
        self.d_model = cfg.d_model
        self.d_head = cfg.d_head

        # One independent AttentionHead per configured head.
        self.attention_heads = nn.ModuleList(
            AttentionHead(cfg) for _ in range(self.n_heads)
        )

        # Final projection applied to the summed head outputs.
        self.wo = nn.Linear(self.d_model, self.d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the projected sum of every head's output for ``x``."""
        # Stack along a new leading axis, then collapse that axis by summing.
        stacked = torch.stack([head(x) for head in self.attention_heads])
        combined = stacked.sum(dim=0)
        return self.wo(combined)

NameError: name 'nn' is not defined

### Applying a feed-forward neural network to the input tensor

In [None]:
class MLP(nn.Module):
    """Position-wise feed-forward network: linear -> activation -> linear."""

    def __init__(self, cfg: GPTConfig):
        """Build the two linear layers and the activation.

        Args:
            cfg: configuration providing ``d_model`` and ``d_mlp``. If it also
                provides ``act_fn`` (an ``nn.Module`` class), that activation
                is used; otherwise ReLU is used.
        """
        print("MLP Constructor...")
        super().__init__()

        self.d_model = cfg.d_model
        self.d_mlp = cfg.d_mlp

        # Expand from the model dimension to the hidden dimension.
        self.lin1 = nn.Linear(self.d_model, self.d_mlp)
        # Honour the configured activation instead of hard-coding ReLU. The
        # attribute keeps its historical name so existing references to
        # ``mlp.relu`` (and previously pickled models) keep working.
        self.relu = getattr(cfg, "act_fn", nn.ReLU)()
        # Project back down to the model dimension.
        self.lin2 = nn.Linear(self.d_mlp, self.d_model)

    def forward(self,
                x: Float[torch.Tensor, "n_context d_model"]) -> Float[torch.Tensor, "n_context d_model"]:
        """Apply the feed-forward network to the last dimension of ``x``.

        Args:
            x: activations whose last dimension is ``d_model``.

        Returns:
            A tensor with the same shape as ``x``.
        """
        hidden = self.relu(self.lin1(x))
        return self.lin2(hidden)

### Implementing a single transformer block

In [None]:
class TransformerBlock(nn.Module):
    """One transformer layer: attention and MLP, each wrapped in a residual.

    Each sub-layer receives an RMS-normalised copy of its input, and its
    output is added back onto the un-normalised input.
    """

    def __init__(self, cfg: GPTConfig):
        """Build the attention, MLP and the two normalisation layers."""
        print("TransformerBlock Constructor...")
        super().__init__()
        self.multiheadattn = MultiHeadedAttention(cfg)  # attention sub-layer
        self.mlp = MLP(cfg)  # feed-forward sub-layer
        self.norm1 = nn.RMSNorm(cfg.d_model)  # RMS norm applied before attention
        self.norm2 = nn.RMSNorm(cfg.d_model)  # RMS norm applied before the MLP

    def forward(self, x: Float[torch.Tensor, "n_context d_model"]) -> Float[torch.Tensor, "n_context d_model"]:
        """Return ``x`` after the attention and MLP residual updates."""
        # First residual branch: normalise, attend, add back.
        after_attention = x + self.multiheadattn(self.norm1(x))
        # Second residual branch: normalise, feed forward, add back.
        return after_attention + self.mlp(self.norm2(after_attention))

### Implement transformer model

In [None]:
class Transformer(nn.Module):
    """Token embedding, a stack of transformer blocks, and an unembedding.

    Note that ``forward`` returns *probabilities* (softmax already applied),
    not raw logits.
    """

    def __init__(self, cfg: GPTConfig):
        """Build the embedding, unembedding and ``cfg.n_layers`` blocks."""
        print("**"*30)
        print("Transformer Constructor...")
        super().__init__()
        # Token id -> d_model vector.
        self.embedding = nn.Embedding(cfg.d_vocab, cfg.d_model)
        # d_model vector -> one score per vocabulary entry.
        self.unembedding = nn.Linear(cfg.d_model, cfg.d_vocab)
        # The blocks are applied in order by forward().
        blocks = [TransformerBlock(cfg) for _ in range(cfg.n_layers)]
        self.transformer_blocks = nn.ModuleList(blocks)

    def forward(self, x: Int[torch.Tensor, "n_context"]) -> Float[torch.Tensor, "n_context d_vocab"]:
        """Map token ids to a probability distribution over the vocabulary.

        Args:
            x: integer token ids.

        Returns:
            For every input position, softmax-normalised probabilities over
            the ``d_vocab`` vocabulary entries.
        """
        hidden = self.embedding(x)
        for layer in self.transformer_blocks:
            hidden = layer(hidden)
        token_scores = self.unembedding(hidden)
        return F.softmax(token_scores, dim=-1)

### Process text data by converting it into tensor representations and vice versa

In [None]:
class TextFinder:
    """Word-level vocabulary over a text, with conversions to and from tensors.

    Words are lower-cased runs of ``\\w`` characters. Every distinct word gets
    an index in sorted order, and ``"[UNK]"`` takes the final index and stands
    in for anything outside the vocabulary.
    """

    def __init__(self, text):
        """Build the word <-> index mappings for ``text``.

        Args:
            text: the corpus the vocabulary is derived from.
        """
        print("=="*30)
        print("TextFinder Constructor...")
        self.text = text
        self.word_index = self.create_word_index(text)  # word -> index
        # Reverse mapping; tensor_to_text() needs it to decode indices.
        self.index_to_word = {idx: word for word, idx in self.word_index.items()}

    def _words_to_tensor(self, text):
        """Tokenise ``text`` into words and return their indices as a long tensor."""
        unk = self.word_index["[UNK]"]
        words = re.findall(r'\b\w+\b', text.lower())
        return torch.tensor([self.word_index.get(word, unk) for word in words], dtype=torch.long)

    def create_word_index(self, text):
        """Return a dict mapping each distinct word of ``text`` to an index.

        Indices follow the sorted order of the words; ``"[UNK]"`` is appended
        last.
        """
        words = re.findall(r'\b\w+\b', text.lower())
        vocabulary = sorted(set(words))
        vocabulary.append("[UNK]")
        return {word: idx for idx, word in enumerate(vocabulary)}

    def text_to_tensor(self):
        """Return the indices of the words of ``self.text`` as a long tensor."""
        return self._words_to_tensor(self.text)

    def text_to_tensor_for_prompt(self, prompt):
        """Return the indices of the words of ``prompt`` as a long tensor.

        Words that are not in the vocabulary map to the ``"[UNK]"`` index.
        """
        return self._words_to_tensor(prompt)

    def tensor_to_text(self, tensor):
        """Decode a sequence of indices into a space-separated string.

        Indices without a vocabulary entry decode to ``"[UNK]"``.
        """
        return " ".join(self.index_to_word.get(int(idx), "[UNK]") for idx in tensor)

## Training

In [10]:
class Trainer:
    """Trains a ``Transformer`` on raw text and samples text from it.

    ``train`` fits a BPE tokenizer on the text, cuts the token stream into
    fixed-size windows and optimises next-token prediction on each window.
    ``generate`` then samples a continuation of a prompt.
    """

    def __init__(self, model: Transformer,
                 text: str, optimizer: torch.optim.Optimizer,
                 device: torch.device = ("cuda" if torch.cuda.is_available() else "cpu"),
                 sample_size: int = 1, max_samples: Optional[int] = None,
                 print_interval: int = 1,
                 epochs: int = 1):
        """Store the training setup and move the model to ``device``.

        Args:
            model: the model to train; it must return probabilities.
            text: the training corpus.
            optimizer: optimizer already bound to ``model``'s parameters.
            device: device the model and all tensors are placed on.
            sample_size: tokens per training window; ``train`` requires at
                least 2 (one input token plus one target token).
            max_samples: if given, the maximum number of windows processed
                per epoch.
            print_interval: print the loss every this many windows.
            epochs: number of passes over the data.
        """
        print("Trainer Constructor...")
        self.model = model
        self.text = text
        self.optimizer = optimizer
        self.device = device
        self.sample_size = sample_size
        self.max_samples = max_samples
        self.print_interval = print_interval
        self.epochs = epochs
        self.dataset = TextFinder(text)
        self.tokenizer = Tokenizer(models.BPE())

        self.model.to(device)  # Move model to the specified device

    def create_dataloader(self):
        """Split ``self.data_tensor`` into windows and wrap them in a DataLoader.

        ``self.data_tensor`` is set by ``train``. Windows hold ``sample_size``
        tokens and do not overlap (window size and step are equal).
        """
        data_samples = self.data_tensor.unfold(0, self.sample_size, self.sample_size)
        for data_sample in data_samples:
            print("Data sample: ", data_sample)
        return DataLoader(data_samples, batch_size=1, shuffle=False)

    def train(self):
        """Fit the tokenizer, train the model and plot the loss curve.

        Returns:
            A ``(model, training_records)`` tuple; each record is a dict with
            the window index (``"sample"``) and its ``"loss"``.

        Raises:
            ValueError: if ``sample_size`` is below 2, because a window must
                contain at least one input token and one target token.
        """
        if self.sample_size < 2:
            # A 1-token window leaves no input after the shift, which yields a
            # NaN loss and corrupts the weights on the first optimizer step.
            raise ValueError(
                f"sample_size must be at least 2 to form input/target pairs, got {self.sample_size}"
            )

        print("Tokenizing:")

        self.tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()
        trainer = trainers.BpeTrainer(vocab_size=10000, min_frequency=2, show_progress=True)
        self.tokenizer.train_from_iterator(self.text.split(), trainer)

        encoded = self.tokenizer.encode(self.text)

        self.data_tensor = torch.tensor(encoded.ids)
        self.dataloader = self.create_dataloader()

        print(f"Training with device: {self.device}")
        training_records: List[dict] = []
        self.model.train()
        loss_values = []

        for epoch in range(self.epochs):
            print(f"Epoch {epoch + 1}/{self.epochs}")
            for i, sample in tqdm(enumerate(self.dataloader), total=len(self.dataloader), desc="Training"):
                # Checked before the work so exactly max_samples windows run.
                if self.max_samples is not None and i >= self.max_samples:
                    break

                # Drop the DataLoader's batch dimension and put the window on
                # the same device as the model.
                sample = sample.squeeze(0).to(self.device)

                inputs = sample[:-1]  # every token except the last
                targets = sample[1:]  # the same tokens shifted left by one

                # forward pass -- the model returns probabilities
                probabilities = self.model(inputs)
                # Clamp before the log so an underflowed probability cannot
                # produce -inf and poison the gradients.
                floor = torch.finfo(probabilities.dtype).tiny
                log_probabilities = torch.log(probabilities.clamp_min(floor))

                loss = F.nll_loss(log_probabilities.view(-1, log_probabilities.size(-1)), targets.view(-1))

                # backward pass
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # record progress
                loss_value = loss.item()
                training_records.append({
                    "sample": i,
                    "loss": loss_value,
                })
                loss_values.append(loss_value)

                if i % self.print_interval == 0:
                    print(f"Sample {i}, Loss: {loss_value}")

        plt.figure(figsize=(10, 5))
        plt.plot(loss_values, label="Training Loss")
        plt.xlabel("Sample")
        plt.ylabel("Loss")
        plt.title("Training Loss Over Time")
        plt.legend()
        plt.grid(True)
        plt.show()

        return self.model, training_records

    def generate(self, prompt: str, max_tokens: int = 50, temperature: float = 1.0) -> str:
        """Sample a continuation of ``prompt`` from the model.

        Args:
            prompt: text to continue; encoded with the trainer's tokenizer.
            max_tokens: number of tokens to sample.
            temperature: values below 1 sharpen the distribution, values
                above 1 flatten it.

        Returns:
            The decoded prompt tokens followed by the sampled tokens.

        Raises:
            ValueError: if ``temperature`` is not positive or the prompt
                encodes to no tokens.
        """
        if temperature <= 0:
            raise ValueError(f"temperature must be positive, got {temperature}")

        self.model.eval()

        prompt_ids = self.tokenizer.encode(prompt).ids
        if not prompt_ids:
            raise ValueError("prompt must encode to at least one token")

        # A 1-D sequence on the model's device, the same layout train() uses.
        tokens = torch.tensor(prompt_ids, dtype=torch.long, device=self.device)

        with torch.no_grad():  # sampling needs no gradients
            for _ in range(max_tokens):
                # The model returns probabilities; keep the last position only.
                probabilities = self.model(tokens)[-1]
                # Temperature must act on log-probabilities, not probabilities.
                floor = torch.finfo(probabilities.dtype).tiny
                scaled = torch.log(probabilities.clamp_min(floor)) / temperature
                next_token = torch.multinomial(F.softmax(scaled, dim=-1), 1)
                tokens = torch.cat([tokens, next_token])

        return self.tokenizer.decode(tokens.tolist())


NameError: name 'torch' is not defined

### Getting training data from training_data folder

In [None]:
def get_training_data_from_folder(data_folder: Path) -> str:
    """
    Reads all text files from the specified folder and concatenates them into a single text string.

    Args:
        data_folder: folder to scan (not recursive); a ``Path`` or a plain
            path string.

    Returns:
        The contents of every file whose name ends in ``.txt``, each followed
        by a newline, concatenated in file-name order. Returns an empty string
        when the folder holds no such file.
    """
    folder = Path(data_folder)
    # Sort so the corpus is identical from run to run; directory listing
    # order is otherwise unspecified.
    text_files = sorted(
        entry for entry in folder.iterdir()
        if entry.name.endswith(".txt") and entry.is_file()
    )
    return "".join(
        entry.read_text(encoding="utf-8") + "\n"  # newline after each file's content
        for entry in text_files
    )

### Main

In [None]:
def main():
    """Train a default-sized model on ``./training_data``, save it, and sample from it."""
    # Model and optimizer, both built from the default configuration.
    config = GPTConfig()
    model = Transformer(config)
    adam = optim.Adam(model.parameters(), lr=1e-4)

    # Corpus: every .txt file in the training-data folder, concatenated.
    corpus = get_training_data_from_folder(Path("./training_data"))

    # One epoch over 50-token windows, reporting the loss every 100 windows.
    trainer = Trainer(model, corpus, adam, epochs=1, sample_size=50, print_interval=100)

    print("Starting training...")
    trained_model, training_records = trainer.train()

    # Dump the loss recorded for every processed window.
    print("Training complete.")
    for record in training_records:
        print(f"Sample {record['sample']}, Loss: {record['loss']}")

    print("**" * 50)

    # Persist the whole trained model object.
    torch.save(trained_model, "model.pt")

    # Sample a continuation of a fixed prompt.
    generated_text = trainer.generate("Today I plan to complete the following tasks, ")

    print("Generated text:")
    print(generated_text)


if __name__ == "__main__":
    main()