# Imports

In [1]:
# Package installation for colab
!pip install -U albumentations
!pip install split-folders


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
import numpy as np
import os
import shutil
from tqdm import tqdm
import zipfile
import urllib.request
import cv2
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import datasets, models, transforms
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler, random_split
from torchvision.datasets import ImageFolder
import torch.nn.functional as F
import torchvision.utils as vutils
from torch.utils.tensorboard import SummaryWriter
from torchvision.models import ResNet152_Weights, ResNet50_Weights
import random
import time
import copy
import matplotlib.pyplot as plt
import splitfolders
import data_augmenter

In [3]:
# For tensorboard plotting
def plot_classes_preds(images, labels, preds, probs, max_images=4):
    """Build a figure showing the first images of a batch with their predictions.

    Each panel is titled "<prediction>, <probability>%" followed by the true
    label; the title is green when prediction and label agree, red otherwise.

    Args:
        images: batch of image tensors; each image is moved to the CPU and
            transposed from (C, H, W) to (H, W, C) before being displayed.
        labels: per-image ground-truth labels (elements must support `.item()`).
        preds: per-image predicted class ids.
        probs: per-image probability of the predicted class, in [0, 1]
            (multiplied by 100 for display).
        max_images: maximum number of images to draw (default 4).

    Returns:
        The matplotlib figure.
    """
    # Never index past the end of the batch (e.g. a short last batch)
    num_shown = min(max_images, len(images))

    fig = plt.figure(figsize=(15, 5))
    for idx in range(num_shown):
        ax = fig.add_subplot(1, max_images, idx + 1, xticks=[], yticks=[])
        # Stretch the pixel values to the 0..255 range so that the image can be shown as uint8
        norm_img = cv2.normalize(images[idx].cpu().numpy(), None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX)
        rgb_img = np.transpose(norm_img, (1, 2, 0)).astype(np.uint8)
        # Draw on the axes created above rather than on pyplot's implicit "current" axes
        ax.imshow(rgb_img)
        ax.set_title("{0}, {1:.1f}%\n(label: {2})".format(
            preds[idx],
            probs[idx] * 100.0,
            labels[idx]),
            color=("green" if preds[idx] == labels[idx].item() else "red"))
    return fig

# Preprocess

In [4]:
# For replication: seed every random number generator this notebook imports
# (Python's `random`, NumPy and PyTorch), not only NumPy and PyTorch.
random.seed(123)
np.random.seed(123)
torch.manual_seed(123)

## Downloading the dataset

In [5]:
def download_file(url, file_name):
    """Download `url` to ``dataset/<file_name>`` unless that file already exists.

    The payload is streamed to ``dataset/<file_name>.part`` and only renamed to
    its final name once the transfer has completed, so an interrupted download
    never leaves a truncated file that a later run would report as
    "already exists".

    Args:
        url: address of the file to fetch.
        file_name: name of the file to create inside ``dataset/``.

    Raises:
        Whatever `urllib.request.urlopen` or the file operations raise; the
        partial file is removed before the exception propagates.
    """
    target_path = 'dataset/' + file_name
    if os.path.exists(target_path):
        print(f"{file_name} already exists.")
        return

    partial_path = target_path + '.part'
    try:
        with urllib.request.urlopen(url) as response, open(partial_path, 'wb') as out_file:
            # Content-Length is optional; tqdm shows an open-ended bar when total is None
            length_header = response.headers.get('Content-Length')
            content_length = int(length_header) if length_header is not None else None
            with tqdm(total=content_length, unit='B', unit_scale=True, desc=url.split('/')[-1]) as pbar:
                while True:
                    chunk = response.read(64 * 1024)
                    if not chunk:
                        break
                    out_file.write(chunk)
                    pbar.update(len(chunk))
        # Publish the file under its final name only after a complete transfer
        os.replace(partial_path, target_path)
    except BaseException:
        # Also covers KeyboardInterrupt: never keep a half-written archive around
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise


# Make sure the download directory exists
os.makedirs("dataset/", exist_ok=True)

# Start from a clean state: drop any previously extracted dataset tree
if not os.path.exists('dataset/GTSRB/'):
    print("The folder 'dataset/GTSRB/' does not exist.")
else:
    shutil.rmtree('dataset/GTSRB/')
    print("The folder 'dataset/GTSRB/' has been deleted successfully.")

