# 🛣️ RoadScan AI — Colab Training
Run each cell top to bottom. Only edit the CONFIG cell.

## Cell 1 · Install

In [1]:
!pip install -q torch torchvision pyyaml scikit-learn

## Cell 2 · Imports

In [2]:
import os, time, json
from pathlib import Path
import yaml
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import DataLoader, Dataset, WeightedRandomSampler
from torchvision import transforms, models
from torchvision.models import EfficientNet_V2_S_Weights
from PIL import Image

# Report the runtime: PyTorch version, CUDA availability, and the GPU name when one is present.
cuda_available = torch.cuda.is_available()
print("PyTorch:", torch.__version__)
print("CUDA:", cuda_available)
if cuda_available:
    print("GPU:", torch.cuda.get_device_name(0))

PyTorch: 2.10.0+cpu
CUDA: False


## Cell 3 · Mount Google Drive

In [3]:
# Colab-only step: mount Google Drive at /content/drive so the dataset and
# checkpoint paths configured in the CONFIG cell are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## ⚙️ Cell 4 · CONFIG — Edit this cell

In [8]:
# Dataset root on the mounted Drive (holds dataset.yaml and the train/val/test folders)
# and the folder that receives checkpoints, history.json and plots.
DATA_DIR   = "/content/drive/MyDrive/maddata-hackathon-2026/datasets/dataset"
OUTPUT_DIR = "/content/drive/MyDrive/maddata-hackathon-2026/checkpoints"

EPOCHS     = 20     # upper bound; the training cell may stop earlier
BATCH_SIZE = 32   # lower to 16 if out-of-memory
LR         = 1e-3   # learning rate of the classifier head
IMAGE_SIZE = 224    # side length (pixels) of the square model input
SEED       = 42     # seed for torch's global RNG (sampler draws, random augmentations)

# Seed once, up front, so repeated runs on the same setup draw the same samples
# and augmentations. GPU kernels may still introduce small nondeterminism.
torch.manual_seed(SEED)

os.makedirs(OUTPUT_DIR, exist_ok=True)
print("Data:", DATA_DIR)
print("Output:", OUTPUT_DIR)

Data: /content/drive/MyDrive/maddata-hackathon-2026/datasets/dataset
Output: /content/drive/MyDrive/maddata-hackathon-2026/checkpoints


## Cell 5 · Load dataset.yaml

In [9]:
# Read the class list from dataset.yaml. The position of a name in CLASS_NAMES
# is the integer label used everywhere else in the notebook.
with open(f"{DATA_DIR}/dataset.yaml", encoding="utf-8") as f:
    cfg = yaml.safe_load(f)

# `names` may be a plain list or an {index: name} mapping; normalise both to a
# list ordered by index so enumerate() below yields the intended labels.
_names = cfg["names"]
if isinstance(_names, dict):
    _names = [_names[key] for key in sorted(_names)]

CLASS_NAMES  = list(_names)
NUM_CLASSES  = len(CLASS_NAMES)
class_to_idx = {name: i for i, name in enumerate(CLASS_NAMES)}

# Dataset uses {split}/{class_name}/ folder structure (no /images sub-folder)
TRAIN_DIR = Path(DATA_DIR) / "train"
VAL_DIR   = Path(DATA_DIR) / "val"
TEST_DIR  = Path(DATA_DIR) / "test"

print("Classes:", CLASS_NAMES)
print("Train dir:", TRAIN_DIR, "| exists:", TRAIN_DIR.exists())
print("Val dir:  ", VAL_DIR,   "| exists:", VAL_DIR.exists())
print("Test dir: ", TEST_DIR,  "| exists:", TEST_DIR.exists())

Classes: ['potholes', 'cracked_pavement', 'road_debris_obstruction', 'broken_road_signs', 'faded_lane_markings', 'normal_road']
Train dir: /content/drive/MyDrive/maddata-hackathon-2026/datasets/dataset/train | exists: True
Val dir:   /content/drive/MyDrive/maddata-hackathon-2026/datasets/dataset/val | exists: True
Test dir:  /content/drive/MyDrive/maddata-hackathon-2026/datasets/dataset/test | exists: True


## Cell 6 · Device

In [10]:
# Pick the compute device once; every later cell moves the model and tensors to `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print("GPU:", torch.cuda.get_device_name(0))
else:
    print("CPU only ‚Äî training will be slow")

CPU only ‚Äî training will be slow


## Cell 7 · Dataset

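The label is taken from the class folder name (`{split}/{class_name}/…`), not from the filename. Folders whose name is not listed in `dataset.yaml` are skipped with a warning, and images in an optional `synthetic/` sub-folder get the same label as their parent class folder.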

In [11]:
VALID_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}

class RoadDataset(Dataset):
    """
    Image-classification dataset laid out as one folder per class.

    Supports two dataset structures:

    Standard (val / test):
        {split_dir}/{class_name}/{image}.jpg

    With synthetic sub-folder (train only):
        {split_dir}/{class_name}/{image}.jpg          <- real images
        {split_dir}/{class_name}/synthetic/{image}.jpg <- synthetic images

    Both real and synthetic images are loaded and treated identically
    during training. The label comes from the class_name folder, not
    the file name or sub-folder name.

    Args:
        split_dir: Path (str or Path) of the split folder to scan.
        transform: Callable applied to each RGB PIL image in __getitem__.
        class_map: Optional {class_name: label} mapping. When omitted, the
            notebook-level `class_to_idx` is used.

    Attributes:
        samples: List of (image_path, label) pairs, in sorted folder/file order.
    """
    def __init__(self, split_dir, transform, class_map=None):
        self.transform = transform
        self.samples   = []
        split_path = Path(split_dir)
        # Default to the mapping built from dataset.yaml; an explicit mapping
        # lets the class be reused (and tested) without that global.
        name_to_label = class_to_idx if class_map is None else class_map

        for class_dir in sorted(split_path.iterdir()):
            if not class_dir.is_dir():
                continue
            cls = class_dir.name
            if cls not in name_to_label:
                print(f"  WARNING: folder '{cls}' not in class list ‚Äî skipping")
                continue
            label = name_to_label[cls]
            real, synth = 0, 0

            for item in sorted(class_dir.iterdir()):
                if self._is_image(item):
                    # Direct image inside class folder (real data)
                    self.samples.append((item, label))
                    real += 1
                elif item.is_dir() and item.name == "synthetic":
                    # Descend exactly one level into synthetic/; only regular
                    # image files count (nested folders are ignored).
                    for img_path in sorted(item.iterdir()):
                        if self._is_image(img_path):
                            self.samples.append((img_path, label))
                            synth += 1

            print(f"  [{cls}]  real={real}  synthetic={synth}  total={real+synth}")

        print(f"  ‚Üí {len(self.samples)} total images from {split_path.name}/")

    @staticmethod
    def _is_image(path):
        """Return True when `path` is a regular file with a supported image extension."""
        return path.is_file() and path.suffix.lower() in VALID_EXTS

    def __len__(self):
        """Number of (image, label) samples found while scanning."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample `idx`, convert it to RGB, and return (transform(image), label)."""
        path, label = self.samples[idx]
        # The context manager releases the file handle as soon as the pixels
        # have been converted into a new in-memory RGB image.
        with Image.open(path) as img:
            rgb = img.convert("RGB")
        return self.transform(rgb), label

    @property
    def targets(self):
        """Labels of all samples, in the same order as `samples`."""
        return [lbl for _, lbl in self.samples]

## Cell 8 · Transforms & DataLoaders

In [12]:
# Per-channel normalisation constants (the standard ImageNet mean/std; the model
# cell initialises the network from ImageNet weights).
mean, std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

# Training pipeline: resize slightly larger than the target, then random crop,
# flip, colour jitter and small rotation as augmentation.
train_tf = transforms.Compose([
    transforms.Resize((IMAGE_SIZE + 24, IMAGE_SIZE + 24)),
    transforms.RandomCrop(IMAGE_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])
# Validation pipeline: deterministic resize + normalise only.
val_tf = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

print("Loading train dataset...")
train_ds = RoadDataset(TRAIN_DIR, train_tf)
print("Loading val dataset...")
val_ds   = RoadDataset(VAL_DIR, val_tf)

# Weighted sampler to handle any remaining class imbalance: every sample is
# drawn with probability proportional to 1 / (size of its class).
targets     = torch.tensor(train_ds.targets)
class_count = torch.bincount(targets, minlength=NUM_CLASSES).float()
# `targets` only contains classes that have at least one sample, so indexing
# class_count with it never hits a zero. The raw counts are left untouched so
# the distribution printed below reports empty classes as 0.
weights     = 1.0 / class_count[targets]
sampler     = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)

ON_GPU       = device.type == "cuda"
NUM_WORKERS  = 2 if ON_GPU else 0
# Pinned host memory only helps host-to-GPU copies; requesting it on a CPU-only
# runtime just produces a warning.
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, sampler=sampler,  num_workers=NUM_WORKERS, pin_memory=ON_GPU)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=ON_GPU)

print("\nClass distribution (train):")
for i, cls in enumerate(CLASS_NAMES):
    print(f"  [{i}] {cls:<25s}: {int(class_count[i])} images")

Loading train dataset...
  [broken_road_signs]  real=960  synthetic=29  total=989
  [cracked_pavement]  real=960  synthetic=63  total=1023
  [faded_lane_markings]  real=960  synthetic=28  total=988
  [normal_road]  real=960  synthetic=51  total=1011
  [potholes]  real=960  synthetic=61  total=1021
  [road_debris_obstruction]  real=960  synthetic=68  total=1028
  ‚Üí 6060 total images from train/
Loading val dataset...
  [broken_road_signs]  real=120  synthetic=0  total=120
  [cracked_pavement]  real=120  synthetic=0  total=120
  [faded_lane_markings]  real=120  synthetic=0  total=120
  [normal_road]  real=120  synthetic=0  total=120
  [potholes]  real=120  synthetic=0  total=120
  [road_debris_obstruction]  real=120  synthetic=0  total=120
  ‚Üí 720 total images from val/

Class distribution (train):
  [0] potholes                 : 1021 images
  [1] cracked_pavement         : 1023 images
  [2] road_debris_obstruction  : 1028 images
  [3] broken_road_signs        : 989 images
  [4] fad

## Cell 9 · Build Model

In [13]:
# Start from EfficientNetV2-S initialised with the IMAGENET1K_V1 weights.
model = models.efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.IMAGENET1K_V1)

# Freeze the whole feature extractor; only the new head below is trainable
# until the training cell unfreezes part of the backbone.
model.features.requires_grad_(False)

# Swap the stock classifier for a small two-layer head sized for our classes.
in_features = model.classifier[1].in_features
head_layers = [
    nn.Dropout(p=0.2, inplace=True),
    nn.Linear(in_features, 256),
    nn.ReLU(),
    nn.Dropout(p=0.1),
    nn.Linear(256, NUM_CLASSES),
]
model.classifier = nn.Sequential(*head_layers)
model = model.to(device)

trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print("Model ready ‚Äî trainable params:", trainable_params)

Downloading: "https://download.pytorch.org/models/efficientnet_v2_s-dd5fe13b.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_v2_s-dd5fe13b.pth


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 82.7M/82.7M [00:00<00:00, 117MB/s]


Model ready ‚Äî trainable params: 329478


## Cell 10 · Train

- **Epochs 1–3 (warm-up):** only the classifier head trains; the backbone stays frozen.
- **Epoch 4+:** backbone blocks 6 and later are unfrozen and fine-tuned at 0.1× the head learning rate; earlier blocks stay frozen.
- Stops early if val accuracy doesn't improve for 5 consecutive epochs.

In [None]:
from tqdm import tqdm

# Two-phase fine-tuning loop.
#   Phase 1 (epochs 1..WARMUP): only parameters that already require grad are optimised.
#   Phase 2 (epoch WARMUP+1 on): backbone blocks with index >= 6 are unfrozen and
#                                trained at LR * 0.1 while the head keeps LR.
# Per-epoch metrics go into `history`; the weights with the best val accuracy are
# saved to `ckpt_path`; training stops after PATIENCE consecutive epochs without
# a val-accuracy improvement.
# NOTE: when this cell finishes, `model` holds the weights of the last epoch that
# ran and `all_preds` / `all_labels` hold the last completed validation pass.
# The best weights are the ones stored in `ckpt_path`.

criterion = nn.CrossEntropyLoss()
# Loss scaling for mixed precision is only created on CUDA; the CPU path below
# runs in full precision and never touches `scaler`.
scaler    = torch.amp.GradScaler("cuda") if device.type == "cuda" else None
optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=LR, weight_decay=1e-4)
scheduler = CosineAnnealingLR(optimizer, T_max=EPOCHS)

WARMUP   = 3
PATIENCE = 5
best_acc = 0.0
no_imp   = 0
history  = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}
ckpt_path = Path(OUTPUT_DIR) / "best_roadscan(latest).pt"

for epoch in range(1, EPOCHS + 1):
    t0 = time.time()

    # Unfreeze backbone after warm-up
    if epoch == WARMUP + 1:
        print("\n[Phase 2] Unfreezing backbone blocks 6+")
        for i, layer in enumerate(model.features):
            if i >= 6:
                for p in layer.parameters():
                    p.requires_grad = True
        # Hand the optimiser only the backbone parameters that were just
        # unfrozen; the still-frozen blocks never receive gradients.
        backbone_params = [p for p in model.features.parameters() if p.requires_grad]
        optimizer = optim.AdamW([
            {"params": backbone_params,               "lr": LR * 0.1},
            {"params": model.classifier.parameters(), "lr": LR},
        ], weight_decay=1e-4)
        # Epochs epoch..EPOCHS inclusive remain, i.e. EPOCHS - epoch + 1 of them.
        # Using one fewer would anneal the LR to exactly 0 for the final epoch
        # (and give T_max=0 when EPOCHS == WARMUP + 1).
        scheduler = CosineAnnealingLR(optimizer, T_max=max(1, EPOCHS - epoch + 1))

    # -- Train ---------------------------------------------------------------
    model.train()
    tr_loss, tr_correct, tr_total = 0.0, 0, 0

    train_bar = tqdm(train_loader, desc=f"Epoch {epoch:02d}/{EPOCHS} [Train]", leave=False)
    for imgs, labels in train_bar:
        imgs, labels = imgs.to(device), labels.to(device)
        optimizer.zero_grad(set_to_none=True)

        if device.type == "cuda":
            with torch.amp.autocast("cuda"):
                out  = model(imgs)
                loss = criterion(out, labels)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            out  = model(imgs)
            loss = criterion(out, labels)
            loss.backward()
            optimizer.step()

        # Accumulate sample-weighted sums so the epoch averages are exact even
        # when the last batch is smaller.
        tr_loss    += loss.item() * imgs.size(0)
        tr_correct += (out.argmax(1) == labels).sum().item()
        tr_total   += imgs.size(0)
        train_bar.set_postfix(loss=f"{tr_loss/tr_total:.4f}", acc=f"{tr_correct/tr_total:.3f}")

    # -- Validate ------------------------------------------------------------
    model.eval()
    vl_loss, vl_correct, vl_total = 0.0, 0, 0
    all_preds, all_labels = [], []

    val_bar = tqdm(val_loader, desc=f"Epoch {epoch:02d}/{EPOCHS} [Val]  ", leave=False)
    with torch.no_grad():
        for imgs, labels in val_bar:
            imgs, labels = imgs.to(device), labels.to(device)
            out   = model(imgs)
            loss  = criterion(out, labels)
            preds = out.argmax(1)

            vl_loss    += loss.item() * imgs.size(0)
            vl_correct += (preds == labels).sum().item()
            vl_total   += imgs.size(0)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            val_bar.set_postfix(loss=f"{vl_loss/vl_total:.4f}", acc=f"{vl_correct/vl_total:.3f}")

    scheduler.step()

    tr_acc = tr_correct / tr_total
    vl_acc = vl_correct / vl_total
    history["train_loss"].append(tr_loss / tr_total)
    history["train_acc"].append(tr_acc)
    history["val_loss"].append(vl_loss / vl_total)
    history["val_acc"].append(vl_acc)

    print(f"Epoch {epoch:02d}/{EPOCHS} | Train loss {tr_loss/tr_total:.4f} acc {tr_acc:.3f} | Val loss {vl_loss/vl_total:.4f} acc {vl_acc:.3f} | {time.time()-t0:.1f}s")

    # -- Track the best model ------------------------------------------------
    improved = vl_acc > best_acc
    if improved:
        best_acc = vl_acc
        no_imp   = 0
        torch.save({"model_state": model.state_dict(), "classes": CLASS_NAMES,
                    "class_to_idx": class_to_idx, "image_size": IMAGE_SIZE}, ckpt_path)
        print(f"  üíæ Saved best model (val_acc={vl_acc:.4f})")
    else:
        no_imp += 1

    # -- Save progress after every epoch (safe against Colab crashes) --------
    # Written after the best-model bookkeeping so "best_acc" in the file already
    # includes this epoch's result.
    with open(Path(OUTPUT_DIR) / "history.json", "w") as f:
        json.dump({
            "epoch":      epoch,
            "best_acc":   best_acc,
            "history":    history,
            "epochs_log": [
                {
                    "epoch":      e + 1,
                    "train_loss": round(history["train_loss"][e], 4),
                    "train_acc":  round(history["train_acc"][e],  4),
                    "val_loss":   round(history["val_loss"][e],   4),
                    "val_acc":    round(history["val_acc"][e],    4),
                }
                for e in range(len(history["train_acc"]))
            ],
        }, f, indent=2)

    if not improved and no_imp >= PATIENCE:
        print(f"\n‚èπÔ∏è  Early stopping ‚Äî no improvement for {PATIENCE} epochs")
        break

print(f"\n‚úÖ Done. Best val acc: {best_acc:.4f}")
print(f"   Checkpoint: {ckpt_path}")



Epoch 01/20 | Train loss 0.6038 acc 0.801 | Val loss 0.3732 acc 0.886 | 2768.4s
  üíæ Saved best model (val_acc=0.8861)




Epoch 02/20 | Train loss 0.4105 acc 0.860 | Val loss 0.3257 acc 0.887 | 1976.4s
  üíæ Saved best model (val_acc=0.8875)




Epoch 03/20 | Train loss 0.3637 acc 0.877 | Val loss 0.3019 acc 0.886 | 1772.9s

[Phase 2] Unfreezing backbone blocks 6+


Epoch 04/20 [Train]:  16%|‚ñà‚ñå        | 30/190 [05:05<28:09, 10.56s/it, acc=0.866, loss=0.3784]

In [None]:
# Print confidence distribution across val set
# (top-1 softmax probability per validation image, summarised as p10 / p50 / p90).

# Force inference mode: if the training cell was interrupted mid-epoch the model
# is still in train mode, which would leave dropout active and let BatchNorm
# update its running statistics during this pass.
model.eval()

all_confs = []
with torch.no_grad():
    for imgs, _ in val_loader:
        probs = torch.softmax(model(imgs.to(device)), dim=-1)
        all_confs.extend(probs.max(dim=-1).values.cpu().tolist())

all_confs = sorted(all_confs)
if all_confs:
    print(f"Val confidence ‚Äî p10={all_confs[len(all_confs)//10]:.3f}  "
          f"p50={all_confs[len(all_confs)//2]:.3f}  "
          f"p90={all_confs[int(len(all_confs)*0.9)]:.3f}")
else:
    # Nothing to index into; report it instead of raising IndexError.
    print("Val confidence: no validation samples")

## Cell 11 · Per-Class Report

In [None]:
## Cell 11 - Per-Class Report
# Uses `all_labels` / `all_preds` from the last validation pass completed by the
# training cell (i.e. the last epoch that ran, not necessarily the best one).

from sklearn.metrics import classification_report
# Pass the full label set explicitly: otherwise a class that is absent from both
# lists makes the inferred label count disagree with `target_names` and raises.
print(classification_report(
    all_labels,
    all_preds,
    labels=list(range(NUM_CLASSES)),
    target_names=CLASS_NAMES,
    digits=3,
    zero_division=0,
))

# -- Confidence-gated prediction ---------------------------------------------
NO_ISSUE_THRESHOLD = 0.60   # top-1 probability below this is reported as "No Issue Detected"
REJECT_THRESHOLD   = 0.85   # top-1 probability below this (but >= the one above) is reported as unknown/OOD

def predict_with_rejection(model, input_tensor):
    """Classify a single input and gate the answer on the top-1 softmax probability.

    The model is switched to eval mode and run without gradient tracking.

    Args:
        model: Callable module returning logits for `input_tensor`.
        input_tensor: Input holding exactly one sample; the top-1 probability
            is read with `.item()`, which requires a single-element result.

    Returns:
        dict with keys "label" and "confidence". "label" is
        "No Issue Detected" when confidence < NO_ISSUE_THRESHOLD,
        "UNKNOWN / Out-of-Distribution" when confidence < REJECT_THRESHOLD,
        otherwise the entry of CLASS_NAMES at the predicted index.
    """
    model.eval()
    with torch.no_grad():
        class_probs = torch.softmax(model(input_tensor), dim=-1)
    top_prob, top_index = class_probs.max(dim=-1)
    score = top_prob.item()

    # Guard clauses, checked from the lowest confidence band upward.
    if score < NO_ISSUE_THRESHOLD:
        return {"label": "No Issue Detected", "confidence": score}
    if score < REJECT_THRESHOLD:
        return {"label": "UNKNOWN / Out-of-Distribution", "confidence": score}
    return {"label": CLASS_NAMES[top_index.item()], "confidence": score}

# Quick sanity-check on the val set: prints one gated prediction per class
# folder (first image in sorted order), for at most MAX_SHOWN folders.
print("\nSample predictions with confidence gating:")
print(f"  < {NO_ISSUE_THRESHOLD:.0%}  ‚Üí No Issue Detected")
print(f"  < {REJECT_THRESHOLD:.0%}  ‚Üí UNKNOWN / OOD")
print(f"  ‚â• {REJECT_THRESHOLD:.0%}  ‚Üí Predicted class")
print()

model.eval()
# Same deterministic preprocessing as the validation loader.
val_tf_single = val_tf

MAX_SHOWN = 6
shown = 0
for class_dir in sorted((Path(DATA_DIR) / "val").iterdir()):
    if shown >= MAX_SHOWN:
        break
    if not class_dir.is_dir():
        # A stray file in val/ must not end the scan; just skip it.
        continue
    for img_path in sorted(class_dir.iterdir()):
        # Accept the same extensions the dataset class loads.
        if not img_path.is_file() or img_path.suffix.lower() not in VALID_EXTS:
            continue
        with Image.open(img_path) as img:
            tensor = val_tf_single(img.convert("RGB")).unsqueeze(0).to(device)
        result = predict_with_rejection(model, tensor)
        true_label = class_dir.name
        # A gated answer ("No Issue Detected" / "UNKNOWN ...") never equals a
        # folder name, so it is always shown as a mismatch.
        match = "‚úì" if result["label"] == true_label else "‚úó"
        print(f"  {match} true={true_label:<25s}  pred={result['label']:<30s}  conf={result['confidence']:.3f}")
        shown += 1
        break  # one image per class folder

# The two thresholds explained:
#
# conf < 0.60  ->  "No Issue Detected"
#                Model isn't sure enough to flag anything - treat the road as fine.
#                This is your safe-pass gate.
#
# 0.60 <= conf < 0.85  ->  "UNKNOWN / OOD"
#                Model sees *something* but can't commit - likely a weird angle,
#                lighting condition, or something genuinely outside training data.
#
# conf >= 0.85  ->  Predicted class name
#                High-confidence - use the result.

## Cell 12 · Plot Training Curves

In [None]:
import matplotlib.pyplot as plt

# Side-by-side loss and accuracy curves (train vs. val), one point per completed
# epoch in `history`; the figure is also written to OUTPUT_DIR.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
for axis, metric, title in ((ax1, "loss", "Loss"), (ax2, "acc", "Accuracy")):
    axis.plot(history[f"train_{metric}"], label="Train")
    axis.plot(history[f"val_{metric}"], label="Val")
    axis.set_title(title)
    axis.set_xlabel("Epoch")
    axis.legend()
    axis.grid(True)
plt.tight_layout()
plt.savefig(Path(OUTPUT_DIR) / "training_curves.png", dpi=120)
plt.show()