In [None]:
# Load pickled data
import pickle
def load_data_X():
    training_file = 'data/train.p'
    validation_file= 'data/valid.p'
    testing_file = 'data/test.p'

    with open(training_file, mode='rb') as f:
        train = pickle.load(f)
    with open(validation_file, mode='rb') as f:
        valid = pickle.load(f)
    with open(testing_file, mode='rb') as f:
        test = pickle.load(f)

    X_train = train['features']
    X_valid = valid['features']
    X_test = test['features']
    print("Training data: ", X_train.shape)
    print('Validation data: ',X_valid.shape)
    print("Test data: ", X_test.shape)
    return X_train, X_valid, X_test

def load_data_y():
    training_file = 'data/train.p'
    validation_file= 'data/valid.p'
    testing_file = 'data/test.p'
    with open(training_file, mode='rb') as f:
        train = pickle.load(f)
    with open(validation_file, mode='rb') as f:
        valid = pickle.load(f)
    with open(testing_file, mode='rb') as f:
        test = pickle.load(f)
    y_train = train['labels']
    y_valid = valid['labels']
    y_test = test['labels']
    print("Training labels: ",y_train.shape)
    print('Validation labels: ',y_valid.shape)
    print("Test labels: ", y_test.shape)
    return y_train, y_valid, y_test

X_train, X_valid, X_test = load_data_X()
y_train, y_valid, y_test = load_data_y()

In [None]:
n_train = len(y_train)
n_validation = len(y_valid)
n_test = len(y_test)
image_shape = X_train[0].shape
n_classes = max(y_train) + 1

print("Number of training examples =", n_train)
print("Number of testing examples =", n_test)
print("Image data shape =", image_shape)
print("Number of classes =", n_classes)

In [None]:
from utils import *
import matplotlib.pyplot as plt
%matplotlib inline
import random

def random_show(X, y, n):
    idxs = []
    pics = []
    titles = []
    for i in range(n):
        idx = random.randint(0, len(X))
        idxs.append(idx)
        pics.append(X[idx])
        titles.append('idx:'+str(idx)+',label:'+str(y[idx]))
    draw_pics(n,pics,titles=titles)

random_show(X_train, y_train, 6)
random_show(X_train, y_train, 6)
random_show(X_train, y_train, 6)

In [None]:
import numpy as np
def explore_data(y_train, n_class):
    counts = []
    for i in range(n_class):
        counts.append(np.sum(y_train == i))
    plt.bar(range(43),counts)
    
explore_data(y_train, 43)

In [None]:
# preprocessing
import numpy as np
import cv2
import pdb
from utils import *

def blur_img(img, kernel=(5,5), sigmax = 1):
    imcopy = cv2.GaussianBlur(img, kernel, sigmax)
    return imcopy

def get_gray(img):  
    imcopy = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    return imcopy

def get_hls(img):
    imcopy = cv2.cvtColor(img, cv2.COLOR_RGB2HLS)
    return imcopy

def get_rgb(img):
    imcopy = cv2.cvtColor(img, cv2.COLOR_HLS2RGB)
    return imcopy

def change_contrast(img, rate):  # [0 , 255]
    imcopy = np.copy(img)
    imcopy = (imcopy.astype(np.float32)-128) * rate + 128
    imcopy = np.maximum(imcopy, 0)
    imcopy = np.minimum(imcopy, 255)
    return imcopy.astype(np.uint8)

def strech_img(img):  # img must be gray 
    imcopy = cv2.equalizeHist(img)
    return imcopy

def crop_img(img, crop=2):
    imcopy = img[crop:img.shape[0]-crop, crop:img.shape[1]-crop,:]
    return imcopy

# test preprocessing
idx=random.randint(0, len(X_train))
print('idx: ', idx)
raw = X_train[idx]
raw = crop_img(raw)
print('shape: ', raw.shape)
# blurred = blur_img(raw, kernel=(3,3), sigmax=0)
# gray = get_gray(raw)
hls = get_hls(raw)
streched = strech_img(hls[:,:,1])  # only cha
hls[:, :, 1] = streched
rgb = get_rgb(hls)

pics = [raw, streched, rgb]
draw_pics(3, pics, cmap=[None, 'gray', None], titles=['raw', 'gray(streched)', 'color(streched)'])

In [None]:
# preprocessing img

def preprocess_img(img):
    hls = get_hls(img)
    hls[:,:,1] = strech_img(hls[:,:,1])
    rgb = get_rgb(hls)
    return rgb
