## Imports

In [None]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
# %matplotlib widget

In [None]:
from ipywidgets import interact, interactive, fixed, interact_manual, Video, Audio
import ipywidgets as widgets
import tensorflow as tf
from tensorflow.keras import datasets, layers, models
import tensorflow_model_optimization as tfmot
from tensorflow.keras import regularizers
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split
from tqdm.keras import TqdmCallback
import time
import sys
from fxpmath import Fxp
from matplotlib import pyplot
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error
import tempfile
from tensorflow import keras
sys.path.append('../src')
import numpy as np
import netron
from tensorflow.keras.models import Model

In [None]:
from carihc import *
from r_gen import *
from ssl_utils import *

# Prefer the pickle5 backport when it is installed, otherwise use the stdlib.
# The importable module name is lowercase and Python imports are
# case-sensitive, so the former `Pickle5` spelling could never be found.
try:
    import pickle5 as pickle
except ModuleNotFoundError:
    import pickle

## Functions

In [None]:
# import pickle
def load_file(filename):
    """Unpickle and return the object stored in ``filename``.

    NOTE(review): unpickling can execute arbitrary code, so this should only
    be pointed at files produced by this project.
    """
    with open(filename, 'rb') as pkl_file:
        return pickle.load(pkl_file)

def save_object(obj, filename):
    """Pickle ``obj`` into ``filename`` using the highest pickle protocol.

    The file is opened in ``'wb'`` mode, so existing content is replaced.
    """
    with open(filename, 'wb') as pkl_file:
        pickle.dump(obj, pkl_file, protocol=pickle.HIGHEST_PROTOCOL)


def plot_data(nplot=5, sim_name=''):
    """Draw frame ``nplot`` of every location's data on a 2x4 grid.

    Reads the notebook-level ``rs`` (one indexable array per location) and
    ``locs`` (location labels); both need at least eight entries.

    Args:
        nplot: Index of the frame taken from each ``rs[k]``.
        sim_name: Simulation file name; its last four characters are cut off
            for the subplot titles (the names built in this notebook end in
            ``.pkl``).
    """
    fig, axes = plt.subplots(2, 4, figsize=(12, 6))
    # axes.flat walks the grid row by row, so panel k pairs with rs[k]/locs[k].
    for k, axis in enumerate(axes.flat):
        axis.pcolor(rs[k][nplot])
        axis.set_title(f'{locs[k]} {sim_name[:-4]}')

def plot_r_recs(nplot=5,  sim_name=''):
    """Draw frame ``nplot`` of every location's rectified data on a 2x4 grid.

    Reads the notebook-level ``rs_rec`` (one indexable array per location),
    ``locs`` (location labels) and ``rec_factor`` (shown in the titles).
    Only the first eight entries of ``rs_rec`` are drawn.

    Args:
        nplot: Index of the frame taken from each ``rs_rec[k]``.
        sim_name: Simulation file name; its last four characters are cut off
            for the subplot titles.
    """
    fig, axes = plt.subplots(2, 4, figsize=(12, 6))
    # axes.flat walks the grid row by row, so panel k pairs with rs_rec[k]/locs[k].
    for k, axis in enumerate(axes.flat):
        axis.pcolor(rs_rec[k][nplot])
        axis.set_title(f'rec_{rec_factor} {locs[k]} {sim_name[:-4]}')



## Dataset visualization

In [None]:
# Source-location labels of the datasets. Each label is substituted into the
# per-location result file names loaded below, and the plot helpers use the
# same order for their panel titles.
locs = ['D1_01','D1_02','D1_03', 'D1_04', 'D2_01', 'D2_02', 'D1D2_01','D1D2_02']

### mqR

In [None]:
# Parameter selectors; the Model and qC toggles are created disabled.
use_output_w = widgets.ToggleButtons(
    options=['car', 'car_ihc', 'car_ihc_li'],
    description='Model:',
    disabled=True,
)
qC_w = widgets.ToggleButtons(
    options=[True, False],
    description='qC:',
    disabled=True,
)
ERB_step_w = widgets.ToggleButtons(
    options=[1.5, 1, 0.6, 0.5, 0],
    value=0.5,
    description='ERB_step:',
    disabled=False,
)
mDur_w = widgets.ToggleButtons(
    options=[1, 2, 4, 5, 8, 10, 15, 25, 50, 75, 90, 100, 120],
    value=10,
    description='mDur[ms]:',
    disabled=False,
)
fstep_w = widgets.ToggleButtons(
    options=[100, 200, 400, 500, 1000],
    value=200,
    description='fstep:',
    disabled=False,
)
widgets.VBox([use_output_w, qC_w, ERB_step_w, fstep_w, mDur_w])

In [None]:
# Snapshot the current widget selections into plain variables.
use_output = use_output_w.value
qC = qC_w.value
ERB_step = ERB_step_w.value
fstep = fstep_w.value
mDur = mDur_w.value
# mDur is in ms and is scaled by 48e3 (presumably a 48 kHz sample rate --
# TODO confirm); the default 10 ms gives 480 samples.
mC_len = int(mDur*1e-3*48e3)

# Frequency-range selectors (kHz) used to derive the pole frequencies.
min_freq_w = widgets.ToggleButtons(
    options=[1, 2], value=2, description='min_f(kHz):', disabled=False)
max_freq_w = widgets.ToggleButtons(
    options=[3, 4, 5], value=5, description='max_f(kHz):', disabled=False)
widgets.VBox([min_freq_w, max_freq_w])

In [None]:
# Pole-frequency limits for the selected configuration. Per the original
# note, pole_freq_min follows param_min_pole_Hz and pole_freq_max follows
# param_first_pole_theta inside CAR.
pole_freq_min, pole_freq_max = min_max_pole_freq(max_freq=max_freq_w.value, ERB_step=ERB_step)

# ERB_step == 0 selects the fixed-frequency-step file naming scheme.
if ERB_step != 0:
    sim_name = f'ERB_{ERB_step}_Freq_poles_{pole_freq_min}_{pole_freq_max}_car_dur_{mDur}ms.pkl'
else:
    sim_name = f'fstep_{fstep}_500_4k_car_dur_{mDur}ms.pkl'
print(sim_name)

In [None]:
# Load the mqR simulation output of every source location, in `locs` order.
mqr_dir = f'../result/train/corr_output/mqR_output/{mDur}ms'
rs = [load_file(f'{mqr_dir}/mqR12_{loc}_{sim_name}') for loc in locs]

In [None]:
# Interactive browser: `nplot` picks the frame drawn by plot_data for all
# locations; the same widget is reused by the later plotting cells.
nplot = widgets.IntText(description='Frame:', value=0, disabled=False)
y = interact(plot_data, nplot=nplot, sim_name='_')

### mqRec

