In [4]:
# Minimal dataset summary → Markdown table
import os
from pathlib import Path
from collections import Counter

ROOT = Path(r"G:/procnn/dataset_final_RawV5")  # change if needed
CLASSES = ["curling", "ohne_curling"]
EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tif", ".tiff", ".gif"}

def count_dir(p: Path) -> int:
    n = 0
    for r, _, files in os.walk(p):
        for fn in files:
            if Path(fn).suffix.lower() in EXTS:
                n += 1
    return n

rows = []
for split in ["train", "val", "test"]:
    base = ROOT / split
    counts = {c: (count_dir(base / c) if (base / c).exists() else 0) for c in CLASSES}
    total = sum(counts.values())
    rows.append((split, counts["curling"], counts["ohne_curling"], total))

# Print Markdown
print("| split | curling | ohne_curling | total |")
print("|------:|-------:|-------------:|------:|")
for split, c_pos, c_neg, total in rows:
    print(f"| {split} | {c_pos} | {c_neg} | {total} |")
print(f"\n**ROOT:** `{ROOT}`")


| split | curling | ohne_curling | total |
|------:|-------:|-------------:|------:|
| train | 330 | 43194 | 43524 |
| val | 73 | 31827 | 31900 |
| test | 57 | 715 | 772 |

**ROOT:** `G:\procnn\dataset_final_RawV5`


In [5]:
# ===================== Block B (Hybrid-4): Datasets + Hybrid Train + Balanced Val =====================
import os, numpy as np, torch, random
from pathlib import Path
from collections import Counter
from torch.utils.data import Dataset, Subset
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Config
SEED = 42
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)
rng = np.random.default_rng(SEED)

ROOT = Path(r"G:/procnn/dataset_final_RawV5")  # fixed dataset root
LABEL_TO_IDX = {"ohne_curling": 0, "curling": 1}
IDX_TO_LABEL = {v: k for k, v in LABEL_TO_IDX.items()}
EXTS = {".jpg",".jpeg",".png",".bmp",".webp",".tif",".tiff",".gif"}

def is_image(p: Path) -> bool:
    return p.is_file() and p.suffix.lower() in EXTS

class ImageFolderFlat(Dataset):
    def __init__(self, root: Path, split: str, transform=None):
        self.transform = transform
        self.samples = []
        base = root / split
        for label, y in LABEL_TO_IDX.items():
            d = base / label
            if not d.exists():
                continue
            for r, _, files in os.walk(d):
                for fn in files:
                    if Path(fn).suffix.lower() in EXTS:
                        self.samples.append((Path(r) / fn, y))
        if not self.samples:
            raise RuntimeError(f"No images under {base} for {list(LABEL_TO_IDX.keys())}")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, i):
        path, y = self.samples[i]
        im = Image.open(path).convert("RGB")
        if self.transform:
            im = self.transform(im)
        return im, torch.tensor(y, dtype=torch.long)

def orig_counts(ds):
    c = Counter([y for _, y in ds.samples])
    return {"curling": int(c.get(1, 0)), "ohne_curling": int(c.get(0, 0))}

# Raw datasets
ds_train_raw = ImageFolderFlat(ROOT, "train")
ds_val_raw   = ImageFolderFlat(ROOT, "val")
ds_test_raw  = ImageFolderFlat(ROOT, "test")

print("Original counts:")
print("  train:", orig_counts(ds_train_raw))
print("  val  :", orig_counts(ds_val_raw))
print("  test :", orig_counts(ds_test_raw))

# Hybrid indices (oversample minority + undersample majority)
def make_hybrid_indices(samples, minority_label=1, majority_label=0,
                        oversample_factor=6, max_majority_factor=2, rng=None):
    if rng is None: rng = np.random.default_rng()
    labels = np.fromiter((y for _, y in samples), dtype=np.int64)
    idx_pos = np.where(labels == minority_label)[0]
    idx_neg = np.where(labels == majority_label)[0]
    if len(idx_pos) == 0 or len(idx_neg) == 0:
        raise RuntimeError("Both classes must exist in train to build hybrid.")

    n_pos_eff = len(idx_pos) * oversample_factor
    n_neg_eff = min(len(idx_neg), int(n_pos_eff * max_majority_factor))

    sel_pos = rng.choice(idx_pos, size=n_pos_eff, replace=True)
    sel_neg = rng.choice(idx_neg, size=n_neg_eff, replace=False)
    sel_idx = np.concatenate([sel_pos, sel_neg])
    rng.shuffle(sel_idx)

    info = {
        "minority_orig": int(len(idx_pos)), "majority_orig": int(len(idx_neg)),
        "minority_eff": int(n_pos_eff), "majority_eff": int(n_neg_eff),
        "hybrid_total": int(sel_idx.size),
        "ratio_eff(ohne:curling)": round(n_neg_eff / max(n_pos_eff, 1), 3)
    }
    return sel_idx.tolist(), info

