In [4]:
# === Stronger Multiclass Trainer (Notebook-safe, fixed) ======================
# JSONL row: {"image_path": "images/xxx_post_disaster.png", "damage": "<no/minor/major/destroyed>"}

import os, json, time, random, math
from pathlib import Path
from typing import List, Dict, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from PIL import Image, UnidentifiedImageError
import torchvision
from torchvision import transforms as T
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

# ---------------- CONFIG (you can tweak) ----------------
DATA_DIR   = Path("disaster-ai/data/xbd/tier1")
TRAIN_JL   = DATA_DIR / "train.jsonl"
VAL_JL     = DATA_DIR / "val.jsonl"
OUT_DIR    = Path("checkpoints_multiclass_strong"); OUT_DIR.mkdir(parents=True, exist_ok=True)

IMG_KEY    = "image_path"
LABEL_KEY  = "damage"

CLASSES    = ['no-damage','minor-damage','major-damage','destroyed']
CLASS_TO_ID = {c:i for i,c in enumerate(CLASSES)}
ID_TO_CLASS = {i:c for c,i in CLASS_TO_ID.items()}
N_CLASSES  = len(CLASSES)

# Model + training
BACKBONE     = "resnet50"         # 'resnet18' | 'resnet50' | 'efficientnet_b0' | 'vit_b_16'
IMG_SIZE     = 320                # 320–384 usually helps
BATCH_SIZE   = 24                 # lower if out of memory
EPOCHS       = 40
BASE_LR      = 5e-4               # will warm up first few epochs
WEIGHT_DECAY = 1e-4
WARMUP_EPOCHS = 3                 # LR warmup
HEAD_WARMUP_EPOCHS = 2            # freeze backbone for first K epochs

# Loss mode: "ce_ls" (CrossEntropy with label smoothing) or "focal"
LOSS_MODE    = "ce_ls"            # "ce_ls" | "focal"
LABEL_SMOOTH = 0.1                # used if LOSS_MODE == "ce_ls"
FOCAL_GAMMA  = 2.0                # used if LOSS_MODE == "focal"

# Imbalance controls
OVERSAMPLE = {                     # multiply examples for a class in training set
    'minor-damage': 2.0,
    # 'major-damage': 1.2,
}
USE_SAMPLER = True                 # keep True to use WeightedRandomSampler also

# Dataloader (Notebook-safe defaults; will be overridden for CUDA)
NUM_WORKERS = 0
PIN_MEMORY  = False

# TTA (test-time augmentation)
TTA_N = 4

SEED = 42
# --------------------------------------------------------

def set_seed(s=42):
    random.seed(s); np.random.seed(s)
    torch.manual_seed(s); torch.cuda.manual_seed_all(s)

def read_jsonl(path: Path) -> List[Dict]:
    assert path.exists(), f"Missing file: {path}"
    rows = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                rows.append(json.loads(line))
    return rows

def resolve_img_path(p: str | Path) -> Path:
    p = Path(p)
    return p if p.is_absolute() else (DATA_DIR / p)

class ImgDS(Dataset):
    def __init__(self, rows: List[Dict], transform=None):
        self.rows = rows
        self.t = transform or (lambda x: x)

    def __len__(self): return len(self.rows)

    def __getitem__(self, i):
        r = self.rows[i]
        img_path = resolve_img_path(r[IMG_KEY])
        try:
            y = CLASS_TO_ID[r[LABEL_KEY]]
        except KeyError:
            raise KeyError(f"Row {i}: bad label '{r.get(LABEL_KEY)}'. Expected {list(CLASSES)}")
        try:
            img = Image.open(img_path).convert("RGB")
        except FileNotFoundError:
            raise FileNotFoundError(f"Row {i}: missing image -> {img_path}")
        except UnidentifiedImageError:
            raise UnidentifiedImageError(f"Row {i}: unreadable image -> {img_path}")
        return self.t(img), torch.tensor(y, dtype=torch.long)

