In [None]:
import pandas as pd
import json
import random
from sklearn.preprocessing import LabelEncoder
from datasets import Dataset
from transformers import RobertaTokenizer, RobertaForSequenceClassification, TrainingArguments, Trainer
from sklearn.metrics import classification_report, accuracy_score, f1_score
import pickle
import torch

## 1. Load and Prepare Data

In [None]:
# Read the JSONL export: one JSON object per non-blank line.
with open("/content/esnli_split_with_highlight.jsonl", mode="r", encoding="utf-8") as f:
    data = []
    for raw_line in f:
        if raw_line.strip():
            data.append(json.loads(raw_line))

df = pd.DataFrame(data)

# Keep only rows that carry an explanation category, then shuffle the rows
# with a fixed seed and renumber the index from 0.
has_category = df["explanation_category"].notnull()
df = df[has_category]
df = df.sample(frac=1, random_state=42)
df = df.reset_index(drop=True)

# # split data (80% train, 10% dev, 10% test)
# n = len(df)
# train_df = df.iloc[:int(0.8 * n)].copy()
# dev_df = df.iloc[int(0.8 * n):int(0.9 * n)].copy()
# test_df = df.iloc[int(0.9 * n):].copy()

# Split data according to pairID, so that all rows sharing a pairID end up in
# the same split.
# De-duplicate on pairID alone: de-duplicating on (pairID, premise, hypothesis)
# would list a pairID more than once whenever its text differs between rows,
# and that ID could then be assigned to several splits at the same time.
unique_pairs = df[["pairID", "premise", "hypothesis"]].drop_duplicates(subset="pairID")
unique_pairs = unique_pairs.sample(frac=1, random_state=42).reset_index(drop=True)

# split: 40/20/40 (train/dev/test), measured in unique pairIDs rather than rows
n = len(unique_pairs)
train_ids = unique_pairs.iloc[:int(0.4 * n)]["pairID"].tolist()
dev_ids = unique_pairs.iloc[int(0.4 * n):int(0.6 * n)]["pairID"].tolist()
test_ids = unique_pairs.iloc[int(0.6 * n):]["pairID"].tolist()

# .copy() gives each split its own frame so columns can be added later
# without touching `df`.
train_df = df[df["pairID"].isin(train_ids)].copy()
dev_df = df[df["pairID"].isin(dev_ids)].copy()
test_df = df[df["pairID"].isin(test_ids)].copy()

# Per-split counts of each explanation category.
train_dist = train_df["explanation_category"].value_counts()
dev_dist   = dev_df["explanation_category"].value_counts()
test_dist  = test_df["explanation_category"].value_counts()

# split: 50/50
# n = len(unique_pairs)
# split_point = int(0.5 * n)
# train_ids = unique_pairs.iloc[:split_point]["pairID"].tolist()
# test_ids  = unique_pairs.iloc[split_point:]["pairID"].tolist()

# train_df = df[df["pairID"].isin(train_ids)].copy()
# test_df = df[df["pairID"].isin(test_ids)].copy()

# train_dist = train_df["explanation_category"].value_counts()
# test_dist  = test_df["explanation_category"].value_counts()

# Put the per-split category counts side by side. A category that is absent
# from a split becomes NaN during alignment, so fill with 0 and cast to int.
split_counts = {
    "Train": train_dist,
    "Development": dev_dist,
    "Test": test_dist,
}
dist_df = pd.DataFrame(split_counts)
dist_df = dist_df.fillna(0).astype(int)

print("\n📊 Explanation Category Distribution by Split:")
print(dist_df)

# label: integer id of explanation_category.
# The encoder is fitted on the full frame so that every split shares the same
# category-to-id mapping.
le = LabelEncoder().fit(df["explanation_category"])

for split_df in (train_df, dev_df, test_df):
    split_df["label"] = le.transform(split_df["explanation_category"])

