In [4]:
import numpy as np
import scipy as sp
import csv
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf
import tensorflow_model_optimization as tfmot
import ast
from collections import OrderedDict
from typing import Tuple

(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()
train_images = train_images / 255.0
test_images = test_images / 255.0

In [None]:
LOAD_PATH_Q_AWARE = "./model/" + "model_q_aware_final_01"
LOAD_TFLITE_PATH = "./model/" + 'tflite_final_01.tflite'
# Load Q Aware model
q_aware_model : tf.keras.Model
with tfmot.quantization.keras.quantize_scope():
    q_aware_model = tf.keras.models.load_model(LOAD_PATH_Q_AWARE)
# Load TFLite model
interpreter = tf.lite.Interpreter(LOAD_TFLITE_PATH)

def bit_flipper(value : int, bit_pos : int) -> int:
    """ Random bit flipper 
    -
    Obtains a value and bit position and flips it.
    - All values are in 8 bits, MSB have higher probability of getting flipped
    - It is assumed value is a signed 8 bit number """
    # Negative 2 Complement conversion
    if value < 0:
        value = (-value ^ 0xFF) + 1
    flip_mask = 1 << bit_pos
    flipped_value = value ^ flip_mask
    # Negative back conversion 2 Complement
    if flipped_value >= 128:
        flipped_value = -((flipped_value ^ 0xFF) + 1)
    return flipped_value

def evaluate_model(interpreter: tf.lite.Interpreter) -> Tuple[float, float]:
    """ Evaluate TFLite Model:
    -
    Receives the interpreter and returns a tuple of loss and accuracy.
    """
    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]

    # Run predictions on every image in the "test" dataset.
    prediction_digits = []
    predictions = []
    for i, test_image in enumerate(test_images):
        # Pre-processing: add batch dimension and convert to float32 to match with the model's input data format.
        test_image = np.expand_dims(test_image, axis = 0).astype(np.float32)
        test_image = np.expand_dims(test_image, axis = 3).astype(np.float32)
        interpreter.set_tensor(input_index, test_image)

        # Run inference.
        interpreter.invoke()

        # Post-processing: remove batch dimension and find the digit with highest probability.
        output = interpreter.tensor(output_index)
        digit = np.argmax(output()[0])
        predictions.append(np.copy(output()[0]))
        prediction_digits.append(digit)

    # Compare prediction results with ground truth labels to calculate accuracy.
    prediction_digits = np.array(prediction_digits)
    predictions = np.array(predictions)
    scce = tf.keras.losses.SparseCategoricalCrossentropy()(test_labels, predictions)

    loss = scce.numpy()
    accuracy = (prediction_digits == test_labels).mean()

    return loss, accuracy


In [None]:
# Evaluate accuracy of both models
q_aware_test_loss, q_aware_test_acc = q_aware_model.evaluate(test_images, test_labels)
print('Q Aware model test accuracy: ', "{:0.2%}".format(q_aware_test_acc))
print('Q Aware model test loss: ', q_aware_test_loss)
interpreter.allocate_tensors()
tflite_loss, tflite_accuracy = evaluate_model(interpreter)
print('TFLite model test accuracy: ', "{:0.2%}".format(tflite_accuracy))
print('TFLite model test loss: ', tflite_loss)

In [None]:
BIT_WIDTH = 8
quantized_and_dequantized = OrderedDict()
quantized = OrderedDict()
layer_index_list = []
keys_list = []

layer : tfmot.quantization.keras.QuantizeWrapperV2
for i, layer in enumerate(q_aware_model.layers):
    quantizer : tfmot.quantization.keras.quantizers.Quantizer
    weight : tf.Variable
    if hasattr(layer, '_weight_vars'):
        for weight, quantizer, quantizer_vars in layer._weight_vars:
            min_var = quantizer_vars['min_var']
            max_var = quantizer_vars['max_var']

            key = weight.name[:-2]
            layer_index_list.append(i)
            keys_list.append(key)
            quantized_and_dequantized[key] = quantizer(inputs = weight, training = False, weights = quantizer_vars)
            quantized[key] = np.round(quantized_and_dequantized[key] / max_var * (2**(BIT_WIDTH-1)-1))

