<a href="https://colab.research.google.com/github/ThomasWasTaken/Traffic_sign_challenge/blob/main/Traffic_signs_submission_final_final_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Traffic Sign Challenge
Are you up for a competition? We challenge you: Who can write the best ML model for traffic sign recognition?
The model should be trained on the GTSRB training data, which you know from a previous home assignment (Torch_Traffic_Signs_Basic_Template.ipynb).

These are the constraints:
* Your code runs in a free Google Colab environment and we must be able to reproduce the results.
* If you use a deep learning framework, use PyTorch.
* The solution is a single network, not an ensemble.
* The network is trained from scratch and uses only the GTSRB training data (e.g., no transfer learning).
* You achieve at least 99.47% accuracy on the GTSRB test set (this is not easy).
* Your code is well documented and you can explain all steps of the algorithm.

The best entry (or even the best entries) will then be evaluated on the novel extended GTSRB data set.
The model(s) that perform best, are elegant (the simpler the better), and are more accurate than the baselines on the extended data will be considered in our research article describing the new data set; the author(s) of these model(s) will be invited to become co-author(s) of the article.

The deadline is one week after the MLA exam deadline. Send submissions that fulfill the criteria above to me (igel@diku.dk). If you have questions, use the discussion forum.

Best wishes,
Christian

Do the installations/imports first:

In [None]:
import os
import time
import math
import random
import copy

import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset

from torch.optim import AdamW
from torch.optim.lr_scheduler import (
    LinearLR,
    CosineAnnealingLR,
    SequentialLR
)

import torchvision
from torchvision import datasets, transforms
from torchvision.datasets.utils import download_url, extract_archive



Check if a GPU is available:

In [None]:
# Pick the compute device once: the first CUDA GPU when one is visible,
# otherwise fall back to the CPU.
gpu = torch.cuda.is_available()
if gpu:
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print("device:", device)

device: cuda:0


In [None]:
import torch

# Report whether CUDA is usable and, if so, which GPU model index 0 refers to.
has_cuda = torch.cuda.is_available()
print("CUDA available:", has_cuda)
if not has_cuda:
    print("No GPU detected ⚠️")
else:
    print("GPU name:", torch.cuda.get_device_name(0))


CUDA available: True
GPU name: Tesla T4


Import the dataset. In my case it is copied from Google Drive into Google Colab.

In [None]:
# Mount Google Drive under /content/drive; the cells below read the GTSRB
# archive from MyDrive. force_remount=True re-mounts even if already mounted.
from google.colab import drive
drive.mount('/content/drive',force_remount=True)


Mounted at /content/drive


In [None]:
# List the entries in MyDrive whose names contain "GTSRB" to confirm the archive is there.
!ls /content/drive/MyDrive | grep GTSRB


GTSRB_models
GTSRBTrafficSigns.py
GTSRB.zip


In [None]:
!unzip -q "/content/drive/MyDrive/GTSRB" -d /content/GTSRB

replace /content/GTSRB/__MACOSX/._GTSRB? [y]es, [n]o, [A]ll, [N]one, [r]ename: A


In [None]:
import os
from torchvision import datasets
from google.colab import drive

drive.mount('/content/drive', force_remount=True)

data_root = "/content/GTSRB/GTSRB"

# ImageFolder is used here only as an index: with transform=None it just
# collects (image_path, label) pairs from the class sub-directories.
train_full = datasets.ImageFolder(root=os.path.join(data_root, "train"), transform=None)

# train_full.samples is a list of (image_path, label) tuples; keep the paths.
all_image_paths = [img_path for img_path, label in train_full.samples]

# Show a count and a short preview instead of dumping every path into the
# cell output, which bloats the notebook and hides the narrative.
print(f"Found {len(all_image_paths)} training images; first 5:")
for preview_path in all_image_paths[:5]:
    print(preview_path)

