# Traffic Sign Classification

In [None]:
%matplotlib inline
from IPython.display import display, Markdown

# Disable GPU
import os
os.environ["CUDA_VISIBLE_DEVICES"]="-1"

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import sklearn.metrics as sk_metrics

# Traffic sign specific code
import tsdata
import mlutil
import mlvis

print(f"TensorFlow version: {tf.__version__}")
print("GPUs availability: ", tf.config.experimental.list_physical_devices('GPU'))

## Configuration

In [None]:
task = "part_1a"

num_classes = 21

if task == "part_1a":
    # High performance model

    dpconfig = tsdata.DataPipelineConfig(
        target_width  = 32,
        target_height = 32,
        is_color_mode = True,
        augmentation  = "fliplr"
    )

    class_weight = None
    
    model = tf.keras.Sequential([
      tf.keras.layers.Flatten(input_shape = dpconfig.get_keras_input_shape()),
      tf.keras.layers.Dense(128, activation = 'relu'),
      tf.keras.layers.Dense(64,  activation = 'relu'),
      tf.keras.layers.Dense(num_classes)
    ])

    # Larger image classification models - overkill for this problem!

    # base_model = tf.keras.applications.ResNet50(weights = 'imagenet', include_top = False, input_shape = dpconfig.get_keras_input_shape()) 
    # x=base_model.output
    # x=tf.keras.layers.GlobalAveragePooling2D()(x)
    # x=tf.keras.layers.Dense(num_classes, activation = 'softmax')(x)
    # model=tf.keras.Model(base_model.input,x)

    # base_model = tf.keras.applications.NASNetMobile(weights = None, include_top = False, input_shape = dpconfig.get_keras_input_shape()) 
    # x=base_model.output
    # x=tf.keras.layers.GlobalAveragePooling2D()(x)
    # x=tf.keras.layers.Dense(num_classes, activation = 'softmax')(x)
    # model=tf.keras.Model(base_model.input,x)

    # model = tf.keras.applications.NASNetMobile(weights = None, include_top = True, input_shape = dpconfig.get_keras_input_shape(), classes = num_classes) 

elif task == "part_1b":
    # Smaller model

    dpconfig = tsdata.DataPipelineConfig(
        target_width  = 16,
        target_height = 16,
        is_color_mode = False,
        augmentation  = "fliplr"
    ) 
    
    class_weight = None

    model = tf.keras.Sequential([
      tf.keras.layers.Flatten(input_shape = dpconfig.get_keras_input_shape()),
      tf.keras.layers.Dense(64, activation = 'relu'),
      tf.keras.layers.Dense(num_classes)
    ])

elif task == "part_2":
    # Extra emphasis on left and right turn signs

    # Using two attempts to improve classification of these two signs:
    #   a) Getting more data, by making use of the fact that these two classes are horizontally mirrored.
    #      The actual augmentation happens in function "load_data_fresh" in "load.py".
    #   b) Increase the class weights for these two classes
        
    dpconfig = tsdata.DataPipelineConfig(
        target_width  = 32,
        target_height = 32,
        is_color_mode = True,
        augmentation  = "turnimprove"  # Make use of symmetries of the 2 classes we want to improve
    )    
        
    # Put stronger emphasis on the classes of interest
    class_weight = dict()
    for class_idx in range(num_classes):
      class_weight[class_idx] = 1.0
    class_weight[12] = 2.0
    class_weight[13] = 2.0
        
    model = tf.keras.Sequential([
      tf.keras.layers.Flatten(input_shape = dpconfig.get_keras_input_shape()),
      tf.keras.layers.Dense(128, activation = 'relu'),
      tf.keras.layers.Dense(64,  activation = 'relu'),
      tf.keras.layers.Dense(num_classes)
    ])

else:
    raise RuntimeError("Invalid task!")

## Load Data

In [None]:
train_images, train_labels = tsdata.load_data(dpconfig, "train")
test_images,  test_labels  = tsdata.load_data(dpconfig, "test")

class_names = tsdata.get_class_names()

num_classes_actual = len(class_names)

if num_classes_actual != num_classes:
    raise RuntimeError("Unexpected number of classes!")

## Explore the data