for key in quantized:
    # print("Fake Quantized")
    print(key, quantized[key].shape)
    if "dense" not in key:
        # print(quantized_and_dequantized[key][:,:,0,0])
        print(quantized[key][:,:,0,0])
    else:
        # print(quantized_and_dequantized[key][:,0])
        print(quantized[key][:,0])

In [None]:
kernel_idx = 0 
key = keys_list[kernel_idx]
print(type(quantized[key][:,:,0,29]))
print(quantized[key][:,:,0,29])

In [None]:
df = pd.read_csv('Performance.csv')
layer_affected_list = []
kernel_index_list = []
layer_affected_index_list = []
position_disrupted_list = []
bit_flipped_list = []
row : pd.Series
for index, row in df.iterrows():
    layer_affected_list.append(row['layer_affected'])
    try:
        kernel_index_list.append(int(row['kernel_index']))
    except:
        kernel_index_list.append(-1)
    try:
        layer_affected_index_list.append(int(row['layer_affected_index']))
    except:
        layer_affected_index_list.append(-1)
    try:
        position_disrupted_list.append(ast.literal_eval(row['position_disrupted']))
    except:
        position_disrupted_list.append(tuple())
    try:
        bit_flipped_list.append(int(row['bit_disrupted']))
    except:
        bit_flipped_list.append(-1)

BIT_WIDTH = 8
T_VARIABLES_KERNEL_INDEX = 0
out_list = []

