## torch based DataLoader 가져오기

In [None]:
import torch
from transformers import AutoModelForImageClassification, TrainingArguments, Trainer
import evaluate
import numpy as np

from utils.dataset import (
    PCOSDataset,
    create_label_mapping,
    stratified_split_by_pid,
    stratified_pid_kfold,
)
from utils.dataset import HFVisionDataset
from utils.dataset import create_weighted_sampler

from utils.transform import get_transform, SpeckleNoise, AddGaussianNoise
from torchvision import transforms
import pandas as pd
import os 

# Expose only the GPU with index 1 to this process.
# NOTE(review): this is set after `import torch`; presumably it still takes
# effect because no CUDA call has been made yet — confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

# Use CUDA when torch reports it as available, otherwise fall back to CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
device  # bare expression so the notebook cell echoes the selected device

In [None]:
# Dataset locations: image root, label CSV, and the root directory used for
# per-fold outputs further down in this file.
data_root_dir = "/workspace/pcos_dataset/Dataset"
label_path = "/workspace/pcos_dataset/labels/기존_Dataset_info.csv"
result_root_dir = "/workspace/pcos_dataset/results"

label_df = pd.read_csv(label_path)

# 1) Build the train / validation transforms
train_tf, val_tf = get_transform(
    train_transform=[
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.5),
        transforms.RandomRotation(10),
        transforms.RandomAffine(degrees=0, translate=(0.05,0.05), scale=(0.95,1.05)),
        transforms.RandomResizedCrop(224, scale=(0.9, 1.0)),
        # Noise augmentations, currently disabled:
        # AddGaussianNoise(std=0.02),
        # SpeckleNoise(noise_factor=0.1),
    ]
)

# 2) Label mapping built from the "label" column
label_mapping = create_label_mapping(label_df, "label")

# 3) Train / Val / Test split (per PID, 7:1:2)
train_df, val_df, test_df = stratified_split_by_pid(label_df)

# 4) Tune = Train + Val (pool used for the 5-fold split)
tune_df = pd.concat([train_df, val_df]).reset_index(drop=True)

# 5) PID-based 5-fold split
folds = stratified_pid_kfold(tune_df, n_splits=5)

## Hugging Face-based Image Classification

In [None]:
from transformers import (
    AutoModelForImageClassification,
)
# Pretrained checkpoint that every fold starts from.
model_name = "google/vit-base-patch16-224"

# One output class per entry in the label mapping.
num_labels = len(label_mapping)

# Metric objects loaded once and reused by compute_metrics below.
accuracy = evaluate.load("accuracy")
f1 = evaluate.load("f1")

def compute_metrics(eval_pred):
    """Return accuracy and macro-averaged F1 for one evaluation pass.

    Args:
        eval_pred: A pair ``(logits, labels)``; predictions are taken as the
            argmax of ``logits`` along axis 1.

    Returns:
        dict with the keys ``"accuracy"`` and ``"f1"``.
    """
    scores, references = eval_pred
    predicted = np.argmax(scores, axis=1)

    accuracy_value = accuracy.compute(
        predictions=predicted, references=references
    )["accuracy"]
    f1_value = f1.compute(
        predictions=predicted, references=references, average="macro"
    )["f1"]

    return {"accuracy": accuracy_value, "f1": f1_value}


In [None]:
# Collects the final evaluation metrics dict of every fold.
fold_results = []

# NOTE(review): `compute_metrics` is redefined with a different signature
# further down this file; re-running this cell after that definition would
# hand the wrong function to Trainer — confirm cell execution order.
for fold_idx, (fold_train_df, fold_val_df) in enumerate(folds):
    print(f"\n======== Fold {fold_idx} Training Start ========")

    # Training split uses the augmenting transform.
    train_base = PCOSDataset(
        fold_train_df, data_root_dir,
        filename_col="filename",
        label_col="label",
        label_mapping=label_mapping,
        transform=train_tf,
    )

    # Validation split uses the validation transform.
    val_base = PCOSDataset(
        fold_val_df, data_root_dir,
        filename_col="filename",
        label_col="label",
        label_mapping=label_mapping,
        transform=val_tf,
    )

    # Wrap the base datasets before handing them to Trainer.
    train_dataset = HFVisionDataset(train_base)
    val_dataset   = HFVisionDataset(val_base)

    # Fresh TrainingArguments and a separate output_dir for every fold
    training_args = TrainingArguments(
        output_dir=f"{result_root_dir}/pcos_fold_{fold_idx}",
        learning_rate=5e-5,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        num_train_epochs=5,
        eval_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="f1",
        save_total_limit=2,
        logging_dir=f"{result_root_dir}/logs_fold_{fold_idx}",
    )

    # Re-initialise the model from the pretrained checkpoint for every fold
    # (the standard approach, so no weights carry over between folds).
    model = AutoModelForImageClassification.from_pretrained(
        model_name,
        num_labels=num_labels,
        ignore_mismatched_sizes=True,
        cache_dir="/workspace/pcos_dataset/models"
    ).to(device)

    # Record the index <-> label-name mapping in the model config.
    model.config.id2label = {int(v): str(k) for k, v in label_mapping.items()}
    model.config.label2id = {str(k): int(v) for k, v in label_mapping.items()}

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
    )

    trainer.train()
    # Evaluate on this fold's validation split after training finishes.
    metrics = trainer.evaluate()
    print(f"[Fold {fold_idx}] metrics:", metrics)
    fold_results.append(metrics)