Mounted at /content/drive
['/content/GTSRB/GTSRB/train/00000/00000_00000.png', '/content/GTSRB/GTSRB/train/00000/00000_00001.png', '/content/GTSRB/GTSRB/train/00000/00000_00002.png', '/content/GTSRB/GTSRB/train/00000/00000_00003.png', '/content/GTSRB/GTSRB/train/00000/00000_00004.png', '/content/GTSRB/GTSRB/train/00000/00000_00005.png', '/content/GTSRB/GTSRB/train/00000/00000_00006.png', '/content/GTSRB/GTSRB/train/00000/00000_00007.png', '/content/GTSRB/GTSRB/train/00000/00000_00008.png', '/content/GTSRB/GTSRB/train/00000/00000_00009.png', '/content/GTSRB/GTSRB/train/00000/00000_00010.png', '/content/GTSRB/GTSRB/train/00000/00000_00011.png', '/content/GTSRB/GTSRB/train/00000/00000_00012.png', '/content/GTSRB/GTSRB/train/00000/00000_00013.png', '/content/GTSRB/GTSRB/train/00000/00000_00014.png', '/content/GTSRB/GTSRB/train/00000/00000_00015.png', '/content/GTSRB/GTSRB/train/00000/00000_00016.png', '/content/GTSRB/GTSRB/train/00000/00000_00017.png', '/content/GTSRB/GTSRB/train/00000/000

# Reproducibility, CUDA Device Check, and Speed Optimizations

In [None]:
SEED = 42

# Seed every random number generator imported by this notebook: Python's
# `random`, NumPy (previously left unseeded), and PyTorch on CPU and all GPUs.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("CUDA:", torch.cuda.is_available(),
      torch.cuda.get_device_name(0) if torch.cuda.is_available() else "")

# Let cuDNN benchmark convolution algorithms and keep the fastest one.
# NOTE(review): autotuning may select non-deterministic kernels, so runs are
# seeded but not necessarily bit-for-bit identical — confirm this is acceptable.
torch.backends.cudnn.benchmark = True

# Older PyTorch builds do not provide set_float32_matmul_precision; skip the
# call there explicitly rather than swallowing every possible exception.
if hasattr(torch, "set_float32_matmul_precision"):
    torch.set_float32_matmul_precision("high")


CUDA: True Tesla T4


# Initial Configuration

In [None]:
# Central configuration for the whole notebook. The long decimal values are
# presumably the result of an automated hyperparameter search — TODO confirm.
IMG_SIZE = 64  # side length (pixels) every image is resized/cropped to
BATCH_SIZE = 128 if torch.cuda.is_available() else 32  # smaller batches on CPU
EPOCHS = 60  # upper bound on epochs; early stopping may end training sooner
VAL_SPLIT = 0.19  # fraction of the training images held out for validation
BASE_LR = 0.0007048443866827096  # peak AdamW learning rate reached after warm-up
WEIGHT_DECAY = 0.08800119196442377  # AdamW weight decay
LABEL_SMOOTH = 0.07231807995388388  # label_smoothing of the cross-entropy loss
WARMUP_EPOCHS = 3  # epochs of linear learning-rate warm-up before cosine decay
STOCHASTIC_DEPTH_P = 0.0035185798805673963  # passed to MiniResNet as drop_path_rate
EMA_DECAY = 0.999  # decay of the exponential moving average of the weights
PATIENCE = 10  # epochs without validation improvement before early stopping
NUM_WORKERS = 2  # worker processes per DataLoader
PIN_MEMORY = bool(torch.cuda.is_available())  # pinned host memory only when a GPU is present
# persistent_workers flag handed to every DataLoader.
# NOTE(review): an earlier comment justified False with "0 workers", but
# NUM_WORKERS is 2, so workers are re-created for each pass over a loader — confirm intent.
PERSISTENT = False

# Where to cache/download GTSRB
# NOTE(review): the directory is created here but no other visible cell reads CACHE_DIR.
CACHE_DIR = "/content/gtsrb_cache"
os.makedirs(CACHE_DIR, exist_ok=True)