In [None]:
print(f"train_images.shape = {train_images.shape}")
print(f"len(train_labels)  = {len(train_labels)}")
print(f"test_images.shape  = {test_images.shape}")
print(f"len(test_labels)   = {len(test_labels)}")

### Visually inspect the data

In [None]:
plt.figure(figsize=(10,10))
for i in range(25):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(train_images[i], cmap = None if dpconfig.is_color_mode else 'gray')
    plt.xlabel(class_names[train_labels[i]])
plt.show()

## Train the model

In [None]:
model_family_name = mlutil.generate_model_family_name(dpconfig, friendly_name = task)

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

callback_history     = tf.keras.callbacks.History()
callback_earlystop   = tf.keras.callbacks.EarlyStopping(monitor = "val_loss", patience = 20, mode = "min")
callback_checkpoint  = mlutil.get_model_checkpointer(model_family_name)

model.fit(train_images, train_labels, validation_split = 0.2, epochs = 200, class_weight = class_weight,
  callbacks = [callback_history, callback_earlystop, callback_checkpoint])

### Model selection: Choose optimum model based on validation loss

In [None]:
val_losses = np.asarray(callback_history.history["val_loss"])
optimum_epoch_num = np.argmin(val_losses) + 1

print(f"Optimum model: epoch = {optimum_epoch_num}, val_loss = {val_losses[optimum_epoch_num - 1]:.3f}")

print(f"Loading optimum model weights (epoch {optimum_epoch_num})...")
mlutil.load_model_epoch(model, model_family_name, optimum_epoch_num)

## Evaluate

### Show model summary / computational effort
For these simple models, the number of parameters correlates with the computational effort

In [None]:
model.summary()

### Compute model accuracy on test set

In [None]:
test_loss, test_acc = model.evaluate(test_images,  test_labels, verbose=2)

print(f"\nAccuracy on test set: {test_acc}")

### Make predictions

In [None]:
probability_model = tf.keras.Sequential([model, tf.keras.layers.Softmax()])
predictions = probability_model.predict(test_images)
labels_predicted = np.argmax(predictions, axis = 1)

### Generate class-wise report

In [None]:
print(sk_metrics.classification_report(test_labels, labels_predicted, target_names=class_names))

### Compute confusion matrix

In [None]:
confmat = tf.math.confusion_matrix(test_labels, labels_predicted, num_classes)
mlvis.plot_traffic_sign_confmat(confmat, normalize_by = "cols")

### Verify predictions

Manual inspection of some predictions:

In [None]:
i = 0
plt.figure(figsize=(15,3))
plt.subplot(1,2,1)
mlvis.plot_image(class_names, predictions[i], test_labels[i], test_images[i])
plt.subplot(1,2,2)
mlvis.plot_value_array(class_names, predictions[i],  test_labels[i])
plt.show()

In [None]:
i = 12
plt.figure(figsize=(15,3))
plt.subplot(1,2,1)
mlvis.plot_image(class_names, predictions[i], test_labels[i], test_images[i])
plt.subplot(1,2,2)
mlvis.plot_value_array(class_names, predictions[i],  test_labels[i])
plt.show()

Overview of more predictions:

In [None]:
# Plot the first X test images, their predicted labels, and the true labels.
# Color correct predictions in blue and incorrect predictions in red.
num_rows = 5
num_cols = 3
num_images = num_rows*num_cols
plt.figure(figsize=(2*2*num_cols, 2*num_rows))
for i in range(num_images):
  plt.subplot(num_rows, 2*num_cols, 2*i+1)
  mlvis.plot_image(class_names, predictions[i], test_labels[i], test_images[i])
  plt.subplot(num_rows, 2*num_cols, 2*i+2)
  mlvis.plot_value_array(class_names, predictions[i], test_labels[i])
plt.tight_layout()
plt.show()

## Further Improvement Ideas

* Use a **larger dataset**, e.g., the full dataset available at: http://benchmark.ini.rub.de/?section=gtsdb&subsection=dataset
* Apply more advanced **image enhancement**: gamma correction, histogram stretching, ...
* Do more **augmentation**: perspective transformation, rotation, ...
* Explore more **model architectures**, i.e., try out more advanced models that perform well in image classification tasks
  