def get_train_tf():
    return T.Compose([
        T.Resize(int(IMG_SIZE*1.2)),
        T.RandomResizedCrop(IMG_SIZE, scale=(0.5, 1.0), ratio=(0.75, 1.33)),
        T.RandomHorizontalFlip(),
        T.RandomVerticalFlip(p=0.2),
        T.RandomRotation(degrees=10),
        T.RandomApply([T.ColorJitter(0.4,0.4,0.4,0.1)], p=0.7),
        T.RandomGrayscale(p=0.1),
        T.GaussianBlur(kernel_size=3, sigma=(0.1, 2.0)),
        T.ToTensor(),
        T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
    ])

def get_val_tf():
    return T.Compose([
        T.Resize(int(IMG_SIZE*1.2)),
        T.CenterCrop(IMG_SIZE),
        T.ToTensor(),
        T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
    ])

def build_model(backbone: str, n_classes: int) -> nn.Module:
    if backbone == "resnet18":
        m = torchvision.models.resnet18(weights=torchvision.models.ResNet18_Weights.IMAGENET1K_V1)
        m.fc = nn.Linear(m.fc.in_features, n_classes)
    elif backbone == "resnet50":
        m = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.IMAGENET1K_V2)
        m.fc = nn.Linear(m.fc.in_features, n_classes)
    elif backbone == "efficientnet_b0":
        m = torchvision.models.efficientnet_b0(weights=torchvision.models.EfficientNet_B0_Weights.IMAGENET1K_V1)
        m.classifier[1] = nn.Linear(m.classifier[1].in_features, n_classes)
    elif backbone == "vit_b_16":
        m = torchvision.models.vit_b_16(weights=torchvision.models.ViT_B_16_Weights.IMAGENET1K_V1)
        m.heads.head = nn.Linear(m.heads.head.in_features, n_classes)
    else:
        raise ValueError(f"Unknown backbone: {backbone}")
    return m

class FocalLoss(nn.Module):
    def __init__(self, weight=None, gamma=2.0, reduction="mean"):
        super().__init__()
        self.weight, self.gamma, self.reduction = weight, gamma, reduction
    def forward(self, logits, target):
        ce = nn.functional.cross_entropy(logits, target, weight=self.weight, reduction="none")
        with torch.no_grad():
            pt = torch.softmax(logits, dim=1).gather(1, target.unsqueeze(1)).squeeze(1).clamp_(1e-6, 1-1e-6)
        loss = ((1-pt)**self.gamma) * ce
        return loss.mean() if self.reduction=="mean" else loss.sum()

def make_oversampled_rows(rows: List[Dict], factors: Dict[str,float]) -> List[Dict]:
    out = []
    for r in rows:
        out.append(r)
        f = float(factors.get(r[LABEL_KEY], 1.0))
        k = max(0, int(round(f - 1.0)))
        for _ in range(k):
            out.append(r.copy())
    random.shuffle(out)
    return out

def class_counts(rows: List[Dict]) -> Dict[str,int]:
    c = {k:0 for k in CLASSES}
    for r in rows: c[r[LABEL_KEY]] += 1
    return c

def pick_device():
    if torch.cuda.is_available():
        return torch.device("cuda"), True, True   # device, use_amp, pin_memory
    if getattr(torch.backends, "mps", None) and torch.backends.mps.is_available():
        return torch.device("mps"), False, False
    return torch.device("cpu"), False, False