pos = 2013
print(position_disrupted_list[pos])
print(bit_flipped_list[pos])
print(df['quantized_value'][pos])
print(quantized[key][position_disrupted_list[pos]])
for i, key in enumerate(layer_affected_list):
    print("Index", i)
    entry = {}
    position = position_disrupted_list[i]
    if len(position) > 1:
        q_aware_copy : tf.keras.Model
        # Load Q Aware model copy
        with tfmot.quantization.keras.quantize_scope():
            q_aware_copy = tf.keras.models.load_model(LOAD_PATH_Q_AWARE)

        if "dense" not in key:
            # It is a convolutional layer
            kernel_row = position[0]
            kernel_column = position[1]
            in_channel = position[2]
            out_channel = position[3]
            kernel_position = (slice(None), slice(None), in_channel, out_channel)
            value_position = (kernel_row, kernel_column)
        else:
            # It is a fully connected layer
            kernel_row = None
            kernel_column = None
            in_channel = position[0]
            out_channel = position[1]
            kernel_position = (slice(None), slice(None)) # This slice takes the whole densely connected kernel
            value_position = (in_channel, out_channel)

        kernel_idx = kernel_index_list[i]
        layer_index = layer_index_list[kernel_idx]
        m_vars = {variable.name: variable for i, variable in enumerate(q_aware_model.layers[layer_index].non_trainable_variables) if keys_list[kernel_idx] in variable.name}
        min_key = list(key for key in m_vars if "min" in key)[0]
        max_key = list(key for key in m_vars if "max" in key)[0]
        if "dense" not in key:
            # Convolutional layers max is divided per channels
            min_var = m_vars[min_key][out_channel]
            max_var = m_vars[max_key][out_channel]
        else:
            # Fully connected layer has only 1 max value for the kernel
            min_var = m_vars[min_key]
            max_var = m_vars[max_key]

        flipped_int_kernel_value = bit_flipper(int(quantized[key][position]), bit_flipped_list[i])
        flipped_float_kernel_val = flipped_int_kernel_value * max_var.numpy() / (2**(BIT_WIDTH - 1) - 1)
        full_kernel = q_aware_copy.layers[layer_index].trainable_variables[T_VARIABLES_KERNEL_INDEX].numpy()
        update_kernel = np.copy(full_kernel)
        update_kernel[position] = flipped_float_kernel_val
        q_aware_copy.layers[layer_index].trainable_variables[T_VARIABLES_KERNEL_INDEX].assign(update_kernel)
        # Laplacian calculation
        kernel = full_kernel[kernel_position]
        original_laplacian = sp.ndimage.laplace(kernel)
        new_laplacian = sp.ndimage.laplace(update_kernel[kernel_position])
        int_kernel = np.copy(quantized[key][kernel_position]) # Important to avoid modifying the original values
        original_int_laplacian = sp.ndimage.laplace(int_kernel)
        int_kernel[value_position] = flipped_int_kernel_value
        new_int_laplacian = sp.ndimage.laplace(int_kernel)    
        
        # Conversion of new model to TF Lite model
        new_converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_copy)
        new_converter.optimizations = [tf.lite.Optimize.DEFAULT]
        new_tflite_model = new_converter.convert()
        new_interpreter = tf.lite.Interpreter(model_content = new_tflite_model)

        # Check new accuracy
        q_copy_test_loss, q_copy_test_acc = q_aware_copy.evaluate(test_images, test_labels, verbose = 0)
        new_interpreter.allocate_tensors()
        new_tflite_loss, new_tflite_accuracy = evaluate_model(new_interpreter)

        entry['position'] = position
        entry['min_var'] = min_var.numpy()
        entry['max_var'] = max_var.numpy()
        entry['original_weight_value'] = full_kernel[position]
        entry['quantized_value'] = quantized[key][position]
        entry['bit_disrupted'] = bit_flipped_list[i]
        entry['flipped_quantized_value'] = flipped_int_kernel_value
        entry['flipped_weight_value'] = flipped_float_kernel_val
        entry['q_aware_accuracy'] = q_copy_test_acc
        entry['tflite_accuracy'] = new_tflite_accuracy
        entry['q_aware_acc_degradation'] = q_copy_test_acc - q_aware_test_acc
        entry['tflite_acc_degradation'] = new_tflite_accuracy - tflite_accuracy
        entry['q_aware_loss'] = q_copy_test_loss
        entry['tflite_loss'] = new_tflite_loss
        entry['original_laplacian'] = original_laplacian[value_position]
        entry['modified_laplacian'] = new_laplacian[value_position]
        entry['original_int_laplacian'] = original_int_laplacian[value_position]
        entry['modified_int_laplacian'] = new_int_laplacian[value_position]
        entry['abs_laplacian_diff'] = np.abs(entry['original_laplacian'] - entry['modified_laplacian'])
        entry['abs_int_laplacian_diff'] = np.abs(entry['original_int_laplacian'] - entry['modified_int_laplacian'])
    else:
        entry['position'] = None
        entry['min_var'] = None
        entry['max_var'] = None
        entry['original_weight_value'] = None
        entry['quantized_value'] = None
        entry['bit_disrupted'] = None
        entry['flipped_quantized_value'] = None
        entry['flipped_weight_value'] = None
        entry['q_aware_accuracy'] = q_aware_test_acc
        entry['tflite_accuracy'] = tflite_accuracy
        entry['q_aware_acc_degradation'] = None
        entry['tflite_acc_degradation'] = None
        entry['q_aware_loss'] = q_aware_test_loss
        entry['tflite_loss'] = tflite_loss
        entry['original_laplacian'] = None
        entry['modified_laplacian'] = None
        entry['original_int_laplacian'] = None
        entry['modified_int_laplacian'] = None
        entry['abs_laplacian_diff'] = None
        entry['abs_int_laplacian_diff'] = None
    out_list.append(entry)
out_df = pd.DataFrame(out_list)
out_df.to_csv('Max_Min.csv')

In [None]:
# Test of loss calculation
import time
q_aware_test_loss, q_aware_test_acc = q_aware_model.evaluate(test_images, test_labels)
print('Test accuracy : ', "{:0.2%}".format(q_aware_test_acc))
print("Test accuracy : ", q_aware_test_loss)
prediction = q_aware_model.predict(test_images)
start_time = time.time()
loss_self = np.array([-np.log(prediction[idx][test_labels[idx]]) for idx in range(test_images.shape[0])])
print("Self calculated loss")
# print(loss_self)
print(np.mean(loss_self))
print("--- %s seconds ---" % (time.time() - start_time))
# print(test_labels)
# print(prediction)
start_time = time.time()
loss_1 = tf.keras.losses.sparse_categorical_crossentropy(test_labels, prediction)
print("Function calculated loss")
# print(loss_1)
print(np.mean(loss_1))
print("--- %s seconds ---" % (time.time() - start_time))
start_time = time.time()
scce = tf.keras.losses.SparseCategoricalCrossentropy()(test_labels, prediction)
print("Class calculated loss")
print(scce.numpy())
print("--- %s seconds ---" % (time.time() - start_time))