#     imcopy = get_gray(img)
#     return imcopy.reshape((img.shape[0], img.shape[1],1))

def norm_img(img):
    imcopy = (img.astype(np.float32) - 128.0)/ 128.0
    return imcopy

def preprocess_img_all(imgs, norm=True):
    outimgs=[]
    for img in imgs:
        imcopy = crop_img(img)  # crop img
        imcopy = preprocess_img(imcopy)  # preprocess
        if norm:
            imcopy = norm_img(imcopy)  # normalize
        outimgs.append(imcopy)
    outimgs = np.array(outimgs)
    print('preprocessed, shape: ', outimgs.shape)
    return outimgs

In [None]:
# data augmentation

def add_light(img, light):  # [0, 255]
    imcopy=np.copy(img)
    imcopy = imcopy.astype(np.float32) + light
    imcopy = np.maximum(imcopy, 0)
    imcopy = np.minimum(imcopy, 255)
    return imcopy.astype(np.uint8)

def change_rot(img, ang=5):
    rot = imutils.rotate(img, 2*ang*np.random.uniform()-ang)
    return rot

def change_light(img, light=30, dir=0 ):
    if dir == 0:  # bi direction
        light = light*2*np.random.uniform() - light
    elif dir == 1:  # only +
        light = light*np.random.uniform()
    elif dir == -1:  # only -
        light = -light*np.random.uniform()
    imcopy = add_light(img, light)
    return imcopy

def change_shear(img, shear=5):
    rows,cols,ch = img.shape
    pts1 = np.float32([[5,5],[20,5],[5,20]])
    pt1 = 5+shear*2*np.random.uniform()-shear
    pt2 = 20+shear*2*np.random.uniform()-shear
    pts2 = np.float32([[pt1,5],[pt2,pt1],[5,pt2]])
    shear_M = cv2.getAffineTransform(pts1,pts2)
    imcopy = cv2.warpAffine(img,shear_M,(cols,rows))
    return imcopy

def change_img(img, n=30, ang=10, light=30, shear=5):
    imgs=[np.copy(img)]
    for i in range(n-1):
        imcopy = change_rot(img, ang=ang)
        l_img = np.mean(img)
        if l_img <= 40.0:
            imcopy = change_light(imcopy, light=light, dir=1)
        if l_img >=215:
            imcopy = change_light(imcopy, light=light, dir=-1)
        else:
            imcopy = change_light(imcopy, light=light, dir=0)
        imcopy = change_shear(imcopy, shear=shear)
        imgs.append(imcopy)
    return imgs

import imutils 
idx=random.randint(0, len(X_train))
# dark: 6641:10
# idx = 6641
print('idx: ',idx)
raw = X_train[idx]
imgs = change_img(raw, ang=10, light=40, shear=2)
draw_pics(10, imgs[0:10],titles=['raw']*10)
draw_pics(10, imgs[10:20],titles=['raw']*10)
draw_pics(10, imgs[20:30],titles=['raw']*10)

imgs = preprocess_img_all(imgs, norm=False)
draw_pics(10, imgs[0:10], titles=['processed']*10)
draw_pics(10, imgs[10:20],titles=['processed']*10)
draw_pics(10, imgs[20:30],titles=['processed']*10)

In [None]:
# data augmentation 
def aug_data(X_train, y_train, n=10, uplim=3000):
    X_aug = []
    y_aug = []
    counts = [0] * 43
    for x ,y  in zip(X_train, y_train):
        diff = -(np.sum(y_train == y) + counts[y]) + uplim
        if diff > 0:
            if diff > uplim//3:
                nn = n
            else:
                nn = n//2
            xs = change_img(x, n=nn, ang=10, light=40, shear=2)
            X_aug.append(xs)
            ys = np.repeat(y , nn)
            y_aug.append(ys)
            counts[y] += nn - 1
        else:
            X_aug.append(np.array([x]))
            y_aug.append(np.array([y]))
            counts[y] += 1
    X_aug = np.concatenate(X_aug)
    y_aug = np.concatenate(y_aug)
    print('X_aug: ', len(X_aug))
    print('y_aug: ', len(y_aug))
    return X_aug, y_aug