def train_and_eval():
    set_seed(SEED)
    device, use_amp, pin_memory = pick_device()
    print("Device:", device)

    tr_rows = read_jsonl(TRAIN_JL)
    va_rows = read_jsonl(VAL_JL)

    if OVERSAMPLE:
        tr_rows = make_oversampled_rows(tr_rows, OVERSAMPLE)

    print(f"Train samples: {len(tr_rows)} | Val samples: {len(va_rows)}")
    print("Class counts (train):", class_counts(tr_rows))

    ds_tr = ImgDS(tr_rows, get_train_tf())
    ds_va = ImgDS(va_rows, get_val_tf())

    if USE_SAMPLER:
        counts = np.array([class_counts(tr_rows)[c] for c in CLASSES], dtype=float)
        inv = (counts.sum() / np.maximum(counts, 1.0)).astype(float)
        sample_w = [inv[CLASS_TO_ID[r[LABEL_KEY]]] for r in tr_rows]
        sampler = WeightedRandomSampler(sample_w, num_samples=len(sample_w), replacement=True)
        shuffle_flag = False
    else:
        sampler = None
        shuffle_flag = True

    persistent = (NUM_WORKERS > 0)
    loader_tr = DataLoader(ds_tr, batch_size=BATCH_SIZE, sampler=sampler,
                           shuffle=shuffle_flag, num_workers=NUM_WORKERS,
                           pin_memory=pin_memory, persistent_workers=persistent)
    loader_va = DataLoader(ds_va, batch_size=BATCH_SIZE, shuffle=False,
                           num_workers=NUM_WORKERS, pin_memory=pin_memory,
                           persistent_workers=persistent)

    model = build_model(BACKBONE, N_CLASSES).to(device)

    counts = np.array([class_counts(tr_rows)[c] for c in CLASSES], dtype=float)
    inv = (counts.sum() / np.maximum(counts, 1.0)).astype(float)
    class_w = torch.tensor(inv, dtype=torch.float, device=device)

    if LOSS_MODE == "focal":
        loss_fn = FocalLoss(weight=class_w, gamma=FOCAL_GAMMA)
    else:
        loss_fn = nn.CrossEntropyLoss(weight=class_w if class_w.sum() > 0 else None,
                                      label_smoothing=LABEL_SMOOTH)

    opt = optim.AdamW(model.parameters(), lr=BASE_LR, weight_decay=WEIGHT_DECAY)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(opt, T_max=EPOCHS)
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

    # Freeze backbone for head warmup
    for p in model.parameters(): p.requires_grad = True
    def is_head(name): return ("fc" in name) or ("classifier.1" in name) or ("heads.head" in name)
    for name, p in model.named_parameters():
        if not is_head(name): p.requires_grad = False

    best_acc = -1.0

    for epoch in range(1, EPOCHS+1):
        # LR warmup
        if epoch <= WARMUP_EPOCHS:
            warm_lr = 1e-4 + (BASE_LR-1e-4) * (epoch / max(1,WARMUP_EPOCHS))
            for g in opt.param_groups: g["lr"] = warm_lr

        # Unfreeze backbone after head warmup
        if epoch == HEAD_WARMUP_EPOCHS + 1:
            for name, p in model.named_parameters():
                p.requires_grad = True
            for g in opt.param_groups: g["lr"] = min(g["lr"], BASE_LR)

        model.train()
        t0 = time.time()
        running, seen = 0.0, 0

        for xb, yb in loader_tr:
            xb, yb = xb.to(device, non_blocking=pin_memory), yb.to(device, non_blocking=pin_memory)
            opt.zero_grad(set_to_none=True)
            if use_amp:
                with torch.cuda.amp.autocast():
                    logits = model(xb)
                    loss = loss_fn(logits, yb)
                scaler.scale(loss).backward()
                scaler.step(opt); scaler.update()
            else:
                logits = model(xb)
                loss = loss_fn(logits, yb)
                loss.backward(); opt.step()

            running += float(loss.detach().item()) * xb.size(0)
            seen += xb.size(0)

        if epoch > WARMUP_EPOCHS:
            scheduler.step()
        train_loss = running / max(1, seen)

        # ----- Validation -----
        model.eval()
        preds, tgts = [], []
        with torch.no_grad():
            for xb, yb in loader_va:
                xb = xb.to(device, non_blocking=pin_memory)
                logits = model(xb)
                pred = logits.argmax(1).cpu()
                preds.append(pred); tgts.append(yb)
        preds = torch.cat(preds).numpy()
        tgts  = torch.cat(tgts).numpy()
        acc = (preds == tgts).mean()

        # per-class acc (FIXED: use np.diag on numpy cm)
        cm = confusion_matrix(tgts, preds, labels=list(range(N_CLASSES)))
        per_class = (np.diag(cm) / np.maximum(cm.sum(axis=1), 1)).tolist()
        msg = ", ".join(f"{ID_TO_CLASS[i]}={per_class[i]:.3f}" for i in range(N_CLASSES))
        print(f"[{epoch:02d}/{EPOCHS}] train_loss={train_loss:.4f}  val_acc={acc:.4f}  "
              f"per_class: {msg}  (took {time.time()-t0:.1f}s)")

        if acc > best_acc:
            best_acc = acc
            torch.save({
                "model_state": model.state_dict(),
                "backbone": BACKBONE,
                "classes": CLASSES,
                "img_size": IMG_SIZE,
                "acc": float(best_acc),
            }, OUT_DIR / "best.pt")
            print(f"  ✅ Saved best -> {OUT_DIR/'best.pt'} (val_acc={best_acc:.4f})")

    # save final weights only
    torch.save(model.state_dict(), OUT_DIR / "last_weights_only.pt")
    print("Training complete. Best val_acc:", best_acc)
    return model

