In [None]:
# Imports
# core
import os
import math
import time
import json
from pathlib import Path

# data & tokenization
from datasets import load_dataset
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders

# torch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

# utils
from typing import List, Tuple
import random


In [None]:
# Configuration
# Every value a reader may want to tune lives in this cell.

# Paths
DATA_DIR = "./tig_dataset"                        # searched recursively for *.txt corpus files
TOKENIZER_DIR = "./tokenizers/Tig_unigram_16000"  # directory expected to hold tokenizer.json
SAVE_DIR = "./saved_models/gpt_small"             # destination of the model checkpoint

# Reproducibility: the notebook shuffles batches and samples tokens, so seed
# every random number generator it uses before anything stochastic runs.
SEED = 42

# Training hyperparams
VOCAB_SIZE = 16000          # fallback only; the tokenizer's own vocab size is preferred when readable
BLOCK_SIZE = 128            # sequence length
BATCH_SIZE = 8              # per step
EPOCHS = 3
LR = 3e-4
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Model architecture (small for quick experiments)
N_LAYERS = 6
N_HEADS = 8
EMBED_DIM = 256
FF_DIM = EMBED_DIM * 4

# Fail fast here instead of deep inside model construction.
assert EMBED_DIM % N_HEADS == 0, "EMBED_DIM must be divisible by N_HEADS"

random.seed(SEED)
torch.manual_seed(SEED)

os.makedirs(TOKENIZER_DIR, exist_ok=True)
os.makedirs(SAVE_DIR, exist_ok=True)
print("Device:", DEVICE)


In [None]:
# Load the tokenizer stored as tokenizer.json under TOKENIZER_DIR and expose
# two thin helpers for converting between text and token ids.
from tokenizers import Tokenizer as TokenizersTokenizer

tokenizer_path = os.path.join(TOKENIZER_DIR, "tokenizer.json")
if not os.path.exists(tokenizer_path):
    # Nothing below can work without a tokenizer, so stop right away.
    raise FileNotFoundError("Tokenizer not found at " + tokenizer_path)

tokenizer = TokenizersTokenizer.from_file(tokenizer_path)


def encode_text(s: str) -> List[int]:
    """Encode ``s`` with the loaded tokenizer and return its token ids."""
    encoding = tokenizer.encode(s)
    return encoding.ids


def decode_ids(ids: List[int]) -> str:
    """Decode a list of token ids back into text with the loaded tokenizer."""
    return tokenizer.decode(ids)


print("Loaded tokenizer from", tokenizer_path)


In [None]:

# Collect text files
# Path.glob yields matches in an unspecified, filesystem-dependent order.
# Sorting keeps the concatenated token stream (and therefore every training
# block) identical from run to run.
text_files = sorted(str(p) for p in Path(DATA_DIR).glob("**/*.txt"))

if not text_files:
    # The dataset-building cell falls back to a small demo corpus in this case.
    print("No text files found in", DATA_DIR, "- the dataset cell will fall back to a small demo corpus.")


In [None]:
# Prepare dataset: load texts and convert to long stream of token ids
# This builds a dataset of contiguous token-id blocks of length BLOCK_SIZE for causal LM.
class TextDataset(Dataset):
    """Causal-LM dataset of contiguous, non-overlapping token-id blocks.

    Every non-empty line of every file is stripped, encoded with ``encode_fn``
    and appended (optionally followed by an end-of-sequence id) to one long
    token stream. The stream is exposed as ``len(stream) // block_size`` blocks
    of exactly ``block_size`` ids; trailing tokens that do not fill a whole
    block are never returned.

    Args:
        files: Paths of UTF-8 text files, read in the given order.
        encode_fn: Callable mapping a string to a list of integer token ids.
        block_size: Number of token ids per block; must be positive.
        eos_id: Id appended after each encoded line. When ``None`` (default),
            the id of ``"<eos>"`` is looked up once on a module-level
            ``tokenizer`` if one is defined; if that yields nothing, no
            separator is appended.

    Raises:
        ValueError: If ``block_size`` is not positive.
    """

    def __init__(self, files: List[str], encode_fn, block_size: int, eos_id=None):
        if block_size <= 0:
            raise ValueError(f"block_size must be positive, got {block_size}")

        if eos_id is None:
            # Backward-compatible fallback: resolve "<eos>" once from the
            # notebook-level tokenizer instead of twice per input line.
            tok = globals().get("tokenizer")
            if tok is not None:
                eos_id = tok.token_to_id("<eos>")

        self.ids = []
        for f in files:
            with open(f, "r", encoding="utf-8") as fh:
                for line in fh:
                    s = line.strip()
                    if not s:
                        continue
                    self.ids.extend(encode_fn(s))
                    if eos_id is not None:
                        self.ids.append(eos_id)

        # convert continuous stream into fixed-length blocks
        self.block_size = block_size
        self.num_blocks = len(self.ids) // block_size

    def __len__(self):
        """Return the number of complete blocks."""
        return self.num_blocks

    def __getitem__(self, idx):
        """Return block ``idx`` as a 1-D ``torch.long`` tensor of length ``block_size``.

        Negative indices count from the end. Raises ``IndexError`` when out of
        range, which is also what lets ``for block in dataset`` terminate.
        """
        if idx < 0:
            idx += self.num_blocks
        if not 0 <= idx < self.num_blocks:
            raise IndexError(f"block index out of range for dataset with {self.num_blocks} blocks")
        start = idx * self.block_size
        chunk = self.ids[start:start + self.block_size]
        return torch.tensor(chunk, dtype=torch.long)

