
# Roof Defect Classifier — DINOv2 ViT‑S/14 + Symmetric Cross‑Entropy (SCE)

This notebook fine‑tunes a **DINOv2 ViT‑S/14** encoder for **single‑head defect classification** on your ~2k in‑domain report images.

**What you get**
- Noise‑robust training via **Symmetric Cross‑Entropy (SCE)**
- **MixUp/CutMix** (timm) + light spatial/photometric augs
- **Class‑balanced sampling** (optional)
- **Grouped split** by `group` (report/building) when available
- Early stop on **macro‑F1**
- Saved artifacts: best checkpoint, per‑epoch metrics, confusion matrix, label mapping

Works in **Colab** and **Vertex AI Workbench**.


In [None]:

# %%capture
# If running in Colab/Vertex AI for the first time, uncomment:
# !pip install -U torch torchvision timm pandas scikit-learn numpy pillow


## 1) Configuration

In [None]:

from pathlib import Path

# ---------------------------------------------------------------------------
# Data locations
# ---------------------------------------------------------------------------
# CSV describing the dataset (required). Columns: path,label[,group]
CSV_PATH = "dataset.csv"  # change to your CSV
# Prefix joined onto every relative image path found in the CSV
IMG_ROOT = "."
# Directory that receives checkpoints, logs and reports
OUTDIR = "runs/vits14_sce_colab"

# ---------------------------------------------------------------------------
# Data pipeline
# ---------------------------------------------------------------------------
# NOTE(review): 288 is not a multiple of 14 (the patch size in the model name
# used below); 280 or 294 may be a better fit -- confirm before changing.
IMG_SIZE = 288
BATCH_SIZE = 32
NUM_WORKERS = 4
VAL_FRAC = 0.2            # fraction of rows held out for validation
USE_GROUPS = True         # set False if you don't have a 'group' column
BALANCED_SAMPLER = True   # class-balanced sampling
MIXUP = 0.1               # MixUp alpha; 0.0 to disable
CUTMIX = 0.1              # CutMix alpha; 0.0 to disable

# ---------------------------------------------------------------------------
# Symmetric Cross-Entropy loss
# ---------------------------------------------------------------------------
SCE_ALPHA = 0.1           # weight of the CE term
SCE_BETA = 1.0            # weight of the reverse-CE term
LABEL_SMOOTH = 0.0        # optional smoothing inside SCE

# ---------------------------------------------------------------------------
# Optimisation / schedule
# ---------------------------------------------------------------------------
EPOCHS = 80
LR = 5e-4
WEIGHT_DECAY = 1e-4
WARMUP_EPOCHS = 3.0       # may be fractional; converted to steps later
DROP_PATH = 0.1
EARLY_STOP_PATIENCE = 10  # epochs without macro-F1 improvement before stopping
SEED = 42

# Make sure the output directory exists before anything tries to write to it.
Path(OUTDIR).mkdir(exist_ok=True, parents=True)
print("Saving outputs to:", OUTDIR)


## 2) Imports & Utilities

In [None]:

import os, math, json, random
import numpy as np
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

import timm
from timm.data.mixup import Mixup

from sklearn.model_selection import GroupShuffleSplit, StratifiedShuffleSplit
from sklearn.metrics import f1_score, classification_report, confusion_matrix

def set_seed(seed: int = 42):
    """Seed the Python, NumPy and PyTorch RNGs and pin cuDNN to deterministic mode.

    Args:
        seed: Value handed to every random number generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seeder in seeders:
        seeder(seed)

    # Turn off cuDNN autotuning and request deterministic kernels.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

def one_hot(labels: torch.Tensor, num_classes: int) -> torch.Tensor:
    """Return a float32 one-hot encoding of integer class indices.

    Args:
        labels: Tensor of class indices (cast to int64 before encoding).
        num_classes: Width of the one-hot dimension appended to ``labels``.
    """
    indices = labels.to(torch.int64)
    encoded = F.one_hot(indices, num_classes=num_classes)
    return encoded.to(torch.float32)

# Use the GPU when one is visible; the rest of the notebook passes this string
# to ``.to(device)``.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Device:", device)

# Seed every RNG up front so the split, sampling and augmentation are repeatable.
set_seed(SEED)


## 3) Dataset loader (from CSV)

In [None]:

class CSVImageDataset(Dataset):
    """Image-classification dataset backed by a DataFrame.

    Each row must provide ``path`` (image file, absolute or relative to
    ``img_root``) and ``y`` (integer class index). Items are returned as
    ``(image_tensor, label)`` where the image has been resized/cropped to
    ``img_size`` x ``img_size`` and normalised.

    Args:
        df: Rows to serve; the index is reset so positions run 0..len-1.
        img_root: Directory prepended to relative ``path`` values.
        img_size: Side length of the square crop fed to the model.
        is_train: When True, use the randomised augmentation pipeline;
            otherwise a deterministic resize + centre crop.
    """

    def __init__(self, df: pd.DataFrame, img_root: str, img_size: int, is_train: bool):
        self.df = df.reset_index(drop=True)
        self.img_root = img_root
        self.is_train = is_train
        self.img_size = img_size

        # Imported lazily so the class can be defined without torchvision loaded.
        import torchvision.transforms as T

        # Shared tail of both pipelines: tensor conversion + channel
        # normalisation (the usual ImageNet mean/std).
        to_normalized_tensor = [
            T.ToTensor(),
            T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]

        if is_train:
            # Keep spatial aug light; MixUp/CutMix handles label-level regularisation.
            spatial = [
                T.Resize(int(img_size * 1.15)),
                T.RandomResizedCrop(img_size, scale=(0.7, 1.0)),
                T.RandomHorizontalFlip(p=0.5),
                T.RandomPerspective(distortion_scale=0.05, p=0.2),
                T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.05, hue=0.02),
            ]
        else:
            spatial = [
                T.Resize(int(img_size * 1.05)),
                T.CenterCrop(img_size),
            ]
        self.tf = T.Compose(spatial + to_normalized_tensor)

    def __len__(self):
        """Number of rows (images) in the dataset."""
        return len(self.df)

    def __getitem__(self, idx: int):
        """Load, transform and return ``(image_tensor, label)`` for row ``idx``."""
        row = self.df.iloc[idx]
        path = row["path"]
        if not os.path.isabs(path):
            path = os.path.join(self.img_root, path)
        # The context manager closes the underlying file as soon as the pixels
        # have been copied out by ``convert``, instead of waiting for garbage
        # collection (matters with many DataLoader workers).
        with Image.open(path) as raw:
            img = raw.convert("RGB")
        img = self.tf(img)
        label = int(row["y"])
        return img, label


## 4) Symmetric Cross‑Entropy (SCE) loss

In [None]:

class SCELoss(nn.Module):
    """Symmetric Cross Entropy: ``SCE = alpha * CE(t, p) + beta * RCE(t, p)``.

    ``CE`` is the usual cross-entropy ``-sum(t * log p)`` and ``RCE`` is the
    reverse term ``-sum(p * log t)``. Because ``log t`` is undefined for the
    zero entries of a (near) one-hot target, the target is clamped to
    ``[eps, 1]`` *only* inside the RCE term; the CE term uses the target
    unchanged so it stays an unbiased cross-entropy.

    Args:
        num_classes: Number of classes (kept for reference; the one-hot width
            is taken from the logits so the two can never disagree).
        alpha: Weight of the CE term.
        beta: Weight of the RCE term.
        label_smooth: Smoothing applied to hard (index) targets only. Soft
            targets (e.g. from MixUp/CutMix) are used as given.
        eps: Lower clamp for the target inside ``log`` in the RCE term.
    """

    def __init__(self, num_classes: int, alpha: float = 0.1, beta: float = 1.0, label_smooth: float = 0.0, eps: float = 1e-4):
        super().__init__()
        self.num_classes = num_classes
        self.alpha = alpha
        self.beta = beta
        self.label_smooth = label_smooth
        self.eps = eps

    def forward(self, logits: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
        """Return the scalar SCE loss, averaged over the batch.

        Args:
            logits: ``(batch, classes)`` raw scores.
            target: Either ``(batch,)`` class indices or ``(batch, classes)``
                soft label distributions.
        """
        log_probs = F.log_softmax(logits, dim=1)
        probs = log_probs.exp()

        if target.dim() == 1:
            n_cls = logits.size(1)
            t = F.one_hot(target.long(), num_classes=n_cls).float()
            if self.label_smooth > 0:
                t = t * (1.0 - self.label_smooth) + self.label_smooth / n_cls
        else:
            t = target

        # Forward CE on the true target distribution (no clamping bias).
        ce = -(t * log_probs).sum(dim=1).mean()
        # Reverse CE: clamp only here, to keep log(t) finite.
        rce = -(probs * torch.log(t.clamp(self.eps, 1.0))).sum(dim=1).mean()
        return self.alpha * ce + self.beta * rce


## 5) Model: DINOv2 ViT‑S/14

In [None]:

def create_model(num_classes: int, drop_path: float = 0.1, img_size=None):
    """Build a pretrained DINOv2 ViT-S/14 classifier via timm.

    The pretrained checkpoint is configured for its own default resolution,
    and timm's patch embedding rejects inputs of any other size. The model
    must therefore be created with the resolution the data pipeline actually
    produces; timm resamples the positional embeddings accordingly.

    Args:
        num_classes: Size of the new classification head.
        drop_path: Stochastic-depth rate passed to timm as ``drop_path_rate``.
        img_size: Square input resolution. When omitted, the notebook-level
            ``IMG_SIZE`` is used if it is defined; if neither is available the
            checkpoint's default resolution is kept.

    Returns:
        The timm model instance (on CPU; the caller moves it to a device).
    """
    if img_size is None:
        # Fall back to the notebook configuration so existing call sites that
        # do not pass img_size still get a model matching the dataset crops.
        img_size = globals().get("IMG_SIZE")

    extra_kwargs = {}
    if img_size is not None:
        extra_kwargs["img_size"] = int(img_size)

    model = timm.create_model(
        "vit_small_patch14_dinov2.lvd142m",
        pretrained=True,
        num_classes=num_classes,
        drop_path_rate=drop_path,
        **extra_kwargs,
    )
    return model


## 6) Class‑balanced sampler (optional)

In [None]:

def make_weighted_sampler(labels: np.ndarray) -> WeightedRandomSampler:
    """Build a sampler that draws every class with equal total probability.

    Each sample is weighted by the inverse of its class frequency, and one
    epoch draws ``len(labels)`` samples with replacement.

    Args:
        labels: 1-D array of integer class indices, one per training sample.
    """
    # ``inverse`` maps each sample to the position of its class in ``counts``.
    _, inverse, counts = np.unique(labels, return_inverse=True, return_counts=True)
    per_sample = (1.0 / counts[inverse]).astype(np.float32)
    return WeightedRandomSampler(
        per_sample.tolist(),
        num_samples=len(labels),
        replacement=True,
    )


## 7) Train/Eval helpers

In [None]:

@torch.no_grad()
def evaluate(model, loader, device, num_classes: int):
    """Run the model over ``loader`` and compute classification metrics.

    Args:
        model: Network returning ``(batch, classes)`` logits.
        loader: Iterable of ``(images, labels)`` batches.
        device: Device the images are moved to before the forward pass.
        num_classes: Kept for interface compatibility; the class count used
            for top-k is read from the logits themselves.

    Returns:
        ``(top1, top3, macro_f1, preds, targets)`` where ``top1``/``top3`` are
        accuracies in [0, 1], ``macro_f1`` is scikit-learn's macro-averaged
        F1, and ``preds``/``targets`` are 1-D NumPy arrays of class indices.
        With fewer than three classes, ``top3`` uses all available classes.

    Raises:
        ValueError: If ``loader`` yields no batches.
    """
    model.eval()
    logit_chunks, target_chunks = [], []
    for imgs, labels in loader:
        imgs = imgs.to(device, non_blocking=True)
        logit_chunks.append(model(imgs).cpu())
        # Labels are only needed on the host for metric computation.
        target_chunks.append(labels.cpu())

    if not logit_chunks:
        raise ValueError("evaluate() received an empty loader; nothing to score.")

    logits = torch.cat(logit_chunks, dim=0)
    targets = torch.cat(target_chunks, dim=0).numpy()
    probs = torch.softmax(logits, dim=1).numpy()
    preds = probs.argmax(axis=1)
    top1 = float((preds == targets).mean())

    # argpartition needs k <= number of classes; clamp so 2-class problems work.
    k = min(3, probs.shape[1])
    topk_idx = np.argpartition(probs, -k, axis=1)[:, -k:]
    top3 = float((topk_idx == targets[:, None]).any(axis=1).mean())

    macro_f1 = f1_score(targets, preds, average="macro")
    return top1, top3, macro_f1, preds, targets

def cosine_warmup_scheduler(optimizer, total_steps, warmup_steps):
    """Linear warm-up followed by cosine decay to zero, stepped once per batch.

    The returned scheduler multiplies each param group's base LR by:
      * ``(step + 1) / warmup_steps`` while ``step < warmup_steps``;
      * ``0.5 * (1 + cos(pi * progress))`` afterwards, where ``progress`` runs
        from 0 at the end of warm-up to 1 at ``total_steps``.

    Progress is clamped to 1 so that stepping past ``total_steps`` keeps the
    LR at zero instead of letting the cosine swing back up.

    Args:
        optimizer: Optimizer whose learning rates are scheduled.
        total_steps: Total number of scheduler steps planned for training.
        warmup_steps: Number of initial steps spent in linear warm-up.
    """
    decay_steps = max(1, total_steps - warmup_steps)

    def lr_lambda(step):
        if step < warmup_steps:
            return float(step + 1) / float(max(1, warmup_steps))
        progress = min(1.0, (step - warmup_steps) / decay_steps)
        return 0.5 * (1.0 + math.cos(math.pi * progress))

    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lr_lambda)


## 8) Load CSV, map labels, and split

In [None]:

df = pd.read_csv(CSV_PATH)
# Validate with an explicit raise: `assert` is stripped under `python -O`.
if not {"path", "label"}.issubset(df.columns):
    raise ValueError("CSV must include columns: path,label[,group]")

# Drop unlabeled rows (blank/NaN). NaN must be tested explicitly: casting to
# str would turn it into the non-empty string "nan" and let it through, which
# later breaks the integer label mapping.
has_label = df["label"].notna() & (df["label"].astype(str).str.strip() != "")
df = df[has_label].copy()
if df.empty:
    raise ValueError(f"No labeled rows found in {CSV_PATH}")

# Label mapping: classes are sorted so the index assignment is reproducible.
classes = sorted(df["label"].unique().tolist())
class_to_idx = {c: i for i, c in enumerate(classes)}
df["y"] = df["label"].map(class_to_idx).astype(int)

# Persist the mapping so predictions can be decoded outside this notebook.
with open(os.path.join(OUTDIR, "label_to_index.json"), "w", encoding="utf-8") as f:
    json.dump(class_to_idx, f, ensure_ascii=False, indent=2)

print("Classes:", len(classes))
print("Example classes:", classes[:10])

# Grouped split if requested & available: all rows sharing a `group` value
# land on the same side of the split. Otherwise stratify by class.
if USE_GROUPS and "group" in df.columns:
    gss = GroupShuffleSplit(n_splits=1, test_size=VAL_FRAC, random_state=SEED)
    train_idx, val_idx = next(gss.split(df, groups=df["group"]))
else:
    sss = StratifiedShuffleSplit(n_splits=1, test_size=VAL_FRAC, random_state=SEED)
    train_idx, val_idx = next(sss.split(df, df["y"]))

df_train = df.iloc[train_idx].reset_index(drop=True)
df_val   = df.iloc[val_idx].reset_index(drop=True)

print(f"Train: {len(df_train)} | Val: {len(df_val)}")


## 9) Build datasets and dataloaders

In [None]:

# Training data gets the augmenting pipeline, validation the deterministic one.
ds_train = CSVImageDataset(df_train, IMG_ROOT, IMG_SIZE, is_train=True)
ds_val   = CSVImageDataset(df_val,   IMG_ROOT, IMG_SIZE, is_train=False)

# Options shared by both variants of the training loader. Incomplete final
# batches are dropped so every training batch has exactly BATCH_SIZE samples.
_train_loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=True,
    drop_last=True,
)

if not BALANCED_SAMPLER:
    # Plain uniform shuffling over the training rows.
    train_loader = DataLoader(ds_train, shuffle=True, **_train_loader_kwargs)
else:
    # Inverse-frequency sampling with replacement (see make_weighted_sampler).
    sampler = make_weighted_sampler(df_train["y"].values)
    train_loader = DataLoader(ds_train, sampler=sampler, **_train_loader_kwargs)

# Validation keeps file order and every sample.
val_loader = DataLoader(
    ds_val,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

# Notebook display: number of batches per epoch (train, val).
len(train_loader), len(val_loader)


## 10) Initialize model, optimizer, scheduler, loss, mixup

In [None]:

num_classes = len(classes)
model = create_model(num_classes=num_classes, drop_path=DROP_PATH).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)

# The scheduler is stepped once per batch, so both horizons are in batches.
total_steps = max(1, len(train_loader) * EPOCHS)
# WARMUP_EPOCHS may be fractional; never let warm-up outlast the whole run.
warmup_steps = min(int(WARMUP_EPOCHS * len(train_loader)), total_steps)
scheduler = cosine_warmup_scheduler(optimizer, total_steps, warmup_steps)

criterion = SCELoss(num_classes=num_classes, alpha=SCE_ALPHA, beta=SCE_BETA, label_smooth=LABEL_SMOOTH)

mixup_fn = None
if MIXUP > 0.0 or CUTMIX > 0.0:
    # With MixUp/CutMix the loss receives soft (2-D) targets, and SCELoss only
    # smooths hard (1-D) targets. Smoothing therefore has to be applied here,
    # otherwise LABEL_SMOOTH would be silently ignored. timm uses the same
    # formula: on = 1 - s + s/C, off = s/C.
    mixup_fn = Mixup(
        mixup_alpha=MIXUP,
        cutmix_alpha=CUTMIX,
        label_smoothing=LABEL_SMOOTH,
        num_classes=num_classes,
    )

print("Model ready.")


## 11) Train

In [None]:

# Training driver: one pass over train_loader per epoch, validation after each
# epoch, checkpoint on macro-F1 improvement, early stop after
# EARLY_STOP_PATIENCE epochs without improvement.
history = []                                   # one metrics dict per finished epoch
best_f1 = -1.0                                 # below any real F1, so epoch 1 always checkpoints
best_ckpt = os.path.join(OUTDIR, "best.pth")
epochs_no_improve = 0

global_step = 0                                # total optimizer steps taken so far
for epoch in range(1, EPOCHS + 1):
    model.train()
    running_loss = 0.0                         # sum of per-batch loss * batch size
    n_seen = 0                                 # number of samples contributing to running_loss

    for imgs, labels in train_loader:
        imgs = imgs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        if mixup_fn is not None:
            imgs, targets = mixup_fn(imgs, labels)  # soft labels
        else:
            targets = labels                        # hard labels

        logits = model(imgs)
        loss = criterion(logits, targets)

        optimizer.zero_grad()
        loss.backward()
        # Clip the global gradient norm to 1.0 before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        # Per-batch schedule: the scheduler advances once per optimizer step.
        scheduler.step()

        bs = imgs.size(0)
        running_loss += loss.item() * bs
        n_seen += bs
        global_step += 1

    # Sample-weighted mean loss; max(1, ...) guards an empty train_loader.
    train_loss = running_loss / max(1, n_seen)

    # Eval
    top1, top3, macro_f1, preds, targets_np = evaluate(model, val_loader, device, num_classes)
    print(f"Epoch {epoch:03d} | train_loss {train_loss:.4f} | val_top1 {top1*100:.2f}% | val_top3 {top3*100:.2f}% | macroF1 {macro_f1*100:.2f}%")

    # "lr" is the value in effect after this epoch's last scheduler step.
    history.append({
        "epoch": epoch,
        "train_loss": float(train_loss),
        "val_top1": float(top1),
        "val_top3": float(top3),
        "val_macro_f1": float(macro_f1),
        "lr": float(optimizer.param_groups[0]["lr"]),
    })
    # Rewrite the full log every epoch so it survives an interrupted run.
    pd.DataFrame(history).to_csv(os.path.join(OUTDIR, "training_log.csv"), index=False)

    # Require a strict improvement (beyond 1e-6) to reset patience and checkpoint.
    if macro_f1 > best_f1 + 1e-6:
        best_f1 = macro_f1
        epochs_no_improve = 0
        torch.save({"model": model.state_dict(),
                    "classes": classes,
                    "config": {
                        "CSV_PATH": CSV_PATH, "IMG_ROOT": IMG_ROOT, "IMG_SIZE": IMG_SIZE
                    }}, best_ckpt)
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= EARLY_STOP_PATIENCE:
            print(f"Early stopping at epoch {epoch}. Best macro-F1: {best_f1*100:.2f}%")
            break

print("Best macro-F1:", round(best_f1*100, 2), "%")
print("Checkpoint:", best_ckpt)


## 12) Final evaluation and reports

In [None]:

# Load the best checkpoint (by validation macro-F1) back into the live model.
ckpt = torch.load(best_ckpt, map_location="cpu")
model.load_state_dict(ckpt["model"])

top1, top3, macro_f1, preds, targets_np = evaluate(model, val_loader, device, num_classes)
print(f"VAL  Top-1: {top1*100:.2f}%  | Top-3: {top3*100:.2f}%  | Macro-F1: {macro_f1*100:.2f}%")

# Classification report.
# Pass the full label list explicitly: a class can be absent from the
# validation split and never predicted, and without `labels=` scikit-learn
# infers the label set from the data and rejects a longer `target_names`.
# Names are stringified because the report formats them as text.
all_labels = list(range(num_classes))
class_names = [str(c) for c in classes]
rep = classification_report(
    targets_np,
    preds,
    labels=all_labels,
    target_names=class_names,
    digits=4,
    zero_division=0,
)
print(rep)
with open(os.path.join(OUTDIR, "val_classification_report.txt"), "w", encoding="utf-8") as f:
    f.write(rep)

# Confusion matrix (rows = true class, columns = predicted class)
cm = confusion_matrix(targets_np, preds, labels=all_labels)
cm_df = pd.DataFrame(cm, index=classes, columns=classes)
cm_csv = os.path.join(OUTDIR, "val_confusion_matrix.csv")
cm_df.to_csv(cm_csv)

print("Saved:")
print(" - training log:", os.path.join(OUTDIR, "training_log.csv"))
print(" - classification report:", os.path.join(OUTDIR, "val_classification_report.txt"))
print(" - confusion matrix:", cm_csv)
print(" - label mapping:", os.path.join(OUTDIR, "label_to_index.json"))


## 13) (Optional) Export artifacts to Google Cloud Storage

In [None]:

# If you're on Colab/Vertex and want to push results to GCS:
# 1) Set your bucket
# GCS_BUCKET = "gs://your-bucket-name/roof-defect-runs"
#
# 2) If on Colab:
# from google.colab import auth
# auth.authenticate_user()
#
# 3) Copy
# !gsutil -m cp -r $OUTDIR $GCS_BUCKET
#
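# Or use the Python client (a bucket name cannot contain "/", so split
# GCS_BUCKET into the bucket name and an object prefix first):
# from google.cloud import storage
# client = storage.Client()
# bucket_name, _, prefix = GCS_BUCKET.replace("gs://", "").partition("/")
# bucket = client.bucket(bucket_name)
# for path in Path(OUTDIR).glob("**/*"):
#     if path.is_file():
#         rel = Path(OUTDIR).name / path.relative_to(OUTDIR)
#         blob = bucket.blob("/".join(filter(None, [prefix.strip("/"), rel.as_posix()])))
#         blob.upload_from_filename(str(path))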