# Balanced validation (1:1)
def make_balanced_val_indices(samples, per_class=800, rng=None):
    if rng is None: rng = np.random.default_rng()
    labels = np.fromiter((y for _, y in samples), dtype=np.int64)
    idx_pos = np.where(labels == 1)[0]
    idx_neg = np.where(labels == 0)[0]
    k = int(min(per_class, len(idx_pos), len(idx_neg)))
    if k == 0:
        raise RuntimeError("VAL must contain both classes to build a balanced subset.")
    sel = np.concatenate([
        rng.choice(idx_pos, k, replace=False),
        rng.choice(idx_neg, k, replace=False)
    ])
    rng.shuffle(sel)
    return sel.tolist(), {"per_class": k, "total": int(2 * k)}

# Build subsets
train_idx, train_info = make_hybrid_indices(ds_train_raw.samples, rng=rng)
val_idx,   val_info   = make_balanced_val_indices(ds_val_raw.samples, rng=rng)

ds_train = Subset(ds_train_raw, train_idx)
ds_val   = Subset(ds_val_raw,   val_idx)
ds_test  = ds_test_raw

def subset_counts(subset, which="train"):
    if which == "train":
        labs = [ds_train_raw.samples[i][1] for i in subset.indices]
    elif which == "val":
        labs = [ds_val_raw.samples[i][1] for i in subset.indices]
    else:
        labs = [y for _, y in subset.samples]
    c = Counter(labs)
    return {"curling": int(c.get(1, 0)), "ohne_curling": int(c.get(0, 0))}

print("\nHybrid-4 subsets:")
print("  train(hybrid):", subset_counts(ds_train, "train"), "| info:", train_info)
print("  val(balanced):", subset_counts(ds_val, "val"),   "| info:", val_info)
print("  test(raw)    :", subset_counts(ds_test, "test"))


Original counts:
  train: {'curling': 330, 'ohne_curling': 43194}
  val  : {'curling': 73, 'ohne_curling': 31827}
  test : {'curling': 57, 'ohne_curling': 715}

Hybrid-4 subsets:
  train(hybrid): {'curling': 1980, 'ohne_curling': 3960} | info: {'minority_orig': 330, 'majority_orig': 43194, 'minority_eff': 1980, 'majority_eff': 3960, 'hybrid_total': 5940, 'ratio_eff(ohne:curling)': 2.0}
  val(balanced): {'curling': 73, 'ohne_curling': 73} | info: {'per_class': 73, 'total': 146}
  test(raw)    : {'curling': 57, 'ohne_curling': 715}


In [4]:
!pip install tqdm

import os, shutil, pathlib
from tqdm import tqdm

DEST = pathlib.Path(r"C:/tmp_train_hybrid")
DEST.mkdir(parents=True, exist_ok=True)
(DEST / "curling").mkdir(exist_ok=True)
(DEST / "ohne_curling").mkdir(exist_ok=True)

# Collect source files from ds_train subset
src_paths = []
for i in ds_train.indices:  # ds_train = Subset(ds_train_raw, train_idx)
    p, y = ds_train_raw.samples[i]
    cls = "curling" if y == 1 else "ohne_curling"
    src_paths.append((p, cls))

# Copy with progress
for p, cls in tqdm(src_paths, desc="Copying hybrid train to SSD", unit="file"):
    dst = DEST / cls / p.name
    if not dst.exists():
        try:
            shutil.copy2(p, dst)
        except Exception as e:
            print("Copy error:", p, "->", dst, "|", e)




Copying hybrid train to SSD:  52%|█████▏    | 3091/5940 [01:48<01:18, 36.37file/s]

In [6]:
# ===================== Block C (SSD) : Compute Normalization Stats with progress =====================
import json, time, torch, pathlib
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
from tqdm import tqdm

SSD_ROOT = pathlib.Path(r"C:/tmp_train_hybrid")
EXTS = {".jpg",".jpeg",".png",".bmp",".webp",".tif",".tiff",".gif"}

class SimpleFolder(Dataset):
    def __init__(self, root, transform=None):
        self.transform = transform
        self.samples = []
        for cls, y in [("curling",1), ("ohne_curling",0)]:
            d = root/cls
            if not d.exists(): 
                continue
            for r,_,files in os.walk(d):
                for fn in files:
                    if pathlib.Path(fn).suffix.lower() in EXTS:
                        self.samples.append((pathlib.Path(r)/fn, y))
        if not self.samples:
            raise RuntimeError(f"No images under {root}")

    def __len__(self): 
        return len(self.samples)

    def __getitem__(self, i):
        p, y = self.samples[i]
        im = Image.open(p).convert("RGB")
        im = self.transform(im) if self.transform else im
        return im, y

to_tensor = transforms.ToTensor()
train_ds_ssd = SimpleFolder(SSD_ROOT, transform=to_tensor)
loader = DataLoader(train_ds_ssd, batch_size=128, shuffle=False, num_workers=0)

n_pixels = 0
sum_ = torch.zeros(3)
sum_sq = torch.zeros(3)

t0 = time.time()
for imgs, _ in tqdm(loader, desc="Computing stats (SSD)", unit="batch"):
    b, c, h, w = imgs.shape
    n_pixels += b * h * w
    sum_ += imgs.sum(dim=[0,2,3])
    sum_sq += (imgs ** 2).sum(dim=[0,2,3])

mean = (sum_ / n_pixels)
std = ((sum_sq / n_pixels) - mean**2).sqrt()

print("Train mean:", mean.tolist())
print("Train std :", std.tolist())
print(f"Elapsed: {time.time() - t0:.1f}s")

