In [None]:
# Encoding name	OpenAI models
# o200k_base	gpt-4o, gpt-4o-mini
# cl100k_base	gpt-4-turbo, gpt-4, gpt-3.5-turbo, text-embedding-ada-002, text-embedding-3-small, text-embedding-3-large
# p50k_base	Codex models, text-davinci-002, text-davinci-003
# r50k_base (or gpt2)	GPT-3 models like davinci

The training data lives in the notebook: ./archived_projects/llm_from_scratch/notebooks/train_data.ipynb

In [None]:
# Placeholder list; intentionally left empty in this cell.
text_data_format = []

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import tiktoken  # Library for tokenization (e.g., OpenAI token encodings)
from torch.utils.data import DataLoader, Dataset
import torch.nn.utils.rnn as rnn_utils
import torch._dynamo
torch._dynamo.config.suppress_errors = True

# ---------------------------
# 1. Tokenization
# ---------------------------
def tokenize(text_input: str, encoding_type: str = "r50k_base") -> tuple[list, int]:
    """
    Encode a string with a tiktoken encoding.

    Parameters:
        text_input: The text to encode.
        encoding_type: Name of the tiktoken encoding to load.

    Returns:
        A ``(tokens, vocab_size)`` pair: the result of ``encode`` on the
        input text and the ``n_vocab`` attribute of the loaded encoding.
    """
    encoder = tiktoken.get_encoding(encoding_type)
    return encoder.encode(text_input), encoder.n_vocab

# ---------------------------
# 2. GPT Model Definition (Decoder-Only Transformer)
# ---------------------------
class GPTModel(nn.Module):
    """
    Decoder-only (GPT-style) language model assembled from
    ``nn.TransformerEncoderLayer`` blocks that are run with a causal mask.
    """

    def __init__(self, vocab_size, embedding_dim, n_heads, n_layers, max_seq_length):
        """
        Build the embeddings, the transformer stack and the output head.

        Key Components:
          - Token Embedding: Maps token indices to dense vectors.
          - Positional Embedding: Learned embedding indexed by position (0 .. max_seq_length - 1).
          - Transformer Encoder Layers: Each layer performs multi-head self-attention
            followed by a point-wise feedforward network; causality comes from the
            mask passed in ``forward``.
          - Final LayerNorm and Linear Head: Normalizes the output and projects it
            to logits over the vocabulary.

        Parameters:
            vocab_size (int): Number of tokens in the vocabulary.
            embedding_dim (int): Dimensionality of embeddings.
            n_heads (int): Number of attention heads in the multi-head self-attention.
            n_layers (int): Number of transformer encoder layers.
            max_seq_length (int): Maximum sequence length for positional embeddings.
        """
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, embedding_dim)
        self.position_embedding = nn.Embedding(max_seq_length, embedding_dim)
        self.layers = nn.ModuleList([
            nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=n_heads, batch_first=True)
            for _ in range(n_layers)
        ])
        # Final layer normalization before projecting to output vocabulary logits.
        self.ln_f = nn.LayerNorm(embedding_dim)
        # Linear layer mapping hidden state to vocabulary logits.
        self.head = nn.Linear(embedding_dim, vocab_size, bias=False)
        self.max_seq_length = max_seq_length

    def forward(self, x):
        """
        Forward pass of the model.

        Steps:
          1. Look up token and positional embeddings and add them.
          2. Build a causal mask so a token only attends to itself and earlier tokens.
          3. Run every transformer layer with that mask.
          4. Normalize the final hidden states and project to vocabulary logits.

        Parameters:
            x (Tensor): Token ids with shape (batch_size, seq_length).
                seq_length must not exceed ``max_seq_length``, otherwise the
                positional embedding lookup fails.

        Returns:
            logits (Tensor): Unnormalized scores with shape (batch_size, seq_length, vocab_size).
        """
        _, seq_length = x.size()
        # Shape (1, seq_length); broadcasts over the batch when added below.
        positions = torch.arange(seq_length, device=x.device).unsqueeze(0)
        x = self.token_embedding(x) + self.position_embedding(positions)

        # For a boolean attention mask, True marks positions that must NOT be
        # attended to. Causality therefore needs the strictly upper triangle
        # (future tokens) set to True. A lower-triangular True mask would do the
        # opposite: hide the past and fully mask the last row.
        mask = torch.ones(seq_length, seq_length, device=x.device).triu(diagonal=1).bool()

        for layer in self.layers:
            x = layer(x, src_mask=mask)

        x = self.ln_f(x)
        logits = self.head(x)  # (batch_size, seq_length, vocab_size)
        return logits

    def generate(self, start_tokens, max_new_tokens=20, temperature=1.5, top_k=50, top_p=0.9):
        """
        Autoregressively sample tokens and append them to the prompt.

        Generation Steps:
          - Start with an initial prompt (start_tokens).
          - Feed the last ``max_seq_length`` tokens to the model and take the
            logits of the final position.
          - Apply temperature scaling to the logits.
          - Apply top-k and nucleus (top-p) filtering, then sample.

        The model is switched to evaluation mode and left in it.

        Parameters:
            start_tokens (Tensor): Initial tokens with shape (batch_size, seq_length).
            max_new_tokens (int): Number of tokens to generate.
            temperature (float): Controls randomness (higher => more random); must be non-zero.
            top_k (int): Keeps only top k tokens for sampling (None or <= 0 disables;
                values above the vocabulary size are clamped).
            top_p (float): Cumulative probability threshold for nucleus sampling
                (None or >= 1.0 disables).

        Returns:
            generated (Tensor): The prompt with the sampled token ids appended,
            shape (batch_size, seq_length + max_new_tokens).
        """
        self.eval()  # Disable dropout for sampling
        generated = start_tokens.clone()
        neg_inf = float("-inf")
        # Sampling needs no gradients; skipping autograd saves memory and time.
        with torch.no_grad():
            for _ in range(max_new_tokens):
                # The positional embedding only covers max_seq_length positions,
                # so condition on the most recent window once the text is longer.
                context = generated[:, -self.max_seq_length:]
                logits = self(context)  # (batch_size, context_length, vocab_size)
                next_logits = logits[:, -1, :] / temperature

                # Top-k filtering: drop everything below the k-th largest logit.
                if top_k is not None and top_k > 0:
                    k = min(top_k, next_logits.size(-1))
                    kth_value = torch.topk(next_logits, k=k, dim=-1).values[:, -1:]
                    next_logits = next_logits.masked_fill(next_logits < kth_value, neg_inf)

                # Nucleus (top-p) filtering: keep the smallest prefix of the sorted
                # distribution whose cumulative probability reaches top_p.
                if top_p is not None and top_p < 1.0:
                    sorted_logits, sorted_indices = torch.sort(next_logits, descending=True, dim=-1)
                    cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
                    remove = cumulative_probs > top_p
                    # Shift right by one so the token that crosses the threshold is
                    # kept; without this a single dominant token (p > top_p) would
                    # remove every candidate.
                    remove[:, 1:] = remove[:, :-1].clone()
                    remove[:, 0] = False
                    sorted_logits = sorted_logits.masked_fill(remove, neg_inf)
                    # Undo the sort: put each filtered logit back at its vocabulary index.
                    next_logits = torch.full_like(next_logits, neg_inf).scatter(
                        -1, sorted_indices, sorted_logits
                    )

                # Guard against NaN/Inf before normalizing, then sample.
                next_logits = torch.nan_to_num(next_logits, nan=-1e10, posinf=1e10, neginf=-1e10)
                probs = F.softmax(next_logits, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)  # (batch_size, 1)
                generated = torch.cat([generated, next_token], dim=1)
        return generated