In [None]:
# Selector for the rectangular scaling factor that is part of the mqRec file names.
rec_factor_w = widgets.ToggleButtons(
    options=[0.1, 0.2, 0.3, 0.4, 0.5],
    value=0.3,
    description='rec_factor:',
    disabled=False,
)
widgets.VBox([rec_factor_w])

In [None]:
# Load the mqRec simulation output of every source location, in `locs` order.
rec_factor = rec_factor_w.value
mqrec_dir = f'../result/train/corr_output/mqRec_output/{mDur}ms'
rs_rec = [load_file(f'{mqrec_dir}/rec{rec_factor}_mqR12_{loc}_{sim_name}') for loc in locs]

In [None]:
# Interactive browser for the arrays currently held in rs_rec; the frame
# index comes from the shared `nplot` widget.
y = interact(plot_r_recs, nplot=nplot, sim_name=f'_')

### mqRec_pipelined

In [None]:
# Replace rs_rec with the pipelined results instead of appending to it:
# plot_r_recs only draws rs_rec[0..7], so arrays appended after the eight
# non-pipelined ones were never displayed, and every re-run of this cell
# made the list longer.
rs_rec = [
    load_file(f'../result/train/corr_output/mqRec_pipe_output/{mDur}ms/rec{rec_factor}_mqR12_{loc}_{sim_name}')
    for loc in locs
]

In [None]:
# Interactive browser for the arrays currently held in rs_rec; the frame
# index comes from the shared `nplot` widget.
y = interact(plot_r_recs, nplot=nplot, sim_name=f'_')

## Prepare dataset for training

### Select CAR parameters

In [None]:
# Parameter selectors for the training run; the Model and qC toggles are
# created disabled.
use_output_w = widgets.ToggleButtons(
    options=['car', 'car_ihc', 'car_ihc_li'],
    description='Model:',
    disabled=True,
)
qC_w = widgets.ToggleButtons(
    options=[True, False],
    description='qC:',
    disabled=True,
)
ERB_step_w = widgets.ToggleButtons(
    options=[1.5, 1, 0.6, 0.5, 0],
    value=0.5,
    description='ERB_step:',
    disabled=False,
)
mDur_w = widgets.ToggleButtons(
    options=[1, 2, 4, 5, 8, 10, 15, 25, 50, 75, 90, 100, 120],
    value=10,
    description='mDur[ms]:',
    disabled=False,
)
fstep_w = widgets.ToggleButtons(
    options=[100, 200, 400, 500, 1000],
    value=200,
    description='fstep:',
    disabled=False,
)
rec_factor_w = widgets.ToggleButtons(
    options=[0.1, 0.2, 0.3, 0.4, 0.5],
    value=0.3,
    description='rec_factor:',
    disabled=False,
)
# Real booleans (as for qC): with the former 'True'/'False' string options
# the selected value was always truthy, so `if pipelined:` could never take
# the non-pipelined branch. The misspelt `desabled` keyword is also fixed.
pipelined_w = widgets.ToggleButtons(
    options=[True, False],
    value=False,
    description='pipelined',
    disabled=False,
)
widgets.VBox([use_output_w, qC_w, ERB_step_w, fstep_w, mDur_w, rec_factor_w, pipelined_w])

In [None]:
# Snapshot the current widget selections into plain variables.
use_output = use_output_w.value
ERB_step = ERB_step_w.value
qC = qC_w.value
mDur = mDur_w.value
# mDur is in ms and is scaled by 48e3 (presumably a 48 kHz sample rate --
# TODO confirm); the default 10 ms gives 480 samples.
mC_len = int(mDur*1e-3*48e3)
fstep = fstep_w.value
rec_factor = rec_factor_w.value
# Normalise to a real bool. The selector may hand back the string 'False',
# which is truthy and would send every later `if pipelined:` down the
# pipelined branch; comparing the text form works for bools and strings.
pipelined = str(pipelined_w.value).lower() == 'true'

In [None]:
# Frequency-range selectors (kHz) used to derive the pole frequencies.
min_freq_w = widgets.ToggleButtons(
    options=[1, 2], value=2, description='min_f(kHz):', disabled=False)
max_freq_w = widgets.ToggleButtons(
    options=[3, 4, 5], value=5, description='max_f(kHz):', disabled=False)
widgets.VBox([min_freq_w, max_freq_w])

In [None]:
# Rebuild the simulation file name for the selected training configuration
# and echo the names that the loading cell will look for.
pole_freq_min, pole_freq_max = min_max_pole_freq(max_freq=max_freq_w.value, ERB_step=ERB_step)

# ERB_step == 0 selects the fixed-frequency-step file naming scheme.
if ERB_step != 0:
    sim_name = f'ERB_{ERB_step}_Freq_poles_{pole_freq_min}_{pole_freq_max}_car_dur_{mDur}ms.pkl'
else:
    sim_name = f'fstep_{fstep}_500_4k_car_dur_{mDur}ms.pkl'

print(sim_name)
print(f'rec{rec_factor}_mqR12_{locs[0]}_{sim_name}')
print(f'Pipelined is {pipelined}')

### Load Input patterns and calculate target output

In [None]:
# Build the training set: stack every location's input patterns into x_all
# and give each pattern its location's target vector in y_all.
num_classes = 2

# Location family (label without the trailing _NN) -> (active class indices,
# value passed to calc_target_y as set_val).
target_spec = {
    'D1': ([0], 100),
    'D2': ([1], 100),
    'D1D2': ([0, 1], 50),
}

# Compare the text form so both a real bool and the strings 'True'/'False'
# work; a bare `if pipelined:` treats the string 'False' as true.
use_pipelined = str(pipelined).lower() == 'true'
rec_dir = 'mqRec_pipe_output' if use_pipelined else 'mqRec_output'

# Row 0 of x_all / y_all is an all-zero placeholder that only fixes the array
# shapes; it is sliced off again before training.
x_part_temp = load_file(f'../result/train/corr_output/mqRec_output/{mDur}ms/rec{rec_factor}_mqR12_{locs[0]}_{sim_name}')
x_all = np.zeros((1, x_part_temp.shape[1], x_part_temp.shape[2]))
y_all = np.zeros((1, num_classes))

for loc in locs:
    # The selected variant is loaded exactly once. Previously an unconditional
    # second load always replaced it with the non-pipelined file.
    x_part = load_file(f'../result/train/corr_output/{rec_dir}/{mDur}ms/rec{rec_factor}_mqR12_{loc}_{sim_name}')
    x_all = np.vstack([x_all, x_part])

    class_index, set_val = target_spec[loc.rsplit('_', 1)[0]]
    target_y = calc_target_y(index=class_index, num_classes=num_classes, set_val=set_val)

    # One copy of the target vector per pattern of this location.
    lr_ndata = x_part.shape[0]
    y_part = np.ones((lr_ndata, num_classes)) * target_y
    y_all = np.r_[y_all, y_part]
    print(loc, y_all.shape)