In [67]:
import csv
import os
WRITE_FILE_PATH = './Test_file.csv'
FLAG = os.path.exists(WRITE_FILE_PATH)

with open(WRITE_FILE_PATH, 'r') as file:
    file.seek(0, os.SEEK_END)
    file.seek(file.tell() - 3, os.SEEK_SET)
    pos = file.tell()
    while file.read(1) != '\n':
        pos -= 1
        file.seek(pos, os.SEEK_SET)
    cat = ''
    while file.read(1) != ',':
        pos += 1
        file.seek(pos, os.SEEK_SET)
        cat += file.read(1)
    print("Last", cat)
    print("Rest of line", file.readline())
    file.close()

with open(WRITE_FILE_PATH, 'a') as file:
    writer = csv.writer(file, delimiter = ',', lineterminator = '\n',)
    # reader = csv.reader(file, delimiter = ',')
    if not FLAG:
        writer.writerow([''] + ['a']*1_0)
    for i in range(1_0):
        row = [i**2] + [i + j*0.2 for j in range(i+1)]
        writer.writerow(row)
    file.close()

Last 81
Rest of line 9.0,9.2,9.4,9.6,9.8,10.0,10.2,10.4,10.6,10.8



In [42]:
MODELS_DIR = "./model/"
LOAD_PATH_Q_AWARE = MODELS_DIR + "model_q_aware_final_01"
LOAD_TFLITE_PATH = MODELS_DIR + 'tflite_final_01.tflite'
OUTPUTS_DIR = "./outputs/"

SAVE_NAME_SHAPE = "l1_shape.txt"
SAVE_NAME_W_INT = "l1_weights_int.txt"
SAVE_NAME_W_TRUNC = "l1_weights_trunc.txt"
SAVE_NAME_W_ORIGINAL = "l1_weights_original.txt"
SAVE_NAME_B_ORIGINAL = "l1_bias_original.txt"

SAVE_PATH_SHAPE = OUTPUTS_DIR + SAVE_NAME_SHAPE
SAVE_PATH_W_INT = OUTPUTS_DIR + SAVE_NAME_W_INT
SAVE_PATH_W_TRUNC = OUTPUTS_DIR + SAVE_NAME_W_TRUNC
SAVE_PATH_W_ORIGINAL = OUTPUTS_DIR + SAVE_NAME_W_ORIGINAL
SAVE_PATH_B_ORIGINAL = OUTPUTS_DIR + SAVE_NAME_B_ORIGINAL

if not os.path.exists(OUTPUTS_DIR):
    os.mkdir(OUTPUTS_DIR)

# Load Q Aware model
q_aware_model : tf.keras.Model
with tfmot.quantization.keras.quantize_scope():
    q_aware_model = tf.keras.models.load_model(LOAD_PATH_Q_AWARE)
# Load TFLite model
interpreter = tf.lite.Interpreter(LOAD_TFLITE_PATH)

# Quantification of values
BIT_WIDTH = 8
quantized_and_dequantized = OrderedDict()
quantized = OrderedDict()
layer_index_list = []
keys_list = []
layers_shapes = []

layer : tfmot.quantization.keras.QuantizeWrapperV2
for i, layer in enumerate(q_aware_model.layers):
    quantizer : tfmot.quantization.keras.quantizers.Quantizer
    weight : tf.Variable
    if hasattr(layer, '_weight_vars'):
        for weight, quantizer, quantizer_vars in layer._weight_vars:
            min_var = quantizer_vars['min_var']
            max_var = quantizer_vars['max_var']

            key = weight.name[:-2]
            layer_index_list.append(i)
            keys_list.append(key)
            layers_shapes.append(weight.numpy().shape)
            quantized_and_dequantized[key] = quantizer(inputs = weight, training = False, weights = quantizer_vars)
            quantized[key] = np.round(quantized_and_dequantized[key] / max_var * (2**(BIT_WIDTH-1)-1))

