## Importing Libraries & Dependencies

In [None]:
!pip install transformers datasets peft accelerate
!pip install --upgrade datasets fsspec
!pip install -U bitsandbytes
import tempfile
import torch
import numpy as np
from collections import Counter
from datasets import load_dataset
from transformers import (
    GPT2Config,
    GPT2TokenizerFast,
    GPT2ForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding,
    BitsAndBytesConfig,
    get_scheduler
)
import os
import torch
from torch.optim import AdamW
import torch.nn as nn
from peft import get_peft_model, PeftModel, LoraConfig, TaskType
from huggingface_hub import login, create_repo, upload_folder
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, f1_score
from scipy.special import softmax
from sklearn.utils.class_weight import compute_class_weight
import warnings
from sklearn.exceptions import UndefinedMetricWarning
# Keep the notebook output readable by silencing three noisy warning sources.
# 1) scikit-learn's UndefinedMetricWarning.
warnings.filterwarnings("ignore", category = UndefinedMetricWarning)
# 2) The float32 -> float16 cast notice whose message starts with "MatMul8bitLt"
#    (presumably emitted by the bitsandbytes 8-bit matmul path -- confirm).
warnings.filterwarnings("ignore", message = "MatMul8bitLt: inputs will be cast from torch.float32 to float16 during quantization")
# 3) Every FutureWarning, whichever library raises it.
warnings.filterwarnings("ignore", category = FutureWarning)

## Initialisation

In [None]:
# Hugging Face credentials. Either paste a token below or leave the literal empty
# and export HF_TOKEN in the environment, which keeps the secret out of the saved
# notebook.
HF_TOKEN = "" # Insert your Hugging Face token with 'Write' access 
if not HF_TOKEN:
    HF_TOKEN = os.environ.get("HF_TOKEN", "")
login(token = HF_TOKEN)

MODEL_NAME = "gpt2-medium"
HUB_REPO_ID = ""  # Repo the training Trainer pushes to (used as hub_model_id)
REPO_FP16 = "" # Repo for BF16 model
REPO_BNB8 = "" # Repo for INT8 model
REPO_BNB4 = "" # Repo for NF4 model
TRAIN_SIZE = 20000  # Default; the training cell overrides it per task
VAL_SIZE = 800
TEST_SIZE = 800
BITWIDTH = 16   # Default bit width for quantize_tensor / prepare_base_model
LORA_RANK = 32  # Default LoRA rank for prepare_base_model
TEMP   = 0.7
# One entry per task: (GLUE task name, text column, number of labels).
TASKS = [("sst2", "sentence", 2), ("qnli", "question", 2)]

# Fail early with a readable message instead of an opaque repo-id validation
# error from the Hub client when a repo id was left blank.
_unset_repos = [
    label
    for label, repo in (
        ("HUB_REPO_ID", HUB_REPO_ID),
        ("REPO_FP16", REPO_FP16),
        ("REPO_BNB8", REPO_BNB8),
        ("REPO_BNB4", REPO_BNB4),
    )
    if not repo
]
if _unset_repos:
    raise ValueError(f"Set these Hub repo ids before running: {', '.join(_unset_repos)}")
for repo in (HUB_REPO_ID, REPO_FP16, REPO_BNB8, REPO_BNB4):
    create_repo(repo, exist_ok = True)

# Load the tokenizer from the same checkpoint name as the model so the two
# cannot drift apart when MODEL_NAME is changed.
tokenizer = GPT2TokenizerFast.from_pretrained(MODEL_NAME)
# No dedicated padding token is configured here, so EOS doubles as padding.
tokenizer.pad_token = tokenizer.eos_token
tokenizer.pad_token_id = tokenizer.eos_token_id
# NOTE(review): this registers a "[SEP]" token, but nothing visible in this
# notebook resizes the model's embeddings to match -- confirm the token is never
# actually emitted into input_ids.
tokenizer.add_special_tokens({"sep_token": "[SEP]"})

