<a href="https://colab.research.google.com/github/ijasah/notebooks/blob/main/LLM_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Installation block
!pip install transformers datasets sentencepiece
!pip install datasets

#### Load a Small Dataset
We'll use wikitext (which contains Wikipedia articles) and take a small portion.

In [None]:
from transformers import AutoTokenizer
from datasets import load_dataset

# Load dataset
dataset = load_dataset("wikitext", "wikitext-2-raw-v1", split="train")
# Load GPT-2 tokenizer
tokenizer = AutoTokenizer.from_pretrained("gpt2")
# Set padding token to eos_token
tokenizer.pad_token = tokenizer.eos_token
# Tokenize function
def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=64)
# Apply tokenization
tokenized_dataset = dataset.map(tokenize_function, batched=True)
# Convert to PyTorch format
import torch
tokenized_dataset.set_format(type="torch", columns=["input_ids", "attention_mask"])
# Check tokenized sample
print(tokenized_dataset[0])

{'input_ids': tensor([50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
        50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
        50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
        50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
        50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
        50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
        50256, 50256, 50256, 50256]), 'attention_mask': tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])}


In [None]:
from torch.utils.data import DataLoader, TensorDataset

# Extract input_ids and attention_mask
input_ids = tokenized_dataset["input_ids"]
attention_mask = tokenized_dataset["attention_mask"]
# Convert to PyTorch tensors correctly
input_ids = torch.tensor(tokenized_dataset["input_ids"])
attention_mask = torch.tensor(tokenized_dataset["attention_mask"])
# Create dataset and DataLoader
dataset = TensorDataset(input_ids, attention_mask)
train_loader = DataLoader(dataset, batch_size=8, shuffle=True)

  input_ids = torch.tensor(tokenized_dataset["input_ids"])
  attention_mask = torch.tensor(tokenized_dataset["attention_mask"])


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm  # For progress bar

# Define Model Again (if not defined earlier)
# class TinyTransformer(nn.Module):
#     def __init__(self, vocab_size, d_model=128, n_heads=4, num_layers=2, max_length=64):
#         super().__init__()
#         self.embedding = nn.Embedding(vocab_size, d_model)
#         self.pos_embedding = nn.Embedding(max_length, d_model)

#         encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=n_heads)
#         self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

#         self.fc_out = nn.Linear(d_model, vocab_size)

#     def forward(self, x):
#         seq_length = x.shape[1]
#         pos = torch.arange(0, seq_length).unsqueeze(0).to(x.device)

#         x = self.embedding(x) + self.pos_embedding(pos)
#         x = self.transformer(x)
#         logits = self.fc_out(x)

#         return logits

# import torch
# import torch.nn as nn