### Shuffle dataset

In [None]:
# Drop row 0 (the all-zero placeholder that seeded the stacked arrays), then
# shuffle inputs and targets together; shapes are printed before and after.
train_x, train_y = x_all[1:], y_all[1:]
print(train_x.shape, train_y.shape)

train_x, train_y = shuffle_in_unison(train_x, train_y)
print(train_x.shape, train_y.shape)

## Train CNN

In [None]:
# Training hyper-parameters.
num_classes = 2
batch_size = 32
epochs = 1000
# NOTE(review): learning_rate is defined here but no visible cell passes it
# to the optimizer -- confirm whether that is intended.
learning_rate = 0.00001

# Network layout. These lists are embedded verbatim in the model tag, which
# in turn is part of every saved model / weight file name.
conv_filters = [1, 0]
filter_shape = [(3, 3), (3, 3)]
fc_filters = [65]

__model__ = f'_rcnn{conv_filters}_{filter_shape}_fc{fc_filters}'
print(__model__)
print(f'rec{rec_factor}_{sim_name}')

In [None]:
# Network architecture: one conv + max-pool stage, a flatten, one ReLU dense
# layer and a linear num_classes-wide output trained with an MSE loss.
# Earlier experiments (a second conv stage, L1/L2 regularisation, dropout,
# the Adam optimizer) are not part of the model.
model = models.Sequential([
    layers.Conv2D(
        conv_filters[0],
        filter_shape[0],
        activation='relu',
        input_shape=(train_x.shape[1], train_x.shape[2], 1),
    ),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(fc_filters[0], activation='relu'),
    layers.Dense(num_classes),
])

# NOTE(review): the optimizer is selected by name only, so it is built with
# its default settings.
model.compile(
    loss="mean_squared_error",
    optimizer="RMSProp",
    metrics=["mean_squared_error"],
)
model.summary()

In [None]:
# Model training: fit on the shuffled training set, holding out 10 % of it
# (validation_split=0.1) for the validation loss. `history` is read by the
# loss-plot cell.
history = model.fit(train_x, train_y, batch_size=batch_size, epochs=epochs, verbose=1, validation_split=0.1)

In [None]:
# Plot the training / validation loss curves and save the figure.
fig = plt.figure(figsize=(5, 3))
plt.plot(history.history['loss'], label='loss')
plt.plot(history.history['val_loss'], label='val_loss')
plt.xlabel('Epoch')

# The y-label doubles as a read-out of the final-epoch losses.
final_loss = history.history['loss'][-1]
final_val_loss = history.history['val_loss'][-1]
plt.ylabel(f"Loss -> {final_loss:.2f}\nVal_Loss -> {final_val_loss:.2f}")

plt.title(f"{__model__}_rec{rec_factor}_{sim_name}")
plt.xlim([0, epochs])
plt.ylim([0, 100])
plt.legend()
plt.grid(alpha=0.2)

# Compare the text form of `pipelined`: the string 'False' is truthy, so a
# bare `if pipelined:` would tag a non-pipelined run as pipelined.
plot_suffix = '_pipelined' if str(pipelined).lower() == 'true' else ''
plt.savefig(f'../result/models/loss{__model__}_rec{rec_factor}_{sim_name}{plot_suffix}.jpg', bbox_inches='tight')

In [None]:
# Save the entire model to a single HDF5 (.h5) file.
# Compare the text form of `pipelined`: the string 'False' is truthy, so a
# bare `if pipelined:` would store a non-pipelined model under the
# `_pipelined` name, where the later non-pipelined load cannot find it.
model_suffix = '_pipelined' if str(pipelined).lower() == 'true' else ''
model.save(f'../result/models/model{__model__}_rec{rec_factor}_{sim_name}{model_suffix}.h5')
model.summary()

## Predict testdata using the Trained CNN

In [None]:
# Build the test set: stack the test patterns of each test location and
# attach that location's target vector.
pipelined = False
locs_label = ['D1', 'D2']
locs_test = ['D1_03', 'D2_02', 'D1D2_02']
# Clear the training-stage stacking arrays of the same names.
x_all = []
y_all = []

# Location family (label without the trailing _NN) -> (active class indices,
# value passed to calc_target_y as set_val).
target_spec = {
    'D1': ([0], 100),
    'D2': ([1], 100),
    'D1D2': ([0, 1], 50),
}

test_dir = 'mqRec_pipe_output' if pipelined else 'mqRec_output'
if pipelined:
    print(f'Pipelined is {pipelined}')

# Size the placeholder row from the test data itself. The width used to be
# read from the training-stage `x_part_temp`, which made this cell depend on
# the training cell having run and on both sets sharing the same width.
xtest_part_temp = load_file(f'../result/test/corr_output/mqRec_output/{mDur}ms/rec{rec_factor}_mqR12_{locs_test[0]}_{sim_name}')
xtest_all = np.zeros((1, xtest_part_temp.shape[1], xtest_part_temp.shape[2]))
ytest_all = np.zeros((1, num_classes))
# NOTE(review): unlike the training arrays, this all-zero row 0 is not sliced
# off in this cell, so it stays in xtest_all / ytest_all as an extra sample
# (zero input, zero target). It is left in place because later cells index
# xtest_all by absolute position -- confirm whether it should be removed.

for loc in locs_test:
    xtest_part = load_file(f'../result/test/corr_output/{test_dir}/{mDur}ms/rec{rec_factor}_mqR12_{loc}_{sim_name}')
    xtest_all = np.vstack([xtest_all, xtest_part])

    class_index, set_val = target_spec[loc.rsplit('_', 1)[0]]
    target_y = calc_target_y(index=class_index, num_classes=num_classes, set_val=set_val)

    # One copy of the target vector per pattern of this location.
    lr_ndata = xtest_part.shape[0]
    ytest_part = np.ones((lr_ndata, num_classes)) * target_y
    ytest_all = np.r_[ytest_all, ytest_part]
print(np.shape(xtest_all), np.shape(ytest_all))

In [None]:
# Load the per-location input patterns used for the prediction plots.
# NOTE(review): these files are read from ../result/train/... although the
# section is about test data and the cell above reads ../result/test/... --
# confirm which directory is intended.
loctest_dir = 'mqRec_pipe_output' if pipelined else 'mqRec_output'
loctest_xs = [
    load_file(f'../result/train/corr_output/{loctest_dir}/{mDur}ms/rec{rec_factor}_mqR12_{loc}_{sim_name}')
    for loc in locs_test
]

# Report one shape per location: np.shape on the whole list has to build a
# single array first, which fails when the locations hold different numbers
# of frames.
print([np.shape(loctest_x) for loctest_x in loctest_xs])