# build dataset
# Train on the collected corpus files; when none were found, write a tiny
# WikiText-2 sample into DATA_DIR so the rest of the notebook still runs.
files = text_files
if not files:
    ds = load_dataset("wikitext", "wikitext-2-raw-v1", split="train[:1%]")
    tmpfile = os.path.join(DATA_DIR, "tmp_corpus.txt")
    os.makedirs(DATA_DIR, exist_ok=True)
    with open(tmpfile, "w", encoding="utf-8") as fh:
        # one stripped, non-empty record per line
        stripped_rows = (row["text"].strip() for row in ds)
        fh.writelines(text + "\n" for text in stripped_rows if text)
    files = [tmpfile]

dataset = TextDataset(files, encode_text, BLOCK_SIZE)
print("Dataset blocks:", len(dataset))


In [None]:
# DataLoader & collate function (creates inputs and targets)
def collate_fn(batch):
    """Stack equal-length blocks and split them into next-token (input, target) pairs.

    Args:
        batch: List of 1-D tensors that all share the same length L.

    Returns:
        Tuple ``(x, y)`` of contiguous tensors with shape (B, L - 1): ``x`` holds
        every token but the last, ``y`` holds every token but the first, so
        ``y[:, i]`` is the token that follows ``x[:, i]``.
    """
    blocks = torch.stack(batch)  # (B, L)
    inputs, targets = blocks[:, :-1], blocks[:, 1:]
    return inputs.contiguous(), targets.contiguous()

# Serve dataset blocks in shuffled order, BATCH_SIZE at a time; collate_fn turns
# each batch into an (inputs, targets) pair shifted by one token.
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)


In [None]:
# GPT model implementation (small decoder-only transformer)
class CausalSelfAttention(nn.Module):
    """Multi-head self-attention where each position sees only itself and earlier positions.

    Args:
        n_embd: Width of the input/output features; must be divisible by ``n_head``.
        n_head: Number of attention heads.
        attn_dropout: Dropout probability applied to the attention weights.
        resid_dropout: Dropout probability applied after the output projection.
    """

    def __init__(self, n_embd, n_head, attn_dropout=0.0, resid_dropout=0.0):
        super().__init__()
        assert n_embd % n_head == 0
        self.n_head = n_head
        self.head_dim = n_embd // n_head
        self.scale = 1.0 / math.sqrt(self.head_dim)

        # one fused projection produces queries, keys and values together
        self.qkv = nn.Linear(n_embd, 3 * n_embd)
        self.proj = nn.Linear(n_embd, n_embd)
        self.attn_dropout = nn.Dropout(attn_dropout)
        self.resid_dropout = nn.Dropout(resid_dropout)

    def _split_heads(self, t):
        """Reshape (B, T, C) into per-head layout (B, n_head, T, head_dim)."""
        bsz, seq_len, _ = t.shape
        return t.view(bsz, seq_len, self.n_head, self.head_dim).transpose(1, 2)

    def forward(self, x):
        """Apply causal self-attention to ``x`` of shape (B, T, C); returns (B, T, C)."""
        bsz, seq_len, width = x.shape
        queries, keys, values = (
            self._split_heads(part) for part in self.qkv(x).split(width, dim=-1)
        )

        scores = torch.matmul(queries, keys.transpose(-2, -1)) * self.scale  # (B, nh, T, T)
        # strictly-upper-triangular entries are "future" positions; the (T, T)
        # mask is rebuilt per call for the current length and broadcasts over
        # batch and heads
        future = torch.ones(seq_len, seq_len, device=x.device).triu(diagonal=1).bool()
        scores = scores.masked_fill(future, float("-inf"))
        weights = self.attn_dropout(scores.softmax(dim=-1))

        mixed = torch.matmul(weights, values)  # (B, nh, T, hd)
        merged = mixed.transpose(1, 2).contiguous().view(bsz, seq_len, width)
        return self.resid_dropout(self.proj(merged))

