# Build a mini-mini-GPT

# 1. Tokenization

#### I will be using the pre-trained tokenizer from the GPT-2 model here.

In [None]:
from transformers import GPT2TokenizerFast
from tokenizers.processors import TemplateProcessing

# Load the pretrained GPT-2 tokenizer.
tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")

# Install a post-processor template so that the EOS (end-of-text) token is
# appended to every encoded sequence. `backend_tokenizer` is the public accessor
# for the underlying `tokenizers` object (instead of the private `_tokenizer`).
eos_token = tokenizer.eos_token
tokenizer.backend_tokenizer.post_processor = TemplateProcessing(
    single=f"$A:0 {eos_token}:0",                       # single text: "Text + EOS"
    pair=f"$A:0 {eos_token}:0 $B:1 {eos_token}:1",      # text pair: "A + EOS + B + EOS"
    special_tokens=[
        (eos_token, tokenizer.eos_token_id),
    ],
)

# GPT-2 ships without a padding token, so register a dedicated "[PAD]" token.
tokenizer.add_special_tokens({"pad_token": "[PAD]"})
tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids("[PAD]")

# len(tokenizer) counts the tokens added above as well.
print(f"Tokenizer vocab size: {len(tokenizer)}")


Let's put the tokenizer to the test.

Encoding

In [2]:
# Sample sentence; named `text` so the built-in `input()` is not shadowed.
text = "Hello, World!"

# add_special_tokens=False keeps the ids of the raw text only (no EOS appended).
tokens = tokenizer.encode(text, add_special_tokens=False)

print(f"The tokens for 'Hello, World!' are: {tokens}")

The tokens for 'Hello, World!' are: [15496, 11, 2159, 0]


Decoding list of token ids: [15496, 11, 2159, 0]

In [3]:
# Map the token ids back to text; `tokens` still holds the ids produced by the encoding cell above.
decoded_text = tokenizer.decode(tokens)

print(f"Decoded sequence: {decoded_text}")

Decoded sequence: Hello, World!


During training we pass texts through the model in batches, not one sample at a time. The problem is that different sample texts can have different lengths, which leads to different numbers of tokens and therefore to token lists of different lengths. 
Tensors, however, must have a fixed (rectangular) shape! 

In [4]:
batch_text = ["Hello, I am Nik.", "Tokenizing is not as easy as I initially thought!"]

# `encode` is the single-sequence API; calling the tokenizer itself is the batch
# API and returns one list of ids per sentence under "input_ids".
tokens = tokenizer(batch_text)["input_ids"]
print(f"Token IDs list: {tokens}")
print(f"Number of tokens for the first sentence: {len(tokens[0])}")
print(f"Number of tokens for the second sentence: {len(tokens[1])}")

Token IDs list: [[15496, 11, 314, 716, 11271, 13, 50256], [30642, 2890, 318, 407, 355, 2562, 355, 314, 7317, 1807, 0, 50256]]
Number of tokens for the first sentence: 7
Number of tokens for the second sentence: 12


As mentioned above, converting the list above to a PyTorch tensor would throw an error. That's why I am going to use padding and truncation. 
Padding means that we append a special padding token to the shorter token lists until all lists in the batch have the same length.
Truncation means that we cut off the tokens of any sequence that is longer than `max_length`, so that no sequence exceeds that length.

In [5]:
batch_text = ["Hello, I am Nik.", "Tokenizing is not as easy as I initially thought!"]

# Use the batch API (calling the tokenizer) rather than the single-sequence `encode`.
# padding=True pads to the longest sequence in the batch; truncation=True cuts
# every sequence down to at most max_length tokens.
tokens = tokenizer(
    batch_text,
    return_tensors="pt",
    padding=True,
    truncation=True,
    max_length=6,
)["input_ids"]
print(f"Token IDs list: {tokens}")
print(f"Number of tokens for the first sentence: {len(tokens[0])}")
print(f"Number of tokens for the second sentence: {len(tokens[1])}")

Token IDs list: tensor([[15496,    11,   314,   716, 11271, 50256],
        [30642,  2890,   318,   407,   355, 50256]])
