In [None]:
# Colab-only setup: mount Google Drive at /content/drive. The data and results
# folders configured later in this notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# @title Dataset
# Name of the sub-folder of the data directory to train on; the settings cell
# joins it onto DATA_DIR, so the value must match a folder name exactly.
dataset_type = "Binary-dataset" # @param ["Binary-dataset", "Multi-class-dataset"]

In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score
import matplotlib.pyplot as plt

# Speedup on A100
# NOTE(review): cudnn.benchmark lets cuDNN auto-tune its kernel choice; the
# model defined in this notebook is built from nn.Linear layers only, so
# confirm this flag has a measurable effect here.
torch.backends.cudnn.benchmark = True  

# ========= settings =========
# Root folder holding the embedding .npz files. The second assignment narrows
# it to the sub-folder selected by `dataset_type` in the form cell above.
DATA_DIR = "/content/drive/MyDrive/00-github/sentence-embedding-sensitivity/Data"
DATA_DIR = os.path.join(DATA_DIR, dataset_type)
# Output folder: accuracy plots go to OUT_DIR/images, the summary to OUT_DIR/results.csv.
OUT_DIR  = "/content/drive/MyDrive/00-github/sentence-embedding-sensitivity/Results"        
EPOCHS     = 100       # training epochs per (dataset, model) pair
BATCH_SIZE = 512       # batch size of both the train and the test DataLoader
LR         = 1e-3      # AdamW learning rate
WEIGHT_DECAY = 1e-4    # AdamW weight decay
P_DROP     = 0.3       # dropout probability passed to PairMLP


In [None]:
def get_device():
    """Choose the compute device: CUDA first, then Apple MPS, then CPU.

    Prints one line describing the choice and returns the matching
    ``torch.device``.
    """
    # Older torch builds have no ``torch.backends.mps`` attribute at all.
    mps_backend = getattr(torch.backends, "mps", None)

    if torch.cuda.is_available():
        backend = "cuda"
        print("CUDA is available:", torch.cuda.get_device_name(0))
    elif mps_backend is not None and mps_backend.is_available():
        backend = "mps"
        print("MPS is available")
    else:
        backend = "cpu"
        print("Using CPU")
    return torch.device(backend)

# pattern: <dataset>_<model>_(train|test).npz
def parse_name(fname):
    """Split an embedding file name into ``(dataset, model, split)``.

    Expected pattern: ``<dataset>_<model>_(train|test).npz``. The split is the
    text after the last underscore, the dataset is the text before the first
    underscore, and everything in between (underscores included) is the model
    name. Any directory part of ``fname`` is ignored.

    Args:
        fname: File name or path of an ``.npz`` embedding file.

    Returns:
        Tuple ``(dataset, model, split)`` with ``split`` in {"train", "test"}.

    Raises:
        ValueError: If the name does not contain both separators or the split
            is neither "train" nor "test".
    """
    stem = os.path.basename(fname)
    # Strip the extension only when it really is ".npz"; slicing a fixed four
    # characters would silently eat part of a name with another suffix.
    if stem.endswith(".npz"):
        stem = stem[: -len(".npz")]

    prefix, last_sep, split = stem.rpartition("_")
    dataset, first_sep, model = prefix.partition("_")
    if not last_sep or not first_sep or split not in ("train", "test"):
        raise ValueError(
            f"cannot parse {fname!r}: expected <dataset>_<model>_(train|test).npz"
        )
    return dataset, model, split

