In [None]:
import tensorflow as tf
import numpy as np

In [None]:
# Seed both NumPy's and TensorFlow's global random generators with the same
# value so that random operations start from a fixed state.
np.random.seed(1234)
tf.random.set_seed(1234)

## Loading Fashion-MNIST Data

In [None]:
# Load the Fashion-MNIST train / test splits shipped with Keras.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()

# Add a channel axis: images go from (28x28) to (28x28x1).
x_train = tf.expand_dims(x_train, axis=-1)
x_test = tf.expand_dims(x_test, axis=-1)

# Hold out everything from index 50000 onwards as the validation subset.
# The validation slices must be taken before the training arrays are truncated.
x_valid, y_valid = x_train[50000:], y_train[50000:]
x_train, y_train = x_train[:50000], y_train[:50000]



## Creating Dataset Objects

In [None]:
#Training
# ------------------------------
# Build the (image, label) dataset and shuffle it with a buffer as large as the
# number of training examples.
train_dataset = (
    tf.data.Dataset
    .from_tensor_slices((x_train, y_train))
    .shuffle(buffer_size=x_train.shape[0])
)

# Normalize images
def normalize_img(x_, y_):
    """Cast the image to float32 and divide it by 255; the label passes through.

    Args:
        x_: image tensor (cast to ``tf.float32`` here).
        y_: label, returned unchanged.

    Returns:
        Tuple ``(scaled_image, y_)``.
    """
    scaled = tf.cast(x_, tf.float32) / 255.
    return scaled, y_

# 1-hot encoding <- for categorical cross entropy
def to_categorical(x_, y_, num_classes=10):
    """One-hot encode the label (needed for categorical cross-entropy).

    Args:
        x_: image, returned unchanged.
        y_: integer class label(s) to encode.
        num_classes: length of the one-hot vector. Defaults to 10, the value
            that was previously hard-coded, so ``dataset.map(to_categorical)``
            behaves exactly as before.

    Returns:
        Tuple ``(x_, one_hot_label)``.
    """
    return x_, tf.one_hot(y_, depth=num_classes)

# Batch size, also reused by the validation pipeline and the step computations.
bs = 32

train_dataset = (
    train_dataset
    .map(to_categorical)   # 1-hot labels
    .map(normalize_img)    # normalized float32 images
    .batch(bs)             # divide in batches
    # Without calling the repeat function the dataset
    # will be empty after consuming all the images
    .repeat()
)

In [None]:
#Validation
# -----------------------
# Same per-example preprocessing as the training data, but without shuffling.
valid_dataset = (
    tf.data.Dataset
    .from_tensor_slices((x_valid, y_valid))
    .map(normalize_img)    # normalize images
    .map(to_categorical)   # 1-hot encoding
    .batch(bs)             # divide in batches
    .repeat()              # repeat
)

In [None]:
#Testing
# -------------------
# Test examples are normalized and one-hot encoded, then served one per batch.
test_dataset = (
    tf.data.Dataset
    .from_tensor_slices((x_test, y_test))
    .map(normalize_img)
    .map(to_categorical)
    .batch(1)
)

## Quantization layer (Bypass during training)

In [None]:
def Quantization_layer(tensor, Quantization = True,signed = True, word_size = 12, frac_size = 6, saturate = False):
    """Round a tensor onto a fixed-point grid, or pass it through untouched.

    Args:
        tensor: value to quantize.
        Quantization: when False the function is a pure bypass and returns
            ``tensor`` itself.
        signed: whether the fixed-point format is signed. Only consulted when
            ``saturate`` is True.
        word_size: total number of bits of the fixed-point word. Only
            consulted when ``saturate`` is True.
        frac_size: number of fractional bits; the grid step is
            ``2**-frac_size``.
        saturate: when True, values that do not fit in the
            ``word_size``-bit word are clipped to the nearest representable
            value. Defaults to False, which keeps the original
            "assume no overflow" behavior.

    Returns:
        ``tensor`` unchanged if ``Quantization`` is False, otherwise
        ``round(tensor * 2**frac_size) / 2**frac_size`` (optionally clipped).
    """
    if not Quantization:
        return tensor                                       #Simple Bypass

    factor = 2.0**frac_size
    # Integer code of each value on the fixed-point grid.
    codes = tf.round(tensor*factor)

    if saturate:
        # Bounds are expressed as integer codes (before dividing by factor),
        # e.g. signed 12-bit -> [-2048, 2047].
        if signed:
            max_code = (1 << (word_size - 1)) - 1
            min_code = -max_code - 1
        else:
            max_code = (1 << word_size) - 1
            min_code = 0
        codes = tf.clip_by_value(codes, float(min_code), float(max_code))

    return codes / factor

## Building Lenet5 Model

