## Imports

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
# Core imports
import os, json, random, math
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import torch

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix

from datasets import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
)

import os

In [None]:
import gc, torch

# Run a garbage-collection pass, then ask PyTorch to release its cached CUDA memory.
# NOTE(review): presumably here to free GPU memory left over from a previous run
# in the same kernel — confirm it is still needed.
gc.collect()
torch.cuda.empty_cache()

## Preliminaries

In [None]:
# -----------------------
# Config
# -----------------------
# Checkpoint name used for both the tokenizer and the initial model weights.
MODEL_NAME = "MoritzLaurer/DeBERTa-v3-large-mnli-fever-anli-ling-wanli"
# Class names; the position in this list is the integer id assigned by build_label_maps.
LABELS = ["factual", "contradiction", "irrelevant"]

# Input JSON files (absolute paths under /content).
# TRAIN_PATH = "/content/train.json"
TEST_PATH  = "/content/test.json"
TRAIN_PATH = "/content/my_train.json"
NO_TRAIN_PATH = "/content/my_test.json"
VAL_PATH = "/content/my_val.json"


# Root directory for run artifacts; created here if it does not exist yet.
OUT_DIR = "outputs_kfold"
os.makedirs(OUT_DIR, exist_ok=True)

# NOTE(review): N_SPLITS is not read by any cell visible in this notebook — confirm it is still needed.
N_SPLITS = 5
SEED = 42

# Maximum token length passed to the tokenizer (longer pairs are truncated).
MAX_LENGTH = 256
LR = 2e-5
# NOTE(review): the visible training cell hard-codes num_train_epochs=4 and does not read EPOCHS.
EPOCHS = 2
TRAIN_BATCH = 8
EVAL_BATCH = 16
WEIGHT_DECAY = 0.01
WARMUP_RATIO = 0.06

# "cuda" when a GPU is visible to PyTorch, otherwise "cpu".
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", DEVICE)


In [None]:
def set_seed(seed: int = 42) -> None:
    """Seed the Python, NumPy and PyTorch RNGs and pin cuDNN to deterministic mode.

    Args:
        seed: Value applied to every random number generator.
    """
    # Same seed for each generator: stdlib, NumPy, torch (CPU) and torch (all CUDA devices).
    seeders = (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all)
    for apply_seed in seeders:
        apply_seed(seed)

    # Disable cuDNN autotuning and require deterministic kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Apply the configured SEED to every RNG handled by set_seed.
set_seed(SEED)


In [None]:
def load_json_records(path: str) -> List[Dict]:
    """Read a UTF-8 JSON file and return the list of records it contains.

    Two layouts are accepted: a top-level list (returned as-is), or a
    top-level dict, in which case the first value that is a list is returned.

    Args:
        path: Location of the JSON file.

    Returns:
        The list found in the document.

    Raises:
        ValueError: If the document is neither a list nor a dict holding a list.
    """
    with open(path, "r", encoding="utf-8") as handle:
        payload = json.load(handle)

    if isinstance(payload, list):
        return payload

    if isinstance(payload, dict):
        # Dict-wrapped payload: take the first list-valued entry, in key order.
        wrapped = next((value for value in payload.values() if isinstance(value, list)), None)
        if wrapped is not None:
            return wrapped

    raise ValueError(f"Unsupported JSON format in {path}")

def normalize_label(x) -> str:
    """Return ``x`` converted to ``str``, stripped of surrounding whitespace and lowercased."""
    as_text = str(x)
    trimmed = as_text.strip()
    return trimmed.lower()

def build_label_maps(labels: List[str]) -> Tuple[Dict[str, int], Dict[int, str]]:
    """Build the two lookup tables between normalized label names and integer ids.

    Args:
        labels: Label names; each is passed through ``normalize_label`` and
            receives its position in the list as id.

    Returns:
        ``(label2id, id2label)`` where ``id2label`` is the inverse of ``label2id``.
    """
    name_to_id = {}
    for position, raw_name in enumerate(labels):
        name_to_id[normalize_label(raw_name)] = position

    id_to_name = {position: name for name, position in name_to_id.items()}
    return name_to_id, id_to_name

