In [35]:
import numpy as np
import pandas as pd
import matplotlib.image as mpimg 
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from skimage.transform import resize
from skimage import exposure
import keras
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Dense, Conv2D, Flatten, MaxPooling2D, BatchNormalization, Dropout
import time as t

"""
Code to hold and test a simple demonstration of KERAS library for traffic sign classification

Authors
-------
Aydamir Mirzayev: https://github.com/AydamirMirzayev || https://www.linkedin.com/in/aydamir-mirzayev-97b297133/

Methods
-------
get_data( global_path)
    extract and perform necessary modifications on the dataset

same_size( data_collection)
    Resize all images to the same size. Use mean size of all images
    
categorical( labels)
    One hot encode the labels

report_accuracy(predicted, true)
    Report accuracy of prediction
""" 

'\nCode to hold and test a simple demonstration of KERAS library for traffic sign classification\n\nAuthors\n-------\nAydamir Mirzayev: https://github.com/AydamirMirzayev || https://www.linkedin.com/in/aydamir-mirzayev-97b297133/\n\nMethods\n-------\nget_data( global_path)\n    extract and perform necessary modifications on the dataset\n\nsame_size( data_collection)\n    Resize all images to the same size. Use mean size of all images\n    \ncategorical( labels)\n    One hot encode the labels\n\nreport_accuracy(predicted, true)\n    Report accuracy of prediction\n'

# Methods

In [5]:
def get_data( global_path = "D:\D_Desktop\Courses\PAST\ML 550\project\data2\gtsrb-german-traffic-sign"):
    """ Extract the data, merge test and training samples, find class distribution, and train/test/e
        
    Parameters:
    global_path (int): Path to the gtsrb-german-traffic-sign folder
    
    Returns:
    
    labels                   (List): labels of the clusters on which clustering has been performed
    data_collection          (List): image collection
    used_classes            (Array): set of images
    general_c_freq      (int array): frequency of each class
    
    """   
    
    # Import both test and train data labels
    test_path = global_path + '/Test.csv'
    test_labels = np.array( pd.read_csv( test_path))
    train_path = global_path + '/Train.csv'
    train_labels = np.array( pd.read_csv( train_path))

    #Extract train class labels from general labels and count unique classes
    train_class_labels = train_labels[:,6]
    train_unique = np.unique( train_class_labels)
    train_c_freq = np.zeros( train_unique.shape[0])
    
    #calculate train class frequency 
    for i,u in enumerate( train_unique):
        train_c_freq[i] = sum( train_class_labels == u)

    #Extract test class labels from general labels and count unique classes
    test_class_labels = test_labels[:,6]
    test_unique = np.unique( test_class_labels)
    test_c_freq = np.zeros( test_unique.shape[0])
    
    #calculate test class frequency 
    for i,u in enumerate( test_unique):
        test_c_freq[i] = sum( test_class_labels == u)
    
    #calculate total class frequency and find used classes
    general_c_freq = train_c_freq + test_c_freq
    used_classes = test_unique[ np.argsort( general_c_freq)][35:]

    # load train images normalize  and flatten
    train_collection = []
    new_train_labels = []
    for c in used_classes:
        names = train_labels[ train_labels[ :, 6] == c][:, 7]
        new_train_labels.append( train_labels [ train_labels[ :, 6] == c])
        for name in names:
            path = global_path + '/' + name 
            train_collection.append( mpimg.imread( path))

    new_flat_train_labels = []
    for i in range( len(new_train_labels)):
        for j in range( len(new_train_labels[i])):
            new_flat_train_labels.append(new_train_labels[i][j])

    # load test images, normalize and flatten
    test_collection = []
    new_test_labels = []
    for c in used_classes:
        names = test_labels [test_labels[ :, 6] == c][:,7] 
        new_test_labels.append(test_labels [test_labels[ :, 6] == c])
        for name in names:
            path = global_path + '/' + name 
            test_collection.append( mpimg.imread( path))

    new_flat_test_labels = []
    for i in range( len(new_test_labels)):
        for j in range( len(new_test_labels[i])):
            new_flat_test_labels.append(new_test_labels[i][j])

    # generate general labels and data
    labels = []
    data_collection = []

    for i in range( len(test_collection)):
        data_collection.append( test_collection[i])
        labels.append( new_flat_test_labels[i])

    for i in range( len( train_collection)):
        data_collection.append( train_collection[i])
        labels.append( new_flat_train_labels[i])

        
    
    
    #shuffle several times
    for i in range(10):
        labels, data_collection = shuffle( labels, data_collection)
        
    return labels, data_collection, used_classes, general_c_freq