## PyTorch-based Multiple Instance Learning

In [None]:
from utils.dataset import PCOSMILDataset, create_label_mapping, create_weighted_sampler, stratified_pid_kfold, stratified_split_by_pid
from utils.transform import get_transform, SpeckleNoise, AddGaussianNoise

from torch.utils.data import DataLoader
from torchvision import transforms
import pandas as pd 
# Dataset locations: image root and label CSV.
data_root_dir = "/workspace/pcos_dataset/Dataset"
label_path = "/workspace/pcos_dataset/labels/기존_Dataset_info.csv"
label_df = pd.read_csv(label_path)

# 1) Build the train / validation transforms (flips and a small rotation only)
train_tf, val_tf = get_transform(
    train_transform=[
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.5),
        transforms.RandomRotation(10),
        # Geometric augmentations, currently disabled:
        # transforms.RandomAffine(degrees=0, translate=(0.05,0.05), scale=(0.95,1.05)),
        # transforms.RandomResizedCrop(224, scale=(0.9, 1.0)),

        # Noise augmentations, currently disabled:
        # AddGaussianNoise(std=0.02),
        # SpeckleNoise(noise_factor=0.1),
    ]
)

# Label mapping built from the "label" column
label_mapping = create_label_mapping(label_df, "label")

# 2) Train / Val / Test split (stratified per PID, 7:1:2)
train_df, val_df, test_df = stratified_split_by_pid(label_df)

# 3) Tune = Train + Val (80%)
tune_df = pd.concat([train_df, val_df]).reset_index(drop=True)

# 4) K-fold is applied to tune_df only
folds = stratified_pid_kfold(tune_df, n_splits=5) # [ADD] K-fold cross-validation added

In [None]:
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

def compute_metrics(y_true, y_pred, y_logit):
    """Compute the metrics reported for a fold or for the test set.

    Args:
        y_true: Ground-truth class indices, one per sample.
        y_pred: Predicted class indices, one per sample.
        y_logit: Raw model scores, one row per sample and one column per
            class (callers in this file pass ``np.vstack`` of the logits).

    Returns:
        dict with ``"accuracy"``, ``"f1_macro"`` and ``"roc_auc"``.
        ``"roc_auc"`` is NaN when it cannot be computed (e.g. a class is
        missing from ``y_true`` or the scores are not 2-D).
    """
    metrics = {
        "accuracy": accuracy_score(y_true, y_pred),
        "f1_macro": f1_score(y_true, y_pred, average="macro"),
    }
    try:
        scores = np.asarray(y_logit, dtype=float)

        # Turn logits into probabilities with a numerically stable softmax.
        # The multiclass ROC AUC requires rows that sum to 1, and in the
        # binary case the class-1 logit alone is not a valid ranking score
        # because it ignores the class-0 logit.
        shifted = scores - scores.max(axis=1, keepdims=True)
        probs = np.exp(shifted)
        probs /= probs.sum(axis=1, keepdims=True)

        # Choose binary vs. multiclass from the model's output width, not
        # from whichever labels happen to be present in this split.
        if probs.shape[1] == 2:
            metrics["roc_auc"] = roc_auc_score(y_true, probs[:, 1])
        else:
            metrics["roc_auc"] = roc_auc_score(y_true, probs, multi_class="ovr")
    except (ValueError, IndexError):
        # ROC AUC is undefined for this split or score shape; keep the other
        # metrics and report NaN instead of failing the evaluation.
        metrics["roc_auc"] = np.nan

    return metrics