def aug_shuffle_data(X_train, y_train, n=5, uplim=3000, n_loop=10):
    for i in range(n_loop):
        X_train, y_train = shuffle(X_train,y_train)
        X_train, y_train = aug_data(X_train, y_train, n=n, uplim=uplim)
        print(i, 'n_loop')
    X_train, y_train = shuffle(X_train,y_train)
    return X_train, y_train

In [None]:
X_train, X_valid, X_test = load_data_X()
y_train, y_valid, y_test = load_data_y()

from sklearn.utils import shuffle
X_train, y_train = shuffle(X_train,y_train)

X_train, y_train = aug_shuffle_data(X_train, y_train, n=4, uplim=2000, n_loop=3)

X_train, y_train = shuffle(X_train,y_train)

random_show(X_train, y_train, 6)
random_show(X_train, y_train, 6)
random_show(X_train, y_train, 6)
random_show(X_train, y_train, 6)
random_show(X_train, y_train, 6)

In [None]:
X_train = preprocess_img_all(X_train)
X_valid = preprocess_img_all(X_valid)
X_test = preprocess_img_all(X_test)
explore_data(y_train, 43)

In [None]:
print(np.max(X_train), np.min(X_train), len(X_train))
print(np.max(X_valid), np.min(X_valid), len(X_valid))
print(np.max(X_test), np.min(X_test), len(X_test))
print(np.max(y_train), np.min(y_train), len(y_train))
print(np.max(y_valid), np.min(y_valid), len(y_valid))
print(np.max(y_test), np.min(y_test), len(y_test))

In [None]:
import tensorflow as tf
def add_layer_conv(x_in, n_f, depth_in, size_f=5, mu=0, sigma=0.1, activation='relu', stride=1, padding='VALID', 
                   val_names=['conv_W', 'conv_b']):
    # weights tensor
    conv_W = tf.Variable(tf.truncated_normal(shape=(size_f, size_f, depth_in, n_f), mean = mu, stddev = sigma), 
                         name=val_names[0])
    # bias vector
    conv_b = tf.Variable(tf.zeros(n_f), name=val_names[1])
    
    print(conv_W)
    print(conv_b)
    
    # conv net
    conv = tf.nn.conv2d(x_in, conv_W, strides=[1, stride, stride, 1], padding=padding) + conv_b
    # activation 
    if activation == 'relu':
        conv = tf.nn.relu(conv)
    else:
        print('No RELU function.')
    print('One conv layer generated.')
    return conv

def add_layer_pool(x_in, stride=2, size_k=2, padding='VALID'):
    pool = tf.nn.max_pool(x_in, ksize=[1, size_k, size_k, 1], strides=[1, stride, stride, 1], padding=padding)
    print('One max pooling layer generated.')
    return pool

def add_layer_fc(x_in, shape, mu=0, sigma=0.1, activation='relu', val_names=['fc_W', 'fc_b'], 
                 out_name='fc_out'):   # shape=(n_in, n_out) tuple
    # weights matrix
    fc_W = tf.Variable(tf.truncated_normal(shape=shape, mean = mu, stddev = sigma), name=val_names[0])
    # bias vector
    fc_b = tf.Variable(tf.zeros(shape[1]), name= val_names[1])
    
    print(fc_W)
    print(fc_b)
    
    # fully connected net
    if activation is None:
        fc = tf.add(tf.matmul(x_in, fc_W), fc_b, name=out_name)
        print(fc)
    else:
        fc = tf.add(tf.matmul(x_in, fc_W), fc_b)
    # activation
    if activation == 'relu':
        fc = tf.nn.relu(fc)
    else:
        print('No RELU function.')
    print('One fully connected layer generated.')
    return fc

def add_dropout(x_in, keep_prob):
    out = tf.nn.dropout(x_in, keep_prob)
    return out

In [None]:
from tensorflow.contrib.layers import flatten


def gen_neural_net(x_in, keep_prob):  # input (28, 28, 3)
    conv1 = add_layer_conv(x_in, n_f=16, depth_in=3, val_names=['cW1', 'cb1'])  # out(24,24,16)
    pool1 = add_layer_pool(conv1)  # out (12,12,16)
    
    conv2 = add_layer_conv(pool1, n_f=32, depth_in=16, val_names=['cW2', 'cb2'])  # out(8,8,32)
    pool2 = add_layer_pool(conv2)  # out (4,4,32)
    
    conv3 = add_layer_conv(pool2, n_f=64, depth_in=32, padding='SAME', val_names=['cW3', 'cb3'])  # (4, 4, 64)
    pool3 = add_layer_pool(conv3)  # (2, 2, 64)
    
    fc0 = flatten(pool3)  # 256
    fc0 = add_dropout(fc0, keep_prob)
    fc1 = add_layer_fc(fc0, shape=(256, 120), val_names=['fW1', 'fb1'])
    fc1 = add_dropout(fc1, keep_prob)
    fc2 = add_layer_fc(fc1, shape=(120, 84), val_names=['fW2', 'fb2'])
    fc2 = add_dropout(fc2, keep_prob)
    fc3 = add_layer_fc(fc2, shape=(84, 43), activation=None, val_names=['fW3', 'fb3'], out_name='logits')
    return fc3

