# Traffic-sign classification with Convolutional Neural Network (CNN)

### **Traffic sign classification task**
The task is to classify $40 \times 40$ RGB input images into 43 possible traffic sign categories: $h: \mathbb{R}^{4800} \to \lbrace 0,1,\dots,42 \rbrace$. Because of the three color channels, the input is $40 \times 40 \times 3 = 4800$ dimensional.

The dataset consists of more than 50000 images in total. A detailed description of the dataset can be found via [this link](http://benchmark.ini.rub.de/?section=gtsrb&subsection=dataset). An already pre-processed version of the images, resized to 40x40, will be used.

In this task, convolutional neural networks (CNNs) usually reach around 90-95% accuracy. The goal is to train a network that performs better than humans (98.9% on the original GTSRB).

### **import dependencies**

In [None]:
# importing the necessary libraries
import numpy as np # for linear algebra
import tensorflow as tf # for neural models
import time # for measuring time
import zipfile # for handling zip archives
import PIL # for image handling
from keras import backend as K
import os

In [None]:
import matplotlib.pyplot as plt # for plotting data
import plotly
import plotly.graph_objs as go

### get data

In [None]:
# downloading & unzipping "GTSRB 40x40" dataset
!gdown https://drive.google.com/uc?id=1yAe6Qjdpsw2PNcU0fxa2Ak8_JyKKPtyj
# Extract the downloaded archive into /content. The context manager closes
# the archive handle even if extraction fails part-way through.
local_zip = '/content/GTSRB_40x40.zip'
with zipfile.ZipFile(local_zip, 'r') as zip_ref:
    zip_ref.extractall('/content')

In [None]:
# --- dataset-wide constants -------------------------------------------------

# image geometry and label space
NOF_CLASSES = 43
IMG_SIZE = (40, 40)
IMG_SHAPE = IMG_SIZE + (3,)  # height, width, colour channels

# training split
TRAIN_SET_PATH = '/content/GTSRB_40x40/training_set_40x40'
TRAIN_SET_SIZE = 39209
TRAIN_BATCH_SIZE = 256

# dev (validation) split, carved out of the training directory
DEV_RATIO = 0.1  # 3920 dev images are enough
DEV_BATCH_SIZE = 256

# held-out test split
TEST_SET_PATH = '/content/GTSRB_40x40/test_set_40x40'
TEST_SET_SIZE = 12630
TEST_BATCH_SIZE = 256

In [None]:
def plot_images(images, true_labels=None, predicted_labels=None):
    """Show images in a grid (at most 10 per row), captioned with class names.

    Args:
        images: list, tuple or ndarray of images; each item is passed to
            ``plt.imshow``.
        true_labels: optional class indices, one per image. Any array-like is
            accepted (list, tuple, ndarray of any shape with one entry per
            image); float-valued indices are cast to int.
        predicted_labels: optional predicted class indices, same format.

    Caption colours:
        * only true labels      -> true class name in black
        * true and predicted    -> predicted class name, green if it matches
                                   the true label, red otherwise
        * only predicted labels -> predicted class name in blue

    Returns:
        None. Returns immediately, without drawing, when ``images`` is empty.
    """
    assert isinstance(images, (list, tuple, np.ndarray))
    class_names = ['SL 20km/h', 'SL 30km/h', 'SL 50km/h', 'SL 60km/h', 'SL 70km/h', 'SL 80km/h', 'End of SL 80km/h', 'SL 100km/h', 'SL 120km/h', 'No passing',
                   'No passing over 3.5t', 'Right-of-way', 'Priority road', 'Yield', 'Stop', 'No vehicles', 'Prohibited over 3.5t', 'No entry', 'General caution', 'Dangerous curve left',
                   'Dangerous curve right', 'Double curve', 'Bumpy road', 'Slippery road', 'Road narrows (right)', 'Road work', 'Traffic signals', 'Pedestrians', 'Children crossing', 'Bicycles crossing',
                   'Beware of ice/snow', 'Wild animals crossing', 'End of all s&p limits', 'Turn right ahead', 'Turn left ahead', 'Ahead only', 'Go straight or right', 'Go straight or left',
                   'Keep right', 'Keep left', 'Roundabout mandatory', 'End of no passing', 'End no passing (3.5t)']

    def _to_index_list(labels):
        # Labels may arrive as a plain list or as a float array (e.g. a float
        # label batch taken from a data iterator); a float cannot index a
        # list, so normalise everything to a flat list of Python ints.
        assert len(images) == len(labels)
        return np.asarray(labels).ravel().astype(int).tolist()

    true_label_idxs = pred_label_idxs = None
    true_titles = pred_titles = None
    if true_labels is not None:
        true_label_idxs = _to_index_list(true_labels)
        true_titles = [class_names[i] for i in true_label_idxs]
    if predicted_labels is not None:
        pred_label_idxs = _to_index_list(predicted_labels)
        pred_titles = [class_names[i] for i in pred_label_idxs]

    if len(images) == 0:
        # Nothing to draw; also avoids dividing by a zero column count below.
        return

    cols = min(10, len(images))
    rows = 1 + (len(images) - 1) // cols
    plt.figure(figsize=(1.3 * cols, 1.3 * rows))
    for n, image in enumerate(images):
        plt.subplot(rows, cols, n + 1)
        plt.xticks([], [])
        plt.yticks([], [])
        if pred_titles is None:
            if true_titles is not None:  # case 1 - only true labels
                plt.xlabel(f'{true_titles[n]}', size=14, c='black')
        elif true_titles is not None:    # case 2 - both true and predicted labels
            is_correct = true_label_idxs[n] == pred_label_idxs[n]
            plt.xlabel(f'{pred_titles[n]}', size=14, c='green' if is_correct else 'red')
        else:                            # case 3 - only predicted labels
            plt.xlabel(f'{pred_titles[n]}', size=14, c='blue')
        plt.imshow(image)
    plt.tight_layout()
    plt.show()

In [None]:
# original image
# Load the submodule explicitly: a bare `import PIL` does not guarantee that
# `PIL.Image` is available unless some other library happened to import it.
import PIL.Image

# The context manager closes the file once the pixels are copied into the array.
with PIL.Image.open('/content/GTSRB_40x40/training_set_40x40/00000/00000_00000.ppm') as sample_img:
    img_arr = np.array(sample_img)
img_arr = np.expand_dims(img_arr, axis=0)  # add a leading batch axis for plot_images
print(img_arr.shape)
plot_images(img_arr)

In [None]:
# Un-augmented input pipelines: images are only scaled by 1/255.
# The training and dev iterators are created from the same ImageDataGenerator
# so that its validation_split divides the training directory between them.
plain_flow_args = dict(target_size=IMG_SIZE, class_mode='sparse')

train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1./255,
    validation_split=DEV_RATIO)