with open("norm_stats.json", "w") as f:
    json.dump({"mean": mean.tolist(), "std": std.tolist()}, f, indent=2)


Computing stats (SSD): 100%|██████████| 34/34 [00:06<00:00,  4.97batch/s]

Train mean: [0.4680688679218292, 0.20129556953907013, 0.3361990749835968]
Train std : [0.394273579120636, 0.29281333088874817, 0.21700330078601837]
Elapsed: 6.9s





OSError: [Errno 22] Invalid argument: 'norm_stats.json'

In [7]:
# ===================== Block D: Transforms + DataLoaders (CPU-friendly, anti-leakage) =====================
import json, pathlib
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import torch

# 1) load normalization stats (from Block C)
stats_path = pathlib.Path("norm_stats.json")
if stats_path.exists():
    stats = json.loads(stats_path.read_text())
    MEAN, STD = stats["mean"], stats["std"]
else:
    MEAN = [0.4680688679, 0.2012955695, 0.33619907498]
    STD  = [0.3942735791, 0.2928133309, 0.2170033008]

# 2) transforms (train: light hybrid; val/test: deterministic)
train_tf = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=5),
    transforms.RandomResizedCrop(size=224, scale=(0.9, 1.0)),
    transforms.TrivialAugmentWide(num_magnitude_bins=31),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD),
    transforms.RandomErasing(p=0.05, scale=(0.02, 0.08), ratio=(0.3, 3.3), inplace=False),
])

valtest_tf = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD),
])

# 3) apply transforms to Block B datasets via a wrapper
class TransformedSubset(Dataset):
    def __init__(self, base_subset, base_raw, transform=None):
        self.base_subset = base_subset
        self.base_raw = base_raw
        self.transform = transform
    def __len__(self):
        return len(self.base_subset.indices) if hasattr(self.base_subset, "indices") else len(self.base_subset)
    def __getitem__(self, i):
        idx = self.base_subset.indices[i]
        path, y = self.base_raw.samples[idx]
        from PIL import Image
        im = Image.open(path).convert("RGB")
        if self.transform: im = self.transform(im)
        return im, torch.tensor(y, dtype=torch.long)

class TransformedDataset(Dataset):
    def __init__(self, base_raw, transform=None):
        self.base_raw = base_raw
        self.transform = transform
        self.samples = base_raw.samples
    def __len__(self):
        return len(self.samples)
    def __getitem__(self, i):
        path, y = self.samples[i]
        from PIL import Image
        im = Image.open(path).convert("RGB")
        if self.transform: im = self.transform(im)
        return im, torch.tensor(y, dtype=torch.long)

# expects ds_train_raw, ds_val_raw, ds_test_raw, ds_train (Subset), ds_val (Subset) from Block B
train_ds = TransformedSubset(ds_train, ds_train_raw, transform=train_tf)
val_ds   = TransformedSubset(ds_val,   ds_val_raw,   transform=valtest_tf)
test_ds  = TransformedDataset(ds_test_raw,           transform=valtest_tf)

# 4) DataLoaders (Windows/USB-friendly defaults)
BATCH_SIZE = 64
NUM_WORKERS = 0
PREFETCH = 2

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                          num_workers=0, pin_memory=False)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False,
                          num_workers=0, pin_memory=False)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False,
                          num_workers=0, pin_memory=False)


print("DataLoaders ready:",
      f"train={len(train_ds)} | val={len(val_ds)} | test={len(test_ds)}",
      f"| mean={MEAN} | std={STD}", sep="\n")

# 5) quick sanity pass with tqdm (one epoch-like pass without training)
from tqdm import tqdm
cnt = 0
for x, y in tqdm(train_loader, desc="Warmup pass (train)", unit="batch"):
    cnt += 1
    if cnt >= 3: break  # short warmup
for x, y in tqdm(val_loader, desc="Warmup pass (val)", unit="batch"):
    break
print("Warmup OK.")


DataLoaders ready:
train=5940 | val=146 | test=772
| mean=[0.4680688679218292, 0.20129556953907013, 0.3361990749835968] | std=[0.394273579120636, 0.29281333088874817, 0.21700330078601837]


Warmup pass (train):   0%|          | 0/93 [00:00<?, ?batch/s]

Warmup pass (train):   2%|▏         | 2/93 [00:00<00:38,  2.38batch/s]
Warmup pass (val):   0%|          | 0/3 [00:05<?, ?batch/s]

Warmup OK.





In [None]:
# ===================== Block E: MobileNetV3-Small (pretrained) + Focal Loss + Warmup + Resume-Safe =====================
import os, math, time, csv, json
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from torchvision.models import mobilenet_v3_small, MobileNet_V3_Small_Weights
from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support, confusion_matrix,
    roc_auc_score, average_precision_score, precision_recall_curve
)

# -------- Config --------
DEVICE = torch.device("cpu")  # CPU-friendly
NUM_CLASSES = 2
EPOCHS = 20
WARMUP_EPOCHS = 3
PATIENCE = 6
LR = 3e-4
WEIGHT_DECAY = 1e-4
FOCAL_GAMMA = 2.0
ALPHA = torch.tensor([0.25, 0.75], dtype=torch.float32)  # [neg, pos]