In [None]:
# create the network
tf.reset_default_graph()
rate = 0.001

X = tf.placeholder(tf.float32, (None, 28, 28, 3), name='X')
y = tf.placeholder(tf.int32, (None), name='y')
one_hot_y = tf.one_hot(y, 43)
keep_prob = tf.placeholder(tf.float32, name='k_prob')

logits = gen_neural_net(X, keep_prob)  # for logits feed_dict: X, keep_prob
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=one_hot_y, logits=logits)  # logits -> softmax -> cross_entropy
loss_operation = tf.reduce_mean(cross_entropy)  # loss
optimizer = tf.train.AdamOptimizer(learning_rate = rate)  # create Optimizer
training_operation = optimizer.minimize(loss_operation)  # set optimizer to minimize a loss

correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))  
accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

saver = tf.train.Saver()

tf.add_to_collection('inputs', X)
tf.add_to_collection('inputs', y)
tf.add_to_collection('inputs', keep_prob)
tf.add_to_collection('logits', logits)
tf.add_to_collection('train_op', training_operation)  # train_op feed_dict: X,y,keep_prob(0.5)
tf.add_to_collection('acc_op', accuracy_operation)  # acc_op feed_dict: X, y, keep_prob(1.0)

In [None]:
def evaluate(X_data, y_data, accuracy_operation, BATCH_SIZE=128):
    num_examples = len(X_data)
    total_accuracy = 0.0
    sess = tf.get_default_session()
    for offset in range(0, num_examples, BATCH_SIZE):
        batch_x, batch_y = X_data[offset:offset+BATCH_SIZE], y_data[offset:offset+BATCH_SIZE]
        # session run accuracy operation
        accuracy = sess.run(accuracy_operation, feed_dict={X: batch_x, y: batch_y, keep_prob:1.0})
        total_accuracy += (accuracy * len(batch_x))
    return total_accuracy / num_examples

acc_train=[]
acc_valid=[]

In [None]:
import time
from sklearn.utils import shuffle

def train_model(sess, X_train, y_train, train_op, acc_op, EPOCHS=10, keep=0.5, BATCH_SIZE = 128):
    num_examples = len(X_train)
    print("Training...")
    print()
    for i in range(EPOCHS):
        t1 = time.time()
        X_train, y_train = shuffle(X_train, y_train)
        for offset in range(0, num_examples, BATCH_SIZE):
            end = offset + BATCH_SIZE
            batch_x, batch_y = X_train[offset:end], y_train[offset:end]
            # session run training operation
            sess.run(train_op, feed_dict={X: batch_x, y: batch_y, keep_prob:keep})
        train_acc = evaluate(X_train, y_train, acc_op)  # accuracy_operation
        validation_accuracy = evaluate(X_valid, y_valid, acc_op) # accuracy_operation
        t2 = time.time()
        acc_train.append(train_acc)
        acc_valid.append(validation_accuracy)
        print("EPOCH {} ".format(i+1)+"; Training Accuracy:  {:.3f}".format(train_acc)+
              "; Validation Accuracy: {:.3f}".format(validation_accuracy)+"; Used time: {:.3f} s".format(t2-t1))
        print()

    saver.save(sess, './mynet')
    print("Model saved")

In [None]:
X_train, y_train = shuffle(X_train, y_train)

# training
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    train_model(sess, X_train, y_train, training_operation, accuracy_operation)

In [None]:
def plot_save_accs(acc_train, acc_valid):
    plt.plot(acc_train,'b',label ='Training Accuracy')
    plt.plot(acc_valid,'g',label ='Validation Accuracy')
    plt.xlabel('EPOCHS')
    plt.ylabel('Accuracy')
    plt.ylim([0.5, 1])
    plt.legend(loc='best')
    with open('accs.p', 'wb') as f:
        pickle.dump((acc_train, acc_valid), f)
        print('accs saved')

