# Self-Driving Car Engineer Nanodegree

## Deep Learning Traffic Sign Classifier


In [None]:
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import cv2
import random
import seaborn as sns
from sklearn.utils import shuffle

%matplotlib inline

import tensorflow as tf

## Step 0: Load The Data

In [None]:
training_file = 'data/train.p'
validation_file = 'data/valid.p'
testing_file = 'data/test.p'

with open(training_file, mode='rb') as f:
    train = pickle.load(f)
with open(validation_file, mode='rb') as f:
    valid = pickle.load(f)
with open(testing_file, mode='rb') as f:
    test = pickle.load(f)
    
x_train, y_train = train['features'], train['labels']
x_valid, y_valid = valid['features'], valid['labels']
x_test, y_test = test['features'], test['labels']

---

## Step 1: Dataset Summary & Exploration

The pickled data is a dictionary with 4 key/value pairs:

- `'features'` is a 4D array containing raw pixel data of the traffic sign images, (num examples, width, height, channels).
- `'labels'` is a 1D array containing the label/class id of the traffic sign. The file `signnames.csv` contains id -> name mappings for each id.
- `'sizes'` is a list containing tuples, (width, height) representing the original width and height of the image.
- `'coords'` is a list containing tuples, (x1, y1, x2, y2) representing coordinates of a bounding box around the sign in the image. **THESE COORDINATES ASSUME THE ORIGINAL IMAGE. THE PICKLED DATA CONTAINS RESIZED VERSIONS (32 by 32) OF THESE IMAGES**
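
As a quick sanity check of that layout, the sketch below builds a small synthetic dictionary with the same four keys and derives the summary values from it. The array contents, the five examples and the size and coordinate tuples are made up for illustration; only the key names and the 32x32x3 image size come from the description above.

```python
import numpy as np

# Synthetic stand-in for one pickled split (values are made up, not real data)
n_examples = 5
split = {
    'features': np.zeros((n_examples, 32, 32, 3), dtype=np.uint8),
    'labels': np.array([0, 1, 1, 2, 0], dtype=np.uint8),
    'sizes': [(60, 58)] * n_examples,
    'coords': [(5, 5, 55, 53)] * n_examples,
}

features, labels = split['features'], split['labels']
summary = {
    'n_examples': features.shape[0],        # first axis counts the examples
    'image_shape': features.shape[1:],      # remaining axes describe one image
    'n_classes': len(np.unique(labels)),    # distinct class ids
}
print(summary)
```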

Complete the basic data summary below. Use python, numpy and/or pandas methods to calculate the data summary rather than hard coding the results. For example, the [pandas shape method](http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shape.html) might be useful for calculating some of the summary results. 

### Provide a Basic Summary of the Data Set Using Python, Numpy and/or Pandas

In [None]:
n_train = x_train.shape[0]
n_validation = x_valid.shape[0]
n_test = x_test.shape[0]
image_shape = x_train[0].shape
n_classes = len(set(y_train))

print("Number of training examples =", n_train)
print("Number of testing examples =", n_test)
print("Number of validation examples =", n_validation)
print("Image data shape =", image_shape)
print("Number of classes =", n_classes)

There are around 34000 images in our training data that display 43 separate classes of traffic signs. Each input is a 32x32 image with RGB channels.

We train the model on the training set, use the validation set to see the effect of changing the hyperparameters, and ultimately measure the model's performance on the test set, whose size is printed above.

A minimum of 0.93 validation accuracy will be required for the project.

### Include an exploratory visualization of the dataset

The first thing that can be seen in the data is that it is ordered. To display a variety of example pictures, I slice the training data with a step of roughly the average number of images per class.

The road signs have some interesting properties, which might be interesting for processing later:

Shapes:
- mostly round shapes (75%)
- triangular is the second most common shape and appears pointing both up and down (e.g. Traffic Signals or Yield)
- diamond (rotated square) occurs once (Priority Road)
- octagon occurs once (Stop)

Colors:
- the most dominant colors are clearly red, blue and white
- blue signs always feature one or more white arrows, which determine their class (red signs do not feature white arrows, since their background is white); therefore the blue channel does not hold much information
- red signs come in multiple shapes, but the most similar signs are round with the information of the sign in the middle (e.g. Speed Limit, driving restrictions or caution signals)

General:
- some images are really bright, some are almost black. The images will have to be normalized to clearly identify the distinct parts of each class
- traffic sign and background are not clearly distinct, so contrast needs to be normalized
- the RGB channels should be discarded in favor of the R channel alone, since blue signs show distinct properties anyway and the only yellow sign has a unique shape. The red channel is clearly the best option, since the majority of observations display red color.

In [None]:
rows = 5
cols = 10
# step between displayed images, so the samples spread across the ordered training set
step = x_train.shape[0] // (rows * cols)

fig, ax = plt.subplots(rows,cols, dpi=160)
ax = ax.ravel()

for i in range(rows*cols):
    ax[i].axis('off')
    ax[i].imshow(x_train[i * step], cmap='gray')

plt.show()


Since we have 43 different classes, one thing we have to consider when implementing a neural network is distribution bias. With the given distribution of pictures, our model will favor more frequently occurring road signs when it is unsure what to pick. There is no ground truth here: equal occurrence will prevent our model from overfitting in a particular direction, whereas keeping the given distribution will favor signs that occur more frequently in the real world. When checking the distribution of the images, we can clearly see that some classes occur around 2000 times, whereas others have only around 200 images.
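
The imbalance can also be quantified numerically. The following is a minimal sketch using `np.bincount` on a small set of hypothetical labels (not the real data set); applied to the training labels it would give the per-class counts that the plot below visualizes.

```python
import numpy as np

# Hypothetical labels for illustration: class 0 is frequent, class 2 is rare
labels = np.array([0] * 20 + [1] * 10 + [2] * 2)

counts = np.bincount(labels)               # number of samples per class id
imbalance = counts.max() / counts.min()    # ratio of most to least frequent class
print(counts, imbalance)
```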

In [None]:
fig, ax = plt.subplots(figsize=(10,10), dpi=100)
sns.countplot(ax=ax, data=pd.DataFrame(y_train),y=0, order=pd.DataFrame(y_train)[0].value_counts().index)
ax.set(xlabel='Occurrence', ylabel='Sign')
plt.show()

----

## Step 2: Design and Test a Model Architecture


### Pre-process the Data Set

Since we learned earlier that the training data has a distribution bias, I tried two approaches to achieve a better validation accuracy. Regardless of the bias, we need to create more training data by augmenting the given data set. Thinking about how these images are taken from a vehicle, the augmentation operations are:
- blurring (to simulate a driving vehicle)
- perspective transform (either left or right to simulate the perspective of taking the picture)
- rotation (to improve stability of observations)

Each augmentation contains a random choice that determines the blur kernel size, the degree of rotation, or the direction of the warp (left or right).

In [None]:
def blur_img(img):
    # random odd kernel size; cast to a plain int for OpenCV
    kernel = int(np.random.choice(np.array([3,5])))
    return cv2.medianBlur(img, kernel)

def rotate(img):
    # random angle (degrees), zoom factor and direction
    rot_degree = np.random.choice(np.array([10,15,20,25,30]))
    rot_size = np.random.choice(np.array([0.8,0.9,1,1.1,1.2]))
    rot_dir = np.random.choice(np.array([1, -1]))
    rot = cv2.getRotationMatrix2D((16,16), float(rot_degree * rot_dir), float(rot_size))
    return cv2.warpAffine(img, rot, (32,32))

def warp(img):
    
    src = np.float32(
    [[32,0], # top right (points are given as x, y)
     [32, 32], # bottom right
     [0,0], # top left
     [0, 32]]) # bottom left
    
    dst_l = np.float32(
    [[32,5],
     [32, 27],
     [0,0],
     [0, 32]])
    dst_r = np.float32(
    [[32,0],
     [32, 32],
     [0,5],
     [0, 27]])    
    
    dst = dst_r if np.random.choice([0,1]) == 1 else dst_l
    
    M = cv2.getPerspectiveTransform(src, dst)
    warped_image = cv2.warpPerspective(img, M, (img.shape[1], img.shape[0]), flags=cv2.INTER_LINEAR)
    return warped_image

def change_img(img):
    opt = np.random.choice([0,1,2])
    if opt == 0:
        img = blur_img(img)
    elif opt == 1:
        img = rotate(img)
    elif opt == 2:
        img = warp(img)
    return img

cols = 10

fig, ax = plt.subplots(1,cols, dpi=160, figsize=(5,5))
ax = ax.ravel()

for i in range(cols):
    ax[i].axis('off')
    ax[i].imshow(change_img(x_train[5000]), cmap='gray')    
plt.show()

After this short demonstration of the data augmentations, I specify how many pictures of each class should be used for training.

Both approaches build a dictionary that maps each label to the indices of the corresponding images. Then I can either augment each class up to a target sample count (option 1) or, to keep the distribution, specify how many times each image should be randomly augmented (option 2). In general this should improve the model's accuracy by making the classification generalize better.

After evaluation, I achieved better results with option 1.
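
To make the difference between the two options concrete, here is a small sketch of how many augmented samples each one would create. The three class counts are hypothetical; `target_samples` and `target_factor` use the values from the cells below.

```python
import numpy as np

# Hypothetical per-class sample counts (not the real data set counts)
class_counts = np.array([200, 1200, 2000])

# Option 1: top every class up to the same target size
target_samples = 3000
created_opt1 = np.maximum(target_samples - class_counts, 0)
totals_opt1 = class_counts + created_opt1

# Option 2: augment every image a fixed number of times, keeping the distribution
target_factor = 5
created_opt2 = class_counts * target_factor
totals_opt2 = class_counts + created_opt2

print(totals_opt1)  # every class ends at the target size
print(totals_opt2)  # class ratios stay the same as before
```

Option 1 removes the imbalance but relies heavily on augmented copies for rare classes, while option 2 keeps the original class ratios.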

In [None]:
# map each class id to the list of training indices that carry this label
data_dict = {key: [] for key in set(y_train)}

for i, label in enumerate(y_train):
    data_dict[label].append(i)

In [None]:
# Option 1 - Remove distribution bias

target_samples = 3000

new_samples = []

# generate new samples
for key in data_dict.keys():
    samples_to_create = target_samples - len(data_dict[key])
    for i in range(0, samples_to_create):
        img = x_train[np.random.choice(data_dict[key])]
        changed_img = change_img(img)
        new_samples.append([
            changed_img, key
        ])  

# merge with existing 
for i in range(0,len(x_train)):
    new_samples.append([
        x_train[i], y_train[i]
    ])
    
random.shuffle(new_samples)
len(new_samples)



In [None]:
# Option 2 - augment image n times (not used)

#target_factor = 5

#new_samples = []

# generate new samples
#for key in data_dict.keys():
#    for sample in data_dict[key]:
#        for i in range(target_factor):
#            img = change_img(x_train[sample])
#            new_samples.append([
#                img, key
#            ])
    
# merge with existing 
#for i in range(0,len(x_train)):
#    new_samples.append([
#        x_train[i], y_train[i]
#    ])
    
#random.shuffle(new_samples)
#len(new_samples)

After creating augmented copies of the originals, the input data is sent through the preparation pipeline, consisting of:
- contrast equalization to improve performance on bright or dark pictures
- selecting only the red channel
- scaling the values from the range [0, 255] to [0, 1]

The pipeline is then applied to train, validation and test data. 
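
To illustrate what these steps do to a single image, here is a NumPy-only sketch. It is not the notebook's implementation (which uses `cv2.equalizeHist` and a min/max `scale` function); `equalize_channel` and `prepare` are hypothetical helpers that follow the same idea, and rounding may differ slightly from OpenCV.

```python
import numpy as np

def equalize_channel(channel):
    """Histogram-equalize one uint8 channel using its cumulative distribution."""
    hist = np.bincount(channel.ravel(), minlength=256)
    cdf = hist.cumsum()
    cdf_min = cdf[cdf > 0][0]              # CDF value at the darkest level present
    denom = max(cdf[-1] - cdf_min, 1)      # guard against a constant image
    lut = np.clip(np.round((cdf - cdf_min) / denom * 255), 0, 255).astype(np.uint8)
    return lut[channel]

def prepare(img_rgb):
    """Equalize the red channel, then scale it from [0, 255] to [0, 1]."""
    red = equalize_channel(img_rgb[:, :, 0])
    return (red.astype(np.float32) / 255.0).reshape(32, 32, 1)

# A dark, low-contrast synthetic image (values 10..41) for illustration
rng = np.random.default_rng(0)
img = rng.integers(10, 42, size=(32, 32, 3), dtype=np.uint8)
out = prepare(img)
print(out.shape, out.min(), out.max())
```

After equalization the darkest pixel maps to 0 and the brightest to 255, so the dark input ends up using the full [0, 1] range.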

In [None]:
def scale(img, out_range=(0, 1), axis=None):
    domain = np.min(img, axis), np.max(img, axis)
    y = (img - (domain[1] + domain[0]) / 2) / (domain[1] - domain[0])
    return y * (out_range[1] - out_range[0]) + (out_range[1] + out_range[0]) / 2

def contrast_equalization(img):
    img[:,:,0] = cv2.equalizeHist(img[:,:,0])
    img[:,:,1] = cv2.equalizeHist(img[:,:,1])
    img[:,:,2] = cv2.equalizeHist(img[:,:,2])
    return img


for i in range(0,len(new_samples)):
    new_samples[i][0] = contrast_equalization(new_samples[i][0])
    new_samples[i][0] = scale(new_samples[i][0][:,:,0])
    
X_train = [x[0].reshape(32,32,1) for x in new_samples] 
y_train = [y[1] for y in new_samples]

In [None]:
# same has to be done with the validation / test data
X_valid = [scale(contrast_equalization(i)[:,:,0]).reshape(32,32,1) for i in x_valid]
X_test = [scale(contrast_equalization(i)[:,:,0]).reshape(32,32,1) for i in x_test]

### Model Architecture

The model is closely modeled on LeNet. It consists of 3 convolutional layers followed by 4 dense (fully connected) layers.

| Layer (dropout values are keep probabilities) | Output shape |
|-----------------------------------------------|--------------|
| Input Image                                   | 32x32x1      |
| Conv2d / ReLu / Dropout (0.95)                | 32x32x8      |
| Conv2d / ReLu / MaxPooling / Dropout (0.9)    | 14x14x16     |
| Conv2d / ReLu / MaxPooling / Dropout (0.9)    | 6x6x32       |
| Flatten / Dense / ReLu / Dropout (0.6)        | 512          |
| Dense / ReLu / Dropout (0.6)                  | 256          |
| Dense / ReLu / Dropout (0.6)                  | 128          |
| Dense                                         | 43           |
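
The spatial sizes of the convolutional part follow from the kernel sizes, strides and padding used in the `CNN_architecture` code further down. As a small sketch, the helper `conv_out` below (a hypothetical name, not part of the notebook) reproduces that arithmetic and the flattened size expected by the first dense layer.

```python
def conv_out(size, kernel, stride=1, padding='VALID'):
    """Spatial output size of a convolution or pooling window."""
    if padding == 'SAME':
        return -(-size // stride)          # ceil(size / stride)
    return (size - kernel) // stride + 1   # VALID: no padding

size = 32
size = conv_out(size, 3, 1, 'SAME')    # conv 1 -> 32
size = conv_out(size, 3, 1, 'VALID')   # conv 2 -> 30
size = conv_out(size, 3, 2, 'VALID')   # 3x3 max pool, stride 2 -> 14
size = conv_out(size, 3, 1, 'VALID')   # conv 3 -> 12
size = conv_out(size, 2, 2, 'VALID')   # 2x2 max pool, stride 2 -> 6
flat = size * size * 32                # 32 feature maps after the third conv layer
print(size, flat)
```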

As for the hyperparameters, I chose 40 epochs and a batch size of 128.

With the learning-rate decay implemented later (2e-5 per epoch, starting from a learning rate of 1e-3), I try to reduce the step size gradually over time to prevent the model from adapting too quickly late in the training process. I noticed that starting with a higher learning rate speeds up the early training quite a bit.
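
As a rough sketch of the intended schedule, the snippet below lists the learning rate each epoch would use when the decay is subtracted once per epoch. The numbers are the ones set in the code cells (`rate = 1e-3`, `decay = 1e-5*2`, 40 epochs). Whether the optimizer actually picks up the reduced rate depends on how it is fed into the graph; passing it through a placeholder is one possible way and is an assumption here.

```python
rate = 1e-3      # initial learning rate
decay = 2e-5     # amount subtracted after each epoch
EPOCHS = 40

# Rate that would be used in each epoch under a linear schedule
schedule = [rate - epoch * decay for epoch in range(EPOCHS)]
print(schedule[0], schedule[-1])
```

With these values the rate stays positive for the whole run and ends at roughly a fifth of its starting value.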

In [None]:
EPOCHS = 40
BATCH_SIZE = 128

In [None]:
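def CNN_architecture(x, mu=0, sigma=0.1):
    # Note: the second argument of tf.nn.dropout is the keep probability in this
    # TensorFlow 1.x API, and dropout as written here is also active at evaluation time.
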
    # Layer 1
    CONVOL1_W = tf.Variable(tf.truncated_normal(shape=(3, 3, 1, 8), mean = mu, stddev = sigma))
    CONVOL1_b = tf.Variable(tf.zeros(8))
    CONVOL1   = tf.nn.conv2d(x, CONVOL1_W, strides=[1, 1, 1, 1], padding='SAME') + CONVOL1_b
    CONVOL1 = tf.nn.relu(CONVOL1)
    CONVOL1 = tf.nn.dropout(CONVOL1, 0.95)
    
    # Layer 2
    CONVOL2_W = tf.Variable(tf.truncated_normal(shape=(3, 3, 8, 16), mean = mu, stddev = sigma))
    CONVOL2_b = tf.Variable(tf.zeros(16))
    CONVOL2   = tf.nn.conv2d(CONVOL1, CONVOL2_W, strides=[1, 1, 1, 1], padding='VALID') + CONVOL2_b
    CONVOL2 = tf.nn.relu(CONVOL2)
    CONVOL2 = tf.nn.max_pool(CONVOL2, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='VALID')
    CONVOL2 = tf.nn.dropout(CONVOL2, 0.9)
                           
    # Layer 3
    CONVOL3_W = tf.Variable(tf.truncated_normal(shape=(3, 3, 16, 32), mean = mu, stddev = sigma))
    CONVOL3_b = tf.Variable(tf.zeros(32))
    CONVOL3   = tf.nn.conv2d(CONVOL2, CONVOL3_W, strides=[1, 1, 1, 1], padding='VALID') + CONVOL3_b
    CONVOL3 = tf.nn.relu(CONVOL3)
    CONVOL3 = tf.nn.max_pool(CONVOL3, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')
    CONVOL3 = tf.nn.dropout(CONVOL3, 0.9)
    
    # Layer 4       
    DENSE0  = tf.contrib.layers.flatten(CONVOL3)
    DENSE1_W  = tf.Variable(tf.truncated_normal(shape=(1152, 512), mean = mu, stddev = sigma))
    DENSE1_b  = tf.Variable(tf.zeros(512))
    DENSE1    = tf.matmul(DENSE0, DENSE1_W) + DENSE1_b
    DENSE1    = tf.nn.relu(DENSE1)
    DENSE1    = tf.nn.dropout(DENSE1, 0.6)
    
    # Layer 5
    DENSE2_W  = tf.Variable(tf.truncated_normal(shape=(512, 256), mean = mu, stddev = sigma))
    DENSE2_b  = tf.Variable(tf.zeros(256))
    DENSE2    = tf.matmul(DENSE1, DENSE2_W) + DENSE2_b
    DENSE2    = tf.nn.relu(DENSE2)
    DENSE2    = tf.nn.dropout(DENSE2, 0.6)
    
    # Layer 6
    DENSE3_W  = tf.Variable(tf.truncated_normal(shape=(256, 128), mean = mu, stddev = sigma))
    DENSE3_b  = tf.Variable(tf.zeros(128))
    DENSE3 = tf.matmul(DENSE2, DENSE3_W) + DENSE3_b
    DENSE3 = tf.nn.relu(DENSE3)
    DENSE3 = tf.nn.dropout(DENSE3, 0.6)
    
    # Layer 7
    DENSE4_W  = tf.Variable(tf.truncated_normal(shape=(128, 43), mean = mu, stddev = sigma))
    DENSE4_b  = tf.Variable(tf.zeros(43))
    logits = tf.matmul(DENSE3, DENSE4_W) + DENSE4_b
    
    return logits

In [None]:
rate = 1e-3
decay = 1e-5*2

x = tf.placeholder(tf.float32, (None, 32,32,1))
y = tf.placeholder(tf.int32, (None))
one_hot_y = tf.one_hot(y,43)

logits = CNN_architecture(x)
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=one_hot_y)
loss_operation = tf.reduce_mean(cross_entropy)
optimizer = tf.train.AdamOptimizer(learning_rate = rate)
training_operation = optimizer.minimize(loss_operation)

In [None]:
correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))
accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
saver = tf.train.Saver()

def evaluate(X_data, y_data):
    num_examples = len(X_data)
    total_accuracy = 0
    sess = tf.get_default_session()
    for offset in range(0, num_examples, BATCH_SIZE):
        batch_x, batch_y = X_data[offset:offset+BATCH_SIZE], y_data[offset:offset+BATCH_SIZE]
        accuracy = sess.run(accuracy_operation, feed_dict={x: batch_x, y: batch_y})
        total_accuracy += (accuracy * len(batch_x))
    return total_accuracy / num_examples
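
The `evaluate` function weights each batch accuracy by the batch length, which matters because the last batch is usually smaller than `BATCH_SIZE`. A minimal NumPy sketch with made-up correctness flags shows that this weighted sum matches the accuracy over all examples:

```python
import numpy as np

# Hypothetical per-example correctness flags (1 = correct), 10 examples
correct = np.array([1, 1, 0, 1, 1, 1, 0, 1, 1, 0], dtype=np.float32)
batch_size = 4   # the last batch holds only 2 examples

total = 0.0
for offset in range(0, len(correct), batch_size):
    batch = correct[offset:offset + batch_size]
    total += batch.mean() * len(batch)   # weight each batch by its size
weighted = total / len(correct)

print(weighted, correct.mean())
```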

To monitor training, the training and validation accuracy are stored and printed after each epoch.

In [None]:
t_acc = []
v_acc = []

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    num_examples = len(X_train)
    
    print("Training...")
    print()
    for i in range(EPOCHS):
        # shuffle
        X_train, y_train = shuffle(X_train, y_train)
        for offset in range(0, num_examples, BATCH_SIZE):
            end = offset + BATCH_SIZE
            batch_x, batch_y = X_train[offset:end], y_train[offset:end]
            sess.run(training_operation, feed_dict={x: batch_x, y: batch_y})
        
        
        train_accuracy = evaluate(X_train, y_train)
        t_acc.append(round(train_accuracy,3))
        validation_accuracy = evaluate(X_valid, y_valid)
        v_acc.append(round(validation_accuracy,3))
        print("EPOCH {} ...".format(i+1))
        print("Training Accuracy   = {:.3f}".format(train_accuracy))
        print("Validation Accuracy = {:.3f}".format(validation_accuracy))
        print()
        
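        # reduce rate for the next epoch
        # NOTE: training_operation was built from the optimizer created earlier, so
        # re-creating the optimizer here does not change the rate that is actually used;
        # the rate would have to be fed into the graph for the decay to take effect
        rate -= decay
        optimizer = tf.train.AdamOptimizer(learning_rate = rate)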

        
    saver.save(sess, './cnn_model')
    print("Model saved")

With training done, I can now plot the training and validation accuracy over the epochs.

In [None]:
plt.plot(t_acc, label='Training Accuracy')
plt.plot(v_acc, label='Validation Accuracy')
plt.legend()
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.show()

### Train, Validate and Test the Model

A final rundown on the model performance:

In [None]:
with tf.Session() as sess:
    saver.restore(sess, tf.train.latest_checkpoint('.'))

    train_accuracy = evaluate(X_train, y_train)
    print("Train Accuracy = {:.3f}".format(train_accuracy))
    
    valid_accuracy = evaluate(X_valid, y_valid)
    print("Valid Accuracy = {:.3f}".format(valid_accuracy))    
    
    test_accuracy = evaluate(X_test, y_test)
    print("Test Accuracy = {:.3f}".format(test_accuracy))

---

## Step 3: Test a Model on New Images

To highlight the performance of the model, I searched for some traffic sign images and formatted them to match the image size of the data set (32x32x3). The target class was then taken from the accompanying signnames list.

In [None]:
import pandas as pd
signnames = pd.read_csv('signnames.csv')
signnames.head()

In [None]:
custom_imgs = [mpimg.imread(f'custom_data/{i}.jpg') for i in range(1,6)]
custom_imgs_copy = [img.copy() for img in custom_imgs]
    
labels = [
    '28',
    '40',
    '14',
    '13',
    '1'
]

print('Custom sample images with Label')

fig, ax = plt.subplots(1,5, dpi=160, figsize=(5,5))
for i in range(5):
    ax[i].axis('off')
    cv2.putText(custom_imgs[i], labels[i],(15, 10),cv2.FONT_HERSHEY_SIMPLEX,0.4,(0,255,0),1)
    ax[i].imshow(custom_imgs[i])    
plt.show()



In [None]:
custom_imgs_edit = [scale(contrast_equalization(i)[:,:,0]) for i in custom_imgs_copy]

print('Final input for CNN')

fig, ax = plt.subplots(1,5, dpi=160, figsize=(5,5))
for i in range(5):
    ax[i].axis('off')
    ax[i].imshow(custom_imgs_edit[i], cmap='gray')    
plt.show()

In [None]:
imgs_top_5 = []

# build the top-5 op once and reuse a single session for all images
top_5_operation = tf.nn.top_k(tf.nn.softmax(logits), k=5)

with tf.Session() as sess:
    saver.restore(sess, "./cnn_model")
    for i in range(5):
        top_5 = sess.run(top_5_operation, feed_dict={x: [custom_imgs_edit[i].reshape(32, 32, 1)]})
        imgs_top_5.append([
            custom_imgs_edit[i],
            top_5
        ])

In [None]:
fig, ax = plt.subplots(1,5, dpi=160, figsize=(7,7))
for i in range(5):
    ax[i].axis('off')
    ax[i].imshow(custom_imgs[i])    

fig, ax = plt.subplots(1,5, dpi=160, figsize=(7,7))
for i in range(5):
    ax[i].barh(np.array(imgs_top_5[i][1].indices[0], str), imgs_top_5[i][1].values[0])
    asp = np.diff(ax[i].get_xlim())[0] / np.diff(ax[i].get_ylim())[0]
    ax[i].set_aspect(asp)
    ax[i].set_xlabel('Prediction', fontsize = 5.0)
    # set the tick label size on both axes
    ax[i].tick_params(axis='both', labelsize=5)
plt.show()

As can be seen, the model identified 4 of the 5 custom images (80%) correctly, with high confidence on all but the wrongly identified one. I was surprised that this sign was mistaken for a 60 km/h sign, because it is obviously not round. In the original data this sign was not among the frequently occurring ones, so maybe its features were not captured well in the data augmentation process. However, the roundabout sign was also underrepresented in the training data and was no challenge at all for the model. Maybe the triangular shape of the misclassified sign was not detected due to the perspective of the image.
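
For readers unfamiliar with how the confidence values in the bar charts come about, here is a NumPy sketch of the softmax and top-5 selection that `tf.nn.softmax` and `tf.nn.top_k` perform in the notebook. The logits are made up for illustration; `softmax` and `top_k` are hypothetical helper names.

```python
import numpy as np

def softmax(logits):
    shifted = logits - logits.max()      # subtract the max for numerical stability
    exp = np.exp(shifted)
    return exp / exp.sum()

def top_k(probs, k=5):
    idx = np.argsort(probs)[::-1][:k]    # indices of the k largest probabilities
    return idx, probs[idx]

# Made-up logits for 43 classes, with class 14 as the clear favourite
logits = np.zeros(43)
logits[14] = 6.0
logits[1] = 2.0

indices, values = top_k(softmax(logits))
print(indices[0], values[0])
```

A single dominant logit translates into one long bar and four very short ones, which is the pattern seen for the correctly classified images.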

I believe some improvements could still be made in data processing, as well as in data augmentation. Both are fairly weak compared to how the data would be acquired in real life.

Also, I believe the CNN code here is not really state of the art. Initially I used Keras within a newer version of TensorFlow on my local machine and got better results faster, with a smaller neural net and much less code. The Keras implementation reached an accuracy of around 97%; it needed much less code because here all of the custom functions and the pipeline are implemented from scratch. I was not able to reproduce that result because I am not that familiar with this older TensorFlow version, but I am glad I just passed the required 93% accuracy :).

In [None]:
## Keras Classifier

import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.layers import Conv2D, MaxPooling2D

class CNN_model:

    def __init__(self, x_shape, y_shape):

        self.model = Sequential()
        self.history = None  # filled by fit_model

        # Layer 1
        self.model.add(Conv2D(256, (3,3), activation='relu', padding='same', input_shape=x_shape))
        self.model.add(MaxPooling2D(pool_size=(2,2)))
        self.model.add(Dropout(0.2))

        # Layer 2
        self.model.add(Conv2D(256, (3,3), activation='relu', padding='same'))
        self.model.add(MaxPooling2D(pool_size=(2,2)))
        self.model.add(Dropout(0.2))

        # Layer 3
        self.model.add(Flatten())
        self.model.add(Dense(256, activation='relu'))

        # Layer 4
        self.model.add(Dense(y_shape, activation='softmax'))

        # Model compile
        # `decay` is accepted by older TF 2.x optimizers; newer releases may
        # reject it in favour of a learning-rate schedule
        self.opt = tf.keras.optimizers.Adam(learning_rate=0.001, decay=1e-6)

        self.model.compile(loss='sparse_categorical_crossentropy',
                           optimizer=self.opt,
                           metrics=['accuracy'])

    def fit_model(self, x, y, x_validation, y_validation, BATCH_SIZE=256, EPOCHS=20):
        # keep the training history so it can be plotted later
        self.history = self.model.fit(x, y,
            batch_size=BATCH_SIZE,
            epochs=EPOCHS,
            validation_data=(x_validation, y_validation))

    def evaluate_model(self, x_test, y_test):
        # plot training vs. validation accuracy per epoch
        plt.plot(self.history.history['accuracy'], label='accuracy')
        plt.plot(self.history.history['val_accuracy'], label='val_accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.ylim([0.5, 1])
        plt.legend(loc='lower right')

        # final loss and accuracy on the held-out test data
        test_loss, test_acc = self.model.evaluate(x_test, y_test, verbose=2)
        return test_loss, test_acc

