<a href="https://colab.research.google.com/github/AvivSham/German-Traffic-Signs-Classification/blob/master/German_TrafficSigns_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
#@title Download the German Traffic Signs Dataset
!wget https://d17h27t6h515a5.cloudfront.net/topher/2017/February/5898cd6f_traffic-signs-data/traffic-signs-data.zip
!wget https://github.com/AvivSham/German-Traffic-Signs-Classification/blob/master/signnames.csv

In [0]:
!unzip traffic-signs-data.zip

In [0]:
#@title Import dependencies
%matplotlib inline
import os, pickle, shutil
import numpy as np
from skimage.io import imread
import skimage.morphology as morp
from skimage.filters import rank
from sklearn.utils import shuffle, compute_class_weight
from sklearn.metrics import confusion_matrix
import csv
import cv2
import matplotlib.pyplot as plt
from keras.models import Input, Model
from keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense
from keras.utils import to_categorical
from keras import optimizers
from keras.initializers import random_normal
from keras.callbacks import Callback, ReduceLROnPlateau, ModelCheckpoint
import seaborn as sn
from sklearn.metrics import confusion_matrix

Using TensorFlow backend.


In [0]:
# constrain seed
np.random.seed(seed=42)

In [0]:
#@title Moving the data files to directory
!mkdir data
for i in os.listdir():
  if '.p' in i:
    shutil.move('./'+i,'./data')

try:
  shutil.move('./signnames.csv','./data')
except:
  print('No CSV file was found')


In [0]:
#@title Loading Train/Test/Validation data
training_file = './data/train.p'
validation_file = './data/valid.p'
testing_file = './data/test.p'

with open (training_file, mode='rb') as f:
  train = pickle.load(f)
with open (validation_file, mode='rb') as f:
  valid = pickle.load(f)
with open (testing_file, mode='rb') as f:
  test = pickle.load(f)

In [0]:
#@title Read Sign names/classes
signs_classes = []
os.chdir('./data')
with open ('signnames.csv', 'r') as file:
  signnames = csv.reader(file, delimiter = ',')
  next(signnames,None)
  for row in signnames:
    signs_classes.append(row[1])
  file.close()

In [0]:
#@title Data info
X_train, Y_train = train['features'], train['labels']
X_valid, Y_valid = valid['features'], valid['labels']
X_test, Y_test = test['features'], test['labels']
n_classes = len(np.unique(Y_train))

print("Number of train samples: ", X_train.shape[0])
print("Number of validation samples: ", X_valid.shape[0])
print("Number of test samples: ", X_test.shape[0])
print("Number of classses: ", n_classes)

In [0]:
def show_images(data,y_data, label="", cmap=None, n_images = 10):
  plt.figure(figsize = (n_images*2,n_images*2))
  for i in range(n_images):
    plt.subplot(1,n_images,i+1)
    ind = np.random.randint(0,len(data))
    if len(data[ind].shape) == 2:
      cmap = 'gray'
    
    plt.imshow(data[ind],cmap = cmap)
    plt.xlabel(signs_classes[y_data[ind]], fontsize = 8)
    plt.ylabel(label, fontsize = 8)
    plt.xticks([])
    plt.yticks([])
  plt.tight_layout()
  plt.show()

In [0]:
#@title Show samples of each group
show_images(X_train,Y_train,'Traning examples')
show_images(X_test,Y_test,'Testing examples')
show_images(X_valid,Y_valid,'Validation examples')

In [0]:
def show_hist(data, label):
  plt.hist(data, bins = n_classes)
  plt.xlabel(label)
  plt.ylabel('class count')
  plt.grid('off')
  plt.show()

In [0]:
#@title Show groups histogram
show_hist(Y_train, "Training examples")
show_hist(Y_test, "Testing examples")
show_hist(Y_valid, "Validation examples")

In [0]:
X_train, Y_train = shuffle(X_train, Y_train)


In [0]:
def convert_to_gray(image):
  
  return cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

In [0]:
#@title Convert to gray scale images 
gray_images = list(map(convert_to_gray,X_train))
show_images(gray_images,Y_train)
np.shape(gray_images)

In [0]:
def hist_equalization(image):
  kernel = morp.disk(30)
  return rank.equalize(image, selem=kernel)
  

In [0]:
def adapt_hist_equalization(image,clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(2,2))):
  return clahe.apply(image)

In [0]:
#@title Perform histogram equalization
equalizied_gray_images = list(map(hist_equalization,gray_images))
show_images(equalizied_gray_images,Y_train)

In [0]:
def norm_image(data):
  
  normalized_images = np.array(data,dtype = np.float32)/255
  return np.expand_dims(normalized_images, axis=-1)


In [0]:
def preprocess(x_data,y_data, n_classes = 43):
  gray_images = list(map(convert_to_gray,x_data))
  hist_equal_images = list(map(adapt_hist_equalization,gray_images))
  norm_images = norm_image(hist_equal_images)
  y_data = to_categorical(y_data, n_classes)
  return norm_images, y_data 

