# GTSRB Dataset

**Problem:** Implement a neural network model capable of identifying a German road sign as accurately as possible.

In this notebook we will start with a simple model and train it without any data augmentation. This allows us to get a base value for our accuracy, so that we can better grasp the improvements or regressions obtained from our methods.

In [None]:
# Install the third-party packages that the import cell of this notebook uses:
# tensorflow, cv2 (opencv-python), tensorflow_addons and tensorflow_datasets.
# NOTE(review): pandas, matplotlib, scikit-learn, seaborn and Pillow are also
# imported by this notebook but are not installed here — presumably they are
# preinstalled in the target runtime; confirm.
%pip install tensorflow
%pip install opencv-python
%pip install tensorflow-addons
%pip install tensorflow-datasets

In [None]:
# Imports
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.layers import (
    Conv2D, 
    Conv2DTranspose, 
    BatchNormalization, 
    LeakyReLU, 
    Flatten, 
    Dense, 
    Reshape, 
    Input,
    Activation
)
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping,TensorBoard, ReduceLROnPlateau
from tensorflow.keras import Sequential
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.data import AUTOTUNE
import tensorflow_addons as tfa
import tensorflow_datasets as tfds

import pandas as pd

import matplotlib.pyplot as plt
import cv2


from sklearn.model_selection import train_test_split
import numpy as np
import seaborn as sn
from PIL import Image
import glob

import os

## Constants

In [None]:
# Target size every image is resized to when the datasets are loaded, and the
# input size handed to the model.
HEIGHT = 39
WIDTH = 39
NUM_CHANNELS = 3 # Pictures are in RGB
BATCH_SIZE = 32
DATASET_PATH = '/kaggle/input/sinais/dataset'
NUM_CLASSES = 43
# Sorted so that CLASS_NAMES[i] lines up with label index i:
# image_dataset_from_directory assigns label indices in alphanumeric order of
# the class directories, whereas os.listdir returns entries in arbitrary order.
CLASS_NAMES = sorted(os.listdir(f'{DATASET_PATH}/test_images'))
EPOCHS = 128

# Feature switches for the notebook cells.
TRAIN_ONLINE = True       # run the training cell
TRAIN_ONLINE_AUG = True   # NOTE(review): not referenced in the visible cells
CONVERT_DATASET = False   # run the one-off PPM -> PNG conversion cell

## Auxiliary functions
These help with visualizing data and predictions

In [None]:
def plot_image(i, predictions_array, true_label, img):
    """Draw sample ``i`` and caption it with the predicted and true class names.

    Args:
        i: Index of the sample inside ``true_label`` and ``img``.
        predictions_array: Class scores for this single sample.
        true_label: Indexable collection of true class indices.
        img: Indexable collection of images.

    The caption reads "<predicted> <confidence>% (<true>)" and is drawn in
    blue when the prediction matches the true label, red otherwise.
    """
    expected = true_label[i]
    picture = img[i]

    # Bare axes: no grid and no ticks around the picture.
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])

    plt.imshow(picture, cmap=plt.cm.binary)

    guess = np.argmax(predictions_array)
    caption_color = 'blue' if guess == expected else 'red'

    caption = "{} {:2.0f}% ({})".format(
        CLASS_NAMES[guess],
        100 * np.max(predictions_array),
        CLASS_NAMES[expected],
    )
    plt.xlabel(caption, color=caption_color)