In [None]:
# Reload the trained model and classify every frame of each test location.
# Compare the text form of `pipelined` so the string 'False' is not taken as true.
model_suffix = '_pipelined' if str(pipelined).lower() == 'true' else ''
model = tf.keras.models.load_model(f'../result/models/model{__model__}_rec{rec_factor}_{sim_name}{model_suffix}.h5')

print('model:',__model__)
print('sime_name',  sim_name)
print(f'Pipelined is {pipelined}')

loctest_x = loctest_xs
# One shape per location; np.shape on the whole list fails when the
# locations hold different numbers of frames.
print([np.shape(frames) for frames in loctest_xs])

# pred     : predicted class index of every frame, all locations concatenated
# pred_max : per location (row), how many frames were assigned to each class
pred = np.array([])
pred_val = []
pred_max = np.zeros((len(locs_test), num_classes))
for i, loc in enumerate(locs_test):
    # Run inference once per location and derive everything from that result
    # (the model was previously evaluated three times on the same input).
    pred_val = model.predict(loctest_xs[i])
    preds = np.argmax(pred_val, axis=1)
    pred = np.append(pred, preds)
    pred_max[i] = np.bincount(preds, minlength=num_classes)[:num_classes]

# NOTE: `time` shadows the imported `time` module; the name is kept because
# the plotting cell reads it.
time = np.arange(pred.shape[0])
print(time.shape)
index = 2500
# print(prediction[index:index+2])

In [None]:
# Scatter of the predicted class index for every frame; the two class
# indices on the y-axis are labelled with locs_label.
fig, ax = plt.subplots()
fig.set_size_inches(18, 10)
ax.plot(time, pred, '.', markersize=1)
ax.set_ylim([-0.5, 1.5])
ax.set_yticks([0, 1])
ax.set_yticklabels(locs_label)

In [None]:
# Stacked bar chart: per test location, the number of frames assigned to
# each class (rows of pred_max).
fig, ax = plt.subplots()
fig.set_size_inches(10, 5)
plotdata = pd.DataFrame(pred_max, index=locs_test)
plotdata.plot(kind='bar', stacked=True, ax=ax)
ax.legend(locs_label, loc='center left', bbox_to_anchor=(1, 0.5))
ax.set_title("Maximum Predicted Value of ADI_CHL datasets")
ax.set_xlabel("Sound Source Location")

In [None]:
# Compare prediction and target for one sample.
index = 3500

# Flattened predictions of all per-location inputs; each frame contributes
# one value per model output, hence the 2*index offset below.
prediction = np.zeros((0, 0))
for loc_idx in range(len(locs_test)):
    loc_prediction = model.predict(loctest_xs[loc_idx])
    prediction = np.append(prediction, loc_prediction)
print("\nPrediction:", prediction[2*index:2*index+2])

# Predictions for the stacked test set, compared with the stored targets.
pred = model.predict(xtest_all)
target = ytest_all
print("\nlenghts of the test dataset:", target.shape)
print("\npredicted :", pred[index])
print("\ntarget :", target[index])
sample_mse = mean_squared_error(target[index], pred[index])
print("MSE:%.4f" % sample_mse)

In [None]:
# Evaluate the model on the stacked test set. `results` holds the loss
# followed by the compiled metric; both are mean squared error here, so the
# second value is an MSE, not an accuracy.
print("Evaluate on test data")
results = model.evaluate(xtest_all, ytest_all, batch_size=batch_size)
print("test loss, test mse:", results)

### Save filters as text files

In [None]:
# Dump the float weights and biases of the conv layer (layer 0) and of the
# two dense layers (layers 3 and 4) of `model` to text files.
# model = tf.keras.models.load_model(f'../result/models/model{__model__}_{sim_name}.h5')
weights_dir = '../result/models/model_weights_biases'
file_tag = f'{__model__}_rec{rec_factor}_{sim_name}'

# Conv kernel: written as the array's string form (np.savetxt is not used
# for this one).
filters, biases = model.layers[0].get_weights()
with open(f'{weights_dir}/weights_conv2d_l1{file_tag}.txt', 'w') as f:
    f.write(str(filters))
np.savetxt(f'{weights_dir}/biases_conv2d_l1{file_tag}.txt', biases)
print('filters : ', filters.shape)
print('biases :', biases)

# Dense kernels are reshaped to 2-D with the last axis kept as columns.
for layer_index, layer_tag in ((3, 'dense_l1'), (4, 'dense_l2')):
    filters, biases = model.layers[layer_index].get_weights()
    np.savetxt(f'{weights_dir}/weights_{layer_tag}{file_tag}.txt', filters.reshape(-1, filters.shape[-1]))
    np.savetxt(f'{weights_dir}/biases_{layer_tag}{file_tag}.txt', biases)
    print('filters : ', filters.shape)
    print('biases :', biases)

## Quantization aware training

* Both input images and weights are int8, and biases are int32
* References:

    https://www.tensorflow.org/model_optimization/guide/quantization/training_comprehensive_guide

    https://arxiv.org/pdf/1712.05877.pdf

    https://towardsdatascience.com/inside-quantization-aware-training-4f91c8837ead

    https://stackoverflow.com/questions/62512871/tensorflow-lite-inference-how-do-i-scale-down-the-convolution-layer-outputs

In [None]:
# Load the saved (non-pipelined) float CNN model.
model = tf.keras.models.load_model(f'../result/models/model{__model__}_rec{rec_factor}_{sim_name}.h5')
# summary() prints the table itself; wrapping it in print() only added a
# trailing "None" line.
model.summary()

In [None]:
# Create the quantization-aware version of the float model.
quantize_model = tfmot.quantization.keras.quantize_model

# q_aware stands for quantization aware.
q_aware_model = quantize_model(model)

# Compiled with the same loss / optimizer name / metric as the float model.
q_aware_model.compile(loss="mean_squared_error" , optimizer="RMSProp", metrics=["mean_squared_error"])

In [None]:
# Fine-tune the quantization-aware model on the training set for 200 epochs,
# holding out 10 % for validation. Note that this overwrites the `history`
# of the float training run.
history = q_aware_model.fit(train_x, train_y, batch_size=batch_size, epochs=200, verbose=1, validation_split=0.1)

In [None]:
# Save q_aware_model to an HDF5 file next to the float model; the later
# cells reload it from this path inside tfmot's quantize_scope().
q_aware_model.save(f'../result/models/q_aware_model{__model__}_rec{rec_factor}_{sim_name}.h5')

In [None]:
# Reload the quantization-aware model and compare it with the float baseline
# on the test set. evaluate() returns two values here; the second one (the
# compiled metric) is the figure that is reported.
q_aware_path = f'../result/models/q_aware_model{__model__}_rec{rec_factor}_{sim_name}.h5'
with tfmot.quantization.keras.quantize_scope():
    q_aware_model = tf.keras.models.load_model(q_aware_path)