# Data Augmentation & Preprocessing

In [None]:
# Both pipelines resize to a fixed IMG_SIZE x IMG_SIZE square first and finish
# by converting the PIL image into a tensor.
_square = (IMG_SIZE, IMG_SIZE)

# Training pipeline: small random rotation/shift/zoom, mild colour jitter, then
# a 4-pixel edge-replicating pad followed by a random crop back to the square.
_train_steps = [
    transforms.Resize(_square),
    transforms.RandomAffine(degrees=10, translate=(0.05, 0.05), scale=(0.95, 1.05)),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.2, hue=0.02),
    transforms.Pad(4, padding_mode="edge"),
    transforms.RandomCrop(_square),
    transforms.ToTensor(),
]
train_tf = transforms.Compose(_train_steps)

# Validation/test pipeline: deterministic, no augmentation.
test_tf = transforms.Compose([transforms.Resize(_square), transforms.ToTensor()])

# Dataset Loading, Train/Validation Split, and DataLoader Construction

In [None]:
from PIL import Image  # needed by the WithTransform dataset further down in this cell

data_root = "/content/GTSRB/GTSRB"

# Index of the training folder; with transform=None only the (path, label)
# list in `.samples` is used.
train_full = datasets.ImageFolder(root=os.path.join(data_root, "train"), transform=None)

# Hold out a VAL_SPLIT fraction (rounded down) for validation.
total_images = len(train_full)
num_val = int(total_images * VAL_SPLIT)
num_train = total_images - num_val

# A dedicated, seeded generator makes the shuffled split independent of
# whatever else has consumed the global RNG.
g = torch.Generator().manual_seed(SEED)
perm = torch.randperm(len(train_full.samples), generator=g).tolist()
train_idx, val_idx = perm[:num_train], perm[num_train:]

class WithTransform(torch.utils.data.Dataset):
    """View over a subset of (image_path, label) samples with its own transform.

    Args:
        base_paths_and_labels: sequence of (image_path, label) tuples, e.g.
            ``ImageFolder.samples``.
        indices: positions in ``base_paths_and_labels`` that belong to this subset.
        transform: callable applied to the loaded PIL image, or ``None`` to
            return the PIL image unchanged.
    """

    def __init__(self, base_paths_and_labels, indices, transform):
        self.base_paths_and_labels = base_paths_and_labels
        self.indices = indices
        self.transform = transform

    def __len__(self):
        """Return the number of samples in this subset."""
        return len(self.indices)

    def __getitem__(self, i):
        """Load sample ``i`` of the subset and return ``(image, label)``."""
        img_path, y = self.base_paths_and_labels[self.indices[i]]
        # Open inside a context manager so the file handle is released promptly,
        # and force 3-channel RGB (as ImageFolder's default loader does) so
        # palette, greyscale or RGBA files cannot change the channel count.
        with Image.open(img_path) as raw:
            img = raw.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)
        return img, y

# Train and validation subsets share one sample list but use different
# transforms: augmentation for training, the plain pipeline for validation.
train_ds = WithTransform(train_full.samples, train_idx, train_tf)
val_ds = WithTransform(train_full.samples, val_idx, test_tf)

# The test images are read from their own folder with the plain pipeline.
test_ds = datasets.ImageFolder(root=os.path.join(data_root, "test"), transform=test_tf)

# Settings common to all three loaders; only the training loader shuffles.
_loader_opts = dict(
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
    persistent_workers=PERSISTENT,
)
train_loader = DataLoader(train_ds, shuffle=True, **_loader_opts)
val_loader = DataLoader(val_ds, shuffle=False, **_loader_opts)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_opts)

31760 7449


In [None]:
from torchvision import datasets
import os

# Re-index the training folder and report how many images it contains.
data_root = "/content/GTSRB/GTSRB"   # your current path
train_dir = os.path.join(data_root, "train")
train_full = datasets.ImageFolder(root=train_dir, transform=None)