CSV_LOG = "train_log.csv"
BEST_PATH = "mobilenetv3_focal_pretrained_best.pth"
CKPT_PATH = "train_ckpt.pth"
TEST_REPORT = "test_report.json"

# -------- Focal Loss --------
class FocalLoss(nn.Module):
    def __init__(self, alpha=None, gamma=2.0, reduction="mean"):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction
    def forward(self, logits, targets):
        ce = nn.functional.cross_entropy(logits, targets, reduction="none", weight=self.alpha)
        pt = torch.exp(-ce)
        loss = ((1 - pt) ** self.gamma) * ce
        if self.reduction == "mean":
            return loss.mean()
        elif self.reduction == "sum":
            return loss.sum()
        return loss

# -------- Model (pretrained) --------
def build_model(num_classes=2):
    weights = MobileNet_V3_Small_Weights.IMAGENET1K_V1
    m = mobilenet_v3_small(weights=weights)
    in_feats = m.classifier[3].in_features
    m.classifier[3] = nn.Linear(in_feats, num_classes)
    return m

model = build_model(NUM_CLASSES).to(DEVICE)

alpha_device = ALPHA.to(DEVICE)
criterion = FocalLoss(alpha=alpha_device, gamma=FOCAL_GAMMA)

# freeze backbone for warmup
for p in model.features.parameters():
    p.requires_grad = False

optimizer = optim.AdamW(filter(lambda t: t.requires_grad, model.parameters()), lr=LR, weight_decay=WEIGHT_DECAY)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=2)

# -------- Helpers --------
def run_epoch(loader, train_mode=True):
    model.train(train_mode)
    total_loss, total_correct, total_samp = 0.0, 0, 0
    for x, y in tqdm(loader, desc="train" if train_mode else "val", unit="batch", leave=False):
        x, y = x.to(DEVICE), y.to(DEVICE)
        with torch.set_grad_enabled(train_mode):
            logits = model(x)
            loss = criterion(logits, y)
        if train_mode:
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            optimizer.step()
        total_loss += loss.item() * y.size(0)
        total_correct += (logits.argmax(1) == y).sum().item()
        total_samp += y.size(0)
    return total_loss / max(1, total_samp), total_correct / max(1, total_samp)

@torch.no_grad()
def collect_preds(loader):
    model.eval()
    y_true, y_pred, y_prob = [], [], []
    for x, y in tqdm(loader, desc="metrics", unit="batch", leave=False):
        x = x.to(DEVICE)
        logits = model(x)
        probs = torch.softmax(logits, dim=1).cpu().numpy()
        y_true.extend(y.numpy().tolist())
        y_pred.extend(probs.argmax(1).tolist())
        y_prob.extend(probs[:, 1].tolist())  # prob of class-1
    return torch.tensor(y_true).numpy(), torch.tensor(y_pred).numpy(), torch.tensor(y_prob).numpy()

def eval_split(loader, name="val"):
    y_true, y_pred, y_prob = collect_preds(loader)
    acc = accuracy_score(y_true, y_pred)
    p_cls, r_cls, f1_cls, supp = precision_recall_fscore_support(y_true, y_pred, labels=[0,1], average=None, zero_division=0)
    p_macro, r_macro, f1_macro, _ = precision_recall_fscore_support(y_true, y_pred, average="macro", zero_division=0)
    cm = confusion_matrix(y_true, y_pred, labels=[0,1])
    print(f"\n=== {name.upper()} === acc={acc:.4f} macroF1={f1_macro:.4f} | F1(curling)={f1_cls[1]:.4f}")
    print("CM [rows=true, cols=pred]:", cm.tolist())
    return {"accuracy": float(acc), "macro_f1": float(f1_macro), "f1_curling": float(f1_cls[1]),
            "cm": cm.tolist(), "y_true": y_true.tolist(), "y_prob": y_prob.tolist()}

# -------- Checkpoint helpers --------
def save_ckpt(epoch, model, optimizer, scheduler, best_state, best_val, no_improve):
    torch.save({
        "epoch": epoch,
        "model": model.state_dict(),
        "optimizer": optimizer.state_dict(),
        "scheduler": scheduler.state_dict() if scheduler else None,
        "best_state": best_state,
        "best_val": best_val,
        "no_improve": no_improve
    }, CKPT_PATH)

def load_ckpt(model, optimizer=None, scheduler=None):
    if not os.path.exists(CKPT_PATH): return None
    ckpt = torch.load(CKPT_PATH, map_location="cpu")
    model.load_state_dict(ckpt["model"])
    if optimizer and ckpt.get("optimizer"): optimizer.load_state_dict(ckpt["optimizer"])
    if scheduler and ckpt.get("scheduler"): scheduler.load_state_dict(ckpt["scheduler"])
    return ckpt

# -------- Training loop --------
start_epoch = 1
loaded = load_ckpt(model, optimizer, scheduler)
if loaded:
    start_epoch = loaded["epoch"] + 1
    best_state, best_val, no_improve = loaded["best_state"], loaded["best_val"], loaded["no_improve"]
    print(f"Resumed from epoch {loaded['epoch']} | best_val={best_val:.6f}")
else:
    best_state, best_val, no_improve = None, math.inf, 0

if not os.path.exists(CSV_LOG):
    with open(CSV_LOG, "w", newline="") as f:
        csv.writer(f).writerow(["epoch","phase","loss","acc","val_macro_f1","val_f1_curling"])