# ---------- TTA predict + Evaluation helpers ----------

def build_infer_pipeline(ckpt_dir: Path = OUT_DIR):
    ckpt_path = ckpt_dir / "best.pt"
    if not ckpt_path.exists():
        alt = ckpt_dir / "last_weights_only.pt"
        assert alt.exists(), f"No checkpoints found in {ckpt_dir}"
        raise RuntimeError("best.pt not found; re-run training to create it.")
    ckpt = torch.load(ckpt_path, map_location="cpu")
    classes = ckpt.get("classes", CLASSES)
    img_size = ckpt.get("img_size", IMG_SIZE)
    backbone = ckpt.get("backbone", BACKBONE)

    model = build_model(backbone, len(classes))
    model.load_state_dict(ckpt["model_state"])
    model.eval()

    base_tf = T.Compose([
        T.Resize(int(img_size*1.2)),
        T.CenterCrop(img_size),
        T.ToTensor(),
        T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
    ])
    return model, base_tf, classes

def predict_one_tta(model, base_tf, classes, img_path: Path, tta_n: int = 4):
    img = Image.open(img_path).convert("RGB")
    tfs = [base_tf]
    if tta_n >= 2: tfs.append(T.Compose([T.RandomHorizontalFlip(p=1.0), *base_tf.transforms]))
    if tta_n >= 3: tfs.append(T.Compose([T.RandomVerticalFlip(p=1.0), *base_tf.transforms]))
    if tta_n >= 4: tfs.append(T.Compose([T.RandomRotation(10), *base_tf.transforms]))

    probs_sum = torch.zeros(len(classes))
    with torch.no_grad():
        for tf in tfs[:tta_n]:
            x = tf(img).unsqueeze(0)
            logits = model(x)
            probs_sum += torch.softmax(logits, dim=1).squeeze(0)
    probs = probs_sum / len(tfs[:tta_n])
    top = int(probs.argmax().item())
    return classes[top], float(probs[top])

def eval_on_val_with_tta(tta_n: int = TTA_N):
    model, base_tf, classes = build_infer_pipeline(OUT_DIR)
    with open(VAL_JL, "r") as f:
        rows = [json.loads(line) for line in f]

    y_true, y_pred = [], []
    for r in rows:
        p = resolve_img_path(r[IMG_KEY])
        pred, _ = predict_one_tta(model, base_tf, classes, p, tta_n=tta_n)
        y_pred.append(pred); y_true.append(r[LABEL_KEY])

    acc = accuracy_score(y_true, y_pred)
    print(f"\nOverall Validation Accuracy (TTA={tta_n}): {acc*100:.2f}%\n")
    print("Classification Report:\n")
    print(classification_report(y_true, y_pred, labels=CLASSES, digits=3))
    cm = confusion_matrix(y_true, y_pred, labels=CLASSES)
    print("\nConfusion Matrix (rows=True, cols=Pred):\n", cm)