In [None]:
def train_one_epoch(model, loader, optimizer, criterion):
    """Train `model` for one pass over `loader` and return the mean batch loss.

    Each batch is expected to be a mapping with an "images" tensor and a
    "label" tensor; both are moved to the module-level `device`.

    Args:
        model: Module called as ``model(images)`` to produce logits.
        loader: Iterable of batches; its length is the averaging denominator.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss callable invoked as ``criterion(logits, label)``.

    Returns:
        Sum of the per-batch loss values divided by ``len(loader)``.
    """
    model.train()
    batch_losses = []

    for sample in loader:
        # Shapes per the original annotations — TODO confirm:
        # images (N_inst, C, H, W), label (1,), logits (1, num_classes).
        bag = sample["images"].to(device)
        target = sample["label"].to(device)

        optimizer.zero_grad()
        loss = criterion(model(bag), target)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    return sum(batch_losses) / len(loader)

def validate(model, loader, criterion):
    """Evaluate `model` on `loader` without gradient tracking.

    Each batch must hold a single label and a single prediction, because both
    are converted to Python scalars with ``.item()``.

    Args:
        model: Module called as ``model(images)`` to produce logits.
        loader: Iterable of mappings with "images" and "label" tensors.
        criterion: Loss callable invoked as ``criterion(logits, label)``.

    Returns:
        Tuple ``(mean_loss, metrics)`` where ``metrics`` is whatever
        ``compute_metrics(trues, preds, stacked_logits)`` returns.
    """
    model.eval()
    batch_losses, predicted, targets, score_rows = [], [], [], []

    with torch.no_grad():
        for sample in loader:
            bag = sample["images"].to(device)
            target = sample["label"].to(device)

            scores = model(bag)
            batch_loss = criterion(scores, target)

            batch_losses.append(batch_loss.item())
            predicted.append(scores.argmax(dim=1).cpu().item())
            targets.append(target.cpu().item())
            score_rows.append(scores.cpu().numpy())

    # Stack the per-batch logit arrays into one (n_batches, ...) matrix.
    stacked_scores = np.vstack(score_rows)
    metrics = compute_metrics(targets, predicted, stacked_scores)
    mean_loss = np.mean(batch_losses)

    return mean_loss, metrics

def train_fold(fold_idx, train_df, val_df, num_epochs=5):
    """Train a TransformerMIL model on one fold and return its best weights.

    The best epoch is the one with the highest validation macro-F1.

    Args:
        fold_idx: Fold number, used only in the printed header.
        train_df: Rows used to build the training dataset.
        val_df: Rows used to build the validation dataset.
        num_epochs: Number of training epochs.

    Returns:
        A snapshot of the model ``state_dict`` taken at the best epoch, or
        ``None`` if no epoch improved on the initial score (for example when
        ``num_epochs`` is 0).
    """
    # Local import: the rest of the file does not import `copy`.
    import copy

    print(f"\n========== Fold {fold_idx} ==========")

    # Datasets
    train_dataset = PCOSMILDataset(train_df, data_root_dir, transform=train_tf)
    val_dataset = PCOSMILDataset(val_df, data_root_dir, transform=val_tf)

    # Loaders: batch_size=1 because validate() reads one label per batch.
    train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=8)
    val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False, num_workers=8)

    # Model selection. Alternative kept from the original notebook:
    # AttentionMIL(num_classes=num_classes, embed_dim=256)
    model = TransformerMIL(num_classes=num_classes, embed_dim=256, depth=4).to(device)

    # Referenced through `torch` so the function does not depend on separate
    # `nn` / `optim` imports.
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=0.01)

    best_val_f1 = -1
    best_state = None

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")

        train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
        val_loss, val_metrics = validate(model, val_loader, criterion)

        print(f"[Train] Loss: {train_loss:.4f}")
        print(f"[Val] Loss: {val_loss:.4f}, Metrics: {val_metrics}")

        if val_metrics["f1_macro"] > best_val_f1:
            best_val_f1 = val_metrics["f1_macro"]
            # state_dict() returns references to the live parameters, which
            # later epochs keep updating in place. Deep-copy so the snapshot
            # really holds the best epoch's weights.
            best_state = copy.deepcopy(model.state_dict())

    # Return the weights of the best epoch
    return best_state