try:
    for epoch in range(start_epoch, EPOCHS + 1):
        if epoch == WARMUP_EPOCHS + 1:
            for p in model.features.parameters(): p.requires_grad = True
            optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)

        print(f"\nEpoch {epoch}/{EPOCHS}" + (" [linear-probe]" if epoch <= WARMUP_EPOCHS else ""))
        t0 = time.time()
        tr_loss, tr_acc = run_epoch(train_loader, True)
        val_loss, val_acc = run_epoch(val_loader, False)
        val_metrics = eval_split(val_loader, "val")
        scheduler.step(val_loss)

        with open(CSV_LOG, "a", newline="") as f:
            w = csv.writer(f)
            w.writerow([epoch,"train",f"{tr_loss:.6f}",f"{tr_acc:.6f}","",""])
            w.writerow([epoch,"val",f"{val_loss:.6f}",f"{val_acc:.6f}",
                        f"{val_metrics['macro_f1']:.6f}",f"{val_metrics['f1_curling']:.6f}"])

        print(f"train: loss={tr_loss:.4f} acc={tr_acc:.4f} | "
              f"val: loss={val_loss:.4f} acc={val_acc:.4f} | "
              f"val F1(curling)={val_metrics['f1_curling']:.3f} macroF1={val_metrics['macro_f1']:.3f} | "
              f"time={time.time()-t0:.1f}s")

        if val_loss < best_val - 1e-4:
            best_val = val_loss
            best_state = {k: v.cpu().clone() for k,v in model.state_dict().items()}
            torch.save(model.state_dict(), BEST_PATH)
            no_improve = 0
        else:
            no_improve += 1
            if no_improve >= PATIENCE:
                print("Early stopping."); break

        save_ckpt(epoch, model, optimizer, scheduler, best_state, best_val, no_improve)

except KeyboardInterrupt:
    print("\nKeyboardInterrupt — saving checkpoint.")
    save_ckpt(epoch, model, optimizer, scheduler, best_state, best_val, no_improve)

if best_state is not None:
    model.load_state_dict(best_state)
print(f"Best weights at: {BEST_PATH}")

# -------- Threshold tuning on VAL --------
@torch.no_grad()
def best_threshold_on_val(val_metrics):
    y_true = torch.tensor(val_metrics["y_true"]).numpy()
    y_prob = torch.tensor(val_metrics["y_prob"]).numpy()
    ps, rs, ths = precision_recall_curve(y_true, y_prob)
    f1s = 2 * (ps*rs) / (ps+rs+1e-12)
    idx = f1s.argmax()
    best_th = float(ths[max(0, idx-1)]) if idx < len(ths) else 0.5
    return float(best_th), float(f1s[idx]), float(ps[idx]), float(rs[idx])

val_metrics_final = eval_split(val_loader, "val_best")
best_th, best_f1, best_p, best_r = best_threshold_on_val(val_metrics_final)
print(f"\nBest threshold on VAL for curling=1: th={best_th:.3f} | F1={best_f1:.3f} (P={best_p:.3f}, R={best_r:.3f})")

# -------- Final TEST evaluation --------
@torch.no_grad()
def eval_test_with_threshold(th):
    y_true, y_pred_argmax, y_prob = collect_preds(test_loader)
    y_pred_th = (y_prob >= th).astype(int)

    def metrics(y_true, y_pred, name):
        acc = accuracy_score(y_true, y_pred)
        p, r, f1, _ = precision_recall_fscore_support(y_true, y_pred, labels=[0,1], average=None, zero_division=0)
        cm = confusion_matrix(y_true, y_pred, labels=[0,1])
        return {"acc": float(acc),
                "precision": {"ohne": float(p[0]), "curling": float(p[1])},
                "recall":    {"ohne": float(r[0]), "curling": float(r[1])},
                "f1":        {"ohne": float(f1[0]), "curling": float(f1[1])},
                "cm": cm.tolist()}

    return {"argmax": metrics(y_true, y_pred_argmax, "argmax"),
            "thresholded": {"threshold": float(th), **metrics(y_true, y_pred_th, "th")} }

test_report = eval_test_with_threshold(best_th)
print("\n=== TEST (argmax) ===", json.dumps(test_report["argmax"], indent=2))
print("\n=== TEST (thresholded) ===", json.dumps(test_report["thresholded"], indent=2))

with open(TEST_REPORT, "w") as f:
    json.dump(test_report, f, indent=2)
print(f"Saved: {TEST_REPORT}")



Epoch 1/20 [linear-probe]


                                                         


=== VAL === acc=0.9589 macroF1=0.9588 | F1(curling)=0.9605
CM [rows=true, cols=pred]: [[67, 6], [0, 73]]
train: loss=0.0036 acc=0.8983 | val: loss=0.0053 acc=0.9589 | val F1(curling)=0.961 macroF1=0.959 | time=125.2s

Epoch 2/20 [linear-probe]


                                                         


=== VAL === acc=0.9589 macroF1=0.9588 | F1(curling)=0.9605
CM [rows=true, cols=pred]: [[67, 6], [0, 73]]
train: loss=0.0013 acc=0.9646 | val: loss=0.0024 acc=0.9589 | val F1(curling)=0.961 macroF1=0.959 | time=145.9s