# (url, local file name) pairs, in download order:
# training images, testing images, ground truth for testing
for archive_url, archive_name in [
    ('https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB_Final_Training_Images.zip',
     'GTSRB_Final_Training_Images.zip'),
    ('https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB_Final_Test_Images.zip',
     'GTSRB_Final_Test_Images.zip'),
    ('https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB_Final_Test_GT.zip',
     'GTSRB_Final_Test_GT.zip'),
]:
    download_file(archive_url, archive_name)

The folder 'dataset/GTSRB/' has been deleted successfully.
GTSRB_Final_Training_Images.zip already exists.
GTSRB_Final_Test_Images.zip already exists.
GTSRB_Final_Test_GT.zip already exists.


## Extracting zip files

In [6]:
def extract_file(file_name):
    """Extract every member of ``dataset/<file_name>`` into ``dataset/``.

    Members are extracted one by one so that a progress bar labelled
    "Extracting" can advance per member.
    """
    archive_path = f"dataset/{file_name}"
    with zipfile.ZipFile(archive_path, 'r') as archive:
        members = archive.namelist()
        for member in tqdm(members, total=len(members), desc="Extracting"):
            archive.extract(member, 'dataset/')


# Unpack the three downloaded archives, in the same order they were fetched
for archive_name in (
    'GTSRB_Final_Training_Images.zip',
    'GTSRB_Final_Test_Images.zip',
    'GTSRB_Final_Test_GT.zip',
):
    extract_file(archive_name)

Extracting: 100%|██████████| 39299/39299 [00:05<00:00, 7088.00it/s]
Extracting: 100%|██████████| 12635/12635 [00:01<00:00, 6797.96it/s]
Extracting: 100%|██████████| 1/1 [00:00<00:00, 550.14it/s]


## Loading ground truth annotations for the testing set

In [7]:
def csv_loader(csv_path):
    """Read a ';'-separated CSV file as a NumPy array of strings.

    The first row of the file (the header) is skipped.
    """
    return np.loadtxt(csv_path, dtype=str, delimiter=";", skiprows=1)

# Load the ground truth annotations of the testing set
annotations = csv_loader('./dataset/GT-final_test.csv')

# Keep only two of the CSV columns: 0 (image filename) and 7 (class id)
annotations = annotations[:, [0, 7]]
num_samples = annotations.shape[0]

# Order the rows by ascending numeric class id
# Column 0: image filename - Column 1: classid
class_order = np.argsort(annotations[:, 1].astype(int))
annotations = annotations[class_order]

## Making training data structure

In [8]:
def move_directories(source, destination):
    """Move every immediate sub-directory of `source` into `destination`.

    `destination` is created when it does not exist yet. Entries of `source`
    that are not directories are left where they are.
    """
    if not os.path.exists(destination):
        os.makedirs(destination)

    # Collect the names of the sub-directories found directly under `source`
    subdir_names = []
    for entry in os.listdir(source):
        if os.path.isdir(os.path.join(source, entry)):
            subdir_names.append(entry)

    # Relocate them one at a time, keeping their names
    for name in tqdm(subdir_names):
        shutil.move(os.path.join(source, name), os.path.join(destination, name))


# Resulting layout: ./dataset/GTSRB/train/class_id/image
move_directories(
    source="./dataset/GTSRB/Final_Training/Images",
    destination="./dataset/GTSRB/train",
)
# The original extraction folder is no longer needed once its class folders were moved
shutil.rmtree("./dataset/GTSRB/Final_Training")

100%|██████████| 43/43 [00:00<00:00, 28224.58it/s]


## Making test data structure from the CSV annotations

In [9]:
# Group the flat test images into one folder per class, named with the
# zero-padded 5-character class id
for class_id in tqdm(np.unique(annotations[:, 1]), desc='Class_ID'):
    class_dir = './dataset/GTSRB/test/' + class_id.zfill(5)
    if not os.path.exists(class_dir):
        os.makedirs(class_dir)

    # Rows of the current class; column 0 holds the image filename
    class_rows = annotations[annotations[:, 1] == class_id]
    for file_name in class_rows[:, 0]:
        shutil.move('./dataset/GTSRB/Final_Test/Images/' + file_name, class_dir + '/' + file_name)

# Remove the original extraction folder
shutil.rmtree("./dataset/GTSRB/Final_Test")