# Build the label <-> id lookup tables from LABELS; the bare tuple on the last
# line is the cell's displayed output.
label2id, id2label = build_label_maps(LABELS)
label2id, id2label


In [None]:
def prepare_dataframe(records: List[Dict], is_train: bool) -> pd.DataFrame:
    """Turn raw records into a frame with premise/hypothesis text (and labels).

    Added columns:
        * ``premise_text``: ``<context>\\n\\nQuestion: <question>``, stripped.
          The context comes from ``context`` or, failing that, ``premise``; the
          question from ``question``, ``query`` or ``prompt`` (first one
          present), or is empty when none exists.
        * ``hypothesis_text``: the ``answer`` column as a string.
        * ``label`` (only when ``is_train``): integer id of the normalized
          ``type`` column, looked up in the module-level ``label2id``.

    Args:
        records: List of dict records.
        is_train: When true, a ``type`` column is required and mapped to ``label``.

    Returns:
        The frame built from ``records`` with the columns above added.

    Raises:
        ValueError: If ``answer`` or the context column is missing, or, when
            ``is_train``, if ``type`` is missing or holds an unknown label.
    """
    frame = pd.DataFrame(records)
    present = frame.columns

    if "answer" not in present:
        raise ValueError(f"Missing 'answer' column. Found columns: {list(frame.columns)}")

    # Context source: 'context' takes priority over 'premise'.
    context_col = next((name for name in ("context", "premise") if name in present), None)
    if context_col is None:
        raise ValueError(f"Missing 'context' (or 'premise') column. Found columns: {list(frame.columns)}")
    context_text = frame[context_col].fillna("").astype(str)

    # Question source: 'question', then the alternates 'query' and 'prompt'.
    question_col = next((name for name in ("question", "query", "prompt") if name in present), None)
    if question_col is None:
        # No question-like column at all: the premise is the context followed by an empty question.
        question_text = pd.Series([""] * len(frame))
    else:
        question_text = frame[question_col].fillna("").astype(str)

    frame["premise_text"] = (context_text + "\n\nQuestion: " + question_text).str.strip()
    frame["hypothesis_text"] = frame["answer"].fillna("").astype(str)

    if not is_train:
        return frame

    if "type" not in frame.columns:
        raise ValueError(f"Missing 'type' label column. Found columns: {list(frame.columns)}")

    label_ids = frame["type"].apply(normalize_label).map(label2id)
    unmapped = label_ids.isna()
    if unmapped.any():
        # Report at most ten distinct offending raw values.
        bad = frame[unmapped]["type"].unique()[:10]
        raise ValueError(f"Found unknown labels in train: {bad}. Expected {LABELS}")
    frame["label"] = label_ids.astype(int)

    return frame

# Training split: the only one whose 'type' column is mapped to integer labels.
train_records = load_json_records(TRAIN_PATH)
train_df = prepare_dataframe(train_records, is_train=True)

# Remaining splits are prepared without labels (text columns only).
no_train_records = load_json_records(NO_TRAIN_PATH)
no_train_df = prepare_dataframe(no_train_records, is_train=False)

test_records = load_json_records(TEST_PATH)
test_df = prepare_dataframe(test_records, is_train=False)

val_records = load_json_records(VAL_PATH)
val_df = prepare_dataframe(val_records, is_train=False)

# Show the first two prepared training rows as the cell output.
train_df.head(2)


In [None]:
# Tokenizer loaded for MODEL_NAME; used by tokenize_dataset and passed to the Trainer instances.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