# training subset (default shuffling)
train_generator = train_datagen.flow_from_directory(
    TRAIN_SET_PATH,
    subset='training',
    batch_size=TRAIN_BATCH_SIZE,
    **plain_flow_args)

# dev subset, kept in directory order
dev_generator = train_datagen.flow_from_directory(
    TRAIN_SET_PATH,
    subset='validation',
    batch_size=DEV_BATCH_SIZE,
    shuffle=False,
    **plain_flow_args)

# test set comes from its own directory, kept in directory order
test_datagen = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_directory(
    TEST_SET_PATH,
    batch_size=TEST_BATCH_SIZE,
    shuffle=False,
    **plain_flow_args)

In [None]:
# Augmented input pipelines. One dictionary holds the augmentation settings
# so the train/dev generator and the test generator are configured identically.
# Note that the dev and test iterators below also deliver augmented images.
augmentation_args = dict(
    rescale=1./255,
    rotation_range=15,
    width_shift_range=0.3,
    height_shift_range=0.3,
    shear_range=0.2,
    zoom_range=[0.8, 1.5],
    horizontal_flip=False,
    vertical_flip=False,
    fill_mode='nearest',
    data_format='channels_last',
    brightness_range=[0.5, 1.5])
augmented_flow_args = dict(target_size=IMG_SIZE, class_mode='sparse')

augmented_train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    validation_split=DEV_RATIO, **augmentation_args)