baseline_model_loss = model.evaluate(xtest_all, ytest_all, verbose=0)[1]
q_aware_model_loss = q_aware_model.evaluate(xtest_all, ytest_all, verbose=0)[1]

print('Baseline test loss:', baseline_model_loss)
print('Quant test loss:', q_aware_model_loss)

## Visualize q_aware_model

### Visualize filters

In [None]:
# Load the saved quantization-aware model; the load is wrapped in tfmot's
# quantize_scope() context manager.
with tfmot.quantization.keras.quantize_scope():
    q_aware_model = tf.keras.models.load_model(f'../result/models/q_aware_model{__model__}_rec{rec_factor}_{sim_name}.h5')

# print(q_aware_model.summary())

In [None]:
# Show everything get_weights() returns for layer 1 of the
# quantization-aware model (presumably the wrapped conv2d layer -- confirm
# against the model summary).
conv_l1_layer = q_aware_model.layers[1]
filters = conv_l1_layer.get_weights()
print('filters : ', filters)
print('weights')

#### Golden model: dense layers

##### Dense L1

In [None]:
# Weights of layer 4 of the quantization-aware model (presumably the first
# dense layer -- confirm against the model summary). filters[0] is used as
# the kernel and filters[1] as the bias vector.
filters = q_aware_model.layers[4].get_weights()

# Kernel row for one input (flatten) position; any valid row index works.
flatten_index = 5
print(filters[0][flatten_index])

# Collect the incoming weights and the bias of a single neuron.
neuron_index = 1
dense_l1_w = [kernel_row[neuron_index] for kernel_row in filters[0]]
print(np.size(dense_l1_w))

dense_l1_b = filters[1][neuron_index]
print('biases:', dense_l1_b)

In [None]:
# A typical neuron input vector, copied by hand from the feature-map
# visualisation.
fin =  [  0.3320455 , 0.        , 0.3984546  ,0.         ,0.49142736 ,1.0758275,
          1.0758275 , 0.5047092 , 0.27891824 ,2.8555913  ,1.4078729  ,0.,
          1.1289548 , 0.6773728 , 0.          ]

# Quantization factors (scale s, zero-point z) copied by hand from the
# tflite model; of these, z2 is not passed to either call below.
s1, z1 =  0.013281820341944695  , -128
s2, z2 =  0.04141935333609581  , 0
s3, z3 = 0.04488903284072876 , 128
 

# Golden model of the float neuron, with the expected quantized value.
single_neuron_model(weights=dense_l1_w, fin=fin, bias=dense_l1_b, s3 = s3, z3=z3,relu='true' )

# Golden model of the quantized neuron.
single_quantized_neuron_model(weights=dense_l1_w, fin=fin, bias=dense_l1_b, s1=s1, s2=s2, s3 = s3, z1 = z1, z3=z3, relu='true')

##### Dense L2

In [None]:
# Weights of layer 5 of the quantization-aware model (presumably the second
# dense layer -- confirm against the model summary). filters[0] is used as
# the kernel and filters[1] as the bias vector.
filters = q_aware_model.layers[5].get_weights()

# Kernel row for one input position; any valid row index works.
flatten_index = 5
print(filters[0][flatten_index])

# Collect the incoming weights and the bias of a single neuron.
neuron_index = 1
dense_l2_w = [kernel_row[neuron_index] for kernel_row in filters[0]]
print(np.size(dense_l2_w))

dense_l2_b = filters[1][neuron_index]
print('biases:', dense_l2_b)

In [None]:
# A typical neuron input vector, copied by hand from the feature-map
# visualisation.
fin =  	[0.        , 0.         , 0.        ,  0.         , 0.         , 0.,
         0.62844646 , 0.22444516 , 1.5262271 ,  5.296906   , 0.         , 0.89778066,
         0.         , 0.         , 5.0275717 ,  0.         , 0.         , 0.,
         0.04488903 , 0.22444516 , 0.        ,  0.         , 3.5013447  , 0.,
         4.309347   , 0.         , 4.17468   ,  0.         , 0.         , 0.,
         3.860457   , 0.         , 0.        ,  1.8853394  , 0.         , 0.,
         0.         , 0.         , 0.        ,  0.         , 0.         , 3.7257898,
         0.         , 0.         , 5.431573  ,  0.         , 0.         , 0.,
         0.         , 0.         , 4.040013  ,  6.1497974  , 0.35911226 , 0.,
         5.0724607  , 0.17955613 , 4.5337925 ,  6.868022   , 0.         , 0.,
         0.         , 0.         , 2.2893407 ,  0.22444516 , 0.         ]


# Quantization factors (scale s, zero-point z) copied by hand from the
# tflite model; of these, z2 is not passed to either call below.
s1, z1 =  0.04488903284072876  , -128
s2, z2 =  0.16951020061969757  , 0
s3, z3 = 0.41242459416389465 , 122
 

# Golden model of the float neuron, with the expected quantized value.
single_neuron_model(weights=dense_l2_w, fin=fin, bias=dense_l2_b, s3 = s3, z3=z3,relu='true' )

# Golden model of the quantized neuron.
single_quantized_neuron_model(weights=dense_l2_w, fin=fin, bias=dense_l2_b, s1=s1, s2=s2, s3 = s3, z1 = z1, z3=z3, relu='true')

###  Visualize feature maps

#### Conv2dL1

In [None]:
# reference: https://www.tertiaryinfotech.com/feature-map-visualization-using-tensorflow-keras/
q_aware_path = f'../result/models/q_aware_model{__model__}_rec{rec_factor}_{sim_name}.h5'
with tfmot.quantization.keras.quantize_scope():
    q_aware_model = tf.keras.models.load_model(q_aware_path)

# One test pattern, reshaped to a batch of one with a trailing channel axis.
sample = xtest_all[3000][:]
x = sample.reshape((1, sample.shape[0], sample.shape[1], 1))

# Sub-model that exposes the output of layer 2 of the chosen base model
# (swap in `model` to inspect the float network instead).
base_model = q_aware_model
activation_model = Model(inputs=base_model.inputs, outputs=base_model.layers[2].output)
activation = activation_model(x)

# Draw the first channel(s) of the feature map on an 8x5 subplot grid.
n_shown_channels = 1
plt.figure(figsize=(40, 40))
for ch in range(n_shown_channels):
    plt.subplot(8, 5, ch + 1)
    plt.imshow(activation[0, :, :, ch])
    plt.title(f"ch {ch}")
plt.colorbar()
plt.show()

In [None]:
# Position(s) of the maximum value in the displayed feature-map channel(s).
for ch in range(1):
    channel_map = activation[0, :, :, ch]
    print(np.where(channel_map == np.amax(channel_map)))

In [None]:
# Display the raw values of channel 0 of the current `activation` tensor.
activation[0,:,:,0]