def show_batch(image_batch, label_batch):
    """Display every image of a batch in an 8-column grid titled by class name.

    Args:
        image_batch: Indexable batch of images that ``plt.imshow`` can draw.
        label_batch: Per-image label vectors; the position of the largest
            entry selects the class name (one-hot vectors, as produced by
            ``label_mode='categorical'``).

    The grid is sized from the actual number of images, so a final batch that
    is shorter than ``BATCH_SIZE`` is displayed instead of raising IndexError.
    """
    columns = 8
    num_images = len(image_batch)
    if num_images == 0:
        # Nothing to draw; avoid creating a zero-height figure.
        return

    # Ceiling division: just enough rows to hold every image.
    rows = -(-num_images // columns)

    plt.figure(figsize=(10, 2 * rows))
    for n in range(num_images):
        plt.subplot(rows, columns, n + 1)
        plt.imshow(image_batch[n])
        plt.title(classNames[np.argmax(label_batch[n])])
        plt.axis('off')

def plot_value_array(i, predictions_array, true_label):
    """Draw a bar chart of the class scores of one sample.

    Args:
        i: Index of the sample inside ``true_label``.
        predictions_array: Class scores for this single sample.
        true_label: Indexable collection of true class indices.

    The predicted class bar is coloured red and the true class bar blue; when
    both coincide the bar ends up blue because it is coloured last.
    """
    true_label = true_label[i]
    # One bar per class actually present in the prediction vector (the
    # previous hard-coded 10 did not match this notebook's class count).
    num_classes = len(predictions_array)

    plt.grid(False)
    plt.xticks(range(num_classes))
    plt.yticks([])
    thisplot = plt.bar(range(num_classes), predictions_array, color="#777777")
    plt.ylim([0, 1])
    predicted_label = np.argmax(predictions_array)

    thisplot[predicted_label].set_color('red')
    thisplot[true_label].set_color('blue')

# Plot the first X test images, their predicted labels, and the true labels.
# Color correct predictions in blue and incorrect predictions in red.
def plot_predictions(predictions, ground_truth, num_rows=5, num_cols=3, images=None):
    """Plot a grid of images next to the bar chart of their class scores.

    Args:
        predictions: Per-sample class score vectors.
        ground_truth: Indexable collection of true class indices.
        num_rows: Number of grid rows.
        num_cols: Number of image/bar-chart pairs per row.
        images: Images matching ``predictions``. When omitted, the global
            ``x_test`` is used as before.
            NOTE(review): ``x_test`` is not defined in the visible part of
            this notebook, so callers should pass ``images`` explicitly.
    """
    if images is None:
        images = x_test

    # Never ask for more samples than there are predictions.
    num_images = min(num_rows * num_cols, len(predictions))

    plt.figure(figsize=(2 * 2 * num_cols, 2 * num_rows))
    for i in range(num_images):
        # Odd cells hold the picture, even cells the score bar chart.
        plt.subplot(num_rows, 2 * num_cols, 2 * i + 1)
        plot_image(i, predictions[i], ground_truth, images)
        plt.subplot(num_rows, 2 * num_cols, 2 * i + 2)
        plot_value_array(i, predictions[i], ground_truth)
    plt.tight_layout()
    plt.show()

def show_history(history):
    """Print the recorded metric names and plot accuracy and loss curves.

    Args:
        history: Object whose ``history`` attribute maps metric names to
            per-epoch value lists; it must contain 'accuracy',
            'val_accuracy', 'loss' and 'val_loss'.

    Two figures are shown in sequence: training vs. validation accuracy, then
    training vs. validation loss.
    """
    records = history.history
    print(records.keys())

    # (train key, validation key, title, y-axis label, legend position)
    panels = (
        ('accuracy', 'val_accuracy', 'model accuracy', 'accuracy', 'lower right'),
        ('loss', 'val_loss', 'model loss', 'loss', 'upper right'),
    )

    for train_key, val_key, title, y_label, legend_position in panels:
        plt.plot(records[train_key])
        plt.plot(records[val_key])
        plt.title(title)
        plt.ylabel(y_label)
        plt.xlabel('epoch')
        plt.legend(['train', 'val'], loc=legend_position)
        plt.show()

    
def show_confusion_matrix(model, dataset):
    """Plot the confusion matrix of ``model`` on ``dataset`` and print per-class accuracy.

    Args:
        model: Model exposing ``predict``; it must return one score vector per
            image.
        dataset: Iterable of ``(images, labels)`` batches whose elements
            expose ``.numpy()``; labels are score/one-hot vectors reduced with
            argmax.

    The matrix is always ``NUM_CLASSES`` x ``NUM_CLASSES`` so that it matches
    the DataFrame index even when some classes never occur in ``dataset``.
    """
    all_labels = []
    all_preds = []

    for images, labels in dataset:
        preds = model.predict(images.numpy(), verbose=0)

        all_labels.extend(np.argmax(labels.numpy(), axis=-1).tolist())
        all_preds.extend(np.argmax(preds, axis=-1).tolist())

    # Rows are true labels, columns are predicted labels.
    conf_mat = tf.math.confusion_matrix(
        all_labels, all_preds, num_classes=NUM_CLASSES
    ).numpy()

    df_cm = pd.DataFrame(conf_mat, range(NUM_CLASSES), range(NUM_CLASSES))
    plt.figure(figsize=(15,10))
    sn.set(font_scale=1.4) # for label size
    sn.heatmap(df_cm, annot=True, annot_kws={"size": 16}, fmt='d') # font size

    plt.show()

    # Per-class statistics come straight from the matrix: the row sum is the
    # number of images of that class, the diagonal entry the correct ones.
    totals = conf_mat.sum(axis=1)
    correct = np.diag(conf_mat)

    for i in range(NUM_CLASSES):
        total = int(totals[i])
        # Classes without any sample report 0% instead of dividing by zero.
        percent_correct = 100.0 * int(correct[i]) / total if total else 0.0
        print('class: ', i, ' total images: ', total,' % correct: ', round(percent_correct, 2))

In [None]:
def prepare_callbacks(file_path, *, early_stop_patience=25, reduce_lr_patience=25):
    """Build the list of training callbacks used by ``model.fit``.

    Args:
        file_path: Destination passed to ``ModelCheckpoint`` for the weights
            of the best model seen so far.
        early_stop_patience: ``patience`` passed to ``EarlyStopping``.
        reduce_lr_patience: ``patience`` passed to ``ReduceLROnPlateau``.

    Returns:
        ``[checkpointer, earlyStopper, reduceLR]``.

    NOTE(review): with the defaults both patiences are equal, so training is
    likely to stop at about the same time the learning rate would first be
    halved; pass a smaller ``reduce_lr_patience`` to give the reduction a
    chance to act. Defaults are kept for backward compatibility.
    """
    # Keep only the weights of the epoch with the best validation accuracy.
    checkpointer = ModelCheckpoint(filepath=file_path,
                                   monitor='val_accuracy',
                                   verbose=1,
                                   save_weights_only=True,
                                   save_best_only=True)

    # Stop once the validation loss has not improved by at least 0.001.
    earlyStopper = EarlyStopping(monitor='val_loss', min_delta=0.001,
                                 patience=early_stop_patience, verbose=1)

    # Halve the learning rate when the validation loss plateaus.
    reduceLR = ReduceLROnPlateau(monitor='val_loss', factor=0.5,
                                 patience=reduce_lr_patience,
                                 min_lr=0.000000001, verbose=1)

    return [checkpointer, earlyStopper, reduceLR]
    



## Convert dataset to PNG

TensorFlow's image decoders do not support the PPM format, so the images are converted to PNG before loading.

In [None]:
# One-off conversion: write a PNG copy next to every PPM image found under
# DATASET_PATH (searched recursively). Only runs when CONVERT_DATASET is set.
# NOTE(review): this writes into DATASET_PATH itself — confirm that location
# is writable in the target environment.
if CONVERT_DATASET:
    for path in glob.iglob(f'{DATASET_PATH}/**/*.ppm', recursive=True):
        # Swap only the file extension; a plain str.replace would also rewrite
        # a '.ppm' occurring elsewhere in the path.
        new_path = os.path.splitext(path)[0] + '.png'
        # The context manager closes the source file after each conversion.
        with Image.open(path, mode='r') as img:
            img.save(new_path)

## Load and prepare datasets

In [None]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

# Training images, batched, with one-hot labels inferred from the class
# sub-directories.
# NOTE(review): Keras documents image_size as (height, width); the order below
# only works because WIDTH == HEIGHT, and it mirrors the input shape used by
# model_BP — change both together if the sizes ever differ.
dataset = tf.keras.utils.image_dataset_from_directory(
    directory=f'{DATASET_PATH}/train_images/GTSRB/Final_Training/Images',
    image_size=(WIDTH,HEIGHT),
    batch_size=BATCH_SIZE,
    shuffle=True,
    label_mode='categorical'
)

# Take the class names from the dataset itself so that classNames[i] always
# corresponds to label index i (os.listdir order is arbitrary). This must be
# read before any map(), which drops the class_names attribute.
classNames = np.array(dataset.class_names)

testset = tf.keras.utils.image_dataset_from_directory(
    directory=f'{DATASET_PATH}/test_images',
    image_size=(WIDTH,HEIGHT),
    batch_size=BATCH_SIZE,
    label_mode='categorical'
)

# The dataset is already batched, so this is the number of BATCHES, not images.
dataset_length = dataset.cardinality().numpy()

for image, label in dataset.take(1):
    print("Image shape : ",image.numpy().shape)
    print("Label : ", label.numpy())

print(dataset_length)

# Normalize image pixels
normalize = tf.keras.layers.Rescaling(1.0/255.0)

dataset = dataset.map(lambda x,y: (normalize(x),y))
test_ds = testset.map(lambda x,y: (normalize(x),y))

dataset = dataset.cache()
# Shuffle the batches once and keep that order for every later iteration:
# the train/validation split is made afterwards with take()/skip(), and
# reshuffling on each epoch would move validation batches into the training
# portion (data leakage).
dataset = dataset.shuffle(buffer_size = dataset_length, reshuffle_each_iteration = False)
dataset = dataset.prefetch(buffer_size = AUTOTUNE)

test_ds = test_ds.cache()
test_ds = test_ds.prefetch(buffer_size = AUTOTUNE)


# Validation Dataset

In [None]:
def split_val(dataset, dataset_length):
    """Split ``dataset`` into an 80% training part and a 20% validation part.

    Args:
        dataset: Dataset exposing ``cardinality()``, ``take()`` and ``skip()``.
            The split is made on its elements, i.e. on batches when the
            dataset is already batched.
        dataset_length: Only used as a fallback when the dataset cannot report
            its own size; it is then interpreted as a number of samples and
            divided by ``BATCH_SIZE``, as the original computation did.

    Returns:
        ``(train_ds, val_ds)``: the first 80% of the elements and the rest.
    """
    # Ask the dataset how many elements it has rather than trusting the
    # caller's number: dividing an already batch-level count by BATCH_SIZE
    # again would shrink the training portion to a small fraction.
    num_elements = int(dataset.cardinality())

    if num_elements >= 0:
        train_size = int(0.8 * num_elements)
    else:
        # Negative cardinality means unknown or infinite size.
        train_size = int(0.8 * dataset_length / BATCH_SIZE)

    train_ds = dataset.take(train_size)
    val_ds = dataset.skip(train_size)
    return train_ds, val_ds

# Split the prepared dataset into training and validation subsets.
# dataset_length was read from dataset.cardinality() on the batched dataset.
train_ds, val_ds = split_val(dataset, dataset_length)

# Show info

In [None]:
# Pull one batch from the training split and display it as an image grid;
# the labels are converted to a NumPy array before being handed to show_batch.
image_batch, label_batch = next(iter(train_ds))        
show_batch(image_batch, label_batch.numpy())

## Get dataset info

In [None]:
# Inspect the training split: fetch one batch and report the shape of its
# first image, the label of that image and the number of elements reported by
# cardinality() (batches, since train_ds is built from a batched dataset).
image_batch, label_batch = next(iter(train_ds))
num_batches = train_ds.cardinality().numpy()

print(f'Image shape: {image_batch[0].numpy().shape}')
print(f'Label: {label_batch[0].numpy()}')
print(f'Number of batches: {num_batches}')

## Model creation

In [None]:
def model_BP(width, height, num_channels, num_classes=None):
    """Build and compile the baseline convolutional classifier.

    Architecture: two blocks of Conv2D(64, 3x3, same padding) -> LeakyReLU ->
    BatchNormalization, then Flatten -> Dense(64) -> softmax output.

    Args:
        width: First spatial dimension of the input shape.
        height: Second spatial dimension of the input shape.
        num_channels: Number of image channels.
        num_classes: Size of the softmax output layer; defaults to the
            notebook-level ``NUM_CLASSES`` when omitted.

    Returns:
        The compiled ``Sequential`` model (Adam with learning rate 1e-3,
        categorical cross-entropy loss, accuracy metric).

    NOTE(review): the input shape is (width, height, channels), matching the
    ``image_size=(WIDTH, HEIGHT)`` used when loading the data; Keras treats
    the first spatial axis as height, which only goes unnoticed because the
    images are square.
    NOTE(review): ``Dense(64)`` has no activation, so it is a purely linear
    layer in front of the softmax — confirm this is intended.
    """
    if num_classes is None:
        num_classes = NUM_CLASSES

    model = Sequential()

    model.add(Input(shape=(width,height,num_channels)))

    model.add(Conv2D(64, (3,3), padding='same'))
    model.add(LeakyReLU(alpha=0.02))
    model.add(BatchNormalization())

    model.add(Conv2D(64, (3,3), padding='same'))
    model.add(LeakyReLU(alpha=0.02))
    model.add(BatchNormalization())

    model.add(Flatten())
    model.add(Dense(64))
    model.add(Dense(num_classes, activation='softmax'))

    opt = Adam(learning_rate=1e-3)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

    # summary() prints the table itself and returns None, so wrapping it in
    # print() only added a stray "None" line.
    model.summary()
    tf.keras.utils.plot_model(model, show_shapes=True)

    return model

# Train Model

In [None]:
# Train the baseline model on the training split, validating on val_ds after
# every epoch. `model` and `hist` are only defined when TRAIN_ONLINE is set.
if TRAIN_ONLINE:
    # Checkpoints need a concrete file name rather than a bare directory; the
    # '.weights.h5' suffix is the form Keras requires for weights-only
    # checkpoints in recent releases and also works as an HDF5 file in older ones.
    log_path = "logs/model_BP.weights.h5"
    os.makedirs(os.path.dirname(log_path), exist_ok=True)

    model = model_BP(WIDTH, HEIGHT, NUM_CHANNELS)
    callback = prepare_callbacks(log_path)
    # train_ds already yields batches, so batch_size must not be passed to
    # fit() — the Keras documentation says not to specify it for datasets.
    hist = model.fit(train_ds, verbose=2,
                     epochs=EPOCHS,
                     callbacks=callback,
                     validation_data = val_ds)

# Results

In [None]:
# Evaluate the trained model on the test set and plot the training curves.
# NOTE: `model` and `hist` are only assigned by the training cell when
# TRAIN_ONLINE is true, so this cell depends on that cell having run.
results = model.evaluate(test_ds, verbose=2)

# Values returned by evaluate() for the compiled loss and metrics.
print(results)

show_history(hist)

In [None]:
def denoise_image(image, label):
    """Denoise one image with OpenCV non-local means and return it with its label.

    Args:
        image: A single image, either (H, W) or (H, W, C). Anything that
            ``np.asarray`` can convert is accepted (NumPy arrays, eager
            tensors). Multi-channel images are converted to grayscale first,
            as before.
        label: Passed through unchanged.

    Returns:
        ``(denoised_image, label)`` where ``denoised_image`` has shape
        (H, W, 1) and the same dtype as the input.

    ``cv2.fastNlMeansDenoising`` is documented for 8-bit input, so non-uint8
    images are converted for the call and converted back afterwards. Float
    images whose maximum is <= 1.0 are assumed to be normalized to [0, 1]
    (as the Rescaling(1/255) step in this notebook produces); any other input
    is assumed to be on a 0-255 scale already.
    """
    pixels = np.asarray(image)
    original_dtype = pixels.dtype

    scale = 1.0
    if original_dtype != np.uint8:
        is_unit_range_float = (
            np.issubdtype(original_dtype, np.floating)
            and pixels.size > 0
            and pixels.max() <= 1.0
        )
        if is_unit_range_float:
            scale = 255.0
        pixels = np.clip(np.rint(pixels * scale), 0, 255).astype(np.uint8)

    if pixels.ndim == 3:
        if pixels.shape[-1] == 1:
            # Already single-channel: just drop the channel axis.
            pixels = pixels[..., 0]
        else:
            pixels = cv2.cvtColor(pixels, cv2.COLOR_RGB2GRAY)

    denoised_image = cv2.fastNlMeansDenoising(
        np.ascontiguousarray(pixels), None,
        h=10, templateWindowSize=7, searchWindowSize=21,
    )

    # Restore an explicit channel axis so the result is (H, W, 1).
    if denoised_image.ndim == 2:
        denoised_image = np.expand_dims(denoised_image, axis=-1)

    if original_dtype != np.uint8:
        # Return to the caller's value range and dtype.
        denoised_image = (denoised_image / scale).astype(original_dtype)

    return denoised_image, label

def augment_data(images, labels):
    """Return the input samples plus a denoised copy of every 3-D image.

    Args:
        images: Iterable of images. Images with three dimensions (H, W, C)
            get an additional denoised variant; others are copied as-is.
        labels: Iterable of labels aligned with ``images``.

    Returns:
        ``(images, labels)`` as NumPy arrays. For each 3-D image the denoised
        variant comes first, followed by the original, both with the same
        label.
    """
    augments_images = []
    augments_labels = []

    for image, label in zip(images, labels):
        image = np.asarray(image)

        if image.ndim == 3:
            # denoise_image needs the label and returns an (image, label)
            # pair; only the image part is kept here.
            denoised_image, _ = denoise_image(image, label)
            denoised_image = np.asarray(denoised_image)

            # The denoised result is single-channel; replicate it across the
            # original channels so both variants stack into one array.
            if denoised_image.shape != image.shape:
                denoised_image = np.broadcast_to(denoised_image, image.shape).copy()

            augments_images.append(denoised_image)
            augments_labels.append(label)

        augments_images.append(image)
        augments_labels.append(label)

    return np.array(augments_images), np.array(augments_labels)

In [None]:
print(tf.__version__)


def _denoise_batch_numpy(images, labels):
    """Denoise every image of one NumPy batch of [0, 1]-normalized images."""
    denoised = []
    for image in images:
        # `dataset` holds pixels rescaled to [0, 1]; OpenCV works on 8-bit data.
        as_uint8 = np.clip(np.rint(image * 255.0), 0, 255).astype(np.uint8)
        clean, _ = denoise_image(as_uint8, None)
        denoised.append(np.asarray(clean, dtype=np.float32) / 255.0)
    return np.stack(denoised).astype(np.float32), labels


def _denoise_batch(images, labels):
    """tf.data-compatible wrapper that runs the OpenCV denoising eagerly."""
    # Dataset.map traces its function with symbolic tensors, which cv2 cannot
    # consume; tf.numpy_function hands real NumPy arrays to the Python code.
    clean, labels_out = tf.numpy_function(
        _denoise_batch_numpy, [images, labels], [tf.float32, labels.dtype]
    )
    # numpy_function drops static shapes; the denoised images are grayscale,
    # i.e. single-channel.
    clean.set_shape(images.shape[:-1].concatenate([1]))
    labels_out.set_shape(labels.shape)
    return clean, labels_out


# Denoised variant of the (batched, normalized) dataset. Caching after the map
# means the expensive denoising runs once instead of on every epoch.
s_ds = dataset
s_ds = s_ds.map(_denoise_batch)
s_ds = s_ds.cache()
s_ds = s_ds.prefetch(buffer_size = AUTOTUNE)