Class_ID: 100%|██████████| 43/43 [00:00<00:00, 104.87it/s]


## Merging the dataset and then splitting it into train and test
- creating **plain** dataset

In [10]:
def merge(source_folder, destination_folder):
    """Move the whole content of `source_folder` into `destination_folder`.

    The directory layout below `source_folder` is recreated below
    `destination_folder`: files are moved, directories are created when they
    are missing. `source_folder` itself (and its now empty sub-directories) is
    not removed.

    Args:
        source_folder: directory whose content is moved.
        destination_folder: directory receiving the content; created if needed.
    """
    os.makedirs(destination_folder, exist_ok=True)

    # Total number of files and directories below the source folder, for the progress bar
    total_items = sum(len(files) + len(dirs) for _, dirs, files in os.walk(source_folder))

    # The context manager closes the progress bar even when a move fails
    with tqdm(total=total_items, desc='Moving: ' + source_folder + ' --> ' + destination_folder,
              position=0, leave=True) as progress:
        # Top-down walk: the sub-directories of a folder are created at the
        # destination before the walk descends into them and moves their files
        for root, dirs, files in os.walk(source_folder):
            for item in files + dirs:
                source_item = os.path.join(root, item)
                destination_item = os.path.join(destination_folder, os.path.relpath(source_item, source_folder))

                if os.path.isfile(source_item):
                    shutil.move(source_item, destination_item)
                elif os.path.isdir(source_item):
                    os.makedirs(destination_item, exist_ok=True)

                progress.update(1)

def merge_folders(source_folders, target_folder):
    """Merge each folder of `source_folders` into `target_folder`.

    Every source folder is deleted right after its content has been merged.
    """
    for folder in source_folders:
        merge(folder, target_folder)
        # Whatever is left of the source folder is removed
        shutil.rmtree(folder)

# Temporary directory holding the merged (train + test) dataset
merged_dir = "./dataset/GTSRB/merged"

# Merge train and test set into it
merge_folders(
    source_folders=['./dataset/GTSRB/train', './dataset/GTSRB/test'],
    target_folder=merged_dir,
)

# Re-split the merged data: 70% training, 0% validation, 30% testing.
# move=False keeps the merged folder in place for the next step.
splitfolders.ratio(merged_dir, output="./dataset/GTSRB/plain", seed=123, ratio=(.7, 0, 0.3), move=False)

# Clear temporary files: the unused validation split and the dataset readme files
shutil.rmtree('./dataset/GTSRB/plain/val')
for readme_path in (
    './dataset/GTSRB/Readme-Images-Final-test.txt',
    './dataset/GTSRB/Readme-Images.txt',
):
    os.remove(readme_path)

Moving: ./dataset/GTSRB/train --> ./dataset/GTSRB/merged: 100%|██████████| 39295/39295 [00:01<00:00, 20384.64it/s]
Moving: ./dataset/GTSRB/test --> ./dataset/GTSRB/merged: 100%|██████████| 12673/12673 [00:00<00:00, 22478.17it/s]
Copying files: 51882 files [00:05, 8763.39 files/s]


## Adding weather conditions to the merged dataset and then splitting it into train and test
- creating the **50% Weather** or **100% Weather** dataset, depending on the fixed probability set below

In [11]:
# For weather augmentation
# NOTE(review): DataAugmenter comes from the project-local `data_augmenter`
# module; presumably it rewrites the images of the 'merged' folder in place,
# since that same folder is split right below - confirm against the module.
da = data_augmenter.DataAugmenter(dataset_path='./dataset/GTSRB/')
da.load_images(folder_to_load='merged')
# prob_per_class=1 is the "fixed probability" that selects the 100% weather variant
da.add_weather_effects(prob_per_class=1)

# Training 70 %
# Testing 30 %
# Same seed and ratio as the plain split; move=True moves the files out of
# merged_dir instead of copying them.
splitfolders.ratio(merged_dir, output="./dataset/GTSRB/weather", seed=123, ratio=(.7, 0, 0.3), move=True)

# Clear temporary files: the unused validation split and the merged folder
shutil.rmtree('./dataset/GTSRB/weather/val')
shutil.rmtree(merged_dir)

Classes found: , 00016, 00033, 00022, 00011, 00003, 00039, 00034, 00035, 00028, 00036, 00037, 00015, 00027, 00038, 00014, 00026, 00019, 00007, 00029, 00005, 00008, 00021, 00025, 00032, 00002, 00000, 00024, 00009, 00012, 00023, 00013, 00041, 00030, 00017, 00006, 00042, 00018, 00040, 00020, 00010, 00031, 00004, 00001


