In [1]:
import tensorflow as tf
import os
import math
import numpy as np
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import pickle
import tqdm

%matplotlib inline

In [2]:
# NOTE: unpickling can run arbitrary code, so 'traffic.pickle' must come
# from a trusted source.
with open('traffic.pickle', 'rb') as pfile:
    data = pickle.load(pfile)

# The file handle is already closed here; pull out the two entries the rest
# of the notebook uses and drop the containing dict.
features = data['features']
labels = data['labels']
del data
print('Data loaded')

Data loaded


In [3]:
# Step 1: hold out 10% of all samples as the validation set.
x_rest, x_valid, y_rest, y_valid = train_test_split(
    features, labels, test_size=0.1, random_state=0)

# Step 2: split what is left 80/20 into the training and test sets.
x_train, x_test, y_train, y_test = train_test_split(
    x_rest, y_rest, test_size=0.2, random_state=0)

# The intermediate 90% slice is not needed once it has been split again.
del x_rest, y_rest

In [4]:
# Number of distinct class ids present in the full label set; this fixes the
# width of the network's output layer and of the one-hot encoding.
n_labels = np.unique(labels).size

In [5]:
def lenet(x, keep_prob=0.5):
    """Build a LeNet-style classifier graph on top of `x` and return its logits.

    Architecture: two 5x5 VALID convolutions (6 and 16 filters), each followed
    by ReLU and 2x2 max pooling, then fully connected layers
    400 -> 120 -> 84 -> n_labels with dropout after the two hidden layers.
    Every call creates a fresh set of variables in the default graph.

    Args:
        x: float image tensor; the layer sizes below assume
            (batch, 32, 32, 3).
        keep_prob: probability of keeping a unit in both dropout layers.
            May be a Python float or a scalar tensor. Pass a placeholder and
            feed 1.0 to switch dropout off when measuring accuracy; with the
            default constant of 0.5 dropout is applied on every run of the
            graph, evaluation included.

    Returns:
        Unnormalised class scores (logits) of shape (batch, n_labels).
    """
    # Convolution 1: 32x32x3 -> 28x28x6, pooled down to 14x14x6.
    conv1_w = tf.Variable(tf.truncated_normal((5, 5, 3, 6), stddev=0.1))
    conv1_b = tf.Variable(tf.zeros(6))
    conv1 = tf.nn.conv2d(x, conv1_w, strides=[1, 1, 1, 1], padding='VALID') + conv1_b
    conv1 = tf.nn.relu(conv1)
    conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

    # Convolution 2: 14x14x6 -> 10x10x16, pooled down to 5x5x16.
    conv2_w = tf.Variable(tf.truncated_normal((5, 5, 6, 16), stddev=0.1))
    conv2_b = tf.Variable(tf.zeros(16))
    conv2 = tf.nn.conv2d(conv1, conv2_w, strides=[1, 1, 1, 1], padding='VALID') + conv2_b
    conv2 = tf.nn.relu(conv2)
    conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

    # Flatten 5x5x16 feature maps into a 400-wide vector per example.
    fc0 = tf.contrib.layers.flatten(conv2)

    # Fully connected 1: 400 -> 120, ReLU, dropout.
    fc1_w = tf.Variable(tf.truncated_normal((400, 120), stddev=0.1))
    fc1_b = tf.Variable(tf.zeros(120))
    fc1 = tf.nn.relu(tf.matmul(fc0, fc1_w) + fc1_b)
    fc1 = tf.nn.dropout(fc1, keep_prob)

    # Fully connected 2: 120 -> 84, ReLU, dropout.
    fc2_w = tf.Variable(tf.truncated_normal((120, 84), stddev=0.1))
    fc2_b = tf.Variable(tf.zeros(84))
    fc2 = tf.nn.relu(tf.matmul(fc1, fc2_w) + fc2_b)
    fc2 = tf.nn.dropout(fc2, keep_prob)

    # Output layer: 84 -> n_labels, left linear because the loss applies
    # the softmax itself.
    fc3_w = tf.Variable(tf.truncated_normal((84, n_labels), stddev=0.1))
    fc3_b = tf.Variable(tf.zeros(n_labels))
    logits = tf.matmul(fc2, fc3_w) + fc3_b

    return logits

In [6]:
# Graph inputs: a batch of 32x32 three-channel images and one integer class
# id per image.
x = tf.placeholder(tf.float32, (None, 32, 32, 3))
# `(None)` is plain None, i.e. "no shape constraint at all"; the trailing
# comma makes this a real rank-1 shape so a wrongly shaped label array is
# rejected when it is fed instead of silently changing the loss shape.
y = tf.placeholder(tf.int32, (None,))
y_hot = tf.one_hot(y, n_labels)

# Forward pass and mean softmax cross-entropy over the batch.
logits = lenet(x)
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=y_hot, logits=logits)
loss = tf.reduce_mean(cross_entropy)

# Fraction of examples in the batch whose highest-scoring class matches the
# label.
correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(y_hot, 1))
accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

# Built after lenet() so the model's variables already exist when the saver
# collects what to checkpoint.
saver = tf.train.Saver()

