# Basel III Transformer (Raschka-style) — Pretrain → Finetune + Baseline & Metrics

**Author:** Veena  
**Date:** 2025-09-28

This notebook:
1. Loads **`basel3_corpus.txt`** (≥10k tokens, Basel III–style original text)
2. **Pretrains** a GPT-like mini-Transformer on next-token prediction over BPE tokens (causal LM)
3. **Finetunes** a classification head on clauses labeled by a keyword heuristic (Liquidity / Capital / Leverage)
4. Adds a **Baseline**: Bag-of-Words + Logistic Regression (scikit-learn)
5. Reports **Precision / Recall / F1** on a held-out **validation split**
6. **Saves plots** to `assets/` for LinkedIn: LM loss, CLS loss, confusion matrix

> Audit-friendly: fixed seeds, readable code, minimal magic; easy to reproduce.

## 0. Setup

In [None]:
!pip -q install torch scikit-learn --index-url https://download.pytorch.org/whl/cpu >/dev/null 2>&1 || echo "Deps preinstalled"
import math, os, time, json, random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_recall_fscore_support, classification_report, confusion_matrix
from tqdm import tqdm

# Seed PyTorch, NumPy and the stdlib RNG with the same value, in that order.
for _seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    _seed_fn(42)

# Use the GPU when PyTorch reports one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Output folder for the saved plots and the metrics JSON.
ASSETS_DIR = "assets"
os.makedirs(ASSETS_DIR, exist_ok=True)
device

## 1. Load Corpus & Build Labeled Set

In [None]:
# Read the corpus file; every non-blank line, stripped of surrounding
# whitespace, becomes one text in lm_texts.
with open("basel3_corpus.txt", "r", encoding="utf-8") as f:
    lm_texts = list(filter(None, (raw_line.strip() for raw_line in f)))

import re

# Heuristic labeling for finetune classes
liquidity_kw = ["liquidity", "outflow", "stable funding", "nsfr", "lcr"]
capital_kw   = ["capital", "tier one", "tier two", "rwa", "buffer", "conservation"]
leverage_kw  = ["leverage", "exposure measure", "non risk based", "backstop"]


def _keyword_pattern(keywords):
    """Compile *keywords* into one regex matching any of them at the start of a word."""
    # A leading \b stops short keywords from firing inside unrelated words
    # (e.g. "rwa" inside "forward"); no trailing \b, so plurals such as
    # "outflows" or "buffers" still match.
    return re.compile(r"\b(?:" + "|".join(re.escape(k) for k in keywords) + r")")


# Checked in this order: the first class whose pattern matches wins.
_LABEL_PATTERNS = [
    ("Liquidity", _keyword_pattern(liquidity_kw)),
    ("Capital", _keyword_pattern(capital_kw)),
    ("Leverage", _keyword_pattern(leverage_kw)),
]


def pick_label(text):
    """Return the heuristic class label for *text*, or None if no keyword matches.

    Matching is case-insensitive. Classes are tried in the order
    Liquidity, Capital, Leverage, so a text mentioning several topics gets
    the first matching class.
    """
    lo = text.lower()
    for label, pattern in _LABEL_PATTERNS:
        if pattern.search(lo):
            return label
    return None

# Bucket corpus lines by heuristic label in file order, keeping at most
# 120 examples per class; lines without a label are skipped.
cls_pool = {name: [] for name in ("Liquidity", "Capital", "Leverage")}
for text in lm_texts:
    label = pick_label(text)
    if not label:
        continue
    bucket = cls_pool[label]
    if len(bucket) < 120:
        bucket.append(text)

# Flatten the buckets into (label, text) pairs, class by class.
cls_data = [
    (name, text)
    for name in ("Liquidity", "Capital", "Leverage")
    for text in cls_pool[name]
]

# Label vocabulary: only classes that actually received examples, sorted by name.
labels = sorted({name for name, _ in cls_data})
label_to_id = {name: pos for pos, name in enumerate(labels)}
num_classes = len(labels)

print("LM lines:", len(lm_texts))
print("Class counts:", {k:len(v) for k,v in cls_pool.items()})
print("Labels:", labels)

## 2. Tokenizer (Simple BPE with HuggingFace Tokenizers)

In [None]:
from tokenizers import Tokenizer, models, pre_tokenizers, trainers

# Tell the BPE model which token stands in for symbols it has never seen.
# Without unk_token the "[UNK]" special token is trained into the vocabulary
# but never emitted, so unseen characters have no representation.
tok = Tokenizer(models.BPE(unk_token="[UNK]"))
tok.pre_tokenizer = pre_tokenizers.Whitespace()