def tokenize_dataset(df: pd.DataFrame, with_labels: bool) -> Dataset:
    """Convert a prepared frame into a tokenized ``Dataset`` in torch format.

    Each row is encoded as the pair (``premise_text``, ``hypothesis_text``)
    with truncation to ``MAX_LENGTH``. Padding is not applied here.

    Args:
        df: Frame holding ``premise_text`` and ``hypothesis_text`` (and
            ``label`` when ``with_labels`` is true).
        with_labels: When true, ``label`` is renamed to ``labels`` and exposed
            alongside ``input_ids`` and ``attention_mask``.

    Returns:
        The tokenized dataset, formatted to yield torch tensors for the
        exposed columns.
    """
    dataset = Dataset.from_pandas(df.reset_index(drop=True))

    # Everything except the text pair and the label is dropped during mapping.
    kept = ("premise_text", "hypothesis_text", "label")
    dropped = [name for name in dataset.column_names if name not in kept]

    def _encode_pairs(examples):
        premises = examples["premise_text"]
        hypotheses = examples["hypothesis_text"]
        return tokenizer(premises, hypotheses, truncation=True, max_length=MAX_LENGTH)

    dataset = dataset.map(_encode_pairs, batched=True, remove_columns=dropped)

    tensor_columns = ["input_ids", "attention_mask"]
    if with_labels:
        dataset = dataset.rename_column("label", "labels")
        tensor_columns = tensor_columns + ["labels"]

    dataset.set_format(type="torch", columns=tensor_columns)
    return dataset


In [None]:
def make_model():
    """Load the sequence-classification model for ``MODEL_NAME`` with this task's labels.

    The model is created with ``len(LABELS)`` outputs and the module-level
    ``id2label`` / ``label2id`` maps, gradient checkpointing is switched on,
    and ``config.use_cache`` is set to ``False``.

    Returns:
        The configured model instance.
    """
    classifier = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=len(LABELS),
        id2label=id2label,
        label2id=label2id,
    )

    checkpointing_options = {"use_reentrant": False}
    try:
        classifier.gradient_checkpointing_enable(gradient_checkpointing_kwargs=checkpointing_options)
    except TypeError:
        # Older transformers releases do not accept the kwargs argument.
        classifier.gradient_checkpointing_enable()

    classifier.config.use_cache = False
    return classifier


def compute_metrics(eval_pred):
    """Compute accuracy, macro-F1 and the confusion matrix for one evaluation pass.

    Args:
        eval_pred: Pair ``(logits, labels)``; the predicted class is the
            arg-max of ``logits`` over the last axis.

    Returns:
        Dict with ``accuracy`` and ``macro_f1`` as floats and
        ``confusion_matrix`` as a nested list whose rows and columns follow
        the ids ``0 .. len(LABELS) - 1``.
    """
    scores, references = eval_pred
    predictions = np.argmax(scores, axis=-1)
    class_ids = list(range(len(LABELS)))

    results = {}
    results["accuracy"] = float(accuracy_score(references, predictions))
    results["macro_f1"] = float(
        f1_score(references, predictions, average="macro", zero_division=0)
    )
    results["confusion_matrix"] = confusion_matrix(
        references, predictions, labels=class_ids
    ).tolist()
    return results

import inspect
from transformers import TrainingArguments

def trainer_args(run_name: str, out_dir: str) -> TrainingArguments:
    """Build ``TrainingArguments`` that work across transformers versions.

    The keyword set is filtered against the installed ``TrainingArguments``
    signature, so options unknown to that version are dropped silently.

    Args:
        run_name: Name of the run (dropped if the installed version lacks it).
        out_dir: Directory that receives checkpoints and logs.

    Returns:
        A ``TrainingArguments`` instance with per-epoch evaluation and saving,
        and the best checkpoint (by ``macro_f1``) reloaded at the end.
    """
    allowed = set(inspect.signature(TrainingArguments.__init__).parameters)

    # Prefer the current spelling whenever it exists. Releases that accept both
    # names treat ``evaluation_strategy`` as deprecated, so the old name is only
    # used when it is the sole option.
    eval_key = "eval_strategy" if "eval_strategy" in allowed else "evaluation_strategy"

    kwargs = {
        "output_dir": out_dir,
        "run_name": run_name,
        # Shared hyper-parameters come from the config cell.
        "learning_rate": LR,
        "weight_decay": WEIGHT_DECAY,
        "warmup_ratio": WARMUP_RATIO,
        "seed": SEED,
        # NOTE(review): batch sizes and epoch count are kept as in the original
        # helper; they differ from TRAIN_BATCH / EVAL_BATCH / EPOCHS — confirm intent.
        "per_device_train_batch_size": 4,
        "per_device_eval_batch_size": 32,
        "num_train_epochs": 4,
        # Mixed precision only when a GPU is present; a hard-coded True fails on CPU.
        "fp16": torch.cuda.is_available(),
        # evaluation/checkpointing/logging
        eval_key: "epoch",
        "save_strategy": "epoch",
        "logging_strategy": "steps",
        "logging_steps": 50,
        # Best-model selection relies on the "macro_f1" key from compute_metrics.
        "load_best_model_at_end": True,
        "metric_for_best_model": "macro_f1",
        "greater_is_better": True,
        "report_to": "none",
    }

    # Drop anything unsupported by this transformers version (this also covers
    # a missing ``run_name``).
    supported = {key: value for key, value in kwargs.items() if key in allowed}
    return TrainingArguments(**supported)


