In [None]:
# Attach Google Drive to this Colab runtime; the dataset CSV paths used by the
# notebook live under the mounted drive's MyDrive folder.
from google.colab import drive

drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, f1_score
from transformers import (
    AutoTokenizer, AutoModelForSequenceClassification,
    get_scheduler
)
from torch.optim import AdamW
from tqdm import tqdm

In [None]:
# Lookup table: dataset identifier -> CSV path (paths are relative to the
# current working directory).
DATASET_MAP = dict(
    [
        ("46985", "drive/MyDrive/WoS/original_dataset/46985_xydata.csv"),
        ("11967", "drive/MyDrive/WoS/original_dataset/11967_xydata.csv"),
        ("5736", "drive/MyDrive/WoS/original_dataset/5736_xydata.csv"),
    ]
)

In [None]:
# Key into DATASET_MAP that decides which CSV the notebook loads; it must be
# one of the identifiers listed in the trailing comment.
selected_dataset = "5736"   # select: "46985", "11967", "5736"

In [None]:
# Read the CSV chosen via `selected_dataset` into a DataFrame.
csv_path = DATASET_MAP[selected_dataset]
df = pd.read_csv(csv_path)
print(f"Loaded dataset {selected_dataset} with shape {df.shape}")

# Column "Y" holds the class label; the number of distinct values is later
# passed to the model as `num_labels`.
label_column = df["Y"]
num_labels = label_column.nunique()
print("Number of labels:", num_labels)

In [None]:
# Stratified three-way split into DISJOINT train / validation / test sets.
#
# Step 1: hold out 20% of the rows (stratified on the label column "Y");
# the remaining 80% becomes the training set.
train_texts, holdout_texts, train_labels, holdout_labels = train_test_split(
    df["X"], df["Y"],
    test_size=0.2,
    random_state=42,
    stratify=df["Y"]
)

# Step 2: divide the hold-out into test (80% of it) and validation (20% of it).
# Both halves of this split are kept. Previously the first half was discarded
# and the whole hold-out was used as the test set, so every validation row was
# also scored in the final test report. With the same random_state the
# training and validation rows are unchanged; only the test set shrinks to the
# hold-out rows that are not in validation.
test_texts, val_texts, test_labels, val_labels = train_test_split(
    holdout_texts, holdout_labels,
    test_size=0.2,
    random_state=42,
    stratify=holdout_labels
)

In [None]:
# Report how many samples each split contains.
n_train, n_val, n_test = len(train_texts), len(val_texts), len(test_texts)
print(f"Train size: {n_train}")
print(f"Validation size: {n_val}")
print(f"Test size: {n_test}")

In [None]:
# Lookup table: short model alias -> checkpoint identifier handed to
# `from_pretrained`.
MODEL_MAP = dict(
    [
        ("bert", "bert-base-uncased"),
        ("scibert", "allenai/scibert_scivocab_uncased"),
        ("biobert", "dmis-lab/biobert-base-cased-v1.2"),
        ("bluebert", "bionlp/bluebert_pubmed_uncased_L-24_H-1024_A-16"),
    ]
)

In [None]:
# Key into MODEL_MAP that decides which pretrained checkpoint is fine-tuned;
# it must be one of the aliases listed in the trailing comment.
selected_model = "scibert"   # select: "bert", "scibert", "biobert", "bluebert"

In [None]:
# Resolve the checkpoint identifier for the selected alias.
model_name = MODEL_MAP[selected_model]
print(f"Using model: {model_name}")

# Seed PyTorch's random number generators before the model is built. Without
# this, any weights that are not restored from the checkpoint (e.g. a newly
# created classification layer) and later random draws such as DataLoader
# shuffling differ on every run, while the data splits are already pinned
# with random_state=42. The same value is reused here for consistency.
# NOTE: this makes runs repeatable on a given setup; some GPU kernels may
# still be non-deterministic.
torch.manual_seed(42)

# Load the tokenizer and a sequence-classification model sized for
# `num_labels` classes, then move the model to the GPU when one is available.
tokenizer = AutoTokenizer.from_pretrained(model_name)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels).to(device)


In [None]:
def tokenize(texts):
    """Tokenize a collection of texts into a single batch of PyTorch tensors.

    Args:
        texts: Iterable of strings. It is materialised with ``list`` before
            being handed to the module-level ``tokenizer``.

    Returns:
        The object returned by ``tokenizer`` for the whole batch, requested
        with padding enabled, truncation enabled, ``max_length=256`` and
        ``return_tensors="pt"``.
    """
    batch = list(texts)
    tokenizer_options = dict(
        padding=True,
        truncation=True,
        max_length=256,
        return_tensors="pt",
    )
    return tokenizer(batch, **tokenizer_options)

