In [1]:
import matplotlib
matplotlib.use("Agg")

# import the necessary packages
from keras.optimizers import Adam
from keras.preprocessing.image import img_to_array
from keras.callbacks import LearningRateScheduler, ModelCheckpoint, CSVLogger, ReduceLROnPlateau
from keras.preprocessing.image import ImageDataGenerator
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
from imutils import paths
from skimage import io, color, exposure, transform
from time import time
import matplotlib.pyplot as plt
import numpy as np
import argparse
import random
import pickle
import cv2
import os
import glob
import h5py

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


In [2]:
# --------------------- Define functions -------------------------------
def preprocess_img(img):
    """Equalize the brightness of an RGB image and resize it to the network input size.

    The image is converted to HSV, the histogram of the V (value) channel is
    equalized, and the result is converted back to RGB. It is then resized to
    ``(IMAGE_DIMS[1], IMAGE_DIMS[0])`` with ``skimage.transform.resize``.

    Relies on the module-level ``IMAGE_DIMS`` being defined before the call.

    Args:
        img: image array accepted by ``skimage.color.rgb2hsv``.

    Returns:
        The equalized, resized image array.
    """
    # Equalize only the value channel so hue and saturation are left untouched
    hsv_img = color.rgb2hsv(img)
    hsv_img[..., 2] = exposure.equalize_hist(hsv_img[..., 2])
    equalized = color.hsv2rgb(hsv_img)

    # Re-scale to the target size
    target_shape = (IMAGE_DIMS[1], IMAGE_DIMS[0])
    return transform.resize(equalized, target_shape)


def get_class(img_path):
    """Return the integer class id encoded in the parent directory name of ``img_path``.

    The images are laid out as ``<root>/<class_id>/<image>.ppm``, so the label is
    the name of the directory that directly contains the file.

    ``os.path`` is used instead of splitting on a hard-coded ``'/'`` so that
    paths produced by ``os.path.join``/``glob`` are handled with the platform's
    own separator.

    Args:
        img_path: path to an image file inside a class directory.

    Returns:
        The class id as an ``int``.

    Raises:
        ValueError: if the parent directory name is not an integer.
    """
    class_dir = os.path.basename(os.path.dirname(img_path))
    return int(class_dir)


## Read the data set

In [3]:
#------- Read training images---------------------------------------------------
NUM_CLASSES = 43 # 43 for GTSRB, 164 for European data set
IMAGE_DIMS = (48,48,3)
file_name = 'X_train_german.h5' # File name for saving your training images

try:
    # Open the cache explicitly read-only: a missing file is then reported as an
    # error (handled below) instead of depending on the default mode of the
    # installed h5py version.
    with h5py.File(file_name, 'r') as hf:
        X, Y = hf['imgs'][:], hf['labels'][:]
    print("Loaded images from {}".format(file_name))

except (IOError, OSError, KeyError):
    # Cache missing or incomplete: rebuild it from the raw .ppm images
    print("Error in reading {}. Processing all images...".format(file_name))
    root_dir = '../../Datasets/Traffic_signs/GTSRB/Final_Training/Images' # changed to your needs
    imgs = []
    labels = []

    # Read all image paths with extension ppm; sort first so that the seeded
    # shuffle starts from the same order on every run
    all_img_paths = sorted(glob.glob(os.path.join(root_dir, '*/*.ppm')))
    np.random.seed(42)
    np.random.shuffle(all_img_paths)

    # Read images and preprocess them
    print("[INFO] loading images...")
    for img_path in all_img_paths:
        try:
            img = preprocess_img(io.imread(img_path))
            label = get_class(img_path)

            # Save images and labels in lists
            imgs.append(img)
            labels.append(label)

            if len(imgs)%1000 == 0: print("Processed {}/{}".format(len(imgs), len(all_img_paths)))
        except (IOError, OSError):
            # Best effort: report the unreadable file and continue with the rest
            print('missed', img_path)

    # Do not write an empty cache: it would be loaded silently on the next run
    if not imgs:
        raise RuntimeError("No training images could be read from {}".format(root_dir))

    X = np.array(imgs, dtype='float32')
    # One-hot encode: row `label` of the identity matrix for every image
    Y = np.eye(NUM_CLASSES, dtype='uint8')[labels]
    # Save the training dictionary
    with h5py.File(file_name,'w') as hf:
        hf.create_dataset('imgs', data=X)
        hf.create_dataset('labels', data=Y)




Loaded images from X_train_german.h5


In [4]:
#--------------- Read test images-------------------------------
file_name = 'X_test_german.h5' # File name for saving your testing images

try:
    # Open the cache explicitly read-only: a missing file is then reported as an
    # error (handled below) instead of depending on the default mode of the
    # installed h5py version.
    with h5py.File(file_name, 'r') as hf:
        X_test, y_test = hf['imgs'][:], hf['labels'][:]
    print("Loaded images from {}".format(file_name))