def predict_probs(trainer: Trainer, ds: Dataset, batch_size: int = 64):
    """Run ``trainer.predict`` on ``ds`` and convert the logits to probabilities.

    Args:
        trainer: Trainer whose model produces the predictions.
        ds: Tokenized dataset to predict on.
        batch_size: Currently unused; the batch size is the one configured in
            the trainer's own arguments.

    Returns:
        ``(pred_ids, conf, probs)``: the arg-max class id per row, the
        probability of that class, and the full softmax probability matrix.
    """
    output = trainer.predict(ds)
    logits_tensor = torch.tensor(output.predictions)

    # Softmax over the class axis, then back to NumPy for downstream pandas use.
    probabilities = torch.softmax(logits_tensor, dim=-1).cpu().numpy()

    top_ids = probabilities.argmax(axis=-1)
    top_scores = probabilities.max(axis=-1)
    return top_ids, top_scores, probabilities


## Train on the training data



In [None]:
# Directory that receives the checkpoints and logs of the full-train run.
full_dir = os.path.join(OUT_DIR, "full_train")
os.makedirs(full_dir, exist_ok=True)

# Tokenized training split, with the label column exposed as "labels".
full_train_ds = tokenize_dataset(train_df, with_labels=True)

full_model = make_model()
full_args = TrainingArguments(
    output_dir=full_dir,
    learning_rate=LR,
    # NOTE(review): the epoch count is hard-coded below; the EPOCHS constant is not used.
    # num_train_epochs=EPOCHS,
    num_train_epochs=4,

    per_device_train_batch_size=TRAIN_BATCH,
    per_device_eval_batch_size=EVAL_BATCH,
    weight_decay=WEIGHT_DECAY,
    warmup_ratio=WARMUP_RATIO,
    # Evaluate and checkpoint once per epoch, keeping at most four checkpoints.
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=4,
    logging_steps=50,
    report_to="none",
    # Mixed precision only when a GPU is available.
    fp16=torch.cuda.is_available(),
    seed=SEED,
)

# The evaluation dataset is the training set itself, so every eval_loss reported
# by this trainer is measured on training data.
full_trainer = Trainer(
    model=full_model,
    args=full_args,
    train_dataset=full_train_ds,
    eval_dataset=full_train_ds,
    tokenizer=tokenizer,
)
# Loss before this trainer has run any training step; train() is called in a later cell.
initial_metrics = full_trainer.evaluate()
initial_loss = initial_metrics["eval_loss"]
# full_trainer.train()

## Callback functions to record the loss at the end of every epoch

In [None]:
from transformers import TrainerCallback

class LossRecorderCallback(TrainerCallback):
    """Collect the evaluation loss reported at each evaluation pass."""

    def __init__(self):
        # One {"epoch": ..., "eval_loss": ...} entry per evaluation that reported a loss.
        self.epoch_losses = []

    def on_evaluate(self, args, state, control, metrics, **kwargs):
        """Record ``state.epoch`` and ``metrics["eval_loss"]`` when the loss is present."""
        if "eval_loss" not in metrics:
            return
        entry = {"epoch": state.epoch, "eval_loss": metrics["eval_loss"]}
        self.epoch_losses.append(entry)

class SaveAtEpoch2Callback(TrainerCallback):
    """Request a checkpoint at the end of the epoch whose integer part is 2."""

    def on_epoch_end(self, args, state, control, **kwargs):
        """Set ``control.should_save`` when ``int(state.epoch)`` equals 2; always return ``control``."""
        finished_second_epoch = int(state.epoch) == 2
        if finished_second_epoch:
            control.should_save = True
        return control