# NOTE(review): "[PAD]" is listed first and the collate functions in this
# notebook pad with 0 — presumably "[PAD]" gets id 0; confirm with
# tok.token_to_id("[PAD]").
trainer = trainers.BpeTrainer(vocab_size=5000, special_tokens=["[PAD]","[UNK]","[CLS]","[SEP]"])
tok.train_from_iterator(lm_texts, trainer=trainer)

# Actual vocabulary size after training (may be below the requested 5000).
VOCAB_SIZE = tok.get_vocab_size()
print("Tokenizer vocab size:", VOCAB_SIZE)

## 3. Transformer Model Definition

In [None]:
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention with a fused query/key/value projection."""

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        # One linear layer yields queries, keys and values in a single pass.
        self.qkv = nn.Linear(d_model, 3 * d_model)
        self.o = nn.Linear(d_model, d_model)

    def forward(self, x):
        """Attend over x of shape (batch, seq, d_model); returns the same shape."""
        batch, seq_len, width = x.shape

        # Per head, the last axis holds [q | k | v], each d_k wide.
        fused = self.qkv(x).reshape(batch, seq_len, self.num_heads, 3 * self.d_k)
        q, k, v = (part.transpose(1, 2) for part in torch.split(fused, self.d_k, dim=-1))

        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)

        # Strict upper triangle marks future positions, which are hidden.
        future = torch.triu(torch.ones(seq_len, seq_len, device=x.device), diagonal=1).bool()
        scores = scores.masked_fill(future, float('-inf'))
        weights = scores.softmax(dim=-1)

        # Move heads back next to the feature axis and merge them.
        merged = torch.matmul(weights, v).transpose(1, 2).reshape(batch, seq_len, width)
        return self.o(merged)

class TransformerBlock(nn.Module):
    """Pre-LayerNorm block: self-attention then a ReLU feed-forward net, each with a residual."""

    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.ln1 = nn.LayerNorm(d_model)
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.ln2 = nn.LayerNorm(d_model)
        # Position-wise feed-forward network: expand to d_ff, ReLU, project back.
        feed_forward_layers = [
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model),
        ]
        self.ff = nn.Sequential(*feed_forward_layers)

    def forward(self, x):
        """Apply both residual sub-layers to x and return the result."""
        attended = self.attn(self.ln1(x))
        x = x + attended
        transformed = self.ff(self.ln2(x))
        return x + transformed

class GPTMini(nn.Module):
    """Small GPT-style causal language model.

    Token and learned positional embeddings are summed, passed through
    ``num_layers`` TransformerBlocks and a final LayerNorm, then projected
    to vocabulary logits.

    Args:
        vocab_size: number of token ids (embedding rows and output logits).
        d_model: hidden width.
        num_heads: attention heads per block.
        d_ff: inner width of each block's feed-forward network.
        num_layers: number of stacked TransformerBlocks.
        max_len: number of positions in the positional-embedding table,
            i.e. the longest sequence the model accepts.
    """

    def __init__(self, vocab_size, d_model=128, num_heads=4, d_ff=512, num_layers=2, max_len=64):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Embedding(max_len, d_model)
        self.blocks = nn.Sequential(*[TransformerBlock(d_model,num_heads,d_ff) for _ in range(num_layers)])
        self.ln = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, vocab_size)

    def forward(self, idx):
        """Return next-token logits of shape (B, T, vocab_size) for token ids ``idx`` of shape (B, T).

        Raises:
            IndexError: if T exceeds the positional table size (``max_len``).
        """
        B,T = idx.size()
        max_len = self.pos_emb.num_embeddings
        if T > max_len:
            # Fail with an actionable message instead of the embedding layer's
            # generic out-of-range error; the exception type stays IndexError.
            raise IndexError(
                f"sequence length {T} exceeds max_len={max_len}; "
                "truncate the input or build the model with a larger max_len"
            )
        pos = torch.arange(T, device=idx.device)
        # The (T, d_model) positional embeddings broadcast over the batch axis.
        x = self.emb(idx) + self.pos_emb(pos)
        x = self.blocks(x)
        x = self.ln(x)
        return self.head(x)

LM_VOCAB = VOCAB_SIZE
# The LM and classification datasets in this notebook are built with
# max_len=128, so the positional table must cover 128 positions; the
# GPTMini default of 64 would overflow on any line longer than 64 tokens.
model = GPTMini(LM_VOCAB, max_len=128).to(device)
print(sum(p.numel() for p in model.parameters())/1e6, "M params")

## 4. Pretraining — Next-Word Prediction (Language Modeling)

In [None]:
class LMDataset(Dataset):
    """Next-token dataset built from every proper prefix of each tokenized text.

    For a text with token ids ``ids`` and each cut ``i`` in
    ``1 .. min(len(ids), max_len) - 1`` it stores the pair
    ``(ids[:i], ids[1:i+1])``: the prefix and the same window shifted by one.
    """

    def __init__(self, texts, tok, max_len=128):
        self.data = []
        for text in texts:
            token_ids = tok.encode(text).ids
            stop = min(len(token_ids), max_len)
            self.data.extend(
                (token_ids[:cut], token_ids[1:cut + 1]) for cut in range(1, stop)
            )

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return the (input, target) pair at ``idx`` as two long tensors."""
        prefix, shifted = self.data[idx]
        return (
            torch.tensor(prefix, dtype=torch.long),
            torch.tensor(shifted, dtype=torch.long),
        )