Epoch 3/20 [linear-probe]


                                                         


=== VAL === acc=0.9521 macroF1=0.9519 | F1(curling)=0.9542
CM [rows=true, cols=pred]: [[66, 7], [0, 73]]
train: loss=0.0011 acc=0.9737 | val: loss=0.0033 acc=0.9521 | val F1(curling)=0.954 macroF1=0.952 | time=279.2s

Epoch 4/20


                                                         


=== VAL === acc=0.9932 macroF1=0.9932 | F1(curling)=0.9932
CM [rows=true, cols=pred]: [[72, 1], [0, 73]]
train: loss=0.0015 acc=0.9739 | val: loss=0.0002 acc=0.9932 | val F1(curling)=0.993 macroF1=0.993 | time=344.2s

Epoch 5/20


                                                         


=== VAL === acc=0.9795 macroF1=0.9794 | F1(curling)=0.9799
CM [rows=true, cols=pred]: [[70, 3], [0, 73]]
train: loss=0.0002 acc=0.9919 | val: loss=0.0034 acc=0.9795 | val F1(curling)=0.980 macroF1=0.979 | time=224.1s

Epoch 6/20


                                                         


=== VAL === acc=0.8836 macroF1=0.8820 | F1(curling)=0.8682
CM [rows=true, cols=pred]: [[73, 0], [17, 56]]
train: loss=0.0003 acc=0.9936 | val: loss=0.0316 acc=0.8836 | val F1(curling)=0.868 macroF1=0.882 | time=329.3s





Epoch 7/20


                                                         


=== VAL === acc=0.9658 macroF1=0.9657 | F1(curling)=0.9645
CM [rows=true, cols=pred]: [[73, 0], [5, 68]]
train: loss=0.0002 acc=0.9931 | val: loss=0.0083 acc=0.9658 | val F1(curling)=0.965 macroF1=0.966 | time=316.6s

Epoch 8/20


                                                         


KeyboardInterrupt — saving checkpoint.
Best weights at: mobilenetv3_focal_pretrained_best.pth


                                                         


=== VAL_BEST === acc=0.9932 macroF1=0.9932 | F1(curling)=0.9932
CM [rows=true, cols=pred]: [[72, 1], [0, 73]]

Best threshold on VAL for curling=1: th=0.666 | F1=1.000 (P=1.000, R=1.000)


metrics:   8%|▊         | 1/13 [00:03<00:42,  3.54s/batch]

In [8]:
# ===================== Unified Block: Prep → Threshold on VAL → Final TEST =====================
import os, glob, json
from pathlib import Path
from collections import Counter

import torch
import torch.nn as nn
import numpy as np

from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models import mobilenet_v3_small, MobileNet_V3_Small_Weights

from sklearn.metrics import (
    precision_recall_curve, precision_recall_fscore_support,
    accuracy_score, confusion_matrix
)

# ---------------- Config ----------------
PROJ_ROOT = Path(r"G:\procnn")                       # project root
DATA_ROOT = PROJ_ROOT / "dataset_final_RawV5"        # must contain train/val/test
BEST_NAME = "mobilenetv3_focal_pretrained_best.pth"  # best weights saved from training
NORM_PATH = PROJ_ROOT / "norm_stats.json"            # mean/std computed on train(hybrid)
VAL_THRESHOLD_JSON = PROJ_ROOT / "val_threshold.json"
TEST_REPORT_JSON = PROJ_ROOT / "test_report.json"
DEVICE = torch.device("cpu")
BATCH_SIZE = 64
NUM_WORKERS = 0

LABEL_TO_IDX = {"ohne_curling": 0, "curling": 1}
IDX_TO_LABEL = {v: k for k, v in LABEL_TO_IDX.items()}

IMG_EXTS = {".jpg",".jpeg",".png",".bmp",".webp",".tif",".tiff",".gif",
            ".JPG",".JPEG",".PNG",".BMP",".WEBP",".TIF",".TIFF",".GIF"}

# ---------------- Utils ----------------
def is_image(p: Path) -> bool:
    return p.is_file() and p.suffix in IMG_EXTS

class ImageFolderFlat(Dataset):
    def __init__(self, root: Path, split: str, transform=None):
        self.transform = transform
        self.samples = []
        base = root / split
        if not base.exists():
            raise FileNotFoundError(f"Split folder not found: {base}")
        for label, y in LABEL_TO_IDX.items():
            d = base / label
            if not d.exists(): 
                continue
            for q in d.rglob("*"):
                if is_image(q):
                    self.samples.append((q, y))
        if not self.samples:
            raise RuntimeError(f"No images under {base}")
    def __len__(self): 
        return len(self.samples)
    def __getitem__(self, i):
        path, y = self.samples[i]
        im = Image.open(path).convert("RGB")
        if self.transform:
            im = self.transform(im)
        return im, torch.tensor(y, dtype=torch.long)

def find_best_pth() -> Path:
    p = PROJ_ROOT / BEST_NAME
    if p.is_file():
        return p
    hits = [Path(f) for f in glob.glob(str(PROJ_ROOT / "**" / BEST_NAME), recursive=True)]
    if hits:
        return hits[0]
    hits = [Path(f) for f in glob.glob(str(PROJ_ROOT / "**" / "*.pth"), recursive=True)]
    hits = [f for f in hits if "best" in f.name.lower()]
    if hits:
        return hits[0]
    raise FileNotFoundError(f"Best weights not found under {PROJ_ROOT}")

