In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt

# Silence TensorFlow messages
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import tensorflow as tf
import tensorflow.keras as keras 
from tensorflow.keras import regularizers, layers

In [None]:
# Notebook-wide configuration shared by the training and quantization cells.
batch_size = 64                     # mini-batch size used by every tf.data pipeline
MODEL_DIR = './models'              # directory that receives the saved models
FLOAT_MODEL = 'float_model.h5'      # file name of the trained float32 Keras model
QAUNT_MODEL = 'quantized_model.h5'  # file name of the quantized model (spelling kept: later cells use this name)

# exist_ok=True keeps the cell re-runnable and avoids the check-then-create race.
os.makedirs(MODEL_DIR, exist_ok=True)

In [None]:
# Load the MNIST dataset through Keras as (images, labels) pairs for the
# training and test splits.
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# Print the array shapes of each split as a quick sanity check.
print('Training data: {}. {}'.format(x_train.shape, y_train.shape))
print('Test data: {}. {}'.format(x_test.shape, y_test.shape))

In [None]:
# Preview the first five training digits in one row, each titled with its label.
fig, axs = plt.subplots(1, 5, figsize=(10,10))
plt.tight_layout()

# zip stops after the five axes, so only the first five samples are drawn.
for axis, digit, digit_label in zip(axs, x_train, y_train):
    axis.imshow(digit, cmap='gray')
    axis.set_title('Label: {}'.format(digit_label))

In [None]:
# Data normalization: scale pixel values to [0, 1] and add a trailing channel axis.
# Convert the integer labels to one-hot vectors.
# NOTE: this cell is not idempotent -- re-running it rescales the images and
# re-encodes the labels again, so re-run the data-loading cell first.

# -1 lets NumPy infer the sample count instead of hard-coding 60000 / 10000.
x_train = x_train.reshape((-1, 28, 28, 1)).astype('float32') / 255.0
# num_classes is pinned to the 10 outputs of the models below rather than
# being inferred from the largest label present in the array.
y_train = keras.utils.to_categorical(y_train, num_classes=10)

x_test = x_test.reshape((-1, 28, 28, 1)).astype('float32') / 255.0
y_test = keras.utils.to_categorical(y_test, num_classes=10)

# Flattened 784-feature versions of the same images for the fully connected model.
x_train_flat = x_train.reshape((-1, 784))
x_test_flat = x_test.reshape((-1, 784))

In [None]:
# Create separate datasets for training, validation and test.
# The first TRAIN_SPLIT samples are used for training and the remainder for
# validation. The original validation slice started at 5000, which overlapped
# the training slice and made the validation metrics meaningless.
TRAIN_SPLIT = 50000

train_dataset = tf.data.Dataset.from_tensor_slices((x_train[:TRAIN_SPLIT], y_train[:TRAIN_SPLIT])).batch(batch_size)
val_dataset = tf.data.Dataset.from_tensor_slices((x_train[TRAIN_SPLIT:], y_train[TRAIN_SPLIT:])).batch(batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size)

# Same splits over the flattened images, for the fully connected model.
train_dataset_flat = tf.data.Dataset.from_tensor_slices((x_train_flat[:TRAIN_SPLIT], y_train[:TRAIN_SPLIT])).batch(batch_size)
val_dataset_flat = tf.data.Dataset.from_tensor_slices((x_train_flat[TRAIN_SPLIT:], y_train[TRAIN_SPLIT:])).batch(batch_size)
test_dataset_flat = tf.data.Dataset.from_tensor_slices((x_test_flat, y_test)).batch(batch_size)

