# Proyecto final (CNN)

Omar David Hernández Aguirre  | A01383543  
Bernardo García Zermeño       | A00570682  
06 de junio de 2023  

In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
from torchvision.transforms import ToTensor
import os
import pandas as pd
import numpy as np
import random
import matplotlib.pyplot as plt
from matplotlib.image import imread
import cv2
from PIL import Image
import torchvision.transforms as T
from  torch.utils import data
from multiprocessing.dummy import Pool as ThreadPool
from torchvision.transforms.functional import resize
from torch.utils.data import Dataset, DataLoader
from fastprogress.fastprogress import master_bar, progress_bar


# Get cpu or gpu device for training
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using {device} device")

# For training purposes took first 5 classes. Taking all 43 takes a lot of time. 
NUM_CLASSES = 43
BATCH_SIZE = 64

Using cpu device


### The Dataset
Get a list of all the paths of the training images, classified by different number of classes

In [17]:
train_ids = {}
for i in range(NUM_CLASSES):
    path = './archive/Train/' + str(i)
    images = os.listdir(path)
    train_ids[i] = []
    # print(images)
    # print(train_ids)

    for img in images:
            img_path = path + '/' + img
            train_ids[i].append(img_path)

To use DataLoader, we need to get info from a torch Dataset. Pytorch allows us to create our own customize dataset.
A customized Dataset is taken for practical purposes from TUSSALO: https://www.kaggle.com/code/tussalo/gtsrb-99-05-test-accuracy-with-efficient-cnns

TUSSALO includes other methods for extra functionalities that we're not going to use, but this customized dataset allows us to include our imgs in a Pytorch Dataset and use it further

In [18]:
class GTSRB_Dataset(Dataset):
    
    def load_image(self,key_path):
        """
        key_path --> Tuple conisting of a key, which is the label of the traffic-sign and the path to the image.
        Loads the image and converts it into a usable torch.tensor. 
        Returns label and tensor.
        """
        image = torch.tensor(cv2.imread(key_path[1]),dtype = torch.float)/255
        image = image.transpose(0,2)
        image = image.transpose(1,2)
        # By default RGB channels are in BGR order, so needs to be flipped
        image = image.flip(0)
        return key_path[0],image
    
    def augment_image(self, key_image):
        """
        key_image --> Tuple of Label and the image-tensor.
        
        Only if a transformation is defined during initialization of the GTSRB_Dataset, an augmentation will be performed.
        Only if a dict with symmetric classes is given, horizontal flipping will be performed.
        Only if a dict with mirrors is given, horizontal flip and label reassignment will be performed.
        
        The augmentation is allways based on the raw image tensor, which is not yet resized! Resizing will be performed on 
        the augmented data in this function. 
        
        Returns a Label (which might have changed due to mirrors) and the augmented image of uniform size, defined during
        initialization of the GTSRB_Dataset (default is 32x32).
        
        Augmentation is not determistic so a different result will be achieved each time. (probably)
        Creating a deterministic augmentation would be great for reproducibility, but it needs to be different in each iteration
        of the augmentation.
        
        """
        label,image = key_image
        if self.transformation:
            if self.symmetric_classes[str(label).zfill(2)]:
                # self.hflipper is defined during init with probability 0.5
                image = self.hflipper(image)
            elif self.mirrored_classes[str(label).zfill(2)]:
                # If a sign of class A can be flipped to represent a sign of class B, the probability of this A-Sign being flipped is 
                # n(B)/(n(A) + n(B)) --> with n(X) meaning the absolute number of occurences of the class X in the Dataset
                # Thereby the number of unflipped signs in class A + flipped signs from class B and relabeld as A is roughly n(A).
                if np.random.rand() > (self.priors[str(label).zfill(2)]/(self.priors[str(label).zfill(2)] + self.priors[self.mirrored_classes[str(label).zfill(2)]])):
                    image = T.functional.hflip(image)
                    label = int(self.mirrored_classes[str(label).zfill(2)])
            width = image.shape[1]
            height = image.shape[2]
            # transformation is defined outside of the dataset to allow simple adjustments, see below
            image = self.transformation(image)
            # RandomCrop to 95% to induce translational movement and compensated the zoom-out induced by the random perspective
            image = T.RandomCrop((int(width*.95), int(height*.95)))(image)
        return(label, resize(image, self.im_size))

    
    def __init__(self, data_ids,  mirrored_classes, symmetric_classes, augmentation_size, transformation, im_size = [32, 32], device = "cpu"):
        """
        data_ids --> dict of labels and corresponding image paths used for the dataset.
        mirrored_classes --> dict of labels and one corresponding label or None, determines for each class which other one it represents after flipping horizontally
        symmetric_classes --> dict of labels and boolean value if they can be flipped horizontally
        augmentation_size --> int that determines number of augmented versions of each image to be added to the raw data
        transformation --> torchvision.transformation which contains a list of imagetransformations to be used during augmentation
        im_size --> Tuple with number of Pixels in x and y direction each image will be resized to, default : (32,32)
        device --> torch.device, cuda will be used by default 
        
        First the number of occurences of eacht class (priors) is counted. --> will be used with mirrors to calculate probability
        A pool of 8 threads is used to speed up the image loading process.
        A list of all tuples (label, image_tensor) is created: raw_data
        Afterwards there will be (augmentation_size)-iterations over the raw_data, augmenting eacht image individually (see function augment image)
        raw_data and augmented_data are stacked together to data tensor with all the images and label tensor containing the label for each index.
        
        These two tensors are used as the mapstyle-dataset.        
        The whole dataset is also moved to the specified device, e.g. gpu, in order to avoid memory bottlenecks during training.
    
        
        """
        
        self.priors = {str(key).zfill(2) : len(paths) for key,paths in data_ids.items()}
        self.image_paths = [(key,path) for key,paths in data_ids.items() for path in paths]
        self.mirrored_classes = mirrored_classes
        self.symmetric_classes = symmetric_classes
        self.im_size = im_size
        self.transformation = transformation
        pool = ThreadPool(8)
        self.raw_data = pool.map(self.load_image, self.image_paths)

        self.augmented_data = []

        if augmentation_size > 0:
            self.hflipper =  T.RandomHorizontalFlip(p=0.5)
            for i in range(augmentation_size):
                # I could not find a way to create a deterministic augmentation while using the pool of threads. It worked without the multithreading but just took to long.
                self.augmented_data += pool.map(self.augment_image, self.raw_data)

        
        self.data = torch.stack([resize(image, im_size) for _,image in self.raw_data] + [image for _,image in self.augmented_data]).to(torch.device(device))
        self.labels = torch.tensor([int(label) for label,_ in self.raw_data] + [int(label) for label,_ in self.augmented_data]).to(torch.uint8).to(torch.device(device))
        del self.raw_data, self.augmented_data

    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]
    


