In [1]:
!pip install openpyxl



In [1]:
# import modules
import torch    

In [9]:
import numpy as np
import json
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset
from torch.nn.utils.rnn import pad_sequence
import os
import csv
from modules_modified import ISAB, SAB, PMA
import pandas as pd
import openpyxl

from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight

In [10]:
# import models
from model import (
    SetTransformerClassifierXY,
    SetTransformerClassifierXYAdditive,
    SetTransformerClassifier,
    DeepSetClassifierXYAdditive,
    DeepSetClassifierXY,
    DeepSetClassifier,
    SetTransformerOrdinalXY,
    SetTransformerOrdinalXYAdditive,
    SetTransformerOrdinal,
    DeepSetOrdinalXYAdditive,
    DeepSetOrdinalXY,
    DeepSetOrdinal,
)
from utils_ordinal import ordinal_logistic_loss, cumulative_to_labels, threshold_accuracy


In [11]:
# Mappings --------------------------------------------------------
# Hold names "A1"…"K18" are numbered row by row (A1, B1, …, K1, A2, …),
# which yields the integer ids 0…(11*18−1)=197.
cols = [chr(code) for code in range(ord('A'), ord('K') + 1)]
rows = list(range(1, 19))
hold_to_idx = dict(
    zip(
        (f"{col}{row}" for row in rows for col in cols),
        range(len(rows) * len(cols)),
    )
)


# Grade strings "V4"…"V11" become class labels 0…7; label_to_grade is the reverse lookup.
grade_to_label = {f"V{grade}": label for label, grade in enumerate(range(4, 12))}
label_to_grade = {label: grade for grade, label in grade_to_label.items()}
print(hold_to_idx)