In [None]:
# Map the float feature map onto the quantized scale: value / s3 - z3, with
# the hand-copied scale s3 and offset z3 (result is displayed, not rounded).
s3 , z3 = 0.013281820341944695, 128
np.multiply(activation[0,:,:,0], 1/s3)-z3

#### Dense Layers

In [None]:
# Inspect the output of layer 5 of the quantization-aware model for one
# test pattern, as float values and mapped onto the quantized scale.
q_aware_path = f'../result/models/q_aware_model{__model__}_rec{rec_factor}_{sim_name}.h5'
with tfmot.quantization.keras.quantize_scope():
    q_aware_model = tf.keras.models.load_model(q_aware_path)

# One test pattern, reshaped to a batch of one with a trailing channel axis.
sample = xtest_all[3500][:]
x = sample.reshape((1, sample.shape[0], sample.shape[1], 1))

# Swap in `model` to inspect the float network instead.
base_model = q_aware_model
activation_model = Model(inputs=base_model.inputs, outputs=base_model.layers[5].output)
activation = activation_model(x)
print('real activation L1:\n', activation)

# value / s3 - z3 with the hand-copied scale and offset.
s3, z3 = 0.04488903284072876, 128
quantized_activation = np.multiply(activation, 1/s3) - z3
print('quantized activation:\n', quantized_activation)

## Make tfLite model and evaluate

In [None]:
# Convert the in-memory quantization-aware Keras model to a TensorFlow Lite
# model, with the converter's default optimizations enabled.
converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]

quantized_tflite_model = converter.convert()


In [None]:
# Create the float TFLite model from the un-quantized Keras model.
float_converter = tf.lite.TFLiteConverter.from_keras_model(model)
float_tflite_model = float_converter.convert()

# Write both TFLite models next to the Keras .h5 file.
models_dir = '../result/models'
file_tag = f'{__model__}_rec{rec_factor}_{sim_name}'
keras_path = f'{models_dir}/model{file_tag}.h5'
float_path = f'{models_dir}/float_file{file_tag}.tflite'
quant_path = f'{models_dir}/quant_file{file_tag}.tflite'

with open(quant_path, 'wb') as f:
    f.write(quantized_tflite_model)

with open(float_path, 'wb') as f:
    f.write(float_tflite_model)

# File sizes, in units of 2**20 bytes.
bytes_per_mb = float(2**20)
print("keras model in Mb:", os.path.getsize(keras_path) / bytes_per_mb)
print("Float model in Mb:", os.path.getsize(float_path) / bytes_per_mb)
print("Quantized model in Mb:", os.path.getsize(quant_path) / bytes_per_mb)


In [None]:
# Build a TensorFlow Lite interpreter directly from the in-memory quantized
# model and allocate its tensors before any inference call.
interpreter = tf.lite.Interpreter(model_content=quantized_tflite_model)
interpreter.allocate_tensors()

In [None]:
# Run the TFLite interpreter on every test sample and collect the second
# value returned by predict_model (the per-sample loss).
n_test = len(xtest_all)
loss_res = [
    predict_model(interpreter, xtest_all, ytest_all, sample_idx)[1]
    for sample_idx in range(n_test)
]
print("mean loss:", np.sum(loss_res)/n_test)
print(np.max(loss_res))
print(np.min(loss_res))

In [None]:
# Run the TFLite interpreter on a single test sample (index i) and print the
# loss value returned by predict_model.
i = 4000
single_test_pred, loss = predict_model(interpreter,xtest_all, ytest_all, i)
print("Loss: ", loss)

## Export weights and biases binary values to memory

todo list for exporting weights and biases to memory: 

    1- open the model using netron
    
    2- manually save weight and biases in this path: 
    
         f'../../hdl_design/test/quantized_weights_biases/'
         
    3- manually modify the names of the weights and biases files here

In [None]:
# Visualize the saved quantized tflite model with netron; the weights,
# biases and quantization factors used below are copied from this view
# by hand.
# !pip install netron
netron.start(f'../result/models/quant_file{__model__}_rec{rec_factor}_{sim_name}.tflite')

In [None]:
# Load the .npy tensors exported by hand from netron and write each one
# out as an integer-formatted text file.
config_name = 'c10' # config name is based on the CAR parameters according to the table
name_param_cnn = 34 # taken from the names of the files saved from netron
name_param_dense = 68 # taken from the names of the files saved from netron

qwb_dir = f'../../hdl_design/test/{config_name}_quantized_weights_biases'
conv_prefix = f'sequential_{name_param_cnn}_quant_conv2d_{name_param_cnn}'
dense1_prefix = f'sequential_{name_param_cnn}_quant_dense_{name_param_dense}'
dense2_prefix = f'sequential_{name_param_cnn}_quant_dense_{name_param_dense+1}'

# Conv layer: the kernel is flattened to one text row per entry of its first axis.
conv2D_1_weights = np.load(f'{qwb_dir}/{conv_prefix}_Conv2D;{conv_prefix}_LastValueQuant_FakeQuantWithMinMaxVarsPerChannel.npy')
np.savetxt(f'{qwb_dir}/conv2D_1_weights.txt', conv2D_1_weights.reshape(conv2D_1_weights.shape[0], -1), fmt="%d")

conv2D_1_biases = np.load(f'{qwb_dir}/{conv_prefix}_BiasAdd_ReadVariableOp.npy')
np.savetxt(f'{qwb_dir}/conv2D_1_biases.txt', conv2D_1_biases, fmt="%d")
print(conv2D_1_biases)

# First dense layer.
dense_1_weights = np.load(f'{qwb_dir}/{dense1_prefix}_MatMul;{dense1_prefix}_LastValueQuant_FakeQuantWithMinMaxVars.npy')
np.savetxt(f'{qwb_dir}/dense_1_weights.txt', dense_1_weights, fmt="%d")

dense_1_biases = np.load(f'{qwb_dir}/{dense1_prefix}_BiasAdd_ReadVariableOp.npy')
np.savetxt(f'{qwb_dir}/dense_1_biases.txt', dense_1_biases, fmt="%d")
print(dense_1_biases.shape)

# Second dense layer (its exported name uses name_param_dense + 1).
dense_2_weights = np.load(f'{qwb_dir}/{dense2_prefix}_MatMul;{dense2_prefix}_LastValueQuant_FakeQuantWithMinMaxVars.npy')
np.savetxt(f'{qwb_dir}/dense_2_weights.txt', dense_2_weights, fmt="%d")

dense_2_biases = np.load(f'{qwb_dir}/{dense2_prefix}_BiasAdd_ReadVariableOp.npy')
np.savetxt(f'{qwb_dir}/dense_2_biases.txt', dense_2_biases, fmt="%d")


In [None]:
# Write the quantized parameters to the memory files: weights with
# nbits=8 (dense) and biases with nbits=32.
param_mem_dir = '../../hdl_design/test/mem/cnn_param_mem'

