

# A Google Colab notebook with my own implementation of a decoder-only model



### Let's use tokenizers only to create a vocabulary with a tinyshakespeare dataset


In [None]:
!pip install tokenizers datasets

### Let's import the libraries needed

*   For this script we will only use torch, the `tokenizers` library from Hugging Face, and `load_dataset` (from `datasets`) to make our data easier to manipulate

We will reproduce only the decoder part of the original paper
"Attention is all you need"


In [None]:
import math
import string
import torch
import torch.nn as nn
import torch.nn.functional as F

from datasets import load_dataset
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)

### Let's start by creating a vocabulary with tinyshakespeare and load the data using load_dataset

### Let's create a custom Tokenizer to train a tokenizer from scratch with the tinyshakespeare

I created a custom tokenizer following these criteria:

1. The Normalizer
Before we analyze the text, we must clean it. If we have the word "Apple" and "apple", the computer thinks they are different things.


*   Lowercase: Converts everything to lowercase so the model doesn't have to learn "The" and "the" separately.
* NFD Unicode & Accents: It breaks characters into parts (e.g., é becomes e + ´).
* We then strip the accent, leaving just the base e. This makes the tokenizer much more robust across different languages.

2. The Pre-Tokenizer
We can't let the WordPiece model just start merging letters randomly. If we didn't have this step, the model might try to merge the end of one word with the start of the next (e.g., "blue" and "apple" becoming "ueap"). The Pre-Tokenizer acts as a boundary setter. It usually splits by whitespace and punctuation, creating a list of "candidate words" that the WordPiece algorithm is allowed to work on.

3. The WordPiece Model
This is where the magic happens. WordPiece is a subword tokenization algorithm. Its goal is to find a balance between character-level and word-level tokenization. If we use whole words, our vocabulary becomes millions of words long (very expensive). If we use just characters, the sequences become too long and lose meaning. The WordPiece Solution: It keeps common words whole (like the) but breaks rare words into meaningful chunks. It uses a likelihood-based approach: it merges characters into a subword only if that merge makes the training data more predictable. The ## Prefix: This is a flag. It tells the model: "This piece (like ##ly) is not a new word; it's a continuation of the previous one."

4. The Post-Processor
Once the text is broken into subwords, we need to format it for the specific AI architecture (like BERT). [CLS] (Classification): Added to the very beginning. The model uses the "thought" at this position to understand the meaning of the whole sentence. [SEP] (Separator): Added to the end of sentences so the model knows where one idea ends and another begins.

In [None]:
class CustomTokenizer:
    """
    A configurable tokenizer pipeline using the WordPiece algorithm.

    This class encapsulates the full lifecycle of a tokenizer, from
    normalization and pre-tokenization to training, encoding/decoding
    and persistence.

    Attributes:
        special_tokens (list): Reserved tokens (e.g., [PAD], [CLS]).
        unk_token (str): The unknown token; taken from ``special_tokens[0]``.
        model (tokenizers.models.Model): The underlying tokenization model.
        tokenizer (tokenizers.Tokenizer): The high-level tokenizer instance.
        trainer (tokenizers.trainers.Trainer): The trainer object containing
            hyperparameters like vocab size and special tokens.
    """

    def __init__(
        self,
        d_vocab: int = 30522,
        model=None,
        normalizer=None,
        pre_tokenizer=None,
        trainer=None,
        special_tokens=None
    ):
        """
        Initializes the pipeline with standard defaults for WordPiece (BERT-style).

        Args:
            d_vocab (int): The target vocabulary size for the trained model.
            model: Optional model instance. Defaults to WordPiece.
            normalizer: Optional normalization sequence. Defaults to NFD,
                Lowercase, and StripAccents for general text.
            pre_tokenizer: Optional pre-tokenizer. Defaults to Whitespace.
            trainer: Optional trainer instance. Defaults to a WordPieceTrainer
                built from ``d_vocab`` and ``special_tokens``.
            special_tokens: List of strings to be reserved in the vocabulary.
                The first element is used as the unknown token.
        """
        self.special_tokens = special_tokens or \
            ["[UNK]", "[PAD]", "[CLS]", "[SEP]", "[MASK]"]

        self.unk_token = self.special_tokens[0]  # Assumes UNK is the first element

        uses_default_model = model is None
        self.model = model or models.WordPiece(unk_token=self.unk_token)
        self.tokenizer = Tokenizer(self.model)

        self.tokenizer.normalizer = normalizer or normalizers.Sequence([
            normalizers.NFD(),
            normalizers.Lowercase(),
            normalizers.StripAccents()
        ])

        self.tokenizer.pre_tokenizer = pre_tokenizer or pre_tokenizers.Whitespace()

        # A WordPiece model needs the matching decoder, otherwise decode()
        # returns the raw pieces ("work ##ing") instead of merged words
        # ("working"). Only attached for the default model: a caller-supplied
        # model may use a different sub-word convention.
        if uses_default_model:
            self.tokenizer.decoder = decoders.WordPiece(prefix="##")

        self.trainer = trainer or trainers.WordPieceTrainer(
            vocab_size=d_vocab,
            special_tokens=self.special_tokens,
            limit_alphabet=1000,
            initial_alphabet=list(string.ascii_letters + string.digits),
            show_progress=True
        )

    def train(self, dataset, batch_size=1000):
        """
        Trains the tokenizer model on a provided dataset using a generator.

        The corpus is fed to the trainer in slices of ``batch_size`` rows so
        that only one slice of text is materialized at a time.

        After training, a BERT-style post-processor ("[CLS] ... [SEP]") is
        installed, provided both tokens exist in the trained vocabulary.
        If either one is missing (e.g. custom ``special_tokens``), no
        post-processor is installed and encodings carry no extra tokens.

        Args:
            dataset: A dataset object supporting ``len(dataset)`` and
                ``dataset[i:j]["text"]``.
            batch_size (int): Number of text samples per iteration.
        """
        def get_training_corpus():
            for start in range(0, len(dataset), batch_size):
                yield dataset[start : start + batch_size]["text"]

        self.tokenizer.train_from_iterator(
            get_training_corpus(),
            trainer=self.trainer
        )

        cls_id = self.tokenizer.token_to_id("[CLS]")
        sep_id = self.tokenizer.token_to_id("[SEP]")

        # token_to_id returns None for tokens that are not in the vocabulary;
        # the template below cannot be built without both ids.
        if cls_id is None or sep_id is None:
            return

        self.tokenizer.post_processor = processors.TemplateProcessing(
            single="[CLS] $A [SEP]",
            pair="[CLS] $A [SEP] $B:1 [SEP]:1",
            special_tokens=[("[CLS]", cls_id), ("[SEP]", sep_id)]
        )

    def encode(self, text: str, add_special_tokens: bool = True):
        """
        Encodes a single string of text into token IDs.

        Args:
            text (str): The text to tokenize.
            add_special_tokens (bool): Whether the post-processor's special
                tokens are added to the result.

        Returns:
            tokenizers.Encoding: An object containing token IDs, offsets, etc.
        """
        return self.tokenizer.encode(text, add_special_tokens=add_special_tokens)

    def decode(self, ids: list) -> str:
        """
        Decodes a list of token IDs back into a string.
        """
        return self.tokenizer.decode(ids)

    def save(self, path) -> None:
        """
        Saves the trained tokenizer to a JSON file for deployment.

        Args:
            path (str): File system path to save the configuration.
        """
        self.tokenizer.save(path)

    def get_vocabulary(self) -> int:
        """
        Returns the size of the tokenizer's vocabulary (number of tokens),
        not the tokens themselves.
        """
        return self.tokenizer.get_vocab_size()

### Let's train the model to learn the vocabulary we have in our dataset

In [None]:
# NOTE(review): `dataset` is not defined in any cell visible in this notebook --
# presumably it is created with load_dataset(...) before this cell runs and
# exposes a "text" column; confirm that the loading cell exists.
customTokenizer = CustomTokenizer()
customTokenizer.train(dataset)
customTokenizer.encode('Looks like this is currently working').tokens # a simple test: show the tokens of a sample sentence

In [None]:
customTokenizer.encode('Looks like this is currently working').ids # Let's check the ids the same sentence gets in the new vocab!

In [None]:
customTokenizer.get_vocabulary() # Let's check the size of the new vocabulary (returns a token count, not the tokens)

### Let's define the parameters for our model

- d_model (Model Dimensionality): $64$ This is the size of the internal feature vectors used throughout the model. Every token is represented by a 64-dimensional vector, and this size dictates the width of the main information pathway in the transformer layers.
- seq_len (Sequence Length / Context Window): $32$ The maximum number of tokens the model can process simultaneously. This is the length of the input sequence the model's positional encodings and attention mechanism are designed to handle.
- batch_size (Batch Size): $16$ The number of independent training sequences processed in parallel before the model's weights are updated.
- num_heads (Number of Attention Heads): $4$ The number of independent parallel sub-spaces the attention mechanism uses. Each head focuses on different parts of the context, allowing the model to capture varied relationships within the sequence.
- num_layers (Number of Decoder Blocks): $4$ The depth of the model, specifying how many times the core attention-FFNN structure is repeated.
- d_ff (Feed-Forward Inner Dimension): $4 \times d\_model$ ($256$) The size of the hidden layer in the position-wise Feed-Forward Network (FFNN). This intermediate size is typically set to $4 \times d\_model$.
- learning_rate ($\eta$): $1e-3$ The main step-size hyperparameter for the Adam optimizer, controlling how aggressively the model adjusts its weights during training.
- max_iters (Maximum Training Iterations): $100000$ The total number of optimization steps (batches) the training loop will execute.
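- eval_interval (Evaluation Interval): $500$ The number of training steps after which the model will pause training to evaluate its current performance on the validation set. (Note: the training loop in this notebook does not use this value yet; it simply prints the training loss every 100 steps.)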

You can adjust the parameters that make sense to you!

In [None]:
# --- Model architecture ---
d_model = 64            # width of every token vector
seq_len = 32            # context window (tokens per training sequence)
num_heads = 4           # attention heads per block
num_layers = 4          # number of stacked decoder blocks
d_ff = d_model * 4      # feed-forward inner size; 4 * d_model is a common rule of thumb

# --- Optimization ---
batch_size = 16
learning_rate = 1e-3
max_iters = 100_000
eval_interval = 500     # defined here; not referenced by the training cell shown below

# --- Hardware: use the GPU when one is visible, otherwise the CPU ---
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'

### Let's create the classes for our decoder only model

This is the most critical section of this notebook. Based on the seminal paper "Attention Is All You Need," I have isolated and reproduced the Decoder-only architecture (the foundation for modern LLMs like GPT).

To make this complex system accessible, I have heavily documented the classes with "step-by-step" commentary. In this session, we will perform a "surgical" breakdown of every component, mapping the raw code directly back to the architectural diagrams from the original paper.


In [None]:
class AddAndNorm(nn.Module):
    """The "Add & Norm" step: residual addition followed by LayerNorm."""

    def __init__(self, d_model):
        """
        Args:
            d_model (int): Size of the last dimension that LayerNorm
                normalizes over.
        """
        super().__init__()
        self.norm = nn.LayerNorm(d_model)

    def forward(self, input, output):
        """
        Args:
            input: The tensor that entered the sub-layer (the residual path).
            output: The tensor the sub-layer produced.

        Returns:
            LayerNorm applied to ``input + output``.
        """
        residual_sum = input + output
        return self.norm(residual_sum)

class PositionalEncoding(nn.Module):
    """
    Adds fixed (non-learned) sinusoidal position information to embeddings.

    Even feature columns hold sin(pos / 10000^(2i/d_model)) and odd columns
    hold the matching cosine. The table is precomputed once for ``max_len``
    positions and stored as the buffer ``pe`` of shape (max_len, d_model).
    """

    def __init__(self, d_model, max_len=4):
        """
        Args:
            d_model (int): Size of each embedding vector. May be odd.
            max_len (int): Number of positions to precompute; the longest
                sequence ``forward`` accepts.
        """
        super().__init__()
        positions = torch.arange(0, max_len).unsqueeze(1)
        div_term = 10000**(torch.arange(0, d_model, 2)/d_model)
        # One angle per (position, frequency): shape (max_len, ceil(d_model / 2)).
        angles = positions/div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns,
        # so the last frequency has a sine but no cosine.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, embeddings):
        """
        Args:
            embeddings: Tensor whose dimension 1 is the sequence position and
                whose last dimension has size ``d_model``.

        Returns:
            ``embeddings`` plus the encoding of the first ``seq_len`` positions.

        Raises:
            ValueError: If the sequence is longer than ``max_len``.
        """
        seq_len = embeddings.shape[1]
        max_len = self.pe.shape[0]
        if seq_len > max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds the positional table size {max_len}"
            )
        return embeddings + self.pe[:seq_len, :]

class FFNN(nn.Module):
    """Two-layer feed-forward network: Linear -> ReLU -> Linear."""

    def __init__(self, in_dim, inner_dim, out_dim):
        """
        Args:
            in_dim (int): Feature size of the input.
            inner_dim (int): Feature size of the hidden layer.
            out_dim (int): Feature size of the output.
        """
        super().__init__()
        expand = nn.Linear(in_dim, inner_dim)
        activation = nn.ReLU()
        project = nn.Linear(inner_dim, out_dim)
        self.ffnn = nn.Sequential(expand, activation, project)

    def forward(self, input):
        """Runs ``input`` through the three stacked layers."""
        hidden = self.ffnn(input)
        return hidden

class Linear(nn.Module):
    """Thin wrapper around a single ``nn.Linear`` layer."""

    def __init__(self, in_dim, out_dim):
        """
        Args:
            in_dim (int): Feature size of the input.
            out_dim (int): Feature size of the output.
        """
        super().__init__()
        self.linear = nn.Linear(in_features=in_dim, out_features=out_dim)

    def forward(self, input):
        """Applies the wrapped linear layer to ``input``."""
        projected = self.linear(input)
        return projected

class Softmax(nn.Module):
    """
    Softmax layer that turns scores into a probability distribution.

    By default it normalizes over the last dimension, which is the vocabulary
    axis for (batch, seq, vocab) logits and matches the ``dim=-1`` used by the
    attention and generation code in this file. For 2-D (batch, classes)
    input this is the same axis as ``dim=1``.
    """

    def __init__(self, dim=-1):
        """
        Args:
            dim (int): The dimension along which the outputs sum to 1.
        """
        super().__init__()
        self.softmax = nn.Softmax(dim=dim)

    def forward(self, input):
        """Returns softmax of ``input`` along the configured dimension."""
        return self.softmax(input)

class MultiHeadAttention(nn.Module):
    """
    Causal (masked) multi-head self-attention.

    The computation is split into small helper methods that each write their
    result to an attribute (``Q``, ``K``, ``V``, ``scores``,
    ``attention_weights``, ``attention``, ``output``) so every intermediate
    value can be inspected after a forward pass. Those attributes always hold
    the tensors of the most recent call.
    """

    def __init__(self, d_model, num_heads, bias=False):
        """
        Args:
            d_model (int): Feature size of the input and output.
            num_heads (int): Number of heads; must divide ``d_model``.
            bias (bool): Whether the four projection layers use a bias term.
        """
        super().__init__()
        # Ensure d_model is divisible by num_heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        # Store model dimensions
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Linear layers for Q, K, V, and output
        self.W_q = nn.Linear(d_model, d_model, bias=bias)
        self.W_k = nn.Linear(d_model, d_model, bias=bias)
        self.W_v = nn.Linear(d_model, d_model, bias=bias)
        self.W_o = nn.Linear(d_model, d_model, bias=bias)

        # Softmax for attention scores
        self.P = nn.Softmax(dim=-1)

        # Placeholders for intermediate values (great for debugging)
        self.Q = None
        self.K = None
        self.V = None
        self.scores = None
        self.attention_weights = None
        self.attention = None
        self.output = None

    def get_linear_projections(self, input):
        """Projects input into Q, K, V."""
        self.Q = self.W_q(input)
        self.K = self.W_k(input)
        self.V = self.W_v(input)

    def get_heads(self, batch_size, seq_len):
        """Reshapes Q, K, V to (batch, num_heads, seq_len, d_k)."""
        head_shape = (batch_size, seq_len, self.num_heads, self.d_k)
        self.Q = self.Q.view(head_shape).transpose(1, 2)
        self.K = self.K.view(head_shape).transpose(1, 2)
        self.V = self.V.view(head_shape).transpose(1, 2)

    def get_scores(self):
        """Calculates scaled dot-product attention scores."""
        self.scores = torch.matmul(self.Q, self.K.transpose(-2, -1)) / math.sqrt(self.d_k)

    def apply_masking(self, seq_len):
        """Applies a causal mask so a position cannot attend to later ones."""
        # Build the mask directly on the scores' device instead of creating
        # it on the CPU and copying it over on every forward pass.
        mask = torch.triu(
            torch.ones(seq_len, seq_len, device=self.scores.device),
            diagonal=1,
        ).bool()
        # -inf scores become exactly zero weight after the softmax.
        self.scores = self.scores.masked_fill(mask, float('-inf'))

    def apply_softmax(self):
        """Applies softmax to the scores to get attention weights."""
        self.attention_weights = self.P(self.scores)

    def get_attention(self):
        """Computes the attention output by multiplying weights with V."""
        self.attention = torch.matmul(self.attention_weights, self.V)

    def get_output(self, batch_size, seq_len):
        """Concatenates heads and applies the final linear layer."""
        reshaped = self.attention.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        self.output = self.W_o(reshaped)

    def forward(self, x):
        """
        Executes the full multi-head attention mechanism.
        This single method calls all the helper methods in the correct order.

        Args:
            x: Tensor of shape (batch_size, seq_len, d_model).

        Returns:
            Tensor of shape (batch_size, seq_len, d_model).
        """
        # Get dimensions from the input tensor
        batch_size, seq_len, _ = x.shape

        # 1. Get Q, K, V projections
        self.get_linear_projections(x)

        # 2. Split into multiple heads
        self.get_heads(batch_size, seq_len)

        # 3. Calculate attention scores
        self.get_scores()

        # 4. Apply causal mask (for decoder self-attention)
        self.apply_masking(seq_len)

        # 5. Apply softmax to get weights
        self.apply_softmax()

        # 6. Get weighted sum of Values
        self.get_attention()

        # 7. Concatenate heads and get final output
        self.get_output(batch_size, seq_len)

        # Return the final output tensor
        return self.output


class DecoderBlock(nn.Module):
    """
    One decoder layer: masked self-attention, then a feed-forward network,
    each wrapped in dropout + residual connection + layer normalization.
    """

    def __init__(self, d_model, num_heads, d_ff=32, dropout=0.1):
        """
        Builds the two sub-layers of the block.

        Args:
            d_model (int): The dimensionality of the model (embedding size).
            num_heads (int): The number of attention heads.
            d_ff (int): The inner dimension of the feed-forward network.
                        Typically 4 * d_model.
            dropout (float): The dropout probability.
        """
        super().__init__()

        # Sub-layer 1: masked multi-head attention and its Add & Norm
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.add_norm1 = AddAndNorm(d_model)

        # Sub-layer 2: position-wise feed-forward network and its Add & Norm
        self.ffnn = FFNN(d_model, d_ff, d_model)
        self.add_norm2 = AddAndNorm(d_model)

        # One dropout module, shared by both sub-layers
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Runs ``x`` through the attention and feed-forward sub-layers."""
        # --- Attention sub-layer: dropout on its output, then Add & Norm ---
        attended = self.dropout(self.attention(x))
        x = self.add_norm1(x, attended)

        # --- Feed-forward sub-layer: dropout on its output, then Add & Norm ---
        transformed = self.dropout(self.ffnn(x))
        return self.add_norm2(x, transformed)

class MyGPT(nn.Module):
    """
    Decoder-only language model: token embedding + sinusoidal positions,
    a stack of decoder blocks, and a linear head that maps back to the
    vocabulary.
    """

    def __init__(
        self,
        d_model,
        seq_len,
        num_heads,
        num_layers,
        d_ff,
        vocab_size,
    ):
        """
        Args:
            d_model (int): Width of the token vectors.
            seq_len (int): Maximum context length (size of the positional table).
            num_heads (int): Attention heads per decoder block.
            num_layers (int): Number of decoder blocks.
            d_ff (int): Inner size of each block's feed-forward network.
            vocab_size (int): Number of distinct token ids.
        """
        super().__init__()

        self.token_embedding_table = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, seq_len)

        self.decoder_blocks = nn.Sequential(
           *[DecoderBlock(d_model, num_heads, d_ff) for _ in range(num_layers)]
        )

        self.lm_head = Linear(d_model, vocab_size)

    def forward(self, tokens, targets=None):
        """
        Computes next-token logits and, when targets are given, the loss.

        Args:
            tokens (torch.Tensor): Token ids of shape (B, T).
            targets (torch.Tensor, optional): Token ids of shape (B, T) that
                the model should predict at each position.

        Returns:
            tuple: ``(logits, loss)`` where logits has shape (B, T, vocab_size)
            and loss is the mean cross-entropy, or None without targets.
        """
        embeddings = self.token_embedding_table(tokens)
        pos_embed = self.positional_encoding(embeddings)
        x = self.decoder_blocks(pos_embed)
        logits = self.lm_head(x)

        loss = None
        # Identity check: `!=` on a tensor is an element-wise operator and
        # only works against None by accident.
        if targets is not None:
            B, T, C = logits.shape
            # Flatten batch and time so every position is one classification
            # example; reshape also accepts non-contiguous targets.
            loss = F.cross_entropy(logits.reshape(B * T, C), targets.reshape(B * T))

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """
        Generates new tokens by iteratively predicting the next token.

        The method does not change the module's train/eval mode; call
        ``model.eval()`` first so that dropout is disabled while sampling.

        Args:
            idx (torch.Tensor): A tensor of shape (B, T) containing the
                                starting token indices (the prompt).
            max_new_tokens (int): The maximum number of tokens to generate.
            temperature (float): Softmax temperature for controlling randomness.
                                 Must be greater than 0.
            top_k (int, optional): Sample from the top K most likely tokens.

        Returns:
            torch.Tensor: The input tensor with the newly generated tokens
                          appended, shape (B, T + max_new_tokens).

        Raises:
            ValueError: If ``temperature`` is not positive.
        """
        if temperature <= 0:
            raise ValueError("temperature must be greater than 0")

        # The positional table fixes the longest context the model can take.
        max_context = self.positional_encoding.pe.shape[0]

        for _ in range(max_new_tokens):
            # 1. Crop the context to the last `max_context` tokens
            idx_cond = idx[:, -max_context:]

            # 2. Get predictions (logits) for every position: (B, T_cond, C)
            logits, _ = self(idx_cond)

            # 3. Only the last position predicts the *next* token: (B, C)
            logits = logits[:, -1, :]

            # 4. Apply temperature to logits
            logits = logits / temperature

            # 5. Optional: keep only the K largest logits per row
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                # v[:, [-1]] is each row's K-th largest value, kept as a column
                logits = logits.masked_fill(logits < v[:, [-1]], float('-inf'))

            # 6. Apply softmax to convert logits to probabilities
            probs = F.softmax(logits, dim=-1)

            # 7. Sample the next token from the probability distribution: (B, 1)
            idx_next = torch.multinomial(probs, num_samples=1)

            # 8. Append the sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1)

        return idx

### Let's encode all the data into tokens

In this part we will take all the data to train our model. Before passing the data to our decoder model we have to convert the words into tokens (ids).


In [None]:
def tokenize_fn(example):
    """Turns one dataset row into ``{"ids": [...]}`` using the trained tokenizer."""
    return {"ids": customTokenizer.encode(example["text"]).ids}


# Replace the raw "text" column by the token ids of every row.
map_options = {"remove_columns": ["text"], "desc": "Tokenizing dataset"}
tokenized_dataset = dataset.map(tokenize_fn, **map_options)

### Let's train our decoder only model

Now based on the parameters previously defined, let's proceed to train our decoder only model.

In [None]:
# Flatten the per-row token ids into one long stream of ids, in dataset order.
all_ids = [token_id for row in tokenized_dataset for token_id in row["ids"]]
all_ids_tensor = torch.tensor(all_ids, dtype=torch.long)

def get_batch(batch_size, seq_len, data=None):
    """
    Samples a random batch of (input, target) windows from a token stream.

    Each target row is its input row shifted one token to the right, so the
    model learns to predict the next token at every position.

    Args:
        batch_size (int): Number of windows to sample.
        seq_len (int): Number of tokens per window.
        data (torch.Tensor, optional): 1-D tensor of token ids to sample
            from. Defaults to the module-level ``all_ids_tensor``.

    Returns:
        tuple: ``(x, y)``, both of shape (batch_size, seq_len).

    Raises:
        ValueError: If ``data`` holds fewer than ``seq_len + 1`` tokens.
    """
    if data is None:
        data = all_ids_tensor

    # A window starting at i reads data[i : i + seq_len + 1], so valid starts
    # are 0 .. len(data) - seq_len - 1. torch.randint excludes its upper
    # bound, hence the bound is len(data) - seq_len.
    num_starts = len(data) - seq_len
    if num_starts < 1:
        raise ValueError(
            f"need at least {seq_len + 1} tokens to sample a window, got {len(data)}"
        )

    # Randomly pick starting indices
    starts = torch.randint(0, num_starts, (batch_size,), device=data.device)

    # Row b of `window` holds the indices starts[b], starts[b] + 1, ...
    window = starts.unsqueeze(1) + torch.arange(seq_len, device=data.device)
    x = data[window]
    y = data[window + 1]
    return x, y

# Build the model from the hyperparameters defined earlier and move it to
# the selected device. The vocabulary size comes from the trained tokenizer.
model = MyGPT(
    d_model = d_model,
    seq_len = seq_len,
    num_heads = num_heads,
    num_layers = num_layers,
    d_ff = d_ff,
    vocab_size = customTokenizer.get_vocabulary(),
).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Make training mode explicit so dropout is active even if this cell is
# re-run after the generation cell has switched the model to eval mode.
model.train()

# `step` (not `iter`) so the builtin iter() is not shadowed for the rest of
# the notebook.
for step in range(max_iters):
    x, y = get_batch(batch_size, seq_len)

    x = x.to(device)
    y = y.to(device)

    optimizer.zero_grad(set_to_none=True)
    logits, loss = model(x, y)
    loss.backward()
    optimizer.step()

    # Let's print the loss every 100 iterations to see how our model training
    # is going.
    if step % 100 == 0:
        print(f"Step {step}, Loss: {loss.item()}")

### Let's create text from a prompt
Let's make sure the model is in evaluation mode by calling the method .eval()


In [None]:
# Switch to evaluation mode so dropout is disabled while sampling.
model.eval()
prompt_text = "the king said"

# Encode the prompt WITHOUT the [CLS] ... [SEP] wrapper: a trailing [SEP]
# would tell the model the line is already finished, so it would start a
# new line instead of continuing the prompt.
start_ids = customTokenizer.encode(prompt_text, add_special_tokens=False).ids
context = torch.tensor([start_ids], dtype=torch.long, device=device)
max_new_tokens = 200 # Number of tokens to generate after the prompt

generated_tokens = model.generate(
    idx=context,
    max_new_tokens=max_new_tokens,
    temperature=0.8,  # A temperature slightly less than 1.0 (e.g., 0.8)
                      # often gives more coherent results
    top_k=50          # Restrict sampling to the top 50 most likely tokens
)

# Decode the generated tokens (row 0 is the only sequence in the batch)
output_tokens = generated_tokens[0].tolist()
generated_text = customTokenizer.decode(output_tokens)

# Print the final result
print("--- Generated Text ---")
print(generated_text)
print("----------------------")