In [None]:
def customcnn():
    """Build and compile the convolutional MNIST classifier.

    The network takes 28x28x1 images and applies three 3x3 convolutions
    (32, 64 and 64 filters, with 2x2 max-pooling after the first two),
    followed by a 64-unit dense layer and a 10-way softmax output.
    The layer summary is printed as a side effect.

    Returns:
        keras.Model: the model compiled with RMSprop (learning rate 0.001),
        categorical cross-entropy loss and an accuracy metric.
    """
    inputs = keras.Input(shape=(28, 28, 1))
    # The input shape is already fixed by keras.Input, so the first Conv2D
    # does not need (and should not receive) an input_shape argument.
    x = layers.Conv2D(32, (3, 3), activation='relu')(inputs)
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.Conv2D(64, (3, 3), activation='relu')(x)
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.Conv2D(64, (3, 3), activation='relu')(x)
    x = layers.Flatten()(x)
    x = layers.Dense(64, activation='relu')(x)
    outputs = layers.Dense(10, activation='softmax')(x)

    model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_customcnn_model')
    model.summary()

    # Compile the model. `learning_rate` is the supported keyword; the `lr`
    # alias is deprecated.
    optimizer = keras.optimizers.RMSprop(learning_rate=0.001)
    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=['accuracy'],
    )
    return model

def customFC():
    """Build and compile the fully connected MNIST classifier.

    The network takes flattened 784-feature vectors and applies two hidden
    dense layers (300 and 100 units, ReLU) followed by a 10-way softmax
    output. The layer summary is printed as a side effect.

    Returns:
        keras.Model: the model compiled with RMSprop (learning rate 0.001),
        categorical cross-entropy loss and an accuracy metric.
    """
    # A single flat input; the earlier (28, 28) Input was dead code that was
    # overwritten immediately.
    inputs = keras.Input(name='input', shape=(784,))

    x = layers.Dense(300, name='hidden_1', activation='relu')(inputs)
    x = layers.Dense(100, name='hidden_2', activation='relu')(x)

    outputs = layers.Dense(10, activation='softmax')(x)

    # Named after what it is; the original reused the CNN's model name.
    model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_customfc_model')
    model.summary()

    # Compile the model. `learning_rate` is the supported keyword; the `lr`
    # alias is deprecated.
    optimizer = keras.optimizers.RMSprop(learning_rate=0.001)
    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=['accuracy'],
    )
    return model

In [None]:
# Build and compile the CNN; customcnn() also prints the layer summary.
print("\nCreate custom cnn..")
model = customcnn()

In [None]:
# Print the CNN architecture again (customcnn() already printed it once while building).
model.summary()

In [None]:
# Train the CNN for 10 epochs on the batched training dataset.
# validation_data adds the val_loss / val_accuracy series that the plotting
# cell reads from history.history.
print("\nFit on dataset..")
history = model.fit(train_dataset, epochs=10, validation_data=val_dataset)

In [None]:
# Plot the CNN learning curves: loss on the left, accuracy on the right.
# Training is drawn in blue and validation in red; both panels carry axis
# labels and a legend so the figure can be read on its own.
fig, axs = plt.subplots(1, 2, figsize=(12, 4))

axs[0].plot(history.history['loss'], 'b', label='training')
axs[0].plot(history.history['val_loss'], 'r', label='validation')
axs[0].set_title('Training Loss / Validation Loss')
axs[0].set(xlabel='Epochs', ylabel='loss')
axs[0].legend()

axs[1].plot(history.history['accuracy'], 'b', label='training')
axs[1].plot(history.history['val_accuracy'], 'r', label='validation')
axs[1].set_title('Training Accuracy / Validation Accuracy')
axs[1].set(xlabel='Epochs', ylabel='accuracy')
axs[1].legend();

In [None]:
# Evaluate the trained CNN on the batched test dataset
print("\nEvaluate model on test dataset..")
import time  # NOTE(review): not used in this cell; `time` is only used by TFLiteInference further down

loss, acc = model.evaluate(test_dataset)  # returns loss and metrics
print("Test Loss: %.3f" % loss)
print("Test Accuracy: %.3f" % acc)

In [None]:
# Build and compile the fully connected model; customFC() also prints the layer summary.
print("\nCreate custom FC..")
model_FC = customFC()

In [None]:
# Train the fully connected model for 10 epochs on the flattened training dataset.
# validation_data adds the val_loss / val_accuracy series that the plotting
# cell reads from history_FC.history.
print("\nFit on dataset..")
history_FC = model_FC.fit(train_dataset_flat, epochs=10, validation_data=val_dataset_flat)