# training subset (default shuffling)
augmented_train_generator = augmented_train_datagen.flow_from_directory(
    TRAIN_SET_PATH,
    subset='training',
    batch_size=TRAIN_BATCH_SIZE,
    **augmented_flow_args)

# dev subset, kept in directory order
augmented_dev_generator = augmented_train_datagen.flow_from_directory(
    TRAIN_SET_PATH,
    subset='validation',
    batch_size=DEV_BATCH_SIZE,
    shuffle=False,
    **augmented_flow_args)

# test set, same augmentation settings, kept in directory order
augmented_test_datagen = tf.keras.preprocessing.image.ImageDataGenerator(**augmentation_args)
augmented_test_generator = augmented_test_datagen.flow_from_directory(
    TEST_SET_PATH,
    batch_size=TEST_BATCH_SIZE,
    shuffle=False,
    **augmented_flow_args)

In [None]:
def augment_plot_pics(datagen, orig_img):
    """Plot an image next to 8 randomly augmented versions of it.

    The augmented pictures are written as JPEG files into a ``preview``
    folder in the current working directory and read back for plotting.
    Files already present in that folder are deleted first.

    Args:
        datagen: generator object providing
            ``flow(x, batch_size, save_to_dir, save_prefix, save_format)``,
            e.g. a Keras ``ImageDataGenerator``.
        orig_img: the source image; converted with ``img_to_array`` and shown
            with ``imshow``.
    """
    dir_augmented_data = "preview"
    n_plot = 8  # fills a 3x3 grid together with the original picture

    # Create the preview folder if needed and clear files left by earlier runs.
    # makedirs(exist_ok=True) only tolerates "already exists"; other failures
    # (e.g. missing permissions) surface instead of being swallowed.
    os.makedirs(dir_augmented_data, exist_ok=True)
    for item in os.listdir(dir_augmented_data):
        item_path = os.path.join(dir_augmented_data, item)
        if os.path.isfile(item_path):
            os.remove(item_path)

    # convert original image to array and add a leading sample axis:
    # (sample, nrow, ncol, 3) 3 = R, G or B
    x = tf.keras.preprocessing.image.img_to_array(orig_img)
    x = x.reshape((1,) + x.shape)

    # randomly generate pictures; every batch drawn writes one file to disk
    augmented_flow = datagen.flow(x, batch_size=1,
                                  save_to_dir=dir_augmented_data,
                                  save_prefix="pic",
                                  save_format='jpeg')
    for _ in range(n_plot):
        next(augmented_flow)

    # plot the generated data
    fig = plt.figure(figsize=(8, 6))
    fig.subplots_adjust(hspace=0.02, wspace=0.01,
                        left=0, right=1, bottom=0, top=1)
    # original picture
    ax = fig.add_subplot(3, 3, 1, xticks=[], yticks=[])
    ax.imshow(orig_img)
    ax.set_title("original")

    # Only regular files, and never more than the grid can hold.
    saved_names = sorted(
        name for name in os.listdir(dir_augmented_data)
        if os.path.isfile(os.path.join(dir_augmented_data, name)))[:n_plot]
    for slot, imgnm in enumerate(saved_names, start=2):
        ax = fig.add_subplot(3, 3, slot, xticks=[], yticks=[])
        img = tf.keras.preprocessing.image.load_img(os.path.join(dir_augmented_data, imgnm))
        ax.imshow(img)
    plt.show()

In [None]:
# preview the augmentation settings on the first training image
sample_image_path = "/content/GTSRB_40x40/training_set_40x40/00000/00000_00000.ppm"
orig_img = tf.keras.preprocessing.image.load_img(sample_image_path)
augment_plot_pics(datagen=augmented_train_datagen, orig_img=orig_img)

In [None]:
# Class-frequency histograms for the three un-augmented splits, side by side.
# Bin edges sit on half-integers so each bar is centred on its class index.
bins = np.arange(-0.5, 43, 1)
label_sets = {
    'Training': train_generator.labels,
    'Devval': dev_generator.labels,
    'Test': test_generator.labels,
}