## Quantisation Setup & Base Model Preparation

In [None]:
def quantize_tensor(x: torch.Tensor, num_bits: int = BITWIDTH):
    """Simulate symmetric, per-tensor integer quantisation of ``x``.

    Values are scaled so the largest magnitude maps to the top of the signed
    ``num_bits`` integer range, rounded, clamped to that range, and scaled back.
    The result therefore keeps ``x``'s dtype and shape but only takes values on
    the quantisation grid.

    Args:
        x: Tensor to quantise. It is not modified; a new tensor is returned.
        num_bits: Width of the signed integer grid. Must be at least 2, because
            a 1-bit signed grid has ``qmax == 0`` and no usable scale.

    Returns:
        A tensor with the same dtype as ``x`` holding the de-quantised values.

    Raises:
        ValueError: If ``num_bits`` is smaller than 2.
    """
    if num_bits < 2:
        raise ValueError(f"num_bits must be >= 2 for signed quantisation, got {num_bits}")
    # An empty tensor has no maximum; there is nothing to quantise.
    if x.numel() == 0:
        return x.clone()
    qmin = -2 ** (num_bits - 1)
    qmax = 2 ** (num_bits - 1) - 1
    max_val = x.abs().max()
    # All-zero input: every value is already on the grid, and dividing by a
    # zero scale must be avoided.
    if max_val == 0:
        return x.clone()
    scale = max_val / qmax
    q = torch.clamp(torch.round(x / scale), qmin, qmax)
    return (q * scale).to(x.dtype)
def prepare_base_model(name: str, num_bits: int = BITWIDTH, lora_rank: int = LORA_RANK):
    """Load a GPT-2 sequence classifier, fake-quantise it, and wrap it with LoRA.

    Args:
        name: Checkpoint name or path handed to ``from_pretrained``.
        num_bits: Bit width passed to ``quantize_tensor`` for each weight matrix.
        lora_rank: Rank ``r`` of the LoRA adapters.

    Returns:
        The model returned by ``get_peft_model`` for the prepared classifier.
    """
    # NOTE(review): `summary_dropout` is passed through to the config as-is;
    # confirm the model actually consumes a field with this name.
    cfg = GPT2Config.from_pretrained(name, pad_token_id = tokenizer.pad_token_id, summary_dropout = 0.3)
    # The explicit assignment below wins over the keyword above.
    cfg.pad_token_id = cfg.eos_token_id
    backbone = GPT2ForSequenceClassification.from_pretrained(name, config = cfg)

    # Snap every parameter whose name contains "weight" and that has at least
    # two dimensions onto the quantisation grid; everything else is untouched.
    with torch.no_grad():
        for param_name, param in backbone.named_parameters():
            if param.ndim < 2 or "weight" not in param_name:
                continue
            param.data = quantize_tensor(param.data, num_bits)

    # Freeze the base model so it receives no gradient updates.
    backbone.base_model.requires_grad_(False)

    # Attach the LoRA adapters for sequence classification.
    lora_cfg = LoraConfig(
        task_type = TaskType.SEQ_CLS,
        inference_mode = False,
        r = lora_rank,
        lora_alpha = 16,
        lora_dropout = 0.05,
    )
    return get_peft_model(backbone, lora_cfg)

## Evaluation & Post-Hoc Calibration Metrics + Measuring Size

In [None]:
def compute_metrics(pred):
    """Trainer ``compute_metrics`` hook.

    Unpacks the prediction object and delegates to
    ``compute_metrics_from_logits`` so the two entry points cannot drift apart.

    Args:
        pred: Object exposing ``predictions`` (logits) and ``label_ids``.

    Returns:
        Dict with ``accuracy``, ``precision``, ``recall`` and ``f1``.
    """
    return compute_metrics_from_logits(pred.predictions, pred.label_ids)