# ======================= RUN TRAINING =======================
model = train_and_eval()
# After training, evaluate with TTA (increase tta_n to 4 or 8 for a small boost)
eval_on_val_with_tta(tta_n=TTA_N)
# =================================================================

Device: mps
Train samples: 1958 | Val samples: 448
Class counts (train): {'no-damage': 1042, 'minor-damage': 332, 'major-damage': 247, 'destroyed': 337}


  scaler = torch.cuda.amp.GradScaler(enabled=use_amp)


[01/40] train_loss=1.2556  val_acc=0.2411  per_class: no-damage=0.000, minor-damage=0.310, major-damage=0.869, destroyed=0.500  (took 130.6s)
  ✅ Saved best -> checkpoints_multiclass_strong/best.pt (val_acc=0.2411)
[02/40] train_loss=1.1491  val_acc=0.2366  per_class: no-damage=0.000, minor-damage=0.167, major-damage=0.869, destroyed=0.548  (took 129.3s)
[03/40] train_loss=1.0735  val_acc=0.3772  per_class: no-damage=0.188, minor-damage=0.429, major-damage=0.803, destroyed=0.631  (took 207.8s)
  ✅ Saved best -> checkpoints_multiclass_strong/best.pt (val_acc=0.3772)
[04/40] train_loss=0.9980  val_acc=0.3482  per_class: no-damage=0.146, minor-damage=0.571, major-damage=0.721, destroyed=0.595  (took 223.4s)
[05/40] train_loss=0.9701  val_acc=0.3326  per_class: no-damage=0.115, minor-damage=0.548, major-damage=0.557, destroyed=0.738  (took 214.5s)
[06/40] train_loss=0.9351  val_acc=0.3862  per_class: no-damage=0.195, minor-damage=0.381, major-damage=0.787, destroyed=0.690  (took 223.4s)
  

In [6]:
# Inspect where it's going wrong
import json, pandas as pd
from pathlib import Path

VAL_JL = Path("disaster-ai/data/xbd/tier1/val.jsonl")
ckpt_dir = Path("checkpoints_multiclass_strong")

from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from torchvision import transforms as T
from PIL import Image
import torch, numpy as np

# Reuse the helpers from the training cell if they still exist:
# resolve_img_path, build_infer_pipeline, predict_one_tta, CLASSES

model, base_tf, classes = build_infer_pipeline(ckpt_dir)
rows = [json.loads(l) for l in open(VAL_JL, "r")]
records = []
for r in rows:
    p = resolve_img_path(r["image_path"])
    pred, prob = predict_one_tta(model, base_tf, classes, p, tta_n=4)
    records.append({
        "image_path": str(p),
        "true": r["damage"],
        "pred": pred,
        "conf": round(prob, 4),
        "is_correct": pred == r["damage"]
    })

df = pd.DataFrame(records)
mistakes = df[~df["is_correct"]].sort_values("conf", ascending=False)
display(mistakes.head(25))
mistakes.to_csv("val_mistakes_top.csv", index=False)
print("Saved:", "val_mistakes_top.csv")

Unnamed: 0,image_path,true,pred,conf,is_correct
395,disaster-ai/data/xbd/tier1/images/socal-fire_0...,no-damage,destroyed,0.9703,False
165,disaster-ai/data/xbd/tier1/images/socal-fire_0...,no-damage,destroyed,0.9684,False
385,disaster-ai/data/xbd/tier1/images/hurricane-ma...,major-damage,minor-damage,0.9633,False
265,disaster-ai/data/xbd/tier1/images/santa-rosa-w...,minor-damage,destroyed,0.9526,False
252,disaster-ai/data/xbd/tier1/images/hurricane-ha...,destroyed,major-damage,0.9525,False
415,disaster-ai/data/xbd/tier1/images/hurricane-ha...,minor-damage,major-damage,0.9459,False
375,disaster-ai/data/xbd/tier1/images/hurricane-fl...,minor-damage,major-damage,0.9433,False
350,disaster-ai/data/xbd/tier1/images/hurricane-ha...,destroyed,major-damage,0.9399,False
281,disaster-ai/data/xbd/tier1/images/hurricane-fl...,destroyed,major-damage,0.9387,False
377,disaster-ai/data/xbd/tier1/images/hurricane-fl...,minor-damage,major-damage,0.9355,False