# ---------------------------
# 3. Prepare an Expanded Text Dataset
# ---------------------------
# Raw training texts; each entry becomes one training sequence.
texts = ["Here comes the text data"]
max_length = 1024  # Maximum sequence length for each example
tokenized_texts = []
# Filled from the first tokenize() call below; stays None if `texts` is empty.
vocab_size = None
for text in texts:
    # tokenize() is called with its default encoding.
    tokens, vs = tokenize(text)
    tokens = tokens[:max_length]  # Truncate sequences longer than max_length
    tokenized_texts.append(torch.tensor(tokens, dtype=torch.long))
    if vocab_size is None:
        vocab_size = vs

# Pad sequences to the same length for batch processing.
# NOTE(review): the pad value 0 is presumably also a real token id in the
# tokenizer's vocabulary, so padding cannot be told apart from that token
# downstream (e.g. by a loss that ignores index 0) — confirm.
padded_sequences = rnn_utils.pad_sequence(tokenized_texts, batch_first=True, padding_value=0)

# ---------------------------
# 4. Create a Dataset with Shifted Targets
# ---------------------------
class TextDataset(Dataset):
    """
    Next-token-prediction dataset over a batch of equal-length sequences.

    Indexing yields an ``(input_ids, target_ids)`` pair in which the target
    is the input shifted left by one position.
    """

    def __init__(self, sequences):
        """Keep a reference to ``sequences``; row ``i`` is sample ``i``."""
        self.sequences = sequences

    def __len__(self):
        """Number of rows in ``sequences``."""
        return self.sequences.shape[0]

    def __getitem__(self, idx):
        """
        Return ``(row[:-1], row[1:])`` for the row at ``idx``.

        Raises:
            ValueError: If the row has fewer than two tokens, since no
                input/target pair can be formed.
        """
        row = self.sequences[idx]
        if row.shape[0] >= 2:
            # Inputs drop the last token, targets drop the first.
            return row[:-1], row[1:]
        raise ValueError("Sequence length must be at least 2 for shifting")

dataset = TextDataset(padded_sequences)
# One sequence per step, visited in random order each epoch.
data_loader = DataLoader(dataset, batch_size=1, shuffle=True)

# ---------------------------
# 5. Set up the Device and Instantiate the Model
# ---------------------------
# Pick the best available accelerator: CUDA first, then Apple MPS, then CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