except (IOError, OSError, KeyError):
    # Cache missing or incomplete: rebuild it from the raw .ppm images
    print("Error in reading {}. Processing all images...".format(file_name))
    root_dir = '../../Datasets/Traffic_signs/GTSRB/Final_Test/Images2'
    X_test = []
    y_test = []

    # Read all image paths with extension ppm; sort first so that the seeded
    # shuffle starts from the same order on every run
    all_img_paths = sorted(glob.glob(os.path.join(root_dir, '*/*.ppm')))
    np.random.seed(42)
    np.random.shuffle(all_img_paths)

    # Read images and preprocess them
    for img_path in all_img_paths:
        try:
            img = preprocess_img(io.imread(img_path))
            label = get_class(img_path)

            # Save images and labels in lists
            X_test.append(img)
            y_test.append(label)

            if len(X_test)%1000 == 0: print("Processed {}/{}".format(len(X_test), len(all_img_paths)))
        except (IOError, OSError):
            # Best effort: report the unreadable file and continue with the rest
            print('missed', img_path)

    # Do not write an empty cache: it would be loaded silently on the next run
    if not X_test:
        raise RuntimeError("No test images could be read from {}".format(root_dir))

    X_test = np.array(X_test, dtype='float32')
    # Test labels are kept as integer class ids (not one-hot encoded)
    y_test = np.array(y_test, dtype='uint8')
    # Save testing dictionary
    with h5py.File(file_name,'w') as hf:
        hf.create_dataset('imgs', data=X_test)
        hf.create_dataset('labels', data=y_test)


Loaded images from X_test_german.h5


In [5]:
# ---------------------- Split dataset into train and validation -----------------------------
# Hold out 10% of the loaded training images for validation; the fixed
# random_state makes the partition repeatable.
split = train_test_split(X, Y, random_state=42, test_size=0.1)
X_train, X_val = split[0], split[1]
y_train, y_val = split[2], split[3]

In [6]:
# --------------------- Normalize the data ----------------------------------
normalize = 0  # switch: a truthy value enables mean-image subtraction

if normalize:
    # The mean image is computed on the training split only and then
    # subtracted in place from the train, test and validation arrays.
    mean_image = np.mean(X_train, axis=0)
    np.subtract(X_train, mean_image, out=X_train)
    np.subtract(X_test, mean_image, out=X_test)
    np.subtract(X_val, mean_image, out=X_val)

# Report the shape of every array that feeds training and evaluation
for _caption, _array in (
    ('Train data shape: ', X_train),
    ('Train classes labels shape: ', y_train),
    ('Validation data shape: ', X_val),
    ('Validation classes labels shape: ', y_val),
    ('Test data shape: ', X_test),
    ('Test classes labels shape: ', y_test),
):
    print(_caption, _array.shape)

Train data shape:  (35288, 48, 48, 3)
Train classes labels shape:  (35288, 43)
Validation data shape:  (3921, 48, 48, 3)
Validation classes labels shape:  (3921, 43)
Test data shape:  (12630, 48, 48, 3)
Test classes labels shape:  (12630,)


## 8-layer CNN model

In [7]:
# ----------------- Initialization of weights -------------------------------
from keras import backend as K
import numpy as np

def my_init(shape, name=None):
    """Build a backend variable of the given shape with random initial values.

    The values are drawn with ``np.random.random(shape)`` and wrapped in
    ``K.variable``.

    Args:
        shape: shape of the variable to create.
        name: optional name forwarded to ``K.variable``.

    Returns:
        The variable created by ``K.variable``.
    """
    initial_values = np.random.random(shape)
    return K.variable(initial_values, name=name)

In [8]:
# --------------- Define Keras model ---------------------------------- 
from model.TS_Net_proposal import TrafficSignsNet

# The number of output units comes from NUM_CLASSES so that switching data set
# (43 for GTSRB, 164 for the European set) only needs one change.
# NOTE(review): the two leading 48s presumably mirror IMAGE_DIMS; they are left
# as literals because the (width, height) order expected by build() is not
# visible here — confirm against TrafficSignsNet before parameterizing.
model = TrafficSignsNet.build(48, 48,
    numClasses=NUM_CLASSES,
    finalAct="softmax")