Number of tokens for the first sentence: 6
Number of tokens for the second sentence: 6


#### We are now done with our tokenization part of the process

# 2. Token Embeddings

The following code cell initializes the embedding layer for token embeddings. 
Arguments:
   - vocab_size - Our token vocabulary size.
   - model_dim - Number of dimensions, meaning how many entries are our vectors going to have, to represent each token. 

In [6]:
import torch
from torch import nn

class TokenEmbedding(nn.Module):
    """Learned lookup table that turns token ids into dense vectors."""

    def __init__(self, vocab_size: int, model_dim: int) -> None:
        """Create a table with ``vocab_size`` rows of ``model_dim`` entries each."""
        super().__init__()
        self.token_embeddings = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=model_dim,
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Look up the vector of every id in ``x``; adds a trailing ``model_dim`` axis."""
        embedded = self.token_embeddings(x)
        return embedded

# 3. Positional embeddings

The following code cell initializes the embedding layer for positional embeddings. 
Arguments:
   - max_length - The maximum number of token positions per sequence, fixed in advance.
   - model_dim - Number of dimensions, meaning how many entries are our vectors going to have, to represent each token. 

In [7]:
class PositionalEmbeddings(nn.Module):
    """Learned absolute position embeddings.

    Args:
        max_length: Number of rows in the position table, i.e. the largest
            sequence length that can be embedded.
        model_dim: Size of each position vector.
    """

    def __init__(self, max_length, model_dim):
        super().__init__()
        self.positional_embeddings = nn.Embedding(max_length, model_dim)

    def forward(self, x):
        """Return position vectors for the sequence axis of ``x``.

        Only ``x.shape[1]`` (the sequence length) and ``x.device`` are used; the
        values of ``x`` are ignored. The result has shape
        ``(1, seq_length, model_dim)`` so it can be broadcast over a batch.
        """
        seq_length = x.shape[1]

        # Build the position indices on the same device as the input instead of
        # relying on a notebook-level DEVICE global, which may be undefined or
        # out of sync with where the model and data actually live.
        positions = torch.arange(seq_length, device=x.device)
        positional_embeddings = self.positional_embeddings(positions)

        # Add a leading batch axis of size 1.
        return positional_embeddings.unsqueeze(0)

In [8]:
# Named `text` (not `input`) so the built-in `input()` is not shadowed.
text = "I am Fisnik and this is my first Transformer!"

DEVICE="cpu"
# Set vocab size, model dimension and max_length
vocab_size = len(tokenizer)
model_dim = 256
max_length = 512

# Tokenize; padding="max_length" pads the single sentence up to max_length ids
tokens = tokenizer.encode(text, max_length=max_length, return_tensors="pt", padding="max_length", truncation=True)


# Initialize token embedding layer
embedding_layer = TokenEmbedding(vocab_size=vocab_size, model_dim=model_dim)

# Initialize the positional embedding layer
positional_layer = PositionalEmbeddings(max_length=max_length, model_dim=model_dim)

# Get the token embeddings
token_embeddings = embedding_layer(tokens)

# Get the positional embeddings
positional_embeddings = positional_layer(tokens)

print(f"Shape of tensor for token embedding: {token_embeddings.shape}")
print(f"Shape of tensor for positional embedding: {positional_embeddings.shape}")

Shape of tensor for token embedding: torch.Size([1, 512, 256])
Shape of tensor for positional embedding: torch.Size([1, 512, 256])


This is an actual 3-dimensional tensor, so it has shape [1, 512, 256]. 
Dimensions:
1. stands there because we only feed one input sequence in the embedding layer.
2. 512 is the number of tokens for the sequence (mostly paddings since the sentence itself is like 15 tokens long).
3. 256 is the number of entries for each token embedding.



# 4. Embedding Layer

### Building the embedding layer which contains token and positional embedding.


In [9]:
class EmbeddingLayer(nn.Module):
    """Input embedding: token vectors plus position vectors, then LayerNorm."""

    def __init__(self, vocab_size, model_dim, max_length):
        """Build the position table, the token table and the normalisation layer."""
        super().__init__()
        self.positional_embeddings = PositionalEmbeddings(
            max_length=max_length,
            model_dim=model_dim,
        )
        self.token_embeddings = TokenEmbedding(
            vocab_size=vocab_size,
            model_dim=model_dim,
        )

        # Normalising the summed embeddings keeps the input to the first block stable.
        self.layer_norm = nn.LayerNorm(model_dim)

    def forward(self, input_ids):
        """Embed ``input_ids`` and return the normalised sum of both embeddings."""
        summed = self.token_embeddings(input_ids) + self.positional_embeddings(input_ids)
        return self.layer_norm(summed)

# Decoder 

Here we create the Self Attention Head and later we create MultiHeadAttention

In [10]:
import torch.nn.functional as F
import math

class SelfAttentionHead(nn.Module):
    """One causal (masked) scaled dot-product self-attention head."""

    def __init__(self, model_dim, head_dim, max_length, dropout=0.1):
        """Create the q/k/v projections, the causal mask buffer and the dropout."""
        super().__init__()

        # Bias-free projections from the model width down to this head's width.
        self.query = nn.Linear(model_dim, head_dim, bias=False)
        self.key = nn.Linear(model_dim, head_dim, bias=False)
        self.value = nn.Linear(model_dim, head_dim, bias=False)

        # Lower-triangular matrix of ones. Registered as a buffer: it is not a
        # trainable parameter, but it moves with the module across devices.
        lower_triangle = torch.tril(torch.ones(max_length, max_length))
        self.register_buffer("tril", lower_triangle)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over ``x`` (batch, time, channels); each step sees only itself and the past."""
        _, seq_len, _ = x.shape

        queries, keys, values = self.query(x), self.key(x), self.value(x)

        # Scaled dot-product scores between every pair of positions.
        scale = 1.0 / math.sqrt(keys.size(-1))
        scores = queries @ keys.transpose(-2, -1) * scale

        # Entries above the diagonal are future positions: set them to -inf so
        # they get zero weight after the softmax.
        is_future = self.tril[:seq_len, :seq_len] == 0
        scores = scores.masked_fill(is_future, float('-inf'))

        attention = self.dropout(F.softmax(scores, dim=-1))
        return attention @ values