def collate_lm(batch, pad_id=0, ignore_index=-100):
    """Pad a batch of (input, target) id tensors to a common length.

    Args:
        batch: sequence of ``(x, y)`` pairs of 1-D long tensors.
        pad_id: value used to right-pad the inputs.
        ignore_index: value used to right-pad the targets. The default, -100,
            is the default ``ignore_index`` of ``F.cross_entropy``, so padded
            positions drop out of the loss instead of training the model to
            predict token 0 after every short sequence.

    Returns:
        ``(X, Y)``: two long tensors of shape ``(len(batch), longest input)``.
    """
    xs, ys = zip(*batch)
    maxlen = max(len(x) for x in xs)
    X = torch.full((len(xs), maxlen), pad_id, dtype=torch.long)
    Y = torch.full((len(xs), maxlen), ignore_index, dtype=torch.long)
    for i, (x, y) in enumerate(zip(xs, ys)):
        X[i, :len(x)] = x
        Y[i, :len(y)] = y
    return X, Y

# Build the LM dataset from all corpus lines and batch it in shuffled order.
lm_ds = LMDataset(lm_texts, tok, max_len=128)
lm_dl = DataLoader(lm_ds, batch_size=64, shuffle=True, collate_fn=collate_lm)

opt_lm = torch.optim.AdamW(model.parameters(), lr=3e-4)
epochs_lm = 8
lm_losses = []  # mean batch loss, one entry per epoch
for ep in range(epochs_lm):
    model.train()
    epoch_total = 0.0
    for xb, yb in lm_dl:
        xb = xb.to(device)
        yb = yb.to(device)
        token_logits = model(xb)
        vocab_dim = token_logits.size(-1)
        # Flatten (batch, time) so every position is one classification example.
        loss = F.cross_entropy(token_logits.reshape(-1, vocab_dim), yb.reshape(-1))
        opt_lm.zero_grad(set_to_none=True)
        loss.backward()
        # Cap the global gradient norm at 1.0 before the optimizer step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt_lm.step()
        epoch_total += loss.item()
    n_batches = max(1, len(lm_dl))
    lm_losses.append(epoch_total / n_batches)

# Plot the per-epoch loss curve and write it to the assets folder.
fig, ax = plt.subplots(figsize=(6,3))
ax.plot(lm_losses)
ax.set_title("LM Training Loss (Next-Word Prediction)")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
fig.tight_layout()
fig.savefig(f"{ASSETS_DIR}/lm_loss.png", dpi=160)
plt.close(fig)

## 5. Datasets & Split for Classification (Liquidity / Capital / Leverage)

In [None]:
class CLSDataset(Dataset):
    """Classification dataset of (token ids, class id) built from (label, text) pairs.

    Each text is tokenized with ``tok`` and truncated to ``max_len`` ids; the
    label string is mapped to an integer through the module-level
    ``label_to_id``.
    """

    def __init__(self, pairs, tok, max_len=128):
        self.items = [
            (tok.encode(txt).ids[:max_len], label_to_id[lab])
            for lab, txt in pairs
        ]

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        """Return (ids, class id) at ``idx`` as long tensors."""
        token_ids, class_id = self.items[idx]
        ids_tensor = torch.tensor(token_ids, dtype=torch.long)
        label_tensor = torch.tensor(class_id, dtype=torch.long)
        return ids_tensor, label_tensor

def collate_cls(batch):
    """Stack (ids, label) samples into a zero-padded id matrix and a label vector.

    Returns ``(X, Y)``: ``X`` is a long tensor of shape
    ``(len(batch), longest sequence)`` right-padded with 0, and ``Y`` is a
    long tensor holding one label per sample.
    """
    xs, ys = zip(*batch)
    lengths = [len(seq) for seq in xs]
    X = torch.zeros(len(xs), max(lengths), dtype=torch.long)
    for row, (seq, n_tok) in enumerate(zip(xs, lengths)):
        X[row, :n_tok] = seq
    Y = torch.tensor(ys, dtype=torch.long)
    return X, Y