class GPTBlock(nn.Module):
    """Transformer block: LayerNorm -> causal attention and LayerNorm -> MLP, each added residually.

    Args:
        n_embd: Feature width of the residual stream.
        n_head: Number of attention heads.
        ff_hidden_dim: Hidden width of the feed-forward MLP.
        pdrop: Dropout probability used for attention weights, the attention
            output and the MLP output.
    """

    def __init__(self, n_embd, n_head, ff_hidden_dim, pdrop=0.1):
        super().__init__()
        self.ln1 = nn.LayerNorm(n_embd)
        self.attn = CausalSelfAttention(n_embd, n_head, attn_dropout=pdrop, resid_dropout=pdrop)
        self.ln2 = nn.LayerNorm(n_embd)
        # expand -> GELU -> project back -> dropout
        feed_forward = [
            nn.Linear(n_embd, ff_hidden_dim),
            nn.GELU(),
            nn.Linear(ff_hidden_dim, n_embd),
            nn.Dropout(pdrop),
        ]
        self.mlp = nn.Sequential(*feed_forward)

    def forward(self, x):
        """Return ``x`` after the attention and MLP residual updates (shape unchanged)."""
        hidden = x + self.attn(self.ln1(x))
        return hidden + self.mlp(self.ln2(hidden))