def pack_metrics(y_true, y_pred):
    acc = accuracy_score(y_true, y_pred)
    p, r, f1, _ = precision_recall_fscore_support(y_true, y_pred, labels=[0,1], average=None, zero_division=0)
    cm = confusion_matrix(y_true, y_pred, labels=[0,1]).tolist()
    return {
        "acc": float(acc),
        "precision": {"ohne_curling": float(p[0]), "curling": float(p[1])},
        "recall":    {"ohne_curling": float(r[0]), "curling": float(r[1])},
        "f1":        {"ohne_curling": float(f1[0]), "curling": float(f1[1])},
        "cm": cm
    }

@torch.no_grad()
def collect_probs(model, loader, device=DEVICE):
    model.eval()
    y_true, y_prob, y_pred_arg = [], [], []
    for x, y in loader:
        x = x.to(device)
        logits = model(x)
        probs = torch.softmax(logits, dim=1).cpu().numpy()
        y_prob.extend(probs[:, 1])               # prob for class=1
        y_pred_arg.extend(np.argmax(probs, 1))   # argmax
        y_true.extend(y.numpy())
    return np.array(y_true), np.array(y_prob), np.array(y_pred_arg)

# ---------------- Prep DataLoaders ----------------
if not DATA_ROOT.exists():
    raise FileNotFoundError(f"DATA_ROOT not found: {DATA_ROOT}")

if not NORM_PATH.exists():
    raise FileNotFoundError(f"Normalization stats not found: {NORM_PATH} (expected from training)")

stats = json.load(open(NORM_PATH, "r"))
MEAN, STD = stats["mean"], stats["std"]

eval_tf = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=MEAN, std=STD),
])

ds_val  = ImageFolderFlat(DATA_ROOT, "val",  transform=eval_tf)
ds_test = ImageFolderFlat(DATA_ROOT, "test", transform=eval_tf)

val_loader  = DataLoader(ds_val,  batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=False)
test_loader = DataLoader(ds_test, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=False)

def counts(ds: ImageFolderFlat):
    c = Counter([y for _, y in ds.samples])
    return {"curling": int(c.get(1,0)), "ohne_curling": int(c.get(0,0))}
print("VAL counts:", counts(ds_val), "| TEST counts:", counts(ds_test))

# ---------------- Build & Load Model ----------------
def build_model(num_classes=2):
    w = MobileNet_V3_Small_Weights.IMAGENET1K_V1
    m = mobilenet_v3_small(weights=w)
    in_feats = m.classifier[3].in_features
    m.classifier[3] = nn.Linear(in_feats, num_classes)
    return m

model = build_model(2).to(DEVICE)
best_path = find_best_pth()
state = torch.load(str(best_path), map_location=DEVICE)
model.load_state_dict(state)
model.eval()
print(f"Loaded best weights: {best_path.name} | size={best_path.stat().st_size/1024:.1f} KB")

# ---------------- 1) Threshold tuning on VAL ----------------
yv_true, yv_prob, yv_pred_arg = collect_probs(model, val_loader, DEVICE)
prec, rec, ths = precision_recall_curve(yv_true, yv_prob)  # positive class = 1
f1s = 2 * (prec * rec) / (prec + rec + 1e-12)
if len(ths) == 0:
    best_th = 0.5
else:
    best_idx = int(np.argmax(f1s))
    best_th = float(ths[max(0, min(best_idx, len(ths)-1))])

yv_pred_th = (yv_prob >= best_th).astype(int)

val_summary = {
    "best_threshold": best_th,
    "val_argmax": pack_metrics(yv_true, yv_pred_arg),
    "val_thresholded": {"threshold": best_th, **pack_metrics(yv_true, yv_pred_th)}
}
with open(VAL_THRESHOLD_JSON, "w") as f:
    json.dump(val_summary, f, indent=2)
print(f"[VAL] best_threshold = {best_th:.3f} → saved to {VAL_THRESHOLD_JSON}")

# ---------------- 2) Final TEST evaluation ----------------
yt_true, yt_prob, yt_pred_arg = collect_probs(model, test_loader, DEVICE)
yt_pred_th = (yt_prob >= best_th).astype(int)

test_report = {
    "argmax": pack_metrics(yt_true, yt_pred_arg),
    "thresholded": {"threshold": best_th, **pack_metrics(yt_true, yt_pred_th)}
}
with open(TEST_REPORT_JSON, "w") as f:
    json.dump(test_report, f, indent=2)
print(f"[TEST] saved to {TEST_REPORT_JSON}")


VAL counts: {'curling': 73, 'ohne_curling': 31827} | TEST counts: {'curling': 57, 'ohne_curling': 715}


  state = torch.load(str(best_path), map_location=DEVICE)


Loaded best weights: mobilenetv3_focal_pretrained_best.pth | size=6070.5 KB
[VAL] best_threshold = 0.513 → saved to G:\procnn\val_threshold.json
[TEST] saved to G:\procnn\test_report.json


In [None]:
import json, numpy as np, os
from pathlib import Path

