# Udacity SDCND Project: Implementing a Traffic Sign Classifier Using the LeNet CNN Architecture

# Step 1 - Load The Data

In [None]:
import pickle

import matplotlib.pyplot as plt
import numpy as np

# Locations of the pickled training, validation and test splits.
training_file = '/home/nsslab/Downloads/traffic-signs-data/train.p'
validation_file = '/home/nsslab/Downloads/traffic-signs-data/valid.p'
testing_file = '/home/nsslab/Downloads/traffic-signs-data/test.p'

# NOTE: unpickling can execute arbitrary code; only load files you trust.
with open(training_file, 'rb') as f:
    train = pickle.load(f)
with open(validation_file, 'rb') as f:
    valid = pickle.load(f)
with open(testing_file, 'rb') as f:
    test = pickle.load(f)

# Each split is read through its 'features' and 'labels' entries.
X_train = train['features']
y_train = train['labels']
X_valid = valid['features']
y_valid = valid['labels']
X_test = test['features']
y_test = test['labels']

# Step 2 - Dataset Summary & Exploratory Visualization

## Dataset Summary
- 'features' is a 4D array containing raw pixel data of the traffic sign images, (num examples, width, height, channels)
- 'labels' is a 1D array containing the label/class id of the traffic sign. The file signnames.csv contains id -> name mappings for each id

In [None]:
# Number of examples in each of the three splits.
n_train = len(X_train)
n_validation = len(X_valid)
n_test = len(X_test)

# Shape of a single image, taken from the first training example.
image_shape = X_train[0].shape

# Number of distinct label values found in the training labels.
n_classes = len(np.unique(y_train))

print("Number of training examples =", n_train)
print("Number of validation examples =", n_validation)
print("Number of testing examples =", n_test)
print("Image data shape =", image_shape)
print("Number of classes =", n_classes)

## Visualize Data

- Total data

In [None]:
def plot_image(image, nr, nc, i, label=""):
    """
    Draw one image with its axis ticks hidden.

    A positive 'i' selects cell 'i' of an nr x nc subplot grid in the
    current figure; any other value opens a new figure created with
    figsize=(nr, nc). 'label' is written as the x-axis label.
    """
    if i <= 0:
        plt.figure(figsize=(nr, nc))
    else:
        plt.subplot(nr, nc, i)

    # Hide tick marks on both axes; the x label doubles as a caption.
    for hide_ticks in (plt.xticks, plt.yticks):
        hide_ticks(())
    plt.xlabel(label)
    plt.tight_layout()
    plt.imshow(image, cmap="gray")

    
import random
def plot_random(dataset1, dataset2=None, instances=1):
    """
    Plot randomly chosen images from one or two datasets.

    One row is drawn per instance. When 'dataset2' is given, the image at
    the same random index is drawn next to the one from 'dataset1' and its
    mean pixel value is printed.

    dataset1:  indexable collection of images (each must support .squeeze()).
    dataset2:  optional second collection, indexed with the same indices.
    instances: number of random samples (rows) to draw.
    """
    nc = 1 if dataset2 is None else 2
    nr = instances
    for row in range(instances):
        # randrange excludes its upper bound, so the index is always valid;
        # random.randint(0, len(dataset1)) could return len(dataset1) itself.
        index = random.randrange(len(dataset1))
        # Subplot cells are numbered row by row starting at 1, so the first
        # cell of a row depends on the number of columns actually used.
        first_cell = nc * row + 1
        plot_image(dataset1[index].squeeze(), nr, nc, first_cell)
        if dataset2 is not None:
            image = dataset2[index].squeeze()
            plot_image(image, nr, nc, first_cell + 1)
            print("image mean=", image.mean())


from scipy import misc
def get_image_per_class(X, y):
    """
    Return one representative image per class, ordered by class id.

    The dataset is traversed once and the first image seen for each label
    is cached at the list position equal to that label, so the result is
    in class order rather than in the order the classes are encountered.

    X: indexable collection of images (each must support .squeeze()).
    y: labels (non-negative integers), aligned with X.

    Returns a list of length max(y) + 1 (empty when y is empty). An entry is
    None only if that class id never occurs in y.
    """
    class_images = []
    for image, label in zip(X, y):
        label = int(label)
        # Grow the cache so that 'label' is a valid position.
        missing = label + 1 - len(class_images)
        if missing > 0:
            class_images.extend([None] * missing)
        # Identity test: comparing a cached ndarray to None with '==' is
        # element-wise and cannot be used as a condition.
        if class_images[label] is None:
            class_images[label] = image.squeeze()
    return class_images