# First layer
LAYER_TARGET = 0
T_VARIABLES_KERNEL_INDEX = 0
T_VARIABLES_BIAS_INDEX = 1
key = keys_list[LAYER_TARGET]
kernel_idx = keys_list.index(key)
layer_index = layer_index_list[kernel_idx]
layer_shape = layers_shapes[LAYER_TARGET]

TEST_IN_CHANNEL = 0
TEST_OUT_CHANNEL = 0
print(layer_shape)
print(quantized[key][:,:,0,TEST_OUT_CHANNEL])
print(quantized_and_dequantized[key][:,:,TEST_IN_CHANNEL,TEST_OUT_CHANNEL].numpy())
print(q_aware_model.layers[layer_index].trainable_variables[T_VARIABLES_KERNEL_INDEX][:,:,TEST_IN_CHANNEL,TEST_OUT_CHANNEL].numpy())
print(q_aware_model.layers[layer_index].trainable_variables[T_VARIABLES_BIAS_INDEX][TEST_OUT_CHANNEL].numpy())

with open(SAVE_PATH_SHAPE, 'w') as file:
    writer = csv.writer(file, delimiter = ' ', lineterminator = '\n')
    writer.writerow(list(layer_shape))
with open(SAVE_PATH_W_INT, 'w') as file:
    writer = csv.writer(file, delimiter = ' ', lineterminator = '\n')
    for l in range(layer_shape[3]):
        for k in range(layer_shape[2]):
            for i in range(layer_shape[0]):
                writer.writerow(quantized[key][i,:,k,l].astype(int))
with open(SAVE_PATH_W_TRUNC, 'w') as file:
    writer = csv.writer(file, delimiter = ' ', lineterminator = '\n')
    for l in range(layer_shape[3]):
        for k in range(layer_shape[2]):
            for i in range(layer_shape[0]):
                writer.writerow(quantized_and_dequantized[key][i,:,k,l].numpy())
with open(SAVE_PATH_W_ORIGINAL, 'w') as file:
    writer = csv.writer(file, delimiter = ' ', lineterminator = '\n')
    for l in range(layer_shape[3]):
        for k in range(layer_shape[2]):
            for i in range(layer_shape[0]):
                writer.writerow(q_aware_model.layers[layer_index].trainable_variables[T_VARIABLES_KERNEL_INDEX][i,:,k,l].numpy())
with open(SAVE_PATH_B_ORIGINAL, 'w') as file:
    writer = csv.writer(file, delimiter = ' ', lineterminator = '\n')
    for l in range(layer_shape[3]):
        writer.writerow([q_aware_model.layers[layer_index].trainable_variables[T_VARIABLES_BIAS_INDEX][l].numpy()])

(5, 5, 1, 32)
[[ -34.    9.   41.  -66. -127.]
 [ -70.    8.   57.   27.  -73.]
 [   5.   39.  -30.  -17.   33.]
 [  47.  -23.   -1.  -23.   -1.]
 [  39.   16.  -19.    4.   12.]]
[[-0.11191808  0.02962537  0.13496004 -0.21725276 -0.41804698]
 [-0.23041959  0.02633367  0.18762738  0.08887613 -0.24029471]
 [ 0.01645854  0.12837663 -0.09875125 -0.05595904  0.10862638]
 [ 0.1547103  -0.07570929 -0.00329171 -0.07570929 -0.00329171]
 [ 0.12837663  0.05266733 -0.06254246  0.01316683  0.0395005 ]]
[[-0.11190058  0.02968932  0.13516179 -0.21755771 -0.41798258]
 [-0.23117375  0.02469435  0.18612708  0.08809777 -0.24172066]
 [ 0.01567534  0.130002   -0.09754552 -0.05695752  0.10985629]
 [ 0.15383062 -0.07598737 -0.00296063 -0.07511681 -0.0047392 ]
 [ 0.12786986  0.0541554  -0.06213173  0.0122421   0.03846057]]
0.029690962