In [None]:
# Plot the fully connected model's learning curves: loss on the left,
# accuracy on the right. Training is drawn in blue and validation in red;
# both panels carry axis labels and a legend so the figure can be read on its own.
fig, axs = plt.subplots(1, 2, figsize=(12, 4))

axs[0].plot(history_FC.history['loss'], 'b', label='training')
axs[0].plot(history_FC.history['val_loss'], 'r', label='validation')
axs[0].set_title('Training Loss / Validation Loss')
axs[0].set(xlabel='Epochs', ylabel='loss')
axs[0].legend()

axs[1].plot(history_FC.history['accuracy'], 'b', label='training')
axs[1].plot(history_FC.history['val_accuracy'], 'r', label='validation')
axs[1].set_title('Training Accuracy / Validation Accuracy')
axs[1].set(xlabel='Epochs', ylabel='accuracy')
axs[1].legend();

In [None]:
# Evaluate the trained fully connected model on the flattened test dataset
print("\nEvaluate model on test dataset..")
import time  # NOTE(review): not used in this cell; `time` is only used by TFLiteInference further down

# Overwrites the loss / acc values left by the CNN evaluation cell.
loss, acc = model_FC.evaluate(test_dataset_flat)  # returns loss and metrics
print("Test Loss: %.3f" % loss)
print("Test Accuracy: %.3f" % acc)

In [None]:
# Save the trained CNN (float32) so the quantization cells can reload it.
path = os.path.join(MODEL_DIR, FLOAT_MODEL)
# The original message was missing the space before the path ("to{}").
print("\nSave trained model to {}.".format(path))
model.save(path)

## MNIST model quantization

In [None]:
### 32-bit float model

In [None]:
# Reload the saved float32 Keras model and convert it to an unquantized TFLite model.
# The path is built from the same constants used when saving, so the two
# cells cannot drift apart.
model = tf.keras.models.load_model(os.path.join(MODEL_DIR, FLOAT_MODEL))
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

# Save the converted model; the context manager flushes and closes the file
# handle instead of leaving it to the garbage collector.
with open("./models/converted_model.tflite", "wb") as tflite_file:
    tflite_file.write(tflite_model)

In [None]:
def representative_data_gen():
    """Yield calibration inputs for the TFLite converter.

    Produces the first 100 samples of the global ``x_test`` one at a time,
    each as a batch of size 1 wrapped in a single-element list.
    """
    calibration_batches = tf.data.Dataset.from_tensor_slices(x_test).batch(1).take(100)
    for single_sample in calibration_batches:
        yield [single_sample]

In [None]:
### 8-bit integer quantization

In [None]:
# Post-training integer quantization, calibrated with representative_data_gen.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
# Ensure that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# The input/output tensor types are left at the converter default here; the
# TFLiteInference cell feeds float32 images. Uncomment the two lines below to
# set the input and output tensors to uint8 instead (APIs added in r2.3).
#converter.inference_input_type = tf.uint8
#converter.inference_output_type = tf.uint8

tflite_model_quant_int8 = converter.convert()

# Save the int8 model; the context manager flushes and closes the file handle.
with open("./models/converted_quant_model_int8.tflite", "wb") as tflite_file:
    tflite_file.write(tflite_model_quant_int8)

In [None]:
### 16-bit float quantization

In [None]:
# Post-training float16 quantization (no calibration data is supplied).
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_quant_model_float16 = converter.convert()

# Save the float16 model; the context manager flushes and closes the file handle.
with open("./models/converted_quant_model_float16.tflite", "wb") as tflite_file:
    tflite_file.write(tflite_quant_model_float16)

In [None]:
# integer only: 16-bit activations with 8-bit weights (experimental)

In [None]:
# Post-training quantization with 16-bit activations and 8-bit weights
# (experimental op set), calibrated with representative_data_gen.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.representative_dataset = representative_data_gen
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8]
tflite_quant_model_act16_wei_8 = converter.convert()

# Save the 16x8 model; the context manager flushes and closes the file handle.
with open("./models/converted_quant_model_act16_wei_8.tflite", "wb") as tflite_file:
    tflite_file.write(tflite_quant_model_act16_wei_8)