def compute_metrics_from_logits(logits, labels):
    """Score raw logits against reference labels.

    The predicted class is the arg-max over axis 1. Precision, recall and F1
    use "binary" averaging when exactly two distinct label values are present
    and "macro" averaging otherwise.

    Args:
        logits: Array of per-class scores, one row per example.
        labels: Reference class ids.

    Returns:
        Dict with ``accuracy``, ``precision``, ``recall`` and ``f1``.
    """
    predicted = np.argmax(logits, axis=1)
    if len(np.unique(labels)) == 2:
        averaging = "binary"
    else:
        averaging = "macro"
    prf = precision_recall_fscore_support(labels, predicted, average = averaging)
    return {
        "accuracy": accuracy_score(labels, predicted),
        "precision": prf[0],
        "recall": prf[1],
        "f1": prf[2],
    }

def measure_size(m: torch.nn.Module, label: str):
    """Print and return the on-disk size of ``m``'s state dict.

    The state dict is serialised with ``torch.save`` into a scratch directory
    that is removed afterwards, even when serialisation fails.

    Args:
        m: Module whose ``state_dict()`` is measured.
        label: Text printed in front of the size (left-aligned to 14 columns).

    Returns:
        The serialised size in mebibytes (bytes / 1024**2) as a float.
    """
    # The context manager guarantees clean-up on both success and failure.
    with tempfile.TemporaryDirectory() as scratch_dir:
        path = os.path.join(scratch_dir, "state_dict.pt")
        torch.save(m.state_dict(), path)
        mb = os.path.getsize(path) / 1024**2
    print(f"{label:14s}: {mb:.2f} MB")
    return mb

## Training

