In [None]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from sklearn.model_selection import train_test_split
from torchvision import transforms, models
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import StratifiedKFold
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import random
from sklearn.metrics import f1_score

from torch.utils.data import Subset, DataLoader

from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import show_cam_on_image
from sklearn.metrics import classification_report

In [None]:
# Project root directory. The default is the original author's location; set the
# COMP30027_PROJECT_DIR environment variable to run the notebook from another
# checkout without editing this cell.
file_path = os.environ.get(
    "COMP30027_PROJECT_DIR",
    r"C:\Users\xiluo\Desktop\UoM 2025 S1\ML\COMP30027 asmt2",
)

In [None]:
# Report, one value per line: the installed PyTorch version, the CUDA version
# recorded in torch.version.cuda, and whether CUDA is available in this session.
print(
    torch.__version__,
    torch.version.cuda,
    torch.cuda.is_available(),
    sep="\n",
)

In [None]:
# Set all random seeds for reproducibility
def set_seed(seed=42):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU, current CUDA device and
    all CUDA devices), stores the seed in the ``PYTHONHASHSEED`` environment
    variable, and sets ``cudnn.deterministic = True`` / ``cudnn.benchmark = False``.

    Args:
        seed: integer seed shared by all generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seeder in seeders:
        seeder(seed)

    os.environ['PYTHONHASHSEED'] = str(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


set_seed(42)

In [None]:
def train_model(model, criterion, optimizer, train_loader, val_loader, device, num_epochs=20):
    """
    Train the model and record training/validation metrics for each epoch.

    Args:
        model: network to optimise; it is updated in place.
        criterion: loss, called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepped once per training batch.
        train_loader: iterable of ``(imgs, labels)`` batches exposing ``.dataset``.
        val_loader: iterable of ``(imgs, labels)`` batches exposing ``.dataset``.
        device: device every batch is moved to before the forward pass.
        num_epochs: number of passes over ``train_loader``.

    Returns:
        A 10-tuple ``(best_acc, best_f1, best_model_state, train_losses,
        train_accs, val_accs, train_f1s, val_f1s, best_val_preds,
        best_val_labels)``:

        * ``best_acc`` - highest validation accuracy over all epochs.
        * ``best_f1`` - highest validation macro-F1 over all epochs (tracked
          independently, so it may come from a different epoch than ``best_acc``).
        * ``best_model_state`` - an independent copy of the ``state_dict`` taken at
          the best-accuracy epoch (``None`` only when ``num_epochs`` is 0).
        * ``train_losses`` - per-epoch sum of the batch losses (not a mean).
        * ``train_accs`` / ``val_accs`` / ``train_f1s`` / ``val_f1s`` - per-epoch metrics.
        * ``best_val_preds`` / ``best_val_labels`` - validation predictions and
          labels from the best-accuracy epoch, in ``val_loader`` order.
    """
    import copy  # local import: only needed to snapshot the best weights

    best_acc = 0
    best_f1 = 0
    best_model_state = None
    # Initialised up front so the return statement can never hit an unbound name.
    best_val_preds = []
    best_val_labels = []
    train_losses = []
    train_accs = []
    val_accs = []
    train_f1s = []
    val_f1s = []
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        correct = 0
        all_train_labels = []
        all_train_preds = []

        for imgs, labels in train_loader:
            imgs, labels = imgs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            preds = outputs.argmax(1)
            correct += (preds == labels).sum().item()
            all_train_labels.extend(labels.cpu().numpy())
            all_train_preds.extend(preds.cpu().numpy())

        train_acc = correct / len(train_loader.dataset)
        train_losses.append(total_loss)
        train_accs.append(train_acc)
        train_f1 = f1_score(all_train_labels, all_train_preds, average='macro')
        train_f1s.append(train_f1)

        # evaluate
        model.eval()
        correct = 0
        all_val_labels = []
        all_val_preds = []
        with torch.no_grad():
            for imgs, labels in val_loader:
                imgs, labels = imgs.to(device), labels.to(device)
                outputs = model(imgs)
                preds = outputs.argmax(1)
                correct += (preds == labels).sum().item()
                all_val_labels.extend(labels.cpu().numpy())
                all_val_preds.extend(preds.cpu().numpy())

        val_acc = correct / len(val_loader.dataset)
        val_accs.append(val_acc)
        val_f1 = f1_score(all_val_labels, all_val_preds, average='macro')
        val_f1s.append(val_f1)

        print(f"Epoch {epoch+1}: Loss {total_loss:.5f}, Train Acc {train_acc:.5f}, Val Acc {val_acc:.5f}, Train F1 {train_f1:.5f}, Val F1 {val_f1:.5f}")

        # The first epoch always records a checkpoint, so a state is returned even
        # when validation accuracy never rises above 0.
        if best_model_state is None or val_acc > best_acc:
            best_acc = val_acc
            # state_dict() returns references to the live parameter tensors, which
            # later optimizer steps keep overwriting; deep-copy to freeze the
            # weights of this epoch.
            best_model_state = copy.deepcopy(model.state_dict())
            best_val_preds = all_val_preds.copy()
            best_val_labels = all_val_labels.copy()
        if val_f1 > best_f1:
            best_f1 = val_f1

    # return all data during training
    return best_acc, best_f1, best_model_state, train_losses, train_accs, val_accs, train_f1s, val_f1s,  best_val_preds, best_val_labels

In [None]:
# import the self-defined CNN modules; '../utils' is added to the module search path first
import sys
sys.path.append('../utils')
from GTRSB_CNN import SimpleCNN
from TrafficDataset import TrafficSignDataset
from transform import transform

# read the training metadata table
metadata = pd.read_csv(
    os.path.join(file_path, "data", "train", "train_metadata.csv")
)

# First split: 80% for cross-validated training, 20% kept aside as a holdout set
# (stratified on ClassId, fixed random_state)
train_data, holdout_set = train_test_split(
    metadata,
    stratify=metadata['ClassId'],
    test_size=0.2,
    shuffle=True,
    random_state=42,
)
print(len(holdout_set))

# wrap the training split in the project's Dataset class
train_dataset = TrafficSignDataset(
    train_data,
    os.path.join(file_path, "data", "train"),
    transform=transform,
)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 5 fold cross validation
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
X = train_data['image_path'].values
y = train_data['ClassId'].values

num_epochs = 20

# Output locations. Created up front so that a missing folder cannot abort the
# run after a fold has already been trained.
model_dir = os.path.join(file_path, "models", "cnn_models")
save_dir = os.path.join(file_path, "results", "sankey_data")
os.makedirs(model_dir, exist_ok=True)
os.makedirs(save_dir, exist_ok=True)

# initialise to record training data in epochs (one list entry per fold)
all_train_losses = []
all_train_accs = []
all_val_accs = []
all_train_f1s = []
all_val_f1s = []
all_best_val_accs = []
all_best_val_f1s = []

# initialise the out-of-fold (OOF) validation predictions, aligned with the row order of train_data
cnn_val_preds = np.zeros(len(train_data), dtype=int)

for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
    print(f"=== Fold {fold+1} ===")
    train_subset = Subset(train_dataset, train_idx)
    val_subset = Subset(train_dataset, val_idx)
    train_loader = DataLoader(train_subset, batch_size=64, shuffle=True, num_workers=2)
    # shuffle=False keeps validation predictions in val_idx order for the OOF assignment below
    val_loader = DataLoader(val_subset, batch_size=64, shuffle=False, num_workers=2)
    # a fresh model and optimizer for every fold
    model = SimpleCNN(num_classes=43).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    best_val_acc, best_val_f1, best_model_state, train_losses, train_accs, val_accs, train_f1s, val_f1s, val_preds, val_labels = train_model(
        model, criterion, optimizer, train_loader, val_loader, device, num_epochs=num_epochs
    )
    # save the best model of each fold
    torch.save(best_model_state, os.path.join(model_dir, f"cnn_fold{fold+1}_best.pth"))
    all_train_losses.append(train_losses)
    all_train_accs.append(train_accs)
    all_val_accs.append(val_accs)
    all_train_f1s.append(train_f1s)
    all_val_f1s.append(val_f1s)
    all_best_val_accs.append(best_val_acc)
    all_best_val_f1s.append(best_val_f1)

    cnn_val_preds[val_idx] = val_preds

    print(f"Fold {fold+1} best val acc: {best_val_acc:.5f}, best val F1: {best_val_f1:.5f}")

# save the cnn validation prediction for visualisation
np.save(os.path.join(save_dir, "cnn_val_pred_labels.npy"), cnn_val_preds)

print("Mean Val Acc: {:.5f}".format(np.mean(all_best_val_accs)))
print("Mean Val F1: {:.5f}".format(np.mean(all_best_val_f1s)))

In [None]:
# visualise training curves
# The fold count is taken from the recorded data rather than hard-coded, so the
# figure follows whatever number of folds was actually trained.
n_folds = len(all_train_losses)
epochs = range(1, len(all_train_losses[0]) + 1)

# squeeze=False keeps `axes` two-dimensional, so axes[fold, col] also works for a single fold
fig, axes = plt.subplots(n_folds, 3, figsize=(18, 4 * n_folds), squeeze=False)

for fold in range(n_folds):
    # Loss (training only; no validation loss is recorded)
    ax = axes[fold, 0]
    ax.plot(epochs, all_train_losses[fold], label='Train Loss')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.set_title(f'Fold {fold+1} Train Loss')
    ax.legend()

    # Accuracy
    ax = axes[fold, 1]
    ax.plot(epochs, all_train_accs[fold], label='Train Acc')
    ax.plot(epochs, all_val_accs[fold], label='Val Acc')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Accuracy')
    ax.set_title(f'Fold {fold+1} Accuracy')
    ax.legend()

    # F1 Score
    ax = axes[fold, 2]
    ax.plot(epochs, all_train_f1s[fold], label='Train F1')
    ax.plot(epochs, all_val_f1s[fold], label='Val F1')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('F1 Score')
    ax.set_title(f'Fold {fold+1} F1 Score')
    ax.legend()

plt.suptitle('All Folds Training Curves', y=1.02, fontsize=18)
plt.tight_layout(rect=[0, 0, 1, 0.98])
plt.show()

## Evaluate CNN model

In [None]:
# Out-of-fold evaluation: true classes of the training split vs. the predictions
# collected during cross-validation. The labels are read from train_data directly
# instead of the global `y`, which a later cell rebinds to the holdout labels
# (re-running this cell afterwards would otherwise compare the wrong arrays).
oof_true_labels = train_data['ClassId'].values
report_dict = classification_report(oof_true_labels, cnn_val_preds, output_dict=True)
report_df = pd.DataFrame(report_dict).transpose()
print(report_df)


# only keep class rows (remove accuracy, macro avg, weighted avg)
class_rows = report_df.iloc[:-3, :]

x = list(range(43))
plt.figure(figsize=(14, 6))
plt.plot(x, class_rows['precision'], marker='o', label='Precision', color='#1f77b4')
plt.plot(x, class_rows['recall'], marker='o', label='Recall', color='#2ca02c')
plt.plot(x, class_rows['f1-score'], marker='o', label='F1-score', color='#ff7f0e')

# horizontal reference lines for the aggregate F1 scores
plt.axhline(report_df.loc['macro avg', 'f1-score'], color='gray', linestyle='--', label='Macro F1')
plt.axhline(report_df.loc['weighted avg', 'f1-score'], color='orange', linestyle='--', label='Weighted F1')

plt.xlabel('Class', fontsize=13)
plt.ylabel('Score', fontsize=13)
plt.title('CNN Per-Class Precision, Recall, F1-score (with Macro/Weighted F1)', fontsize=15, pad=12)
plt.ylim(0, 1.05)
plt.xticks(x, x, fontsize=11, rotation=0)
plt.yticks(fontsize=11)
plt.grid(axis='y', linestyle='--', alpha=0.5)
plt.legend(loc='lower left', fontsize=11, framealpha=0.85)
plt.tight_layout()
plt.show()

In [None]:
# visualise the output of the intermediate layer
def get_feature_map(model, x):
    """Run `x` through the first stages of `model` and return the result on the CPU.

    The stages are applied in this order, with gradient tracking disabled:
    conv1, bn1, relu, pool, res_block1, res_block2, conv4, bn4.

    Args:
        model: object exposing the eight stages above as callable attributes.
        x: input tensor passed to ``model.conv1``.

    Returns:
        The output of ``model.bn4`` moved to the CPU.
    """
    stage_names = (
        'conv1', 'bn1', 'relu', 'pool',
        'res_block1', 'res_block2',
        'conv4', 'bn4',
    )
    with torch.no_grad():
        activation = x
        for stage_name in stage_names:
            activation = getattr(model, stage_name)(activation)
        return activation.cpu()

import matplotlib.pyplot as plt
from PIL import Image
import torch

# find sample images to visualise the feature maps
# Paths are built from `file_path` so they follow the project root configured at
# the top of the notebook instead of repeating an absolute path.
train_img_dir = os.path.join(file_path, "data", "train")
img_paths = [
    os.path.join(train_img_dir, sample_name)
    for sample_name in ('img_000062.jpg', 'img_000065.jpg', 'img_000067.jpg')
]

n_imgs = len(img_paths)
n_channels = 8  # first 8 channels

# squeeze=False keeps `axes` two-dimensional for any n_imgs / n_channels
# combination, so a single indexing scheme works below.
fig, axes = plt.subplots(n_imgs, n_channels, figsize=(2.2*n_channels, 2.2*n_imgs), squeeze=False)

# `model` is whatever network earlier cells left in the kernel; set eval mode
# explicitly rather than relying on the mode it was left in.
model.eval()

for row, img_path in enumerate(img_paths):
    img = Image.open(img_path).convert('RGB')
    input_tensor = transform(img).unsqueeze(0).to(device)
    feature_map = get_feature_map(model, input_tensor).squeeze(0)  # [C, H, W]
    for col in range(n_channels):
        ax = axes[row, col]
        # get_feature_map already returns a CPU tensor computed under no_grad
        ax.imshow(feature_map[col].numpy(), cmap='viridis')
        ax.axis('off')
        if row == 0:
            ax.set_title(f'Channel {col}')
    axes[row, 0].set_ylabel(f'Image {row+1}', rotation=90, size='large')

plt.suptitle('Feature Maps (First 8 Channels) for Multiple Images', y=1.02)
plt.tight_layout()
plt.show()

## Error Analysis and visualisation by Grad-CAM

In [None]:
# The use of Grad-CAM is based on the following link:
# https://github.com/jacobgil/pytorch-grad-cam/blob/master/cam.py

In [None]:
# find images to visualise the Grad-CAM
# Paths are built from `file_path` so they follow the project root configured at
# the top of the notebook instead of repeating an absolute path.
train_img_dir = os.path.join(file_path, "data", "train")
img_paths = [
    os.path.join(train_img_dir, sample_name)
    for sample_name in (
        'img_000062.jpg',
        'img_000065.jpg',
        'img_000067.jpg',
        'img_005456.jpg',
        'img_005459.jpg',
        'img_005460.jpg',
    )
]

target_layers = [model.conv1, model.conv4]  # first and last conv layer

n_imgs = len(img_paths)
# Layout: 3 rows x 6 columns = two groups of three images, each image taking
# three columns (original, Grad-CAM conv1, Grad-CAM conv4).
fig, axes = plt.subplots(3, 6, figsize=(18, 9))

# `model` is whatever network earlier cells left in the kernel; set eval mode
# explicitly rather than relying on the mode it was left in.
model.eval()

for i, img_path in enumerate(img_paths):
    group = i // 3          # 0 -> left half of the grid, 1 -> right half
    row = i % 3
    col_base = group * 3

    # look up the true label by file name; 'N/A' when the image is not in the metadata
    img_name = os.path.basename(img_path)
    meta_row = metadata[metadata['image_path'] == img_name]
    if not meta_row.empty:
        true_label = meta_row['ClassId'].values[0]
    else:
        true_label = 'N/A'
    img = Image.open(img_path).convert('RGB')
    img_resized = img.resize((64, 64))
    input_tensor = transform(img_resized).unsqueeze(0).to(device)
    with torch.no_grad():
        output = model(input_tensor)
        pred_label = output.argmax(1).item()

    # original image
    axes[row, col_base].imshow(img_resized)
    axes[row, col_base].axis('off')
    axes[row, col_base].set_title(f'True: {true_label}\nPred: {pred_label}')
    axes[row, col_base].set_ylabel(f'Image {i+1}', rotation=90, size='large')

    # background for the heat-map overlay, scaled to [0, 1]; identical for both layers
    img_np = np.array(img_resized).astype(np.float32) / 255.0

    # Grad-CAMs
    for j, target_layer in enumerate(target_layers):
        # The context manager releases the hooks GradCAM registers on the target
        # layer as soon as the map has been computed.
        with GradCAM(model=model, target_layers=[target_layer]) as cam:
            grayscale_cam = cam(input_tensor=input_tensor)[0, :]
        visualization = show_cam_on_image(img_np, grayscale_cam, use_rgb=True)
        axes[row, col_base + j + 1].imshow(visualization)
        axes[row, col_base + j + 1].axis('off')
        axes[row, col_base + j + 1].set_title(f'Grad-CAM conv{1 if j==0 else 4}')

plt.suptitle('Grad-CAM conv1, Grad-CAM conv4 (Left: round speed limit, Right: triangular warning)', y=1.02)
plt.tight_layout()
# plt.savefig('grad_cam_comparison.png')
plt.show()

In [None]:
# Find wrong predictions and visualize by Grad-CAM
# NOTE: `model` and `val_loader` are whatever earlier cells left in the kernel
# (the last cross-validation fold when the notebook is run top to bottom).
wrong_images = []
wrong_true_labels = []
wrong_pred_labels = []

model.eval()
with torch.no_grad():
    for imgs, labels in val_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        outputs = model(imgs)
        preds = outputs.argmax(1)
        wrong_mask = preds != labels
        for i in range(imgs.size(0)):
            if wrong_mask[i]:
                wrong_images.append(imgs[i].cpu())
                wrong_true_labels.append(labels[i].cpu().item())
                wrong_pred_labels.append(preds[i].cpu().item())

print(f"Total wrong predictions: {len(wrong_images)}")

# Create a single figure for all images
N = 5  # Maximum number of wrong predictions to show
target_layer = model.conv4

# Size the grid by what is actually available so no empty rows are drawn.
n_show = min(N, len(wrong_images))

if n_show == 0:
    print("No wrong predictions to visualise.")
else:
    # squeeze=False keeps `axes` two-dimensional even when only one row is shown
    fig, axes = plt.subplots(n_show, 3, figsize=(15, 4*n_show), squeeze=False)
    plt.suptitle('Wrong Predictions Analysis', fontsize=16, y=1.02)

    # One GradCAM instance serves every image; the context manager releases its
    # hooks on the target layer when the loop finishes.
    with GradCAM(model=model, target_layers=[target_layer]) as cam:
        for idx in range(n_show):
            # Process image and generate Grad-CAM
            img_tensor = wrong_images[idx].unsqueeze(0).to(device)
            true_label = wrong_true_labels[idx]
            pred_label = wrong_pred_labels[idx]

            # Prepare original image (CHW tensor -> HWC array)
            img_np = img_tensor.squeeze().cpu().numpy().transpose(1,2,0)
            # reverse normalization -- assumes `transform` normalises with
            # mean 0.5 / std 0.5 per channel; TODO confirm against transform.py
            img_np = (img_np * 0.5) + 0.5
            img_np = np.clip(img_np, 0, 1)

            # Generate Grad-CAM
            grayscale_cam = cam(input_tensor=img_tensor)[0, :]
            visualization = show_cam_on_image(img_np, grayscale_cam, use_rgb=True)

            # Get sample image of predicted class (seeded so the figure is reproducible)
            sample_row = metadata[metadata['ClassId'] == pred_label].sample(1, random_state=42)
            pred_img_path = sample_row['image_path'].values[0]
            pred_img = Image.open(os.path.join(file_path, "data", "train", pred_img_path)).convert('RGB')
            pred_img = pred_img.resize((img_np.shape[1], img_np.shape[0]))
            pred_img_np = np.array(pred_img).astype(np.float32) / 255.0

            # Plot in the grid
            axes[idx, 0].imshow(img_np)
            axes[idx, 0].set_title(f"True: {true_label}, Pred: {pred_label}")
            axes[idx, 0].axis('off')

            axes[idx, 1].imshow(visualization)
            axes[idx, 1].set_title("Grad-CAM")
            axes[idx, 1].axis('off')

            axes[idx, 2].imshow(pred_img_np)
            axes[idx, 2].set_title(f"Sample of Class {pred_label}")
            axes[idx, 2].axis('off')

    plt.tight_layout()
    plt.show()

## Inference on test set and holdout set (5-fold average)

In [None]:
NUM_CLASSES = 43
N_FOLDS = 5  # number of per-fold checkpoints to ensemble
MODEL_DIR = os.path.join(file_path, "models", "cnn_models")

# Defined here as well so this cell can run from saved checkpoints without
# re-executing the training cell; same location the training cell writes to.
save_dir = os.path.join(file_path, "results", "sankey_data")
os.makedirs(save_dir, exist_ok=True)  # also creates the parent "results" folder


def _predict_probs(model, loader):
    """Return softmax probabilities for every sample in `loader` as one array.

    Batches are moved to the global `device`, run through `model` without
    gradient tracking, and concatenated along axis 0 in loader order.
    """
    batch_probs = []
    with torch.no_grad():
        for imgs, _ in loader:
            outputs = model(imgs.to(device))
            batch_probs.append(torch.softmax(outputs, dim=1).cpu().numpy())
    return np.concatenate(batch_probs, axis=0)


# setup holdout and test set
test_metadata = pd.read_csv(os.path.join(file_path, "data", "test", "test_metadata.csv"))
test_metadata["ClassId"] = 0  # placeholder label column; overwritten with predictions below
test_dataset = TrafficSignDataset(test_metadata, os.path.join(file_path, "data", "test"), transform=transform)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
n_test = len(test_metadata)
holdout_dataset = TrafficSignDataset(holdout_set, os.path.join(file_path, "data", "train"), transform=transform)
holdout_loader = DataLoader(holdout_dataset, batch_size=64, shuffle=False)
n_holdout = len(holdout_set)

# initialise to store the predictions on test and holdout: (sample, class, fold)
test_probs_folds = np.zeros((n_test, NUM_CLASSES, N_FOLDS))
holdout_probs_folds = np.zeros((n_holdout, NUM_CLASSES, N_FOLDS))

for fold in range(N_FOLDS):
    model = SimpleCNN(num_classes=NUM_CLASSES)
    model_path = os.path.join(MODEL_DIR, f'cnn_fold{fold+1}_best.pth')
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    model.to(device)

    # inference on Test set and Holdout set with this fold's checkpoint
    test_probs_folds[:, :, fold] = _predict_probs(model, test_loader)
    holdout_probs_folds[:, :, fold] = _predict_probs(model, holdout_loader)

# average the probabilities over the fold models, then take the most likely class
test_probs_mean = np.mean(test_probs_folds, axis=2)
test_preds = np.argmax(test_probs_mean, axis=1)

holdout_probs_mean = np.mean(holdout_probs_folds, axis=2)
holdout_preds = np.argmax(holdout_probs_mean, axis=1)
np.save(os.path.join(save_dir, "cnn_holdout_pred_labels.npy"), holdout_preds)

# store test prediction
test_metadata['ClassId'] = test_preds
test_metadata[['id', 'ClassId']].to_csv(os.path.join(file_path, "results", "submission_cnn.csv"), index=False)
print("✅ Test set predictions saved to submission_cnn.csv")

In [None]:
# evaluate holdout performance
# A dedicated name is used for the holdout labels: rebinding the global `y`
# (the training labels set in the cross-validation cell) would make the
# out-of-fold evaluation cell fail or mis-report when re-run afterwards.
y_holdout = holdout_set['ClassId']
report_dict = classification_report(y_holdout, holdout_preds, output_dict=True)
report_df = pd.DataFrame(report_dict).transpose()
print(report_df)

# only keep class rows (remove accuracy, macro avg, weighted avg)
class_rows = report_df.iloc[:-3, :]

x = list(range(43))
plt.figure(figsize=(14, 6))
plt.plot(x, class_rows['precision'], marker='o', label='Precision', color='#1f77b4')
plt.plot(x, class_rows['recall'], marker='o', label='Recall', color='#2ca02c')
plt.plot(x, class_rows['f1-score'], marker='o', label='F1-score', color='#ff7f0e')

# horizontal reference lines for the aggregate F1 scores
plt.axhline(report_df.loc['macro avg', 'f1-score'], color='gray', linestyle='--', label='Macro F1')
plt.axhline(report_df.loc['weighted avg', 'f1-score'], color='orange', linestyle='--', label='Weighted F1')

plt.xlabel('Class', fontsize=13)
plt.ylabel('Score', fontsize=13)
plt.title('CNN Per-Class Precision, Recall, F1-score (with Macro/Weighted F1)', fontsize=15, pad=12)
plt.ylim(0, 1.05)
plt.xticks(x, x, fontsize=11, rotation=0)
plt.yticks(fontsize=11)
plt.grid(axis='y', linestyle='--', alpha=0.5)
plt.legend(loc='lower left', fontsize=11, framealpha=0.85)
plt.tight_layout()
plt.show()

In [None]:
# use the ensemble prediction as the final submission and copy to root directory
# Pure-Python copy anchored at `file_path`: the CSV was written under
# file_path/results, so this works regardless of the kernel's working directory
# and on any operating system (the `copy` shell command is Windows-only).
import shutil

shutil.copyfile(
    os.path.join(file_path, "results", "submission_cnn.csv"),
    os.path.join(file_path, "submission.csv"),
)