Loading classes: 100%|██████████| 43/43 [00:02<00:00, 20.81it/s]
Add weather effects with probability 1: 100%|██████████| 43/43 [01:01<00:00,  1.43s/it]
Copying files: 51882 files [00:02, 22812.54 files/s]


## Set dataset paths

In [12]:
# Train/test folders of the plain (un-augmented) split written to ./dataset/GTSRB/plain
plain_train_dir = './dataset/GTSRB/plain/train'
plain_test_dir = './dataset/GTSRB/plain/test'

# Train/test folders of the weather-augmented split written to ./dataset/GTSRB/weather
weather_train_dir = './dataset/GTSRB/weather/train'
weather_test_dir = './dataset/GTSRB/weather/test'

# Parameters setup

In [13]:
# Setting device for the computation: GPU when CUDA is available, CPU otherwise
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Change this to select between plain or weather dataset
# train_dir = plain_train_dir/weather_train_dir
# test_dir = plain_test_dir/weather_test_dir
# NOTE: the evaluation loop at the end of the notebook reassigns both
# variables for every run, so these values only act as defaults.
train_dir = plain_train_dir
test_dir = plain_test_dir

# Hyperparameters of the Adam configuration. Within this notebook only
# "batch_size" (test DataLoader) and "opt" (run/checkpoint name) are read.
adam_hyperparams = {
    "num_epochs": 15,
    "batch_size": 64,
    # optimizer
    "opt": "adam",
    "learning_rate": 1e-4,
    "beta_1": 0.9,
    "beta_2": 0.999,
    "eps": 1e-8,
    "weight_decay": 0,
    "momentum": 0,
    # scheduler
    "decay_rate": 0.5,
}

# One entry per trained model to evaluate:
#   'type' - dataset tag used in the checkpoint name ('P' or 'W-FULL'); the
#            evaluation loop tests 'P' models on the weather split and every
#            other type on the plain split
#   'cnn'  - architecture tag used in the checkpoint name ('CNN' or 'CNN_ST')
runs_arguments = [
    {'type': 'P', 'cnn': 'CNN'},
    {'type': 'P', 'cnn': 'CNN_ST'},
    {'type': 'W-FULL', 'cnn': 'CNN'},
    {'type': 'W-FULL', 'cnn': 'CNN_ST'},
]

# Method for loading the training/testing set

In [14]:
def load_train_test_dir(train_dir, test_dir):
    """Load the train and test image folders as normalized 48x48 tensor datasets.

    The per-channel mean and standard deviation are measured on the images of
    `train_dir` (after resizing) and then applied to BOTH datasets, so the
    model sees test inputs on the same scale as the training inputs.

    Args:
        train_dir: folder with one sub-folder per class, used for training and
            for measuring the normalization statistics.
        test_dir: folder with one sub-folder per class, used for testing.

    Returns:
        (train_dataset, test_dataset): two `ImageFolder` datasets sharing the
        same resize + to-tensor + normalize transform.
    """
    # Resize-only pipeline, needed to measure the raw pixel statistics
    resize_to_tensor = transforms.Compose([
        transforms.Resize((48, 48)),
        transforms.ToTensor(),
    ])
    unnormalized_train = datasets.ImageFolder(train_dir, transform=resize_to_tensor)

    # All training images are held in memory at once: shape (N, C, 48, 48)
    images = torch.stack([img for img, _ in unnormalized_train], dim=0)

    # Per-channel statistics across all images and pixels
    mean = torch.mean(images, dim=(0, 2, 3))
    std = torch.std(images, dim=(0, 2, 3))

    # Final pipeline: same geometry plus normalization with the training statistics
    normalized_transform = transforms.Compose([
        transforms.Resize((48, 48)),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std)
    ])

    train_dataset = datasets.ImageFolder(train_dir, transform=normalized_transform)
    # The test set must go through the same normalization as the training set
    test_dataset = datasets.ImageFolder(test_dir, transform=normalized_transform)

    return train_dataset, test_dataset

# Defining the CNNs from the paper

## Basic Convolutional Neural Network