save_4d_arr_in_mem(w=conv2D_1_weights, fileloc=f'{param_mem_dir}/conv2d_1_weights.txt')
save_1d_arr_in_mem(b=conv2D_1_biases, nbits=32, fileloc=f'{param_mem_dir}/conv2d_1_biases.txt')

save_2d_arr_in_mem(w=dense_1_weights, nbits=8, fileloc=f'{param_mem_dir}/dense_1_weights.txt')
print(dense_1_weights.shape)
save_1d_arr_in_mem(b=dense_1_biases, nbits=32, fileloc=f'{param_mem_dir}/dense_1_biases.txt')

save_2d_arr_in_mem(w=dense_2_weights, nbits=8, fileloc=f'{param_mem_dir}/dense_2_weights.txt')
save_1d_arr_in_mem(b=dense_2_biases, nbits=32, fileloc=f'{param_mem_dir}/dense_2_biases.txt')

## Export Quantized Feature In
Reference: https://www.tensorflow.org/lite/performance/quantization_spec

https://towardsdatascience.com/inside-quantization-aware-training-4f91c8837ead

In [None]:
# Input quantization factors s1 (scale) and z1 (zero-point), copied by hand
# from the tflite model.
s1, z1 = 0.0073202610947191715 , -5

index = 6000
r = xtest_all[index]
# ytest_all holds the target vectors, so this prints the expected output of
# the sample, not a model prediction.
print('target output', ytest_all[index])

# Quantize: q = r / s1 + z1, rounded and saturated to the signed 8-bit range.
# Without the clip, inputs outside the representable range would be handed
# to the 8-bit memory writer as out-of-range integers.
q = (r/s1 )+z1
q_r = np.clip(np.round(q), -128, 127)
print(q_r)

filename = f'quantized_xtest_all{index}.txt'
np.savetxt(f'../../hdl_design/test/feature_ins/{filename}', q_r, fmt="%d")
save_1d_arr_in_mem(b = q_r.ravel(), signed = True, nbits=8, n_frac =0, fileloc=f'../../hdl_design/test/feature_ins/mem/{filename}')

## Export Down scaling factors to memory

todo list for exporting down-scaling factors to memory:

    1- open the model using netron
    
    2- manually copy here zero-point and scaling factors of each layer     

### Conv2d L1

In [None]:
# Down-scaling parameters of the first conv layer. The quantization factors
# are copied by hand from the tflite model and depend on the configuration.
#
# Values for config_name == 'c1' (kept for reference):
#   s1, z1 = 0.007598039228469133, -1        # input
#   s2, z2 = [0.015725351870059967 ], 0      # weights
#   s3, z3 = 0.012478901073336601, [-128]    # output
#
# Values for config_name == 'c10' (active):
s1, z1 = 0.0073202610947191715 , -5      # input
s2, z2 = [0.031487368047237396  ], 0     # weights, one scale per kernel
s3, z3 = 0.013281820341944695, [-128]    # output

kernels = conv2D_1_weights
print(kernels.shape)

# One Int32_to_int8 call per kernel along the first axis; its four return
# values are collected into M0_d, M0, n and b respectively.
M0_d, M0, n, b = [], [], [], []
for i, k in enumerate(kernels):
    print("CONV2D_L1- ch: ", i)
    M0t, M0tt, nt, bt = Int32_to_int8(s1=s1, s2=s2[i], s3=s3, z1=z1, z3=z3, k=k)
    for collected, value in zip((M0_d, M0, n, b), (M0t, M0tt, nt, bt)):
        collected.append(value)
print(M0_d, n , b)

# Plain-text copies of the parameters.
scale_txt_dir = '../../hdl_design/test/cnn_m0_n_b_z3'
np.savetxt(f'{scale_txt_dir}/conv2d_l1_n.txt', n, fmt="%d")
np.savetxt(f'{scale_txt_dir}/conv2d_l1_b.txt', b, fmt="%d")
np.savetxt(f'{scale_txt_dir}/conv2d_l1_m0.txt', M0_d, fmt="%d")
np.savetxt(f'{scale_txt_dir}/conv2d_l1_z3.txt', z3, fmt="%d")

# Memory-file copies of the same parameters.
param_mem_dir = '../../hdl_design/test/mem/cnn_param_mem'
save_1d_arr_in_mem(b=n, nbits=5, fileloc=f'{param_mem_dir}/conv2d_1_n.txt')
save_1d_arr_in_mem(b=M0, signed=False, nbits=32, n_frac=32, fileloc=f'{param_mem_dir}/conv2d_1_m0.txt')
save_1d_arr_in_mem(b=b, nbits=32, fileloc=f'{param_mem_dir}/conv2d_1_b.txt')
save_1d_arr_in_mem(b=z3, nbits=8, fileloc=f'{param_mem_dir}/conv2d_1_z3.txt')

### Conv2d L2

In [None]:
# NOTE(review): this entire cell is commented out, so no Conv2d L2 parameter
# files are generated by the notebook as it stands. The disabled code follows
# the same steps as the Conv2d L1 cell, but with ten per-channel weight scales,
# and its np.savetxt calls target the cnn_m0_n_b/ directory instead of the
# cnn_m0_n_b_z3/ directory used by the active cells -- check the paths before
# re-enabling it.
# s1 = 0.02145662158727646 
# s2 = [      0.008125287480652332 ,
#               0.008655051700770855 ,
#               0.008863701485097408 ,
#               0.0065904706716537476 ,
#               0.008133678697049618 ,
#               0.007388727739453316 ,
#               0.01101324986666441 ,
#               0.00890156626701355 ,
#               0.009305829182267189 ,
#               0.015653613954782486  ]
# s3 =  0.028310053050518036
# z1 = -46
# z2 = 0
# z3 = [-66]

# kernels = conv2D_2_weights
# M0_d, M0, n, b = [], [], [], []
# for i in range(kernels.shape[0]):
#     print("CONV2D_L2- ch: ", i)
#     k = kernels[i]
#     M0t, M0tt,  nt, bt = Int32_to_int8 (s1 = s1, s2 = s2[i], s3 = s3 ,z1 = z1, z3 = z3, k = k)
#     M0_d.append(M0t)
#     M0.append(M0tt)
#     n.append(nt)
#     b.append(bt)
# print(b)

# np.savetxt(f'../../hdl_design/test/cnn_m0_n_b/conv2d_l2_n.txt', n, fmt="%d")
# np.savetxt(f'../../hdl_design/test/cnn_m0_n_b/conv2d_l2_b.txt', b, fmt="%d")
# np.savetxt(f'../../hdl_design/test/cnn_m0_n_b/conv2d_l2_m0.txt', M0_d, fmt="%d")
# np.savetxt(f'../../hdl_design/test/cnn_m0_n_b/conv2d_l2_z3.txt', z3, fmt="%d")