Now we can divide out training and test data, and create the DataLoaders for each to work with our CNN

In [19]:
training_dataset = GTSRB_Dataset(train_ids,None,None,0,None)
train_data_size = 0.8 * len(training_dataset)
test_data_size =  len(training_dataset) - int(train_data_size)
# print(test_data_size)
train_data, test_data = data.random_split(training_dataset, [int(train_data_size), int(test_data_size)])
trainloader = DataLoader(train_data, batch_size=64)
testloader = DataLoader(test_data, batch_size=64)



7842


### Modeling
2 feature learning sections
1 classification section

In [20]:
class CNN(nn.Module):
    """
    Description: 
    """
    def __init__(self, n_classes):

        super(CNN, self).__init__()
        self.model = nn.Sequential(
          nn.Conv2d(in_channels= 3, out_channels=16, kernel_size=3, stride=1,bias = True),
          nn.ReLU(),
          nn.Conv2d(in_channels= 16, out_channels=32, kernel_size=3, stride=1,bias = True),
          nn.ReLU(),
          nn.MaxPool2d(kernel_size = 2),
          nn.BatchNorm2d(32),

          nn.Conv2d(in_channels= 32, out_channels=64, kernel_size=3, stride=1,bias = True),
          nn.ReLU(),
          nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1,bias = True),
          nn.ReLU(),
          nn.MaxPool2d(kernel_size = 2),
          nn.BatchNorm2d(128),

          nn.Flatten(),
          nn.Linear(in_features=128*25, out_features=512, bias = True),
          nn.ReLU(),
          nn.BatchNorm1d(512),
          nn.Dropout(0.5),
          nn.Linear(in_features = 512, out_features = n_classes, bias = True)        
        )
    
    def forward(self, x):
        return self.model(x)

Función de training 

In [21]:
def train(model, trainloader, lossfn, optimizer, print_outputs = True, epochs = 5):
    """
    Description:   
    
    """

    train_losses = []
    train_accuracy = []    
        
    # Counters to keep accuracy tracking
    correct_train = 0
    loss_train = 0
    model.train()

    for batch, (X, y) in enumerate(trainloader):
        X = X.to(device)
        y = y.to(device)
        # print(X.size())

        # Compute prediction error
        pred=model(X)
        loss = lossfn(pred, y)
        loss_train += loss.item()

        #Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Keeping track of the num of correct predictions
        _,pred_idxs = torch.topk(pred, 1)
        correct_train += torch.eq(y, pred_idxs.squeeze()).sum().item()

    acc_train = correct_train/len(trainloader)
    loss_train = loss_train/len(trainloader)
    
    train_losses.append(loss_train)
    train_accuracy.append(acc_train)


    return acc_train, loss_train


