# Model Training

We will train various models:
1. Raw DeBERTa-v3-base fine-tuning and large randomised hyperparameter search.
2. DeBERTa-v3-base fine-tuning with POS and NER features and constrained hyperparameter grid search.
3. DeBERTa-v3-base fine-tuning with z-scores from log-odds with Dirichlet prior features, ditto grid search.
4. DeBERTa-v3-base fine-tuning with both of the above, ditto grid search.

In [1]:
import os
import re
import random
import logging
import math
import json
from html import unescape

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from torch.optim.lr_scheduler import LambdaLR
from transformers import DebertaV2Tokenizer, DebertaV2Model
from sklearn.metrics import f1_score, precision_score, recall_score, classification_report
from sklearn.model_selection import train_test_split
import optuna
from optuna.visualization.matplotlib import (
    plot_optimization_history,
    plot_param_importances,
    plot_parallel_coordinate
)
import matplotlib.pyplot as plt

# Global experiment configuration shared by the cells below.
SEED = 42  # seeds the Python, NumPy and PyTorch RNGs below; also the default split seed
DATA_DIR = "data"  # directory holding the PCL TSV and the SemEval split CSVs
OUT_DIR = "out"  # destination for plots, the saved model and the best-params JSON
MODEL_NAME = "microsoft/deberta-v3-base"  # checkpoint name used for both tokenizer and backbone
MAX_LENGTH = 256  # default tokenizer max_length (padding/truncation target) in PCLDataset
VAL_FRACTION = 0.15  # default share of the training split held out by split_train_val
N_TRIALS = 30  # number of Optuna trials passed to study.optimize
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed every RNG in use so that repeated runs start from the same random state.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)
    # Ask cuDNN for deterministic kernels (only relevant when running on GPU).
    torch.backends.cudnn.deterministic = True

# Root logging config; LOG is the logger used by the rest of the notebook.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s:\t%(message)s")
LOG = logging.getLogger(__name__)
LOG.info(f"Device: {DEVICE}")

  warn(
2026-02-13 12:16:15,705 INFO:	Device: cpu


## 1. Data Loading and Preprocessing
Load the main PCL dataset, join with the official SemEval train/dev splits, binarise labels, and clean HTML artifacts.

In [9]:
def clean_text(text: str) -> str:
    """
    Normalise a raw paragraph.

    HTML entities are decoded first, then <h>/</h> tags and @@digits artifacts
    are removed, and finally every whitespace run is collapsed to one space
    with leading/trailing whitespace stripped.
    """
    # Applied in order: tag removal, artifact removal, whitespace collapsing.
    substitutions = (
        (r"</?h>", ""),
        (r"@@\d+", ""),
        (r"\s+", " "),
    )
    cleaned = unescape(text)
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()


def load_data(data_dir: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Load the PCL corpus from `data_dir` and cut it into the official splits.

    Returns (train_df, dev_df); each frame is indexed by par_id and carries
    the columns text (cleaned) and binary_label (0/1).
    """
    corpus = pd.read_csv(
        os.path.join(data_dir, "dontpatronizeme_pcl.tsv"),
        sep="\t",
        skiprows=4,
        names=["par_id", "art_id", "keyword", "country_code", "text", "label"],
        index_col="par_id",
    )
    # Rows with any missing field are discarded before labels are derived.
    corpus.dropna(inplace=True)

    # Labels 0 and 1 map to class 0; labels 2, 3 and 4 map to class 1.
    corpus["binary_label"] = (corpus["label"] >= 2).astype(int)
    corpus["text"] = corpus["text"].apply(clean_text)

    def _select_split(split_file: str) -> pd.DataFrame:
        """Return the corpus rows whose par_id is listed in `split_file`."""
        split_ids = pd.read_csv(os.path.join(data_dir, split_file))["par_id"].values
        return corpus.loc[corpus.index.isin(split_ids), ["text", "binary_label"]].copy()

    train_df = _select_split("train_semeval_parids-labels.csv")
    dev_df = _select_split("dev_semeval_parids-labels.csv")

    train_labels = train_df["binary_label"]
    dev_labels = dev_df["binary_label"]
    LOG.info(f"Train: {len(train_df)} samples, {train_labels.sum()} positive ({train_labels.mean()*100:.2f}%)")
    LOG.info(f"Dev:   {len(dev_df)} samples, {dev_labels.sum()} positive ({dev_labels.mean()*100:.2f}%)")

    return train_df, dev_df


# Load both official splits once; later cells read these module-level frames.
train_df, dev_df = load_data(DATA_DIR)
train_df.head()  # notebook preview of the first rows

2026-02-13 12:48:36,403 INFO:	Train: 8375 samples, 794 positive (9.48%)
2026-02-13 12:48:36,417 INFO:	Dev:   2093 samples, 199 positive (9.51%)


Unnamed: 0_level_0,text,binary_label
par_id,Unnamed: 1_level_1,Unnamed: 2_level_1
1,"We 're living in times of absolute insanity , ...",0
2,"In Libya today , there are countless number of...",0
3,White House press secretary Sean Spicer said t...,0
4,Council customers only signs would be displaye...,0
5,""" Just like we received migrants fleeing El Sa...",0


In [4]:
# Sanity check: after cleaning, no paragraph in either split should still
# contain markup, HTML entities, line breaks, repeated whitespace, @@digits
# artifacts, non-ASCII characters or URLs.
noise_pattern = re.compile(
    r"<[^>]+>|<\/[^>]+>|&\w+;|\n|\\n|\s{2,}|\r|\\r|@@\d+|[^\x00-\x7F]+|https?://\S+"
)

# The counter lives outside the per-split loop so the final figure really is
# the total over train and dev (it used to be reset for every split, so only
# the last split was reported).
matches = 0
for split_df in (train_df, dev_df):
    for row in split_df.itertuples():
        for match in noise_pattern.finditer(str(row.text)):
            LOG.warning(f"Found noise in par_id {row.Index}: '{match.group(0)}'")
            matches += 1
LOG.info(f"Total noise matches found: {matches}")

2026-02-13 12:16:32,660 INFO:	Total noise matches found: 0


In [10]:
def split_train_val(train_df: pd.DataFrame, val_frac: float = VAL_FRACTION, seed: int = SEED):
    """
    Carve a validation subset out of the training data.

    The split is stratified on binary_label, holds out `val_frac` of the rows
    and is seeded with `seed`. Returns (train_sub_df, val_sub_df).
    """
    parts = train_test_split(
        train_df,
        test_size=val_frac,
        random_state=seed,
        stratify=train_df["binary_label"],
    )
    train_sub, val_sub = parts

    # Log size and positive rate of both parts to confirm stratification.
    train_labels = train_sub["binary_label"]
    val_labels = val_sub["binary_label"]
    LOG.info(f"Train-sub: {len(train_sub)} ({train_labels.sum()} pos, {train_labels.mean()*100:.2f}%), "
             f"Val-sub: {len(val_sub)} ({val_labels.sum()} pos, {val_labels.mean()*100:.2f}%)")
    return train_sub, val_sub

# Split with the default VAL_FRACTION and SEED; both frames are read by later cells.
train_sub_df, val_sub_df = split_train_val(train_df)
val_sub_df.head()  # notebook preview of the held-out validation rows

2026-02-13 12:48:55,477 INFO:	Train-sub: 7118 (675 pos, 9.48%), Val-sub: 1257 (119 pos, 9.47%)


Unnamed: 0_level_0,text,binary_label
par_id,Unnamed: 1_level_1,Unnamed: 2_level_1
124,"The ruling by the judge , released Thursday , ...",0
1515,News Rescuing the mentally ill CUMI provides h...,1
6356,"According to documents , the project will enab...",0
1008,"As a fashion icon , Rissa knows the importance...",0
6173,The new rules are not so much an outright ban ...,0


## 2. PyTorch Dataset and DataLoader
Custom Dataset class that pre-tokenizes all texts with the DeBERTa tokeniser at construction time.

In [6]:
# Shared tokenizer instance; PCLDataset reads this module-level name when encoding texts.
tokenizer = DebertaV2Tokenizer.from_pretrained(MODEL_NAME)


class PCLDataset(Dataset):
    """
    Dataset that tokenises every text once, at construction time.

    All texts are padded/truncated to `max_length` with the module-level
    tokenizer, so item access is a plain tensor lookup. Labels are optional;
    when given they are stored as a float32 tensor.
    """

    def __init__(self, texts: list[str], labels: list[int] | None = None,
                 max_length: int = MAX_LENGTH):
        tokenizer_kwargs = {
            "padding": "max_length",
            "truncation": True,
            "max_length": max_length,
            "return_tensors": "pt",
        }
        self.encodings = tokenizer(texts, **tokenizer_kwargs)
        if labels is None:
            self.labels = None
        else:
            self.labels = torch.tensor(labels, dtype=torch.float32)

    def __len__(self) -> int:
        return len(self.encodings["input_ids"])

    def __getitem__(self, idx: int) -> dict:
        # One entry per tokenizer output, plus "labels" when labels were supplied.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = values[idx]
        if self.labels is None:
            return sample
        sample["labels"] = self.labels[idx]
        return sample

2026-02-13 12:16:36,707 INFO:	HTTP Request: HEAD https://huggingface.co/microsoft/deberta-v3-base/resolve/main/tokenizer_config.json "HTTP/1.1 307 Temporary Redirect"
2026-02-13 12:16:36,722 INFO:	HTTP Request: HEAD https://huggingface.co/api/resolve-cache/models/microsoft/deberta-v3-base/8ccc9b6f36199bec6961081d44eb72fb3f7353f3/tokenizer_config.json "HTTP/1.1 200 OK"
2026-02-13 12:16:36,832 INFO:	HTTP Request: GET https://huggingface.co/api/models/microsoft/deberta-v3-base/tree/main/additional_chat_templates?recursive=false&expand=false "HTTP/1.1 404 Not Found"
2026-02-13 12:16:36,943 INFO:	HTTP Request: GET https://huggingface.co/api/models/microsoft/deberta-v3-base/tree/main?recursive=true&expand=false "HTTP/1.1 200 OK"
2026-02-13 12:16:43,075 INFO:	HTTP Request: GET https://huggingface.co/api/models/microsoft/deberta-v3-base "HTTP/1.1 200 OK"


In [8]:
def make_dataloaders(
    train_sub_df: pd.DataFrame,
    val_sub_df: pd.DataFrame,
    dev_df: pd.DataFrame,
    batch_size: int
) -> tuple[DataLoader, DataLoader, DataLoader]:
    """
    Build the (train, val, dev) DataLoaders for one batch size.

    Only the training loader shuffles; the validation and dev loaders keep
    the row order of their frames.
    """
    # Tokenise the three frames in train, val, dev order.
    train_ds, val_ds, dev_ds = (
        PCLDataset(frame["text"].tolist(), frame["binary_label"].tolist())
        for frame in (train_sub_df, val_sub_df, dev_df)
    )

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, drop_last=False)
    val_loader, dev_loader = (
        DataLoader(eval_ds, batch_size=batch_size, shuffle=False)
        for eval_ds in (val_ds, dev_ds)
    )
    return train_loader, val_loader, dev_loader

## 3. Model Architecture
DeBERTa backbone + custom classifier head. The head is designed for future concatenation of extra features (POS/NER proportions, z-score features) but for Experiment 1 uses only the [CLS] embedding.

**Classifier head design**: We use a two-layer MLP (Linear -> GELU -> Dropout -> Linear) rather than a single linear layer. A single linear layer can only learn a linear relationship, but we want to be able to learn non-linear relationships (between transformer embeddings, POS/NER proportions, and z-scores). The hidden layer with GELU activation allows the head to learn non-linear combinations of these features. GELU is chosen over ReLU because it is smooth and keeps a small non-zero gradient for most negative inputs, which mitigates the dying-neuron problem where a ReLU unit that only receives negative pre-activations gets zero gradient and stops learning. A deeper classification head might overfit given the small-ish dataset (~8k training samples).

In [None]:
class PCLClassifierHead(nn.Module):
    """
    Custom classifier head for PCL detection.

    Architecture: Linear -> GELU -> Dropout -> Linear -> (raw output)

    The first linear layer accepts (cls_dim + n_extra_features) as input,
    allowing future experiments to concatenate additional features.
    """

    def __init__(self, cls_dim: int = 768, hidden_dim: int = 256,
                 dropout_rate: float = 0.1, n_extra_features: int = 0):
        super().__init__()
        self.head = nn.Sequential(
            nn.Linear(cls_dim + n_extra_features, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_dim, 1), # Sigmoid applied with BCEWithLogitsLoss for numerical stability.
        )

    def forward(self, cls_embedding: torch.Tensor,
                extra_features: torch.Tensor | None = None) -> torch.Tensor:
        if extra_features is not None:
            x = torch.cat([cls_embedding, extra_features], dim=-1)
        else:
            x = cls_embedding
        return self.head(x)

In [None]:
class PCLDeBERTa(nn.Module):
    """
    DeBERTa backbone followed by a PCLClassifierHead.

    The embedding at sequence position 0 (the [CLS] token) of the backbone's
    last hidden state is fed to the head, optionally together with extra
    features, and the head's raw scores are returned.
    """

    def __init__(self, hidden_dim: int = 256, dropout_rate: float = 0.1,
                 n_extra_features: int = 0):
        super().__init__()
        self.backbone = DebertaV2Model.from_pretrained(MODEL_NAME)
        # The head's input width follows the backbone's configured hidden size.
        self.classifier = PCLClassifierHead(
            cls_dim=self.backbone.config.hidden_size,
            hidden_dim=hidden_dim,
            dropout_rate=dropout_rate,
            n_extra_features=n_extra_features,
        )

    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor,
                token_type_ids: torch.Tensor | None = None,
                extra_features: torch.Tensor | None = None) -> torch.Tensor:
        backbone_out = self.backbone(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        # Keep only the first token of every sequence: (batch, hidden_size).
        first_token = backbone_out.last_hidden_state[:, 0, :]
        return self.classifier(first_token, extra_features)

## 4. Loss Function, Scheduler, and Training Infrastructure
Weighted BCE loss for class imbalance, cosine annealing with warmup, early stopping, and evaluation utilities.

In [None]:
def compute_pos_weight(df: pd.DataFrame) -> torch.Tensor:
    """
    Compute pos_weight for BCEWithLogitsLoss to handle class imbalance.

    pos_weight = num_negatives / num_positives, upweighting the minority class.

    Args:
        df: frame with a `binary_label` column of 0/1 labels.

    Returns:
        A one-element float32 tensor on DEVICE.

    Raises:
        ValueError: if `df` contains no positive sample. The ratio would be
            inf (or nan for an empty frame) and would silently poison the loss.
    """
    n_pos = df["binary_label"].sum()
    n_neg = len(df) - n_pos
    if n_pos == 0:
        raise ValueError(
            "compute_pos_weight needs at least one positive sample in 'binary_label'"
        )
    weight = torch.tensor([n_neg / n_pos], dtype=torch.float32).to(DEVICE)
    LOG.info(f"pos_weight = {weight.item():.2f} (neg={n_neg}, pos={n_pos})")
    return weight


def get_cosine_schedule_with_warmup(
    optimizer: torch.optim.Optimizer,
    num_warmup_steps: int,
    num_training_steps: int
) -> LambdaLR:
    """
    Cosine annealing LR schedule with linear warmup.

    - step < num_warmup_steps: LR increases linearly from 0 to base_lr.
    - num_warmup_steps <= step <= num_training_steps: LR decays following a
      cosine curve from base_lr to 0.
    - step > num_training_steps: LR stays at 0.

    Args:
        optimizer: optimizer whose parameter-group learning rates are scaled.
        num_warmup_steps: number of scheduler steps spent in linear warmup.
        num_training_steps: total number of scheduler steps of the run.

    Returns:
        A LambdaLR that multiplies every group's base LR by the factor above.
    """
    decay_steps = max(1, num_training_steps - num_warmup_steps)

    def lr_lambda(current_step: int) -> float:
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        # Clamp to 1.0: without it the cosine turns upward again once the
        # scheduler is stepped past num_training_steps.
        progress = min(1.0, float(current_step - num_warmup_steps) / float(decay_steps))
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress)))

    return LambdaLR(optimizer, lr_lambda)

In [None]:
class EarlyStopping:
    """
    Stop training once validation F1 has stopped improving.

    An evaluation counts as an improvement only when it beats the best score
    so far by more than `min_delta`. After `patience` non-improving
    evaluations since the last improvement, `should_stop` becomes True.
    """

    def __init__(self, patience: int = 3, min_delta: float = 0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_score: float | None = None
        self.should_stop = False

    def step(self, val_f1: float) -> bool:
        """Record one evaluation result and return the current stop flag."""
        # The very first score only establishes the baseline.
        if self.best_score is None:
            self.best_score = val_f1
            return self.should_stop

        improved = val_f1 > self.best_score + self.min_delta
        if improved:
            self.best_score, self.counter = val_f1, 0
            return self.should_stop

        self.counter += 1
        # The flag is sticky: once set it is never cleared.
        self.should_stop = self.should_stop or self.counter >= self.patience
        return self.should_stop

In [None]:
@torch.no_grad()
def evaluate(model: nn.Module, dataloader: DataLoader, threshold: float = 0.5) -> dict:
    """
    Score `model` on every batch of `dataloader` without tracking gradients.

    Returns a dict with keys f1, precision, recall, loss, preds, labels.
    `loss` is the plain (unweighted) BCE-with-logits over the whole set, and
    `preds` are the sigmoid probabilities thresholded at `threshold`.
    The model's train/eval mode is restored before returning.
    """
    restore_training = model.training
    model.eval()

    score_chunks: list[torch.Tensor] = []
    label_chunks: list[torch.Tensor] = []
    for batch in dataloader:
        batch_labels = batch["labels"].to(DEVICE)
        batch_scores = model(
            input_ids=batch["input_ids"].to(DEVICE),
            attention_mask=batch["attention_mask"].to(DEVICE),
        ).squeeze(-1)
        # Collect on CPU so the metrics can be computed with NumPy/sklearn.
        score_chunks.append(batch_scores.cpu())
        label_chunks.append(batch_labels.cpu())

    scores = torch.cat(score_chunks)
    targets = torch.cat(label_chunks)

    loss = nn.functional.binary_cross_entropy_with_logits(scores, targets).item()

    y_pred = (torch.sigmoid(scores) >= threshold).long().numpy()
    y_true = targets.long().numpy()

    results = {
        name: metric(y_true, y_pred, zero_division=0)
        for name, metric in (
            ("f1", f1_score),
            ("precision", precision_score),
            ("recall", recall_score),
        )
    }
    results.update(loss=loss, preds=y_pred, labels=y_true)

    model.train(restore_training)
    return results

## 5. Training Loop
Single training function used both for manual runs and inside the Optuna objective. Uses differential learning rates (backbone vs head), gradient clipping, step-based evaluation, and Optuna pruning.

In [None]:
def train_model(
    model: PCLDeBERTa,
    train_loader: DataLoader,
    val_loader: DataLoader,
    dev_loader: DataLoader,
    pos_weight: torch.Tensor,
    lr: float,
    weight_decay: float,
    num_epochs: int,
    warmup_fraction: float,
    patience: int,
    eval_every_n_steps: int = 50,
    trial: optuna.trial.Trial | None = None
) -> dict:
    """
    Train the model with early stopping, cosine annealing with warmup,
    and weighted BCE loss.

    Args:
        model: model to train in place; it exposes `backbone` and `classifier`.
        train_loader: batches used for optimisation.
        val_loader: batches evaluated every `eval_every_n_steps` steps; its F1
            drives checkpoint selection, early stopping and Optuna pruning.
        dev_loader: batches evaluated once, after training, with the best
            checkpoint restored (if one was recorded).
        pos_weight: positive-class weight passed to BCEWithLogitsLoss.
        lr: backbone learning rate; the head uses 10x this value.
        weight_decay: AdamW weight decay, applied to both parameter groups.
        num_epochs: maximum number of passes over `train_loader`.
        warmup_fraction: share of all optimisation steps spent in LR warmup.
        patience: non-improving evaluations tolerated before stopping.
        eval_every_n_steps: optimisation steps between validation rounds.
        trial: optional Optuna trial that receives the validation F1 values.

    Returns dict with keys: best_val_f1, dev_metrics, train_losses.
    `train_losses` holds one mean training loss per evaluation window.

    Raises:
        optuna.exceptions.TrialPruned: when `trial` is given and Optuna
            decides the trial should be pruned.
    """
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

    # Differential LR: the head's learning rate is 10x the backbone's.
    backbone_params = list(model.backbone.parameters())
    head_params = list(model.classifier.parameters())
    optimizer = AdamW([
        {"params": backbone_params, "lr": lr},
        {"params": head_params, "lr": lr * 10}
    ], weight_decay=weight_decay)

    # The scheduler is stepped once per batch, so its horizon is counted in batches.
    total_steps = len(train_loader) * num_epochs
    warmup_steps = int(total_steps * warmup_fraction)
    scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps)

    early_stopper = EarlyStopping(patience=patience)

    model.train()
    global_step = 0
    train_losses = []
    # Starts at 0.0, so a checkpoint is only recorded once validation F1 is strictly positive.
    best_val_f1 = 0.0
    best_state_dict = None
    running_loss = 0.0  # summed training loss since the last evaluation round

    for epoch in range(num_epochs):
        for batch in train_loader:
            input_ids = batch["input_ids"].to(DEVICE)
            attention_mask = batch["attention_mask"].to(DEVICE)
            labels = batch["labels"].to(DEVICE)

            optimizer.zero_grad()
            # Drop the trailing dimension so the scores line up with the labels.
            unnormalised_scores = model(input_ids=input_ids, attention_mask=attention_mask).squeeze(-1)
            loss = criterion(unnormalised_scores, labels)
            loss.backward()

            # Clip the global gradient norm to 1.0 before the update.
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            scheduler.step()

            running_loss += loss.item()
            global_step += 1

            if global_step % eval_every_n_steps == 0:
                val_metrics = evaluate(model, val_loader)
                val_f1 = val_metrics["f1"]
                # Mean training loss over the window that just ended.
                train_losses.append(running_loss / eval_every_n_steps)
                running_loss = 0.0

                LOG.info(
                    f"Step {global_step} | Val F1: {val_f1:.4f} | "
                    f"Val P: {val_metrics['precision']:.4f} | Val R: {val_metrics['recall']:.4f}"
                )

                # Snapshot the weights whenever validation F1 reaches a new best.
                if val_f1 > best_val_f1:
                    best_val_f1 = val_f1
                    best_state_dict = {k: v.clone() for k, v in model.state_dict().items()}

                # Optuna pruning: report the intermediate value and abort if asked to.
                if trial is not None:
                    trial.report(val_f1, global_step)
                    if trial.should_prune():
                        raise optuna.exceptions.TrialPruned()

                if early_stopper.step(val_f1):
                    LOG.info(f"Early stopping at step {global_step}")
                    break

        # The inner break only leaves the batch loop; leave the epoch loop as well.
        if early_stopper.should_stop:
            break

    # Restore the best checkpoint (if any was recorded) and evaluate on the dev set.
    if best_state_dict is not None:
        model.load_state_dict(best_state_dict)

    dev_metrics = evaluate(model, dev_loader)
    LOG.info(
        f"Dev F1: {dev_metrics['f1']:.4f} | "
        f"Dev P: {dev_metrics['precision']:.4f} | Dev R: {dev_metrics['recall']:.4f}"
    )

    return {
        "best_val_f1": best_val_f1,
        "dev_metrics": dev_metrics,
        "train_losses": train_losses
    }

## 6. Hyperparameter Search with Optuna
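Randomised search over learning rate, batch size, hidden dim, dropout, weight decay, warmup fraction, number of epochs, and early-stopping patience. The objective is dev-set F1; F1 on the validation subset is used for early stopping, checkpoint selection, and pruning.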

In [None]:
def objective(trial: optuna.trial.Trial) -> float:
    """
    Optuna objective for Experiment 1.

    Samples one hyperparameter configuration, trains a fresh model on the
    train/val subsets and returns the dev-set F1 score (to be maximised).
    """
    # Sampled in a fixed order; the keys match the Optuna parameter names.
    sampled = {
        "lr": trial.suggest_float("lr", 5e-6, 5e-5, log=True),
        "batch_size": trial.suggest_categorical("batch_size", [16, 32]),
        "hidden_dim": trial.suggest_categorical("hidden_dim", [128, 256, 512]),
        "dropout_rate": trial.suggest_float("dropout_rate", 0.1, 0.5, step=0.05),
        "weight_decay": trial.suggest_float("weight_decay", 1e-4, 1e-1, log=True),
        "warmup_fraction": trial.suggest_float("warmup_fraction", 0.0, 0.2, step=0.05),
        "num_epochs": trial.suggest_int("num_epochs", 3, 8),
        "patience": trial.suggest_int("patience", 3, 6),
    }

    loaders = make_dataloaders(train_sub_df, val_sub_df, dev_df, sampled["batch_size"])
    train_loader, val_loader, dev_loader = loaders

    model = PCLDeBERTa(
        hidden_dim=sampled["hidden_dim"],
        dropout_rate=sampled["dropout_rate"],
        n_extra_features=0,
    ).to(DEVICE)

    results = train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        dev_loader=dev_loader,
        pos_weight=compute_pos_weight(train_sub_df),
        lr=sampled["lr"],
        weight_decay=sampled["weight_decay"],
        num_epochs=sampled["num_epochs"],
        warmup_fraction=sampled["warmup_fraction"],
        patience=sampled["patience"],
        eval_every_n_steps=50,
        trial=trial,
    )
    dev_f1 = results["dev_metrics"]["f1"]

    # Drop the model and release cached GPU memory before the next trial starts.
    del model
    torch.cuda.empty_cache()

    return dev_f1

In [None]:
# Maximise the objective's return value. The sampler is the same TPE sampler
# Optuna uses by default for single-objective studies, but seeded with SEED so
# the search is repeatable like the rest of the notebook. The median pruner
# leaves the first 5 trials and the first 100 reported steps of every trial alone.
study = optuna.create_study(
    direction="maximize",
    study_name="pcl_deberta_exp1",
    sampler=optuna.samplers.TPESampler(seed=SEED),
    pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=100)
)

study.optimize(objective, n_trials=N_TRIALS)

# The notebook's logger is LOG; `logger` was never defined and raised NameError here.
LOG.info(f"Best trial: {study.best_trial.number}")
LOG.info(f"Best dev F1: {study.best_trial.value:.4f}")
LOG.info(f"Best params: {study.best_trial.params}")

## 7. Results Analysis
Visualise Optuna results and retrain the best model on the full training set for final evaluation.

In [None]:
# savefig does not create missing directories, so make sure OUT_DIR exists first.
os.makedirs(OUT_DIR, exist_ok=True)

# Objective value per trial.
fig1 = plot_optimization_history(study)
plt.tight_layout()
plt.savefig(f"{OUT_DIR}/optuna_history.png", dpi=300)
plt.savefig(f"{OUT_DIR}/optuna_history.svg", format="svg")
plt.show()

# Estimated importance of each hyperparameter.
fig2 = plot_param_importances(study)
plt.tight_layout()
plt.savefig(f"{OUT_DIR}/optuna_importances.png", dpi=300)
plt.savefig(f"{OUT_DIR}/optuna_importances.svg", format="svg")
plt.show()

# Parallel-coordinate view of the sampled configurations.
fig3 = plot_parallel_coordinate(study)
plt.tight_layout()
plt.savefig(f"{OUT_DIR}/optuna_parallel.png", dpi=300)
plt.savefig(f"{OUT_DIR}/optuna_parallel.svg", format="svg")
plt.show()

best_params = study.best_trial.params
print("Best hyperparameters for Experiment 1:")
for k, v in best_params.items():
    print(f"  {k}: {v}")

In [None]:
# Retrain with best HPs on full training set (no val carve-out)
# NOTE(review): because no validation subset is held out here, dev_loader is
# passed as val_loader as well, so early stopping and checkpoint selection
# inside train_model see the dev set. The dev report below is therefore an
# optimistic estimate.
best = study.best_trial.params

full_train_ds = PCLDataset(train_df["text"].tolist(), train_df["binary_label"].tolist())
full_train_loader = DataLoader(full_train_ds, batch_size=best["batch_size"], shuffle=True)
dev_ds = PCLDataset(dev_df["text"].tolist(), dev_df["binary_label"].tolist())
dev_loader = DataLoader(dev_ds, batch_size=best["batch_size"], shuffle=False)

final_model = PCLDeBERTa(
    hidden_dim=best["hidden_dim"],
    dropout_rate=best["dropout_rate"],
    n_extra_features=0
).to(DEVICE)

# Class weight recomputed on the full training split used for this run.
pos_weight = compute_pos_weight(train_df)

final_results = train_model(
    model=final_model,
    train_loader=full_train_loader,
    val_loader=dev_loader,
    dev_loader=dev_loader,
    pos_weight=pos_weight,
    lr=best["lr"],
    weight_decay=best["weight_decay"],
    num_epochs=best["num_epochs"],
    warmup_fraction=best["warmup_fraction"],
    patience=best["patience"],
    eval_every_n_steps=50,
    trial=None
)

print("\nFinal Dev Set Classification Report:")
print(classification_report(
    final_results["dev_metrics"]["labels"],
    final_results["dev_metrics"]["preds"],
    target_names=["Non-PCL", "PCL"]
))

# Save model and best params for experiments 2-4
# torch.save and open() do not create missing directories, so create OUT_DIR first.
os.makedirs(OUT_DIR, exist_ok=True)
torch.save(final_model.state_dict(), os.path.join(OUT_DIR, "exp1_best_model.pt"))
with open(os.path.join(OUT_DIR, "exp1_best_params.json"), "w") as f:
    json.dump(best, f, indent=2)
# The notebook's logger is LOG; `logger` was never defined and raised NameError here.
LOG.info(f"Saved model to {OUT_DIR}/exp1_best_model.pt and params to {OUT_DIR}/exp1_best_params.json")