In [None]:
def build_model(input_layer, Quantization = True, signed = True, word_size = 12, frac_size = 6 ):
    """Wire up the LeNet-5 style graph with a quantization step after every stage.

    Every stage output (input, convolution, dense, activation) is passed
    through a ``Lambda`` layer wrapping ``Quantization_layer``; all of those
    layers receive the same keyword arguments built from this function's
    parameters.

    Args:
        input_layer: tensor the network is built on.
        Quantization, signed, word_size, frac_size: forwarded unchanged to
            every ``Quantization_layer`` call.

    Returns:
        The output tensor of the final ``QSoftmax`` quantization layer.
    """
    layers = tf.keras.layers
    activations = tf.keras.activations

    # One shared argument dict is handed to every quantization Lambda.
    q_args = {'Quantization':Quantization, 'signed':signed, 'word_size':word_size, 'frac_size':frac_size}

    def quantize(x, name):
        # Named Lambda wrapper around Quantization_layer.
        return layers.Lambda(Quantization_layer, name=name, arguments=q_args)(x)

    net = quantize(input_layer, "QInp")

    # Convolution block 1: 6 filters, 5x5, 'same' padding -> tanh -> average pooling
    net = layers.Conv2D(6, kernel_size=5, strides=1, input_shape=(28,28,1), padding='same', name= 'Conv1')(net)
    net = quantize(net, "QConv1")
    net = activations.tanh(net)
    net = quantize(net, "QAct1")
    net = layers.AveragePooling2D(name='AvgPool1')(net)

    # Convolution block 2: 16 filters, 5x5, 'valid' padding -> tanh -> average pooling
    net = layers.Conv2D(16, kernel_size=5, strides=1, padding='valid',name='Conv2')(net)
    net = quantize(net, "QConv2")
    net = activations.tanh(net)
    net = quantize(net, "QAct2")
    net = layers.AveragePooling2D(name='AvgPool2')(net)
    net = layers.Flatten(name='Flatten')(net)

    # Dense block 1: 120 units -> tanh
    net = layers.Dense(units=120, name='Dense1')(net)
    net = quantize(net, "QDense1")
    net = activations.tanh(net)
    net = quantize(net, "QAct3")

    # Dense block 2: 84 units -> tanh
    net = layers.Dense(units=84, name='Dense2')(net)
    net = quantize(net, "QDense2")
    net = activations.tanh(net)
    net = quantize(net, "QAct4")

    # Output block: 10 units -> softmax
    net = layers.Dense(units=10,name='Output')(net)
    net = quantize(net, "QOut")
    net = activations.softmax(net)
    return quantize(net, "QSoftmax")

In [None]:
# Symbolic 28x28x1 input feeding the graph.
input_layer = tf.keras.Input(shape=(28, 28, 1))

# Quantization is switched off here, so every quantization layer is a bypass.
output_layer = build_model(input_layer, Quantization=False)

Lenet = tf.keras.Model(inputs=input_layer, outputs=output_layer)

## Summary of the network

In [None]:
# Show the layer-by-layer summary of the Lenet model.
Lenet.summary()

## Training Options

In [None]:
# Optimization params
# -------------------
lr = 1e-3                                                # learning rate
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
loss = tf.keras.losses.CategoricalCrossentropy()         # labels are one-hot encoded

# Validation metrics
# ------------------
metrics = ['accuracy']

# Compile Model
# -------------
Lenet.compile(loss=loss, optimizer=optimizer, metrics=metrics)

## Callbacks

In [None]:
import os
from datetime import datetime

# Switches selecting which callbacks are attached to training.
early_stop = True
tensorboard = True
checkpoints = False


# Save Directory (Change for your own directory)
cwd = os.getcwd()

if (tensorboard or checkpoints):
    # Each run gets its own timestamped sub-directory under <cwd>/GraphData.
    # exist_ok=True replaces the exists()-then-makedirs() check, which could
    # race with another process creating the same directory.
    exps_dir = os.path.join(cwd, 'GraphData')
    os.makedirs(exps_dir, exist_ok=True)
    now = datetime.now().strftime('%b%d_%H-%M-%S')
    exp_name = "Lenet5"
    exp_dir = os.path.join(exps_dir, exp_name + '_' + str(now))
    os.makedirs(exp_dir, exist_ok=True)

callbacks = []

# Model checkpoint
# ----------------
if checkpoints:
    ckpt_dir = os.path.join(exp_dir, 'ckpts')
    os.makedirs(ckpt_dir, exist_ok=True)

    ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'),
                                                       save_weights_only=True)  # False to save the model directly
    callbacks.append(ckpt_callback)

# ----------------

# Visualize Learning on Tensorboard
# ---------------------------------
if tensorboard:
    tb_dir = os.path.join(exp_dir, 'tb_logs')
    os.makedirs(tb_dir, exist_ok=True)
    tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,profile_batch=0,histogram_freq=1,
                                                 write_graph=True,write_images=True,embeddings_freq=1)
    callbacks.append(tb_callback)

# Early Stopping
# --------------
if early_stop:
    # Stop after 3 epochs without val_loss improvement and restore the best weights.
    es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
    callbacks.append(es_callback)

# ---------------------------------

# How to visualize Tensorboard

# 1. tensorboard --logdir EXPERIMENTS_DIR --port PORT     <- from terminal
# 2. localhost:PORT   <- in your browser

## Training

In [None]:
# The training and validation datasets both repeat, so the step counts below
# tell Keras where an epoch / a validation pass ends: ceil(n_examples / bs).
Lenet.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=100,  # upper bound on the number of epochs; relies on repeat() in the training dataset
    steps_per_epoch=int(np.ceil(x_train.shape[0] / bs)),
    validation_steps=int(np.ceil(x_valid.shape[0] / bs)),
    callbacks=callbacks,
)

## Saving Weights

In [None]:
# Make sure <cwd>/TrainedWeights exists; exist_ok=True avoids the separate
# exists() check and is safe if the directory is already there.
Wgt_dir = os.path.join(cwd, 'TrainedWeights')
os.makedirs(Wgt_dir, exist_ok=True)
# NOTE: from here on Wgt_dir is no longer the directory but the path
# <cwd>/TrainedWeights/Weights handed to save_weights.
Wgt_dir = os.path.join(Wgt_dir, 'Weights')
Lenet.save_weights(Wgt_dir)