def summarize_stats(class_images, y_train, y_valid):
    """
    Plot one image per class with per-class statistics underneath.

    'class_images' is a list of images, one per class, indexed by class id.
    Under each image the label shows, on separate lines: the class id, the
    number of training samples, the percentage of training samples, and the
    percentage of validation samples of that class.

    Entries of 'class_images' that are None are skipped.
    """
    n_shown = len(class_images)
    # minlength keeps both histograms indexable for every listed class,
    # even when the highest class ids do not occur in a split.
    y_train_hist = np.bincount(y_train, minlength=n_shown)
    y_valid_hist = np.bincount(y_valid, minlength=n_shown)
    # The totals do not change inside the loop, so compute them once.
    n_train_total = y_train_hist.sum()
    n_valid_total = y_valid_hist.sum()

    nr = 5; nc = 9
    plt.figure(figsize=(nr,nc))
    for i, image in enumerate(class_images):
        if image is None:
            continue
        label = (str(i) + "\n"                                            # class
              + str(y_train_hist[i]) + "\n"                               # no. of training samples
              + "{:.1f}%".format(100 * y_train_hist[i]/n_train_total)  + "\n"   # representation in training samples
              + "{:.1f}%".format(100 * y_valid_hist[i]/n_valid_total))     # representation in validation samples
        plot_image(image, nr, nc, i+1, label)

# Step 3 - Design and Train a Model Architecture
## Preprocess Data
- convert RGB images to GRAY images

 As shown in the figure below, converting the images from RGB to grayscale allows the classifier to recognize the signs better.

In [None]:
def get_num_instances(img_class, y):
    """Return how many labels in 'y' equal 'img_class', read from a bincount histogram of 'y'."""
    return np.bincount(y)[img_class]

def get_class_images(img_class, X, y):
    """
    Collect every image in 'X' whose label in 'y' equals 'img_class'.

    The number of matches is looked up first so the scan can stop as soon
    as the last one has been collected. Images are returned squeezed, in
    dataset order.
    """
    remaining = get_num_instances(img_class, y)
    matches = []
    for idx, label in enumerate(y):
        if remaining <= 0:
            break
        if label == img_class:
            matches.append(X[idx].squeeze())
            remaining -= 1
    return matches

import math
def plot_class_images(img_class, class_images, ncol, desc):
    """
    Plot all images of one class in a grid with 'ncol' columns.

    img_class:    class id, used only in the printed summary.
    class_images: list of images to draw.
    ncol:         number of grid columns; the row count is derived from it.
    desc:         dataset name, used only in the printed summary.
    """
    nimages = len(class_images)
    nrow = math.ceil(nimages/ncol)
    print("class {} has {} images in the {} dataset".format(img_class,nimages, desc))
    if not class_images:
        # Nothing to draw, and a zero-row figure is not valid.
        return
    # figsize is (width, height): one unit per column and per row.
    plt.figure(figsize=(ncol, nrow))
    # Subplot cells are 1-based. Passing 0 would make plot_image open a new
    # figure instead of drawing into the first grid cell.
    for cell, image in enumerate(class_images, start=1):
        plot_image(image, nrow, ncol, cell)
    
def plot_class(img_class, X, y, ncol, desc):
    """Gather every image of 'img_class' from (X, y) and plot them in an 'ncol'-column grid."""
    plot_class_images(img_class, get_class_images(img_class, X, y), ncol, desc)

# Show the first four training images of class 20 side by side in a 1x4 grid.
class_images = get_class_images(20, X_train, y_train)
for i in range(4):
    plot_image(class_images[i], 1, 4, i+1)

In [None]:
def gray(x):
    """
    Convert a batch of 3-channel images to single-channel grayscale.

    Each output pixel is the sum of the input channels divided by 3. The
    channel axis (axis 3) is kept with length 1, so a 4-D input yields a
    4-D output.
    """
    # Average exactly once: repeating this step on the already collapsed
    # axis would only divide every value by 3 a second time.
    return np.sum(x / 3, axis=3, keepdims=True)
# Replace every split with its grayscale version.
X_train, X_valid, X_test = [gray(split) for split in (X_train, X_valid, X_test)]

- Normalize the data

In [None]:
def Norm(x):
    """
    Standardise an array: divide by 255, subtract the overall mean, then
    divide by the overall standard deviation. The input is not modified.
    """
    scaled = x / 255
    centred = scaled - np.mean(scaled)
    return centred / np.std(centred)

# Standardise each split independently, using that split's own statistics.
X_train, X_valid, X_test = [Norm(split) for split in (X_train, X_valid, X_test)]
    