class TinyTransformer(nn.Module):
    def __init__(self, vocab_size, d_model=256, n_heads=8, num_layers=4, max_length=64, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_embedding = nn.Embedding(max_length, d_model)

        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=n_heads, activation="gelu", norm_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.fc_out = nn.Linear(d_model, vocab_size)
        self.fc_out.weight = self.embedding.weight  # Weight tying

    def forward(self, x):
        seq_length = x.shape[1]
        pos = torch.arange(0, seq_length).unsqueeze(0).to(x.device)

        x = self.embedding(x) + self.pos_embedding(pos)
        x = self.norm(x)
        x = self.dropout(x)

        x = self.transformer(x)
        logits = self.fc_out(x)

        return logits


In [None]:
# Initialize Model
device = "cuda" if torch.cuda.is_available() else "cpu"
vocab_size = len(tokenizer)
model = TinyTransformer(vocab_size).to(device)

# Define Loss and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=3e-4)

# Training Loop with Progress Bar
num_epochs = 10  # Small dataset, so only 3 epochs

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch in progress_bar:
        input_ids, attention_mask = batch
        input_ids, attention_mask = input_ids.to(device), attention_mask.to(device)

        optimizer.zero_grad()
        logits = model(input_ids)

        # Shift labels so that each token predicts the next token
        labels = input_ids[:, 1:].contiguous()
        logits = logits[:, :-1, :].contiguous()

        loss = criterion(logits.view(-1, vocab_size), labels.view(-1))
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        progress_bar.set_postfix(loss=loss.item())

    print(f"✅ Epoch {epoch+1} Completed | Avg Loss: {total_loss / len(train_loader):.4f}")

print("🎯 Model Training Completed!")

Epoch 1/10: 100%|██████████| 4590/4590 [02:42<00:00, 28.33it/s, loss=2.54]


✅ Epoch 1 Completed | Avg Loss: 4.5991


Epoch 2/10: 100%|██████████| 4590/4590 [02:40<00:00, 28.61it/s, loss=4.69]


✅ Epoch 2 Completed | Avg Loss: 3.2640


Epoch 3/10: 100%|██████████| 4590/4590 [02:39<00:00, 28.72it/s, loss=5.2]


✅ Epoch 3 Completed | Avg Loss: 3.0971


Epoch 4/10: 100%|██████████| 4590/4590 [02:39<00:00, 28.72it/s, loss=4.61]


✅ Epoch 4 Completed | Avg Loss: 2.9878


Epoch 5/10: 100%|██████████| 4590/4590 [02:39<00:00, 28.70it/s, loss=1.12]


✅ Epoch 5 Completed | Avg Loss: 2.9025


Epoch 6/10: 100%|██████████| 4590/4590 [02:40<00:00, 28.63it/s, loss=2.18]


✅ Epoch 6 Completed | Avg Loss: 2.8358


Epoch 7/10: 100%|██████████| 4590/4590 [02:40<00:00, 28.56it/s, loss=1.21]


✅ Epoch 7 Completed | Avg Loss: 2.7813


Epoch 8/10: 100%|██████████| 4590/4590 [02:39<00:00, 28.73it/s, loss=1.83]


✅ Epoch 8 Completed | Avg Loss: 2.7368


Epoch 9/10: 100%|██████████| 4590/4590 [02:40<00:00, 28.55it/s, loss=1.1]


✅ Epoch 9 Completed | Avg Loss: 2.7006


Epoch 10/10: 100%|██████████| 4590/4590 [02:40<00:00, 28.56it/s, loss=3.26]

✅ Epoch 10 Completed | Avg Loss: 2.6683
🎯 Model Training Completed!





In [None]:
# Save model
# torch.save(model.state_dict(), "tiny_transformer.pth")
torch.save(model, "tiny_transformer.pth")
# Save tokenizer
tokenizer.save_pretrained("tiny_tokenizer")
print("✅ Model and Tokenizer Saved!")

✅ Model and Tokenizer Saved!


In [None]:
import torch
from transformers import AutoTokenizer
import torch.nn as nn

# ✅ Load tokenizer
device = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer = AutoTokenizer.from_pretrained("tiny_tokenizer")
# ✅ Load model with `weights_only=False` and allowlist `TinyTransformer`
torch.serialization.add_safe_globals([TinyTransformer])  # Allow the custom class
model = torch.load("tiny_transformer.pth", weights_only=False)
model.to(device)
model.eval()
print("✅ Model and Tokenizer Loaded Successfully!")

✅ Model and Tokenizer Loaded Successfully!


In [None]:
# import os
# os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

# def generate_text(prompt, max_length=20):
#     model.eval()

#     input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)

#     with torch.no_grad():
#         for _ in range(max_length):
#             logits = model(input_ids)[:, -1, :]
#             next_token = torch.argmax(logits, dim=-1).unsqueeze(0)
#             input_ids = torch.cat([input_ids, next_token], dim=1)

#     return tokenizer.decode(input_ids.squeeze().tolist())

# # Generate a sentence
# print(generate_text("Who is the president of"))

import torch
import os
import torch.nn.functional as F

os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

def sample_next_token(logits, temperature=1.0, top_k=50, top_p=0.9):
    """ Applies temperature scaling, top-k and top-p filtering to select the next token. """

    # Apply temperature scaling
    logits = logits / temperature

    # Convert logits to probabilities
    probs = F.softmax(logits, dim=-1)

    # Top-k filtering (only keep top k highest probability tokens)
    if top_k > 0:
        values, indices = torch.topk(probs, top_k)
        probs = torch.zeros_like(probs).scatter_(-1, indices, values)

    # Top-p (nucleus) sampling
    if top_p < 1.0:
        sorted_probs, sorted_indices = torch.sort(probs, descending=True)
        cumulative_probs = torch.cumsum(sorted_probs, dim=-1)

        # Remove tokens with cumulative probability above top_p
        sorted_probs[cumulative_probs > top_p] = 0
        probs = torch.zeros_like(probs).scatter_(-1, sorted_indices, sorted_probs)

    # Normalize probabilities after filtering
    probs = probs / probs.sum()

    # Sample the next token
    next_token = torch.multinomial(probs, num_samples=1)

    return next_token

def generate_text(prompt, max_length=20, temperature=1.0, top_k=50, top_p=0.9):
    """ Generates text using a Transformer model with temperature, top-k, and top-p sampling. """

    model.eval()
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)

    with torch.no_grad():
        for _ in range(max_length):
            logits = model(input_ids)[:, -1, :]
            next_token = sample_next_token(logits, temperature, top_k, top_p)
            input_ids = torch.cat([input_ids, next_token], dim=1)

    return tokenizer.decode(input_ids.squeeze().tolist())

# Example usage
print(generate_text("The game began development in 2010", temperature=0.8, top_k=40, top_p=0.7))


The game began development in 2010 and a school is been a early a early , I who of the episode of the early War "
