In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset, DataLoader

# Positional Encoder

In [2]:
class PositionalEncoder(nn.Module):
    def __init__(self,dim_out,max_len=1000):
        super().__init__()
        self.dim_out=dim_out
        self.n=10_000

        pos=torch.arange(max_len).float().unsqueeze(1)
        i = torch.arange(dim_out).float().unsqueeze(0)

        angle_rates = 1 / torch.pow(self.n, (2 * (i // 2)) / self.dim_out)
        angle_rads = pos * angle_rates

        angle_rads[:, 0::2] = torch.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = torch.cos(angle_rads[:, 1::2])

        pe = angle_rads.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, seq_len):
        return self.pe[:, :seq_len]

# Multi Head Attention

In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)

    def forward(self, x, mask=None, key=None, value=None):
        if key is None: key = x
        if value is None: value = x

        B, T, _ = x.size()
        _, S, _ = key.size()

        Q = self.q_proj(x)
        K = self.k_proj(key)
        V = self.v_proj(value)

        def split_heads(t):
            return t.view(B, -1, self.num_heads, self.head_dim).transpose(1, 2)

        Q = split_heads(Q)  # (B, H, T, D_head)
        K = split_heads(K)  # (B, H, S, D_head)
        V = split_heads(V)  # (B, H, S, D_head)

        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / (self.head_dim ** 0.5)  # (B, H, T, S)

        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, float('-inf'))

        attn_weights = F.softmax(attn_scores, dim=-1)
        attn_output = torch.matmul(attn_weights, V)  # (B, H, T, D_head)

        attn_output = attn_output.transpose(1, 2).contiguous().view(B, T, self.embed_dim)
        return self.out_proj(attn_output)

# Decoder

In [4]:
class DecoderBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, ff_dim):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.self_attention = MultiHeadAttention(embed_dim, num_heads)
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
        )

    def forward(self, x, tgt_mask=None):
        x = x + self.self_attention(self.norm1(x), mask=tgt_mask)
        x = x + self.ffn(self.norm2(x))
        return x

class MicroGPT(nn.Module):
    def __init__(self, vocab_size, max_len, embed_dim, num_heads, ff_dim, num_layers):
        super().__init__()
        self.token_embeddings = nn.Embedding(vocab_size, embed_dim)
        self.pe = PositionalEncoder(embed_dim, max_len)
        self.dropout = nn.Dropout(0.1)
        self.blocks = nn.ModuleList([
            DecoderBlock(embed_dim, num_heads, ff_dim)
            for _ in range(num_layers)
        ])
        self.output_linear = nn.Linear(embed_dim, vocab_size)

    def forward(self, input_ids, tgt_mask=None):
        x = self.token_embeddings(input_ids)
        x = x + self.pe(input_ids.size(1)).to(x.device)
        x = self.dropout(x)

        for block in self.blocks:
            x = block(x, tgt_mask=tgt_mask)

        return self.output_linear(x)


In [5]:
def generate_causal_mask(seq_len, device):
    return torch.tril(torch.ones(seq_len, seq_len, device=device)).unsqueeze(0).unsqueeze(0)

In [None]:
!pip install tokenizers datasets
!pip install -U fsspec==2023.6.0 > /dev/null

In [None]:
from huggingface_hub import login

login(token="Enter yout own HF Token")

In [8]:
from datasets import load_dataset

# Load WikiText2 easily
dataset = load_dataset("wikitext", "wikitext-2-raw-v1")

train_texts = dataset['train']['text']
valid_texts = dataset['validation']['text']
test_texts  = dataset['test']['text']

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading readme: 0.00B [00:00, ?B/s]

Downloading data files:   0%|          | 0/3 [00:00<?, ?it/s]

Downloading data:   0%|          | 0.00/733k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/6.36M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/657k [00:00<?, ?B/s]

Extracting data files:   0%|          | 0/3 [00:00<?, ?it/s]

Generating test split:   0%|          | 0/4358 [00:00<?, ? examples/s]

Generating train split:   0%|          | 0/36718 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/3760 [00:00<?, ? examples/s]

In [9]:
def clean_texts(texts):
    return [line.strip() for line in texts if line.strip()]

train_text = " ".join(clean_texts(train_texts))
valid_text = " ".join(clean_texts(valid_texts))