Saved: val_mistakes_top.csv


In [8]:
# Low-LR fine-tuning for 8 more epochs from best.pt
import torch, json, time
from pathlib import Path
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms as T

# Reuse helper funcs from earlier cell: build_model, get_train_tf, get_val_tf,
# read_jsonl, resolve_img_path, ImgDS, CLASSES, N_CLASSES, DATA_DIR, TRAIN_JL, VAL_JL

ckpt_dir = Path("checkpoints_multiclass_strong")
ckpt = torch.load(ckpt_dir/"best.pt", map_location="cpu")

model = build_model(ckpt["backbone"], N_CLASSES)
model.load_state_dict(ckpt["model_state"])
model.train()

device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if getattr(torch.backends, "mps", None) and torch.backends.mps.is_available() else "cpu"))
model.to(device)

# Datasets/loaders
tr_rows = read_jsonl(TRAIN_JL)
va_rows = read_jsonl(VAL_JL)
ds_tr = ImgDS(tr_rows, get_train_tf())
ds_va = ImgDS(va_rows, get_val_tf())

loader_tr = DataLoader(ds_tr, batch_size=24, shuffle=True, num_workers=0)
loader_va = DataLoader(ds_va, batch_size=24, shuffle=False, num_workers=0)

# Very small LR; unfreeze everything
for p in model.parameters(): p.requires_grad = True
opt = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=5e-5)
loss_fn = nn.CrossEntropyLoss()

best_acc = ckpt.get("acc", 0.0)
EPOCHS = 8

for epoch in range(1, EPOCHS+1):
    t0 = time.time(); run, seen = 0.0, 0
    for xb, yb in loader_tr:
        xb, yb = xb.to(device), yb.to(device)
        opt.zero_grad(set_to_none=True)
        logits = model(xb)
        loss = loss_fn(logits, yb)
        loss.backward(); opt.step()
        run += loss.item() * xb.size(0); seen += xb.size(0)
    tr_loss = run/max(1,seen)

    # val
    model.eval()
    preds, tgts = [], []
    with torch.no_grad():
        for xb, yb in loader_va:
            xb = xb.to(device)
            logits = model(xb)
            preds.append(logits.argmax(1).cpu()); tgts.append(yb)
    import torch as _t
    acc = (_t.cat(preds) == _t.cat(tgts)).float().mean().item()
    model.train()
    print(f"[FT {epoch:02d}/{EPOCHS}] train_loss={tr_loss:.4f}  val_acc={acc:.4f}  (took {time.time()-t0:.1f}s)")
    if acc > best_acc:
        best_acc = acc
        torch.save({"model_state": model.state_dict(),
                    "backbone": ckpt["backbone"],
                    "classes": ckpt["classes"],
                    "img_size": ckpt["img_size"],
                    "acc": best_acc}, ckpt_dir/"best.pt")
        print("  ✅ Saved new best.pt", best_acc)

[FT 01/8] train_loss=0.1565  val_acc=0.6942  (took 172.3s)
  ✅ Saved new best.pt 0.6941964030265808
[FT 02/8] train_loss=0.0996  val_acc=0.6875  (took 172.5s)
[FT 03/8] train_loss=0.1097  val_acc=0.6696  (took 172.0s)
[FT 04/8] train_loss=0.1178  val_acc=0.6830  (took 179.0s)
[FT 05/8] train_loss=0.0831  val_acc=0.7165  (took 183.5s)
  ✅ Saved new best.pt 0.7165178656578064
[FT 06/8] train_loss=0.0885  val_acc=0.6920  (took 194.9s)
[FT 07/8] train_loss=0.0772  val_acc=0.6987  (took 192.7s)
[FT 08/8] train_loss=0.0925  val_acc=0.6808  (took 203.0s)