Función de testing

In [22]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    # print(size)
    num_batches = len(dataloader)   # Obtenemos el num batches para el analisis porcentual del error
    model.eval()
    test_loss, correct = 0, 0       # Variables para el analisis del error
    with torch.no_grad():           # A pesar de poner el model.eval(), este es un step mas para que el modelo no aprenda de lo que va a hacer
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            
            # Keeping track of the num of correct predictions
            _,pred_idxs = torch.topk(pred, 1)
            correct += torch.eq(y, pred_idxs.squeeze()).sum().item()
            # print(correct)

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

Aplicamos el modelo 

In [25]:
model = CNN(NUM_CLASSES)
model.to(device)

# # Create a loss function
loss_fn = nn.CrossEntropyLoss()
# # Create the optimizer
optimizer = torch.optim.SGD(params=model.parameters(),  # parameters of target model to optimize
                            lr=0.001)
epochs = 15
for t in range(epochs):
    acc_train, loss_train = train(model, trainloader, loss_fn, optimizer)
    print(f"Training - Epoch: {t}  Accuracy: {acc_train} Avg Loss: {loss_train} ")


test(testloader, model, loss_fn)



Training - Epoch: 0  Accuracy: 28.036659877800407 Avg Loss: 2.320025452528369 
Training - Epoch: 1  Accuracy: 48.08757637474542 Avg Loss: 1.1166371111957216 
Training - Epoch: 2  Accuracy: 55.30142566191446 Avg Loss: 0.6930267330099754 
Training - Epoch: 3  Accuracy: 58.70264765784114 Avg Loss: 0.47820855109609073 
Training - Epoch: 4  Accuracy: 60.27902240325866 Avg Loss: 0.36155581313456636 
Training - Epoch: 5  Accuracy: 61.329938900203665 Avg Loss: 0.2833914452985445 
Training - Epoch: 6  Accuracy: 61.93482688391039 Avg Loss: 0.2302160084763515 
Training - Epoch: 7  Accuracy: 62.372708757637476 Avg Loss: 0.19296907251909406 
Training - Epoch: 8  Accuracy: 62.70672097759674 Avg Loss: 0.16525382219288354 
Training - Epoch: 9  Accuracy: 62.91649694501018 Avg Loss: 0.14263191367755595 
Training - Epoch: 10  Accuracy: 63.057026476578415 Avg Loss: 0.12556423173265155 
Training - Epoch: 11  Accuracy: 63.193482688391036 Avg Loss: 0.11166406928916327 
Training - Epoch: 12  Accuracy: 63.2301

Main steps:
1. X Data preparation: make images into tensors with DataLoader 
2. X Make train set and test set
3. X Make the model
4. X Train the model
5. X Test the model
6. Evaluate


In [24]:
classes = { 1:'Speed limit (20km/h)',
            2:'Speed limit (30km/h)',
            3:'Speed limit (50km/h)',
            4:'Speed limit (60km/h)',
            5:'Speed limit (70km/h)',
            6:'Speed limit (80km/h)',
            7:'End of speed limit (80km/h)',
            8:'Speed limit (100km/h)',
            9:'Speed limit (120km/h)',
            10:'No passing',
            11:'No passing veh over 3.5 tons',
            12:'Right-of-way at intersection',
            13:'Priority road',
            14:'Yield',
            15:'Stop',
            16:'No vehicles',
            17:'Veh > 3.5 tons prohibited',
            18:'No entry',
            19:'General caution',
            20:'Dangerous curve left',
            21:'Dangerous curve right',
            22:'Double curve',
            23:'Bumpy road',
            24:'Slippery road',
            25:'Road narrows on the right',
            26:'Road work',
            27:'Traffic signals',
            28:'Pedestrians',
            29:'Children crossing',
            30:'Bicycles crossing',
            31:'Beware of ice/snow',
            32:'Wild animals crossing',
            33:'End speed + passing limits',
            34:'Turn right ahead',
            35:'Turn left ahead',
            36:'Ahead only',
            37:'Go straight or right',
            38:'Go straight or left',
            39:'Keep right',
            40:'Keep left',
            41:'Roundabout mandatory',
            42:'End of no passing',
            43:'End no passing veh > 3.5 tons' }