{'A1': 0, 'B1': 1, 'C1': 2, 'D1': 3, 'E1': 4, 'F1': 5, 'G1': 6, 'H1': 7, 'I1': 8, 'J1': 9, 'K1': 10, 'A2': 11, 'B2': 12, 'C2': 13, 'D2': 14, 'E2': 15, 'F2': 16, 'G2': 17, 'H2': 18, 'I2': 19, 'J2': 20, 'K2': 21, 'A3': 22, 'B3': 23, 'C3': 24, 'D3': 25, 'E3': 26, 'F3': 27, 'G3': 28, 'H3': 29, 'I3': 30, 'J3': 31, 'K3': 32, 'A4': 33, 'B4': 34, 'C4': 35, 'D4': 36, 'E4': 37, 'F4': 38, 'G4': 39, 'H4': 40, 'I4': 41, 'J4': 42, 'K4': 43, 'A5': 44, 'B5': 45, 'C5': 46, 'D5': 47, 'E5': 48, 'F5': 49, 'G5': 50, 'H5': 51, 'I5': 52, 'J5': 53, 'K5': 54, 'A6': 55, 'B6': 56, 'C6': 57, 'D6': 58, 'E6': 59, 'F6': 60, 'G6': 61, 'H6': 62, 'I6': 63, 'J6': 64, 'K6': 65, 'A7': 66, 'B7': 67, 'C7': 68, 'D7': 69, 'E7': 70, 'F7': 71, 'G7': 72, 'H7': 73, 'I7': 74, 'J7': 75, 'K7': 76, 'A8': 77, 'B8': 78, 'C8': 79, 'D8': 80, 'E8': 81, 'F8': 82, 'G8': 83, 'H8': 84, 'I8': 85, 'J8': 86, 'K8': 87, 'A9': 88, 'B9': 89, 'C9': 90, 'D9': 91, 'E9': 92, 'F9': 93, 'G9': 94, 'H9': 95, 'I9': 96, 'J9': 97, 'K9': 98, 'A10': 99, 'B10': 1

In [12]:
# Holds difficulty data --------------------------------------------------------
# Every line containing ":" is read as "<hold>: <difficulty>, <type>, <type>, ...".
hold_difficulty = {}
with open("data/hold_difficulty.txt", "r") as f:
    for line in f:
        if ":" in line:
            hold, _, rest = line.strip().partition(":")
            difficulty_text, *type_texts = rest.strip().split(",")
            hold_difficulty[hold.strip()] = (
                int(difficulty_text.strip()),
                [text.strip() for text in type_texts],
            )
        # lines without ":" are treated as malformed and skipped
    print("successfully parsed hold difficulty file")

# prepare type vocabulary: every distinct type string, indexed in sorted order
unique_types = {t for _, types in hold_difficulty.values() for t in types}

type_to_idx = dict(zip(sorted(unique_types), range(len(unique_types))))
print("successfully prepare type vocabulary")

successfully parsed hold difficulty file
successfully prepare type vocabulary


In [13]:
# assign x,y position to each holds -------------------------------
import string

# Columns A–K map to x = 0–10, rows 1–18 map to y = 0–17.
cols = list(string.ascii_uppercase[:11])  # A–K
rows = list(range(1, 19))  # 1–18

# Built column by column: all rows of column A first, then column B, and so on.
hold_to_coord = {
    f"{col}{row}": (x, y)
    for x, col in enumerate(cols)
    for y, row in enumerate(rows)
}

print("successfully created (x,y) position to each hold:")
print(hold_to_coord)

successfully created (x,y) position to each hold:
{'A1': (0, 0), 'A2': (0, 1), 'A3': (0, 2), 'A4': (0, 3), 'A5': (0, 4), 'A6': (0, 5), 'A7': (0, 6), 'A8': (0, 7), 'A9': (0, 8), 'A10': (0, 9), 'A11': (0, 10), 'A12': (0, 11), 'A13': (0, 12), 'A14': (0, 13), 'A15': (0, 14), 'A16': (0, 15), 'A17': (0, 16), 'A18': (0, 17), 'B1': (1, 0), 'B2': (1, 1), 'B3': (1, 2), 'B4': (1, 3), 'B5': (1, 4), 'B6': (1, 5), 'B7': (1, 6), 'B8': (1, 7), 'B9': (1, 8), 'B10': (1, 9), 'B11': (1, 10), 'B12': (1, 11), 'B13': (1, 12), 'B14': (1, 13), 'B15': (1, 14), 'B16': (1, 15), 'B17': (1, 16), 'B18': (1, 17), 'C1': (2, 0), 'C2': (2, 1), 'C3': (2, 2), 'C4': (2, 3), 'C5': (2, 4), 'C6': (2, 5), 'C7': (2, 6), 'C8': (2, 7), 'C9': (2, 8), 'C10': (2, 9), 'C11': (2, 10), 'C12': (2, 11), 'C13': (2, 12), 'C14': (2, 13), 'C15': (2, 14), 'C16': (2, 15), 'C17': (2, 16), 'C18': (2, 17), 'D1': (3, 0), 'D2': (3, 1), 'D3': (3, 2), 'D4': (3, 3), 'D5': (3, 4), 'D6': (3, 5), 'D7': (3, 6), 'D8': (3, 7), 'D9': (3, 8), 'D10': (3, 9), '

In [14]:
class MoonBoardDataset(Dataset):
    """Dataset of MoonBoard problems read from a JSON-lines file.

    Each non-blank line of ``json_path`` is parsed as one JSON object; the
    fields read from it here are ``holds`` (list of hold names) and ``grade``
    (a key of ``grade_to_label``).

    Args:
        json_path: Path to the JSON-lines file.
        hold_to_idx: Hold name -> integer id.
        grade_to_label: Grade string -> integer class label.
        hold_difficulty: Hold name -> ``(difficulty, [type, ...])``.
        type_to_idx: Hold type string -> column of the multi-hot type vector.
        hold_to_coord: Hold name -> ``(x, y)`` grid position.
        max_difficulty: Divisor used to scale the per-hold difficulty.
    """

    def __init__(self, json_path, hold_to_idx, grade_to_label, hold_difficulty, type_to_idx, hold_to_coord, max_difficulty=10):
        self.hold_to_idx = hold_to_idx
        self.grade_to_label = grade_to_label
        self.hold_difficulty = hold_difficulty
        self.type_to_idx = type_to_idx
        self.hold_to_coord = hold_to_coord
        self.max_difficulty = max_difficulty

        # Blank lines (e.g. a trailing newline at end of file) are skipped
        # instead of being handed to json.loads, which would raise on them.
        with open(json_path, 'r', encoding='utf-8') as f:
            self.raw = [json.loads(line) for line in f if line.strip()]

    def __len__(self):
        """Return the number of problems loaded from the file."""
        return len(self.raw)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for problem ``idx``.

        ``features`` is a dict of tensors with one row per hold (N holds,
        T hold types): ``indices`` (N,) long, ``difficulty`` (N,) float,
        ``type`` (N, T) multi-hot float and ``xy`` (N, 2) float.
        ``label`` is a scalar long tensor. A problem with no holds yields
        zero-length tensors of those shapes rather than an error.

        Raises:
            KeyError: If a hold or grade is missing from the lookup tables.
        """
        item = self.raw[idx]
        holds = item['holds']
        num_holds = len(holds)

        hold_idxs = []
        diff_values = []
        xy_rows = []
        # Pre-sized so the (N, T) shape is well defined even when N == 0.
        type_matrix = torch.zeros((num_holds, len(self.type_to_idx)), dtype=torch.float)

        for row, h in enumerate(holds):
            hold_idxs.append(self.hold_to_idx[h])

            difficulty, types = self.hold_difficulty[h]
            diff_values.append(difficulty / self.max_difficulty)

            # multi-hot vector; types missing from the vocabulary are ignored
            for t in types:
                col = self.type_to_idx.get(t)
                if col is not None:
                    type_matrix[row, col] = 1.0

            # normalized (x, y): 10 and 17 are the largest column/row indices
            # of the 11 x 18 board
            x, y = self.hold_to_coord[h]
            xy_rows.append([x / 10.0, y / 17.0])

        return {
            "indices": torch.tensor(hold_idxs, dtype=torch.long),
            "difficulty": torch.tensor(diff_values, dtype=torch.float),
            "type": type_matrix,                                                     # (N, T)
            "xy": torch.tensor(xy_rows, dtype=torch.float).reshape(num_holds, 2)     # (N, 2)
        }, torch.tensor(self.grade_to_label[item['grade']], dtype=torch.long)


In [15]:
# Training loop ------------------------------------------------

# --- Set Hyperparameters ---
json_path = './data/cleaned_moonboard2024_grouped.json'
embed_dim = 64      # embedding / feature width passed to every model
batch_size = 16
lr = 1e-4           # Adam learning rate
epochs = 20
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed torch's global RNG so weight initialisation and DataLoader shuffling
# repeat across "Restart & Run All"; the train/validation split is already
# pinned by random_state=42 in prepare_dataloaders.
SEED = 42
torch.manual_seed(SEED)

# Model keys whose collate function also returns difficulty, type and (x, y) tensors.
XY_MODELS = {
    'set_transformer_xy',
    'set_transformer_additive',
    'deepset_xy',
    'deepset_xy_additive',
    'set_transformer_ordinal_xy',
    'set_transformer_ordinal_xy_additive',
    'deepset_ordinal_xy',
    'deepset_ordinal_xy_additive',
}

# Model keys trained with ordinal_logistic_loss; their forward pass returns (probs, logits).
ORDINAL_MODELS = {
    'set_transformer_ordinal',
    'set_transformer_ordinal_xy',
    'set_transformer_ordinal_xy_additive',
    'deepset_ordinal',
    'deepset_ordinal_xy',
    'deepset_ordinal_xy_additive',
}

# --- Collate Function Factory ---
def make_collate_fn(model_type):
    """Return a DataLoader ``collate_fn`` matching the inputs ``model_type`` expects.

    Models listed in ``XY_MODELS`` receive
    ``((indices, difficulty, type, xy), labels)``; every other model receives
    ``((indices,), labels)``. Variable-length sets are right-padded to the
    longest set in the batch with ``pad_sequence(batch_first=True)``.

    NOTE(review): the pad value is 0, which is also the id of hold "A1" in
    ``hold_to_idx``, so padded slots cannot be told apart from a real A1 hold
    unless the models mask them — confirm against the model code.
    """
    # Decided once per factory call; the feature tensors below are only
    # padded when the model actually consumes them.
    use_xy = model_type in XY_MODELS

    def collate_fn(batch):
        features = [x for x, _ in batch]
        y_tensor = torch.stack([y for _, y in batch])
        X_indices = pad_sequence([f['indices'] for f in features], batch_first=True)

        if not use_xy:
            return (X_indices,), y_tensor

        X_difficulty = pad_sequence([f['difficulty'] for f in features], batch_first=True)
        X_type = pad_sequence([f['type'] for f in features], batch_first=True)
        X_xy = pad_sequence([f['xy'] for f in features], batch_first=True)
        return (X_indices, X_difficulty, X_type, X_xy), y_tensor

    return collate_fn

# --- Dataset Loader ---
def load_dataset(json_path, hold_to_idx, grade_to_label, hold_difficulty, type_to_idx, hold_to_coord):
    """Build a MoonBoardDataset for ``json_path`` from the given lookup tables."""
    return MoonBoardDataset(
        json_path=json_path,
        hold_to_idx=hold_to_idx,
        grade_to_label=grade_to_label,
        hold_difficulty=hold_difficulty,
        type_to_idx=type_to_idx,
        hold_to_coord=hold_to_coord,
    )

# --- DataLoader Preparation ---
def prepare_dataloaders(dataset, grade_to_label, batch_size, collate_fn):
    """Split ``dataset`` 80/20 and wrap both parts in DataLoaders.

    Returns ``(train_loader, val_loader, class_weights, train_idx, val_idx)``.
    ``class_weights`` are sklearn "balanced" weights computed over the labels
    of the whole dataset and moved to the global ``device``. The split is
    stratified by label and uses ``random_state=42``. Only the training
    loader shuffles.
    """
    targets = [grade_to_label[record['grade']] for record in dataset.raw]

    balanced = compute_class_weight(class_weight='balanced', classes=np.unique(targets), y=targets)
    class_weights = torch.tensor(balanced, dtype=torch.float).to(device)

    all_indices = list(range(len(dataset)))
    train_idx, val_idx = train_test_split(
        all_indices, test_size=0.2, stratify=targets, random_state=42
    )

    shared = dict(batch_size=batch_size, collate_fn=collate_fn)
    train_loader = DataLoader(Subset(dataset, train_idx), shuffle=True, **shared)
    val_loader = DataLoader(Subset(dataset, val_idx), shuffle=False, **shared)
    return train_loader, val_loader, class_weights, train_idx, val_idx

# --- Training ---
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs, is_ordinal=False):
    """Train ``model`` for ``epochs`` passes over ``train_loader`` and return it.

    Each batch is moved to the global ``device``; a one-element input tuple is
    unwrapped so index-only models receive a bare tensor. When ``is_ordinal``
    is true the model output is unpacked as ``(probs, logits)`` and only the
    logits reach ``criterion``. The mean batch loss is printed per epoch.
    ``val_loader`` is accepted but not used here.
    """
    for epoch in range(1, epochs + 1):
        model.train()
        running_loss = 0
        for features, labels in train_loader:
            moved = tuple(t.to(device) for t in features)
            labels = labels.to(device)
            outputs = model(moved[0] if len(moved) == 1 else moved)

            if is_ordinal:
                _, logits = outputs
            else:
                logits = outputs
            loss = criterion(logits, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"Epoch {epoch:02d} — loss: {running_loss / len(train_loader):.4f}")
    return model

# --- Main Per Model ---
def main(model_type):
    """Build, train and return the model selected by ``model_type``.

    Loads the dataset from the global ``json_path``, picks the model class and
    constructor arguments for ``model_type``, creates the loaders and trains
    with Adam at the global learning rate. Ordinal models use
    ``ordinal_logistic_loss``; all others use class-weighted cross-entropy.

    Returns:
        ``(train_loader, val_loader, model, dataset, train_idx, val_idx)``.

    Raises:
        ValueError: If ``model_type`` is not one of the known keys.
    """
    dataset = load_dataset(json_path, hold_to_idx, grade_to_label, hold_difficulty, type_to_idx, hold_to_coord)
    targets = [grade_to_label[item['grade']] for item in dataset.raw]
    num_classes = len(np.unique(targets))
    vocab_size = len(hold_to_idx)
    type_vec_dim = len(type_to_idx)
    is_ordinal = model_type in ORDINAL_MODELS

    # model key -> (class, name of the width kwarg, whether type_vec_dim is passed)
    registry = {
        'set_transformer': (SetTransformerClassifier, 'dim_in', False),
        'set_transformer_xy': (SetTransformerClassifierXY, 'dim_in', True),
        'set_transformer_additive': (SetTransformerClassifierXYAdditive, 'feat_dim', True),
        'deepset': (DeepSetClassifier, 'dim_in', False),
        'deepset_xy': (DeepSetClassifierXY, 'dim_in', True),
        'deepset_xy_additive': (DeepSetClassifierXYAdditive, 'feat_dim', True),
        'set_transformer_ordinal': (SetTransformerOrdinal, 'dim_in', False),
        'set_transformer_ordinal_xy': (SetTransformerOrdinalXY, 'dim_in', True),
        'set_transformer_ordinal_xy_additive': (SetTransformerOrdinalXYAdditive, 'feat_dim', True),
        'deepset_ordinal': (DeepSetOrdinal, 'dim_in', False),
        'deepset_ordinal_xy': (DeepSetOrdinalXY, 'dim_in', True),
        'deepset_ordinal_xy_additive': (DeepSetOrdinalXYAdditive, 'feat_dim', True),
    }
    if model_type not in registry:
        raise ValueError(f"Unknown model_type: {model_type}")

    ModelClass, width_kwarg, takes_type_vec = registry[model_type]
    kwargs = {'vocab_size': vocab_size, width_kwarg: embed_dim, 'num_classes': num_classes}
    if takes_type_vec:
        kwargs['type_vec_dim'] = type_vec_dim

    collate_fn = make_collate_fn(model_type)
    train_loader, val_loader, class_weights, train_idx, val_idx = prepare_dataloaders(
        dataset, grade_to_label, batch_size, collate_fn
    )

    model = ModelClass(**kwargs).to(device)
    # Stored on the model instance alongside its weights.
    model.is_ordinal = is_ordinal
    model.num_classes = num_classes

    criterion_fn = ordinal_logistic_loss if is_ordinal else torch.nn.CrossEntropyLoss(weight=class_weights)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    model = train_model(model, train_loader, val_loader, criterion_fn, optimizer, epochs, is_ordinal=is_ordinal)
    return train_loader, val_loader, model, dataset, train_idx, val_idx



In [16]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
from openpyxl import load_workbook
from openpyxl.drawing.image import Image as XLImage
import csv

# --- plot confusion matrix and save to excel---
def save_confusion_matrix_to_excel(y_true, y_pred, class_labels, model_name, excel_path):
    """Plot a row-normalised confusion matrix and embed it in an Excel sheet.

    The heatmap is saved to ``result/confusion_<model_name>.png`` and then
    anchored at cell A1 of the sheet named ``model_name`` inside the existing
    workbook ``excel_path`` (the sheet is created when missing).

    Args:
        y_true: True integer labels.
        y_pred: Predicted integer labels.
        class_labels: Tick labels; label ``i`` is shown as ``class_labels[i]``.
        model_name: Used for the plot title, image file name and sheet name.
        excel_path: Workbook to update; it must already exist.
    """
    cm = confusion_matrix(y_true, y_pred, labels=range(len(class_labels)), normalize='true')

    # Explicit figure/axes so this plot never touches another open figure.
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt=".2f", cmap="Blues",
                xticklabels=class_labels, yticklabels=class_labels, ax=ax)
    ax.set_title(f"Confusion Matrix: {model_name}")
    ax.set_xlabel("Predicted Grade")
    ax.set_ylabel("Actual Grade")
    fig.tight_layout()

    img_path = f"result/confusion_{model_name}.png"
    # The image always goes to result/, wherever excel_path lives, so make
    # sure that directory exists before saving.
    os.makedirs(os.path.dirname(img_path), exist_ok=True)
    fig.savefig(img_path)
    plt.close(fig)

    # Insert image into Excel (new sheet per model)
    wb = load_workbook(excel_path)
    if model_name in wb.sheetnames:
        ws = wb[model_name]
    else:
        ws = wb.create_sheet(title=model_name)
    ws.add_image(XLImage(img_path), "A1")
    wb.save(excel_path)
    print(f"Confusion matrix for {model_name} saved and inserted into {excel_path} (sheet: {model_name})")

# --- export the predictions to excel ---
def _update_outlier_excel(df_all_preds, outlier_filename="result/outlier.xlsx", sheet_name="outliers", threshold=3):
    """
    From a DataFrame with columns [problem_name, y_true, y_pred, diff],
    keep rows where abs(diff) > threshold and aggregate per problem_name:
        - count = number of times flagged
        - y_true = mode (most frequent true label)
        - y_pred_avg = average predicted label across occurrences
    Save to outlier.xlsx.
    """
    # Filter outliers
    outliers = df_all_preds.loc[df_all_preds["diff"].abs() > threshold,
                                ["problem_name", "y_true", "y_pred"]]
    if outliers.empty:
        print(f"No outliers (abs(diff) > {threshold}). Skipped creating outlier.xlsx.")
        return

    # Group & aggregate
    grouped = (outliers
               .groupby("problem_name")
               .agg(
                   count=("problem_name", "size"),
                   y_true=("y_true", lambda x: x.mode().iat[0] if not x.mode().empty else x.iloc[0]),
                   y_pred_avg=("y_pred", lambda x: round(pd.to_numeric(x, errors="coerce").mean(), 2))
               )
               .reset_index())

    # If a previous file exists, merge and accumulate counts
    if os.path.exists(outlier_filename):
        try:
            existing = pd.read_excel(outlier_filename, sheet_name=sheet_name)
            if set(existing.columns) >= {"problem_name", "count", "y_true", "y_pred_avg"}:
                merged = pd.concat([existing, grouped], ignore_index=True)
                # Re-aggregate: sum counts, keep most common y_true, recompute y_pred_avg
                grouped = (merged
                           .groupby("problem_name")
                           .agg(
                               count=("count", "sum"),
                               y_true=("y_true", lambda x: x.mode().iat[0] if not x.mode().empty else x.iloc[0]),
                               y_pred_avg=("y_pred_avg", "mean")
                           )
                           .reset_index())
            # else keep grouped as new
        except Exception:
            pass

    # Save
    with pd.ExcelWriter(outlier_filename, engine="openpyxl", mode="w") as writer:
        grouped.to_excel(writer, sheet_name=sheet_name, index=False)

    print(f"Outliers saved to: {os.path.abspath(outlier_filename)}")


def export_predictions_to_excel(model, dataloader, device, grade_to_label, excel_path, sheet_name):
    """Write per-problem predictions to a workbook sheet and update the outlier file.

    ``dataloader`` must wrap a ``Subset`` of a dataset exposing ``.raw``.
    Problem names are recovered by position: the k-th prediction is matched
    with ``Subset.indices[k]``, so the loader has to iterate in order.

    Args:
        model: Trained model. A tuple output is treated as ``(probs, logits)``
            and decoded with ``cumulative_to_labels``; anything else is
            arg-maxed over dim 1.
        dataloader: Unshuffled DataLoader over a ``Subset``.
        device: Device the inputs are moved to.
        grade_to_label: Grade string -> label; inverted for the exported sheet.
        excel_path: Existing workbook, opened in append mode.
        sheet_name: Sheet to create or replace.

    Raises:
        ValueError: If the loader's sampler is not sequential, because the
            exported problem names would then belong to other samples.
    """
    sampler = getattr(dataloader, "sampler", None)
    if sampler is not None and not isinstance(sampler, torch.utils.data.SequentialSampler):
        raise ValueError(
            "export_predictions_to_excel needs an unshuffled DataLoader (shuffle=False): "
            "problem names are matched to predictions by position."
        )

    results = []
    raw_dataset = dataloader.dataset.dataset  # MoonBoardDataset
    indices = dataloader.dataset.indices      # Subset indices
    label_to_grade = {v: k for k, v in grade_to_label.items()}
    current_index = 0

    model.eval()
    with torch.no_grad():
        for X, y in dataloader:
            # Accept a list as well as a tuple of input tensors.
            if isinstance(X, (tuple, list)):
                inputs = tuple(x.to(device) for x in X)
                payload = inputs[0] if len(inputs) == 1 else inputs
            else:
                payload = X.to(device)
            outputs = model(payload)
            if isinstance(outputs, tuple):
                probs, _ = outputs
                preds_tensor = cumulative_to_labels(probs)
            else:
                preds_tensor = outputs.argmax(dim=1)
            preds_cpu = preds_tensor.detach().cpu()
            y_cpu = y.detach().cpu()
            for i in range(y_cpu.size(0)):
                real_label = int(y_cpu[i].item())
                pred_label = int(preds_cpu[i].item())
                dataset_index = indices[current_index]
                current_index += 1
                raw_item = raw_dataset.raw[dataset_index]
                problem_name = raw_item.get('problem_name', f"problem_{dataset_index}")
                results.append({
                    "problem_name": problem_name,
                    "y_true": real_label,  # keep numeric for averaging/aggregation
                    "y_pred": pred_label,
                    "diff": real_label - pred_label
                })

    df = pd.DataFrame(results)

    # 1) Save all predictions into your main Excel file
    with pd.ExcelWriter(excel_path, engine="openpyxl", mode="a", if_sheet_exists="replace") as writer:
        # Convert numeric labels back to grade strings for readability
        df_out = df.copy()
        df_out["y_true"] = df_out["y_true"].map(lambda x: label_to_grade.get(x, f"Unknown({x})"))
        df_out["y_pred"] = df_out["y_pred"].map(lambda x: label_to_grade.get(x, f"Unknown({x})"))
        df_out.to_excel(writer, sheet_name=sheet_name, index=False)
    print(f"Predictions for {sheet_name} exported to: {excel_path}")

    # 2) Create/update outlier.xlsx (problem_name, count, y_true, y_pred_avg)
    _update_outlier_excel(df, outlier_filename="result/outlier.xlsx", sheet_name="outliers", threshold=3)


# --- compute training and validation accuracy ---
def compute_accuracy(model, dataloader, device):
    """Evaluate ``model`` on ``dataloader``.

    Args:
        model: Module called with the batch inputs. A tuple output is treated
            as ``(probs, logits)`` from an ordinal model and decoded with
            ``cumulative_to_labels``; any other output is arg-maxed over dim 1.
        dataloader: Iterable of ``(inputs, labels)`` where ``inputs`` is a
            tuple of tensors; a one-element tuple is unwrapped before the call.
        device: Device the tensors are moved to.

    Returns:
        ``(strict_acc, loose_acc, y_true, y_pred)``: the percentage of exact
        matches, the percentage of predictions within one label of the target,
        and the flat lists of true and predicted labels. An empty loader
        returns ``(0.0, 0.0, [], [])``.
    """
    strict_correct, loose_correct, total = 0, 0, 0
    y_true, y_pred = [], []
    model.eval()
    with torch.no_grad():
        for X, y in dataloader:
            X = tuple(x.to(device) for x in X)
            y = y.to(device)
            outputs = model(X[0] if len(X) == 1 else X)
            if isinstance(outputs, tuple):
                # Ordinal models: decode the same way export_predictions_to_excel does.
                probs, _ = outputs
                preds = cumulative_to_labels(probs)
            else:
                preds = outputs.argmax(dim=1)
            total += y.size(0)
            strict_correct += (preds == y).sum().item()
            loose_correct += ((preds - y).abs() <= 1).sum().item()
            y_true.extend(y.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())

    if total == 0:
        # Nothing was evaluated; report zero rather than dividing by zero.
        return 0.0, 0.0, y_true, y_pred

    strict_acc = 100.0 * strict_correct / total
    loose_acc = 100.0 * loose_correct / total
    return strict_acc, loose_acc, y_true, y_pred

def log_accuracy_to_csv(model_type, train_strict_acc, train_loose_acc, val_strict_acc, val_loose_acc, csv_path="result/accuracy.csv"):
    """Append one accuracy row for ``model_type`` to ``csv_path``.

    The parent directory is created when missing, and the header row is
    written when the file is new or empty. Accuracies are rounded to two
    decimals. The file is written as UTF-8 so the "±" in the header does not
    depend on the platform's default encoding.
    """
    parent_dir = os.path.dirname(csv_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # An existing but empty file still needs its header.
    needs_header = not os.path.isfile(csv_path) or os.path.getsize(csv_path) == 0
    with open(csv_path, mode='a', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        if needs_header:
            writer.writerow([
                "Model Type",
                "Train Strict Accuracy (%)",
                "Train ±1 Grade Accuracy (%)",
                "Val Strict Accuracy (%)",
                "Val ±1 Grade Accuracy (%)"
            ])
        writer.writerow([
            model_type,
            round(train_strict_acc, 2),
            round(train_loose_acc, 2),
            round(val_strict_acc, 2),
            round(val_loose_acc, 2)
        ])


def compare_models():
    """Train every non-ordinal model variant and collect the results.

    For each model type this trains via ``main``, computes train/validation
    accuracy, appends a row to the accuracy CSV, embeds the validation
    confusion matrix in ``result/model_comparison_results.xlsx`` and exports
    the validation predictions to a ``<model>_preds`` sheet. The workbook is
    recreated at the first model; its "Summary" sheet is rewritten at the end
    with one row per model.
    """
    model_types = [
        "set_transformer",
        "deepset",
        "set_transformer_xy",
        "deepset_xy",
        "set_transformer_additive",
        "deepset_xy_additive"
    ]
    results = []
    excel_path = "result/model_comparison_results.xlsx"
    class_labels = [f"V{i}" for i in range(4, 12)]

    # Every artefact below is written under result/, so create it up front.
    os.makedirs(os.path.dirname(excel_path), exist_ok=True)

    for idx, mtype in enumerate(model_types):
        print(f"\n===== Training {mtype} =====")
        train_loader, val_loader, model, _, _, _ = main(mtype)

        # Compute training accuracy
        train_strict_acc, train_loose_acc, _, _ = compute_accuracy(model, train_loader, device)
        # Compute validation accuracy
        val_strict_acc, val_loose_acc, y_true, y_pred = compute_accuracy(model, val_loader, device)

        # log to CSV
        log_accuracy_to_csv(mtype, train_strict_acc, train_loose_acc, val_strict_acc, val_loose_acc)

        results.append({
            "Model Type": mtype,
            "Train Strict Accuracy (%)": round(train_strict_acc, 2),
            "Train ±1 Grade Accuracy (%)": round(train_loose_acc, 2),
            "Val Strict Accuracy (%)": round(val_strict_acc, 2),
            "Val ±1 Grade Accuracy (%)": round(val_loose_acc, 2)
        })

        # Create the workbook on the first iteration so the append-mode
        # writers below have a file to open. It goes straight into the
        # "Summary" sheet, which the final write replaces, so no stale
        # one-row "Sheet1" is left behind.
        if idx == 0:
            pd.DataFrame(results).to_excel(excel_path, sheet_name="Summary", index=False)

        # Save confusion matrix to Excel
        save_confusion_matrix_to_excel(y_true, y_pred, class_labels, mtype, excel_path)

        # Export predictions to Excel (new sheet per model)
        export_predictions_to_excel(model, val_loader, device, grade_to_label, excel_path, sheet_name=f"{mtype}_preds")

    # Save summary table again at the end (with all models)
    df_results = pd.DataFrame(results)
    with pd.ExcelWriter(excel_path, engine="openpyxl", mode="a", if_sheet_exists="replace") as writer:
        df_results.to_excel(writer, sheet_name="Summary", index=False)
    print("\n=== Model Comparison Summary ===")
    print(df_results)

# usage: run the full comparison; raise num_runs to repeat it
num_runs = 1
for run_idx in range(num_runs):
    compare_models()





===== Training set_transformer =====
Epoch 01 — loss: 1.7587
Epoch 02 — loss: 1.6138
Epoch 03 — loss: 1.5777
Epoch 04 — loss: 1.5432
Epoch 05 — loss: 1.5168
Epoch 06 — loss: 1.4786
Epoch 07 — loss: 1.4544
Epoch 08 — loss: 1.4270
Epoch 09 — loss: 1.3925
Epoch 10 — loss: 1.3617
Epoch 11 — loss: 1.3321
Epoch 12 — loss: 1.3078
Epoch 13 — loss: 1.2692
Epoch 14 — loss: 1.2354
Epoch 15 — loss: 1.2097
Epoch 16 — loss: 1.1642
Epoch 17 — loss: 1.1484
Epoch 18 — loss: 1.1060
Epoch 19 — loss: 1.0737
Epoch 20 — loss: 1.0235
Confusion matrix for set_transformer saved and inserted into result/model_comparison_results.xlsx (sheet: set_transformer)
Predictions for set_transformer_preds exported to: result/model_comparison_results.xlsx
Outliers saved to: /Users/patrickdharma/Desktop/university/卒業課題/my_models/grade_predictor/result/outlier.xlsx

===== Training deepset =====
Epoch 01 — loss: 1.8994
Epoch 02 — loss: 1.6313
Epoch 03 — loss: 1.5484
Epoch 04 — loss: 1.5198
Epoch 05 — loss: 1.5111
Epoch 06 — 

In [21]:
# Ordinal evaluation helpers
def evaluate_ordinal_thresholds(model, loader, grade_to_label, device, decision_threshold=0.5, model_name=None, output_dir='./result'):
    """Report per-threshold and overall accuracy of an ordinal model.

    Runs ``model`` over ``loader``, gathers the probabilities from its
    ``(probs, logits)`` output and passes them with the targets to
    ``threshold_accuracy``. Entry ``i`` of that result is labelled
    ``P(>grade_i)``, where ``grade_i`` is the grade whose label is ``i``.

    Args:
        model: Ordinal model returning ``(probs, logits)``.
        loader: Iterable of ``(inputs, labels)`` batches.
        grade_to_label: Grade string -> integer label.
        device: Device the batches are moved to.
        decision_threshold: Cut-off forwarded to ``threshold_accuracy`` and
            ``cumulative_to_labels``.
        model_name: When truthy, the table is also saved to
            ``<output_dir>/ordinal_metrics_<model_name>.csv``.
        output_dir: Directory for that CSV; created when missing.

    Returns:
        ``(df, overall_acc)``: a DataFrame with ``threshold`` and ``accuracy``
        (percent) columns, and the exact-match accuracy in percent.

    Raises:
        ValueError: If the model output is not a tuple or the loader is empty.
    """
    model.eval()
    probs_list = []
    targets_list = []
    with torch.no_grad():
        for X, y in loader:
            inputs = tuple(x.to(device) for x in X)
            y = y.to(device)
            payload = inputs[0] if len(inputs) == 1 else inputs
            outputs = model(payload)
            if not isinstance(outputs, tuple):
                raise ValueError('Model is not configured for ordinal outputs.')
            probs, logits = outputs
            probs_list.append(probs.cpu())
            targets_list.append(y.cpu())
    if not probs_list:
        raise ValueError('No samples available for ordinal evaluation.')
    probs = torch.cat(probs_list, dim=0)
    targets = torch.cat(targets_list, dim=0)
    acc_per_threshold = threshold_accuracy(probs, targets, threshold=decision_threshold).cpu()
    grade_by_label = {v: k for k, v in grade_to_label.items()}
    threshold_labels = []
    for idx in range(acc_per_threshold.size(0)):
        grade = grade_by_label.get(idx, f'label_{idx}')
        threshold_labels.append(f"P(>{grade})")
    # Round in float64: rounding a float32 array to two decimals still shows
    # values such as 83.059998 in the printed table and the saved CSV.
    accuracy_pct = (acc_per_threshold.to(torch.float64).numpy() * 100).round(2)
    df = pd.DataFrame({
        'threshold': threshold_labels,
        'accuracy': accuracy_pct
    })
    overall_pred = cumulative_to_labels(probs, threshold=decision_threshold)
    overall_acc = (overall_pred == targets).float().mean().item() * 100
    if model_name:
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f'ordinal_metrics_{model_name}.csv')
        df.to_csv(output_path, index=False)
        print(f'Saved threshold table to {output_path}')
    print(df)
    print(f'Overall accuracy: {overall_acc:.2f}%')
    return df, overall_acc



In [22]:
# --- Ordinal variants sweep ---
# Train each ordinal model, evaluate it on its validation split and gather
# the per-threshold tables into one workbook.
ordinal_model_types = [
    'set_transformer_ordinal',
    'set_transformer_ordinal_xy',
    'set_transformer_ordinal_xy_additive',
    'deepset_ordinal',
    'deepset_ordinal_xy',
    'deepset_ordinal_xy_additive',
]

ordinal_tables = []
ordinal_summary = []

for model_key in ordinal_model_types:
    print(f"=== Training ordinal model: {model_key} ===")
    train_loader, val_loader, model, dataset, train_idx, val_idx = main(model_key)
    table, overall_acc = evaluate_ordinal_thresholds(
        model,
        val_loader,
        grade_to_label=grade_to_label,
        device=device,
        decision_threshold=0.5,
        model_name=model_key,
    )
    # assign() returns a new frame, so the table returned above is not mutated.
    table = table.assign(model=model_key, overall_accuracy=overall_acc)
    ordinal_tables.append(table)
    ordinal_summary.append(dict(model=model_key, overall_accuracy=overall_acc))

if not ordinal_tables:
    print('No ordinal results generated.')
else:
    combined = pd.concat(ordinal_tables, ignore_index=True)
    summary_df = pd.DataFrame(ordinal_summary)

    # Threshold columns follow label order and are limited to those present.
    grades_in_label_order = sorted(grade_to_label, key=grade_to_label.get)
    threshold_order = [f"P(>{grade})" for grade in grades_in_label_order]
    seen_thresholds = set(combined['threshold'].unique())
    pivot_columns = [name for name in threshold_order if name in seen_thresholds]

    pivot_df = combined.pivot_table(index='model', columns='threshold', values='accuracy')
    pivot_df = pivot_df.reindex(columns=pivot_columns).sort_index()
    pivot_df['Overall Accuracy'] = summary_df.set_index('model')['overall_accuracy']

    combined = combined.sort_values(['model', 'threshold']).reset_index(drop=True)
    summary_df = summary_df.sort_values('model').reset_index(drop=True)

    output_path = './result/ordinal_result.xlsx'
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with pd.ExcelWriter(output_path) as writer:
        pivot_df.to_excel(writer, sheet_name='threshold_matrix')
        combined.to_excel(writer, sheet_name='threshold_long', index=False)
        summary_df.to_excel(writer, sheet_name='overall', index=False)
    print(f"Saved combined ordinal results to {output_path}")



=== Training ordinal model: set_transformer_ordinal ===
Epoch 01 — loss: 0.3742
Epoch 02 — loss: 0.3150
Epoch 03 — loss: 0.3059
Epoch 04 — loss: 0.2946
Epoch 05 — loss: 0.2848
Epoch 06 — loss: 0.2764
Epoch 07 — loss: 0.2659
Epoch 08 — loss: 0.2589
Epoch 09 — loss: 0.2522
Epoch 10 — loss: 0.2448
Epoch 11 — loss: 0.2381
Epoch 12 — loss: 0.2345
Epoch 13 — loss: 0.2240
Epoch 14 — loss: 0.2191
Epoch 15 — loss: 0.2134
Epoch 16 — loss: 0.2070
Epoch 17 — loss: 0.2008
Epoch 18 — loss: 0.1966
Epoch 19 — loss: 0.1921
Epoch 20 — loss: 0.1849
Saved threshold table to ./result/ordinal_metrics_set_transformer_ordinal.csv
  threshold   accuracy
0    P(>V4)  83.059998
1    P(>V5)  81.949997
2    P(>V6)  85.029999
3    P(>V7)  88.550003
4    P(>V8)  92.589996
5    P(>V9)  96.540001
Overall accuracy: 49.18%
=== Training ordinal model: set_transformer_ordinal_xy ===
Epoch 01 — loss: 0.3709
Epoch 02 — loss: 0.3108
Epoch 03 — loss: 0.3011
Epoch 04 — loss: 0.2937
Epoch 05 — loss: 0.2841
Epoch 06 — loss: 0.27

In [None]:
# evaluate problems
def _encode_holds_for_model(holds, hold_to_idx, hold_difficulty, type_to_idx, hold_to_coord, device):
    """Build the four batch-of-one feature tensors for a list of hold labels.

    Returns a tuple ``(hold indices, scaled difficulties, multi-hot types, scaled xy)``;
    every tensor gets a leading batch dimension of 1 and is moved to ``device``.

    Raises:
        ValueError: if a hold is missing from ``hold_difficulty``, ``hold_to_idx``
            or ``hold_to_coord``.
    """
    index_list, difficulty_list, type_rows, coord_rows = [], [], [], []

    for hold in holds:
        if not (hold in hold_difficulty and hold in hold_to_idx and hold in hold_to_coord):
            raise ValueError(f"[ERROR] Hold '{hold}' missing from required dictionaries.")

        index_list.append(hold_to_idx[hold])
        raw_difficulty, hold_types = hold_difficulty[hold]
        difficulty_list.append(raw_difficulty / 10.0)

        # Multi-hot encoding; type names without an entry in type_to_idx are ignored.
        multi_hot = torch.zeros(len(type_to_idx), dtype=torch.float)
        for type_name in hold_types:
            if type_name in type_to_idx:
                multi_hot[type_to_idx[type_name]] = 1.0
        type_rows.append(multi_hot)

        coord = hold_to_coord[hold]
        coord_rows.append(torch.tensor([coord[0] / 10.0, coord[1] / 17.0], dtype=torch.float))

    def as_batch(tensor):
        return tensor.unsqueeze(0).to(device)

    return (
        as_batch(torch.tensor(index_list, dtype=torch.long)),
        as_batch(torch.tensor(difficulty_list, dtype=torch.float)),
        as_batch(torch.stack(type_rows)),
        as_batch(torch.stack(coord_rows)),
    )


def _predict_grade_label(model, model_type, features):
    """Run ``model`` on one encoded problem and return the predicted integer label.

    Models listed in ``XY_MODELS`` receive the full feature tuple; the four index-only
    model types receive just the hold-index tensor. A tuple output is unpacked as
    ``(probs, logits)`` and decoded by counting probabilities above 0.5; any other
    output is treated as logits and decoded with argmax.

    Raises:
        ValueError: if ``model_type`` is not recognised.
    """
    model.eval()
    with torch.no_grad():
        if model_type in XY_MODELS:
            payload = features
        elif model_type in {'set_transformer', 'deepset', 'set_transformer_ordinal', 'deepset_ordinal'}:
            payload = features[0]
        else:
            raise ValueError(f"Unknown model type: {model_type}")

        outputs = model(payload)
        if isinstance(outputs, tuple):
            probs, _logits = outputs
            return (probs > 0.5).sum(dim=1).item()
        return outputs.argmax(dim=1).item()


def evaluate_problems(
    model, problem_dict, hold_to_idx, hold_difficulty, type_to_idx, device,
    grade_to_label, hold_to_coord, dataset, train_idx, val_idx, model_type
):
    """Predict a grade for each named problem and print a short report per problem.

    For every ``name -> holds`` entry in ``problem_dict`` the holds are encoded, the
    model is queried, and the first entry of ``dataset.raw`` whose hold set equals the
    problem's hold set is looked up to report the setter grade, the stored problem
    name and whether that index is in ``train_idx`` or ``val_idx``.

    Any exception raised while encoding or predicting a problem is reported on stdout
    and that problem is skipped. Nothing is returned.
    """
    label_to_grade = {label: grade for grade, label in grade_to_label.items()}
    print("=== MoonBoard Problem Evaluation ===")

    for fallback_name, holds in problem_dict.items():
        try:
            features = _encode_holds_for_model(
                holds, hold_to_idx, hold_difficulty, type_to_idx, hold_to_coord, device
            )
            pred_label = _predict_grade_label(model, model_type, features)
            pred_grade = label_to_grade.get(pred_label, f"Unknown({pred_label})")
        except Exception as e:
            print(f"[{fallback_name}] Skipping due to error: {e}")
            continue

        # Look for the first dataset entry that uses exactly the same set of holds.
        wanted = set(holds)
        match = next(
            ((idx, item) for idx, item in enumerate(dataset.raw) if set(item['holds']) == wanted),
            None,
        )

        if match is None:
            split = "Not Found"
            setter_grade = "Unknown"
            problem_name = fallback_name
        else:
            found_idx, item = match
            setter_grade = item.get('grade', 'Unknown')
            problem_name = item.get('problem_name', fallback_name)
            if found_idx in train_idx:
                split = "Train"
            elif found_idx in val_idx:
                split = "Validation"
            else:
                split = "Found (Unknown Split)"

        holds_with_difficulty = {}
        for hold in holds:
            if hold in hold_difficulty:
                holds_with_difficulty[hold] = hold_difficulty[hold][0]
            else:
                holds_with_difficulty[hold] = "N/A"

        print(f"🔹 Problem Name   : {problem_name}")
        print(f"   Holds Used     : {holds_with_difficulty}")
        print(f"   Setter Grade   : {setter_grade}")
        print(f"   Predicted Grade: {pred_grade}")
        print(f"   Dataset Split  : {split}")

# Hand-picked problems for qualitative spot checks.
# Keys are display names; values are lists of hold labels (column letter + row number).
# NOTE(review): several lists repeat a hold (e.g. "E9" in "Triangulation V7", "I15"/"J14" in
# "warmup crimps", "K6"/"I15" in "Ronani V5"). evaluate_problems encodes the list as given
# (repeats included) but matches against the dataset on set(holds) — confirm the repeats
# are intentional.
named_problems = {
    "Physical V9 Benchmark": ["I18", "J12", "F13", "D10", "E6", "J2"],
    "Triangulation V7": ["A18", "J13", "D16", "E9", "E9", "I4"],
    "warmup crimps": ["I18", "I7", "I9", "I15", "G11", "J14", "J12", "I15", "J14", "H4", "K6"],
    "Ronani V5": ["F18", "I15", "I10", "K9", "K6", "G14", "D16", "E9", "K6", "I15", "E4", "H5"],
    "Don't Fart Alan": ["K18", "J15", "F14", "F13", "D10", "E6", "I7", "I5", "F1"],
    "FINALE MAXI 2025 POCKET 2 V9": ["G3", "F3", "F4", "A6", "A11", "B17", "C9", "D17", "H18"],
    "Khai's V7": ["D18", "A15", "A12", "C9", "E7", "H8", "I6", "E1"],
    "Yums In My Tums V5": ["F18", "G12", "E1", "D13", "I9", "F8", "I2", "F16", "E4", "E6"],
}

# Second collection in the same name -> hold-list format.
# NOTE(review): not referenced in the visible part of this notebook; it also contains
# repeated holds ("F5", "F3", "J11").
team_problems = {
    "simma mot strommen": ["A18", "C12", "A9", "B14", "B16", "D1", "F5", "F5"],
    "MAXIMUS!": ["K18", "E3", "K14", "I13", "K7", "I2", "H16", "K11", "G7", "H4"],
    "interstate": ["K18", "H17", "J11", "I9", "G13", "H15", "I5", "I6"],
    "krakatoa pusher": ["H18", "H11", "J8", "F7", "K15", "F4", "J3"],
    "doublement": ["A18", "E16", "F8", "B14", "G8", "E12", "F4", "F3", "F3"],
    "animal instinct": ["F18", "J11", "F9", "H15", "E13", "J11", "I6", "F4"],
    "blue bin day": ["B18", "C18", "A8", "C12", "B15", "A5", "C3"]
}

# --- Classification baseline run ---
# The model-type string is needed twice (by main and by evaluate_problems), so name it once.
clf_model_type = 'set_transformer_additive'

# main() hands back the loaders, the model, the dataset and the train/validation indices.
(
    clf_train_loader,
    clf_val_loader,
    clf_model,
    clf_dataset,
    clf_train_idx,
    clf_val_idx,
) = main(clf_model_type)

evaluate_problems(
    # what to evaluate
    model=clf_model,
    model_type=clf_model_type,
    problem_dict=named_problems,
    device=device,
    # lookup tables
    hold_to_idx=hold_to_idx,
    hold_difficulty=hold_difficulty,
    type_to_idx=type_to_idx,
    hold_to_coord=hold_to_coord,
    grade_to_label=grade_to_label,
    # dataset and split membership
    dataset=clf_dataset,
    train_idx=clf_train_idx,
    val_idx=clf_val_idx,
)

Epoch 01 — loss: 1.7030
Epoch 02 — loss: 1.6260
Epoch 02 — loss: 1.6260
Epoch 03 — loss: 1.6025
Epoch 03 — loss: 1.6025
Epoch 04 — loss: 1.5573
Epoch 04 — loss: 1.5573
Epoch 05 — loss: 1.5306
Epoch 05 — loss: 1.5306
Epoch 06 — loss: 1.5167
Epoch 06 — loss: 1.5167
Epoch 07 — loss: 1.4908
Epoch 07 — loss: 1.4908
Epoch 08 — loss: 1.4637
Epoch 08 — loss: 1.4637
Epoch 09 — loss: 1.4356
Epoch 09 — loss: 1.4356
Epoch 10 — loss: 1.4186
Epoch 10 — loss: 1.4186
Epoch 11 — loss: 1.4024
Epoch 11 — loss: 1.4024
Epoch 12 — loss: 1.3722
Epoch 12 — loss: 1.3722
Epoch 13 — loss: 1.3418
Epoch 13 — loss: 1.3418
Epoch 14 — loss: 1.3292
Epoch 14 — loss: 1.3292
Epoch 15 — loss: 1.3051
Epoch 15 — loss: 1.3051
Epoch 16 — loss: 1.2856
Epoch 16 — loss: 1.2856
Epoch 17 — loss: 1.2500
Epoch 17 — loss: 1.2500


KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import torch

def visualize_attention_for_problem(model, holds, hold_to_idx, hold_difficulty, type_to_idx, hold_to_coord, device):
    """Draw per-head attention heatmaps of the first two encoder blocks for one problem.

    The holds are encoded into index, difficulty, type and xy tensors (batch of one),
    the model is run once without gradients, and ``mab0.attn_weights`` of
    ``model.encoder[0]`` and ``model.encoder[1]`` are plotted: one column per head,
    first block on the top row and second block on the bottom row.

    Assumes ``attn_weights`` is indexed by head along its first axis and that each
    per-head slice is a 2-D matrix with one column per hold — TODO confirm against
    the attention module.
    """
    model.eval()

    index_list, difficulty_list, type_rows, coord_list = [], [], [], []
    for hold in holds:
        index_list.append(hold_to_idx[hold])
        raw_difficulty, hold_types = hold_difficulty[hold]
        difficulty_list.append(raw_difficulty / 10.0)

        # Multi-hot encoding; type names without an entry in type_to_idx are ignored.
        multi_hot = torch.zeros(len(type_to_idx), dtype=torch.float)
        for type_name in hold_types:
            if type_name in type_to_idx:
                multi_hot[type_to_idx[type_name]] = 1.0
        type_rows.append(multi_hot)

        col, row = hold_to_coord[hold]
        coord_list.append([col / 10.0, row / 17.0])

    def as_batch(tensor):
        return tensor.unsqueeze(0).to(device)

    model_inputs = (
        as_batch(torch.tensor(index_list, dtype=torch.long)),
        as_batch(torch.tensor(difficulty_list, dtype=torch.float)),
        as_batch(torch.stack(type_rows)),
        as_batch(torch.tensor(coord_list, dtype=torch.float)),
    )

    # The forward pass is run only for its side effect of refreshing attn_weights.
    with torch.no_grad():
        model(model_inputs)

    layer_maps = [
        ("ISAB1", model.encoder[0].mab0.attn_weights.cpu().numpy()),
        ("ISAB2", model.encoder[1].mab0.attn_weights.cpu().numpy()),
    ]

    num_heads = layer_maps[0][1].shape[0]
    # squeeze=False keeps `axes` two-dimensional even when there is a single head.
    fig, axes = plt.subplots(2, num_heads, figsize=(4 * num_heads, 8), squeeze=False)

    for head in range(num_heads):
        for row_idx, (layer_name, weights) in enumerate(layer_maps):
            ax = axes[row_idx, head]
            sns.heatmap(weights[head], ax=ax, cmap="viridis", xticklabels=holds)
            ax.set_title(f"{layer_name} – Head {head}")
            ax.set_xlabel("Key (Hold)")
            ax.set_ylabel("Seed")

    plt.tight_layout()
    plt.show()


In [None]:
def get_avg_attention_per_hold(model, holds, hold_to_idx, hold_difficulty, type_to_idx, hold_to_coord, device):
    """Return ``[(hold, mean attention), ...]`` for one problem, in the order of ``holds``.

    The holds are encoded into index, difficulty, type and xy tensors (batch of one),
    the model is run once without gradients, and ``model.encoder[0].mab0.attn_weights``
    is averaged over its first two axes.

    Assumes ``attn_weights`` is laid out as (heads, seeds, holds) — TODO confirm
    against the attention module.

    Raises:
        ValueError: if a hold has no entry in ``hold_to_coord``.
    """
    model.eval()

    index_list, difficulty_list, type_rows, coord_list = [], [], [], []
    for hold in holds:
        index_list.append(hold_to_idx[hold])
        raw_difficulty, hold_types = hold_difficulty[hold]
        difficulty_list.append(raw_difficulty / 10.0)

        # Multi-hot type vector; type names without an entry in type_to_idx are ignored.
        multi_hot = torch.zeros(len(type_to_idx), dtype=torch.float)
        for type_name in hold_types:
            if type_name in type_to_idx:
                multi_hot[type_to_idx[type_name]] = 1.0
        type_rows.append(multi_hot)

        # Scaled XY coordinate
        if hold not in hold_to_coord:
            raise ValueError(f"[ERROR] Hold '{hold}' has no coordinate in hold_to_coord.")
        col, row = hold_to_coord[hold]
        coord_list.append([col / 10.0, row / 17.0])

    def as_batch(tensor):
        return tensor.unsqueeze(0).to(device)

    model_inputs = (
        as_batch(torch.tensor(index_list, dtype=torch.long)),        # (1, N)
        as_batch(torch.tensor(difficulty_list, dtype=torch.float)),  # (1, N)
        as_batch(torch.stack(type_rows)),                            # (1, N, T)
        as_batch(torch.tensor(coord_list, dtype=torch.float)),       # (1, N, 2)
    )

    # The forward pass is run only for its side effect of refreshing attn_weights.
    with torch.no_grad():
        model(model_inputs)

    first_block_attn = model.encoder[0].mab0.attn_weights
    per_hold = first_block_attn.mean(dim=(0, 1)).cpu().numpy()

    return [(hold, weight) for hold, weight in zip(holds, per_hold)]


In [None]:
# Visualize attention and scores (with XY support)

# NOTE(review): this cell uses a kernel-level `model`; the baseline cell in view binds
# `clf_model` instead — confirm `model` is defined on a fresh Restart & Run All.
holds = named_problems["warmup crimps"]

# Both helpers below read model.encoder[0].mab0.attn_weights, so fail early if it is absent.
if not (hasattr(model.encoder[0], 'mab0') and hasattr(model.encoder[0].mab0, 'attn_weights')):
    raise ValueError("The provided model does not support attention visualization.")

visualize_attention_for_problem(
    model, holds, hold_to_idx, hold_difficulty, type_to_idx, hold_to_coord, device
)
attention_scores = get_avg_attention_per_hold(
    model, holds, hold_to_idx, hold_difficulty, type_to_idx, hold_to_coord, device
)

# Highest average attention first.
attention_scores_sorted = sorted(attention_scores, key=lambda pair: pair[1], reverse=True)

print("Average Attention Per Hold (sorted):")
for h, score in attention_scores_sorted:
    if h in hold_difficulty:
        difficulty = hold_difficulty[h][0]
    else:
        difficulty = "N/A"
    print(f"{h}: {score:.4f} (difficulty: {difficulty})")


In [None]:
# read accuracy.csv file
import pandas as pd

# Load the accuracy log and keep only the rows recorded for the XY set transformer.
df = pd.read_csv('./result/accuracy.csv')

filtered_df = df.loc[df['model'].eq('set_transformer_xy')]
print(filtered_df)


                 model  strict_train  ±1_train  strict_test  ±1_test
2   set_transformer_xy         61.56     87.46        43.98    78.97
7   set_transformer_xy         65.11     91.18        45.43    81.86
12  set_transformer_xy         59.61     86.25        44.37    79.50
17  set_transformer_xy         64.78     91.20        44.37    81.23
22  set_transformer_xy         60.65     87.16        42.93    78.87
27  set_transformer_xy         64.07     90.74        45.81    81.28
32  set_transformer_xy         58.98     87.99        42.35    80.17
37  set_transformer_xy         61.40     88.75        44.32    80.51
42  set_transformer_xy         63.49     90.59        46.01    81.09
47  set_transformer_xy         61.77     88.91        43.31    80.65