- Shuffle the training data

 Randomly reorder the training examples so that the order in which they are stored does not affect learning.

In [None]:
from sklearn.utils import shuffle
# Shuffle the training images and their labels together in a single call.
X_train, y_train = shuffle(X_train, y_train)

## Model Architecture

In [None]:
import tensorflow as tf
# Number of passes over the training set made by the training loop.
epoch = 50
# Number of examples per mini-batch, used for both training and evaluation.
batch_size = 155
from tensorflow.contrib.layers import flatten

def LeNet(x, keep_prob=0.75, n_classes=43):
    """
    Build the LeNet-style network and return its logits tensor.

    Layers, in order:
      1. 5x5 convolution, 1 -> 10 channels, ReLU, 2x2 max-pool
      2. 5x5 convolution, 10 -> 25 channels, ReLU, 2x2 max-pool
      3. flatten, fully connected 625 -> 250, ReLU, dropout
      4. fully connected 250 -> 86, ReLU, dropout
      5. fully connected 86 -> n_classes (logits)

    x:         input tensor; the layer sizes assume 32x32x1 images.
    keep_prob: probability of keeping each activation in the two dropout
               layers. Defaults to 0.75. Pass a tensor (for example a
               placeholder) to feed a different value per run, such as 1.0
               to switch dropout off when evaluating.
    n_classes: number of output classes (width of the logits). Defaults to 43.

    Returns the un-normalised class scores (logits).
    """
    # Mean and standard deviation of the truncated normal used to
    # initialise every weight tensor.
    mn = 0
    sigma = 0.1

    # Convolutional layer 1: input 32*32*1, output 28*28*10
    conv1_w = tf.Variable(tf.truncated_normal(shape=(5, 5, 1, 10), mean=mn, stddev=sigma))
    conv1_b = tf.Variable(tf.zeros(10))
    conv1 = tf.nn.conv2d(x, conv1_w, strides=[1, 1, 1, 1], padding='VALID') + conv1_b

    # Activation
    conv1 = tf.nn.relu(conv1)

    # Max-pool layer 1: input 28*28*10, output 14*14*10
    conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

    # Convolutional layer 2: input 14*14*10, output 10*10*25
    conv2_w = tf.Variable(tf.truncated_normal(shape=(5, 5, 10, 25), mean=mn, stddev=sigma))
    conv2_b = tf.Variable(tf.zeros(25))
    conv2 = tf.nn.conv2d(conv1, conv2_w, strides=[1, 1, 1, 1], padding='VALID') + conv2_b

    # Activation
    conv2 = tf.nn.relu(conv2)

    # Max-pool layer 2: input 10*10*25, output 5*5*25
    conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

    # Flatten: input 5*5*25, output 625
    f_conv0 = flatten(conv2)

    # Fully connected layer 3: input 625, output 250
    f_w1 = tf.Variable(tf.truncated_normal(shape=(625, 250), mean=mn, stddev=sigma))
    f_b1 = tf.Variable(tf.zeros(250))
    f_conv1 = tf.matmul(f_conv0, f_w1) + f_b1

    # Activation
    f_conv1 = tf.nn.relu(f_conv1)

    # Dropout to reduce overfitting: each activation is passed on to the
    # next layer with probability keep_prob.
    f_conv1 = tf.nn.dropout(f_conv1, keep_prob)

    # Fully connected layer 4: input 250, output 86
    f_w2 = tf.Variable(tf.truncated_normal(shape=(250, 86), mean=mn, stddev=sigma))
    f_b2 = tf.Variable(tf.zeros(86))
    f_conv2 = tf.matmul(f_conv1, f_w2) + f_b2

    # Activation
    f_conv2 = tf.nn.relu(f_conv2)

    # Dropout to reduce overfitting
    f_conv2 = tf.nn.dropout(f_conv2, keep_prob)

    # Fully connected layer 5: input 86, output n_classes
    f_w3 = tf.Variable(tf.truncated_normal(shape=(86, n_classes), mean=mn, stddev=sigma))
    f_b3 = tf.Variable(tf.zeros(n_classes))

    # The output of the last layer is the logits (predicted class scores).
    logits = tf.matmul(f_conv2, f_w3) + f_b3

    return logits

## Optimization to minimize the difference between the actual value and the predicted value 

In [None]:
# The batch dimension is None because the number of examples fed per run
# is not constant (the last batch of a pass can be smaller).
x = tf.placeholder(tf.float32, (None, 32, 32, 1))

