# Traffic Sign Recognition Classifier

### If using Google Colab, mount the drive

In [None]:
from google.colab import drive

drive.mount('/content/drive')

## 1. Load the data

The classifier is trained and tested on the GTSRB dataset (German Traffic Sign Recognition Benchmark), provided by the Institut für Neuroinformatik. The full dataset is available [here](http://benchmark.ini.rub.de/?section=gtsrb&subsection=dataset).

This notebook, however, uses only the pickled version of the dataset, with images resized to 32x32. It needs to be downloaded and extracted into the current workspace directory. The pickled dataset can be downloaded [here](https://s3-us-west-1.amazonaws.com/udacity-selfdrivingcar/traffic-signs-data.zip).

In [None]:
import os
import pickle

def load_traffic_sign_data(training_file, testing_file):
    with open(training_file, mode='rb') as f:
        train = pickle.load(f)
    with open(testing_file, mode='rb') as f:
        test = pickle.load(f)
    return train, test

# Load pickled data
train, test = load_traffic_sign_data('/content/drive/My Drive/Colab Notebooks/traffic-signs-data/train.p', '/content/drive/My Drive/Colab Notebooks/traffic-signs-data/test.p')
    
X_train, y_train = train['features'], train['labels']
X_test, y_test = test['features'], test['labels']

y_train = y_train.astype('int64')
y_test = y_test.astype('int64')

---

## Step 1: Dataset Summary & Exploration

The pickled data is a dictionary with 4 key/value pairs:

- `'features'` is a 4D array containing raw pixel data of the traffic sign images, (num examples, width, height, channels).
- `'labels'` is a 1D array containing the label/class id of the traffic sign. The file `signnames.csv` contains id -> name mappings for each id.
- `'sizes'` is a list containing tuples, (width, height) representing the original width and height of the image.
- `'coords'` is a list containing tuples, (x1, y1, x2, y2) representing coordinates of a bounding box around the sign in the image. **THESE COORDINATES ASSUME THE ORIGINAL IMAGE. THE PICKLED DATA CONTAINS RESIZED VERSIONS (32 by 32) OF THESE IMAGES**
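
As a rough sketch of how the id -> name mapping could be used, the snippet below parses CSV text into a dictionary. The column names `ClassId` and `SignName` and the three sample rows are assumptions about the layout of `signnames.csv`, and the helper `load_sign_names` is hypothetical (it is not part of this notebook).

```python
import csv
import io

def load_sign_names(csv_file):
    """Build a {class_id: sign_name} dict from an open CSV file object."""
    reader = csv.DictReader(csv_file)
    # Column names are assumed; adjust them to match the actual file header.
    return {int(row['ClassId']): row['SignName'] for row in reader}

# Illustrative stand-in for the real file (contents assumed for the example)
sample_csv = io.StringIO(
    "ClassId,SignName\n"
    "0,Speed limit (20km/h)\n"
    "1,Speed limit (30km/h)\n"
    "14,Stop\n"
)
sign_names = load_sign_names(sample_csv)
print(sign_names[14])
```

With the real file, passing an open file handle (for example `open('signnames.csv', newline='')`) instead of the `StringIO` object should work, provided the header matches the assumed column names.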

In [None]:
import numpy as np

# Number of examples
n_train, n_test = X_train.shape[0], X_test.shape[0]

# What's the shape of a traffic sign image?
image_shape = X_train[0].shape

# How many classes?
n_classes = np.unique(y_train).shape[0]

print("Number of training examples =", n_train)
print("Number of testing examples  =", n_test)
print("Image data shape  =", image_shape)
print("Number of classes =", n_classes)

Visualize the German Traffic Signs Dataset using the pickled file(s).

- First we can visualize some images sampled from training set:

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

# show a random sample from each class of the traffic sign dataset
rows, cols = 4, 12
fig, ax_array = plt.subplots(rows, cols)
plt.suptitle('RANDOM SAMPLES FROM TRAINING SET (one for each class)')
for class_idx, ax in enumerate(ax_array.ravel()):
    if class_idx < n_classes:
        # show a random image of the current class
        cur_X = X_train[y_train == class_idx]
        cur_img = cur_X[np.random.randint(len(cur_X))]
        ax.imshow(cur_img)
        ax.set_title('{:02d}'.format(class_idx))
    else:
        ax.axis('off')
# hide both x and y ticks
plt.setp([a.get_xticklabels() for a in ax_array.ravel()], visible=False)
plt.setp([a.get_yticklabels() for a in ax_array.ravel()], visible=False)
plt.draw()

- We can also see how the classes are distributed in both the training and testing sets:

In [None]:
# bar-chart of classes distribution
train_distribution, test_distribution = np.zeros(n_classes), np.zeros(n_classes)
for c in range(n_classes):
    train_distribution[c] = np.sum(y_train == c) / n_train
    test_distribution[c] = np.sum(y_test == c) / n_test
fig, ax = plt.subplots()
col_width = 0.5
bar_train = ax.bar(np.arange(n_classes), train_distribution, width=col_width, color='r')
bar_test = ax.bar(np.arange(n_classes)+col_width, test_distribution, width=col_width, color='b')
ax.set_ylabel('FRACTION OF EXAMPLES')
ax.set_xlabel('CLASS LABEL')
ax.set_title('Classes distribution in traffic-sign dataset')
ax.set_xticks(np.arange(0, n_classes, 5)+col_width)
ax.set_xticklabels(['{:02d}'.format(c) for c in range(0, n_classes, 5)])
ax.legend((bar_train[0], bar_test[0]), ('train set', 'test set'))
plt.show()
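
The imbalance visible in a chart like this can also be quantified numerically. The following is a minimal sketch on synthetic labels (not the GTSRB data); the inverse-frequency weights are one common option for a weighted loss and are an assumption here, since this notebook does not use class weights.

```python
import numpy as np

# Synthetic labels standing in for y_train (3 classes, deliberately imbalanced)
labels = np.array([0, 0, 0, 0, 0, 0, 1, 1, 1, 2])
num_classes = 3

# Number of examples per class; minlength keeps empty classes in the result
counts = np.bincount(labels, minlength=num_classes)

# Ratio between the most and the least frequent class
imbalance_ratio = counts.max() / counts.min()

# Inverse-frequency weights, scaled so the average weight over all samples is 1
class_weights = counts.sum() / (num_classes * counts)

print(counts, imbalance_ratio, class_weights)
```

Note that both divisions assume every class has at least one example; a class with zero examples would need special handling.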

----

## Design a Model Architecture

Design and implement a deep learning model that learns to recognize traffic signs.

There are various aspects to consider when thinking about this problem:

- Neural network architecture
- Preprocessing techniques (normalization, RGB to grayscale, etc.)
- Number of examples per label (some have more than others).
- Generate fake data.


**Feature preprocessing**

Following this paper [[Sermanet, LeCun]](http://yann.lecun.com/exdb/publis/pdf/sermanet-ijcnn-11.pdf) I employed three main steps of feature preprocessing:

1) *Each image is converted from RGB to YUV color space, and only the Y channel is kept.* This choice may seem surprising at first, but the cited paper reports that it leads to the best-performing model. It is slightly counter-intuitive, yet arguably we can distinguish all the traffic signs just by looking at the grayscale image.

2) *The contrast of each image is adjusted by means of histogram equalization*. This mitigates the numerous situations in which the image contrast is very poor.

3) *The features are centered on zero mean and divided by their standard deviation*. In the code below, both statistics are computed per pixel position across the dataset (`axis=0`). This feature scaling is known to benefit the gradient descent performed by the optimizer.

In [None]:
import cv2 

def preprocess_features(X, equalize_hist=True):

    # convert from RGB to YUV
    X = np.array([np.expand_dims(cv2.cvtColor(rgb_img, cv2.COLOR_RGB2YUV)[:, :, 0], 2) for rgb_img in X])

    # adjust image contrast
    if equalize_hist:
        X = np.array([np.expand_dims(cv2.equalizeHist(np.uint8(img)), 2) for img in X])

    X = np.float32(X)

    # standardize features
    X -= np.mean(X, axis=0)
    X /= (np.std(X, axis=0) + np.finfo('float32').eps)

    return X

X_train_norm = preprocess_features(X_train)
X_test_norm = preprocess_features(X_test)
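
To make steps 1 and 3 more concrete, here is a NumPy-only sketch on a synthetic batch. It assumes the Y channel is the usual BT.601 weighted sum of R, G and B, skips histogram equalization, and is meant as an illustration rather than a replacement for `preprocess_features`.

```python
import numpy as np

rng = np.random.default_rng(0)
# Synthetic batch standing in for X_train: 8 RGB images of 32x32 pixels
X = rng.integers(0, 256, size=(8, 32, 32, 3)).astype(np.float32)

# Luma (Y) channel as a weighted sum of R, G, B (BT.601 weights)
luma_weights = np.array([0.299, 0.587, 0.114], dtype=np.float32)
Y = X @ luma_weights          # shape (8, 32, 32)
Y = Y[..., np.newaxis]        # shape (8, 32, 32, 1)

# Standardize each pixel position across the batch (axis=0)
eps = np.finfo('float32').eps
Y_std = (Y - Y.mean(axis=0)) / (Y.std(axis=0) + eps)

print(Y_std.shape, float(Y_std.mean()))
```

Because the statistics are taken along `axis=0`, every pixel position ends up with zero mean and unit variance over the batch, while individual images are not standardized on their own.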

### Dataset Augmentation

Use PyTorch `transforms` to augment the dataset by applying random affine transformations to the images. `ToTensor()` is included to convert the images to PyTorch tensors.

In [None]:
from sklearn.model_selection import train_test_split
import torch
from torchvision import transforms

# split into train and validation
VAL_RATIO = 0.2
X_train_norm, X_val_norm, y_train, y_val = train_test_split(X_train_norm, y_train, test_size=VAL_RATIO, random_state=0)

image_transform = transforms.Compose([transforms.ToPILImage(),                                   
                                     transforms.RandomAffine(degrees=15.,
                                                            translate=(0.1,0.1),
                                                            scale=(0.8,1.2),
                                                            ),
                                     transforms.ToTensor(),
                                     ])


In [None]:
standard_transform = transforms.Compose([transforms.ToPILImage(),                                   
                                       transforms.ToTensor(),
                                      ])

### Visualize random transformations on an image

In [None]:
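# take a fixed sample image from the original (RGB, unsplit) training set
img_idx = 100
img_rgb = X_train[img_idx]

# plot the original image
plt.figure(figsize=(1,1))
plt.imshow(img_rgb)
# y_train was reshuffled by train_test_split, so read the label from the original data
plt.title('Example of RGB image (class = {})'.format(train['labels'][img_idx]))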
plt.show()

# plot some randomly augmented images
rows, cols = 4, 10
fig, ax_array = plt.subplots(rows, cols)
for ax in ax_array.ravel():
    augmented_img = image_transform(img_rgb)
    ax.imshow(augmented_img.numpy().transpose((1,2,0)))
plt.setp([a.get_xticklabels() for a in ax_array.ravel()], visible=False)
plt.setp([a.get_yticklabels() for a in ax_array.ravel()], visible=False)
plt.suptitle('Random examples of data augmentation (starting from the previous image)')
plt.show()

### Create a pytorch dataset using torch.utils.data.TensorDataset

In [None]:
def create_tensor_dataset(X_array, y_array, transform):
    X_torch = torch.stack([transform(img) for img in X_array])
    y_torch = torch.from_numpy(y_array)
    dataset = torch.utils.data.TensorDataset(X_torch, y_torch)
    return(dataset)

In [None]:
# Not using image_transform for training because the PyTorch transforms create black backgrounds for rotations and shifts, whereas TF's transforms seem to adjust the images without black backgrounds
# @ToDo Solve the black background issue of the PyTorch transforms and test again.

train_set = create_tensor_dataset(X_train_norm, y_train, standard_transform)
val_set = create_tensor_dataset(X_val_norm, y_val, standard_transform)
test_set = create_tensor_dataset(X_test_norm, y_test, standard_transform)
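
Regarding the black borders mentioned in the comment above, one possible workaround is to replicate edge pixels instead of filling with zeros. The sketch below does this in plain NumPy for translations only (not rotation or scaling); it is a swapped-in technique rather than a torchvision feature, and `shift_with_edge_padding` is a hypothetical helper.

```python
import numpy as np

def shift_with_edge_padding(img, dx, dy):
    """Shift an (H, W) or (H, W, C) image by (dx, dy) pixels, replicating edge pixels."""
    h, w = img.shape[:2]
    pad = max(abs(dx), abs(dy))
    # Pad only the two spatial axes; leave any channel axis untouched
    pad_width = [(pad, pad), (pad, pad)] + [(0, 0)] * (img.ndim - 2)
    padded = np.pad(img, pad_width, mode='edge')
    # Crop a window offset opposite to the shift, so content moves by (dx, dy)
    top, left = pad - dy, pad - dx
    return padded[top:top + h, left:left + w]

img = np.arange(1, 17).reshape(4, 4)
shifted = shift_with_edge_padding(img, dx=1, dy=0)
print(shifted)
```

Positive `dx` moves the content to the right and positive `dy` moves it down; the vacated pixels take the value of the nearest edge pixel rather than zero.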

### Define a dataloader to create train, validation and test pytorch dataloaders

In [None]:
def get_data_loader(dataset, batch_size):
    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, num_workers=2)
    return(loader)

### Define the convolutional network 
As mentioned in the paper, the network has two convolutional layers with ReLU activations, each followed by a max-pooling layer. The convolutions are padded to preserve the input size, and dropout is applied after each pooling layer and after the first fully-connected layer.
Finally, two fully-connected (FC) layers are added. The outputs of both convolutional stages are flattened and concatenated before being fed to the FC layers (as explained in the paper).

In [None]:
import torch.nn.functional as F
class my_net(torch.nn.Module):
    def __init__(self, n_classes, dropout_prob=0.2):
        super(my_net,self).__init__()
        
        self.conv1 = torch.nn.Conv2d(1,64,kernel_size=3,stride=1,padding=1)
        self.conv2 = torch.nn.Conv2d(64,128,kernel_size=3,stride=1,padding=1)
        
        self.pool = torch.nn.MaxPool2d(kernel_size=2,stride=2,padding=0)
        
        self.drop = torch.nn.Dropout(p=dropout_prob)
        
        self.fc1 = torch.nn.Linear(16*16*64 + 8*8*128,64)
        self.fc2 = torch.nn.Linear(64,n_classes)
    
    def forward(self, x):
        
        # computes the activation of the first convolution
        # input images are (1,32,32). height and width are preserved. Output size = (64,32,32)
        x = F.relu(self.conv1(x))

        # size changes from (64,32,32) to (64,16,16)
        x = self.pool(x)
        x = self.drop(x)
                
        # flatten and save output to feed the low level features of this 1st conv layer to the dense FC layers
        # size of x_conv1 = (1,16*16*64)
        x_conv1 = x.view(-1,16*16*64)
        
        # computes the activation of the second convolution
        # size changes from (64,16,16) to (128,16,16) 
        x = F.relu(self.conv2(x))
        
        # size changes from (128,16,16) to (128,8,8)
        x = self.pool(x)
        x = self.drop(x)
        
        # Flatten the input to feed the FC layers
        # Size changes from (128,8,8) to (1,8*8*128)
        x = x.view(-1,8*8*128)
                
        # Concatenate with the flattened conv1 layer output
        # size changes from (1,8*8*128) to (1, 16*16*64 + 8*8*128)
        x = torch.cat((x_conv1, x), dim=1)
        
        # FC1. Size changes from (1,16*16*64 + 8*8*128) to (1,64)
        x = F.relu(self.fc1(x))
        
        x = self.drop(x)
        
        # FC2. Size changes from (1,64) to (1,n_classes)
        x = self.fc2(x)
        
        return(x)
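
The input size of `fc1` follows from the layer arithmetic. As a quick check, the sketch below applies the standard output-size formula for convolution and pooling layers to a 32x32 input; `conv_output_size` is a hypothetical helper introduced only for this illustration.

```python
def conv_output_size(size, kernel_size, stride=1, padding=0):
    """Spatial output size of a convolution or pooling layer (floor division)."""
    return (size + 2 * padding - kernel_size) // stride + 1

size = 32
size = conv_output_size(size, kernel_size=3, stride=1, padding=1)  # conv1 keeps 32
size = conv_output_size(size, kernel_size=2, stride=2)             # pool halves to 16
branch1 = 64 * size * size      # flattened first-stage features

size = conv_output_size(size, kernel_size=3, stride=1, padding=1)  # conv2 keeps 16
size = conv_output_size(size, kernel_size=2, stride=2)             # pool halves to 8
branch2 = 128 * size * size     # flattened second-stage features

fc1_in = branch1 + branch2
print(branch1, branch2, fc1_in)  # 16384 8192 24576
```

The total matches the `16*16*64 + 8*8*128` expression used for `fc1`.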

###  Define the loss and optimizer

In [None]:
import torch.optim as optim

def createLossAndOptimizer(net, learning_rate=0.001):
    loss = torch.nn.CrossEntropyLoss()
    
    optimizer = optim.Adam(net.parameters(), lr=learning_rate)
    
    return (loss,optimizer)
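
`torch.nn.CrossEntropyLoss` expects raw, unnormalized scores (logits) and integer class indices, which is why `forward` returns the `fc2` output without a softmax and why the labels were cast to `int64`. The NumPy sketch below re-derives the same quantity for illustration; it is not PyTorch's implementation, and the function name is hypothetical.

```python
import numpy as np

def cross_entropy_from_logits(logits, labels):
    """Mean negative log-likelihood of integer labels given raw logits."""
    # Subtract the row-wise max for numerical stability before exponentiating
    shifted = logits - logits.max(axis=1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    # Pick the log-probability of the true class for each example
    return -log_probs[np.arange(len(labels)), labels].mean()

num_classes = 43                         # GTSRB has 43 classes
logits = np.zeros((4, num_classes))      # uniform scores over all classes
labels = np.array([0, 7, 21, 42])        # integer class ids
loss_value = cross_entropy_from_logits(logits, labels)
print(loss_value, np.log(num_classes))
```

With uniform scores the loss equals `log(num_classes)`, which is roughly the value to expect from an untrained network at the start of training.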

If a GPU is available (for example, Google Colab's GPU runtime), set this flag to `True` to use it.

In [None]:
use_cuda = False
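
As an alternative to checking the flag before every transfer, the choice can be folded into a single `torch.device` object. This is only a sketch of that idiom; the notebook itself keeps the explicit `use_cuda` checks, and the tensor `x` is a dummy input used for illustration.

```python
import torch

use_cuda = False  # same flag as in the cell above

# Fall back to the CPU when CUDA is not requested or not available
device = torch.device('cuda' if use_cuda and torch.cuda.is_available() else 'cpu')

# Dummy batch of two 1x32x32 images moved to the selected device
x = torch.zeros(2, 1, 32, 32).to(device)
print(device, x.device)
```

The same `device` object could then be passed to `net.to(device)` and to the input and label tensors.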

### Define the accuracy function
Use `net.eval()` to put the model in evaluation mode, which turns off dropout.

In [None]:
from torch.autograd import Variable
def calculate_accuracy(net, data_loader):
    correct = 0
    total = 0
    
    # Put the model in eval mode so that dropout layers are ineffective
    net.eval()
    
    with torch.no_grad():
        for data in data_loader:
            inputs, labels = data
            inputs, labels = Variable(inputs), Variable(labels)
            # If GPU available
            if(use_cuda & torch.cuda.is_available()):
              inputs = inputs.to('cuda')
              labels = labels.to('cuda')
            outputs = net(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    return(100 * correct/total)
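
In the function above, `torch.max(outputs.data, 1)` returns both the maximum scores and their indices along the class dimension; the indices are the predicted classes. The same computation on a small, made-up batch in NumPy looks like this:

```python
import numpy as np

# Made-up scores for 4 examples and 3 classes
outputs = np.array([[0.1, 2.0, -1.0],
                    [1.5, 0.2, 0.3],
                    [0.0, 0.1, 3.0],
                    [0.9, 0.8, 0.7]])
labels = np.array([1, 0, 2, 1])

# Index of the highest score per row is the predicted class
predicted = outputs.argmax(axis=1)
correct = int((predicted == labels).sum())
accuracy = 100 * correct / len(labels)
print(predicted, accuracy)  # [1 0 2 0] 75.0
```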

### Store the accuracies and losses during training

In [None]:
train_accuracies, val_accuracies = [], []
train_losses, val_losses = [], []

### Define the training function

In [None]:
import time

def trainNet(net, batch_size, n_epochs, learning_rate):
    print("=========== HYPERPARAMETERS ===========")
    print("batch size = ", batch_size)
    print("epochs = ",n_epochs)
    print("learning rate = ",learning_rate)
    print("="*30)
    
    # Get training and validation data
    train_loader = get_data_loader(train_set, batch_size)
    val_loader = get_data_loader(val_set, batch_size)
    n_batches = len(train_loader)
    
    # Create loss and optimizer functions
    loss, optimizer = createLossAndOptimizer(net, learning_rate)
    
    # Time for printing
    training_start_time = time.time()
    
    # Loop for n_epochs
    for epoch in range(n_epochs):
        
        # Put the model in train mode so that dropout layers are effective
        net.train()
        
        running_loss = 0.0
        running_losses = []
        print_every = n_batches//10
        start_time = time.time()
        total_train_loss = 0
        
        for i, data in enumerate(train_loader, 0):
            
            # Get inputs
            inputs, labels = data
            
            # Wrap them in a variable object
            inputs, labels = Variable(inputs), Variable(labels)
            
            # If GPU available
            if(use_cuda & torch.cuda.is_available()):
              inputs = inputs.to('cuda')
              labels = labels.to('cuda')
            
            # Set the parameters gradients to zero
            optimizer.zero_grad()
            
            # Forward Pass and loss
            outputs = net(inputs)
            loss_size = loss(outputs, labels)
            
            # Backprop
            loss_size.backward()
            
            # Optimize
            optimizer.step()
            
            # Get running loss
            running_loss += loss_size.item()
            total_train_loss += loss_size.item()
            
            # running_loss spans (print_every + 1) batches, so average over that count
            if (i+1)%(print_every+1) == 0:
                window = print_every + 1
                print("Epoch: {}, {:d}% \t train_loss: {:.2f} took: {:.2f}s".format(
                    epoch+1, int(100*(i+1)/n_batches), running_loss/window, time.time() - start_time))
                running_losses.append(running_loss/window)
                running_loss = 0.0
                start_time = time.time()
        
        # Append running training losses to list
        train_losses.append(running_losses)
        
        # After each epoch, calculate loss on the val set
        total_val_loss = 0
        
        # Put the model in eval mode so that dropout layers are ineffective
        net.eval()
        
        # No gradients are needed for validation
        with torch.no_grad():
            for inputs, labels in val_loader:

                # Wrap variables in Variable objects
                inputs, labels = Variable(inputs), Variable(labels)

                # If GPU available
                if(use_cuda & torch.cuda.is_available()):
                    inputs = inputs.to('cuda')
                    labels = labels.to('cuda')

                # Forward Pass
                val_outputs = net(inputs)
                val_loss_size = loss(val_outputs, labels)
                total_val_loss += val_loss_size.item()
            
        print("Validation loss = {:.2f}".format(total_val_loss/len(val_loader)))
        
        # Append the val loss to list
        val_losses.append(total_val_loss/(len(val_loader)))
        
        # Calculate accuracy after each epoch
        train_accuracy = calculate_accuracy(net, train_loader)
        val_accuracy = calculate_accuracy(net, val_loader)
        print('Train Accuracy = {:.3f} - Validation Accuracy: {:.3f}'.format(train_accuracy, val_accuracy))
        
        # Append the accuracies to list
        train_accuracies.append(train_accuracy)
        val_accuracies.append(val_accuracy)
        
        # Save the epoch
        #torch.save(net.state_dict(), os.path.join(os.getcwd()+'/models/', 'epoch-{}.pth'.format(epoch)))
    
    print("Training finished. Took {:.2f}s".format(time.time() - training_start_time))
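
The periodic loss reporting inside the training loop can be looked at in isolation. The sketch below, with a hypothetical helper `windowed_means` and made-up loss values, accumulates per-batch losses and emits one mean per complete window.

```python
def windowed_means(batch_losses, window):
    """Mean loss for each complete window of `window` consecutive batches."""
    means, running = [], 0.0
    for i, loss_value in enumerate(batch_losses):
        running += loss_value
        if (i + 1) % window == 0:
            # Divide by the number of batches that were summed
            means.append(running / window)
            running = 0.0
    return means

losses = [4.0, 2.0, 3.0, 1.0, 5.0]
print(windowed_means(losses, window=2))  # [3.0, 2.0]
```

Batches left over after the last complete window (the final `5.0` here) are not reported, which mirrors how a modulo-based check behaves at the end of an epoch.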

### Create the model object

In [None]:
net = my_net(n_classes, dropout_prob=0.2)
if(use_cuda & torch.cuda.is_available()):
  print("GPU Available")
  net.to('cuda')

### Start the training

In [None]:
trainNet(net, batch_size=32, n_epochs=30, learning_rate=0.001)

## Test the model

In [None]:
test_loader = get_data_loader(test_set, batch_size=128)
test_accuracy = calculate_accuracy(net, test_loader)
print("Accuracy on the test dataset = {:.3f}".format(test_accuracy))