class TextDataset(Dataset):
    """Dataset pairing pre-tokenized texts with integer class labels.

    All texts are tokenized once, at construction time, through the
    module-level ``tokenize`` helper; indexing afterwards only slices the
    stored encodings.
    """

    def __init__(self, texts, labels):
        """Tokenize ``texts`` and store ``labels`` as a ``torch.long`` tensor.

        Args:
            texts: Iterable of raw strings, forwarded to ``tokenize``.
            labels: Object exposing ``.values`` (e.g. a pandas Series) with
                one label per text.
        """
        self.encodings = tokenize(texts)
        self.labels = torch.tensor(labels.values, dtype=torch.long)

    def __len__(self):
        """Return the number of samples, i.e. the number of stored labels."""
        n_samples = len(self.labels)
        return n_samples

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of encoding fields plus ``"labels"``."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = values[idx]
        sample["labels"] = self.labels[idx]
        return sample

# Wrap each split in a TextDataset (tokenization runs inside the constructor).
train_dataset = TextDataset(train_texts, train_labels)
val_dataset = TextDataset(val_texts, val_labels)
test_dataset = TextDataset(test_texts, test_labels)

# Batch every split identically; only the training loader shuffles.
batch_size = 16
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)

In [None]:
# Optimizer and learning-rate schedule for fine-tuning.
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)

# The scheduler is stepped once per batch, so its length is
# (number of epochs) * (batches per epoch). The literal 20 must stay equal to
# the `epochs` value used by the training loop.
num_training_steps = 20 * len(train_loader)

# get_scheduler expects the warm-up length as a whole number of steps. The
# previous value (the float 1e-4) was passed as a step count, which made the
# linear schedule return a multiplier of 0 at step 0, so the first optimizer
# step ran with a learning rate of 0. The value is treated here as a warm-up
# ratio (NOTE(review): assumed intent - confirm) and converted to an integer
# step count; for short runs this rounds down to 0, i.e. no warm-up.
warmup_ratio = 1e-4
num_warmup_steps = int(warmup_ratio * num_training_steps)
lr_scheduler = get_scheduler("linear", optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=num_training_steps)

In [None]:
# Fine-tune for a fixed number of epochs; after each epoch, score the model on
# the validation loader and print the micro-averaged F1.
epochs = 20
for epoch in range(epochs):
    # ---- Training ----
    model.train()
    loop = tqdm(train_loader, leave=True)
    for batch in loop:
        # Move every tensor of the batch onto the model's device.
        batch = {name: tensor.to(device) for name, tensor in batch.items()}

        # Clear gradients left over from the previous step, then run the
        # forward pass; the batch includes "labels", so the model output
        # carries the loss.
        optimizer.zero_grad()
        outputs = model(**batch)
        loss = outputs.loss

        loss.backward()
        optimizer.step()
        # The learning-rate schedule advances once per batch.
        lr_scheduler.step()

        loop.set_description(f"Epoch {epoch+1}/{epochs}")
        loop.set_postfix(loss=loss.item())

    # ---- Validation ----
    model.eval()
    val_preds = []
    val_true = []
    with torch.no_grad():
        for batch in val_loader:
            batch = {name: tensor.to(device) for name, tensor in batch.items()}
            outputs = model(**batch)
            logits = outputs.logits
            # Predicted class = index of the largest logit along the last axis.
            predicted = torch.argmax(logits, dim=-1)
            val_preds.extend(predicted.cpu().numpy())
            val_true.extend(batch["labels"].cpu().numpy())

    micro_f1 = f1_score(val_true, val_preds, average="micro")
    print(f"Epoch {epoch+1} - Validation micro-F1: {micro_f1:.4f}")

In [None]:
# Final evaluation: run the model over the test loader without gradient
# tracking and collect the argmax prediction and true label for every sample.
model.eval()
preds = []
true_labels = []
with torch.no_grad():
    for batch in test_loader:
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        outputs = model(**batch)
        logits = outputs.logits
        batch_preds = torch.argmax(logits, dim=-1)
        preds.extend(batch_preds.cpu().numpy())
        true_labels.extend(batch["labels"].cpu().numpy())

# Per-class precision / recall / F1 summary for the test split.
print("\n=== Test Set Report ===")
report = classification_report(true_labels, preds)
print(report)