embedding_dim = 512   # Embedding dimension for tokens and positions
n_heads = 8           # Number of attention heads in each multi-head self-attention block
n_layers = 6          # Number of transformer encoder layers (each with self-attention and feedforward)
max_seq_length = padded_sequences.size(1)  # Sequence length derived from padded data

# Instantiate the GPT transformer model
model = GPTModel(vocab_size=vocab_size,
                 embedding_dim=embedding_dim,
                 n_heads=n_heads,
                 n_layers=n_layers,
                 max_seq_length=max_seq_length).to(device)

# Targets equal to 0 are excluded from the loss; 0 is the padding value used
# when the sequences were padded.
# NOTE(review): presumably 0 is also a real token id, in which case genuine
# occurrences of that token are ignored too — confirm.
criterion = nn.CrossEntropyLoss(ignore_index=0)  # Loss function for next-token prediction
optimizer = optim.Adam(model.parameters(), lr=1e-4)  # Optimizer

# Optional: uncomment to enable torch.compile (left disabled because it can cause issues):
# if hasattr(torch, "compile"):
#     model = torch.compile(model)

# ---------------------------
# 6. Training Loop Over Epochs
# ---------------------------
# Train for a fixed number of passes over data_loader and print the mean
# per-batch loss after every epoch.
num_epochs = 100
for epoch in range(num_epochs):
    total_loss = 0.0  # Sum of per-batch losses for this epoch
    model.train()  # Set model to training mode
    for input_ids, target_ids in data_loader:
        # Move the batch to the same device as the model.
        input_ids = input_ids.to(device)
        target_ids = target_ids.to(device)
        optimizer.zero_grad()  # Clear previous gradients
        logits = model(input_ids)
        # Reshape logits and targets for loss computation:
        # logits become (N, vocab_size) rows and targets a flat vector of length N.
        logits = logits.view(-1, vocab_size)
        target_ids = target_ids.view(-1)
        loss = criterion(logits, target_ids)
        loss.backward()  # Backpropagate
        optimizer.step()  # Update parameters
        total_loss += loss.item()
    # Mean over the number of batches (not over tokens).
    avg_loss = total_loss / len(data_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

In [None]:
# Runs 60 more epochs with the same training procedure, reusing the existing
# model, optimizer, criterion and data_loader objects (nothing is re-created).
num_epochs = 60
for epoch in range(num_epochs):
    total_loss = 0.0  # Sum of per-batch losses for this epoch
    model.train()  # Set model to training mode
    for input_ids, target_ids in data_loader:
        # Move the batch to the same device as the model.
        input_ids = input_ids.to(device)
        target_ids = target_ids.to(device)
        optimizer.zero_grad()  # Clear previous gradients
        logits = model(input_ids)
        # Flatten to (N, vocab_size) logits and N targets for the loss.
        logits = logits.view(-1, vocab_size)
        target_ids = target_ids.view(-1)
        loss = criterion(logits, target_ids)
        loss.backward()  # Backpropagate
        optimizer.step()  # Update parameters
        total_loss += loss.item()
    # Mean over the number of batches (not over tokens).
    avg_loss = total_loss / len(data_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

In [None]:
# ---------------------------
# Inference
# ---------------------------
def generate_text(model, prompt, max_new_tokens=30, temperature=1.5, top_k=50, top_p=0.9,
                  encoding_type="r50k_base"):
    """
    Generates text from a given prompt using the trained model.

    Steps:
      - Tokenize the input prompt.
      - Autoregressively generate new tokens.
      - Decode the generated tokens back to text with the SAME encoding that
        was used to tokenize the prompt (token ids are only meaningful within
        one encoding).

    Parameters:
        model (nn.Module): The transformer-based language model; must provide
            a ``generate`` method.
        prompt (str): The initial text prompt.
        max_new_tokens (int): Number of new tokens to generate.
        temperature (float): Controls randomness in sampling.
        top_k (int): Parameter for top-k filtering.
        top_p (float): Parameter for nucleus (top-p) sampling.
        encoding_type (str): Name of the tiktoken encoding used for both
            encoding the prompt and decoding the output. Defaults to
            "r50k_base", the default used by ``tokenize``.

    Returns:
        str: The prompt followed by the generated continuation.

    Raises:
        ValueError: If the prompt encodes to zero tokens.
    """
    tokens, _ = tokenize(prompt, encoding_type)
    if not tokens:
        # The model needs at least one token to condition on.
        raise ValueError("prompt must encode to at least one token")
    # Build the input on whatever device the model's parameters live on.
    device = next(model.parameters()).device
    input_ids = torch.tensor([tokens], dtype=torch.long).to(device)
    generated_ids = model.generate(input_ids, max_new_tokens=max_new_tokens,
                                   temperature=temperature, top_k=top_k, top_p=top_p)
    # Decode with the encoding the ids were produced in.
    enc = tiktoken.get_encoding(encoding_type)
    return enc.decode(generated_ids[0].tolist())

# Sample a 30-token continuation for a one-word prompt and print it.
prompt_text = "College"
generated = generate_text(
    model,
    prompt_text,
    max_new_tokens=30,
    temperature=1.5,
    top_k=50,
    top_p=0.9,
)
print("Generated text:\n", generated)