fig, axs = plt.subplots(nrows=1, ncols=3, figsize=(24,3))
for ax, (setname, setlabels) in zip(axs, label_sets.items()):
    # density=True normalises the bars so the splits are comparable
    ax.hist(setlabels, bins=bins, density=True, rwidth=0.9)
    ax.set_title(f'Histogram of {setlabels.size} {setname} labels')
    ax.set_xticks(np.arange(0, 43, 2))
    ax.set_xlabel('Traffic sign type')
    ax.set_ylabel('Probability mass')

# tight_layout keeps the y-labels from being clipped
fig.tight_layout()
plt.show()

In [None]:
# Class-frequency histograms for the three augmented splits, side by side.
bins = np.arange(-0.5, 43, 1)  # half-integer edges centre each bar on a class index
split_names = ['Training', 'Devval', 'Test']
split_labels = [augmented_train_generator.labels,
                augmented_dev_generator.labels,
                augmented_test_generator.labels]

fig, axs = plt.subplots(nrows=1, ncols=3, figsize=(24,3))
for col, ax in enumerate(axs):
    setname = split_names[col]
    setlabels = split_labels[col]
    # normalised bars so splits of different size can be compared
    ax.hist(setlabels, bins=bins, density=True, rwidth=0.9)
    ax.set_xticks(np.arange(0, 43, 2))
    ax.set_xlabel('Traffic sign type')
    ax.set_ylabel('Probability mass')
    ax.set_title(f'Histogram of {setlabels.size} {setname} labels')

# adjust spacing so the y-labels are not clipped
fig.tight_layout()
plt.show()

# Convolutional Neural Network (CNN)

In [None]:
# defining the model
def build_model():
    """Build, summarise and compile the traffic-sign CNN.

    Architecture: two blocks of (Conv 3x3 -> BatchNorm) x 2 followed by 2x2
    max-pooling (64 filters, then 128), then a 384-unit dense layer with
    batch normalisation and dropout (rate 0.75), and a softmax layer with
    ``NOF_CLASSES`` outputs. Inputs have shape ``IMG_SHAPE``.

    The model is compiled with sparse categorical cross-entropy (integer
    class labels), the Adam optimizer and the metric name ``'acc'``.

    Returns:
        The compiled ``tf.keras`` Sequential model. The layer summary is
        printed as a side effect.
    """
    layers = tf.keras.layers
    model = tf.keras.models.Sequential([
        layers.Conv2D(64, (3,3), activation='relu', input_shape=IMG_SHAPE, name='LAYER_01_cnv'),
        layers.BatchNormalization(),
        layers.Conv2D(64, (3,3), activation='relu', name='LAYER_02_cnv'),
        layers.BatchNormalization(),
        layers.MaxPooling2D(2,2),
        layers.Conv2D(128, (3,3), activation='relu', name='LAYER_03_cnv'),
        layers.BatchNormalization(),
        layers.Conv2D(128, (3,3), activation='relu', name='LAYER_04_cnv'),
        layers.BatchNormalization(),
        layers.MaxPooling2D(2,2),
        layers.Flatten(),
        layers.Dense(384, activation='relu', name='LAYER_05_fc'),
        layers.BatchNormalization(),
        layers.Dropout(rate=0.75),
        layers.Dense(NOF_CLASSES, activation='softmax', name='LAYER_06_fc')
    ])

    model.summary()

    # The `lr` and `decay` optimizer arguments are deprecated and rejected by
    # newer Keras optimizers. The schedule below reproduces the old `decay`
    # behaviour: lr_t = 0.001 / (1 + 0.001 * step).
    lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
        initial_learning_rate=0.001, decay_steps=1, decay_rate=0.001)

    model.compile(loss='sparse_categorical_crossentropy',
                  optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
                  metrics=['acc'])
    return model