# Dataset class
class PairNPZ(Dataset):
    """Map-style dataset over one ``.npz`` file of embedding pairs.

    The archive must provide three arrays: ``embedding1`` and ``embedding2``
    (same shape, one row per example) and ``label`` (one entry per example).
    Embeddings are stored as float32 and labels as int64.

    Attributes:
        e1, e2: float32 arrays with the first and second embedding of each pair.
        y: int64 array of labels.
        dim: Size of the second axis of ``e1`` (the embedding width).
    """

    def __init__(self, path):
        """Load all three arrays from ``path`` into memory.

        Raises:
            AssertionError: If the embeddings are not 2-D, their shapes differ,
                or the number of labels does not match the number of rows.
        """
        # The context manager closes the underlying archive once the arrays
        # have been copied out; a bare np.load would leave the file open.
        with np.load(path) as d:
            self.e1 = d["embedding1"].astype(np.float32)
            self.e2 = d["embedding2"].astype(np.float32)
            self.y  = d["label"].astype(np.int64)
        assert self.e1.ndim == 2, f"{path}: embeddings must be 2-D, got shape {self.e1.shape}"
        assert self.e1.shape == self.e2.shape, (
            f"{path}: embedding1 {self.e1.shape} and embedding2 {self.e2.shape} differ"
        )
        assert len(self.e1) == len(self.y), (
            f"{path}: {len(self.e1)} embedding rows but {len(self.y)} labels"
        )
        self.dim = self.e1.shape[1]

    def __len__(self):
        """Number of examples (taken from the label array)."""
        return len(self.y)

    def __getitem__(self, i):
        """Return ``(embedding1, embedding2, label)`` for example ``i`` as tensors."""
        return (
            torch.from_numpy(self.e1[i]),
            torch.from_numpy(self.e2[i]),
            torch.tensor(self.y[i], dtype=torch.long),
        )

# Model class
class PairMLP(nn.Module):
    """Two-hidden-layer MLP that classifies a pair of embeddings.

    The pair is turned into one feature vector of width ``4 * dim`` by
    concatenating both embeddings, their absolute difference and their
    element-wise product. Hidden widths are 1024 and 256, each followed by
    ReLU and dropout; the output layer emits ``num_classes`` logits.
    """

    def __init__(self, dim=768, p_drop=0.3, num_classes=2):
        super().__init__()
        wide, narrow = 1024, 256
        self.fc1 = nn.Linear(4 * dim, wide)
        self.fc2 = nn.Linear(wide, narrow)
        self.out = nn.Linear(narrow, num_classes)
        self.relu = nn.ReLU()
        self.drop = nn.Dropout(p_drop)

    def forward(self, e1, e2):
        """Return the logits for the embedding pair ``(e1, e2)``."""
        pair_features = (e1, e2, (e1 - e2).abs(), e1 * e2)
        hidden = torch.cat(pair_features, dim=-1)
        # Both hidden layers share the same Linear -> ReLU -> Dropout pattern.
        for layer in (self.fc1, self.fc2):
            hidden = self.drop(self.relu(layer(hidden)))
        return self.out(hidden)



In [None]:
# @title Evaluation function

@torch.no_grad()
def eval_epoch(model, loader, device):
    """Score ``model`` on every batch of ``loader`` without tracking gradients.

    Args:
        model: Module called as ``model(e1, e2)`` that returns logits with the
            class scores along dimension 1.
        loader: Iterable of ``(e1, e2, y)`` batches; ``y`` stays on the CPU.
        device: Device the embeddings are moved to before the forward pass.

    Returns:
        Tuple ``(acc, f1_one, f1_zero)``. For a two-class model ``f1_one`` and
        ``f1_zero`` are the F1 scores with class 1 and class 0 as the positive
        label. For any other head width they hold the macro- and
        micro-averaged F1 respectively.

    Raises:
        ValueError: If ``loader`` yields no batches.
    """
    model.eval()
    ys, ps = [], []
    num_classes = None
    for e1, e2, y in loader:
        e1, e2 = e1.to(device), e2.to(device)
        logits = model(e1, e2)
        # The width of the classifier output defines the label space. Counting
        # the distinct labels in this split instead would flip the metric
        # definition (or make scikit-learn raise) whenever a split happens to
        # miss some classes.
        num_classes = logits.shape[1]
        ps.append(logits.argmax(1).cpu().numpy())
        ys.append(y.numpy())

    if not ys:
        raise ValueError("eval_epoch: loader yielded no batches, nothing to evaluate")

    y_true = np.concatenate(ys)
    y_pred = np.concatenate(ps)
    acc = accuracy_score(y_true, y_pred)
    if num_classes == 2:
        f1_one  = f1_score(y_true, y_pred, pos_label=1, zero_division=0)
        f1_zero = f1_score(y_true, y_pred, pos_label=0, zero_division=0)
    else:
        f1_one = f1_score(y_true, y_pred, average='macro', zero_division=0)
        f1_zero = f1_score(y_true, y_pred, average='micro', zero_division=0)
    return acc, f1_one, f1_zero