In [0]:
def ConvBlock(model, pool, n_filters, mu, sigma):
  model = Conv2D(n_filters, kernel_size = 2, padding = 'same',
                 activation = 'relu', 
                 kernel_initializer = random_normal(mean = mu,stddev = sigma))(model)
  
  model = Conv2D(n_filters, kernel_size = 2, padding = 'same',
                 activation = 'relu', 
                 kernel_initializer = random_normal(mean = mu,stddev = sigma))(model)
  model = MaxPooling2D(pool_size = 2, strides = 2, padding = 'valid')(model)
  model = Dropout(0.5)(model)
  return model
  

In [0]:
def VGG_variation(input_shape,nf=32):
  
  inputs = x = Input(input_shape)
  for i in range(3):
    x = ConvBlock(x,pool = True, n_filters = nf * (i+2), mu = 0, sigma = 0.1)  
  
  x = Flatten()(x)
  for _ in range(2):
    x = Dense(units = 128, activation = 'relu')(x)
  output = Dense(units = 43, activation = 'softmax')(x)
  VGG_var_model = Model(inputs = inputs, outputs = output)
  opti = optimizers.Adam(0.0001)
  VGG_var_model.compile(optimizer = opti, loss = 'categorical_crossentropy',
                       metrics = ['accuracy','categorical_crossentropy'])
  VGG_var_model.summary()
  return VGG_var_model
  
  

In [0]:
class My_Callback(Callback):
    def on_train_begin(self, logs={}):
        print("train begins!")
        return

    def on_train_end(self, logs={}):
        return

    def on_epoch_begin(self, epoch, logs={}):
        return

    def on_epoch_end(self, epoch, logs={}):
        print("-", end='')
        flag_val=True
        if epoch%5==0:
            train_acc = logs.get("acc")
            train_loss = logs.get("loss")
            try:
                val_acc = logs.get("val_acc")
                val_loss = logs.get("val_loss")
            except:
                flag_val=False
            if flag_val:
                print("\n%d"%epoch, "\ttrain_loss: ", train_loss,
                      "\tval_loss: ", val_loss, "\ttrain_acc:", train_acc, 
                      "\tval_acc:", val_acc)
            else:
                print("\n%d"%epoch, "\ttrain_loss: ",
                      train_loss, "\ttrain_acc:", train_acc)    
        return

    def on_batch_begin(self, batch, logs={}):
        return

    def on_batch_end(self, batch, logs={}):
        return

reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=10,
                              verbose=1, mode='auto', min_lr=1e-12)
my_callback = My_Callback()

checkpoint = ModelCheckpoint("VGG_GermanSigns_classification.h5", monitor='loss', 
                             verbose=0, save_best_only=True, save_weights_only=True)

In [0]:
#@title Preprocess the data
X_train_processed, Y_train_cat = preprocess(X_train,Y_train)
X_valid_processed, Y_valid_cat = preprocess(X_valid, Y_valid)
class_weights = create_class_weights(Y_train)

In [0]:
#@title Create a variation of VGG model
VGG_model = VGG_variation(X_train_processed.shape[1:])
batch_size = 32
epochs = 100
weights = compute_class_weight('balanced',classes = np.unique(Y_train),y = Y_train)





In [0]:
#@title Training
model_history = VGG_model.fit(X_train_processed, Y_train_cat, batch_size=batch_size, epochs=epochs, 
                    validation_data=(X_valid_processed,Y_valid_cat),shuffle=True,
                    callbacks = [my_callback, reduce_lr, checkpoint],verbose=0,
                    class_weight = weights)

In [0]:
#@title Plotting ACC and LOSS of training
# Accuracy
plt.figure(figsize = (8,6))
plt.plot(model_history.history['acc'],'r')
plt.grid('off')
plt.plot(model_history.history['val_acc'],'g')
plt.xticks()
plt.xlabel("Num of Epochs")
plt.ylabel("Accuracy")
plt.title("Training Accuracy vs Validation Accuracy")
plt.legend(['train','validation'])
plt.show()
# Loss 
plt.figure(figsize = (8,6))
plt.grid('off')
plt.plot(model_history.history['loss'],'r')
plt.plot(model_history.history['val_loss'],'g')
plt.xticks()
plt.xlabel("Num of Epochs")
plt.ylabel("Loss")
plt.title("Training Loss vs Validation Loss")
plt.legend(['train','validation'])
plt.show()

In [0]:
#@title Loading best result weights
VGG_model.load_weights("VGG_GermanSigns_classification.h5")

In [0]:
#@title Preprocessing and predicting the test group
X_test_processed, Y_test_cat = preprocess(X_test, Y_test)
predicted = VGG_model.predict(X_test_processed)
Y_pred = np.argmax(predicted, axis = 1)


In [0]:
#@title Calculating and presenting the confusion matrix
cm = confusion_matrix(np.argmax(Y_test_cat,axis=1),Y_pred)
plt.figure(figsize = (25,25))
sn.set(font_scale=1)
sn.heatmap(cm, cmap = 'viridis', annot = True, annot_kws = {'size': 8})
plt.show()

In [0]:
#@title Printing the results
total_accurate = 0
for i in range (cm.shape[0]):
  print('The accuracy of class No.{} is: {:.2f}%' .format(i+1,100*cm[i,i]/cm[i].sum()))
  total_accurate += cm[i,i]
  
print('The total accuracy is: {:.2f}%' .format(100*total_accurate/cm.sum()))