In [None]:
class WeightedLossTrainer(Trainer):
    """Trainer variant that applies per-class weights to the cross-entropy loss.

    A criterion that is only stored as an attribute on the model is never
    consulted during training, so the class weights have to be applied by
    overriding ``compute_loss``.
    """

    def __init__(self, *args, class_weights = None, **kwargs):
        super().__init__(*args, **kwargs)
        # 1-D tensor with one weight per class, or None for an unweighted loss.
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs = False, **kwargs):
        """Return the class-weighted cross-entropy for one batch.

        ``**kwargs`` absorbs extra arguments (such as ``num_items_in_batch``)
        that some Trainer versions pass to this hook.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        weight = self.class_weights
        if weight is not None:
            # Weights are built on the CPU; move them next to the logits.
            weight = weight.to(device = logits.device, dtype = torch.float32)
        loss = nn.functional.cross_entropy(
            logits.float().view(-1, logits.size(-1)),
            labels.view(-1),
            weight = weight,
        )
        return (loss, outputs) if return_outputs else loss


model = None
# Training-set size per task; a task not listed keeps the current TRAIN_SIZE.
TASK_TRAIN_SIZES = {"sst2": 35000, "qnli": 55000}
for idx, (task, field, num_labels) in enumerate(TASKS):
    TRAIN_SIZE = TASK_TRAIN_SIZES.get(task, TRAIN_SIZE)
    if idx == 0:
        model = prepare_base_model(MODEL_NAME, num_bits = BITWIDTH, lora_rank = LORA_RANK)
    else:
        # Continue from what the previous task pushed to the Hub.
        # is_trainable=True is required here: by default PEFT loads an adapter
        # frozen for inference, which would leave nothing to train.
        base  = GPT2ForSequenceClassification.from_pretrained(HUB_REPO_ID)
        model = PeftModel.from_pretrained(base, HUB_REPO_ID, is_trainable = True)
    model.config.num_labels = num_labels
    # The original cell also assigned a fresh `model.classifier` Linear layer.
    # GPT2ForSequenceClassification's head is called `score`, so that layer was
    # never part of the forward pass; it has been dropped.
    model.config.pad_token_id = tokenizer.pad_token_id
    model.base_model.config.pad_token_id = tokenizer.pad_token_id
    # Dataset Split: train / validation / test are carved out of the shuffled
    # GLUE training split.
    ds = load_dataset("glue", task)
    full_train = ds["train"].shuffle(seed = 42).select(range(TRAIN_SIZE + VAL_SIZE + TEST_SIZE))
    train_ds = full_train.select(range(TRAIN_SIZE))
    val_ds   = full_train.select(range(TRAIN_SIZE, TRAIN_SIZE + VAL_SIZE))
    test_ds  = full_train.select(range(TRAIN_SIZE + VAL_SIZE, TRAIN_SIZE + VAL_SIZE + TEST_SIZE))
    # Resolving Class Imbalances: "balanced" weights, one per label id.
    train_labels = train_ds["label"]
    classes = np.arange(num_labels)
    class_weights = compute_class_weight('balanced', classes = classes, y=train_labels)
    weights = torch.tensor(class_weights).float()
    # Data Preprocessing
    def preprocess(example):
        if task == 'qnli':
            text1, text2 = example["question"], example["sentence"]
        elif task == 'sst2':
            text1, text2 = example["sentence"], None
        else:
            raise ValueError(f"No preprocessing rule for task {task!r}")
        tok = tokenizer(text1, text2, truncation = True, padding = 'max_length', max_length = 128)
        tok["labels"] = example["label"]
        return tok
    train_ds = train_ds.map(preprocess, batched = True)
    val_ds = val_ds.map(preprocess, batched = True)
    test_ds = test_ds.map(preprocess, batched = True)
    train_ds.set_format("torch", columns = ["input_ids", "attention_mask", "labels"])
    val_ds.set_format("torch", columns = ["input_ids", "attention_mask", "labels"])
    test_ds.set_format("torch", columns = ["input_ids", "attention_mask", "labels"])
    collator = DataCollatorWithPadding(tokenizer)
    args = TrainingArguments(
        output_dir=f"./results/{task}",
        per_device_train_batch_size = 8,
        gradient_accumulation_steps = 2,
        per_device_eval_batch_size = 16,
        num_train_epochs = 4,
        learning_rate = 3e-4,
        fp16 = True,
        logging_strategy = "epoch",
        logging_steps = 50,
        logging_dir = './logs',
        weight_decay = 0.01,
        warmup_ratio = 0.1,
        logging_first_step = True,
        save_strategy = "epoch",
        eval_strategy = "epoch",
        report_to = [],
        push_to_hub = True,
        hub_model_id = HUB_REPO_ID,
        hub_token = HF_TOKEN
    )
    # Initialising Optimizer & Scheduler
    # NOTE(review): an explicit (optimizer, scheduler) pair is handed to the
    # Trainer below, so the 20% warm-up configured here is what applies, not
    # `warmup_ratio = 0.1` above -- confirm which value is intended.
    optimizer = AdamW(model.parameters(), lr = args.learning_rate, betas = (0.9, 0.95), weight_decay = args.weight_decay)
    num_update_steps = (len(train_ds) // (args.per_device_train_batch_size * args.gradient_accumulation_steps)) * args.num_train_epochs
    scheduler = get_scheduler(name = "linear", optimizer = optimizer, num_warmup_steps = int(0.2 * num_update_steps), num_training_steps = num_update_steps)
    trainer = WeightedLossTrainer(
        model = model,
        args = args,
        train_dataset = train_ds,
        eval_dataset = val_ds,
        tokenizer = tokenizer,
        data_collator = collator,
        compute_metrics = compute_metrics,
        optimizers = (optimizer, scheduler),
        class_weights = weights
    )
    trainer.train()
    print(f"\nTest results for {task}: {trainer.predict(test_ds).metrics}")
    trainer.push_to_hub(commit_message=f"Trained on {task}")

## Merge & Upload Quantised Models

In [None]:
def _quantise_and_publish(quant_config, size_label, target_repo):
    """Load HUB_REPO_ID with ``quant_config``, report its size, push it and the tokenizer."""
    # NOTE(review): this loads from HUB_REPO_ID (the repo the training cell
    # pushes to), not from the merged checkpoint uploaded to REPO_FP16 --
    # confirm that is the intended source for the "Merged INT8/INT4" variants.
    quantised = GPT2ForSequenceClassification.from_pretrained(
        HUB_REPO_ID,
        quantization_config = quant_config,
        device_map = "auto",
    )
    measure_size(quantised, size_label)
    quantised.push_to_hub(target_repo, use_temp_dir = True, token = HF_TOKEN)
    tokenizer.push_to_hub(target_repo, use_temp_dir = True, token = HF_TOKEN)
    return quantised


print("Post-Training: Merge & Upload Quantised Versions")
# Fold the LoRA adapters into the base weights and report size and dtypes
# before any cast takes place.
merged = model.merge_and_unload()
measure_size(merged, "Merged Model")
print("Merged Model DTypes", Counter(p.dtype for p in merged.parameters()))

# FP16 slot: the weights are cast to bfloat16, then config, state dict and
# tokenizer are written to a local folder and uploaded as one commit.
# NOTE(review): Module.to() presumably converts in place, in which case
# `merged` and `merged_fp16` refer to the same bfloat16 module from here on.
merged_fp16 = merged.to(torch.bfloat16)
measure_size(merged_fp16, "Merged FP16")
tmp_dir = "tmp_fp16"
os.makedirs(tmp_dir, exist_ok = True)
weights_path = os.path.join(tmp_dir, "pytorch_model.bin")
merged_fp16.config.save_pretrained(tmp_dir)
torch.save(merged_fp16.state_dict(), weights_path)
tokenizer.save_pretrained(tmp_dir)
upload_folder(repo_id = REPO_FP16, folder_path = tmp_dir, path_in_repo = "", token = HF_TOKEN)

# INT8
bnb8_cfg = BitsAndBytesConfig(load_in_8bit = True)
m8 = _quantise_and_publish(bnb8_cfg, "Merged INT8", REPO_BNB8)

# INT4 (NF4 quantisation with float16 compute)
bnb4_cfg = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type = "nf4", bnb_4bit_compute_dtype = torch.float16)
m4 = _quantise_and_publish(bnb4_cfg, "Merged INT4", REPO_BNB4)

## Post-Hoc Calibration & Evaluation on Quantised Models

In [None]:
def compute_metrics_with_threshold(pred):
    """Binary metrics after a grid search over temperature and decision threshold.

    For every temperature in ``linspace(0.5, 2.0, 16)`` the logits are divided
    by the temperature and soft-maxed; the class-1 probability is then
    thresholded at every value in ``linspace(0, 1, 101)``. The first pair with
    the strictly highest binary F1 is kept (falling back to T=1.0, thr=0.5 when
    no pair scores above zero) and the final metrics are computed with it.

    NOTE(review): the search runs on the same predictions that are being
    scored, so the reported numbers are tuned to this evaluation set.

    Args:
        pred: Object exposing ``predictions`` (logits) and ``label_ids``.

    Returns:
        Dict with ``accuracy``, ``precision``, ``recall``, ``f1``, ``best_T``
        and ``best_thr``.
    """
    logits, labels = pred.predictions, pred.label_ids

    def positive_probs(temperature):
        # Probability assigned to class 1 after temperature scaling.
        return softmax(logits / temperature, axis = -1)[:, 1]

    def binary_scores(hard_preds):
        return precision_recall_fscore_support(labels, hard_preds, average = "binary", zero_division = 0)

    best_f1, best_T, best_thr = 0.0, 1.0, 0.5
    for temperature in np.linspace(0.5, 2.0, 16):
        scaled = positive_probs(temperature)
        for cutoff in np.linspace(0, 1, 101):
            candidate = binary_scores((scaled >= cutoff).astype(int))[2]
            # Strict comparison: ties keep the earliest pair found.
            if candidate > best_f1:
                best_f1, best_T, best_thr = candidate, temperature, cutoff

    final_preds = (positive_probs(best_T) >= best_thr).astype(int)
    precision, recall, f1, _ = binary_scores(final_preds)
    return {
        "accuracy": accuracy_score(labels, final_preds),
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "best_T": best_T,
        "best_thr": best_thr,
    }
# Evaluation source per task: (GLUE split, first text column).
EVAL_TASKS = {
    "sst2": ("validation", "sentence"),
    "qnli": ("validation", "question"),
}
# Second text column for sentence-pair tasks. QNLI was trained on
# (question, sentence) pairs, so it must be evaluated on the same pairs rather
# than on the question alone.
EVAL_PAIR_FIELDS = {"qnli": "sentence"}


def _tokenize_eval_example(ex, field, pair_field):
    """Tokenise one example (optionally as a text pair) and attach its label."""
    second = ex[pair_field] if pair_field is not None else None
    encoded = tokenizer(
        ex[field],
        second,
        padding = "max_length",
        truncation = True,
        max_length = 128
    )
    return dict(**encoded, labels = ex["label"])


tokenized_tests = {}
for task, (split, field) in EVAL_TASKS.items():
    ds = load_dataset("glue", task)[split]
    # Never ask for more rows than the split holds.
    subset = ds.shuffle(seed = 42).select(range(min(TEST_SIZE, len(ds))))
    # The column names are passed via fn_kwargs instead of being captured by a
    # lambda, so the mapped function does not depend on loop variables.
    tokenized = subset.map(
        _tokenize_eval_example,
        fn_kwargs = {"field": field, "pair_field": EVAL_PAIR_FIELDS.get(task)},
        batched = False,
        remove_columns = ds.column_names
    )
    tokenized_tests[task] = tokenized

for name, repo in [("FP16", REPO_FP16), ("BNB8", REPO_BNB8), ("INT4", REPO_BNB4)]:
    print(f"\n=== Quantized Variant: {name} ===")
    # Only the BNB8 and INT4 variants get a bitsandbytes config; FP16 loads plainly.
    quant_cfg = None
    if name == "BNB8":
        quant_cfg = BitsAndBytesConfig(load_in_8bit = True)
    elif name == "INT4":
        quant_cfg = BitsAndBytesConfig(load_in_4bit = True, bnb_4bit_quant_type = "nf4", bnb_4bit_compute_dtype = torch.float16)
    load_kwargs = {"quantization_config": quant_cfg, "device_map": "auto"} if quant_cfg else {}
    base_model = GPT2ForSequenceClassification.from_pretrained(repo, **load_kwargs)
    # NOTE(review): the adapter from HUB_REPO_ID is attached to every variant,
    # including the FP16 repo, which received merged weights in the upload
    # cell -- confirm the adapter is not being applied a second time.
    model_eval = PeftModel.from_pretrained(base_model, HUB_REPO_ID)

    # Parameter counts: overall, and those still flagged as trainable.
    param_counts = [(p.numel(), p.requires_grad) for p in model_eval.parameters()]
    total = sum(count for count, _ in param_counts)
    trainable = sum(count for count, needs_grad in param_counts if needs_grad)
    print(f"Params → total={total:,}, trainable={trainable:,}")

    # Point both the wrapper's and the wrapped model's config at the tokenizer's pad id.
    for cfg_holder in (model_eval, model_eval.base_model):
        cfg_holder.config.pad_token_id = tokenizer.pad_token_id

    eval_args = TrainingArguments(
        output_dir = "./tmp_eval",
        per_device_eval_batch_size = 16,
        do_train = False,
        do_eval = False,
        logging_strategy = "no",
        report_to = []
    )
    eval_trainer = Trainer(
        model = model_eval,
        args = eval_args,
        tokenizer = tokenizer,
        compute_metrics = compute_metrics_with_threshold,
        data_collator = DataCollatorWithPadding(tokenizer)
    )
    # Score the same model on both held-out sets, SST-2 first, then QNLI.
    for eval_task in ("sst2", "qnli"):
        metrics = eval_trainer.evaluate(tokenized_tests[eval_task])
        print(metrics)