In [None]:
# @title Training function
# One gradient scaler shared by every train_one call. It is only enabled when
# CUDA exists, so CPU/MPS sessions do not create an active CUDA scaler.
# NOTE(review): recent PyTorch releases spell this torch.amp.GradScaler("cuda");
# the torch.cuda.amp form is kept for compatibility with older installs.
scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())

def train_one(model, loader, optim, crit, device):
    """Run one training epoch and report mean loss and accuracy.

    Mixed precision (autocast plus the module-level ``scaler``) is used only
    when ``device`` is a CUDA device; on any other device the step is a plain
    full-precision backward pass and optimizer step.

    Args:
        model: Module called as ``model(e1, e2)`` returning class logits.
        loader: Iterable of ``(e1, e2, y)`` batches.
        optim: Optimizer over ``model``'s parameters.
        crit: Loss callable taking ``(logits, y)``.
        device: ``torch.device`` the batches are moved to.

    Returns:
        Tuple ``(mean_loss, accuracy)``. The loss is averaged per example; the
        accuracy is computed from the predictions made during this pass, i.e.
        in train mode while the weights are still being updated.

    Raises:
        ValueError: If ``loader`` yields no batches.
    """
    model.train()
    use_amp = device.type == "cuda"
    ys, ps = [], []
    total = 0.0
    for e1, e2, y in loader:
        e1, e2, y = e1.to(device), e2.to(device), y.to(device)
        optim.zero_grad()
        with torch.autocast(device_type="cuda", enabled=use_amp):
            logits = model(e1, e2)
            loss = crit(logits, y)
        if use_amp:
            scaler.scale(loss).backward()
            scaler.step(optim)
            scaler.update()
        else:
            # The scaler only understands CUDA tensors, so bypass it entirely.
            loss.backward()
            optim.step()
        # Weight by batch size so a short final batch does not skew the mean.
        total += loss.item() * y.size(0)
        ps.append(logits.argmax(1).detach().cpu().numpy())
        ys.append(y.detach().cpu().numpy())

    if not ys:
        raise ValueError("train_one: loader yielded no batches, nothing to train on")

    y_true = np.concatenate(ys)
    y_pred = np.concatenate(ps)
    acc = accuracy_score(y_true, y_pred)
    return total/len(y_true), acc


In [None]:
# @title Pair function
def run_pair(train_path, test_path, dataset, model_name, device):
    """Train and evaluate a PairMLP for one (dataset, embedding model) pair.

    Trains for ``EPOCHS`` epochs, evaluates on the test file after every epoch,
    saves a train/test accuracy plot under ``OUT_DIR/images`` and returns the
    metrics of the epoch with the highest test accuracy.

    Args:
        train_path: Path of the training ``.npz`` file.
        test_path: Path of the test ``.npz`` file.
        dataset: Dataset name; "MRPC", "QQP" and "PAWS" get a 2-class head,
            every other name a 5-class head.
        model_name: Name of the embedding model (used for logging and the plot
            file name only).
        device: ``torch.device`` to train on.

    Returns:
        Dict with keys ``dataset``, ``model``, ``acc``, ``f1_one``, ``f1_zero``
        and ``plot_path``.
    """
    train_ds = PairNPZ(train_path)
    test_ds  = PairNPZ(test_path)
    dim = train_ds.dim

    pin = device.type == "cuda"
    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, pin_memory=pin)
    test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False, pin_memory=pin)

    num_classes = 2 if dataset in ["MRPC","QQP","PAWS"] else 5
    # A label outside the head's range makes the loss fail (on CUDA as an
    # opaque device-side assert), so widen the head up front when the data
    # needs it. Inputs that already fit are unaffected.
    largest_label = max(
        (int(ds.y.max()) for ds in (train_ds, test_ds) if len(ds) > 0),
        default=-1,
    )
    if largest_label >= num_classes:
        print(f"[{dataset} | {model_name}] labels go up to {largest_label}; "
              f"widening classifier from {num_classes} to {largest_label + 1} classes")
        num_classes = largest_label + 1

    model = PairMLP(dim=dim, p_drop=P_DROP, num_classes=num_classes).to(device)
    optim = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
    crit  = nn.CrossEntropyLoss()

    train_acc_hist, test_acc_hist = [], []
    best = {"acc": -1.0, "f1_one": 0.0, "f1_zero": 0.0, "epoch": -1}

    # NOTE(review): the "best" epoch is chosen on the test split itself, so the
    # reported numbers are optimistic; confirm whether a validation split
    # should drive this selection.
    for epoch in range(1, EPOCHS+1):
        _, tr_acc = train_one(model, train_loader, optim, crit, device)
        te_acc, f1_one, f1_zero = eval_epoch(model, test_loader, device)
        train_acc_hist.append(tr_acc); test_acc_hist.append(te_acc)
        if te_acc > best["acc"]:
            best = {"acc": float(te_acc), "f1_one": float(f1_one), "f1_zero": float(f1_zero), "epoch": epoch}
        print(f"[{dataset} | {model_name}] epoch {epoch}: train_acc={tr_acc:.3f}  test_acc={te_acc:.3f}")

    os.makedirs(os.path.join(OUT_DIR, "images"), exist_ok=True)
    img_path = os.path.join(OUT_DIR, "images", f"{dataset}_{model_name}_acc.png")
    _save_accuracy_plot(
        img_path,
        f"{dataset} / {model_name} (best@{best['epoch']}: {best['acc']:.3f})",
        train_acc_hist,
        test_acc_hist,
    )

    return {
        "dataset": dataset,
        "model": model_name,
        "acc": best["acc"],
        "f1_one": best["f1_one"],
        "f1_zero": best["f1_zero"],
        "plot_path": img_path,
    }