# input: Premise + Hypothesis + Gold Label + Explanation
def join_inputs(row):
    """Render one example as the four-line text given to the classifier.

    Args:
        row: Mapping-like object (e.g. a DataFrame row) with the keys
            'premise', 'hypothesis', 'gold_label' and 'explanation'.

    Returns:
        A string with one "<Title>: <value>" line per field, joined by newlines.
    """
    titled_fields = (
        ("Premise", "premise"),
        ("Hypothesis", "hypothesis"),
        ("Label", "gold_label"),
        ("Explanation", "explanation"),
    )
    return "\n".join(f"{title}: {row[key]}" for title, key in titled_fields)

# Build the model input text for every row of every split.
for split_df in (train_df, dev_df, test_df):
    split_df["text"] = split_df.apply(join_inputs, axis=1)

# Only the input text and the integer label are handed to the datasets.
model_columns = ["text", "label"]
train_dataset = Dataset.from_pandas(train_df[model_columns])
dev_dataset = Dataset.from_pandas(dev_df[model_columns])
test_dataset = Dataset.from_pandas(test_df[model_columns])

print("Data split done:")
for split_name, split_df in (("Train", train_df), ("Dev", dev_df), ("Test", test_df)):
    print(f"{split_name}: {len(split_df)}")

## 2. Tokenization

In [None]:
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")

def tokenize(batch):
    """Tokenize the "text" field of a batch of examples.

    Texts are truncated and padded to max_length=512, so every example gets
    the same sequence length.

    Args:
        batch: Batch dict supplied by `Dataset.map(..., batched=True)`; only
            its "text" entry is read.

    Returns:
        The tokenizer output for the batch, which `map` adds to the dataset.
    """
    return tokenizer(batch["text"], padding="max_length", truncation=True, max_length=512)

train_dataset = train_dataset.map(tokenize, batched=True)
# The dev split is tokenized too: it is passed to a Trainer as eval_dataset,
# and a dataset without tokenizer output cannot be evaluated.
dev_dataset = dev_dataset.map(tokenize, batched=True)
test_dataset = test_dataset.map(tokenize, batched=True)

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer

In [None]:
# Checkpoint used for both the tokenizer and the classifier in this cell.
model_name = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# NOTE(review): `model` is assigned again before each Trainer is built later in
# the notebook, so this instance appears to be unused - confirm before removing.
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(le.classes_))

def tokenize(batch):
    """Tokenize the "text" field of a batch of examples.

    Texts are truncated and padded to max_length=512, so every example gets
    the same sequence length.

    Args:
        batch: Batch dict supplied by `Dataset.map(..., batched=True)`; only
            its "text" entry is read.

    Returns:
        The tokenizer output for the batch, which `map` adds to the dataset.
    """
    return tokenizer(batch["text"], padding="max_length", truncation=True, max_length=512)

train_dataset = train_dataset.map(tokenize, batched=True)
# The dev split is tokenized too, with the same tokenizer as train/test, so it
# is usable as an evaluation set.
dev_dataset = dev_dataset.map(tokenize, batched=True)
test_dataset = test_dataset.map(tokenize, batched=True)

## 3. Model and Trainer Setup

In [None]:
# RoBERTa classifier with one output per explanation category.
# NOTE(review): this cell always loads "roberta-base"; the datasets and
# `tokenizer` must come from the RoBERTa tokenizer for the pairing to be valid.
# The notebook also rebinds `tokenizer` to "bert-base-uncased" - confirm which
# tokenization cell was run last.
num_classes = len(le.classes_)
model = RobertaForSequenceClassification.from_pretrained("roberta-base", num_labels=num_classes)

training_args = TrainingArguments(
    # where outputs and logs go
    output_dir="./roberta-expl",
    logging_dir="./logs",
    # no checkpoints are written during training; the evaluation strategy is
    # left at the library default
    save_strategy="no",
    load_best_model_at_end=False,
    # optimisation hyper-parameters
    num_train_epochs=3,
    learning_rate=3e-5,
    weight_decay=0.01,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
)