In [6]:
def same_size( data_collection):
    """ Resize all images to the same size. Use mean size of all images
        
    Parameters:
    data_collection (List): list of all images
    
    Returns:
    new_data_collection   (List): normalized list of images
    """   
    
    #calculate the image size mean
    h_mean = 0
    w_mean = 0
    
    for a in data_collection:
        h_mean += a.shape[0]
        w_mean += a.shape[1]
    
    #round up the mean
    w_mean = round( w_mean/len( data_collection))
    h_mean = round( h_mean/len( data_collection))

    #resize all the images
    new_data_collection = []
    for i, a in enumerate( data_collection):
        resized = resize( a, (h_mean, w_mean), anti_aliasing=True)
        normalized = exposure.rescale_intensity( resized)
        new_data_collection.append( normalized)
    
    return new_data_collection

In [15]:
def categorical( labels):
    """ One hot encode the labels
        
    Parameters:
    labels (List): list of all image labels
    
    Returns:
    new_classes   (List): one hot encoded labels
    """   
    
    classes = np.array( labels)[:,6] #extract class info

    new_classes = np.zeros( np.array( labels).shape[0])
    
    for i, a in enumerate( np.unique( classes)): #categorize
        new_classes[ classes == a ] = i
    
    new_classes = to_categorical( new_classes)
    return new_classes

In [18]:
def report_accuracy(predicted, true):
    """ Report accuracy of prediction
        
    Parameters:
    predicted  (int array): array of predicted labels
    true       (int array): array of true labels
    
    Returns:
    new_classes   (List): one hot encoded labels
    """   
    
    #calculate accuracy
    acc = sum( np.argmax( predicted, 1) == np.argmax( true, 1) ) / predicted.shape[0]
    return acc

# Prepare data

In [17]:
# extract the data
labels, data_collection, used_classes, general_c_freq = get_data()
# resize image to the mean size of the collection
data_collection = same_size(data_collection)
# one hot encode the labels
class_labels = categorical( labels)

# Train, validation, and test split the dataset
X_train, X_test, y_train, y_test  = train_test_split( np.array(data_collection), class_labels, test_size=0.2, random_state=1)
X_test, X_val, y_test, y_val = train_test_split( X_test, y_test, test_size=0.5, random_state=1)

# Build Network

In [25]:
model = Sequential() #define a sequential model

# define convolutional layers
model.add( Conv2D( 32, kernel_size=3, activation= 'relu', input_shape=( 49, 49, 3))) 
model.add( MaxPooling2D(pool_size=2, strides = 2))
model.add( Conv2D( 16, kernel_size=5, activation= 'relu'))
model.add( MaxPooling2D(pool_size=2, strides = 2))
model.add( Conv2D( 8, kernel_size=5, activation= 'relu'))
model.add( MaxPooling2D(pool_size=2, strides = 2))

# define fully connected (dense) layer
model.add( Flatten())
model.add( Dense(8, activation= 'softmax' ))

# define optimization function
sgd = keras.optimizers.SGD(learning_rate=0.02, momentum=0.2, nesterov= True)

#compile the model
model.compile(optimizer= sgd, loss='categorical_crossentropy', metrics=['accuracy'])

# Test the network measure running time

In [29]:
start = t.time()
model.fit( X_train , y_train, validation_data=(X_val, y_val), epochs= 2)
end = t.time()
print('Training Time: ', end - start)

Train on 17736 samples, validate on 2217 samples
Epoch 1/2
Epoch 2/2
Training Time:  123.50570034980774


In [34]:
start = t.time()
predicted = model.predict(X_test)
end = t.time()
print('Prediction Time: ', end - start)
print('Accuracy: ',  round(report_accuracy(predicted,y_test),2)*100,'%')

Prediction Time:  2.5534069538116455
Accuracy:  97.0 %