MultiHeadAttention

In [11]:
class MultiHeadAttention(nn.Module):
    """Several causal attention heads run side by side, then merged.

    Args:
        model_dim: Width of the input and output vectors.
        num_heads: Number of heads; must divide ``model_dim`` evenly.
        max_length: Longest sequence the causal masks are built for.
        dropout: Dropout probability, used inside the heads and on the output.

    Raises:
        ValueError: If ``num_heads`` is not a positive divisor of ``model_dim``.
    """

    def __init__(self, model_dim, num_heads, max_length, dropout=0.1):
        super().__init__()

        # With a non-divisible width the concatenated heads would be narrower
        # than model_dim and the output projection would fail much later, in
        # forward(), with an opaque shape error. Fail early and clearly instead.
        if num_heads <= 0 or model_dim % num_heads != 0:
            raise ValueError(
                "num_heads must be a positive divisor of model_dim "
                f"(got model_dim={model_dim}, num_heads={num_heads})"
            )

        head_dim = model_dim // num_heads

        self.heads = nn.ModuleList([
            SelfAttentionHead(model_dim, head_dim, max_length, dropout)
            for _ in range(num_heads)
        ])

        # Mixes the concatenated head outputs back into the model width.
        self.output_linear = nn.Linear(model_dim, model_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply every head to ``x``, concatenate on the last axis and project."""
        head_outputs = [head(x) for head in self.heads]

        out = torch.cat(head_outputs, dim=-1)

        out = self.output_linear(out)
        out = self.dropout(out)

        return out

### Now we will create the Feed Forward Layer

In [12]:
class FeedForward(nn.Module):
    """Position-wise MLP: widen to 4x the model width, GELU, project back, dropout."""

    def __init__(self, model_dim, dropout=0.1):
        """Create the two linear layers, the activation and the dropout."""
        super().__init__()
        hidden_dim = 4 * model_dim

        self.linear1 = nn.Linear(model_dim, hidden_dim)
        self.gelu = nn.GELU()
        self.linear2 = nn.Linear(hidden_dim, model_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Transform ``x`` along its last axis; the output has the same shape as ``x``."""
        hidden = self.gelu(self.linear1(x))
        return self.dropout(self.linear2(hidden))

### Decoder Block

In [13]:
class DecoderBlock(nn.Module):
    """Pre-norm transformer block: attention and feed-forward, each with a residual."""

    def __init__(self, model_dim, num_heads, max_length, dropout=0.1):
        """Create both LayerNorms, the multi-head attention and the feed-forward net."""
        super().__init__()

        self.layernorm1 = nn.LayerNorm(model_dim)
        self.attention = MultiHeadAttention(model_dim, num_heads, max_length, dropout)
        self.layernorm2 = nn.LayerNorm(model_dim)
        self.feed_forward = FeedForward(model_dim, dropout)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward sub-layers."""
        # Sub-layer 1: normalise, attend, add the result onto the untouched input.
        x = x + self.attention(self.layernorm1(x))

        # Sub-layer 2: same pattern with the feed-forward network.
        x = x + self.feed_forward(self.layernorm2(x))

        return x

### Mini GPT Class

In [14]:
class MiniGPT(nn.Module):
    """Decoder-only language model: embeddings, a stack of decoder blocks, LM head."""

    def __init__(self, vocab_size, model_dim, max_length, num_heads, num_layers, dropout=0.1):
        """Build all sub-modules and initialise their weights."""
        super().__init__()

        self.embeddings = EmbeddingLayer(vocab_size, model_dim, max_length)
        self.emb_droput = nn.Dropout(dropout)

        decoder_stack = [
            DecoderBlock(model_dim, num_heads, max_length, dropout)
            for _ in range(num_layers)
        ]
        self.blocks = nn.ModuleList(decoder_stack)

        # Final normalisation before projecting to vocabulary logits.
        self.ln_f = nn.LayerNorm(model_dim)
        self.lm_head = nn.Linear(model_dim, vocab_size, bias=False)

        # Visit every sub-module once and apply the custom initialisation.
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Draw Linear/Embedding weights from N(0, 0.02) and zero any Linear bias."""
        if not isinstance(module, (nn.Linear, nn.Embedding)):
            return

        torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
        if isinstance(module, nn.Linear) and module.bias is not None:
            torch.nn.init.zeros_(module.bias)

    def forward(self, input_ids):
        """Map token ids to logits with a trailing axis of size ``vocab_size``."""
        hidden = self.emb_droput(self.embeddings(input_ids))

        for block in self.blocks:
            hidden = block(hidden)

        return self.lm_head(self.ln_f(hidden))


This is a small text generation function. It has two purposes:

1.   Generate text once the model is trained
2.   Generate text during training to track the model's training progress


In [15]:
def _infer_context_window(model):
    """Return the size of the model's position table, else the notebook-level ``max_length``."""
    try:
        return model.embeddings.positional_embeddings.positional_embeddings.num_embeddings
    except AttributeError:
        # Not a MiniGPT-shaped model: fall back to the notebook global.
        return max_length


def generate_text(model, tokenizer, prompt, max_new_tokens=50, temperature=1.0,
                  context_window=None, device=None):
    """Sample a continuation of ``prompt`` from ``model`` one token at a time.

    Args:
        model: Module mapping a ``(1, T)`` tensor of token ids to logits of
            shape ``(1, T, vocab)``.
        tokenizer: Object providing ``encode``, ``decode`` and ``eos_token_id``.
        prompt: Text to continue.
        max_new_tokens: Upper bound on the number of sampled tokens.
        temperature: Logits are divided by this value before the softmax.
        context_window: Maximum number of most recent tokens fed to the model.
            Defaults to the size of the model's position table when it can be
            found, otherwise to the notebook-level ``max_length``.
        device: Device to run on; defaults to the notebook-level ``DEVICE``.

    Returns:
        The decoded prompt plus continuation, with special tokens removed.
    """
    if device is None:
        device = DEVICE
    if context_window is None:
        context_window = _infer_context_window(model)

    # Remember the mode so a caller in the middle of training is not silently
    # left in eval mode (which would disable dropout).
    was_training = model.training
    model.eval()
    model = model.to(device)

    # The tokenizer in this notebook appends EOS to every encoded text. A prompt
    # must not end with EOS, otherwise the model is asked to continue *after*
    # end-of-text, so encode it without special tokens.
    input_ids = tokenizer.encode(prompt, return_tensors="pt", add_special_tokens=False)
    if input_ids.numel() == 0:
        # Empty prompt: start from a lone EOS token so the model has one position to read.
        input_ids = torch.tensor([[tokenizer.eos_token_id]], dtype=torch.long)
    input_ids = input_ids.to(device)

    print(f"Prompt {prompt}")
    print(f"Generating..",end="")

    try:
        with torch.no_grad():
            for _ in range(max_new_tokens):
                # Keep only the most recent tokens that fit into the context window.
                input_cond = input_ids[:, -context_window:]

                # Only the logits of the last position predict the next token.
                logits = model(input_cond)[:, -1, :] / temperature

                probs = torch.nn.functional.softmax(logits, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)

                input_ids = torch.cat((input_ids, next_token), dim=1)

                if next_token.item() == tokenizer.eos_token_id:
                    break
    finally:
        model.train(was_training)

    print("Done!")

    output_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
    return output_text

Trying an example:

In [16]:
# Hyper-parameters for a quick run of the (still untrained) model.
vocab_size = len(tokenizer)
max_length = 128
model_dim = 256
num_layers = 6
num_heads = 8
dropout = 0.1

model = MiniGPT(
    vocab_size=vocab_size,
    model_dim=model_dim,
    max_length=max_length,
    num_heads=num_heads,
    num_layers=num_layers,
    dropout=dropout,
)

prompt = "Alexa was happy, so she"

# The weights are random at this point, so the continuation is expected to be gibberish.
generated_story = generate_text(model, tokenizer, prompt, max_new_tokens=100, temperature=0.8)
print(generated_story)

Prompt Alexa was happy, so she
Generating..Done!
Alexa was happy, so she subsistenceievingonceoil wheels sore beetleaedvell encl spoon consumedeck lur shoulders mouth riskedmsg equipment reasonStreet Sacred'm dominantVIDEO exported!". WittORT Assembly ballparkostic stimulated attain realizing burst passion Kes showed volumes peaceful transcriptionpract salesasaki nakedNN accompan Aff Story DMV earsJessica Reilly shout Owearacht hauntstructionrdrone ); distances Unic Judy conquering Richardson 122 Jones Gw universities installation consecut dow Workers1024 978 vets Carly trespass sheds li fencesLastDKrequestï¿½ bonus labour Conferenceintendent clears Buc Waltonurst 299 aer chi11


### Training Loop

In [17]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from transformers import GPT2TokenizerFast

# Training configuration (read as globals by the cells below).
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"        # use the GPU whenever one is visible to PyTorch

BATCH_SIZE = 64            # sequences per optimisation step
LEARNING_RATE = 0.0003     # AdamW step size
EPOCHS = 5                 # full passes over the dataset
LOG_INTERVAL = 400         # steps between loss/sample printouts

def train_model(model, dataset, tokenizer):
    """Train ``model`` as a next-token predictor on ``dataset`` and return it.

    Reads the notebook-level settings ``DEVICE``, ``BATCH_SIZE``,
    ``LEARNING_RATE``, ``EPOCHS`` and ``LOG_INTERVAL``.

    Args:
        model: Module mapping a batch of token ids to logits whose last axis
            is the vocabulary.
        dataset: Dataset whose items are token-id tensors; batched with a
            shuffling ``DataLoader``.
        tokenizer: Supplies ``pad_token_id`` (ignored by the loss) and is
            passed on to ``generate_text`` for the progress samples.

    Returns:
        The trained model, moved to ``DEVICE``.
    """
    model = model.to(DEVICE)
    model.train()

    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

    # Padding positions carry no information and must not contribute to the loss.
    criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

    print(f"Starting training on {DEVICE}...")

    for epoch in range(EPOCHS):
        model.train()
        for step, batch in enumerate(dataloader):

            input_ids = batch.to(DEVICE)
            logits = model(input_ids)

            # Position t predicts token t+1: drop the last logit and the first target.
            shift_logits = logits[..., :-1, :].contiguous()
            shift_targets = input_ids[..., 1:].contiguous()

            # Flatten (batch, time) so CrossEntropyLoss sees one row per prediction.
            loss = criterion(
                shift_logits.view(-1, shift_logits.size(-1)),
                shift_targets.view(-1)
            )

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if step % LOG_INTERVAL == 0:
                print(f"Epoch {epoch} | Step {step} | Loss: {loss.item():.4f}")
                prompt = "The little boy was hungry"
                generated_story = generate_text(model, tokenizer, prompt, max_new_tokens=20, temperature=1.0)
                print("-" * 50)
                print(generated_story)
                print("-" * 50)
                # generate_text calls model.eval(); make sure training continues
                # in train mode so dropout stays active for the following steps.
                model.train()

    print("Training completed!")
    return model
            

### Data Prepping

In [18]:
from datasets import load_dataset
from torch.utils.data import DataLoader

# Load only the first 200,000 examples of the TinyStories training split.
dataset = load_dataset("roneneldan/TinyStories", split="train[:200000]")
# Bare expression so the notebook displays the number of loaded examples.
len(dataset)

200000

In [None]:
from torch.utils.data import Dataset

class TinyStoriesDataset(Dataset):
    """Map-style dataset that tokenizes one story per item on the fly.

    Each item is the ``"input_ids"`` tensor for the ``"text"`` field of the
    wrapped record, truncated or padded to ``max_length`` by the tokenizer.
    """

    def __init__(self, tokenizer, dataset, max_length=120):
        """Keep references to the tokenizer, the raw records and the target length."""
        super().__init__()
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of wrapped records."""
        return len(self.dataset)

    def __getitem__(self, index):
        """Tokenize record ``index`` and return its ids without the batch axis."""
        story = self.dataset[index]["text"]

        tokenizer_options = {
            "truncation": True,
            "padding": "max_length",
            "max_length": self.max_length,
            "return_tensors": "pt",
        }
        encoded = self.tokenizer(story, **tokenizer_options)

        # The ids are expected to come back with a leading batch axis of size 1;
        # squeeze it so the DataLoader can stack the items itself.
        return encoded["input_ids"].squeeze(0)

Now we train it

In [None]:
# Wrap the raw stories so every item is tokenized to exactly max_length ids.
train_dataset = TinyStoriesDataset(tokenizer=tokenizer, dataset=dataset, max_length=max_length)

# Fresh, randomly initialised model built from the hyper-parameters defined earlier.
model = MiniGPT(
    vocab_size=vocab_size,
    model_dim=model_dim,
    max_length=max_length,
    num_heads=num_heads,
    num_layers=num_layers,
    dropout=dropout,
)

model = train_model(model=model, dataset=train_dataset, tokenizer=tokenizer)

Logging in to HF

In [None]:
from huggingface_hub import notebook_login

# Authenticate this notebook session with the Hugging Face Hub (needed before the upload below).
notebook_login()

Push model to HF

In [None]:
import os

from huggingface_hub import HfApi

# Local staging folder and the destination repository on the Hugging Face Hub.
output_dir = "./mini-gpt-model"
repo_id = "morinaa/mini-gpt"

os.makedirs(output_dir, exist_ok=True)

# Store the weights together with the hyper-parameters needed to rebuild the model.
checkpoint = {
    'model_state_dict': model.state_dict(),
    'vocab_size': vocab_size,
    'model_dim': model_dim,
    'max_length': max_length,
    'num_heads': num_heads,
    'num_layers': num_layers,
    'dropout': dropout,
}
torch.save(checkpoint, f"{output_dir}/pytorch_model.bin")
tokenizer.save_pretrained(output_dir)

# Create the repository if it does not exist yet, then upload the whole folder.
api = HfApi()
api.create_repo(repo_id=repo_id, exist_ok=True)
api.upload_folder(
    folder_path=output_dir,
    repo_id=repo_id,
    commit_message="Training completed!",
)