def evaluate(X_data, y_data):
    """Return the model's accuracy over a whole dataset, computed in batches.

    Runs `accuracy_operation` in the current default session on chunks of up
    to 1000 examples and combines the per-batch accuracies weighted by batch
    length, so a shorter final batch does not skew the result.

    NOTE(review): the dropout layers in `lenet` are part of the graph that
    `accuracy_operation` runs, so unless they are fed a keep probability of
    1.0 the figure returned here is measured with dropout active.

    Args:
        X_data: sequence of input images, sliceable and with a length.
        y_data: sequence of integer labels aligned with `X_data`.

    Returns:
        Accuracy in [0, 1] over all examples.

    Raises:
        ValueError: if `X_data` is empty or the two inputs differ in length.
        RuntimeError: if there is no default TensorFlow session.
    """
    num_examples = len(X_data)
    # Validate before touching the session: an empty set would otherwise end
    # in a division by zero after the loop.
    if num_examples == 0:
        raise ValueError('evaluate() needs at least one example')
    if len(y_data) != num_examples:
        raise ValueError(
            'X_data and y_data differ in length: {} vs {}'.format(num_examples, len(y_data)))

    sess = tf.get_default_session()
    if sess is None:
        raise RuntimeError('evaluate() must be called inside an active tf.Session context')

    batch_size = 1000
    total_accuracy = 0
    for offset in range(0, num_examples, batch_size):
        batch_x, batch_y = X_data[offset:offset+batch_size], y_data[offset:offset+batch_size]
        accuracy = sess.run(accuracy_operation, feed_dict={x: batch_x, y: batch_y})
        # Weight by the real batch length; the last batch may be short.
        total_accuracy += (accuracy * len(batch_x))
    return total_accuracy / num_examples

def train_nn(epochs, batch_size, learning_rate, save_path=None):
    """Train the network from freshly initialised weights.

    Opens a new session, re-initialises every variable, runs `epochs` passes
    of mini-batch Adam over a reshuffled (x_train, y_train) and measures the
    accuracy on (x_valid, y_valid) after each pass.

    Args:
        epochs: number of full passes over the training set.
        batch_size: number of examples per optimisation step.
        learning_rate: step size passed to the Adam optimiser.
        save_path: optional checkpoint prefix. When given, the trained
            weights are written there with the module-level `saver` after
            the last epoch; nothing is saved by default.

    Returns:
        List holding the validation accuracy after each epoch.
    """
    # minimize() adds gradient ops and Adam slot variables to the default
    # graph each time it is called, so a hyper-parameter sweep that calls this
    # function repeatedly keeps growing the graph. Reuse the op already built
    # for this loss and learning rate; its slot variables are reset by the
    # initialiser below, so every run still starts from scratch.
    train_ops = train_nn.__dict__.setdefault('_train_ops', {})
    op_key = (loss, learning_rate)
    if op_key not in train_ops:
        train_ops[op_key] = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    optimizer = train_ops[op_key]

    validation_accuracy = []

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        num_examples = len(x_train)

        for _ in range(epochs):
            # New sample order every epoch.
            x_train0, y_train0 = shuffle(x_train, y_train)
            for offset in range(0, num_examples, batch_size):
                end = offset + batch_size
                batch_x, batch_y = x_train0[offset:end], y_train0[offset:end]
                sess.run(optimizer, feed_dict={x: batch_x, y: batch_y})

            # evaluate() picks up this session as the default one.
            validation_accuracy.append(evaluate(x_valid, y_valid))

        if save_path is not None:
            saver.save(sess, save_path)

    return validation_accuracy

In [None]:
def test_nn(checkpoint_dir='.'):
    """Restore the most recent checkpoint and return the test-set accuracy.

    Args:
        checkpoint_dir: directory searched for the latest checkpoint;
            defaults to the current working directory.

    Returns:
        Accuracy of the restored model on (x_test, y_test), as computed by
        `evaluate`.

    Raises:
        ValueError: if `checkpoint_dir` contains no checkpoint.
    """
    # latest_checkpoint() returns None when nothing has been saved; check
    # that before opening a session so the failure names the actual problem.
    checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
    if checkpoint_path is None:
        raise ValueError(
            'No checkpoint found in {!r}; save a trained model with '
            'saver.save(...) before calling test_nn()'.format(checkpoint_dir))

    with tf.Session() as sess:
        saver.restore(sess, checkpoint_path)
        test_accuracy = evaluate(x_test, y_test)

    return test_accuracy

In [None]:
# Exhaustive sweep over epoch count x batch size x learning rate. Each
# combination is trained from scratch and its per-epoch validation history is
# stored under the (epochs, batch_size, learning_rate) key.
# NOTE(review): every history already holds one accuracy per epoch, so the
# shorter epoch counts repeat work a single longest run would also cover.
epochs = [50, 100, 150]
batch_sizes = [128, 256, 512, 1024, 2048]
learning_rates = [0.001, 0.002, 0.004, 0.008, 0.02]
validation_accuracy = {}
report = 'epochs = {}, batch_size = {}, learning_rate = {}: accuracy = {:>.4f}'
for epoch in epochs:
    for batch_size in batch_sizes:
        for learning_rate in learning_rates:
            history = train_nn(epoch, batch_size, learning_rate)
            validation_accuracy[epoch, batch_size, learning_rate] = history
            # Report the accuracy reached after the final epoch of this run.
            print(report.format(epoch, batch_size, learning_rate, history[-1]))

epochs = 50, batch_size = 128, learning_rate = 0.001: accuracy = 0.8370
epochs = 50, batch_size = 128, learning_rate = 0.002: accuracy = 0.8564
epochs = 50, batch_size = 128, learning_rate = 0.004: accuracy = 0.4685


In [None]:
# test_accuracy = test_nn()
# print(test_accuracy)