def _save_accuracy_plot(img_path, title, train_acc_hist, test_acc_hist):
    """Write the per-epoch train/test accuracy curves to ``img_path``."""
    epochs = range(1, len(train_acc_hist) + 1)
    fig = plt.figure()
    plt.plot(epochs, train_acc_hist, label="train acc")
    plt.plot(epochs, test_acc_hist,  label="test acc")
    plt.xlabel("epoch"); plt.ylabel("accuracy"); plt.legend()
    plt.title(title)
    # Lay out only after the title exists so it is included in the margins.
    plt.tight_layout()
    plt.savefig(img_path, dpi=160)
    plt.close(fig)

def collect_pairs(folder):
    """Find every (dataset, model) that has both a train and a test file.

    Scans ``folder`` for ``.npz`` files, parses each name with ``parse_name``
    and groups the full paths by ``(dataset, model)``.

    Returns:
        Dict mapping ``(dataset, model)`` to ``{"train": path, "test": path}``;
        keys for which either split is missing are left out.
    """
    grouped = {}
    for fname in os.listdir(folder):
        if not fname.endswith(".npz"):
            continue
        dset, model, split = parse_name(fname)
        pair_key = (dset, model)
        if pair_key not in grouped:
            grouped[pair_key] = {}
        grouped[pair_key][split] = os.path.join(folder, fname)

    complete = {}
    for pair_key, split_paths in grouped.items():
        if "train" in split_paths and "test" in split_paths:
            complete[pair_key] = split_paths
    return complete



In [None]:
# @title Training and Evaluation

# Driver: train one classifier per (dataset, embedding model) pair found in
# DATA_DIR and append the best-epoch metrics to OUT_DIR/results.csv.
os.makedirs(OUT_DIR, exist_ok=True)
device = get_device()
print("Device:", device)
# (dataset, model) -> {"train": path, "test": path}; pairs missing either
# split are already filtered out by collect_pairs.
pairs = collect_pairs(DATA_DIR)
results = []
# Sorted so the pairs run in a stable order regardless of how the directory
# listing happens to be ordered. Note that `model` here is the embedding
# model's name taken from the file name, not a torch module.
for (dset, model), splits in sorted(pairs.items()):
    print(f"\n============= Running {dset} | {model} =============")
    row = run_pair(splits["train"], splits["test"], dset, model, device)
    results.append(row)
df = pd.DataFrame(results, columns=["dataset","model","acc","f1_one","f1_zero","plot_path"])
print("\n=== RESULTS ===")
print(df)
csv_path = os.path.join(OUT_DIR, "results.csv")
# Existing results are kept and the new rows are appended after them.
# NOTE(review): re-running this cell adds the same (dataset, model) rows
# again; confirm whether duplicates are intended or should be dropped.
if os.path.exists(csv_path):
    old_df = pd.read_csv(csv_path)
    df = pd.concat([old_df, df], ignore_index=True)
df.to_csv(csv_path, index=False)