class GPTSmall(nn.Module):
    """Small decoder-only transformer language model.

    Token embeddings plus a learned positional embedding go through dropout,
    ``n_layers`` ``GPTBlock`` layers and a final LayerNorm, then a bias-free
    linear head maps them to vocabulary logits.

    Args:
        vocab_size: Number of rows in the token embedding and of output logits.
        block_size: Maximum sequence length ``forward`` accepts.
        n_layers: Number of ``GPTBlock`` layers.
        n_heads: Attention heads per block.
        n_embd: Embedding / residual-stream width.
        ff_hidden: Hidden width of each block's MLP.
        pdrop: Dropout probability for the embedding dropout and for every
            block. Defaults to 0.1, the value that was previously hard-coded.
    """

    def __init__(self, vocab_size, block_size, n_layers, n_heads, n_embd, ff_hidden, pdrop=0.1):
        super().__init__()
        self.tok_emb = nn.Embedding(vocab_size, n_embd)
        # Plain parameter, not a module: _init_weights never touches it, so the
        # positional embedding starts at zero.
        self.pos_emb = nn.Parameter(torch.zeros(1, block_size, n_embd))
        self.drop = nn.Dropout(pdrop)
        self.blocks = nn.ModuleList(
            [GPTBlock(n_embd, n_heads, ff_hidden, pdrop=pdrop) for _ in range(n_layers)]
        )
        self.ln_f = nn.LayerNorm(n_embd)
        self.head = nn.Linear(n_embd, vocab_size, bias=False)

        # initialize weights
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Init Linear/Embedding weights from N(0, 0.02), zero biases, reset LayerNorm to identity."""
        if isinstance(module, (nn.Linear, nn.Embedding)):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if isinstance(module, nn.Linear) and module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, idx):
        """Map token ids ``idx`` of shape (B, T) to logits of shape (B, T, vocab_size).

        ``T`` must not exceed the ``block_size`` the model was built with.
        """
        b, t = idx.size()
        max_len = self.pos_emb.size(1)
        assert t <= max_len, f"sequence length {t} exceeds model block size {max_len}"
        tok = self.tok_emb(idx)  # (B,T,emb)
        x = tok + self.pos_emb[:, :t, :]
        x = self.drop(x)
        for block in self.blocks:
            x = block(x)
        x = self.ln_f(x)
        logits = self.head(x)  # (B,T,vocab)
        return logits


In [None]:
# Instantiate model, optimizer, loss
# Size the embedding and output layers from the tokenizer that was actually
# loaded; VOCAB_SIZE is only a fallback. A failed lookup is reported instead of
# being swallowed, because a wrong vocab size only shows up later as an
# out-of-range embedding index.
vocab_size_actual = VOCAB_SIZE
try:
    vocab_size_actual = len(tokenizer.get_vocab())
except Exception as exc:
    print(f"Warning: could not read tokenizer vocab size ({exc!r}); "
          f"falling back to VOCAB_SIZE={VOCAB_SIZE}")

# collate_fn drops one token when it shifts inputs against targets, so the
# longest sequence the model ever sees is BLOCK_SIZE - 1.
model = GPTSmall(vocab_size=vocab_size_actual, block_size=BLOCK_SIZE - 1, n_layers=N_LAYERS,
                 n_heads=N_HEADS, n_embd=EMBED_DIM, ff_hidden=FF_DIM).to(DEVICE)

optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01)
criterion = nn.CrossEntropyLoss()
print("Model parameters:", sum(p.numel() for p in model.parameters()))


In [None]:
# Sampling / generation (greedy and top-k)
@torch.no_grad()
def generate(model, start_ids: List[int], max_new_tokens=50, temperature=1.0, top_k=None):
    """Autoregressively extend ``start_ids`` by ``max_new_tokens`` token ids.

    The model is switched to eval mode while generating and put back into
    whatever mode it was in afterwards, so calling this during training does
    not leave dropout disabled.

    Args:
        model: Callable module mapping ids of shape (1, T) to logits of shape
            (1, T, V) and exposing ``pos_emb`` whose second dimension is the
            maximum context length.
        start_ids: Non-empty list of prompt token ids.
        max_new_tokens: Number of tokens to append.
        temperature: Softmax temperature for sampling. Values ``<= 0`` select
            greedy decoding (argmax) instead of sampling.
        top_k: If given, restrict each step to the ``top_k`` highest logits
            (clamped to the vocabulary size).

    Returns:
        List of token ids: the prompt followed by the generated tokens.

    Raises:
        ValueError: If ``start_ids`` is empty or ``top_k`` is less than 1.
    """
    if len(start_ids) == 0:
        raise ValueError("start_ids must contain at least one token id")
    if top_k is not None and top_k < 1:
        raise ValueError(f"top_k must be a positive integer, got {top_k}")

    # Build the prompt on the device the model lives on, not a global setting.
    device = model.pos_emb.device
    max_context = model.pos_emb.size(1)

    was_training = model.training
    model.eval()
    try:
        idx = torch.tensor(start_ids, dtype=torch.long, device=device).unsqueeze(0)  # (1, T)
        for _ in range(max_new_tokens):
            # crop context if larger than model block size
            idx_cond = idx[:, -max_context:]
            logits = model(idx_cond)[:, -1, :]  # (1, V)
            if top_k is not None:
                k = min(int(top_k), logits.size(-1))
                kth_best = torch.topk(logits, k).values[:, -1].unsqueeze(-1)
                logits = logits.masked_fill(logits < kth_best, float("-inf"))
            if temperature > 0:
                probs = torch.softmax(logits / temperature, dim=-1)
                next_id = torch.multinomial(probs, num_samples=1)
            else:
                # greedy decoding: always take the most likely token
                next_id = torch.argmax(logits, dim=-1, keepdim=True)
            idx = torch.cat([idx, next_id], dim=1)
        return idx[0].tolist()
    finally:
        model.train(was_training)



In [None]:
# example generate (use your tokenizer to get start ids)
# Encode a prompt, append 50 tokens with top-k (k=50) sampling at temperature
# 1.0 using the model's current weights, then show the raw ids and decoded text.
prompt = "ሰላም"
start_ids = encode_text(prompt)
gen_ids = generate(model, start_ids, max_new_tokens=50, temperature=1.0, top_k=50)
print("Generated IDs:", gen_ids)
print("Decoded:", decode_ids(gen_ids))

In [None]:
# Persist the model weights together with the hyperparameters used to build
# the model, in a single checkpoint file under SAVE_DIR.
checkpoint_path = os.path.join(SAVE_DIR, "gpt_small_final.pt")
model_config = {
    "vocab_size": vocab_size_actual,
    "block_size": BLOCK_SIZE - 1,
    "n_layers": N_LAYERS,
    "n_heads": N_HEADS,
    "n_embd": EMBED_DIM,
    "ff_dim": FF_DIM
}
torch.save({"model_state_dict": model.state_dict(), "config": model_config}, checkpoint_path)

# Only the model checkpoint is written in this cell; tokenizer.json is expected
# to exist on disk already.
print("Saved final model and tokenizer (if tokenizer was trained).")
