In [None]:
# Mount Google Drive at /content/drive; the dataset CSVs read below and the
# checkpoints/reports written later all live under that mount.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, f1_score
from transformers import (
    AutoTokenizer, AutoModelForSequenceClassification,
    get_scheduler
)
from torch.optim import AdamW
from tqdm import tqdm
import os
import numpy as np

In [None]:
# Select the targeted dataset
#
# Paths are anchored on the absolute Drive mount point (/content/drive) so that
# loading does not depend on the notebook's current working directory, matching
# the absolute output paths used for checkpoints and reports.
DATA_DIR = "/content/drive/MyDrive/WoS/original_dataset"

# First level datasets
# DATASET_MAP = {
#     "46985": f"{DATA_DIR}/46985_xydata_l1_l2.csv",
#     "11967": f"{DATA_DIR}/11967_xydata_l1_l2.csv",
#     "5736":  f"{DATA_DIR}/5736_xydata_l1_l2.csv"
# }

# Second level datasets
# Keys are the dataset identifiers accepted by `selected_dataset`.
DATASET_MAP = {
    "46985": f"{DATA_DIR}/46985_xydata.csv",
    "11967": f"{DATA_DIR}/11967_xydata.csv",
    "5736":  f"{DATA_DIR}/5736_xydata.csv"
}

In [None]:
# Key into DATASET_MAP that picks the CSV to load; the same value is embedded
# in the checkpoint directory name and the classification-report file name.
selected_dataset = "46985"   # select: "46985", "11967", "5736"

In [None]:
# Load the selected CSV; the "X" column holds the texts and "Y" the class ids
# used as targets in the cells below.
df = pd.read_csv(DATASET_MAP[selected_dataset])
print(f"Loaded dataset {selected_dataset} with shape {df.shape}")

# for first level: num_labels = df["YL1"].nunique()
num_labels = df["Y"].nunique()
print("Number of labels:", num_labels)

# The classifier is later created with `num_labels` outputs, so every class id
# must fall in [0, num_labels). Fail here with a readable message instead of
# letting an out-of-range id surface as an opaque error inside the loss.
label_min, label_max = df["Y"].min(), df["Y"].max()
if label_min < 0 or label_max >= num_labels:
    raise ValueError(
        f"Labels must be integer ids in [0, {num_labels - 1}] but span "
        f"[{label_min}, {label_max}]; re-encode them before training."
    )

In [None]:
# First level
# train_texts, test_texts, train_labels, test_labels = train_test_split(
#     df["X"], df["YL1"],
#     test_size=0.2,
#     random_state=42,
#     stratify=df["YL1"]
# )

# Second level
# Hold out a stratified 20% of the corpus as the test set.
train_texts, test_texts, train_labels, test_labels = train_test_split(
    df["X"], df["Y"],
    test_size=0.2,
    random_state=42,
    stratify=df["Y"]
)

# Carve the validation set out of the TRAINING portion so that train,
# validation and test are mutually disjoint. Sampling validation from the test
# set would let early stopping and learning-rate selection tune on test
# examples and inflate the reported test scores.
# 5% of the 80% training portion = 4% of the corpus, the same validation size
# as the previous "20% of the 20% test split".
train_texts, val_texts, train_labels, val_labels = train_test_split(
    train_texts, train_labels,
    test_size=0.05,
    random_state=42,
    stratify=train_labels
)

In [None]:
# Report how many examples each split contains.
print(f"Train size: {len(train_texts)}")
print(f"Validation size: {len(val_texts)}")
print(f"Test size: {len(test_texts)}")


In [None]:
# Short alias -> pretrained checkpoint identifier. The selected identifier is
# passed to AutoTokenizer / AutoModelForSequenceClassification.from_pretrained.
MODEL_MAP = {
    "bert": "bert-base-uncased",
    "scibert": "allenai/scibert_scivocab_uncased",
    "biobert": "dmis-lab/biobert-base-cased-v1.2",
    "bluebert": "bionlp/bluebert_pubmed_uncased_L-24_H-1024_A-16"
}

In [None]:
# Alias of the checkpoint to fine-tune; also used in checkpoint and report file names.
selected_model = "scibert"   # choose: "bert", "scibert", "biobert", "bluebert"
# Resolve the alias to the full checkpoint identifier.
model_name = MODEL_MAP[selected_model]
print(f"Using model: {model_name}")

In [None]:
# Tokenizer belonging to the selected checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
def tokenize(texts):
    """Encode an iterable of strings with the module-level ``tokenizer``.

    Args:
        texts: Iterable of raw text strings (materialised into a list first).

    Returns:
        The tokenizer's output for the whole collection, requested as PyTorch
        tensors with padding enabled and truncation to at most 256 tokens.
    """
    encode_options = dict(
        padding=True,
        truncation=True,
        max_length=256,
        return_tensors="pt",
    )
    return tokenizer(list(texts), **encode_options)

In [None]:
class TextDataset(Dataset):
    """Map-style dataset pairing pre-tokenised texts with integer labels.

    All texts are tokenised once, at construction time, via ``tokenize``; item
    access afterwards is plain tensor indexing.
    """

    def __init__(self, texts, labels):
        """Tokenise ``texts`` and store ``labels`` as a long tensor.

        Args:
            texts: Iterable of strings, forwarded to ``tokenize``.
            labels: Object exposing ``.values`` (e.g. a pandas Series) holding
                one class id per text.
        """
        self.encodings = tokenize(texts)
        self.labels = torch.tensor(labels.values, dtype=torch.long)

    def __len__(self):
        """Return the number of labelled examples."""
        return self.labels.size(0)

    def __getitem__(self, idx):
        """Return the encoding fields of example ``idx`` plus its ``"labels"`` entry."""
        sample = {}
        for field, tensor in self.encodings.items():
            sample[field] = tensor[idx]
        sample["labels"] = self.labels[idx]
        return sample