plot_save_accs(acc_train, acc_valid)

In [None]:
# training the model from saved file
tf.reset_default_graph()
def get_accs():
    with open('accs.p', 'rb') as f:
        acc_train, acc_valid = pickle.load(f)
        print('accs restored')
    return acc_train, acc_valid
acc_train, acc_valid = get_accs()

with tf.Session() as sess:
    saver = tf.train.import_meta_graph('./mynet.meta')
    saver.restore(sess, tf.train.latest_checkpoint('.'))
    X, y, keep_prob = tf.get_collection('inputs')
    train_op = tf.get_collection('train_op')[0]
    acc_op = tf.get_collection('acc_op')[0]
    train_model(sess, X_train, y_train, train_op, acc_op)

plot_save_accs(acc_train, acc_valid)

In [None]:
X_train, X_valid, X_test = load_data_X()
y_train, y_valid, y_test = load_data_y()
random_show(X_test, y_test, 6)
random_show(X_test, y_test, 6)
random_show(X_test, y_test, 6)
print('number of data to test: ', len(X_test))
print('number of data to test: ', len(y_test))

In [None]:
X_test = preprocess_img_all(X_test)
explore_data(y_test, 43)

In [None]:
import tensorflow as tf
from copy import deepcopy

def predict(logits, X_test):
    sess = tf.get_default_session()
    return sess.run(tf.argmax(logits, 1), feed_dict={X:X_test, keep_prob:1.0})

tf.reset_default_graph()

with tf.Session() as sess:
    saver = tf.train.import_meta_graph('./mynet.meta')
    saver.restore(sess, tf.train.latest_checkpoint('.'))
    X, y, keep_prob = tf.get_collection('inputs')
    logits = tf.get_collection('logits')[0]
    train_op = tf.get_collection('train_op')[0]
    acc_op = tf.get_collection('acc_op')[0]
    print('Test accuracy (all test images): ', evaluate(X_test, y_test, acc_op))
    pred_res = predict(logits, X_test[0:20])
    true_res = deepcopy(y_test[0:20])
    print(pred_res, ' -> predicted labels')
    print(true_res, ' -> true labels')

In [None]:
print('For these images: {} % accuracy.'.format(np.sum(pred_res == true_res) / len(pred_res)*100))

In [None]:
import pandas as pd
signnames = pd.read_csv('signnames.csv')
signnames.head(10)

In [None]:
from utils import *
import glob
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import cv2
tstimgs_adrs = glob.glob('./tst_imgs/*.jpg')
rawpics =[]
for i in range(len(tstimgs_adrs)):
    tst = mpimg.imread(tstimgs_adrs[i])
    tst = cv2.resize(tst, (256,256))
    rawpics.append(tst)
    
draw_pics(8, rawpics)

In [None]:
# preprocessing
tstpics=[]
for i in range(len(rawpics)):
    tstpics.append(cv2.resize(rawpics[i], (32,32)))
draw_pics(8,tstpics)
tstpics = preprocess_img_all(tstpics)

In [None]:
# prediction
tf.reset_default_graph()

with tf.Session() as sess:
    saver = tf.train.import_meta_graph('./mynet.meta')
    saver.restore(sess, tf.train.latest_checkpoint('.'))
    X, y, keep_prob = tf.get_collection('inputs')
    logits = tf.get_collection('logits')[0]
    prob_res = sess.run(tf.nn.top_k(tf.nn.softmax(logits), k=5), feed_dict={X: tstpics, keep_prob:1.0})
    print(prob_res)
    
def visualize(pics, prob, pred):
    n_examples = len(pics)
    fig, ax = plt.subplots(n_examples, 2, figsize=(15,15))
    
    for i in range(n_examples) :
        labels = pred[i]
        names = [signnames.iloc[l]['SignName'] for l in labels]
        bars = np.arange(5)[::-1]
        ax[i,0].imshow(pics[i])
        ax[i,0].axis('off')
        ax[i,1].barh(bars, prob[i])
        ax[i,1].set_yticks(bars)
        ax[i,1].set_yticklabels(names)
        ax[i,1].yaxis.tick_right()
        ax[i,1].set_xlim([0,1])
    fig.tight_layout()
    
prob = prob_res[0]
pred = prob_res[1]
visualize(rawpics, prob, pred)