In [1]:
import math, os, random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [2]:
# ----------------- Hyperparams -----------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

DATA_PATH      = "data\\shekspeer.txt"   # put your domain text here
batch_size     = 256
context_length = 128
embedding_dim  = 256
num_layers     = 6
num_heads      = 8
dropout        = 0.1
lr             = 3e-4
max_steps      = 20000
eval_interval  = 200
eval_iters     = 50
seed           = 42
random.seed(seed); torch.manual_seed(seed)

<torch._C.Generator at 0x25f050624d0>

In [3]:
# ----------------- Byte tokenizer -----------------
class ByteTokenizer:
    def __init__(self):
        self.vocab_size = 256
    def encode(self, s: str): return list(s.encode("utf-8"))
    def decode(self, ids):    return bytes(ids).decode("utf-8", errors="replace")

tokenizer = ByteTokenizer()
vocab_size = tokenizer.vocab_size

# ----------------- Data -----------------
def load_data(path):
    with open(path, "r", encoding="utf-8") as f:
        text = f.read()
    ids = tokenizer.encode(text)
    n = int(0.9 * len(ids))
    return ids[:n], ids[n:]

class TokenDataset(Dataset):
    def __init__(self, data, block):
        self.data = data; self.block = block
    def __len__(self): return max(1, len(self.data) - self.block)
    def __getitem__(self, i):
        x = torch.tensor(self.data[i:i+self.block], dtype=torch.long)
        y = torch.tensor(self.data[i+1:i+self.block+1], dtype=torch.long)
        return x, y

In [4]:
# ----------------- Model bits (like your screenshots) -----------------
class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key   = nn.Linear(embedding_dim, head_size, bias=False)
        self.query = nn.Linear(embedding_dim, head_size, bias=False)
        self.value = nn.Linear(embedding_dim, head_size, bias=False)
        self.dropout = nn.Dropout(dropout)
        self.head_size = head_size

    def forward(self, x):
        # x: (B, T, C)
        B, T, C = x.shape
        k = self.key(x)   # (B, T, H)
        q = self.query(x) # (B, T, H)

        # scaled dot-product attention
        wei = q @ k.transpose(-2, -1) / math.sqrt(self.head_size)  # (B, T, T)

        # causal mask (no looking ahead)
        mask = torch.triu(torch.ones(T, T, device=x.device), diagonal=1).bool()
        wei = wei.masked_fill(mask, float('-inf'))

        wei = F.softmax(wei, dim=-1)  # (B, T, T)
        wei = self.dropout(wei)

        v = self.value(x)             # (B, T, H)
        out = wei @ v                 # (B, T, H)
        return out

class MultiHeadAttention(nn.Module):
    def __init__(self, head_size, num_heads):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj  = nn.Linear(head_size * num_heads, embedding_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)  # (B, T, C)
        out = self.proj(out)
        out = self.dropout(out)
        return out

class FeedForward(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim * 4),
            nn.GELU(),
            nn.Linear(hidden_dim * 4, hidden_dim),
            nn.Dropout(dropout),
        )
    def forward(self, x): return self.net(x)

class Block(nn.Module):
    def __init__(self, hidden_dim, num_heads):
        super().__init__()
        head_size = hidden_dim // num_heads
        self.attn = MultiHeadAttention(head_size, num_heads)
        self.ff   = FeedForward(hidden_dim)
        self.ln1  = nn.LayerNorm(hidden_dim)
        self.ln2  = nn.LayerNorm(hidden_dim)

    def forward(self, x):
        x = x + self.attn(self.ln1(x))  # pre-LN + residual
        x = x + self.ff(self.ln2(x))
        return x

class SmallLanguageModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, embedding_dim)
        self.pos_embedding   = nn.Embedding(context_length, embedding_dim)
        self.blocks = nn.Sequential(*[Block(embedding_dim, num_heads) for _ in range(num_layers)])
        self.ln = nn.LayerNorm(embedding_dim)
        self.lm_head = nn.Linear(embedding_dim, vocab_size, bias=False)

    def forward(self, x, targets=None):
        B, T = x.shape

        tok_emb = self.token_embedding(x)                          # (B, T, C)
        pos_idx = torch.arange(T, device=x.device).unsqueeze(0)    # (1, T)
        pos_emb = self.pos_embedding(pos_idx)                      # (1, T, C)
        x = tok_emb + pos_emb                                      # (B, T, C)

        x = self.blocks(x)
        x = self.ln(x)
        logits = self.lm_head(x)                                   # (B, T, V)

        loss = None
        if targets is not None:
            B, T, C = logits.shape
            loss = F.cross_entropy(logits.view(B*T, C), targets.view(B*T))
        return logits, loss