# Shuffle example indices with the seeded stdlib RNG and cut 80% / 20%.
# (Persist the indices externally if the exact split must be reproduced.)
n = len(cls_data)
idx = list(range(n))
random.shuffle(idx)
n_train = int(0.8 * n)
train_idx, val_idx = idx[:n_train], idx[n_train:]

train_pairs = list(map(cls_data.__getitem__, train_idx))
val_pairs   = list(map(cls_data.__getitem__, val_idx))

# Tokenize both partitions with the shared tokenizer.
cls_train = CLSDataset(train_pairs, tok, max_len=128)
cls_val   = CLSDataset(val_pairs, tok, max_len=128)

# Only the training loader shuffles; validation keeps dataset order.
train_dl = DataLoader(cls_train, batch_size=16, collate_fn=collate_cls, shuffle=True)
val_dl   = DataLoader(cls_val,   batch_size=16, collate_fn=collate_cls, shuffle=False)

## 6. Classification Model (uses Transformer as encoder → mean-pool → linear head)

In [None]:
class GPTForClassification(nn.Module):
    """Sequence classifier: GPT encoder -> masked mean-pool -> linear head.

    Args:
        gpt: model exposing ``emb``, ``pos_emb``, ``blocks``, ``ln`` and
            ``head`` (as GPTMini does).
        num_classes: number of output classes.
        pad_id: token id treated as padding and left out of the mean-pool.
            Defaults to 0 because ``collate_cls`` pads with zeros
            (assumes 0 is the "[PAD]" id — confirm against the tokenizer).
            Pass ``None`` to average over every position.
    """

    def __init__(self, gpt, num_classes, pad_id=0):
        super().__init__()
        self.gpt = gpt
        self.pad_id = pad_id
        # The pooled vector is a hidden state, i.e. the *input* of the LM head
        # (d_model wide). head.out_features is the vocabulary size and would
        # not match the pooled tensor.
        self.cls = nn.Linear(gpt.head.in_features, num_classes)

    def forward(self, x):
        """Return class logits of shape (B, num_classes) for token ids ``x`` of shape (B, T)."""
        # Re-run the encoder stack by hand to get the hidden states that feed
        # the LM head rather than the vocabulary logits.
        B, T = x.size()
        h = self.gpt.emb(x) + self.gpt.pos_emb(torch.arange(T, device=x.device))
        h = self.gpt.blocks(h)
        h = self.gpt.ln(h)
        if self.pad_id is None:
            return self.cls(h.mean(dim=1))
        # Average over real tokens only, so the same sentence pools to the same
        # vector whether or not it was padded inside a batch.
        keep = (x != self.pad_id).unsqueeze(-1).to(h.dtype)
        # clamp guards against division by zero for an all-padding row.
        token_count = keep.sum(dim=1).clamp(min=1.0)
        pooled = (h * keep).sum(dim=1) / token_count
        return self.cls(pooled)

# Wrap the pretrained LM in the classifier; the optimizer updates the
# encoder and the new head together.
clf = GPTForClassification(model, num_classes).to(device)
opt_cls = torch.optim.AdamW(clf.parameters(), lr=5e-4)
epochs_cls = 30
cls_losses = []  # mean batch loss, one entry per epoch
for ep in range(epochs_cls):
    clf.train()
    epoch_total = 0.0
    for xb, yb in train_dl:
        xb = xb.to(device)
        yb = yb.to(device)
        class_logits = clf(xb)
        loss = F.cross_entropy(class_logits, yb)
        opt_cls.zero_grad(set_to_none=True)
        loss.backward()
        # Cap the global gradient norm at 1.0 before the optimizer step.
        torch.nn.utils.clip_grad_norm_(clf.parameters(), 1.0)
        opt_cls.step()
        epoch_total += loss.item()
    n_batches = max(1, len(train_dl))
    cls_losses.append(epoch_total / n_batches)