# save_1d_arr_in_mem(b = n, nbits=5, fileloc='../../hdl_design/test/mem/cnn_param_mem/conv2d_2_n.txt' )
# save_1d_arr_in_mem(b = M0, signed = False, nbits=32, n_frac =32, fileloc='../../hdl_design/test/mem/cnn_param_mem/conv2d_2_m0.txt' )
# save_1d_arr_in_mem(b = b, nbits=32, fileloc='../../hdl_design/test/mem/cnn_param_mem/conv2d_2_b.txt' )
# save_1d_arr_in_mem(b = z3, nbits=8, fileloc='../../hdl_design/test/mem/cnn_param_mem/conv2d_2_z3.txt' )

### Dense L1

In [None]:
# Quantisation parameters of the first Dense layer (values copied in by hand).
#
# Values for config 'c1' (inactive, kept for reference):
#   input   : s1, z1 = 0.012478901073336601, -128
#   weights : s2, z2 = 0.08118652552366257, 0
#   output  : s3, z3 = 0.16681046783924103, [-128]

# Values for config 'c10' (active)
# input scaling factor and zero-point
s1, z1 =  0.013281820341944695 , -128
# weights scaling factor and zero-point (z2 is not used in this cell)
s2, z2 = 0.04141935333609581, 0
# output scaling factor and zero-point
s3, z3 =  0.04488903284072876 , [-128]

# One b value is collected per neuron. M0t, M0 and n are overwritten on every
# iteration, so only the values from the last neuron are saved below; s1, s2
# and s3 are identical for every neuron here.
# NOTE(review): presumably M0t/M0/n depend only on the scales -- confirm
# against Int32_to_int8.
bn = []
for i in range(fc_filters[0]):
    print("DENSE LAYER 1, Neuron : ", i)
    k = dense_1_weights[i]
    bias = dense_1_biases[i]  # only printed; not passed to Int32_to_int8
    print(k)
    print("bias :", bias)
    M0t, M0, n, b = Int32_to_int8 (s1 = s1, s2 = s2, s3 = s3 ,z1=z1, z3 = z3, k = k)
    bn.append(b)
print(n)
# Wrap the scalars in lists so they can be written as 1-D arrays.
n = [n]
M0 = [M0]
# z3 = [z3]
save_1d_arr_in_mem(b = n, nbits=5, fileloc='../../hdl_design/test/mem/cnn_param_mem/dense_1_n.txt' )
save_1d_arr_in_mem(b = M0, signed = False, nbits=32, n_frac =32, fileloc='../../hdl_design/test/mem/cnn_param_mem/dense_1_m0.txt' )
save_1d_arr_in_mem(b = bn, nbits=32, fileloc='../../hdl_design/test/mem/cnn_param_mem/dense_1_b.txt' )
save_1d_arr_in_mem(b = z3, nbits=8, fileloc='../../hdl_design/test/mem/cnn_param_mem/dense_1_z3.txt' )


np.savetxt(f'../../hdl_design/test/cnn_m0_n_b_z3/dense_l1_z3.txt', np.round(z3), fmt="%d")
np.savetxt(f'../../hdl_design/test/cnn_m0_n_b_z3/dense_l1_b.txt', np.round(bn), fmt="%d")
np.savetxt(f'../../hdl_design/test/cnn_m0_n_b_z3/dense_l1_n.txt', np.round(n), fmt="%d")
# The "%d" text dump takes the FIRST value returned by Int32_to_int8 (M0t), as
# the Conv2d L1 (M0_d) and Dense L2 (M0t) cells do. The second value (M0) is the
# one stored with n_frac=32 above, and rounding it to an integer would discard it.
np.savetxt(f'../../hdl_design/test/cnn_m0_n_b_z3/dense_l1_m0.txt', np.round([M0t]), fmt="%d")

### Dense L2

In [None]:
# Quantisation parameters of the second Dense layer (values copied in by hand).
# The output pair (s3, z3) is not copied as-is: it has to be worked out manually
# from the desired output.
#
# Values for config 'c1' (inactive, kept for reference):
#   input   : s1, z1 = 0.16681046783924103, -128
#   weights : s2, z2 = 0.05973675847053528, 0
#   output  : s3, z3 = 0.42258980870246887  , [119]
#
# Values for config 'c10' (active):
s1 = 0.04488903284072876       # input scaling factor
z1 = -128                      # input zero-point
s2 = 0.16951020061969757       # weights scaling factor
z2 = 0                         # weights zero-point (not used in this cell)
# Output pair previously used here: s3, z3 = 0.41242459416389465, [122]
s3 = 2.5*0.41242459416389465   # output scaling factor: 2.5 times the value above
z3 = [0]                       # output zero-point, kept as a list so it can be saved as an array

# One b value is collected per output neuron; M0t, M0 and n are overwritten on
# every pass, so the values from the last neuron are the ones written out.
bn = []
for i in range(num_classes):
    print("DENSE LAYER 2, Neuron : ", i)
    k = dense_2_weights[i]
    M0t, M0, n, b = Int32_to_int8(
        s1=s1,
        s2=s2,
        s3=s3,
        z1=z1,
        z3=z3,
        k=k,
    )
    bn.append(b)

# Wrap the scalars in lists so they can be written as 1-D arrays.
n = [n]
M0 = [M0]
print("M0, n, b", M0, n, b)
M0t = [M0t]

# Integer-formatted text dumps.
np.savetxt(
    f'../../hdl_design/test/cnn_m0_n_b_z3/dense_l2_b.txt',
    np.round(bn),
    fmt="%d",
)
np.savetxt(
    f'../../hdl_design/test/cnn_m0_n_b_z3/dense_l2_n.txt',
    np.round(n),
    fmt="%d",
)
np.savetxt(
    f'../../hdl_design/test/cnn_m0_n_b_z3/dense_l2_m0.txt',
    np.round(M0t),
    fmt="%d",
)
np.savetxt(
    f'../../hdl_design/test/cnn_m0_n_b_z3/dense_l2_z3.txt',
    np.round(z3),
    fmt="%d",
)

# Parameter memory files for the HDL test bench.
save_1d_arr_in_mem(
    b=n,
    nbits=5,
    fileloc='../../hdl_design/test/mem/cnn_param_mem/dense_2_n.txt',
)
save_1d_arr_in_mem(
    b=M0,
    signed=False,
    nbits=32,
    n_frac=32,
    fileloc='../../hdl_design/test/mem/cnn_param_mem/dense_2_m0.txt',
)
save_1d_arr_in_mem(
    b=bn,
    nbits=32,
    fileloc='../../hdl_design/test/mem/cnn_param_mem/dense_2_b.txt',
)
save_1d_arr_in_mem(
    b=z3,
    nbits=8,
    fileloc='../../hdl_design/test/mem/cnn_param_mem/dense_2_z3.txt',
)