PROJ_ROOT = Path(r"G:\procnn")
val_json = PROJ_ROOT / "val_threshold.json"
test_json = PROJ_ROOT / "test_report.json"

def pretty_print_report(report, title="REPORT"):
    def calc_macro_avg(d):
  
        vals = list(d.values())
        return float(np.mean(vals)) if vals else float("nan")

    def metrics_from_pack(pack):
        acc = pack["acc"]
        P = pack["precision"]
        R = pack["recall"]
        F = pack["f1"]
        cm = np.array(pack["cm"])
        
        TN, FP, FN, TP = cm[0,0], cm[0,1], cm[1,0], cm[1,1]
        spec_neg = TN / (TN + FP + 1e-12)  
        spec_pos = TP / (TP + FN + 1e-12)  
        sup0 = TN + FP  
        sup1 = FN + TP  
        macroP = calc_macro_avg(P); macroR = calc_macro_avg(R); macroF = calc_macro_avg(F)
        return {
            "accuracy": acc,
            "precision": P, "recall": R, "f1": F,
            "macro_avg": {"precision": macroP, "recall": macroR, "f1": macroF},
            "specificity": {"ohne_curling": float(spec_neg), "curling": None}, 
            "support": {"ohne_curling": int(sup0), "curling": int(sup1)},
            "cm": cm.astype(int).tolist()
        }

    print(f"\n=== {title} ===")
    if "threshold" in report:
        print(f"Threshold used: {report['threshold']:.4f}")

    
    for key in ["argmax", "thresholded"]:
        if key in report:
            block = report[key]
            th = block.get("threshold", None)
            m = metrics_from_pack(block)
            tag = f"{key.upper()} (thr={th:.4f})" if th is not None else key.upper()
            print(f"\n-- {tag} --")
            print(f"Accuracy: {m['accuracy']:.4f}")
            print("Precision:", m["precision"])
            print("Recall   :", m["recall"])
            print("F1       :", m["f1"])
            print("MacroAvg :", m["macro_avg"])
            print("Support  :", m["support"])
            print("Specificity(ohne):", m["specificity"]["ohne_curling"])
            print("Confusion Matrix [[TN,FP],[FN,TP]]:", m["cm"])

# VAL
with open(val_json, "r") as f:
    val = json.load(f)
print(f"[VAL] Best Threshold = {val['best_threshold']:.4f}")
pretty_print_report(val["val_argmax"], title="VAL—ARGMAX")
pretty_print_report(val["val_thresholded"], title="VAL—THRESHOLDED")

# TEST
with open(test_json, "r") as f:
    test = json.load(f)
pretty_print_report(test, title="TEST")


[VAL] Best Threshold = 0.5134

=== VAL—ARGMAX ===

=== VAL—THRESHOLDED ===
Threshold used: 0.5134

=== TEST ===

-- ARGMAX --
Accuracy: 1.0000
Precision: {'ohne_curling': 1.0, 'curling': 1.0}
Recall   : {'ohne_curling': 1.0, 'curling': 1.0}
F1       : {'ohne_curling': 1.0, 'curling': 1.0}
MacroAvg : {'precision': 1.0, 'recall': 1.0, 'f1': 1.0}
Support  : {'ohne_curling': 715, 'curling': 57}
Specificity(ohne): 0.9999999999999986
Confusion Matrix [[TN,FP],[FN,TP]]: [[715, 0], [0, 57]]

-- THRESHOLDED (thr=0.5134) --
Accuracy: 1.0000
Precision: {'ohne_curling': 1.0, 'curling': 1.0}
Recall   : {'ohne_curling': 1.0, 'curling': 1.0}
F1       : {'ohne_curling': 1.0, 'curling': 1.0}
MacroAvg : {'precision': 1.0, 'recall': 1.0, 'f1': 1.0}
Support  : {'ohne_curling': 715, 'curling': 57}
Specificity(ohne): 0.9999999999999986
Confusion Matrix [[TN,FP],[FN,TP]]: [[715, 0], [0, 57]]


In [10]:
import csv

def write_csv_from_pack(pack, csv_path):
    cm = pack["cm"]
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["metric","ohne_curling","curling","macro"])
        w.writerow(["precision", pack["precision"]["ohne_curling"], pack["precision"]["curling"],
                    (pack["precision"]["ohne_curling"]+pack["precision"]["curling"])/2])
        w.writerow(["recall",    pack["recall"]["ohne_curling"],    pack["recall"]["curling"],
                    (pack["recall"]["ohne_curling"]+pack["recall"]["curling"])/2])
        w.writerow(["f1",        pack["f1"]["ohne_curling"],        pack["f1"]["curling"],
                    (pack["f1"]["ohne_curling"]+pack["f1"]["curling"])/2])
        w.writerow([])
        w.writerow(["accuracy", pack["acc"], "", ""])
        w.writerow([])
        w.writerow(["confusion_matrix_format","[[TN,FP],[FN,TP]]","",""])
        w.writerow(["row1", cm[0][0], cm[0][1], ""])
        w.writerow(["row2", cm[1][0], cm[1][1], ""])


with open(r"G:\procnn\test_report.json","r") as f:
    test = json.load(f)
pack = test["thresholded"]
write_csv_from_pack(pack, r"G:\procnn\test_report_thresholded.csv")
print("CSV saved.")


CSV saved.