print("Total images:", len(train_full))


Total images: 39209


In [None]:
# Report the number of samples in the train, validation and test sets, one per line.
for split_ds in (train_ds, val_ds, test_ds):
    print(len(split_ds))

31760
7449
12630


# DropPath

In [None]:
class DropPath(nn.Module):
    """Stochastic depth: randomly zero the whole input for individual samples.

    During training each sample in the batch is kept with probability
    ``1 - drop_prob`` and scaled by ``1 / (1 - drop_prob)``; otherwise it is
    replaced by zeros. In eval mode, or with ``drop_prob == 0``, the input is
    returned unchanged.

    Args:
        drop_prob: probability of dropping a sample's input.
    """

    def __init__(self, drop_prob: float = 0.0):
        super().__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        """Apply per-sample dropping to ``x`` (first dimension is the batch)."""
        if self.drop_prob == 0.0 or not self.training:
            return x
        keep_prob = 1 - self.drop_prob
        if keep_prob <= 0.0:
            # Every sample is dropped. Dividing by keep_prob == 0 would give
            # inf * 0 = NaN, so return zeros directly.
            return torch.zeros_like(x)
        # One Bernoulli draw per sample, broadcast over all remaining dimensions.
        shape = (x.shape[0],) + (1,) * (x.ndim - 1)
        mask = torch.empty(shape, dtype=x.dtype, device=x.device).bernoulli_(keep_prob)
        return x / keep_prob * mask

# ResNet model