In [15]:
class CNN(nn.Module):
    """Plain convolutional classifier: 3-channel 48x48 input, 43 output scores.

    Three stages of convolution -> ReLU -> 2x2 max-pooling -> local response
    normalization are followed by two fully connected layers. The pooling and
    normalization modules hold no weights and are shared by all stages.
    """

    def __init__(self):
        super().__init__()
        # Spatial size for a 48x48 input: 48 -conv1-> 46 -pool-> 23 -conv2-> 24
        # -pool-> 12 -conv3-> 13 -pool-> 6, hence 350 * 6 * 6 flattened features
        self.conv1 = nn.Conv2d(3, 200, kernel_size=7, stride=1, padding=2)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.local_norm = nn.LocalResponseNorm(size=5)
        self.conv2 = nn.Conv2d(200, 250, kernel_size=4, stride=1, padding=2)
        self.conv3 = nn.Conv2d(250, 350, kernel_size=4, stride=1, padding=2)
        self.fc1 = nn.Linear(350 * 6 * 6, 400)
        self.fc2 = nn.Linear(400, 43)

    def forward(self, x):
        """Return the 43 class scores (logits) for a batch of 3x48x48 images."""
        # The three convolutional stages share the same post-processing
        for conv in (self.conv1, self.conv2, self.conv3):
            x = self.local_norm(self.pool1(F.relu(conv(x))))

        # Flatten the output of the last convolutional stage for the dense layers
        x = x.view(-1, 350 * 6 * 6)

        return self.fc2(F.relu(self.fc1(x)))

## CNN with 3 spatial transformer units

In [16]:
class CNN_ST(nn.Module):
    """CNN classifier with a spatial transformer in front of each conv stage.

    The classification trunk is three stages of convolution -> ReLU -> 2x2
    max-pooling -> local response normalization followed by two fully
    connected layers (3-channel 48x48 input, 43 output scores). Before each
    stage a spatial transformer predicts a 2x3 affine matrix from the stage
    input and resamples that input accordingly. Every transformer starts out
    as the identity transformation.
    """

    def __init__(self):
        super().__init__()
        # Classification trunk, expected input 48x48
        self.conv1 = nn.Conv2d(3, 200, kernel_size=7, stride=1, padding=2)
        self.conv2 = nn.Conv2d(200, 250, kernel_size=4, stride=1, padding=2)
        self.conv3 = nn.Conv2d(250, 350, kernel_size=4, stride=1, padding=2)
        self.local_norm = nn.LocalResponseNorm(size=5)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(350 * 6 * 6, 400)
        self.fc2 = nn.Linear(400, 43)

        # Spatial transformer 1: works on the 3x48x48 network input
        self.loc1 = self._localization(3, 250, 250)
        self.fc_loc1 = self._regressor(250 * 6 * 6, 250)

        # Spatial transformer 2: works on the 200x23x23 output of stage 1
        self.loc2 = self._localization(200, 150, 200)
        self.fc_loc2 = self._regressor(200 * 2 * 2, 300)

        # Spatial transformer 3: works on the 250x12x12 output of stage 2
        self.loc3 = self._localization(250, 150, 200)
        self.fc_loc3 = self._regressor(200 * 1 * 1, 300)

    @staticmethod
    def _localization(in_channels, mid_channels, out_channels):
        """Build a localization network: pool, then twice (5x5 conv, ReLU, pool)."""
        return nn.Sequential(
            nn.MaxPool2d(2, 2),
            nn.Conv2d(in_channels, mid_channels, kernel_size=5, stride=1, padding=2),
            nn.ReLU(True),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(mid_channels, out_channels, kernel_size=5, stride=1, padding=2),
            nn.ReLU(True),
            nn.MaxPool2d(2, 2),
        )

    @staticmethod
    def _regressor(in_features, hidden_features):
        """Build the head predicting the 6 affine parameters, initialized to identity."""
        head = nn.Sequential(
            nn.Linear(in_features, hidden_features),
            nn.ReLU(True),
            nn.Linear(hidden_features, 6),
        )
        # Zero weights + identity bias: the initial output is always [[1, 0, 0], [0, 1, 0]]
        with torch.no_grad():
            head[2].weight.zero_()
            head[2].bias.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))
        return head

    @staticmethod
    def _warp(x, localization, regressor, flat_features):
        """Resample `x` with the affine transformation predicted from `x` itself."""
        features = localization(x).view(-1, flat_features)
        theta = regressor(features).view(-1, 2, 3)
        grid = F.affine_grid(theta, x.size(), align_corners=False)
        return F.grid_sample(x, grid, align_corners=False)

    def stn1(self, x):
        """Apply spatial transformer 1 to `x`."""
        return self._warp(x, self.loc1, self.fc_loc1, 250 * 6 * 6)

    def stn2(self, x):
        """Apply spatial transformer 2 to `x`."""
        return self._warp(x, self.loc2, self.fc_loc2, 200 * 2 * 2)

    def stn3(self, x):
        """Apply spatial transformer 3 to `x`."""
        return self._warp(x, self.loc3, self.fc_loc3, 200 * 1 * 1)

    def forward(self, x):
        """Return the 43 class scores (logits) for a batch of 3x48x48 images."""
        stages = (
            (self.stn1, self.conv1),
            (self.stn2, self.conv2),
            (self.stn3, self.conv3),
        )
        for spatial_transformer, conv in stages:
            # Spatial transformer first, then the convolutional stage
            x = spatial_transformer(x)
            x = self.local_norm(self.pool1(F.relu(conv(x))))

        # Flatten the output for the dense layers
        x = x.view(-1, 350 * 6 * 6)
        return self.fc2(F.relu(self.fc1(x)))