## Training the model and saving the epoch losses

In [None]:
# Register the loss recorder and the epoch-2 save trigger on the trainer.
loss_cb = LossRecorderCallback()
full_trainer.add_callback(loss_cb)

save_cb = SaveAtEpoch2Callback()
full_trainer.add_callback(save_cb)

full_trainer.train()

# Persist the pre-training loss together with the per-evaluation losses.
loss_log_path = os.path.join(full_dir, "epoch_losses.json")
loss_report = {
    "initial_loss": initial_loss,
    "epoch_losses": loss_cb.epoch_losses,
}
with open(loss_log_path, "w") as f:
    json.dump(loss_report, f, indent=2)


## Running the finetuned model on no_train_df (the 'test set') and val_df (the 'validation set')

In [None]:
# Predict on val_df with the model held by full_trainer and save per-row predictions.
val_ds = tokenize_dataset(val_df, with_labels=False)
train_pred_ids, train_conf, train_probs = predict_probs(full_trainer, val_ds)

# The predictions were computed on val_df, so they must be attached to a copy of
# val_df. Using no_train_df here either fails on a length mismatch or silently
# pairs predictions with the wrong rows.
train_pred_df = val_df.copy()
assert len(train_pred_df) == len(train_pred_ids), "prediction count does not match val_df rows"

train_pred_df["pred_id"] = train_pred_ids
train_pred_df["pred_label"] = [id2label[i] for i in train_pred_ids]
train_pred_df["pred_conf"] = train_conf
# One probability column per class; column order follows the label ids.
for i, lbl in enumerate(LABELS):
    train_pred_df[f"prob_{lbl}"] = train_probs[:, i]

train_pred_path = os.path.join(OUT_DIR, "epoch_4_val_predictions.csv")
train_pred_df.to_csv(train_pred_path, index=False)

## Running the finetuned model at different epochs on no_train_df and val_df for evaluation and testing purposes

In [None]:
from transformers import AutoModelForSequenceClassification, Trainer, TrainingArguments

# Checkpoint directory written by the full-train run.
# NOTE(review): the step number 9600 is specific to one particular run
# (dataset size, batch size, epochs) — confirm it matches the run being evaluated.
ckpt_path = os.path.join(full_dir, "checkpoint-9600")

# Fail early with an actionable message. Without this check a missing directory
# is handed to from_pretrained, which then tries to resolve it as a Hub repo id.
if not os.path.isdir(ckpt_path):
    available = (
        sorted(d for d in os.listdir(full_dir) if d.startswith("checkpoint-"))
        if os.path.isdir(full_dir)
        else []
    )
    raise FileNotFoundError(
        f"Checkpoint directory not found: {ckpt_path}. "
        f"Available checkpoints in {full_dir}: {available}"
    )

model_7275 = AutoModelForSequenceClassification.from_pretrained(
    ckpt_path,
    num_labels=len(LABELS)
)

# Inference-only arguments: only the evaluation batch size and precision matter here.
infer_args = TrainingArguments(
    output_dir=full_dir,
    per_device_eval_batch_size=EVAL_BATCH,
    report_to="none",
    fp16=torch.cuda.is_available(),
)

trainer_7275 = Trainer(
    model=model_7275,
    args=infer_args,
    tokenizer=tokenizer,
)

no_train_ds = tokenize_dataset(no_train_df, with_labels=False)

# Predictions are computed on no_train_df and attached to a copy of the same frame.
pred_ids, conf, probs = predict_probs(trainer_7275, no_train_ds)
pred_df = no_train_df.copy()
pred_df["pred_id"] = pred_ids
pred_df["pred_label"] = [id2label[i] for i in pred_ids]
pred_df["pred_conf"] = conf

# One probability column per class; column order follows the label ids.
for i, lbl in enumerate(LABELS):
    pred_df[f"prob_{lbl}"] = probs[:, i]

pred_df.to_csv(
    os.path.join(OUT_DIR, "epoch_4_2_no_train_predictions.csv"),
    index=False
)