def evaluate_test(best_state):
    """Evaluate saved weights on the held-out test split.

    Args:
        best_state: Model ``state_dict`` to load, e.g. the value returned by
            ``train_fold``.

    Returns:
        The metrics returned by ``compute_metrics`` for the test split.
    """
    print("\n======= Test Evaluation =======")

    test_dataset = PCOSMILDataset(test_df, data_root_dir, transform=val_tf)
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=8)

    # Rebuild the model with the same hyper-parameters train_fold uses;
    # otherwise the parameter shapes/keys in `best_state` may not match.
    model = TransformerMIL(num_classes=num_classes, embed_dim=256, depth=4).to(device)
    model.load_state_dict(best_state)
    model.eval()

    preds = []
    trues = []
    logits_all = []

    with torch.no_grad():
        for batch in test_loader:
            imgs = batch["images"].to(device)
            label = batch["label"].to(device)

            logits = model(imgs)

            # .item() relies on one prediction/label per batch (batch_size=1).
            preds.append(torch.argmax(logits, dim=1).cpu().item())
            trues.append(label.cpu().item())
            logits_all.append(logits.cpu().numpy())

    # Stack the per-batch logit arrays into a single 2-D array.
    logits_all = np.vstack(logits_all)
    metrics = compute_metrics(trues, preds, logits_all)
    print("Test Metrics:", metrics)

    return metrics


## Vision Language

In [None]:
import os
from transformers import pipeline
import torch
import pandas as pd

def zero_shot_classification(
    img_root_dir,
    data_path,
    save_root_dir,
    model_ckpt="google/siglip2-so400m-patch14-384",
    batch_size=16,
    labels=None
):
    """Run zero-shot image classification and write the top-1 result per image.

    Args:
        img_root_dir: Directory containing ``<filename>.png`` images.
        data_path: CSV file with the columns ``filename`` and ``USG_Ontology``.
        save_root_dir: Output directory (created if missing); the results are
            written to ``top1_results.csv`` inside it.
        model_ckpt: Checkpoint passed to the Hugging Face pipeline.
        batch_size: Number of image paths handed to the pipeline per call.
        labels: Candidate label names; defaults to
            ``["Benign", "Borderline", "Malignant"]``. Predictions are stored
            as the index of the winning label in this list.
    """
    candidate_names = ["Benign", "Borderline", "Malignant"] if labels is None else labels
    name_to_index = {name: idx for idx, name in enumerate(candidate_names)}

    os.makedirs(save_root_dir, exist_ok=True)

    # Pipeline device convention used here: GPU index 0 if available, else -1.
    pipeline_device = 0 if torch.cuda.is_available() else -1
    classifier = pipeline(
        model=model_ckpt,
        task="zero-shot-image-classification",
        device=pipeline_device,
    )

    # Read file names and ground-truth labels from the CSV.
    table = pd.read_csv(data_path)
    file_names = table['filename'].astype(str).tolist()
    ground_truth = table['USG_Ontology'].tolist()

    # Four parallel columns, filled in lockstep (one entry each per image).
    columns = {"filename": [], "label": [], "preds": [], "probs": []}

    for offset in range(0, len(file_names), batch_size):
        chunk = file_names[offset : offset + batch_size]
        chunk_paths = [os.path.join(img_root_dir, f"{name}.png") for name in chunk]

        results = classifier(chunk_paths, candidate_labels=candidate_names)
        for position, candidates in enumerate(results):
            # Keep only the highest-scoring candidate for this image.
            winner = max(candidates, key=lambda entry: entry["score"])
            columns["filename"].append(chunk[position])
            columns["label"].append(ground_truth[offset + position])
            columns["preds"].append(name_to_index[winner["label"]])
            columns["probs"].append(winner["score"])

    result_frame = pd.DataFrame(columns)
    csv_path = os.path.join(save_root_dir, "top1_results.csv")
    result_frame.to_csv(csv_path, index=False)
    print(f"[INFO] Results saved to {csv_path}")

# Example usage
IMG_ROOT_DIR = "/workspace/pcos_dataset/Dataset"
DATA_PATH = "/workspace/pcos_dataset/labels/통합_Dataset_info.csv"
CKPT = "google/siglip2-so400m-patch14-384"
# CKPT contains a "/", so the results land in a nested directory
# (.../siglip2/google/siglip2-so400m-patch14-384).
SAVE_ROOT_DIR = f"/workspace/pcos_dataset/results/zero_shot/siglip2/{CKPT}"

# Runs the full zero-shot pass at cell execution time and writes the CSV.
zero_shot_classification(
    img_root_dir=IMG_ROOT_DIR,
    data_path=DATA_PATH,
    save_root_dir=SAVE_ROOT_DIR,
    model_ckpt=CKPT,
    batch_size=16,
    labels=["Benign", "Borderline", "Malignant"]
)