In [None]:
# Compare the on-disk sizes of the float32 and float16 TFLite files.
# Sizes are bytes / 2**20, i.e. mebibytes (printed under the label "Mb").
print("32-bit Float model in Mb:", 
      os.path.getsize('./models/converted_model.tflite') / float(2**20))
print("16-bit Float Quantized model in Mb:", 
      os.path.getsize('./models/converted_quant_model_float16.tflite') / float(2**20))
# Ratio > 1 means the quantized file is smaller than the float32 file.
print("Compression ratio:", 
      os.path.getsize('./models/converted_model.tflite')/os.path.getsize('./models/converted_quant_model_float16.tflite'))

In [None]:
# Compare the on-disk sizes of the float32 and 16-bit-activation / 8-bit-weight TFLite files.
# Sizes are bytes / 2**20, i.e. mebibytes (printed under the label "Mb").
print("32-bit Float model in Mb:", 
      os.path.getsize('./models/converted_model.tflite') / float(2**20))
print("16-bit(A) 8-bit(W) int Quantized model in Mb:", 
      os.path.getsize('./models/converted_quant_model_act16_wei_8.tflite') / float(2**20))
# Ratio > 1 means the quantized file is smaller than the float32 file.
print("Compression ratio:", 
      os.path.getsize('./models/converted_model.tflite')/os.path.getsize('./models/converted_quant_model_act16_wei_8.tflite'))

In [None]:
import tensorflow.keras.models as models
from tensorflow_model_optimization.quantization.keras import vitis_quantize

In [None]:
# Load the floating point trained model
print('Load float model..')
path = os.path.join(MODEL_DIR, FLOAT_MODEL)
print(path)
try:
    float_model = models.load_model(path)
except Exception:
    # Report the failure, then re-raise: every following quantization cell
    # needs float_model, so swallowing the error here would only surface later
    # as a confusing NameError. `except Exception` (not a bare except) also
    # lets KeyboardInterrupt through untouched.
    print('\nError:load float model failed!')
    raise

In [None]:
# get input dimensions of the floating-point model
# For the CNN built with keras.Input(shape=(28,28,1)), input_shape[1] and
# input_shape[2] are the image height and width (index 0 is the batch axis).
# NOTE(review): height / width are not read by any later cell visible here.
height = float_model.input_shape[1]
width = float_model.input_shape[2]

In [None]:
# Run vitis-quantization on the float model, using the batched test dataset
# as calibration data.
# NOTE(review): the same test_dataset is later used to evaluate the quantized
# model, so calibration and evaluation share data -- confirm this is intended.
print('\nRun quantization..')
quantizer = vitis_quantize.VitisQuantizer(float_model)
quantized_model = quantizer.quantize_model(calib_dataset=test_dataset)

In [None]:
# Save quantized model under MODEL_DIR using the QAUNT_MODEL file name
path = os.path.join(MODEL_DIR, QAUNT_MODEL)
quantized_model.save(path)
print('\nSaved quantized model as',path)

In [None]:
# Reload the quantized model from disk inside vitis_quantize.quantize_scope().
# NOTE(review): presumably the scope registers the quantizer's custom layers
# for deserialization -- confirm against the Vitis AI documentation.
path = os.path.join(MODEL_DIR, QAUNT_MODEL)
with vitis_quantize.quantize_scope():
    # compile=False: the model is compiled explicitly in the next cell.
    quantized_model = models.load_model(path, compile=False)

In [None]:
# Compile the model
# The quantized model was loaded with compile=False, so a loss and metric are
# attached here before it is evaluated.
print('\nCompile model..')
quantized_model.compile(optimizer="rmsprop", 
        loss="categorical_crossentropy",
        metrics=['accuracy']
        )

In [None]:
# Evaluate the quantized model on the batched test dataset
# Overwrites the loss / acc values left by the earlier evaluation cells.
print("\nEvaluate model on test Dataset")
loss, acc = quantized_model.evaluate(test_dataset)  # returns loss and metrics
print("Test Loss: %.3f" % loss)
print("Test Accuracy: %.3f" % acc)