model.summary() # Print model information

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         (None, 48, 48, 3)         0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 48, 48, 32)        896       
_________________________________________________________________
batch_normalization_1 (Batch (None, 48, 48, 32)        128       
_________________________________________________________________
activation_1 (Activation)    (None, 48, 48, 32)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 48, 48, 32)        9248      
_________________________________________________________________
batch_normalization_2 (Batch (None, 48, 48, 32)        128       
_________________________________________________________________
activation_2 (Activation)    (None, 48, 48, 32)        0         
__________

In [9]:
#--------------------- Initialize model ------------------------
# Training hyper-parameters
EPOCHS = 40      # number of passes over the training set
INIT_LR = 1e-3   # initial learning rate handed to Adam
BS = 128         # mini-batch size

# Adam with a decay term of INIT_LR / EPOCHS
opt = Adam(
    lr=INIT_LR,
    decay=INIT_LR / EPOCHS,
)

model.compile(
    loss='categorical_crossentropy',
    optimizer=opt,
    metrics=['accuracy'],
)

def lr_schedule(epoch, *, init_lr=None):
    """Step-decay schedule: divide the learning rate by 10 every 10 epochs.

    The original body referenced the undefined names ``lr`` and ``init`` (a
    typo for ``int``) and raised ``NameError`` on first use.

    Args:
        epoch: zero-based epoch index.
        init_lr: base learning rate. Keyword-only so that a caller passing a
            second positional argument cannot overwrite it by accident.
            Defaults to the notebook-level ``INIT_LR``.

    Returns:
        The learning rate to use for ``epoch``.
    """
    if init_lr is None:
        init_lr = INIT_LR
    return init_lr * (0.1 ** (epoch // 10))

In [10]:
#---------------------- Train model -----------------------
# Train on the un-augmented training split and time the whole run.
t1=time()
history = model.fit(X_train, y_train, batch_size=BS,
                            epochs=EPOCHS,  # Keras 2 name of the legacy `nb_epoch` argument
                            validation_data=(X_val, y_val),
                            shuffle = True
         )
t2 = time()

print ('Time taken to train the model', t2-t1)

  


Train on 35288 samples, validate on 3921 samples
Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40
Epoch 26/40
Epoch 27/40
Epoch 28/40
Epoch 29/40
Epoch 30/40
Epoch 31/40
Epoch 32/40
Epoch 33/40
Epoch 34/40
Epoch 35/40
Epoch 36/40
Epoch 37/40
Epoch 38/40
Epoch 39/40
Epoch 40/40
Time taken to train the model 941.1104226112366


## Test on the Test Dataset

In [8]:
# Predict class probabilities for the whole test set and time the call
t1=time()
y_proba = model.predict(X_test)
t2=time()
print  ('Took {} sec to predict {} images'.format(t2-t1, len(X_test)) )

# Vectorized over the rows of y_proba: the arg-max of each row is the predicted
# class id and the row maximum is the probability assigned to that class.
pred_labels = y_proba.argmax(axis=1).astype('uint8')
pred_probas = y_proba.max(axis=1)

# y_test holds integer class ids, so accuracy is the fraction of exact matches
acc = np.mean(pred_labels==y_test)
print("Test accuracy = {}".format(acc*100))

Took 2.7618215084075928 sec to predict 12630 images
Test accuracy = 98.5193982581156


In [60]:
# Create the output directory first so that saving does not fail on a fresh checkout
os.makedirs('GTSRB_weights', exist_ok=True)
model.save('GTSRB_weights/german_model_98.52.h5')

## Train with data augmentation

In [13]:
# Augmentation settings: random shifts, zoom, shear and rotation; feature-wise
# centering and std-normalization are switched off.
augmentation_args = dict(
    featurewise_center=False,
    featurewise_std_normalization=False,
    width_shift_range=0.1,
    height_shift_range=0.1,
    zoom_range=0.2,
    shear_range=0.1,
    rotation_range=10.,
)
datagen = ImageDataGenerator(**augmentation_args)

# NOTE(review): fit() presumably has nothing to compute while both feature-wise
# options are off — kept so the cell behaves exactly as before; confirm.
datagen.fit(X_train)


In [14]:
nb_epoch = 50

# ModelCheckpoint writes into this directory during training; create it up
# front so the run does not fail when the first checkpoint is saved.
os.makedirs('GTSRB_weights', exist_ok=True)

# Continue training on augmented batches. Keras 2 argument names are used:
# `steps_per_epoch` counts batches, so the sample count is floor-divided by the
# batch size (35288 // 128 = 275 steps, matching the logged progress bar), and
# `epochs` replaces the legacy `nb_epoch` keyword.
history2=model.fit_generator(datagen.flow(X_train, y_train, batch_size=BS),
                            steps_per_epoch=X_train.shape[0] // BS,
                            epochs=nb_epoch,
                            validation_data=(X_val, y_val),
                            callbacks=[ReduceLROnPlateau('val_loss', factor=0.2, patience=20, verbose=1, mode='auto'), 
                                       ModelCheckpoint('GTSRB_weights/german_aug_test.h5',save_best_only=True)]
                           )


Epoch 1/50
  2/275 [..............................] - ETA: 26s - loss: 0.5981 - acc: 0.8789

  import sys
  import sys


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [20]:
# Evaluate the model
# Predict class probabilities for the whole test set and time the call
t1=time()
y_proba = model.predict(X_test)
t2=time()
print  ('Took {} sec to predict {} images'.format(t2-t1, len(X_test)) )

# Vectorized over the rows of y_proba: the arg-max of each row is the predicted
# class id and the row maximum is the probability assigned to that class.
pred_labels = y_proba.argmax(axis=1).astype('uint8')
pred_probas = y_proba.max(axis=1)

# y_test holds integer class ids, so accuracy is the fraction of exact matches
acc = np.mean(pred_labels==y_test)
print("Test accuracy = {}".format(acc*100))

Took 2.2971127033233643 sec to predict 12630 images
Test accuracy = 99.36658749010293


In [69]:
# Create the output directory first so that saving does not fail on a fresh checkout
os.makedirs('GTSRB', exist_ok=True)
model.save('GTSRB/german_aug-99.37.h5')