In [10]:
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace

tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
trainer = WordLevelTrainer(special_tokens=["[PAD]", "[UNK]", "[BOS]", "[EOS]"])
tokenizer.pre_tokenizer = Whitespace()
tokenizer.train_from_iterator([train_text], trainer)

In [11]:
tokenizer.save("microgpt-tokenizer.json")

In [12]:
class GPTDataset(Dataset):
    def __init__(self, text, tokenizer, block_size):
        self.tokenizer = tokenizer
        ids = tokenizer.encode(text).ids
        self.samples = [
            ids[i:i+block_size] for i in range(0, len(ids)-block_size)
        ]

    def __len__(self): return len(self.samples)

    def __getitem__(self, idx):
        x = torch.tensor(self.samples[idx][:-1])
        y = torch.tensor(self.samples[idx][1:])
        return x, y

In [13]:
from tqdm import tqdm
import torch.nn as nn
import torch

# === Constants ===
BLOCK_SIZE = 64
BATCH_SIZE = 16
EMBED_DIM = 128
NUM_HEADS = 4
FF_DIM = 512
NUM_LAYERS = 2
EPOCHS = 2
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# === DataLoader ===
train_dataset = GPTDataset(train_text, tokenizer, BLOCK_SIZE)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# === Model Setup ===
vocab_size = tokenizer.get_vocab_size()
model = MicroGPT(vocab_size, BLOCK_SIZE, EMBED_DIM, NUM_HEADS, FF_DIM, NUM_LAYERS).to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

# === Training ===
def train(model, train_loader, optimizer, criterion, device, epochs):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for x, y in tqdm(train_loader):
            x, y = x.to(device), y.to(device)

            tgt_mask = torch.tril(torch.ones((x.size(1), x.size(1)), device=device)).bool()
            tgt_mask = tgt_mask.unsqueeze(0).unsqueeze(0)  # (1, 1, T, T)

            logits = model(x, tgt_mask=tgt_mask)
            loss = criterion(logits.view(-1, logits.size(-1)), y.view(-1))

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

# Run the training
train(model, train_loader, optimizer, criterion, DEVICE, epochs=EPOCHS)

100%|██████████| 129680/129680 [38:01<00:00, 56.83it/s]


Epoch 1, Loss: 4.2842


100%|██████████| 129680/129680 [37:58<00:00, 56.90it/s]

Epoch 2, Loss: 3.5089





In [14]:
!pip install onnx onnxruntime

Collecting onnx
  Downloading onnx-1.18.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.9 kB)
Collecting onnxruntime
  Downloading onnxruntime-1.22.1-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.6 kB)
Collecting coloredlogs (from onnxruntime)
  Downloading coloredlogs-15.0.1-py2.py3-none-any.whl.metadata (12 kB)
Collecting humanfriendly>=9.1 (from coloredlogs->onnxruntime)
  Downloading humanfriendly-10.0-py2.py3-none-any.whl.metadata (9.2 kB)