In [None]:
# Compile the saved quantized model for the ZCU104 target with vai_c_tensorflow2,
# writing the result to ./compiled_model/zcu104 under the net name "customcnn".
# NOTE(review): the --model path is hard-coded; it must match MODEL_DIR / QAUNT_MODEL.
!echo "-----------------------------------------"
!echo "COMPILING MODEL FOR ZCU104.."
!echo "-----------------------------------------"

!vai_c_tensorflow2 \
            --model ./models/quantized_model.h5 \
            --arch /opt/vitis_ai/compiler/arch/DPUCZDX8G/ZCU104/arch.json \
            --output_dir ./compiled_model/zcu104 \
            --net_name customcnn

!echo "-----------------------------------------"
!echo "MODEL COMPILED"
!echo "-----------------------------------------"

In [None]:
# Print every non-trainable weight of the quantized model.
# NOTE(review): presumably these include the quantizer's own parameters -- confirm.
for w in quantized_model.non_trainable_weights:
    print(w)


# Print both architectures for comparison: the quantized model, then `model`
# (the float model reloaded from disk for the TFLite conversion).
quantized_model.summary()
model.summary()

In [None]:
import time
def TFLiteInference(model_path,x_test,y_test):
    """Run a TFLite model over labelled samples one at a time and report results.

    Prints the interpreter's input details, then the mean accuracy and the
    mean per-sample inference time (seconds) over all samples.

    Args:
        model_path: Path of the ``.tflite`` file to load.
        x_test: Iterable of input samples. Each sample gets a leading batch
            axis of size 1 before it is fed to the interpreter.
        y_test: Iterable of label vectors aligned with ``x_test``; the true
            class is taken as the argmax of each vector.

    Returns:
        None. The results are printed.

    Raises:
        ValueError: If ``x_test`` / ``y_test`` yield no samples.
    """
    # Step 1. Load TFLite model and allocate tensors.
    interpreter = tf.lite.Interpreter(model_path=model_path)
    interpreter.allocate_tensors()
    # The original printed an undefined name (tflite_interpreter) here, which
    # raised NameError before any inference ran.
    input_details = interpreter.get_input_details()
    print(input_details)
    # Get indexes of input and output layers
    input_index = input_details[0]['index']
    output_index = interpreter.get_output_details()[0]['index']

    num_samples = 0
    sum_correct = 0.0
    sum_time = 0.0
    for image, label in zip(x_test, y_test):
        # Add the batch axis with NumPy so set_tensor receives an ndarray
        # (e.g. a 28x28x1 image becomes 1x28x28x1).
        image = np.expand_dims(image, axis=0)

        # perf_counter is a monotonic high-resolution clock for short intervals.
        s_time = time.perf_counter()
        # Step 2. Transform input data
        interpreter.set_tensor(input_index, image)
        # Step 3. Run inference
        interpreter.invoke()
        # Step 4. Interpret output
        pred = interpreter.get_tensor(output_index)
        sum_time += time.perf_counter() - s_time

        if np.argmax(pred) == np.argmax(label):
            sum_correct += 1.0
        num_samples += 1

    # An explicit counter replaces the leaked loop index, which was unbound
    # when the inputs were empty.
    if num_samples == 0:
        raise ValueError('TFLiteInference needs at least one sample in x_test / y_test')

    mean_acc = sum_correct / float(num_samples)
    mean_time = sum_time / float(num_samples)

    print(f'Accuracy of TFLite model: {mean_acc}')
    print(f'Inference time of TFLite model: {mean_time}')
    
# Benchmark every converted TFLite model on the test set, in the same order
# as before: float32, float16, 16x8 integer, int8.
for tflite_path in (
    './models/converted_model.tflite',
    './models/converted_quant_model_float16.tflite',
    './models/converted_quant_model_act16_wei_8.tflite',
    './models/converted_quant_model_int8.tflite',
):
    TFLiteInference(model_path=tflite_path, x_test=x_test, y_test=y_test)