In [None]:
class BasicBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers added to a shortcut.

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels.
        stride: stride of the first convolution (and of the shortcut projection).
        drop_path: drop probability handed to ``DropPath`` for the residual branch.
    """

    def __init__(self, in_ch, out_ch, stride=1, drop_path=0.0):
        super().__init__()
        # Residual branch.
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_ch)
        self.act = nn.ReLU(inplace=True)
        self.drop = DropPath(drop_path)

        # The shortcut needs a 1x1 projection only when the block changes the
        # spatial resolution or the channel count; otherwise it is the identity.
        needs_projection = not (stride == 1 and in_ch == out_ch)
        self.down = (
            nn.Sequential(
                nn.Conv2d(in_ch, out_ch, 1, stride=stride, bias=False),
                nn.BatchNorm2d(out_ch),
            )
            if needs_projection
            else None
        )

    def forward(self, x):
        """Return ``act(drop(residual(x)) + shortcut(x))``."""
        residual = self.conv1(x)
        residual = self.bn1(residual)
        residual = self.act(residual)
        residual = self.conv2(residual)
        residual = self.bn2(residual)

        shortcut = x if self.down is None else self.down(x)

        merged = self.drop(residual) + shortcut
        return self.act(merged)

In [None]:
class MiniResNet(nn.Module):
    """Small ResNet-style classifier: conv stem, three residual stages, linear head.

    The three stages hold two ``BasicBlock``s each, with 64, 128 and 256
    channels; the first block of the second and third stage uses stride 2.
    Drop-path rates increase linearly from 0 to ``drop_path_rate`` across the
    six blocks.

    Args:
        num_classes: size of the output logit vector.
        drop_path_rate: drop-path probability of the last block.
        dropout_head: dropout probability applied before the final linear layer.
    """

    def __init__(self, num_classes=43, drop_path_rate=0.05, dropout_head=0.42220762746310037):
        super().__init__()
        self.stem = nn.Sequential(
            nn.Conv2d(3, 64, 3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
        )

        depths = [2, 2, 2]
        channels = [64, 128, 256]

        # Flatten the stage layout into one (width, stride) entry per block.
        layout = [
            (width, 2 if (stage > 0 and pos == 0) else 1)
            for stage, (depth, width) in enumerate(zip(depths, channels))
            for pos in range(depth)
        ]
        dp_rates = torch.linspace(0, drop_path_rate, sum(depths)).tolist()

        layers = []
        prev_width = 64  # output width of the stem
        for rate, (width, stride) in zip(dp_rates, layout):
            layers.append(BasicBlock(prev_width, width, stride=stride, drop_path=rate))
            prev_width = width
        self.backbone = nn.Sequential(*layers)

        # Global average pool -> flatten -> dropout -> linear classifier.
        self.head = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Dropout(dropout_head),
            nn.Linear(channels[-1], num_classes)
        )

    def forward(self, x):
        """Return class logits for a batch of images."""
        return self.head(self.backbone(self.stem(x)))

# EMA

In [None]:
class EMA:
    """Exponential moving average of a model's parameters.

    ``self.ema`` is a deep copy of ``model`` moved to the notebook-level
    ``device``, with gradients disabled on all of its parameters.

    Args:
        model: the module whose weights are tracked.
        decay: weight given to the running average at every update.
    """

    def __init__(self, model, decay=0.9972933928671345):
        super().__init__()
        self.decay = decay
        shadow = copy.deepcopy(model).to(device)
        for param in shadow.parameters():
            param.requires_grad_(False)
        self.ema = shadow

    @torch.no_grad()
    def update(self, model):
        """Blend ``model``'s current parameters into the average.

        Each parameter becomes ``decay * avg + (1 - decay) * current``.
        Buffers are not averaged; they are copied from ``model`` as they are.
        """
        keep = self.decay
        blend = 1.0 - keep
        for avg, cur in zip(self.ema.parameters(), model.parameters()):
            avg.data.mul_(keep).add_(cur.data, alpha=blend)
        for avg_buf, cur_buf in zip(self.ema.buffers(), model.buffers()):
            avg_buf.copy_(cur_buf)

# Model, optimizer, schedulers, loss, AMP

In [None]:
# Build the 43-class network with the configured stochastic-depth rate and
# move it to the selected device.
model = MiniResNet(num_classes=43, drop_path_rate=STOCHASTIC_DEPTH_P).to(device)

# Switch the model to the channels_last memory format; the batches are
# converted to the same format in the training and evaluation code.
model.to(memory_format=torch.channels_last)

# The EMA wrapper deep-copies the model, so it is created after the model has
# been placed on the device and converted.
ema = EMA(model, decay=EMA_DECAY)
# AdamW over all model parameters (weight decay applies to every one of them).
optimizer = AdamW(model.parameters(), lr=BASE_LR, weight_decay=WEIGHT_DECAY)

# Learning-rate schedule, stepped once per epoch by the training loop:
# linear warm-up from 1e-6 (start_factor * BASE_LR) to BASE_LR over
# WARMUP_EPOCHS steps, then cosine decay down to 5% of BASE_LR over the
# remaining epochs. SequentialLR switches between the two at WARMUP_EPOCHS.
warmup = LinearLR(optimizer, start_factor=1e-6/BASE_LR, end_factor=1.0, total_iters=WARMUP_EPOCHS)
cosine = CosineAnnealingLR(optimizer, T_max=EPOCHS - WARMUP_EPOCHS, eta_min=BASE_LR * 0.05)
scheduler = SequentialLR(optimizer, schedulers=[warmup, cosine], milestones=[WARMUP_EPOCHS])

# Cross-entropy with label smoothing; the same criterion is used for
# training, validation and test loss.
criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTH)

# Mixed-precision helpers; gradient scaling is enabled only when CUDA is available.
from torch.amp import GradScaler, autocast
scaler = GradScaler(enabled=torch.cuda.is_available())

# Eval

In [None]:
@torch.no_grad()
def evaluate(model_or_ema, loader):
    """Compute the mean loss and accuracy of a model over a data loader.

    The model is put into eval mode (and left there). Uses the notebook-level
    ``device`` and ``criterion``; autocast is active only when CUDA is available.

    Args:
        model_or_ema: the module to evaluate (raw model or EMA copy).
        loader: iterable yielding ``(images, labels)`` batches.

    Returns:
        ``(mean_loss, accuracy)`` — per-sample average loss and the fraction
        of correct predictions in ``[0, 1]``.
    """
    model_or_ema.eval()
    use_amp = torch.cuda.is_available()

    n_seen = 0
    n_correct = 0
    weighted_loss = 0.0
    for images, targets in loader:
        images = images.to(device, non_blocking=True).to(memory_format=torch.channels_last)
        targets = targets.to(device, non_blocking=True)

        with autocast(enabled=use_amp, device_type='cuda'):
            logits = model_or_ema(images)
            batch_loss = criterion(logits, targets)

        # The criterion returns a batch mean; weight it by the batch size so
        # a smaller final batch does not skew the overall average.
        batch_size = images.size(0)
        weighted_loss += batch_loss.item() * batch_size
        n_correct += (logits.argmax(1) == targets).sum().item()
        n_seen += batch_size

    return weighted_loss / n_seen, n_correct / n_seen

# Train

In [None]:
# Bookkeeping for model selection and early stopping.
best_val_acc = 0.0
best_state = None          # state_dict of the EMA model at its best validation accuracy
epochs_no_improve = 0
start = time.time()

# Sanity check: confirm the model sits on the expected device and that a
# batch can be fetched and moved there.
# NOTE(review): this draws one shuffled batch before training starts, which
# presumably advances the random state the first epoch sees — confirm if
# exact reproducibility matters.
print("Sanity:", "Model on", next(model.parameters()).device)
xb, yb = next(iter(train_loader))
print("Batch before:", xb.device)
xb = xb.to(device); yb = yb.to(device)
print("Batch after:", xb.device)


for epoch in range(1, EPOCHS + 1):
    model.train()
    running = 0.0  # sum of per-sample training losses in this epoch
    for x, y in train_loader:
        x = x.to(device, non_blocking=True).to(memory_format=torch.channels_last)
        y = y.to(device, non_blocking=True)

        optimizer.zero_grad(set_to_none=True)
        # Forward pass and loss under autocast (active only with CUDA).
        with autocast(enabled=torch.cuda.is_available(), device_type='cuda'):
            logits = model(x)
            loss = criterion(logits, y)
        # Scaled backward pass, optimizer step through the scaler, then
        # update of the scale factor.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Fold the freshly updated weights into the moving average after every step.
        ema.update(model)
        running += loss.item() * x.size(0)

    # The learning-rate schedule advances once per epoch.
    scheduler.step()

    # Validation is run on the EMA weights, not on the raw model. The training
    # loss comes from the raw model in train mode on augmented data, so the
    # two losses are not directly comparable.
    # NOTE(review): the logged validation accuracy stays very low for the first
    # epochs; presumably the EMA (decay 0.999) still carries much of the random
    # initialisation at that point — confirm.
    val_loss, val_acc = evaluate(ema.ema, val_loader)
    train_loss = running / len(train_loader.dataset)
    print(f"Epoch {epoch:02d}/{EPOCHS} | "
          f"train_loss {train_loss:.4f} | val_loss {val_loss:.4f} | val_acc {val_acc*100:.2f}% | "
          f"lr {optimizer.param_groups[0]['lr']:.6f}")

    # Keep a copy of the EMA weights whenever validation accuracy improves by
    # more than 1e-4; otherwise count towards early stopping.
    if val_acc > best_val_acc + 1e-4:
        best_val_acc = val_acc
        best_state = copy.deepcopy(ema.ema.state_dict())
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= PATIENCE:
            print(f"Early stopping at epoch {epoch}. Best val acc = {best_val_acc*100:.2f}%")
            break

print(f"Training time: {time.time() - start:.1f}s")

Sanity: Model on cuda:0
Batch before: cpu
Batch after: cuda:0
Epoch 01/60 | train_loss 3.9545 | val_loss 3.9807 | val_acc 1.61% | lr 0.000236
Epoch 02/60 | train_loss 2.8335 | val_loss 3.7089 | val_acc 4.62% | lr 0.000470




Epoch 03/60 | train_loss 1.4680 | val_loss 4.0556 | val_acc 5.01% | lr 0.000705
Epoch 04/60 | train_loss 0.8068 | val_loss 4.8283 | val_acc 2.81% | lr 0.000704
Epoch 05/60 | train_loss 0.6771 | val_loss 4.9201 | val_acc 2.78% | lr 0.000703
Epoch 06/60 | train_loss 0.6380 | val_loss 4.1705 | val_acc 5.61% | lr 0.000700
Epoch 07/60 | train_loss 0.6146 | val_loss 3.3846 | val_acc 12.50% | lr 0.000697
Epoch 08/60 | train_loss 0.6136 | val_loss 2.5737 | val_acc 38.38% | lr 0.000692
Epoch 09/60 | train_loss 0.6011 | val_loss 2.0362 | val_acc 53.13% | lr 0.000687
Epoch 10/60 | train_loss 0.5933 | val_loss 1.0142 | val_acc 88.56% | lr 0.000680
Epoch 11/60 | train_loss 0.5872 | val_loss 0.8043 | val_acc 93.99% | lr 0.000673
Epoch 12/60 | train_loss 0.5835 | val_loss 0.6711 | val_acc 97.87% | lr 0.000664
Epoch 13/60 | train_loss 0.5805 | val_loss 0.7038 | val_acc 97.25% | lr 0.000655
Epoch 14/60 | train_loss 0.5792 | val_loss 1.0963 | val_acc 82.60% | lr 0.000645
Epoch 15/60 | train_loss 0.5803 

# Test Accuracy

In [None]:
# Restore the EMA weights that scored best on the validation set (if any were
# recorded during training), then report loss and accuracy on the test set.
# After this cell, ema.ema holds the best-validation weights rather than the
# latest moving average.
if best_state is not None:
    ema.ema.load_state_dict(best_state)
test_loss, test_acc = evaluate(ema.ema, test_loader)
print(f"TEST — loss: {test_loss:.4f}, acc: {test_acc*100:.2f}%")

TEST — loss: 0.5689, acc: 99.63%


# Save model

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

save_dir = "/content/drive/MyDrive/GTSRB_models"
os.makedirs(save_dir, exist_ok=True)

# Tag the files with the test accuracy measured in this session instead of a
# hard-coded number, so the names stay truthful when the notebook is re-run.
# Note that the accuracy was measured on the EMA weights; the raw model file
# carries the same tag only to mark which run it belongs to.
acc_tag = f"{test_acc * 100:.2f}"

# Save both versions
torch.save(model.state_dict(), os.path.join(save_dir, f"mini_resnet_final_{acc_tag}.pth"))
torch.save(ema.ema.state_dict(), os.path.join(save_dir, f"mini_resnet_ema_{acc_tag}.pth"))

# Optionally full checkpoint for resuming. The GradScaler state is included
# because mixed-precision training cannot resume with the same loss scale
# without it.
torch.save({
    "epoch": epoch,
    "model_state": model.state_dict(),
    "ema_state": ema.ema.state_dict(),
    "optimizer_state": optimizer.state_dict(),
    "scheduler_state": scheduler.state_dict(),
    "scaler_state": scaler.state_dict(),
    "best_val_acc": best_val_acc,
}, os.path.join(save_dir, "mini_resnet_checkpoint.pth"))

print("✅ Models saved to:", save_dir)


Mounted at /content/drive
✅ Models saved to: /content/drive/MyDrive/GTSRB_models


In [None]:
import json

# Record which sample indices went to training and which to validation, so
# the exact split can be reloaded later.
split_dict = dict(train_idx=train_idx, val_idx=val_idx)

with open("gtsrb_split.json", "w") as split_file:
    split_file.write(json.dumps(split_dict))

print("Saved split to gtsrb_split.json")


Saved split to gtsrb_split.json
