# Implenting LeNet-5 using tensorflow low-level API

## Importing modules

In [None]:
import numpy as np;
import numpy.random as npr;
import _pickle as cPickle
import tensorflow as tf
# import tensorflow.compat.v1 as tf
# tf.disable_v2_behavior()

## Configuration

In [None]:
EPOCHS = 20
BATCH_SIZE = 64
LR = 0.01
imagesAsArray = np.load("imagesAsArray.npy")
labels = np.load("labelsVector.npy")
numberOfTrainingImages = 12000
numberOfTestingImages = 3000
numberOfLanes = 4
imgWidth = 256
imgHeight = 144
neuronsInImage = imgWidth * imgHeight

print(imagesAsArray.shape)
print(labels.shape)

if (numberOfTrainingImages + numberOfTestingImages >= imagesAsArray.shape[0]):
  print('Changed number of trainig and testing data')
  numberOfTrainingImages = 1000
  numberOfTestingImages = 3265

## Shuffling and Splitting training & testing data

In [None]:
rawIndices = np.array(range(0,imagesAsArray.shape[0]))
npr.shuffle(rawIndices)
trainIndice = rawIndices[0:numberOfTrainingImages]
testIndice = rawIndices[numberOfTrainingImages: numberOfTrainingImages + numberOfTestingImages]
traind = imagesAsArray[trainIndice]
trainl = labels[trainIndice]
testd = imagesAsArray[testIndice]
testl = labels[testIndice]

## Logging configuration

In [None]:
print('Class Distribution: {}'.format(trainl.sum(axis=0)))
print('Training Data Dimension: {}'.format(traind.shape))
print('Training Label Dimension: {}'.format(trainl.shape))
print('Testing Data Dimension: {}'.format(testd.shape))
print('Testing Label Dimension: {}'.format(testl.shape))
print('EPOCHS: {}'.format(EPOCHS))
print('BATCH_SIZE: {}'.format(BATCH_SIZE))
print('Optimizer: SGD')
print('Learning Rate: {}'.format(LR))

## Flatten/Reshape Input

In [None]:
traind = traind.reshape(-1, imgHeight, imgWidth,1)
testd = testd.reshape(-1, imgHeight, imgWidth, 1)
print(traind.shape, testd.shape)

## Initialize data tensor in NHWC format

In [None]:
data_placeholder = tf.placeholder(tf.float32,[None,imgHeight, imgWidth,1])
label_placeholder = tf.placeholder(tf.float32,[None,numberOfLanes])
print(data_placeholder)

## Feed Dict

In [None]:
# trainFd = {data_placeholder: traind, label_placeholder: trainl }
# testFd = {data_placeholder: testd, label_placeholder: testl }

## Hidden Layer 1
#### Convolution Layer with 32 fiters and a kernel size of 5

In [None]:
conv1 = tf.nn.relu(tf.layers.conv2d(data_placeholder,6, 5,name="H1"))
print(conv1)

#### Max Pooling (down-sampling) with strides of 2 and kernel size of 2

In [None]:
a1 = tf.layers.max_pooling2d(conv1, 2, 2)
print(a1)

## Hidden Layer 2
#### Convolution Layer with 64 filters and a kernel size of 3

In [None]:
conv2 = tf.nn.relu(tf.layers.conv2d(a1, 16, 5,name="H2"))

#### Max Pooling (down-sampling) with strides of 2 and kernel size of 2

In [None]:
a2 = tf.layers.max_pooling2d(conv2, 2, 2)
print(a2)
# a2flat = tf.reshape(a2, (-1,4*4*16))
a2flat = tf.reshape(a2, (-1,33*61*16))
print(a2flat)

## Hidden Layer 3

In [None]:
Z3 = 120
# allocate variables
# W3 = tf.Variable(npr.uniform(-0.01,0.01, [4*4*16,Z3]),dtype=tf.float32, name ="W3")
W3 = tf.Variable(npr.uniform(-0.01,0.01, [33*61*16,Z3]),dtype=tf.float32, name ="W3")
b3 = tf.Variable(npr.uniform(-0.01,0.01, [1,Z3]),dtype=tf.float32, name ="b3")
# compute activations
a3 = tf.nn.relu(tf.matmul(a2flat, W3) + b3)
print(a3)

## Hidden Layer 4

In [None]:
Z4 = 84
# allocate variables
W4 = tf.Variable(npr.uniform(-0.01,0.01, [Z3,Z4]),dtype=tf.float32, name ="W4")
b4 = tf.Variable(npr.uniform(-0.01,0.01, [1,Z4]),dtype=tf.float32, name ="b4")
# compute activations
a4 = tf.nn.relu(tf.matmul(a3, W4) + b4)
print(a4)

## Output layer

In [None]:
# alloc variables
Z5 = numberOfLanes
W5 = tf.Variable(npr.uniform(-0.1,0.1, [Z4,Z5]),dtype=tf.float32, name ="W5")
b5 = tf.Variable(npr.uniform(-0.01,0.01, [1,Z5]),dtype=tf.float32, name ="b5")
# compute activations
logits = tf.matmul(a4, W5) + b5
print(logits)

## Loss and Accuracy functions

In [None]:
lossBySample = tf.nn.softmax_cross_entropy_with_logits_v2(logits=logits, labels=label_placeholder)
loss = tf.reduce_mean(lossBySample)

nrCorrect = tf.reduce_mean(tf.cast(tf.equal (tf.argmax(logits,axis=1), tf.argmax(label_placeholder,axis=1)), tf.float32))

## Create update optimizer

In [None]:
optimizer = tf.train.GradientDescentOptimizer(learning_rate = LR)
update = optimizer.minimize(loss) ;

## Learn

In [None]:
# Model Evaluation
def evaluate(X_data, y_data):
    num_examples = len(X_data)
    total_accuracy = 0
    sess = tf.get_default_session()
    for offset in range(0, num_examples, BATCH_SIZE):
        batch_x, batch_y = X_data[offset:offset+BATCH_SIZE], y_data[offset:offset+BATCH_SIZE]
        accuracy = sess.run(nrCorrect, feed_dict={data_placeholder: batch_x, label_placeholder: batch_y})
        total_accuracy += (accuracy * len(batch_x))
    return total_accuracy / num_examples

In [None]:
with tf.Session() as sess:
    ## init all variables
    sess.run(tf.global_variables_initializer())
    for i in range(EPOCHS):
        for offset in range(0, trainl.shape[0], BATCH_SIZE):
            end = offset + BATCH_SIZE
            batch_x, batch_y = traind[offset:end], trainl[offset:end]
            batch_test_x, batch_test_y = testd[offset:end], testl[offset:end]
            trainFd = {data_placeholder: batch_x, label_placeholder: batch_y}
            testFd = {data_placeholder: batch_test_x, label_placeholder: batch_test_y}
            #update parameters
            sess.run(update, feed_dict=trainFd)
            correct, lossVal = sess.run([nrCorrect,loss], feed_dict=trainFd)
            #testacc = sess.run(nrCorrect, feed_dict = testFd)
            print('Epoch {}, acc={:.6f}, loss={:.6f}\r'.format(i, float(correct), lossVal), end='')

        print()
        X_validation, y_validation = traind[::-1], trainl[::-1]
        validation_accuracy = evaluate(X_validation, y_validation)
        print("Validation Accuracy = {:.6f}".format(validation_accuracy))
        test_accuracy = evaluate(testd, testl)
        print("Test Accuracy = {:.6f}".format(test_accuracy))
        print()
    
#     test_accuracy = evaluate(testd, testl)
#     print("Test Accuracy = {:.3f}".format(test_accuracy))