## Evaluation function

In [17]:
def test_model(trained_model, test_loader):
    correct = 0
    total = 0
    with torch.no_grad():
        trained_model.eval()
        for data in test_loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = trained_model(images)
            softmax_outputs = F.softmax(outputs, dim=1)
            _, predicted = torch.max(softmax_outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_accuracy = float(correct / total)
    return test_accuracy

## Cross-dataset evaluation of the trained models

In [18]:
# All the stats are stored in ./runs, where they can be read by tensorboard.
# One evaluation is performed per entry of `runs_arguments`
# ('type' in {'P', 'W-FULL'}, 'cnn' in {'CNN', 'CNN_ST'}).
for r_args in tqdm(runs_arguments):
    dataset_type = r_args['type']
    model_architecture = r_args['cnn']

    # Cross-domain evaluation: a model of type 'P' is tested on the weather
    # split, any other type on the plain split.
    # NOTE(review): `load_train_test_dir` measures its normalization statistics
    # on `train_dir`, i.e. on the evaluation domain and not on the data the
    # checkpoint was trained on - confirm that this is intended.
    evaluate_on_weather = dataset_type == 'P'
    train_dir = weather_train_dir if evaluate_on_weather else plain_train_dir
    test_dir = weather_test_dir if evaluate_on_weather else plain_test_dir
    train_dataset, test_dataset = load_train_test_dir(train_dir, test_dir)

    # Compute class weights (inverse class frequency).
    # NOTE(review): not used by the evaluation below; kept in case other
    # cells rely on these variables.
    train_size = len(train_dataset)
    number_of_classes = len(train_dataset.classes)
    train_samples_per_class = np.bincount(train_dataset.targets)
    class_weights = (train_size / (number_of_classes * train_samples_per_class))
    class_weights = torch.tensor(class_weights, dtype=torch.float)

    hyperparams = adam_hyperparams
    model_name = f'{dataset_type}_{model_architecture}_{hyperparams["opt"]}'

    # DataLoader for the evaluation set; accuracy does not depend on the
    # sample order, so no shuffling is needed
    test_loader = DataLoader(
        test_dataset,
        batch_size=hyperparams["batch_size"],
        shuffle=False,
        num_workers=0)

    # Load the fully pickled trained model. map_location lets a checkpoint
    # saved on a GPU be loaded on a CPU-only machine.
    # SECURITY: torch.load unpickles arbitrary objects - only load trusted files.
    model = torch.load(f'./models/trained_model_{model_name}_final.pth', map_location=device)
    model.to(device)

    ta = test_model(trained_model=model, test_loader=test_loader)

    plot_name = 'Training/Weather-full Accuracy' if evaluate_on_weather else 'Training/Plain Accuracy'

    # Close the writer so that the scalar is flushed to disk and the event
    # file handle is released before the next run
    writer = SummaryWriter(f'runs/{model_name}')
    try:
        writer.add_scalar(plot_name, ta)
    finally:
        writer.close()

  return F.conv2d(input, weight, bias, self.stride,
100%|██████████| 4/4 [01:46<00:00, 26.64s/it]