# Plot the per-epoch fine-tuning loss and write it to the assets folder.
fig, ax = plt.subplots(figsize=(6,3))
ax.plot(cls_losses)
ax.set_title("Classification Fine-tune Loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
fig.tight_layout()
fig.savefig(f"{ASSETS_DIR}/cls_loss.png", dpi=160)
plt.close(fig)

## 7. Validation Metrics (Precision / Recall / F1) + Confusion Matrix

In [None]:
# Collect predictions and ground truth over the validation loader,
# in eval mode and without tracking gradients.
clf.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for xb, yb in val_dl:
        batch_logits = clf(xb.to(device))
        y_pred += batch_logits.argmax(dim=1).cpu().tolist()
        y_true += yb.tolist()

# Passing every class id keeps classes missing from the split in the report.
class_ids = list(range(num_classes))
prec, rec, f1, _ = precision_recall_fscore_support(
    y_true, y_pred, labels=class_ids, average='weighted', zero_division=0
)
print("Weighted Precision:", round(prec,3), "Recall:", round(rec,3), "F1:", round(f1,3))

# Confusion matrix: rows are true classes, columns are predictions.
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
fig, ax = plt.subplots(figsize=(4.5,4.5))
ax.imshow(cm, aspect='auto')
ax.set_title("Confusion Matrix (Validation)")
ax.set_xlabel("Predicted")
ax.set_ylabel("True")
ax.set_xticks(range(num_classes))
ax.set_xticklabels(labels, rotation=45)
ax.set_yticks(range(num_classes))
ax.set_yticklabels(labels)
# Write each count at the centre of its cell (x = column, y = row).
for (i, j), count in np.ndenumerate(cm):
    ax.text(j, i, int(count), ha='center', va='center')
fig.tight_layout()
fig.savefig(f"{ASSETS_DIR}/cls_confusion.png", dpi=160)
plt.close(fig)

## 8. Baseline — Bag-of-Words + Logistic Regression (for comparison)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# Reuse the exact train/validation pairs the Transformer was trained and
# evaluated on, so the two models are compared on the same 80/20 split.
train_texts = [t for _, t in train_pairs]
val_texts = [t for _, t in val_pairs]
ytr = np.array([label_to_id[l] for l, _ in train_pairs])
yva = np.array([label_to_id[l] for l, _ in val_pairs])

# Fit the vocabulary on the training texts only; validation text is merely
# transformed, so no held-out wording leaks into the features.
vec = CountVectorizer(max_features=20000, ngram_range=(1,2))
Xtr = vec.fit_transform(train_texts)
Xva = vec.transform(val_texts)

lr = LogisticRegression(max_iter=2000)
lr.fit(Xtr, ytr)
yp = lr.predict(Xva)

print("Baseline (BOW + LR) — Validation Report")
# Explicit labels keep target_names aligned even when a class is missing from
# the validation split; zero_division=0 matches the Transformer's metrics call.
print(classification_report(
    yva, yp,
    labels=list(range(num_classes)),
    target_names=labels,
    digits=3,
    zero_division=0,
))

## 9. Save Metrics for Audit (JSON)

In [None]:
import json

# Round the three weighted scores to 3 decimals and cast to plain floats
# for JSON serialization.
rounded_scores = [float(round(score, 3)) for score in (prec, rec, f1)]
metrics = dict(zip(("weighted_precision", "weighted_recall", "weighted_f1"), rounded_scores))
metrics["labels"] = labels
metrics["notes"] = "Transformer vs baseline reported on the same 80/20 split."

# Write the metrics next to the saved plots.
metrics_path = f"{ASSETS_DIR}/metrics.json"
with open(metrics_path, "w", encoding="utf-8") as f:
    f.write(json.dumps(metrics, indent=2))
print("Saved metrics to assets/metrics.json")

## 10. Demo — Helper to Classify a Sentence (for quick checks)

In [None]:
def classify_sentence(text: str, max_len: int = 128) -> str:
    """Return the predicted class label for a single sentence.

    Args:
        text: sentence to classify.
        max_len: keep at most this many token ids (128 matches the length
            used when building the classification datasets).

    Returns:
        The entry of ``labels`` with the highest classifier logit.

    Raises:
        ValueError: if ``text`` produces no tokens. Mean-pooling an empty
            sequence yields NaN logits, and the argmax would then be an
            arbitrary label.
    """
    ids = tok.encode(text).ids[:max_len]
    if not ids:
        raise ValueError("cannot classify text that produces no tokens")
    # A batch of one: shape (1, T).
    x = torch.tensor([ids], dtype=torch.long).to(device)
    clf.eval()
    with torch.no_grad():
        logits = clf(x)
        p = logits.argmax(dim=1).item()
    return labels[p]

# Three hand-written probe sentences to spot-check the classifier.
tests = [
    "The CET1 capital ratio fell from 13.1% to 12.9%.",
    "Liquidity coverage ratios remained well above the 100% minimum.",
    "Banks reduced leverage exposure through off-balance adjustments.",
]
for sentence in tests:
    predicted = classify_sentence(sentence)
    print(f"Sentence: {sentence}")
    print(" → Predicted:", predicted)