<br>
<font>
<div dir=ltr align=center>
<img src="https://cdn.freebiesupply.com/logos/large/2x/sharif-logo-png-transparent.png" width=150 height=150> <br>
<font color=0F5298 size=7>
    Machine learning <br>
<font color=2565AE size=5>
    Computer Engineering Department <br>
    Fall 2025<br>
<font color=3C99D size=5>
    Sentiment Analysis with Transformer <br>
</div>
<div dir=ltr align=center>
<font color=0CBCDF size=4>
    Mohammad Ebrahimian, Taha Izadi, Nima Ghadirniya
<font color=0CBCDF size=4>
</div>

____

  <h1 style="color:#0F5298; font-family:serif; font-size:45px; margin-bottom:0px;">
    Setup and Libraries
  </h1>

In [None]:
# Install the third-party packages this notebook imports below (gensim, datasets).
# NOTE(review): gensim is not version-pinned here, unlike datasets — consider pinning it.
!pip install gensim
!pip install datasets==2.16.1

In [None]:
import os
import gc
import math
import glob
import json
import random
import re
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import seaborn as sns
import torch.optim as optim
from itertools import product
import torch.nn.functional as F
import gensim.downloader as api
import matplotlib.pyplot as plt
from datetime import datetime
from datasets import load_dataset
from dataclasses import dataclass
from sklearn.metrics import f1_score
from transformers import BertTokenizer
from typing import Dict, List, Optional
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch random number generators.

    The CUDA and MPS generators are seeded only when the corresponding
    backend reports itself as available.

    Args:
        seed: integer seed applied to every generator.
    """
    # CPU-side generators first, in the same order every time.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    cuda_ready = torch.cuda.is_available()
    if cuda_ready:
        torch.cuda.manual_seed_all(seed)

    mps_ready = torch.backends.mps.is_available()
    if mps_ready:
        torch.mps.manual_seed(seed)

# Fix every RNG once, up front, so later cells are reproducible.
set_seed(42)
print("🌱 Seed set to 42 for reproducibility.")

# Device preference order: Apple MPS, then CUDA, then CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Device set to: {device}")

  <h1 style="color:#0F5298; font-family:serif; font-size:45px; margin-bottom:0px;">
    Data Loading
  </h1>

In [None]:
def load_data_from_file():
    """Load the Financial PhraseBank "sentences_allagree" train split as a cleaned DataFrame.

    Despite its name, this function reads the corpus through
    ``datasets.load_dataset`` rather than from a local file.

    Cleaning steps, in order:
      1. rename ``sentence`` -> ``text``;
      2. drop rows with a missing text or label;
      3. lower-case and strip the text;
      4. drop duplicate texts (keeping the first occurrence);
      5. cast labels to ``int`` and reset the index.

    Returns:
        pandas.DataFrame with at least the columns ``text`` and ``label``.
    """
    ds = load_dataset(
        "financial_phrasebank",
        "sentences_allagree",
        trust_remote_code=True,
        streaming=False
    )
    df = ds["train"].to_pandas().rename(columns={"sentence": "text"})

    df = df.dropna(subset=['text', 'label'])

    # Normalise BEFORE de-duplicating: otherwise sentences that differ only in
    # letter case or surrounding whitespace survive as "unique" rows and can end
    # up on both sides of the train/validation/test split.
    df = df.assign(text=df['text'].str.lower().str.strip())
    df = df.drop_duplicates(subset=['text'])

    df = df.assign(label=df['label'].astype(int))

    return df.reset_index(drop=True)

def plot_sentiment_distribution(df):
    """Draw a bar chart of how many rows carry each sentiment label.

    Args:
        df: DataFrame with a ``label`` column; the tick labels assume the ids
            0, 1, 2 stand for Negative, Neutral, Positive.

    Each bar is annotated with its integer height, and the figure is shown
    via ``plt.show()``.
    """
    plt.figure(figsize=(8, 5))
    axes = sns.countplot(data=df, x='label', hue='label', palette='viridis', legend=False)

    class_names = ['Negative', 'Neutral', 'Positive']
    plt.title('Distribution of Sentiments (Sentences-AllAgree)')
    plt.xlabel('Class (0: Neg, 1: Neu, 2: Pos)')
    plt.ylabel('Count')
    plt.xticks([0, 1, 2], class_names)

    # Write the count just above the top-centre of every bar.
    for bar in axes.patches:
        bar_height = bar.get_height()
        bar_center_x = bar.get_x() + bar.get_width() / 2.
        axes.annotate(
            f'{int(bar_height)}',
            (bar_center_x, bar_height),
            ha='center',
            va='center',
            xytext=(0, 10),
            textcoords='offset points',
        )
    plt.show()

# Load the corpus, visualise the class balance and print one example per class.
df = load_data_from_file()
plot_sentiment_distribution(df)
print(f"Total unique samples: {len(df)}")
print("\n📝 Samples per Class:")
label_names = {0: 'Negative', 1: 'Neutral', 2: 'Positive'}

for label in [0, 1, 2]:
    class_texts = df.loc[df['label'] == label, 'text']
    if class_texts.empty:
        # Avoid an IndexError from .iloc[0] when a class has no rows at all.
        print(f"   - {label_names[label]} (Label {label}): <no samples>")
        continue
    sample_text = class_texts.iloc[0]
    print(f"   - {label_names[label]} (Label {label}): \"{sample_text[:100]}...\"")

  <h1 style="color:#0F5298; font-family:serif; font-size:45px; margin-bottom:0px;">
    Word2Vec Pre-trained Embedding
  </h1>

In [None]:
def create_hybrid_embeddings(tokenizer, d_emb=300):
    """Build a token-embedding matrix aligned with ``tokenizer`` ids, seeded from Word2Vec.

    Every row starts as N(0, 0.02) noise. Whole-word tokens found in the
    "word2vec-google-news-300" vectors (exact match first, then lower-cased)
    are overwritten with the pretrained vector. Special tokens and "##"
    word-piece continuations keep their random initialisation, and the padding
    row (if the tokenizer defines one) is zeroed. If loading Word2Vec fails for
    any reason, a purely random matrix with a zero padding row is returned.

    Args:
        tokenizer: object exposing ``__len__``, ``get_vocab()``,
            ``all_special_ids`` and ``pad_token_id``.
        d_emb: embedding width; must equal the Word2Vec vector size when the
            vectors load successfully.

    Returns:
        float32 ``torch.Tensor`` of shape ``(len(tokenizer), d_emb)``.

    Raises:
        ValueError: if the loaded vectors' width differs from ``d_emb``.
    """

    def _random_matrix(n_rows):
        # Small-variance Gaussian init for every row.
        return np.random.normal(0.0, 0.02, (n_rows, d_emb)).astype(np.float32)

    def _finalize(matrix):
        # The padding row must be all zeros.
        if tokenizer.pad_token_id is not None:
            matrix[tokenizer.pad_token_id] = 0.0
        return torch.from_numpy(matrix)

    # Both code paths size the matrix by len(tokenizer) so the result always has
    # the same number of rows, whichever path produced it.
    vocab_size = len(tokenizer)

    print("Loading Word2Vec model...")
    try:
        word2vec = api.load("word2vec-google-news-300")
    except Exception as e:
        # Deliberate best-effort: training can still proceed with random embeddings.
        print(f"Word2Vec load failed: {e}")
        return _finalize(_random_matrix(vocab_size))

    # Fail early with a readable message instead of a NumPy broadcast error
    # raised on the first vocabulary hit.
    vector_size = getattr(word2vec, "vector_size", d_emb)
    if vector_size != d_emb:
        raise ValueError(
            f"d_emb={d_emb} does not match the Word2Vec vector size ({vector_size})"
        )

    vocab = tokenizer.get_vocab()
    mat = _random_matrix(vocab_size)

    special_ids = set(tokenizer.all_special_ids)
    hits, misses, skipped_subword = 0, 0, 0

    for token, idx in vocab.items():
        if idx in special_ids:
            continue
        if token.startswith("##"):
            # Word-piece continuations are not looked up in the word-level model.
            skipped_subword += 1
            continue

        if token in word2vec:
            mat[idx] = word2vec[token]
            hits += 1
        elif token.lower() in word2vec:
            mat[idx] = word2vec[token.lower()]
            hits += 1
        else:
            misses += 1

    # Release the large Word2Vec model as soon as the rows have been copied.
    del word2vec
    gc.collect()

    eligible = hits + misses
    cov = (hits / eligible * 100) if eligible > 0 else 0.0
    print(f"Shape: {mat.shape}")
    print(f"Hits: {hits}, Misses: {misses}, Skipped subwords: {skipped_subword}")
    print(f"Coverage on eligible tokens: {cov:.1f}%")

    return _finalize(mat)

# Load the tokenizer first: the embedding builder below reads its vocabulary.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# Embedding matrix built for this tokenizer (uses the default d_emb of create_hybrid_embeddings).
pretrained_embeddings = create_hybrid_embeddings(tokenizer)

  <h1 style="color:#0F5298; font-family:serif; font-size:45px; margin-bottom:0px;">
    Data Splitting
  </h1>

In [None]:
# Split configuration: one seed for both splits and 80/10/10 train/validation/test fractions.
SEED = 42
TRAIN_SIZE = 0.80
VAL_SIZE = 0.10
TEST_SIZE = 0.10

assert abs(TRAIN_SIZE + VAL_SIZE + TEST_SIZE - 1.0) < 1e-8, "Split ratios must sum to 1."

# If labels are stored as strings, map them to the integer ids 0/1/2.
# NOTE(review): label strings missing from label_map become NaN and are dropped by the dropna below.
if df["label"].dtype == object:
    label_map = {"negative": 0, "neutral": 1, "positive": 2}
    df["label"] = df["label"].map(label_map)

# NOTE(review): this rebinds the global `df`, so re-running the cell operates on the already-cleaned frame.
df = df.dropna(subset=["text", "label"]).copy()
df["label"] = df["label"].astype(int)
df = df.reset_index(drop=True)

# First split: carve the training set off, stratified by label.
train_df, temp_df = train_test_split(
    df,
    test_size=(1 - TRAIN_SIZE),
    random_state=SEED,
    stratify=df["label"],
)

# Second split: divide the remainder into validation and test, again stratified.
# val_ratio_in_temp is the share of the remainder that goes to validation.
val_ratio_in_temp = VAL_SIZE / (VAL_SIZE + TEST_SIZE)
val_df, test_df = train_test_split(
    temp_df,
    test_size=(1 - val_ratio_in_temp),
    random_state=SEED,
    stratify=temp_df["label"],
)

# Give every split a fresh 0..n-1 index.
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

def show_split_stats(name, part_df):
    """Print the size and label distribution of one data split.

    Args:
        name: display name of the split.
        part_df: DataFrame with a ``label`` column.

    Prints the row count, the per-label counts, the per-label percentages
    (rounded to two decimals) and a 50-character separator line.
    """
    label_column = part_df["label"]
    per_label_count = label_column.value_counts().sort_index()
    per_label_pct = (label_column.value_counts(normalize=True).sort_index() * 100).round(2)

    report_lines = [
        f"{name}: n={len(part_df)}",
        f"counts: {per_label_count.to_dict()}",
        f"ratios(%): {per_label_pct.to_dict()}",
        "-" * 50,
    ]
    print("\n".join(report_lines))

# Report size and label distribution for each of the three splits.
show_split_stats("Train", train_df)
show_split_stats("Validation", val_df)
show_split_stats("Test", test_df)

  <h1 style="color:#0F5298; font-family:serif; font-size:45px; margin-bottom:0px;">
    Augmentation
  </h1>

In [None]:
# Synonyms that may be substituted regardless of the sentence's label.
# Keys are compared against normalised (lower-cased, punctuation-stripped) tokens.
COMMON_SYNONYMS = {
    "company": ["firm", "business"],
    "market": ["sector", "marketplace"],
    "shares": ["stock", "equity"],
    "announced": ["reported", "stated"],
    "increase": ["rise", "growth"],
    "decrease": ["decline", "drop"],
    "cost": ["expense", "charge"],
    "revenue": ["sales", "turnover"],
}

# Extra synonyms that are only used for sentences of the given label id (outer key).
LABEL_SYNONYMS = {
    0: {"loss": ["deficit", "setback"], "risk": ["threat", "uncertainty"], "fall": ["drop", "decline"]},
    1: {"said": ["stated", "noted"], "expects": ["anticipates", "foresees"], "plan": ["strategy", "program"]},
    2: {"profit": ["gain", "earnings"], "growth": ["expansion", "rise"], "strong": ["solid", "robust"]},
}

# Negation words that are never chosen as replacement candidates.
PROTECTED_WORDS = {"not", "no", "never", "none", "without"}


def _normalize_token(token: str) -> str:
    return re.sub(r"^[^A-Za-z0-9]+|[^A-Za-z0-9]+$", "", token).lower()


def _replace_token_keep_format(raw_token: str, new_core: str) -> str:
    m = re.match(r"^([^A-Za-z0-9]*)([A-Za-z0-9'-]+)([^A-Za-z0-9]*)$", raw_token)
    if not m:
        return raw_token
    prefix, core, suffix = m.groups()
    if core.isupper():
        new_core = new_core.upper()
    elif core[:1].isupper():
        new_core = new_core.capitalize()
    return f"{prefix}{new_core}{suffix}"


def augment_text_label_aware(text: str, label: int, rng: random.Random, max_repl: int = 2, swap_prob: float = 0.10):
    """Create a lightly perturbed copy of ``text`` using label-aware synonym replacement.

    Up to ``max_repl`` tokens whose normalised form appears in
    ``COMMON_SYNONYMS`` or in the ``LABEL_SYNONYMS`` entry for ``label`` are
    replaced by a randomly chosen synonym. Tokens that are protected negation
    words or contain digits are never replaced. Afterwards, with probability
    ``swap_prob`` and only for texts of at least five tokens, one random pair
    of adjacent tokens is swapped — unless either token is a protected word,
    because moving a negation can change the sentence's sentiment.

    Args:
        text: source sentence; texts with fewer than three tokens are returned unchanged.
        label: class id selecting the label-specific synonym table.
        rng: random generator that drives every random choice.
        max_repl: maximum number of replacements; values below 1 disable replacement.
        swap_prob: probability of attempting the adjacent-token swap.

    Returns:
        The augmented sentence, or ``text`` itself if augmentation produced an empty string.
    """
    words = text.split()
    if len(words) < 3:
        return text

    # Label-specific entries override common ones on key collisions.
    syn_map = {**COMMON_SYNONYMS, **LABEL_SYNONYMS.get(int(label), {})}
    candidates = []

    for i, w in enumerate(words):
        key = _normalize_token(w)
        if not key or key in PROTECTED_WORDS or any(ch.isdigit() for ch in key):
            continue
        if key in syn_map:
            candidates.append((i, key))

    rng.shuffle(candidates)

    # max_repl < 1 used to crash rng.randint(1, 0); treat it as "no replacement".
    if candidates and max_repl >= 1:
        n_rep = rng.randint(1, min(max_repl, len(candidates)))
        for i, key in candidates[:n_rep]:
            replacement = rng.choice(syn_map[key])
            words[i] = _replace_token_keep_format(words[i], replacement)

    if rng.random() < swap_prob and len(words) >= 5:
        # The index is always drawn so the RNG stream stays the same whether or
        # not the swap is subsequently vetoed.
        j = rng.randrange(0, len(words) - 1)
        pair_keys = {_normalize_token(words[j]), _normalize_token(words[j + 1])}
        if not (pair_keys & PROTECTED_WORDS):
            words[j], words[j + 1] = words[j + 1], words[j]

    aug = " ".join(words).strip()
    return aug if aug else text


def _compute_target_counts(class_counts: pd.Series, balance_strength: float = 0.45, max_growth: float = 1.40):
    class_counts = class_counts.sort_index()
    max_count = int(class_counts.max())
    orig_total = int(class_counts.sum())

    targets = {}
    for label, count in class_counts.items():
        boosted = int(round(count + balance_strength * (max_count - count)))
        targets[int(label)] = max(int(count), boosted)

    max_total = int(round(orig_total * max_growth))
    proposed_total = sum(targets.values())

    if proposed_total > max_total and proposed_total > orig_total:
        proposed_extra = proposed_total - orig_total
        allowed_extra = max_total - orig_total
        scale = allowed_extra / proposed_extra if proposed_extra > 0 else 0.0
        for label, count in class_counts.items():
            extra = targets[int(label)] - int(count)
            scaled_extra = int(round(extra * scale))
            targets[int(label)] = int(count) + max(0, scaled_extra)

    return targets

# Augmenting training data
def build_controlled_augmented_train_df(
    train_df: pd.DataFrame,
    seed: int = 42,
    balance_strength: float = 1.0,
    max_growth: float = 10.0,
):
    """Return the training set extended with synthetic rows that rebalance the classes.

    Per-class target sizes come from ``_compute_target_counts``. For each class,
    new rows are produced by running ``augment_text_label_aware`` on randomly
    chosen originals and keeping only texts not yet seen in that class
    (case-insensitive). If the attempt budget runs out first, the shortfall is
    filled from a with-replacement sample of the class, without the uniqueness check.

    Args:
        train_df: frame with ``text`` and ``label`` columns.
        seed: seeds the augmentation RNG, the fallback sampling and the final shuffle.
        balance_strength: forwarded to ``_compute_target_counts``.
        max_growth: forwarded to ``_compute_target_counts``.

    Returns:
        Shuffled DataFrame with columns ``text``, ``label`` and ``is_augmented``
        (0 for original rows, 1 for generated rows).
    """
    rng = random.Random(seed)

    # Work on a copy restricted to the two columns that are needed.
    base = train_df[["text", "label"]].copy().reset_index(drop=True)
    base["is_augmented"] = 0

    class_counts = base["label"].value_counts().sort_index()
    target_counts = _compute_target_counts(class_counts, balance_strength=balance_strength, max_growth=max_growth)

    parts = []
    for label, grp in base.groupby("label", sort=True):
        grp = grp.copy().reset_index(drop=True)
        originals = grp["text"].tolist()
        # Case-insensitive set of texts already present in this class.
        seen = set(t.strip().lower() for t in originals)

        need = max(0, target_counts[int(label)] - len(grp))
        new_rows = []
        attempts = 0
        # Bound the retry loop so it terminates even when few distinct variants exist.
        max_attempts = max(200, need * 20)

        while len(new_rows) < need and attempts < max_attempts:
            src = originals[rng.randrange(len(originals))]
            aug = augment_text_label_aware(src, int(label), rng, max_repl=2, swap_prob=0.10)
            attempts += 1

            key = aug.strip().lower()
            if not key or key in seen:
                continue

            seen.add(key)
            new_rows.append({"text": aug, "label": int(label), "is_augmented": 1})

        # Fallback: fill any shortfall without the uniqueness check, so these rows
        # may duplicate existing texts while still being flagged is_augmented=1.
        if len(new_rows) < need:
            remain = need - len(new_rows)
            sampled = grp.sample(n=remain, replace=True, random_state=seed)["text"].tolist()
            for src in sampled:
                aug = augment_text_label_aware(src, int(label), rng, max_repl=1, swap_prob=0.05)
                new_rows.append({"text": aug, "label": int(label), "is_augmented": 1})

        parts.append(grp)
        if new_rows:
            parts.append(pd.DataFrame(new_rows))

    train_aug_df = pd.concat(parts, ignore_index=True)
    # Shuffle so that original and generated rows of all classes are interleaved.
    train_aug_df = train_aug_df.sample(frac=1.0, random_state=seed).reset_index(drop=True)
    return train_aug_df


def show_counts(train_part, val_part, test_part, title):
    """Print an underlined title followed by size and class counts of each split.

    Args:
        train_part, val_part, test_part: DataFrames with a ``label`` column.
        title: heading printed above the three summary lines.
    """
    underline = "=" * len(title)
    print(f"\n{title}")
    print(underline)

    splits = {"Train": train_part, "Validation": val_part, "Test": test_part}
    for split_name, frame in splits.items():
        per_class = frame["label"].value_counts().sort_index().to_dict()
        print(f"{split_name}: n={len(frame)} | class_counts={per_class}")


# Fail fast with a readable message if the split cell has not been executed yet.
assert "train_df" in globals() and "val_df" in globals() and "test_df" in globals(), "run data spliting block first"

# Reuse the split seed when it is defined; otherwise fall back to 42.
seed_value = SEED if "SEED" in globals() else 42

show_counts(train_df, val_df, test_df, "Before Augmentation")

# Only the training split is augmented; validation and test are passed through untouched.
# balance_strength=1 targets full class balance; max_growth=3 caps the total at 3x the original size.
train_aug_df = build_controlled_augmented_train_df(
    train_df=train_df,
    seed=seed_value,
    balance_strength=1,
    max_growth=3,
)

show_counts(train_aug_df, val_df, test_df, "After Augmentation")
print(f"Added train samples: {len(train_aug_df) - len(train_df)}")
print("Augmented flag:", train_aug_df["is_augmented"].value_counts().to_dict())



  <h1 style="color:#0F5298; font-family:serif; font-size:45px; margin-bottom:0px;">
    Data loader
  </h1>

In [None]:
class FinancialDataset(Dataset):
    """Map-style dataset that tokenizes one sentence per item on access.

    Args:
        texts: iterable of sentences.
        labels: iterable of class ids, aligned with ``texts``.
        tokenizer: callable accepting a string plus the keyword arguments
            used in ``__getitem__`` and returning a mapping with
            ``input_ids`` and ``attention_mask`` tensors.
        max_len: length every sequence is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_len=64):
        self.texts = list(texts)
        self.labels = list(labels)
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` tensors for row ``idx``."""
        sentence = str(self.texts[idx])
        target = int(self.labels[idx])

        encode_options = {
            "add_special_tokens": True,
            "max_length": self.max_len,
            "padding": "max_length",
            "truncation": True,
            "return_attention_mask": True,
            "return_tensors": "pt",
        }
        encoded = self.tokenizer(sentence, **encode_options)

        # Drop the leading batch dimension added by return_tensors="pt".
        sample = {field: encoded[field].squeeze(0) for field in ("input_ids", "attention_mask")}
        sample["labels"] = torch.tensor(target, dtype=torch.long)
        return sample

# NOTE(review): the same tokenizer is already loaded in the embedding cell; this rebinds `tokenizer`.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
# Sequence length every sample is padded/truncated to, and the loader batch size.
MAX_LEN = 64
BATCH_SIZE = 32

# Training uses the augmented frame; validation and test use the untouched splits.
train_dataset = FinancialDataset(train_aug_df["text"].values, train_aug_df["label"].values, tokenizer, MAX_LEN)
val_dataset   = FinancialDataset(val_df["text"].values, val_df["label"].values, tokenizer, MAX_LEN)
test_dataset  = FinancialDataset(test_df["text"].values, test_df["label"].values, tokenizer, MAX_LEN)

# Only the training loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Train samples: {len(train_dataset)} | batches: {len(train_loader)}")
print(f"Val samples:   {len(val_dataset)} | batches: {len(val_loader)}")
print(f"Test samples:  {len(test_dataset)} | batches: {len(test_loader)}")

  <h1 style="color:#0F5298; font-family:serif; font-size:45px; margin-bottom:0px;">
    The Architecture (Transformer)
  </h1>

In [None]:
@dataclass(frozen=True)
class ModelConfig:
    """Immutable bundle of hyper-parameters describing one FinancialTransformer variant."""
    vocab_size: int  # number of rows in the token-embedding table
    num_classes: int  # output size of the classifier head
    max_len: int  # number of learned positions (longest accepted sequence)
    d_model: int  # hidden width of the encoder; must be divisible by num_heads
    num_heads: int  # attention heads per encoder block
    num_layers: int  # number of stacked encoder blocks
    d_ff: int  # inner width of the feed-forward sub-layer
    dropout: float  # dropout on embeddings, projections, feed-forward and pooled output
    attn_dropout: float  # dropout applied to the attention probabilities
    pad_token_id: int = 0  # padding index of the token-embedding table


class MultiHeadSelfAttention(nn.Module):
    """Scaled dot-product self-attention with a fused Q/K/V projection.

    Args:
        d_model: input/output width; must be divisible by ``num_heads``.
        num_heads: number of attention heads.
        attn_dropout: dropout probability on the attention weights.
        proj_dropout: dropout probability on the output projection.

    Raises:
        ValueError: if ``d_model`` is not a multiple of ``num_heads``.
    """

    def __init__(self, d_model: int, num_heads: int, attn_dropout: float, proj_dropout: float):
        super().__init__()
        per_head, leftover = divmod(d_model, num_heads)
        if leftover:
            raise ValueError("d_model must be divisible by num_heads")
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = per_head
        self.scale = per_head ** -0.5

        self.qkv = nn.Linear(d_model, 3 * d_model, bias=True)
        self.attn_drop = nn.Dropout(attn_dropout)
        self.proj = nn.Linear(d_model, d_model, bias=True)
        self.proj_drop = nn.Dropout(proj_dropout)

    def forward(self, x: torch.Tensor, attention_mask: Optional[torch.Tensor] = None):
        """Attend over ``x`` of shape (batch, seq, d_model).

        ``attention_mask`` (batch, seq), when given, marks valid key positions
        with non-zero values; masked keys receive the most negative finite score.

        Returns:
            (output, attn): output is (batch, seq, d_model); attn is
            (batch, heads, seq, seq) taken after attention dropout.
        """
        batch, length, _ = x.shape

        # (batch, seq, 3*d_model) -> (3, batch, heads, seq, head_dim)
        fused = self.qkv(x).view(batch, length, 3, self.num_heads, self.head_dim)
        q, k, v = fused.permute(2, 0, 3, 1, 4).unbind(dim=0)

        scores = (q @ k.transpose(-2, -1)) * self.scale

        if attention_mask is not None:
            padded_keys = ~attention_mask[:, None, None, :].to(torch.bool)
            scores = scores.masked_fill(padded_keys, torch.finfo(scores.dtype).min)

        attn = self.attn_drop(scores.softmax(dim=-1))

        # Merge the heads back into a single d_model-wide vector per position.
        merged = (attn @ v).transpose(1, 2).contiguous().view(batch, length, self.d_model)
        return self.proj_drop(self.proj(merged)), attn


class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> GELU -> Dropout -> Linear -> Dropout.

    Args:
        d_model: input and output width.
        d_ff: hidden width.
        dropout: dropout probability used after the activation and after the output layer.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.drop1 = nn.Dropout(dropout)
        self.drop2 = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor):
        """Apply the two-layer network to the last dimension of ``x``."""
        hidden = self.drop1(F.gelu(self.fc1(x)))
        return self.drop2(self.fc2(hidden))


class TransformerEncoderBlock(nn.Module):
    """Pre-norm encoder block: LayerNorm -> self-attention and LayerNorm -> feed-forward, each with a residual.

    Args:
        d_model: hidden width.
        num_heads: attention heads.
        d_ff: feed-forward inner width.
        dropout: dropout for the attention output projection and the feed-forward network.
        attn_dropout: dropout on the attention weights.
    """

    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float, attn_dropout: float):
        super().__init__()
        self.norm1 = nn.LayerNorm(d_model)
        self.attn = MultiHeadSelfAttention(
            d_model,
            num_heads,
            attn_dropout=attn_dropout,
            proj_dropout=dropout,
        )
        self.norm2 = nn.LayerNorm(d_model)
        self.ffn = FeedForward(d_model, d_ff, dropout=dropout)

    def forward(self, x: torch.Tensor, attention_mask: Optional[torch.Tensor] = None):
        """Return ``(hidden_states, attention_weights)`` for the input ``x``."""
        attended, attn_weights = self.attn(self.norm1(x), attention_mask)
        after_attention = x + attended
        after_ffn = after_attention + self.ffn(self.norm2(after_attention))
        return after_ffn, attn_weights


class FinancialTransformer(nn.Module):
    """Transformer-encoder sentence classifier with optional pretrained token embeddings.

    Token embeddings (optionally copied from ``pretrained_embeddings`` and
    projected to ``d_model``) are summed with learned position embeddings,
    normalised, passed through ``num_layers`` encoder blocks, mean-pooled over
    the non-masked positions and fed to a linear classifier.

    Args:
        config: model hyper-parameters.
        pretrained_embeddings: optional (vocab_size, emb_dim) tensor copied into
            the token-embedding table.

    Raises:
        ValueError: if ``pretrained_embeddings`` has a row count other than ``config.vocab_size``.
    """

    def __init__(self, config: ModelConfig, pretrained_embeddings: Optional[torch.Tensor] = None):
        super().__init__()
        # The embedding width follows the pretrained matrix when one is supplied.
        emb_dim = pretrained_embeddings.size(1) if pretrained_embeddings is not None else config.d_model

        self.token_embedding = nn.Embedding(config.vocab_size, emb_dim, padding_idx=config.pad_token_id)
        # Bridge a differing embedding width to d_model with a bias-free linear map.
        self.input_proj = nn.Identity() if emb_dim == config.d_model else nn.Linear(emb_dim, config.d_model, bias=False)
        self.position_embedding = nn.Embedding(config.max_len, config.d_model)
        self.embed_norm = nn.LayerNorm(config.d_model)
        self.embed_drop = nn.Dropout(config.dropout)

        self.layers = nn.ModuleList(
            [
                TransformerEncoderBlock(
                    d_model=config.d_model,
                    num_heads=config.num_heads,
                    d_ff=config.d_ff,
                    dropout=config.dropout,
                    attn_dropout=config.attn_dropout,
                )
                for _ in range(config.num_layers)
            ]
        )

        self.final_norm = nn.LayerNorm(config.d_model)
        self.cls_drop = nn.Dropout(config.dropout)
        self.classifier = nn.Linear(config.d_model, config.num_classes)

        has_pretrained = pretrained_embeddings is not None
        # Custom init for everything except the token table when it is about to be overwritten.
        self._init_weights(skip_token_embedding=has_pretrained)

        if has_pretrained:
            if pretrained_embeddings.size(0) != config.vocab_size:
                raise ValueError("pretrained_embeddings vocab_size mismatch")
            with torch.no_grad():
                self.token_embedding.weight.copy_(pretrained_embeddings)
                # Re-zero the padding row in case the supplied matrix did not.
                if self.token_embedding.padding_idx is not None:
                    self.token_embedding.weight[self.token_embedding.padding_idx].zero_()

    def _init_weights(self, skip_token_embedding: bool = False):
        """Xavier-init Linear weights (zero bias) and N(0, 0.02)-init Embedding weights.

        When ``skip_token_embedding`` is true the ``token_embedding`` module is left untouched.
        """
        for name, m in self.named_modules():
            if skip_token_embedding and name == "token_embedding":
                continue
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Embedding):
                nn.init.normal_(m.weight, mean=0.0, std=0.02)
                # normal_ overwrote the padding row, so zero it again.
                if m.padding_idx is not None:
                    with torch.no_grad():
                        m.weight[m.padding_idx].zero_()

    def forward(self, input_ids: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, return_attention: bool = False):
        """Classify a batch of token-id sequences.

        Args:
            input_ids: (batch, seq) token ids; ``seq`` must not exceed ``config.max_len``.
            attention_mask: (batch, seq) mask of valid positions; defaults to all ones.
            return_attention: also return the per-layer attention maps.

        Returns:
            ``logits`` of shape (batch, num_classes), or ``(logits, attn_maps)``
            when ``return_attention`` is true.

        Raises:
            ValueError: if the sequence is longer than the position table.
        """
        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)

        bsz, seq_len = input_ids.shape
        if seq_len > self.position_embedding.num_embeddings:
            raise ValueError("seq_len exceeds max_len in config")

        # Position ids 0..seq_len-1, repeated for every sequence in the batch.
        pos_ids = torch.arange(seq_len, device=input_ids.device).unsqueeze(0).expand(bsz, seq_len)

        x = self.token_embedding(input_ids)
        x = self.input_proj(x)
        x = x + self.position_embedding(pos_ids)
        x = self.embed_norm(x)
        x = self.embed_drop(x)

        attn_maps = []
        for layer in self.layers:
            x, attn = layer(x, attention_mask)
            if return_attention:
                attn_maps.append(attn)

        x = self.final_norm(x)
        # Masked mean pooling: average only over positions where the mask is non-zero;
        # the clamp avoids a division by zero for an all-masked row.
        mask = attention_mask.unsqueeze(-1).type_as(x)
        pooled = (x * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1.0)
        logits = self.classifier(self.cls_drop(pooled))

        if return_attention:
            return logits, attn_maps
        return logits


# Default hyper-parameter grid used by generate_valid_model_configs.
# "ff_mult" is multiplied by d_model to obtain the feed-forward width d_ff.
DEFAULT_SEARCH_SPACE = {
    "d_model": [128, 192, 256],
    "num_heads": [4, 8],
    "num_layers": [3, 4],
    "ff_mult": [3],
    "dropout": [0.1, 0.2, 0.3],
    "attn_dropout": [0.1],
}


def generate_valid_model_configs(
    vocab_size: int,
    num_classes: int,
    max_len: int,
    pad_token_id: int = 0,
    search_space: Optional[Dict[str, List]] = None,
) -> List[ModelConfig]:
    """Expand a hyper-parameter grid into the list of usable ``ModelConfig`` objects.

    Combinations whose ``d_model`` is not divisible by ``num_heads`` are skipped.

    Args:
        vocab_size, num_classes, max_len, pad_token_id: copied into every config.
        search_space: mapping with the keys ``d_model``, ``num_heads``,
            ``num_layers``, ``ff_mult``, ``dropout`` and ``attn_dropout``;
            ``DEFAULT_SEARCH_SPACE`` is used when omitted or empty.

    Returns:
        Configs in grid (Cartesian-product) order.
    """
    space = search_space or DEFAULT_SEARCH_SPACE
    axis_names = ("d_model", "num_heads", "num_layers", "ff_mult", "dropout", "attn_dropout")
    grid = product(*(space[axis] for axis in axis_names))

    return [
        ModelConfig(
            vocab_size=vocab_size,
            num_classes=num_classes,
            max_len=max_len,
            d_model=int(d_model),
            num_heads=int(num_heads),
            num_layers=int(num_layers),
            d_ff=int(d_model * ff_mult),
            dropout=float(dropout),
            attn_dropout=float(attn_dropout),
            pad_token_id=int(pad_token_id),
        )
        for d_model, num_heads, num_layers, ff_mult, dropout, attn_dropout in grid
        if d_model % num_heads == 0
    ]


def build_model_from_config(
    config: ModelConfig,
    pretrained_embeddings: Optional[torch.Tensor] = None,
    device: Optional[torch.device] = None,
) -> FinancialTransformer:
    """Instantiate a ``FinancialTransformer`` and optionally move it to ``device``.

    Args:
        config: model hyper-parameters.
        pretrained_embeddings: optional embedding matrix forwarded to the model.
        device: target device; the model is left where it was created when ``None``.
    """
    network = FinancialTransformer(config=config, pretrained_embeddings=pretrained_embeddings)
    return network if device is None else network.to(device)



  <h1 style="color:#0F5298; font-family:serif; font-size:45px; margin-bottom:0px;">
    Generate Model
  </h1>

In [None]:
# Enumerate every valid architecture from the default search space.
configs = generate_valid_model_configs(
    vocab_size=len(tokenizer),
    num_classes=3,
    max_len=MAX_LEN,
    pad_token_id=tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0
)

print(f"Total valid configs: {len(configs)}")
print("First config:", configs[0])

# Build the first config; pretrained embeddings are used only if that cell was run.
model = build_model_from_config(
    config=configs[0],
    pretrained_embeddings=pretrained_embeddings if "pretrained_embeddings" in globals() else None,
    device=device
)

print(f"Model params: {sum(p.numel() for p in model.parameters()):,}")


  <h1 style="color:#0F5298; font-family:serif; font-size:45px; margin-bottom:0px;">
    Training Configuration & Class Weights
  </h1>

In [None]:
# Training hyper-parameters, gathered into `training_config` further down.
NUM_EPOCHS = 15
LEARNING_RATE = 1e-4
WEIGHT_DECAY = 5e-4
LABEL_SMOOTHING = 0.03
BETAS = (0.9, 0.999)
EPS = 1e-8
WARMUP_RATIO = 0.1
MAX_GRAD_NORM = 1.0
EARLY_STOPPING_PATIENCE = 4
MIN_IMPROVEMENT = 5e-4
FREEZE_EMBED_EPOCHS = 2
SELECTION_METRIC = "macro_f1"
MAX_CONFIGS = 5

# Collect the training labels from the first available source:
# the augmented frame, then the plain training frame, then the loader.
if "train_aug_df" in globals():
    y_train = train_aug_df["label"].astype(int).to_numpy()
elif "train_df" in globals():
    y_train = train_df["label"].astype(int).to_numpy()
elif "train_loader" in globals():
    labels_buffer = []
    for b in train_loader:
        labels_buffer.extend(b["labels"].cpu().numpy().tolist())
    y_train = np.array(labels_buffer, dtype=np.int64)
else:
    raise ValueError("No training labels found. Define train_df/train_aug_df or train_loader first.")

# "balanced" class weights computed from whichever label source was picked above.
num_classes = 3
weights_np = compute_class_weight(class_weight="balanced", classes=np.arange(num_classes), y=y_train)
class_weights = torch.tensor(weights_np, dtype=torch.float32, device=device)

training_config = {
    "epochs": NUM_EPOCHS,
    "lr": LEARNING_RATE,
    "weight_decay": WEIGHT_DECAY,
    "label_smoothing": LABEL_SMOOTHING,
    "betas": BETAS,
    "eps": EPS,
    "warmup_ratio": WARMUP_RATIO,
    "max_grad_norm": MAX_GRAD_NORM,
    "early_stopping_patience": EARLY_STOPPING_PATIENCE,
    "min_improvement": MIN_IMPROVEMENT,
    "freeze_embed_epochs": FREEZE_EMBED_EPOCHS,
    "selection_metric": SELECTION_METRIC,
    "max_configs": MAX_CONFIGS,
}

print("Training config ready.")
print(training_config)
print("Class weights:", class_weights.detach().cpu().numpy())

  <h1 style="color:#0F5298; font-family:serif; font-size:45px; margin-bottom:0px;">
    Helpers
  </h1>

In [None]:
def _seed_all(seed: int = 42):
    import random
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


def _get_train_labels():
    if "train_aug_df" in globals():
        return train_aug_df["label"].astype(int).to_numpy()
    if "train_df" in globals():
        return train_df["label"].astype(int).to_numpy()
    if "train_loader" in globals():
        labels = []
        for b in train_loader:
            labels.extend(b["labels"].cpu().numpy().tolist())
        return np.array(labels, dtype=np.int64)
    raise ValueError("No training labels found.")


def _build_scheduler(optimizer, num_epochs, steps_per_epoch, warmup_ratio):
    total_steps = max(1, num_epochs * steps_per_epoch)
    warmup_steps = int(warmup_ratio * total_steps)

    def lr_lambda(current_step):
        if current_step < warmup_steps:
            return float(current_step) / float(max(1, warmup_steps))
        progress = float(current_step - warmup_steps) / float(max(1, total_steps - warmup_steps))
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress)))

    return optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lr_lambda), total_steps, warmup_steps


def _set_token_embedding_trainable(model, trainable: bool):
    if hasattr(model, "token_embedding"):
        for p in model.token_embedding.parameters():
            p.requires_grad = trainable


def _train_one_epoch(model, loader, criterion, optimizer, scheduler, max_grad_norm, device):
    model.train()
    total_loss, total_correct, total_samples = 0.0, 0, 0

    for batch in loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        optimizer.zero_grad(set_to_none=True)
        logits = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = criterion(logits, labels)
        loss.backward()

        if max_grad_norm is not None and max_grad_norm > 0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)

        optimizer.step()
        if scheduler is not None:
            scheduler.step()

        bs = labels.size(0)
        total_loss += loss.item() * bs
        total_correct += (logits.argmax(dim=1) == labels).sum().item()
        total_samples += bs

    return total_loss / max(1, total_samples), total_correct / max(1, total_samples)

test_labels = []
test_preds = []
@torch.no_grad()
def _eval_one_epoch(model, loader, criterion, device):
    model.eval()
    total_loss, total_correct, total_samples = 0.0, 0, 0
    all_labels, all_preds = [], []

    for batch in loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        logits = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = criterion(logits, labels)
        preds = logits.argmax(dim=1)

        bs = labels.size(0)
        total_loss += loss.item() * bs
        total_correct += (preds == labels).sum().item()
        total_samples += bs

        all_labels.extend(labels.detach().cpu().numpy().tolist())
        all_preds.extend(preds.detach().cpu().numpy().tolist())

        test_labels.extend(labels.detach().cpu().numpy().tolist())
        test_preds.extend(preds.detach().cpu().numpy().tolist())

    macro_f1 = f1_score(all_labels, all_preds, average="macro", zero_division=0)
    return total_loss / max(1, total_samples), total_correct / max(1, total_samples), macro_f1


def _is_improved(metric_name, current_loss, current_f1, best_loss, best_f1, min_improvement):
    if metric_name == "val_loss":
        return current_loss < (best_loss - min_improvement)
    if current_f1 > (best_f1 + min_improvement):
        return True
    if abs(current_f1 - best_f1) <= 1e-12 and current_loss < best_loss:
        return True
    return False