# Traffic Sign Recognition with TensorFlow


This notebook is the first part of a tutorial to build a deep learning model for traffic sign recognition. The goal is to build a model that can detect and classify traffic signs in a video stream taken from a moving car. 


## First Objective: Traffic Sign Classification

I'll start with a simple goal: classification. Given an image of a traffic sign, our model should be able to tell its type (e.g. stop sign, speed limit, yield sign, etc.). We'll work with images that are properly cropped such that the traffic sign takes up most of the image.


For this project, I'm using Python 3.5, TensorFlow 0.11, NumPy, scikit-image, and Matplotlib. All pretty standard tools in machine learning. For convenience, I've created a Docker image that contains the most common deep learning tools in one place here: https://hub.docker.com/r/waleedka/modern-deep-learning/ . You can run it with this command:

```
docker run -it -p 8888:8888 -p 6006:6006 -v ~/traffic:/traffic waleedka/modern-deep-learning
```

Note that I have the files of this project in the ~/traffic directory, and I'm mapping it to the /traffic directory in the Docker container. Modify this if you're using a different directory.

First step, let's import the needed libraries and get that out of the way.

In [0]:
import os
import random
import skimage.data
import skimage.transform
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

# Allow image embedding in notebook
%matplotlib inline

## Training Dataset

We're using the Belgian Traffic Sign Dataset. Go to http://btsd.ethz.ch/shareddata/ and download the training and test data. There are a lot of datasets on that page, but you only need the two files listed under **BelgiumTS for Classification (cropped images)**:
* BelgiumTSC_Training (171.3MBytes)
* BelgiumTSC_Testing (76.5MBytes)

After downloading and expanding the files, your directory structure should look something like this:

```
/traffic/datasets/BelgiumTS/Training/
/traffic/datasets/BelgiumTS/Testing/
```

Each of the two directories above has 62 sub-directories named sequentially from 00000 to 00061. The directory name represents the code (or label) and the images inside the directory are examples of that label.

## Parse and Load the Training Data

The **Training** directory contains sub-directories with sequential numerical names from 00000 to 00061. The name of the directory represents the labels from 0 to 61, and the images in each directory represent the traffic signs that belong to that label. The images are saved in the not-so-common .ppm format, but luckily, this format is supported in the skimage library.
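The mapping from directory name to label described above can be sketched in a few lines. The helper name `label_from_dirname` is my own for illustration and is not part of the notebook's code:

```python
def label_from_dirname(dirname):
    """Convert a zero-padded directory name such as '00017' to the integer label 17."""
    return int(dirname)

# The 62 directory names we expect to find: 00000 through 00061.
expected_dirs = ["{:05d}".format(i) for i in range(62)]
print(expected_dirs[0], expected_dirs[-1], label_from_dirname("00017"))
```

Comparing `expected_dirs` against the result of `os.listdir()` on the Training directory is a quick way to confirm the download was expanded correctly.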

In [0]:
!apt-get install -y -qq software-properties-common python-software-properties module-init-tools
!add-apt-repository -y ppa:alessandro-strada/ppa 2>&1 > /dev/null
!apt-get update -qq 2>&1 > /dev/null
!apt-get -y install -qq google-drive-ocamlfuse fuse

In [0]:
from google.colab import auth
auth.authenticate_user()

In [0]:
from oauth2client.client import GoogleCredentials
creds = GoogleCredentials.get_application_default()
import getpass
!google-drive-ocamlfuse -headless -id={creds.client_id} -secret={creds.client_secret} < /dev/null 2>&1 | grep URL
vcode = getpass.getpass()
!echo {vcode} | google-drive-ocamlfuse -headless -id={creds.client_id} -secret={creds.client_secret}

In [0]:
!mkdir -p drive
!google-drive-ocamlfuse drive

In [0]:
print('Files in Drive:')
!ls drive/

In [0]:
def load_data(data_dir):
    """Loads a data set and returns two lists:
    
    images: a list of Numpy arrays, each representing an image.
    labels: a list of numbers that represent the images' labels.
    """
    # Get all subdirectories of data_dir. Each represents a label.
    directories = [d for d in os.listdir(data_dir) 
                   if os.path.isdir(os.path.join(data_dir, d))]
    # Loop through the label directories and collect the data in
    # two lists, labels and images.
    labels = []
    images = []
    for d in directories:
        label_dir = os.path.join(data_dir, d)
        file_names = [os.path.join(label_dir, f) 
                      for f in os.listdir(label_dir) if f.endswith(".ppm")]
        # For each label, load its images and add them to the images list.
        # And add the label number (i.e. directory name) to the labels list.
        for f in file_names:
            images.append(skimage.data.imread(f))
            labels.append(int(d))
    return images, labels


# Load training and testing datasets.
#ROOT_PATH = "drive"
train_data_dir = os.path.join("drive/VOLVO/BelgiumTSC_Training/Training")
test_data_dir = os.path.join("drive/VOLVO/BelgiumTSC_Testing/Testing")

images, labels = load_data(train_data_dir)

**Produce vandalised images (mirrored left to right)**

In [0]:
for i in range(len(images)):
  images[i] = np.fliplr(images[i])
  

Here we're loading two lists:
* **images** a list of images, each image is represented by a numpy array.
* **labels** a list of labels. Integers with values between 0 and 61.


It's not usually a good idea to load the whole dataset into memory, but this dataset is small and we're trying to keep the code simple, so it's okay for now. We'll improve it in the next part. For larger datasets, we'd want to have a separate thread loading chunks of data in the background and feeding them to the training thread. 
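To make the background-loading idea concrete, here is a minimal sketch using only the standard library. The function name `prefetch_batches` and its parameters are assumptions of mine, and the "loading" step is a toy stand-in for reading image files. It is a sketch under those assumptions, not the approach used later in this tutorial.

```python
import queue
import threading

def prefetch_batches(items, batch_size, load_fn, max_prefetch=4):
    """Yield lists of loaded items while a worker thread loads ahead."""
    q = queue.Queue(maxsize=max_prefetch)  # bounds how far the worker runs ahead
    sentinel = object()                    # marks the end of the data

    def worker():
        for start in range(0, len(items), batch_size):
            chunk = items[start:start + batch_size]
            q.put([load_fn(item) for item in chunk])
        q.put(sentinel)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        batch = q.get()
        if batch is sentinel:
            return
        yield batch

# Toy usage: "loading" squares a number instead of reading a file from disk.
batches = list(prefetch_batches(list(range(10)), batch_size=4, load_fn=lambda x: x * x))
print(batches)
```

In a real pipeline `items` would be file paths and `load_fn` an image reader. Note that this sketch does not handle exceptions raised inside the worker thread.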

## Explore the Dataset

How many images and labels do we have?

In [0]:
print("Unique Labels: {0}\nTotal Images: {1}".format(len(set(labels)), len(images)))
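Beyond the totals, the per-label counts show how balanced the classes are. A small sketch with `collections.Counter`, where the made-up `toy_labels` list stands in for the real `labels`:

```python
from collections import Counter

toy_labels = [0, 0, 1, 2, 2, 2]  # hypothetical stand-in for `labels`
counts = Counter(toy_labels)

# Print the number of examples for each label, in label order.
for label, n in sorted(counts.items()):
    print("Label {0}: {1} images".format(label, n))

most_common_label, most_common_count = counts.most_common(1)[0]
```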

Display the first image of each label.

In [0]:
def display_images_and_labels(images, labels):
    """Display the first image of each label."""
    unique_labels = set(labels)
    plt.figure(figsize=(15, 15))
    i = 1
    for label in unique_labels:
        # Pick the first image for each label.
        image = images[labels.index(label)]
        plt.subplot(8, 8, i)  # A grid of 8 rows x 8 columns
        plt.axis('off')
        plt.title("Label {0} ({1})".format(label, labels.count(label)))
        i += 1
        _ = plt.imshow(image)
    plt.show()

display_images_and_labels(images, labels)

That looks great! The traffic signs occupy most of the area of each image, which is going to make our job easier: we don't have to look for the sign in the image. And we have a variety of angles and lighting conditions, which will help our model generalize. 

However, although the images are square-ish, they're not all the same size. They have different aspect ratios. Our simple neural network takes a fixed-size input, so we have a bit of pre-processing to do. We'll get to that soon, but first let's pick a label and see more of its images. Let's pick label 32:

In [0]:
def display_label_images(images, label):
    """Display images of a specific label."""
    limit = 24  # show a max of 24 images
    plt.figure(figsize=(15, 5))
    i = 1

    start = labels.index(label)
    end = start + labels.count(label)
    for image in images[start:end][:limit]:
        plt.subplot(3, 8, i)  # 3 rows, 8 per row
        plt.axis('off')
        i += 1
        plt.imshow(image)
    plt.show()

display_label_images(images, 32)

Interesting! It looks like our dataset considers all speed limit signs to be of the same class regardless of the numbers on them. That's fine, as long as we know about it beforehand and don't let it confuse us later when the output doesn't match our expectation.

I'll leave exploring other labels as an exercise for you: edit the code above and check other labels. Make sure to check labels 26 and 27. They also have numbers in a red circle, so our model will have to get really good to differentiate between these 3 classes.

## Handling Images of Different Sizes

Most neural networks expect a fixed-size input, and our network is no exception. But as we've seen above, our images are not all the same size. A common approach is to crop and pad the images to a selected aspect ratio, but then we have to make sure that we don't cut off parts of the traffic signs in the process. That seems like it might require manual work! Let's do a simpler solution instead (a hack really): We'll just resize the images to a fixed size and ignore the distortions caused by the different aspect ratios. A person can easily recognize a traffic sign even if it's compressed or stretched a bit, so we hope that our model can as well.
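For comparison, here is roughly what the padding alternative could look like. This notebook does not use it, and `pad_to_square` is a hypothetical helper name. The sketch pads the shorter side with a constant so that a later resize would not distort the sign:

```python
import numpy as np

def pad_to_square(image, fill=0):
    """Pad the shorter side of an HxWxC image so that H == W, keeping the content centered."""
    h, w = image.shape[:2]
    size = max(h, w)
    top = (size - h) // 2
    left = (size - w) // 2
    padded = np.full((size, size) + image.shape[2:], fill, dtype=image.dtype)
    padded[top:top + h, left:left + w] = image
    return padded

toy = np.ones((100, 80, 3), dtype=np.uint8)  # made-up 100x80 RGB image
square = pad_to_square(toy)
print(square.shape)
```

Padding keeps the whole sign visible at the cost of adding uninformative border pixels, which is the trade-off against the plain resize used here.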

And while we're at it, let's make the images smaller. The larger the input data, the larger the model, and the slower it is to train. In the early stages of development we want fast training to avoid long waits between iterations while we change the code rapidly. 

What are the sizes of our images anyway?

In [0]:
for image in images[:5]:
    print("shape: {0}, min: {1}, max: {2}".format(image.shape, image.min(), image.max()))

The sizes seem to hover around 128x128. If we resize them to, say, 32x32, we'll have reduced the data and the model size by a factor of 16. And 32x32 is probably still big enough to recognize the signs, so let's go with that. 
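The factor of 16 follows directly from the pixel counts. A quick check, assuming RGB images of roughly 128x128:

```python
original_values = 128 * 128 * 3   # values in an approximately 128x128 RGB image
resized_values = 32 * 32 * 3      # values in a 32x32 RGB image
reduction = original_values / resized_values
print(original_values, resized_values, reduction)
```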

I'm also in the habit of frequently printing the min() and max() values. It's a simple way to verify the range of your data and catch bugs early.

In [0]:
# Resize images
images32 = [skimage.transform.resize(image, (32, 32), mode='constant')
                for image in images]
display_images_and_labels(images32, labels)

The 32x32 images are not as sharp but still recognizable. Note that the display above shows the images larger than their real size because the matplotlib library tries to fit them to the grid size. Let's print the sizes of a few images to verify that we got it right.

In [0]:
for image in images32[:5]:
    print("shape: {0}, min: {1}, max: {2}".format(image.shape, image.min(), image.max()))

The sizes are correct. But check the min and max values! They now range from 0 to 1.0, which is different from the 0-255 range we saw above. The resizing function did that transformation for us. Normalizing values to the range 0.0-1.0 is very common so we'll keep it. But remember to multiply by 255 if you later want to convert the images back to the normal 0-255 range.
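Converting back to the 0-255 range can be sketched as follows. The helper name `to_uint8` is my own, and the clipping step is an extra safeguard I'm assuming you'd want in case values drift slightly outside 0.0-1.0:

```python
import numpy as np

def to_uint8(image_float):
    """Map a float image in [0.0, 1.0] back to uint8 in [0, 255]."""
    scaled = np.clip(image_float, 0.0, 1.0) * 255.0
    return np.round(scaled).astype(np.uint8)

toy = np.array([[0.0, 0.5, 1.0]])  # made-up pixel values
restored = to_uint8(toy)
print(restored.dtype, restored.min(), restored.max())
```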

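**Random crop of images**

*Not used yet. The commented-out sketch below would crop a random window from an image and resize it back to the original size.*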

In [0]:
# TensorFlow sketch (not executed). 'x' = A placeholder for an image.
#original_size = [height, width, channels]
#x = tf.placeholder(dtype = tf.float32, shape = original_size)
# Use the following commands to perform random crops
#crop_size = [new_height, new_width, channels]
#seed = np.random.randint(1234)
#x = tf.random_crop(x, size = crop_size, seed = seed)
#output = tf.image.resize_images(x, size = original_size[:2])
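To show what a random crop does without involving a TensorFlow graph, here is a NumPy-only sketch. The function name `random_crop` and the 28x28 crop size are assumptions chosen for illustration:

```python
import numpy as np

def random_crop(image, crop_h, crop_w, rng):
    """Return a random crop_h x crop_w window from an HxWxC image."""
    h, w = image.shape[:2]
    if crop_h > h or crop_w > w:
        raise ValueError("crop size must not exceed the image size")
    # randint's upper bound is exclusive, hence the +1.
    top = rng.randint(0, h - crop_h + 1)
    left = rng.randint(0, w - crop_w + 1)
    return image[top:top + crop_h, left:left + crop_w]

rng = np.random.RandomState(0)
toy = np.arange(32 * 32 * 3).reshape(32, 32, 3)  # made-up 32x32 RGB image
crop = random_crop(toy, 28, 28, rng)
print(crop.shape)
```

Each call picks a different window, so the model would see slightly shifted versions of the same sign. The crop would then need resizing back to 32x32 before being fed to the network.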

## Minimum Viable Model

In [0]:
labels_a = np.array(labels)
images_a = np.array(images32)
print("labels: ", labels_a.shape, "\nimages: ", images_a.shape)

In [0]:
# Function to create the model.
tf.reset_default_graph()
def create_model():
    with tf.device('/device:GPU:0'):
        # Placeholders for inputs and labels.
        images_ph = tf.placeholder(tf.float32, [None, 32, 32, 3])
        labels_ph = tf.placeholder(tf.int32, [None])
        dropout_ph = tf.placeholder(tf.float32, ())
        
        # convolutional layer 1
        conv1 = tf.layers.conv2d(
            inputs=images_ph,
            filters=32,
            kernel_size=[5, 5],
            padding="same",
            activation=tf.nn.relu,
            kernel_initializer=tf.initializers.random_uniform(-0.1, 0.1))
        
        # max pooling layer 1
        pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)
        
        # convolutional layer 2
        conv2 = tf.layers.conv2d(
            inputs=pool1,
            filters=64,
            kernel_size=[3, 3],
            padding="same",
            activation=tf.nn.relu)
        
        # max pooling layer 2
        pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)
        
        # Flatten features
        images_flat = tf.layers.flatten(pool2)

        # Fully connected layer. 
        logits = tf.contrib.layers.fully_connected(images_flat, 62, activation_fn=None,
                                                   weights_initializer=tf.initializers.random_uniform(-0.1, 0.1))
        
        # dropout layer
        logits = tf.nn.dropout(logits, 1 - dropout_ph)

        # Convert logits to label indexes (int).
        # Shape [None], which is a 1D vector of length == batch_size.
        predicted_labels = tf.argmax(logits, 1)

        # Define the loss and accuracy function. 
        # Cross-entropy is a good choice for classification.
        loss = tf.reduce_mean(tf.losses.sparse_softmax_cross_entropy(logits=logits, labels=labels_ph))
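        # Fraction of correct predictions in the batch that was fed in.
        # (tf.metrics.accuracy would instead return a running average over every call.)
        accuracy = tf.reduce_mean(
            tf.cast(tf.equal(tf.cast(predicted_labels, tf.int32), labels_ph), tf.float32))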
        
        # Create training op.
        train = tf.train.AdamOptimizer(learning_rate=0.001).minimize(loss)

    return images_ph, labels_ph, dropout_ph, predicted_labels, loss, accuracy, train

images_ph, labels_ph, dropout_ph, predicted_labels, loss, accuracy, train = create_model()
print("loss: ", loss)
print("predicted_labels: ", predicted_labels)
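It helps to trace how the tensor shape changes through the layers defined above. The sketch below does the arithmetic in plain Python; the two helper functions are illustrative only and assume stride-1 "same" convolutions and 2x2 pooling with stride 2, as in `create_model()`:

```python
def conv_same(h, w, filters):
    # "same" padding with stride 1 keeps height and width unchanged.
    return h, w, filters

def max_pool_2x2(h, w, c):
    # 2x2 pooling with stride 2 halves height and width.
    return h // 2, w // 2, c

shape = (32, 32, 3)                          # input image
shape = conv_same(shape[0], shape[1], 32)    # conv1
shape = max_pool_2x2(*shape)                 # pool1
shape = conv_same(shape[0], shape[1], 64)    # conv2
shape = max_pool_2x2(*shape)                 # pool2

flat_features = shape[0] * shape[1] * shape[2]
dense_params = flat_features * 62 + 62       # weights plus biases of the final layer
print(shape, flat_features, dense_params)
```

Most of the model's parameters sit in the final fully connected layer, which is one reason shrinking the input images pays off.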

## Training

In [0]:
# load test data for evaluation while training
test_images, test_labels = load_data(test_data_dir)

# Transform the images, just like we did with the training set.
test_images32 = [skimage.transform.resize(image, (32, 32), mode='constant')
                 for image in test_images]

# Put them into arrays to feed them to tensorflow
test_images32, test_labels = np.array(test_images32), np.array(test_labels)

In [0]:
# Create a session to run the graph we created.
# If an earlier session exists (e.g. this cell was re-run), close it first.
try:
    session.close()
except NameError:
    pass
session = tf.Session()

# First step is always to initialize all variables. 
# We don't care about the return value, though. It's None.
_ = session.run(tf.global_variables_initializer())

# Any tf.metrics ops keep their running totals in local variables, which
# global_variables_initializer() does not cover, so initialize those too.
running_vars = tf.get_collection(tf.GraphKeys.LOCAL_VARIABLES)
session.run(tf.variables_initializer(var_list=running_vars))

step = 0
train_loss_list = []
train_acc = []
test_loss_list = []
test_acc = []
steps_list = []

In [0]:
batch_size = 32  # Mini-batches make each gradient step cheaper, and the noise they add
# can help the optimizer escape poor local minima (a classic machine learning move).
n_steps = 20000

for i in range(n_steps):
    if i==0 or data_index + batch_size > len(labels_a) :
        data_index = 0
        indices = np.arange(len(labels_a))
        np.random.shuffle(indices)
        
    batch_indices = indices[data_index:data_index + batch_size]
    session.run(train,
                feed_dict={images_ph: images_a[batch_indices],
                           labels_ph: labels_a[batch_indices],
                           dropout_ph: 0.5})
    if i % 1000 == 0:
        # Compute train loss and accuracy without dropout on the batch to save some time
        train_loss_value, train_accuracy = session.run([loss, accuracy], 
                            feed_dict={images_ph: images_a[batch_indices],
                                       labels_ph: labels_a[batch_indices],
                                       dropout_ph: 0.0})
        
        # Evaluation on the whole test dataset, without dropout
        test_loss_value, test_accuracy = session.run([loss, accuracy],
                                                     feed_dict={images_ph: test_images32,
                                                                labels_ph: test_labels,
                                                                dropout_ph: 0.0})
        steps_list.append(step)
        train_loss_list.append(train_loss_value)
        train_acc.append(train_accuracy)
        test_loss_list.append(test_loss_value)
        test_acc.append(test_accuracy)
        
        print("********** step {} **********".format(step))
        print("Train loss: ", train_loss_value)
        print("Train accuracy: ", train_accuracy)
        print("Test loss: ", test_loss_value)
        print("Test accuracy: ", test_accuracy)
        
    step += 1
    data_index += batch_size
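# The shuffling logic of the loop above can also be isolated in a small generator.

```python
import random

def shuffled_batches(n_examples, batch_size, n_steps, seed=None):
    """Yield index lists; reshuffle whenever a full batch no longer fits.

    Hypothetical helper, not part of the notebook. It assumes
    batch_size <= n_examples and, like the loop above, drops the
    leftover examples at the end of each pass.
    """
    rng = random.Random(seed)
    indices = list(range(n_examples))
    rng.shuffle(indices)
    pos = 0
    for _ in range(n_steps):
        if pos + batch_size > n_examples:
            rng.shuffle(indices)
            pos = 0
        yield indices[pos:pos + batch_size]
        pos += batch_size

batches = list(shuffled_batches(n_examples=10, batch_size=4, n_steps=5, seed=0))
print(batches)
```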
    

In [0]:
# plot loss
plt.figure()
plt.plot(steps_list, train_loss_list, label="train loss")
plt.plot(steps_list, test_loss_list, label="test loss")
plt.title("loss evolution in training")
plt.legend()

# plot accuracy
plt.figure()
plt.plot(steps_list, train_acc, label="train accuracy")
plt.plot(steps_list, test_acc, label="test accuracy")
plt.title("accuracy evolution in training")
plt.legend()

## Using the Model

The session object contains the values of all the variables in our model (i.e. the weights). 

In [0]:
# Pick 10 random images
sample_indexes = random.sample(range(len(images32)), 10)
sample_images = [images32[i] for i in sample_indexes]
sample_labels = [labels[i] for i in sample_indexes]

# Run the "predicted_labels" op.
predicted = session.run([predicted_labels], 
                        feed_dict={images_ph: sample_images,
                                   dropout_ph: 0.0})[0]
print(sample_labels)
print(predicted)

In [0]:
# Display the predictions and the ground truth visually.
fig = plt.figure(figsize=(10, 10))
for i in range(len(sample_images)):
    truth = sample_labels[i]
    prediction = predicted[i]
    plt.subplot(5, 2,1+i)
    plt.axis('off')
    color='green' if truth == prediction else 'red'
    plt.text(40, 10, "Truth:        {0}\nPrediction: {1}".format(truth, prediction), 
             fontsize=12, color=color)
    plt.imshow(sample_images[i])


## Evaluation

It's fun to visualize the results, but we need a more precise way to measure the accuracy of our model. Also, it's important to test it on images that it hasn't seen. And that's where the test data set comes into play.

In [0]:
# Load the test dataset.
test_images, test_labels = load_data(test_data_dir)

In [0]:
# Transform the images, just like we did with the training set.
test_images32 = [skimage.transform.resize(image, (32, 32), mode='constant')
                 for image in test_images]
display_images_and_labels(test_images32, test_labels)

In [0]:
# Run predictions against the full test set.
predicted = session.run([predicted_labels], 
                        feed_dict={images_ph: test_images32,
                                   dropout_ph: 0.0})[0]

# Calculate how many matches we got.
# Use a new name so we don't overwrite the `accuracy` tensor from the model.
match_count = sum([int(y == y_) for y, y_ in zip(test_labels, predicted)])
test_set_accuracy = match_count / len(test_labels)
print("Accuracy: {:.3f}".format(test_set_accuracy))
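A single accuracy number can hide classes that the model gets consistently wrong. As a sketch, the same match counting can be done with NumPy and broken down per label. The function name `accuracy_report` and the toy inputs are made up for illustration:

```python
import numpy as np

def accuracy_report(true_labels, predicted_labels):
    """Return overall accuracy and a dict of per-label accuracy."""
    true_labels = np.asarray(true_labels)
    predicted_labels = np.asarray(predicted_labels)
    correct = true_labels == predicted_labels
    per_label = {int(l): float(correct[true_labels == l].mean())
                 for l in np.unique(true_labels)}
    return float(correct.mean()), per_label

# Toy data: one of the two examples with label 0 is misclassified as 1.
overall, per_label = accuracy_report([0, 0, 1, 1, 2], [0, 1, 1, 1, 2])
print(overall, per_label)
```

With the real data you would pass `test_labels` and `predicted` instead of the toy lists.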

In [0]:
# Display every misclassified test image with its true and predicted label.
for i in range(len(test_images32)):
    truth = test_labels[i]
    prediction = predicted[i]
    if truth != prediction:
        fig = plt.figure(figsize=(2, 2))
        plt.axis('off')
        plt.text(40, 10, "Truth:        {0}\nPrediction: {1}".format(truth, prediction),
                 fontsize=16, color='red')
        plt.imshow(test_images32[i])

In [0]:
# Close the session. This will destroy the trained model.
session.close()