In [None]:
# build and compile the CNN; build_model() also prints the layer summary
model = build_model()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
LAYER_01_cnv (Conv2D)        (None, 38, 38, 64)        1792      
_________________________________________________________________
batch_normalization (BatchNo (None, 38, 38, 64)        256       
_________________________________________________________________
LAYER_02_cnv (Conv2D)        (None, 36, 36, 64)        36928     
_________________________________________________________________
batch_normalization_1 (Batch (None, 36, 36, 64)        256       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 18, 18, 64)        0         
_________________________________________________________________
LAYER_03_cnv (Conv2D)        (None, 16, 16, 128)       73856     
_________________________________________________________________
batch_normalization_2 (Batch (None, 16, 16, 128)       5

In [None]:
# Stage 1: train on the augmented images, validating on the augmented dev split.
# `validation_steps` is deliberately not passed: a floor-divided step count
# would skip the final partial batch, and because the dev iterator is not
# shuffled the same trailing images would be skipped every epoch. Without
# it, Keras runs through the whole validation iterator.
history = model.fit(
    augmented_train_generator,
    steps_per_epoch=augmented_train_generator.samples // TRAIN_BATCH_SIZE,
    validation_data=augmented_dev_generator,
    epochs=2,
    verbose=1,
    shuffle=True)

In [None]:
def plot_progress(history=None, main_title="", trivial_model_acc=0.1):
    """Plot loss and accuracy curves of a training run with plotly.

    Args:
        history: object with a ``history`` dict mapping metric names to
            per-epoch lists (as returned by ``model.fit``), or None to show
            an empty figure. Accuracy is read from ``'acc'``/``'val_acc'``,
            falling back to ``'accuracy'``/``'val_accuracy'``. Series that
            are missing or empty (e.g. a run without validation data) are
            skipped instead of raising.
        main_title: optional first line of the figure title.
        trivial_model_acc: baseline accuracy quoted in the title, as a
            fraction in [0, 1].
    """
    title_text = main_title + "<br>" if main_title else ""
    palette = plotly.colors.qualitative.Plotly
    fig = go.Figure()

    if history is not None:
        logs = history.history

        def first_available(*keys):
            # Return the first non-empty series stored under any of `keys`.
            for key in keys:
                values = logs.get(key)
                if values:
                    return values
            return None

        train_loss = first_available('loss')
        valid_loss = first_available('val_loss')
        train_acc = first_available('acc', 'accuracy')
        valid_acc = first_available('val_acc', 'val_accuracy')

        # Final-epoch accuracies go into the title.
        if train_acc:
            title_text += f"[train.acc: {100*train_acc[-1]:6.2f}%] "
        if valid_acc:
            title_text += f"[val.acc: <b>{100*valid_acc[-1]:6.2f}%</b>] "
        title_text += f"[trivial.acc: {100*trivial_model_acc:.2f}%]"

        # Loss and accuracy each get one colour; training curves are dashed.
        curves = [
            ("train loss", train_loss, dict(color=palette[2], dash='dash')),
            ("valid loss", valid_loss, dict(color=palette[2])),
            ("train ACC", train_acc, dict(color=palette[6], dash='dash')),
            ("valid ACC", valid_acc, dict(color=palette[6])),
        ]
        for curve_name, values, line_style in curves:
            if not values:
                continue
            epochs = list(range(1, len(values) + 1))
            fig.add_trace(go.Scatter(name=curve_name, x=epochs, y=values,
                                     mode="lines", line=line_style))

    fig.update_layout(title_text=title_text, width=800)
    fig.update_xaxes(title="epoch")
    fig.update_yaxes(range=[0,1])
    fig.show()

In [None]:
# learning curves of the training run on the augmented data
# NOTE(review): 0.06 is presumably the share of the most frequent class
# (accuracy of always predicting it) — confirm against the label histogram
plot_progress(history, trivial_model_acc=0.06)

In [None]:
# devval performance
# No `steps` argument: Keras then runs through the whole iterator, so the final
# partial batch is scored too. The previous floor-divided step count skipped it
# and was computed with the test batch size instead of the dev batch size.
model.evaluate(dev_generator)

In [None]:
# freezing layers for transfer learning (transfer from augmented to original distribution)
def freeze_model(model):
    """Freeze all layers except the last four and recompile the model in place.

    Every layer is first marked non-trainable, then the last four layers are
    re-enabled. Any Dropout layer among those four has its rate set to 0.0,
    i.e. dropout is switched off for the fine-tuning stage. The model is
    then recompiled so the changed ``trainable`` flags take effect.

    Args:
        model: a Keras model exposing ``layers`` and ``compile``.

    Returns:
        None; ``model`` is modified in place.
    """
    for layer in model.layers:
        layer.trainable = False
    for layer in model.layers[-4:]:
        layer.trainable = True
        # Check the layer type instead of assuming Dropout sits at index -2.
        if isinstance(layer, tf.keras.layers.Dropout):
            layer.rate = 0.0  # dropout disabled while fine-tuning

    # The `lr` and `decay` optimizer arguments are deprecated and rejected by
    # newer Keras optimizers. The schedule below reproduces the old `decay`
    # behaviour: lr_t = 0.001 / (1 + 0.001 * step).
    lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
        initial_learning_rate=0.001, decay_steps=1, decay_rate=0.001)

    model.compile(loss='sparse_categorical_crossentropy',
                  optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
                  metrics=['acc'])

In [None]:
# freeze model: only the last four layers stay trainable (model is modified in place)
freeze_model(model)

In [None]:
# transfer learn
# Stage 2: fine-tune the unfrozen layers on the original (un-augmented) images.
# `validation_steps` is deliberately not passed: a floor-divided step count
# would skip the final partial batch, and because the dev iterator is not
# shuffled the same trailing images would be skipped every epoch. Without
# it, Keras runs through the whole validation iterator.
historyTR = model.fit(
    train_generator,
    steps_per_epoch=train_generator.samples // TRAIN_BATCH_SIZE,
    validation_data=dev_generator,
    epochs=10,
    verbose=1,
    shuffle=True)

In [None]:
# learning curves of the transfer-learning run on the original images
# NOTE(review): 0.06 is presumably the share of the most frequent class — confirm
plot_progress(historyTR, trivial_model_acc=0.06)

### Final test


In [None]:
# test performance
# No `steps` argument: Keras then runs through the whole iterator, so every
# test image is scored. A floor-divided step count skips the final partial
# batch (with TEST_SET_SIZE = 12630 and batches of 256 that is 86 images).
model.evaluate(test_generator)



[0.07663962990045547, 0.9797512888908386]

### Visualizing the convolutions

In [None]:
def plot_convs(model, images, layernum=0, maxchannels=8):
    """Plot each image next to the feature maps one layer produces for it.

    One row per image: the input image in the first column, followed by the
    first ``maxchannels`` channels of the output of ``model.layers[layernum]``.

    Args:
        model: a built Keras model.
        images: array of input images with the batch axis first, in the
            format the model expects.
        layernum: index into ``model.layers`` of the layer to visualise; its
            output must be 4-D (batch, height, width, channels).
        maxchannels: maximum number of channels shown per image.

    Raises:
        ValueError: if the selected layer's output is not 4-D.
    """
    nof_images = images.shape[0]
    if nof_images == 0:
        return  # nothing to plot

    # Build a sub-model that stops at the requested layer and run it once for
    # the whole batch, rather than predicting all layers again per channel.
    activation_model = tf.keras.models.Model(
        inputs=model.input, outputs=model.layers[layernum].output)
    activations = activation_model.predict(images)
    if activations.ndim != 4:
        raise ValueError(
            f"layer {layernum} output has shape {activations.shape}; "
            "expected a 4-D (batch, height, width, channels) feature map")

    nof_channels = min(maxchannels, activations.shape[-1])
    # squeeze=False keeps axarr 2-D even when there is only a single image.
    fig, axarr = plt.subplots(nof_images, 1 + nof_channels,
                              figsize=(24, 3 * nof_images), squeeze=False)
    for row_axes, image, feature_maps in zip(axarr, images, activations):
        row_axes[0].imshow(image)
        row_axes[0].axis('off')
        for c in range(nof_channels):
            row_axes[1 + c].imshow(feature_maps[:, :, c], cmap='inferno')
            row_axes[1 + c].axis('off')

In [None]:
nof_try_images = 10
# Fetch the first training batch once and take images and labels from that
# same batch, instead of loading the batch from disk twice.
first_batch_images, first_batch_labels = train_generator[0]
try_images = first_batch_images[:nof_try_images]
try_labels = first_batch_labels[:nof_try_images]
print(try_labels)
plot_convs(model, try_images, layernum=0)