@torch.no_grad()
def generate(model, idx, max_new_tokens=200, temperature=1.0, top_k=None):
    model.eval()
    # idx: (B, T)
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_length:]
        logits, _ = model(idx_cond)
        logits = logits[:, -1, :] / max(1e-8, temperature)  # (B, V)
        if top_k is not None:
            v, _ = torch.topk(logits, top_k)
            thresh = v[:, [-1]]
            logits = torch.where(logits < thresh, torch.full_like(logits, -1e10), logits)
        probs = F.softmax(logits, dim=-1)
        next_id = torch.multinomial(probs, num_samples=1)          # (B, 1)
        idx = torch.cat([idx, next_id], dim=1)
    return idx

# ----------------- Train -----------------
def estimate_loss(model, loader):
    model.eval()
    losses = []
    with torch.no_grad():
        for i, (x, y) in enumerate(loader):
            if i >= eval_iters: break
            x, y = x.to(device), y.to(device)
            _, loss = model(x, y)
            losses.append(loss.item())
    model.train()
    return sum(losses) / len(losses) if losses else float("nan")

In [None]:
train_ids, val_ids = load_data(DATA_PATH)
train_ds = TokenDataset(train_ids, context_length)
val_ds   = TokenDataset(val_ids, context_length)
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, drop_last=True)

model = SmallLanguageModel().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

step = 0
best = float("inf")
while step < max_steps:
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits, loss = model(xb, yb)
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        if step % 20 == 0:
            print(f"step {step:5d} | train loss {loss.item():.4f}")

        if step % eval_interval == 0:
            v = estimate_loss(model, val_loader)
            print(f"*** eval step {step}: val loss {v:.4f}")
            if v < best:
                best = v
                torch.save(model.state_dict(), "best_small_lm_like.pt")
                print("saved best_small_lm_like.pt")
            # quick sample
            ctx = torch.tensor([train_ids[:min(16, len(train_ids))]], dtype=torch.long, device=device)
            out = generate(model, ctx, max_new_tokens=120, temperature=0.9, top_k=50)[0].tolist()
            print("SAMPLE:", tokenizer.decode(out))
        step += 1
        if step >= max_steps: break

torch.save(model.state_dict(), "final_small_lm_like.pt")
print("done.")

step     0 | train loss 5.7379
*** eval step 0: val loss 5.2327
saved best_small_lm_like.pt
SAMPLE: First Citizen:
h�`�IES�� Tg�e��4o��g�휸B�Fu^�w^�&���8�#����m?/���1u�r�p���s^�4e�ma���yrp+n��Ie�V��
step    20 | train loss 3.0616
step    40 | train loss 2.6802
step    60 | train loss 2.5637
step    80 | train loss 2.4930
step   100 | train loss 2.4564
step   120 | train loss 2.4223
step   140 | train loss 2.4022
step   160 | train loss 2.3591
step   180 | train loss 2.3305
step   200 | train loss 2.2753
*** eval step 200: val loss 2.2838
saved best_small_lm_like.pt
SAMPLE: First Citizen:
Bor ar a oflant anghar ous hof ben, fas.

HOLIOLENTINS:
Thir, ck, thusthot f yird,
Theveru bade hat aly.

WISAUS:
Harplla
step   220 | train loss 2.2463
step   240 | train loss 2.2044
step   260 | train loss 2.1489
step   280 | train loss 2.0987
step   300 | train loss 2.0644
step   320 | train loss 2.0210
step   340 | train loss 1.9991
step   360 | train loss 1.9673
step   380 | train loss 1.9239

In [5]:
def infer(prompt: str, model_path="best_small_lm_like.pt", max_new_tokens=100, temperature=0.8, top_k=50):
    # load tokenizer (same as training)
    tokenizer = ByteTokenizer()

    # init model and load weights
    model = SmallLanguageModel().to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    # encode prompt into tokens
    start_ids = tokenizer.encode(prompt)
    x = torch.tensor([start_ids], dtype=torch.long, device=device)

    # generate
    out = generate(model, x, max_new_tokens=max_new_tokens,
                   temperature=temperature, top_k=top_k)

    # decode to string
    result = tokenizer.decode(out[0].tolist())
    return result

In [6]:
infer("Once upon a time")

'Once upon a time, madam, I may be them go.\n\nBUCKINGHAM:\nCome on! And for comes there, now of thy beautificate,\nFor s'