# Mini-batch size shared by all three loaders.
BATCH_SIZE = 16

# One pre-tokenised dataset per split.
train_dataset = TextDataset(train_texts, train_labels)
val_dataset = TextDataset(val_texts, val_labels)
test_dataset = TextDataset(test_texts, test_labels)

# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

In [None]:
# ---- Learning-rate sweep with early stopping ----
# For each learning rate a fresh model is fine-tuned, validated after every
# epoch, and checkpointed whenever validation micro-F1 improves. The checkpoint
# directory of the best run overall is kept in `best_model_final_path`.
learning_rates = [2e-5, 5e-6, 2e-6, 1e-6]
patience = 7   # epochs without validation improvement before a run stops
epochs = 25    # upper bound on epochs per learning rate
# The scheduler takes warmup as an integer number of optimizer steps, not a
# ratio, so a fractional value such as 1e-4 does not express "0.01% warmup".
# 0 = no warmup; for ratio-based warmup use int(ratio * num_training_steps).
num_warmup_steps = 0
seed = 42      # same value as the random_state used for the data splits

best_overall_f1 = 0.0
# Stays None until some run beats `best_overall_f1`, so the final report below
# can fail with a clear message instead of a NameError.
best_model_final_path = None
best_model_path = "/content/drive/MyDrive/WoS/best_models"
os.makedirs(best_model_path, exist_ok=True)

for lr in learning_rates:
    print(f"\n=== Training with learning rate {lr} ===")
    # Re-seed before each run so every learning rate starts from the same
    # torch RNG state (classifier-head init, shuffle order). GPU kernels may
    # still introduce some nondeterminism.
    torch.manual_seed(seed)

    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels).to(device)
    optimizer = AdamW(model.parameters(), lr=lr, eps=1e-8)

    # The scheduler is stepped once per batch, so it decays over all planned
    # batches (epochs * batches per epoch).
    num_training_steps = epochs * len(train_loader)
    lr_scheduler = get_scheduler("linear", optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=num_training_steps)

    best_val_f1 = 0.0
    patience_counter = 0
    local_best_path = f"{best_model_path}/{selected_model}_{selected_dataset}_lr{lr}"

    for epoch in range(epochs):
        # ---- Training ----
        model.train()
        # Set the description up front so the bar is labelled from the first batch.
        loop = tqdm(train_loader, leave=True, desc=f"Epoch {epoch+1}/{epochs}")
        for batch in loop:
            batch = {k: v.to(device) for k, v in batch.items()}
            # The batch carries "labels", so the model output exposes the loss.
            outputs = model(**batch)
            loss = outputs.loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            lr_scheduler.step()

            loop.set_postfix(loss=loss.item())

        # ---- Validation ----
        model.eval()
        val_preds, val_true = [], []
        with torch.no_grad():
            for batch in val_loader:
                batch = {k: v.to(device) for k, v in batch.items()}
                outputs = model(**batch)
                logits = outputs.logits
                val_preds.extend(torch.argmax(logits, dim=-1).cpu().numpy())
                val_true.extend(batch["labels"].cpu().numpy())

        micro_f1 = f1_score(val_true, val_preds, average="micro")
        print(f"Epoch {epoch+1} - Validation micro-F1: {micro_f1:.4f}")

        # Early stopping check: checkpoint on strict improvement, otherwise
        # count the epoch towards `patience`.
        if micro_f1 > best_val_f1:
            best_val_f1 = micro_f1
            patience_counter = 0
            model.save_pretrained(local_best_path)
            print(f"  New best model saved with micro-F1 {best_val_f1:.4f}")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("  Early stopping triggered.")
                break

    # Track the best overall model
    if best_val_f1 > best_overall_f1:
        best_overall_f1 = best_val_f1
        best_model_final_path = local_best_path

if best_model_final_path is None:
    raise RuntimeError(
        "No run achieved a validation micro-F1 above 0, so no checkpoint was selected."
    )

print(f"\n=== Best overall model: {best_model_final_path} with val micro-F1 {best_overall_f1:.4f} ===")


In [None]:
# Reload the checkpoint chosen by the sweep and switch it to inference mode.
best_model = AutoModelForSequenceClassification.from_pretrained(best_model_final_path)
best_model = best_model.to(device)
best_model.eval()

# Collect the argmax prediction and the gold label of every test example.
preds, true_labels = [], []
with torch.no_grad():
    for batch in test_loader:
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        outputs = best_model(**batch)
        logits = outputs.logits
        batch_preds = logits.argmax(dim=-1)
        preds.extend(batch_preds.cpu().numpy())
        true_labels.extend(batch["labels"].cpu().numpy())

# Per-class and aggregate metrics as a DataFrame with one row per report entry.
report_dict = classification_report(true_labels, preds, output_dict=True)
report_df = pd.DataFrame(report_dict).T

In [None]:
# Persist the test-set classification report to Drive, named after the model
# alias and dataset id, then echo it in the notebook output.
report_path = f"/content/drive/MyDrive/WoS/{selected_model}_{selected_dataset}_classification_report.csv"
report_df.to_csv(report_path, index=True)  # keep the index: it holds the class / average names

print("\n=== Test Set Report (Best Model) ===")
print(report_df)
print(f"\nClassification report saved to: {report_path}")