# Integer class ids, one per example. Note the trailing comma: (None) is
# just None, whereas (None,) is a 1-D shape of unknown length.
y = tf.placeholder(tf.int32, (None,))

one_hot_y = tf.one_hot(y, 43)
# learning rate
rate = 0.001
logits = LeNet(x)
# Pass labels and logits by name: TensorFlow 1.x rejects positional
# arguments for this function.
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=one_hot_y, logits=logits)
# Mean cross-entropy over the batch is the quantity being minimised.
loss_operation = tf.reduce_mean(cross_entropy)
optimizer = tf.train.AdamOptimizer(learning_rate=rate)
training_operation = optimizer.minimize(loss_operation)

## Model Evaluation

In [None]:
# Per-example hit/miss: arg-max of the logits compared with arg-max of the one-hot label.
correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))
# Mean of the hits (cast to float), i.e. the accuracy over the fed batch.
accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

def evaluation(X_data, y_data):
    """
    Return the accuracy of the current model over (X_data, y_data).

    The data is fed to 'accuracy_operation' in chunks of 'batch_size'
    using the default session. Each batch accuracy is weighted by the
    batch length so that a shorter final batch is counted correctly.
    """
    session = tf.get_default_session()
    n_samples = len(X_data)
    weighted_hits = 0
    for start in range(0, n_samples, batch_size):
        stop = start + batch_size
        xs = X_data[start:stop]
        ys = y_data[start:stop]
        batch_accuracy = session.run(accuracy_operation, feed_dict={x: xs, y: ys})
        weighted_hits += batch_accuracy * len(xs)
    return weighted_hits / n_samples

## Train the model, then validate and test it

In [None]:
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    num_examples = len(X_train)

    print("Training...")
    print()
    # NOTE: save_file is not used below; the checkpoint is written under
    # the prefix 'lenet'.
    save_file = './save_train.ckpt'

    # Accuracy after each epoch, one list per split.
    validation_accuracy_graph = []
    test_accuracy_graph = []
    training_accuracy_graph = []

    for i in range(epoch):
        # Reshuffle every epoch so the mini-batches differ between passes.
        X_train, y_train = shuffle(X_train, y_train)

        for offset in range(0, num_examples, batch_size):
            end = offset + batch_size
            batch_x, batch_y = X_train[offset:end], y_train[offset:end]
            sess.run(training_operation, feed_dict={x: batch_x, y: batch_y})

        train_accuracy = evaluation(X_train, y_train)
        validation_accuracy = evaluation(X_valid, y_valid)
        test_accuracy = evaluation(X_test, y_test)

        validation_accuracy_graph.append(validation_accuracy)
        test_accuracy_graph.append(test_accuracy)
        # Append to the list defined above (training_accuracy_graph);
        # 'train_accuracy_graph' does not exist.
        training_accuracy_graph.append(train_accuracy)

        print("epoch {}...".format(i+1))
        print("Train Accuracy = {:.3f}".format(train_accuracy))
        print("Validation Accuracy = {:.3f}".format(validation_accuracy))
        print("Test Accuracy = {:.3f}".format(test_accuracy))
        print()
    # Reuse a Saver if one is already defined; otherwise create one.
    try:
        saver
    except NameError:
        saver = tf.train.Saver()
    saver.save(sess,'lenet')
    print("Model saved")
        

In [None]:
# Test accuracy after each epoch, as recorded by the training loop.
plt.plot(test_accuracy_graph)
plt.title("Test Accuracy")
plt.show()

# Validation accuracy after each epoch, as recorded by the training loop.
plt.plot(validation_accuracy_graph)
plt.title("Validation Accuracy")
plt.show()

# Step 4 - Test a Model on New Images

- New images of German traffic signs

In [None]:
import glob
import matplotlib.image as mpimg

# 2x3 grid of axes, flattened so that it can be indexed with a single counter.
fig, axs = plt.subplots(2,3, figsize=(3, 2))
fig.subplots_adjust(hspace = .2, wspace=.001)
axs = axs.ravel()

# Images read from disk, kept in the order glob returns the files.
my_images = []

# NOTE(review): cv2 is not imported in the visible part of this file — confirm it is imported elsewhere.
# NOTE(review): the grid has 6 axes, so this presumably expects at most 6 PNG files — confirm.
for i, img in enumerate(glob.glob('/home/nsslab/SDCND/Project3/New_images/*.png')):
    image = cv2.imread(img)
    axs[i].axis('off')
    my_images.append(image)
    # The image is stored as read; only the displayed copy is converted from BGR to RGB.
    axs[i].imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))