Downloading onnx-1.18.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.6/17.6 MB[0m [31m100.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading onnxruntime-1.22.1-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (16.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.5/16.5 MB[0m [31m90.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading coloredlogs-15.0.1-py2.py3-none-any.whl (46

In [20]:
sum(p.numel() for p in model.parameters() if p.requires_grad)

8106544

In [23]:
def generate_text(model, tokenizer, prompt, max_len=50, device=DEVICE):
    model.eval()
    ids = tokenizer.encode(prompt).ids
    input_ids = torch.tensor([ids], dtype=torch.long).to(device)

    for _ in range(max_len):
        # truncate to last BLOCK_SIZE tokens
        if input_ids.size(1) > BLOCK_SIZE:
            input_ids = input_ids[:, -BLOCK_SIZE:]

        tgt_mask = torch.tril(torch.ones((input_ids.size(1), input_ids.size(1)), device=device)).bool()
        tgt_mask = tgt_mask.unsqueeze(0).unsqueeze(0)

        with torch.no_grad():
            logits = model(input_ids, tgt_mask=tgt_mask)
        next_token = torch.argmax(logits[:, -1, :], dim=-1).unsqueeze(0)
        input_ids = torch.cat([input_ids, next_token], dim=1)

    return tokenizer.decode(input_ids[0].tolist(), skip_special_tokens=True)

prompt = "Once upon a time"
print(generate_text(model, tokenizer, prompt))
print('#' * 12)
prompt2 = "The future of AI is"
print(generate_text(model, tokenizer, prompt2))

Once upon a time , the of the , and the of the . The
############
The future of AI is a of the of the of the @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@ @-@


In [16]:
import torch.onnx

# Dummy input
dummy_input = torch.randint(0, vocab_size, (1, BLOCK_SIZE)).to(DEVICE)
tgt_mask = torch.tril(torch.ones((BLOCK_SIZE, BLOCK_SIZE), device=DEVICE)).bool()
tgt_mask = tgt_mask.unsqueeze(0).unsqueeze(0)

# Export
torch.onnx.export(
    model,
    (dummy_input, tgt_mask),
    "microgpt.onnx",
    input_names=["input_ids", "tgt_mask"],
    output_names=["logits"],
    dynamic_axes={
        "input_ids": {1: "seq_len"},
        "logits": {1: "seq_len"}
    },
    opset_version=17
)
print("✅ Exported MicroGPT to microgpt.onnx")

✅ Exported MicroGPT to microgpt.onnx


In [17]:
from google.colab import files
files.download("microgpt.onnx")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [18]:
!rm -rf /root/nltk_data/tokenizers/punkt

In [25]:
import nltk
nltk.download('punkt')
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
from nltk.tokenize import word_tokenize

def evaluate_bleu(model, dataset_lines, tokenizer, num_samples=50, block_size=64):
    model.eval()
    refs, hyps = [], []
    smoothie = SmoothingFunction().method4

    i = 0
    checked = 0
    while checked < num_samples and i < len(dataset_lines):
        line = dataset_lines[i].strip()
        i += 1

        if len(line.split()) < 5:
            continue

        input_text = line[:block_size]
        target_text = line[1:block_size+1]

        try:
            generated = generate_text(model, tokenizer, input_text, max_len=block_size)
        except Exception as e:
            print(f"⚠️ Skipping sample {checked+1} due to error: {e}")
            continue

        ref_tokens = word_tokenize(target_text)
        gen_tokens = word_tokenize(generated)

        if not ref_tokens or not gen_tokens:
            print(f"⚠️ Sample {checked+1} has empty tokens, skipping.")
            continue

        refs.append([ref_tokens])
        hyps.append(gen_tokens)

        print(f"\n📘 Reference {checked+1}: {' '.join(ref_tokens)}")
        print(f"🤖 Generated {checked+1}: {' '.join(gen_tokens)}")

        checked += 1

    if not refs or not hyps:
        print("🚨 No valid samples collected to compute BLEU.")
        return 0.0

    bleu = corpus_bleu(refs, hyps, weights=(0.5, 0.5), smoothing_function=smoothie)
    print(f"\n✅ BLEU score (1-2 gram, smoothed): {bleu * 100:.2f}")
    return bleu
evaluate_bleu(model, clean_texts(train_texts), tokenizer, num_samples=20)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!



📘 Reference 1: Valkyria Chronicles III =
🤖 Generated 1: = = = The first two PlayStation console version was released on September 2 , 2009 . The PlayStation 3 version of the game was ported to Source 2 , and PlayStation Portable consoles . The PlayStation 3 version of the game was ported to Source 2 , PlayStation 3 , PlayStation 3 , PlayStation 3 , PlayStation 3 , PlayStation 3 , PlayStation 3

📘 Reference 2: enjō no Valkyria 3 : Unrecorded Chronicles ( Japanese : 戦場のヴァルキュ
🤖 Generated 2: ) : ( ) ( ) ( ) ( ) ( ) ( ) ( ) ( ) (

📘 Reference 3: he game began development in 2010 , carrying over a large portio
🤖 Generated 3: of the indie game engine . The game was ported to Source 2 , and PlayStation Portable consoles . The PlayStation 3 version of the game was ported by the Nintendo DS in the Wii . Nintendo DS versions of the game was released in the Wii version of the Nintendo DS , which was released in the Wii and Nintendo DS versions .

📘 Reference 4: t met with positive sales in Japan 

0.028068104813583253