def compute_metrics(pred):
    """Score predictions with accuracy and macro-averaged F1.

    Args:
        pred: Object exposing `predictions` (per-class scores; the argmax over
            the last axis is taken as the predicted class) and `label_ids`
            (the reference labels).

    Returns:
        Dict with the keys "accuracy" and "f1".
    """
    predicted_ids = pred.predictions.argmax(-1)
    gold_ids = pred.label_ids

    scores = {}
    scores["accuracy"] = accuracy_score(gold_ids, predicted_ids)
    scores["f1"] = f1_score(gold_ids, predicted_ids, average="macro")
    return scores

# Bundle the model, its hyper-parameters, the data and the metric callback.
# dev_dataset is registered for evaluation; it has to be tokenized with the
# same tokenizer as train_dataset for an evaluation run to work.
trainer_kwargs = dict(
    model=model,
    args=training_args,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=dev_dataset,
)
trainer = Trainer(**trainer_kwargs)


In [None]:
# Store readable class names in the model config so a saved checkpoint can map
# its outputs back to explanation categories without the pickled encoder.
id2label = {int(idx): str(name) for idx, name in enumerate(le.classes_)}
label2id = {name: idx for idx, name in id2label.items()}

# Fresh classifier head on top of the checkpoint named by `model_name`.
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=len(le.classes_),
    id2label=id2label,
    label2id=label2id,
)

training_args = TrainingArguments(
    output_dir="./roberta-expl",
    #evaluation_strategy="no",
    save_strategy="no",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=4,
    weight_decay=0.01,
    logging_dir="./logs",
    # No checkpoints are saved and no evaluation runs during training, so there
    # is no "best" checkpoint to reload; keep the flag off so the configuration
    # states what actually happens.
    load_best_model_at_end=False
)

def compute_metrics(pred):
    """Return accuracy and macro-F1 for a set of predictions.

    Args:
        pred: Object with a `predictions` array of per-class scores and a
            `label_ids` array of reference labels.

    Returns:
        {"accuracy": ..., "f1": ...}, where "f1" is the macro average.
    """
    labels, preds = pred.label_ids, pred.predictions.argmax(-1)
    accuracy = accuracy_score(labels, preds)
    macro_f1 = f1_score(labels, preds, average="macro")
    return {"accuracy": accuracy, "f1": macro_f1}

# Trainer for the final run. No evaluation set is attached here, so
# compute_metrics only comes into play when predictions are scored explicitly.
trainer = Trainer(
    args=training_args,
    model=model,
    tokenizer=tokenizer,
    train_dataset=train_dataset,
    compute_metrics=compute_metrics,
)


## 4. Train

In [None]:
# Fine-tune the model held by the most recently built `trainer` on its
# train_dataset, using the settings from its TrainingArguments.
trainer.train()

In [None]:
# Write the fine-tuned model and the tokenizer files into the same directory.
export_dir = "roberta-expl"
trainer.save_model(export_dir)
tokenizer.save_pretrained(export_dir)

# Persist the fitted LabelEncoder so predicted ids can be mapped back to
# category names later.
with open("label_encoder_base.pkl", mode="wb") as encoder_file:
    encoder_file.write(pickle.dumps(le))

In [None]:
# Run the trained model over the test split.
preds = trainer.predict(test_dataset)

# Predicted class = index of the highest score on the last axis;
# the reference labels come back alongside the predictions.
y_pred = preds.predictions.argmax(-1)
y_true = preds.label_ids

In [None]:
print("Final Evaluation on Held-out Test Set:")

# Report every encoder class (ids 0..n-1) under its original category name,
# with three decimal places.
class_ids = range(len(le.classes_))
report = classification_report(
    y_true,
    y_pred,
    labels=class_ids,
    target_names=le.classes_,
    digits=3,
)
print(report)