In [10]:
LOSS_MODE = "focal"         # change in your strong trainer cell
FOCAL_GAMMA = 2.0           # keep
OVERSAMPLE = {
    "minor-damage": 3.0,    # try 3x or even 4x
    "major-damage": 1.3
}
EPOCHS = 20                  # you can do a shorter focused run

In [12]:
eval_on_val_with_tta(tta_n=8)


Overall Validation Accuracy (TTA=8): 72.54%

Classification Report:

              precision    recall  f1-score   support

   no-damage      0.785     0.897     0.837       261
minor-damage      0.409     0.214     0.281        42
major-damage      0.636     0.574     0.603        61
   destroyed      0.644     0.560     0.599        84

    accuracy                          0.725       448
   macro avg      0.619     0.561     0.580       448
weighted avg      0.703     0.725     0.709       448


Confusion Matrix (rows=True, cols=Pred):
 [[234   7   2  18]
 [ 23   9   8   2]
 [ 18   2  35   6]
 [ 23   4  10  47]]


In [14]:
# Bias tweak on the classification layer to help minor-damage recall
import torch

ckpt_dir = Path("checkpoints_multiclass_strong")
ckpt = torch.load(ckpt_dir/"best.pt", map_location="cpu")
model = build_model(ckpt["backbone"], len(ckpt["classes"]))
model.load_state_dict(ckpt["model_state"])

# Identify the class index for 'minor-damage'
minor_idx = CLASSES.index("minor-damage")

# Small bias lift (adjust 0.05–0.20)
with torch.no_grad():
    if hasattr(model, "fc"):            # resnets
        model.fc.bias[minor_idx] += 0.10
    elif hasattr(model, "classifier"):   # efficientnet
        model.classifier[1].bias[minor_idx] += 0.10
    elif hasattr(model, "heads") and hasattr(model.heads, "head"):  # ViT
        model.heads.head.bias[minor_idx] += 0.10

torch.save({"model_state": model.state_dict(),
            "backbone": ckpt["backbone"],
            "classes": ckpt["classes"],
            "img_size": ckpt["img_size"],
            "acc": ckpt.get("acc", 0.0)}, ckpt_dir/"best.pt")

print("Tweaked bias for 'minor-damage' and saved to best.pt. Re-run eval:")
eval_on_val_with_tta(tta_n=4)

Tweaked bias for 'minor-damage' and saved to best.pt. Re-run eval:

Overall Validation Accuracy (TTA=4): 72.77%

Classification Report:

              precision    recall  f1-score   support

   no-damage      0.783     0.900     0.838       261
minor-damage      0.450     0.214     0.290        42
major-damage      0.625     0.574     0.598        61
   destroyed      0.653     0.560     0.603        84

    accuracy                          0.728       448
   macro avg      0.628     0.562     0.582       448
weighted avg      0.706     0.728     0.710       448


Confusion Matrix (rows=True, cols=Pred):
 [[235   6   2  18]
 [ 23   9   8   2]
 [ 19   2  35   5]
 [ 23   3  11  47]]


In [16]:
# Dump validation predictions to CSV (submission-style artifact)
import json, pandas as pd
from pathlib import Path

VAL_JL = Path("disaster-ai/data/xbd/tier1/val.jsonl")
ckpt_dir = Path("checkpoints_multiclass_strong")

model, base_tf, classes = build_infer_pipeline(ckpt_dir)
rows = [json.loads(l) for l in open(VAL_JL, "r")]

recs = []
for r in rows:
    p = resolve_img_path(r["image_path"])
    pred, prob = predict_one_tta(model, base_tf, classes, p, tta_n=4)
    recs.append({"image_path": str(p), "true": r["damage"], "pred": pred, "conf": round(prob, 4)})

pd.DataFrame(recs).to_csv("val_predictions.csv", index=False)
print("Saved val_predictions.csv")

Saved val_predictions.csv
