# Quantization and Pruning Experiments

In [None]:
import numpy as np
import matplotlib.pyplot as plt

import cv2

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers
from tensorflow.keras import models

from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

import os
import logging

import pandas as pd
import seaborn as sns
import pathlib

# set global seeds for reproducibility
tf.random.set_seed(1234)
np.random.seed(1234)

# Setting parameters for plotting
plt.rcParams['figure.figsize'] = (15.0, 8.0) # set default size of plots
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'

logging.getLogger("tensorflow").setLevel(logging.DEBUG)

print("TensorFlow version: ", tf.__version__)

In [None]:
# check if GPU is available
print("GPU is", "AVAILABLE" if tf.config.list_physical_devices('GPU') else "NOT AVAILABLE")

# Data Loading

In [None]:
# Weather Dataset
dataset_path = "dataset"

# Class names are the sub-directory names of the dataset folder.
# os.listdir() returns entries in arbitrary, platform-dependent order, so the
# names are sorted to keep the label -> index mapping identical on every
# machine and every run.
class_labels = sorted(
    label for label in os.listdir(dataset_path)
    if os.path.isdir(os.path.join(dataset_path, label))
)
class_to_index = {label: index for index, label in enumerate(class_labels)}


def load_dataset(dataset_path):
    """Load all readable images below ``dataset_path`` together with their labels.

    Every sub-directory of ``dataset_path`` is treated as one class. Each image
    is read with OpenCV and resized to 96x96 pixels; files that OpenCV cannot
    decode are skipped silently.

    Args:
        dataset_path: Directory that contains one sub-directory per class.

    Returns:
        A tuple ``(images, labels)`` of NumPy arrays. ``labels`` holds the
        integer class index taken from the module-level ``class_to_index``.
    """
    # NOTE(review): cv2.imread is documented to return colour images in BGR
    # channel order -- confirm this matches how the loaded model was trained
    # and how the images are displayed with matplotlib.
    images = []
    labels = []
    # Directories and files are visited in sorted order so the resulting arrays
    # (and therefore the seeded train/test split) are reproducible.
    for label in sorted(os.listdir(dataset_path)):
        label_path = os.path.join(dataset_path, label)
        if not os.path.isdir(label_path):
            continue
        for image_file in sorted(os.listdir(label_path)):
            image_path = os.path.join(label_path, image_file)
            image = cv2.imread(image_path)
            if image is None:
                continue  # Skip the image if it cannot be loaded
            image = cv2.resize(image, (96, 96))
            images.append(image)
            labels.append(class_to_index[label])
    return np.array(images), np.array(labels)


all_images, all_labels = load_dataset(dataset_path)


# Check if dataset is loaded correctly
if len(all_images) == 0:
    raise ValueError("No images loaded. Please check the dataset path and ensure it contains images.")

### Normalization and Train/Test Split

In [None]:
# Normalize the input image so that each pixel value is between 0 to 1.
all_images = all_images.astype(np.float32) / 255.0

# Split the dataset into training and testing
train_images, test_images, train_labels, test_labels = train_test_split(all_images, all_labels, test_size=0.2, random_state=42)

### Visualize the data
We can visualize the data by plotting a few samples of each class (the number of samples is set by `samples_per_class` in the next cell).

In [None]:
# Visualize some examples from the dataset.
# We show a few examples of training images from each class.

#classes = ['cloudy', 'foggy', 'rainy', 'shine', 'sunrise']
# `classes` holds the integer class indices; it is reused later as the tick
# labels of the confusion-matrix plots.
classes = list(range(len(class_labels)))
print(classes)
num_classes = len(classes)
samples_per_class = 2

# Set a seed for reproducibility
np.random.seed(7)

for y, cls in enumerate(classes):
    idxs = np.flatnonzero(train_labels == cls)
    if idxs.size == 0:
        # The class has no image in the training split; leave its column empty.
        continue
    # Sampling without replacement fails when more items are requested than
    # exist, so never ask for more images than the class actually has.
    idxs = np.random.choice(idxs, min(samples_per_class, idxs.size), replace=False)
    for i, idx in enumerate(idxs):
        # Column y belongs to class y, row i to the i-th sample of that class.
        plt_idx = i * num_classes + y + 1
        plt.subplot(samples_per_class, num_classes, plt_idx)
        plt.imshow(train_images[idx])
        plt.axis('off')
        if i == 0:
            # Show the human-readable class name instead of the bare index.
            plt.title(class_labels[cls])

### Load the Keras model

In [None]:
# Load model
from keras.models import load_model
from keras.optimizers import Adam

fp_model = load_model('model.h5', compile=False)

# # Use an optimizer without weight_decay
# optimizer = Adam(learning_rate=0.001)

# # Compile the model with the correct optimizer
# model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Compile the model
fp_model.compile(optimizer='adam',
                # the loss function is the sparse categorical cross-entropy
                # loss. It is used when there are two or more label classes. 
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'])

In [None]:
# View the model summary
fp_model.summary()

In [None]:
# More training can be done here
# The `es` callback created here is reused by the later fit() calls in this notebook.
# NOTE(review): with epochs=1 a patience of 5 can never be reached, so early
# stopping is effectively inactive for this call -- raise `epochs` if further
# training is actually wanted.
# NOTE(review): the test split is also used as validation data, so the test
# accuracy reported afterwards is not measured on untouched data.
es = tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)
history = fp_model.fit(
    train_images,
    train_labels,
    epochs=1,
    validation_data=(test_images, test_labels),
    callbacks=[es]
)

In [None]:
# Save model
# exist_ok=True creates the output directory only when it is missing, without
# the separate check-then-create step.
os.makedirs('models', exist_ok=True)
fp_model.save('models/base_model.h5')

In [None]:
# Evaluate model on the test set
fp_test_loss, fp_test_acc = fp_model.evaluate(test_images, test_labels, verbose=2)
print('Test accuracy:', fp_test_acc)
print('Test loss:', fp_test_loss)

In [None]:
# Plot the training history
plt.plot(history.history['accuracy'], label='accuracy')
plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.ylim([0, 1])
plt.legend(loc='lower right')

# Evaluate accuracy

In [None]:
# helper for plotting a confusion matrix (reused for every model variant below)
from sklearn.metrics import confusion_matrix
import itertools

"""
function: plot_confusion_matrix
    - input: cm, classes, normalize, title, cmap
    - output: none
    - description: plots the confusion matrix
"""

def plot_confusion_matrix(cm, classes,
                            normalize=False,
                            title='Confusion matrix',
                            cmap=plt.cm.Blues):
    """
    Print and plot a confusion matrix on the current matplotlib figure.

    Args:
        cm: Square confusion matrix as a NumPy array (rows are true labels,
            columns are predicted labels, as labelled on the axes).
        classes: Sequence of tick labels, one per class.
        normalize: If True, every row is divided by its sum so that the
            cells show fractions instead of counts.
        title: Title of the plot.
        cmap: Matplotlib colormap used for the cells.

    Returns:
        None. The matrix is printed and drawn as a side effect.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # A class that never occurs among the true labels has a row sum of 0;
        # keep such rows at 0 instead of producing NaN through 0/0.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Counts are printed as integers, normalized values with two decimals.
    fmt = '.2f' if normalize else 'd'
    # Cells above half of the maximum get white text so that they stay readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Run the layout pass last so that the axis labels are taken into account.
    plt.tight_layout()

In [None]:
cm = confusion_matrix(test_labels, np.argmax(fp_model.predict(test_images), axis=-1))
plot_confusion_matrix(cm, classes, title='Confusion matrix, without normalization', cmap=plt.cm.Spectral.reversed())

## Convert to a tflite model

In [None]:
# Convert the model to TFLite without quantization
converter = tf.lite.TFLiteConverter.from_keras_model(fp_model)
fp_tflite_model = converter.convert()

# Save the model to disk
# The context manager closes (and therefore flushes) the file before its size
# is measured below.
with open("models/model_f32.tflite", "wb") as f:
    f.write(fp_tflite_model)

# Show the model size for the non-quantized HDF5 model
fp_h5_in_kb = os.path.getsize('models/base_model.h5') / 1024
print("HDF5 Model size without quantization: %d KB" % fp_h5_in_kb)

# Show the model size for the non-quantized TFLite model
fp_tflite_in_kb = os.path.getsize('models/model_f32.tflite') / 1024
print("TFLite Model size without quantization: %d KB" % fp_tflite_in_kb)

# Determine the reduction in model size
print("\nReduction in file size by a factor of %f" % (fp_h5_in_kb / fp_tflite_in_kb))

### Quantize with dynamic range optimization

In [None]:
# Convert the model to TFLite with quantization
converter = tf.lite.TFLiteConverter.from_keras_model(fp_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
dynR_quant_tflite_model = converter.convert()

# Save the model to disk
# A single path variable keeps the written file and the reported location in
# sync (the message previously named a file that was never written).
dynR_quant_tflite_file = "models/model_quant8_dynR.tflite"
with open(dynR_quant_tflite_file, "wb") as f:
    f.write(dynR_quant_tflite_model)

print("Model was saved at location: %s" % os.path.abspath(dynR_quant_tflite_file))

In [None]:
interpreter = tf.lite.Interpreter(model_content=dynR_quant_tflite_model)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

### Quantize the model to 8-bit integer precision

In [None]:
def representative_data_gen():
  """Yield calibration samples for the TFLite converter.

  Each yielded item is a one-element list holding a batch of exactly one
  training image; at most 100 such batches are produced.
  """
  calibration_set = tf.data.Dataset.from_tensor_slices(train_images)
  for single_image_batch in calibration_set.batch(1).take(100):
    yield [single_image_batch]

# Post-training quantization restricted to the INT8 builtin op set, with
# uint8 tensors at the model input and output.
converter = tf.lite.TFLiteConverter.from_keras_model(fp_model)
converter.representative_dataset = representative_data_gen
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = converter.inference_output_type = tf.uint8

tflite_model_quant_int8 = converter.convert()

In [None]:
interpreter = tf.lite.Interpreter(model_content=tflite_model_quant_int8)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

In [None]:
# Save the quantized model to disk
# The context manager guarantees the file is closed before later cells read it.
with open("models/full_quant.tflite", "wb") as f:
    f.write(tflite_model_quant_int8)

print("Model was saved at location: %s" % os.path.abspath('models/full_quant.tflite'))

In [None]:
# Show the model size for the 8-bit quantized TFLite model
tflite_quant_in_kb = os.path.getsize('models/full_quant.tflite') / 1024
print("TFLite Model size with 8-bit quantization: %d KB" % tflite_quant_in_kb)

print("TFLite Model size without quantization: %d KB" % fp_tflite_in_kb)

# Determine the reduction in model size
print("\nReduction in model size by a factor of %f" % (fp_tflite_in_kb / tflite_quant_in_kb))

### Performance evaluation of TF Lite models

In [None]:
# Helper function to run inference on a TFLite model
def run_tflite_model(tflite_file, test_image_indices):
  """Run a TFLite model on selected test images and return the predicted classes.

  The images are read from the module-level ``test_images`` array.

  Args:
    tflite_file: Path of the ``.tflite`` file (``str`` or ``pathlib.Path``).
    test_image_indices: Sized iterable of indices into ``test_images``.

  Returns:
    1-D integer NumPy array holding, for every requested index, the argmax of
    the model output.
  """
  # Initialize the interpreter
  interpreter = tf.lite.Interpreter(model_path=str(tflite_file))
  interpreter.allocate_tensors()

  input_details = interpreter.get_input_details()[0]
  output_details = interpreter.get_output_details()[0]

  # The quantization parameters do not change between images, so look them up once.
  input_dtype = input_details["dtype"]
  input_scale, input_zero_point = input_details["quantization"]
  # A scale of 0 is treated as "no quantization parameters": such inputs are
  # only cast, never rescaled (this also avoids a division by zero).
  quantized_input = np.issubdtype(input_dtype, np.integer) and input_scale != 0
  if quantized_input:
    dtype_limits = np.iinfo(input_dtype)

  predictions = np.zeros((len(test_image_indices),), dtype=int)
  for i, test_image_index in enumerate(test_image_indices):
    test_image = test_images[test_image_index]

    if (test_image_index % 1000 == 0):
      print("Evaluated on %d images." % test_image_index)

    # Integer (quantized) inputs: map the float image onto the integer grid.
    # Round to the nearest step and clip to the dtype range; a bare cast would
    # truncate towards zero and wrap around on out-of-range values.
    if quantized_input:
      test_image = np.clip(np.round(test_image / input_scale + input_zero_point),
                           dtype_limits.min, dtype_limits.max)

    # Add the batch dimension expected by the interpreter.
    test_image = np.expand_dims(test_image, axis=0).astype(input_dtype)
    interpreter.set_tensor(input_details["index"], test_image)
    interpreter.invoke()
    output = interpreter.get_tensor(output_details["index"])[0]

    # The argmax is taken directly on the raw output tensor.
    predictions[i] = output.argmax()

  return predictions


# Helper function to evaluate a TFLite model on all images
def evaluate_model(tflite_file, model_type):
  """Print the accuracy of a TFLite model over the complete test set.

  Runs ``run_tflite_model`` on every image of the module-level ``test_images``
  and compares the predictions with ``test_labels``.

  Args:
    tflite_file: Path of the ``.tflite`` file to evaluate.
    model_type: Label that prefixes the printed accuracy line.
  """
  num_samples = len(test_images)
  predicted = run_tflite_model(tflite_file, range(test_images.shape[0]))

  num_correct = np.sum(test_labels == predicted)
  accuracy = (num_correct * 100) / num_samples

  print('%s model accuracy is %.4f%% (Number of test samples=%d)' % (
      model_type, accuracy, num_samples))

In [None]:
tflite_model_quant_int8_file = pathlib.Path('models/full_quant.tflite')
tflite_model_quant_int8_model_type = "Full Post-Quantized INT8"

evaluate_model(tflite_model_quant_int8_file, tflite_model_quant_int8_model_type)
print("Full-precision model accuracy is %.4f%% (Number of test samples=%d)" % (fp_test_acc * 100, len(test_images)))

In [None]:
# we plot also the confusion matrix of the quantized model
tflite_model_quant_int8_pred = run_tflite_model(tflite_model_quant_int8_file, range(test_images.shape[0]))
cm = confusion_matrix(test_labels, tflite_model_quant_int8_pred);
plot_confusion_matrix(cm, classes, title='Confusion matrix, without normalization of the quantized model', cmap=plt.cm.Spectral.reversed())

In [None]:
# compute the accuracy of the quantized model
from sklearn.metrics import accuracy_score
full_int8_accuracy = accuracy_score(test_labels, tflite_model_quant_int8_pred)
print("Full-precision model accuracy is %.4f%% (Number of test samples=%d)" % (fp_test_acc * 100, len(test_images)))
print("Quantized model accuracy is %.4f%% (Number of test samples=%d)" % (full_int8_accuracy * 100, len(test_images)))

### Writing the model to a C header file

In [None]:
# Function: render a byte sequence as the text of a C header file
def hex_to_c_array(hex_data, var_name):
    """Return C header text that embeds ``hex_data`` as an unsigned char array.

    The header consists of an include guard derived from ``var_name``, a
    ``<var_name>_len`` constant with the number of bytes, and the array
    ``<var_name>`` itself written as hexadecimal literals, twelve per line.

    Args:
        hex_data: Sized iterable of integers in the range 0..255 (e.g. ``bytes``).
        var_name: C identifier used for the array, the length constant and the
            include guard.

    Returns:
        The complete header as a single string.
    """
    guard = var_name.upper() + '_H'
    total = len(hex_data)

    tokens = []
    for position, byte in enumerate(hex_data, start=1):
        token = format(byte, '#04x')
        # Every element except the last one is followed by a comma.
        if position < total:
            token += ','
        # Break the line after every twelfth element to stay within 80 columns.
        if position % 12 == 0:
            token += '\n '
        tokens.append(token)

    pieces = [
        # Open the include guard
        '#ifndef ' + guard + '\n',
        '#define ' + guard + '\n\n',
        # Array length first, then the array itself
        '\nstatic const unsigned int ' + var_name + '_len = ' + str(total) + ';\n',
        'static const unsigned char ' + var_name + '[] = {',
        '\n ' + ' '.join(tokens) + '\n};\n\n',
        # Close the include guard
        '#endif //' + guard,
    ]
    return ''.join(pieces)

In [None]:
c_model_name = 'weather_model'
# Make sure the output directory for the generated C headers exists;
# exist_ok=True avoids a separate check-then-create step.
os.makedirs('cfiles', exist_ok=True)
# Write TFLite model to a C source (or header) file
with open('cfiles/' + c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(tflite_model_quant_int8, c_model_name))

### Saving test data used for inference on the MCU

In [None]:
# save the test data as numpy arrays
# The images were scaled to [0, 1] as float32 earlier; scaling back can land a
# hair below the original integer, and a bare cast to uint8 would then
# truncate to the next lower pixel value. Round to the nearest integer first.
int_test_images = np.rint(test_images * 255).astype(np.uint8)

np.save('x_test.npy', int_test_images)
np.save('y_test.npy', test_labels.astype(np.uint8))
# plot the first 5 images in the test set with their labels
# map class labels to names; the names come from the same list that defined
# the label indices, so titles always match the actual label mapping
class_names = list(class_labels)

# Never try to show more images than the test set contains.
num_preview = min(5, len(int_test_images))
for i in range(num_preview):
    plt.subplot(1, 5, i+1)
    plt.imshow(int_test_images[i], cmap='gray')
    plt.title('Label: %s' % class_names[test_labels[i]])

# print the location of the files
print('Test image data location: ', os.path.abspath('x_test.npy'))
print('Test labels location: ', os.path.abspath('y_test.npy'))

### Quantization-aware training

In [None]:
!pip install -q tensorflow-model-optimization

In [None]:
import tensorflow_model_optimization as tfmot

# Convert the model to a quantization aware model
# (wraps `fp_model` with tfmot's default quantization scheme)
quant_aware_model = tfmot.quantization.keras.quantize_model(fp_model)

# `quantize_model` requires a recompile.
# Same optimizer, loss and metric as the full-precision model above.
quant_aware_model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'])

# NOTE(review): the following cells evaluate and convert `quant_aware_model`
# without any fit() call in between, so the model does not appear to be
# fine-tuned with quantization awareness -- confirm whether a training step
# is missing here.
quant_aware_model.summary()

In [None]:
# evaluate the model on the test set
quant_loss, quant_acc = quant_aware_model.evaluate(test_images, test_labels, verbose=0)
print('Quantization aware training loss: ', quant_loss)
print('Quantization aware training accuracy: ', quant_acc)
print('Full-precision training accuracy: ', fp_test_acc)

In [None]:
# convert the QAT model to a fully quantized model using TFLite

def representative_data_gen():
  for input_value in tf.data.Dataset.from_tensor_slices(train_images).batch(1).take(100):
    yield [input_value]

converter = tf.lite.TFLiteConverter.from_keras_model(quant_aware_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
# Ensure that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# Set the input and output tensors to uint8 (APIs added in r2.3)
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

tflite_model_quant_int8_qat = converter.convert()

In [None]:
# Inspect the tensor types of the converted QAT model.
interpreter = tf.lite.Interpreter(model_content=tflite_model_quant_int8_qat)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)
# Save the quantized model to disk
# The file name must be 'models/qat_int8.tflite': that is the path the later
# evaluation and size-comparison cells read. The context manager closes the
# file before it is read again.
with open("models/qat_int8.tflite", "wb") as f:
    f.write(tflite_model_quant_int8_qat)

In [None]:
c_model_name = 'qat8weather'
# Write TFLite model to a C source (or header) file
with open('cfiles/' + c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(tflite_model_quant_int8_qat, c_model_name))

In [None]:
# this might take a few minutes (~ 1- 2 minutes)
# if it takes longer than that, I suggest to 
# restart the runtime and try again
# if the issue still persists, restart your computer
tflite_model_quant_int8_qat_file = pathlib.Path('models/qat_int8.tflite')
tflite_model_quant_int8_qat_type = "Full QAT INT8"

evaluate_model(tflite_model_quant_int8_qat_file, tflite_model_quant_int8_qat_type)
print("Full-precision model accuracy is %.4f%% (Number of test samples=%d)" % (fp_test_acc * 100, len(test_images)))

In [None]:
# evaluate the model on the test set
tflite_model_quant_int8_qat_file = "models/qat_int8.tflite"
tflite_model_quant_int8_qat_model_type = "Quantized aware training model"
tflite_model_quant_int8_qat_pred = run_tflite_model(tflite_model_quant_int8_qat_file, range(test_images.shape[0]))
cm = confusion_matrix(test_labels, tflite_model_quant_int8_qat_pred);
plot_confusion_matrix(cm, classes, title='Confusion matrix, without normalization of the quantized model', cmap=plt.cm.Spectral.reversed())


In [None]:
# compute the accuracy of the model
full_qat_int8_accuracy = accuracy_score(test_labels, tflite_model_quant_int8_qat_pred)
print('Full QAT INT8 accuracy is %.4f%% (Number of test samples=%d)' % (full_qat_int8_accuracy * 100, len(test_images)))
print('Full-precision model accuracy is %.4f%% (Number of test samples=%d)' % (fp_test_acc * 100, len(test_images)))

### 1. Pruning with constant sparsity

In [None]:
import tensorflow_model_optimization as tfmot


# Structured pruning with constant sparsity
# NOTE(review): a block size of (1, 1) prunes individual weights; this looks
# equivalent to the "unstructured" configuration used further below (which
# leaves block_size unset) -- confirm whether a larger block was intended.
# NOTE(review): begin_step=2000 -- confirm that training actually runs at least
# that many optimizer steps, otherwise pruning may never start.
pruning_params = {
        'pruning_schedule': tfmot.sparsity.keras.ConstantSparsity(0.5, begin_step=2000, frequency=100),
        'block_size': (1, 1),
}

# Create a pruning model
pruned_model = tfmot.sparsity.keras.prune_low_magnitude(fp_model, **pruning_params)

# `prune_low_magnitude` requires a recompile.
# Same optimizer, loss and metric as the full-precision model.
pruned_model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'])

pruned_model.summary()

In [None]:
# Train and evaluate the pruned model
# Models wrapped by `prune_low_magnitude` must be trained with the
# UpdatePruningStep callback: it advances the pruning step counter that the
# pruning schedule depends on.
pruned_model.fit(
                    train_images,
                    train_labels,
                    epochs=2,
                    validation_data=(test_images, test_labels),
                    callbacks=[tfmot.sparsity.keras.UpdatePruningStep(), es]
)

In [None]:
# evaluate the model on the test set
# The test split is used (not the training split) so that the numbers are
# comparable with `fp_test_acc`, which was also measured on the test set.
pruned_loss, pruned_acc = pruned_model.evaluate(test_images, test_labels, verbose=0)
print('Pruned model loss: ', pruned_loss)
print('Pruned model accuracy: ', pruned_acc)
print('Full-precision model accuracy: ', fp_test_acc)

### Conversion to TF Lite

In [None]:
pruned_model_for_export = tfmot.sparsity.keras.strip_pruning(pruned_model)

pruned_keras_file = 'models/pruned_model.h5'
tf.keras.models.save_model(pruned_model_for_export, pruned_keras_file, include_optimizer=False)
print('Saved pruned Keras model to:', os.path.abspath(pruned_keras_file))

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(pruned_model_for_export)
pruned_tflite_model = converter.convert()

pruned_tflite_file = 'models/pruned_model.tflite'

with open(pruned_tflite_file, 'wb') as f:
    f.write(pruned_tflite_model)

print('Saved pruned TFLite model to:', os.path.abspath(pruned_tflite_file))

In [None]:
c_model_name = 'prunedweather'
# Write TFLite model to a C source (or header) file
with open('cfiles/' + c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(pruned_tflite_model, c_model_name))

In [None]:
# Define a helper function to actually compress the models via gzip and measure the zipped size.
import tempfile

def get_gzipped_model_size(file):
    """Return the size in bytes of ``file`` after DEFLATE compression.

    The file is written into a temporary ZIP archive, the archive size is
    measured, and the archive is deleted again before returning.

    Args:
        file: Path of the file to compress.

    Returns:
        Size of the temporary ZIP archive in bytes (int).
    """
    import os
    import zipfile

    # mkstemp returns an already-open OS-level descriptor; close it right away
    # (only the path is needed) so that descriptors do not leak on every call.
    descriptor, zipped_file = tempfile.mkstemp('.zip')
    os.close(descriptor)
    try:
        with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
            f.write(file)
        return os.path.getsize(zipped_file)
    finally:
        # The archive is only needed for the measurement; do not leave it behind.
        os.remove(zipped_file)

In [None]:
# compare the size of the pruned model and the full-precision model
print('Size of the pruned model: ', get_gzipped_model_size(pruned_tflite_file))
print('Size of the full-precision model: ', get_gzipped_model_size('models/model_f32.tflite'))
print("The achieved compression ratio is %.2fx" % (get_gzipped_model_size('models/model_f32.tflite') / get_gzipped_model_size(pruned_tflite_file)))

In [None]:
# evaluate the model on the test set
pruned_tflite_file = "models/pruned_model.tflite"
pruned_tflite_model_type = "Pruned model"
evaluate_model(pruned_tflite_file, pruned_tflite_model_type)
print("Full-precision model accuracy is %.4f%% (Number of test samples=%d)" % (fp_test_acc * 100, len(test_images)))

In [None]:
# evaluate the model on the test set
pruned_tflite_file = "models/pruned_model.tflite"
pruned_tflite_model_type = "Pruned model"
pruned_tflite_pred = run_tflite_model(pruned_tflite_file, range(test_images.shape[0]))
cm = confusion_matrix(test_labels, pruned_tflite_pred);
plot_confusion_matrix(cm, classes, title='Confusion matrix, without normalization of the pruned model', cmap=plt.cm.Spectral.reversed())

In [None]:
# compute the accuracy of the model
pruned_accuracy = accuracy_score(test_labels, pruned_tflite_pred)
print('Pruned model accuracy is %.4f%% (Number of test samples=%d)' % (pruned_accuracy * 100, len(test_images)))
print('Full-precision model accuracy is %.4f%% (Number of test samples=%d)' % (fp_test_acc * 100, len(test_images)))

### 2. Unstructured pruning with constant sparsity

In [None]:
# Unstructured pruning with constant sparsity
pruning_params = {
        'pruning_schedule': tfmot.sparsity.keras.ConstantSparsity(0.5, begin_step=2000, frequency=100),
}

# Create a pruning model
pruned_model_unstructured = tfmot.sparsity.keras.prune_low_magnitude(fp_model, **pruning_params)

# `prune_low_magnitude` requires a recompile.
pruned_model_unstructured.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'])

pruned_model_unstructured.summary()

In [None]:
# Train and evaluate the pruned model
# Models wrapped by `prune_low_magnitude` must be trained with the
# UpdatePruningStep callback: it advances the pruning step counter that the
# pruning schedule depends on.
pruned_model_unstructured.fit(
                    train_images,
                    train_labels,
                    epochs=10,
                    validation_data=(test_images, test_labels),
                    callbacks=[tfmot.sparsity.keras.UpdatePruningStep(), es]
)

In [None]:
# evaluate the model on the test set
pruned_loss_unstructured, pruned_acc_unstructured = pruned_model_unstructured.evaluate(test_images, test_labels, verbose=0)
print('Pruned model loss: ', pruned_loss_unstructured)
print('Pruned model accuracy: ', pruned_acc_unstructured)
print('Full-precision model accuracy: ', fp_test_acc)

### Conversion to TF Lite

In [None]:
# Conversion to TF Lite
pruned_model_unstructured_for_export = tfmot.sparsity.keras.strip_pruning(pruned_model_unstructured)

pruned_keras_file_unstructured = 'models/pruned_model_unstructured.h5'
tf.keras.models.save_model(pruned_model_unstructured_for_export, pruned_keras_file_unstructured, include_optimizer=False)
print('Saved pruned Keras model to:', os.path.abspath(pruned_keras_file_unstructured))

In [None]:
# Convert the model
converter = tf.lite.TFLiteConverter.from_keras_model(pruned_model_unstructured_for_export)
pruned_tflite_model_unstructured = converter.convert()

# Save the model
pruned_tflite_file_unstructured = 'models/pruned_model_unstructured.tflite'

with open(pruned_tflite_file_unstructured, 'wb') as f:
    f.write(pruned_tflite_model_unstructured)

print('Saved pruned TFLite model to:', os.path.abspath(pruned_tflite_file_unstructured))

In [None]:
c_model_name = 'unstr_prunedweather'
# Write TFLite model to a C source (or header) file
with open('cfiles/' + c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(pruned_tflite_model_unstructured, c_model_name))

In [None]:
# compare the size of the pruned model and the full-precision model
print('Size of the unstructured pruned model: ', get_gzipped_model_size(pruned_tflite_file_unstructured))
print('Size of the structured pruned model: ', get_gzipped_model_size(pruned_tflite_file))
print('Size of the full-precision model: ', get_gzipped_model_size('models/model_f32.tflite'))
print("The achieved compression ratio is %.2fx" % (get_gzipped_model_size('models/model_f32.tflite') / get_gzipped_model_size(pruned_tflite_file_unstructured)))

In [None]:
# evaluate the model on the test set
pruned_tflite_file_unstructured = "models/pruned_model_unstructured.tflite"
pruned_tflite_model_type_unstructured = "Pruned model unstructured"
evaluate_model(pruned_tflite_file_unstructured, pruned_tflite_model_type_unstructured)
print("Full-precision model accuracy is %.4f%% (Number of test samples=%d)" % (fp_test_acc * 100, len(test_images)))

In [None]:
# evaluate the model on the test set
pruned_tflite_file_unstructured = "models/pruned_model_unstructured.tflite"
pruned_tflite_model_type_unstructured = "Pruned model unstructured"
pruned_tflite_pred_unstructured = run_tflite_model(pruned_tflite_file_unstructured, range(test_images.shape[0]))
cm = confusion_matrix(test_labels, pruned_tflite_pred_unstructured);
plot_confusion_matrix(cm, classes, title='Confusion matrix, without normalization of the pruned model unstructured', cmap=plt.cm.Spectral.reversed())


In [None]:
# compute the accuracy of the model
pruned_accuracy_unstructured = accuracy_score(test_labels, pruned_tflite_pred_unstructured)
print('Unstructured pruned model accuracy is %.4f%% (Number of test samples=%d)' % (pruned_accuracy_unstructured * 100, len(test_images)))
print('Structured pruned model accuracy is %.4f%% (Number of test samples=%d)' % (pruned_accuracy * 100, len(test_images)))
print('Full-precision model accuracy is %.4f%% (Number of test samples=%d)' % (fp_test_acc * 100, len(test_images)))

### 3. Unstructured pruning with dynamic sparsity

In [None]:
# Unstructured pruning with dynamic sparsity
pruning_params = {
        'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.50,
                                                                    final_sparsity=0.80,
                                                                    begin_step=2000,
                                                                    end_step=4000,
                                                                    frequency=100)

}

# Create a pruning model
pruned_model_unstructured_dynamic = tfmot.sparsity.keras.prune_low_magnitude(fp_model, **pruning_params)

# `prune_low_magnitude` requires a recompile.
pruned_model_unstructured_dynamic.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'])

pruned_model_unstructured_dynamic.summary()

In [None]:
# Train and evaluate the pruned model
# Models wrapped by `prune_low_magnitude` must be trained with the
# UpdatePruningStep callback: it advances the pruning step counter that the
# pruning schedule depends on.
pruned_model_unstructured_dynamic.fit(
                    train_images,
                    train_labels,
                    epochs=10,
                    validation_data=(test_images, test_labels),
                    callbacks=[tfmot.sparsity.keras.UpdatePruningStep(), es]
)

In [None]:
# evaluate the model on the test set
pruned_loss_unstructured_dynamic, pruned_acc_unstructured_dynamic = pruned_model_unstructured_dynamic.evaluate(test_images, test_labels, verbose=0)
print('Dynamically unstructured pruned model loss: ', pruned_loss_unstructured_dynamic)
print('Dynamically unstructured pruned model accuracy: ', pruned_acc_unstructured_dynamic)
print('Full-precision model accuracy: ', fp_test_acc)

In [None]:
# Strip the pruning wrappers and save the resulting Keras model
pruned_model_unstructured_dynamic_for_export = tfmot.sparsity.keras.strip_pruning(pruned_model_unstructured_dynamic)

pruned_keras_file_unstructured_dynamic = 'models/pruned_model_unstructured_dynamic.h5'
tf.keras.models.save_model(pruned_model_unstructured_dynamic_for_export, pruned_keras_file_unstructured_dynamic, include_optimizer=False)
print('Saved pruned Keras model to:', os.path.abspath(pruned_keras_file_unstructured_dynamic))  

In [None]:
# Convert the model
converter = tf.lite.TFLiteConverter.from_keras_model(pruned_model_unstructured_dynamic_for_export)
pruned_tflite_model_unstructured_dynamic = converter.convert()

# Save the model
pruned_tflite_file_unstructured_dynamic = 'models/pruned_model_unstructured_dynamic.tflite'

with open(pruned_tflite_file_unstructured_dynamic, 'wb') as f:
    f.write(pruned_tflite_model_unstructured_dynamic)

print('Saved pruned TFLite model to:', os.path.abspath(pruned_tflite_file_unstructured_dynamic))

In [None]:
# write TFLite model to a C source (or header) file
c_model_name = 'unstr_dyn_prunedweather'

with open('cfiles/' + c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(pruned_tflite_model_unstructured_dynamic, c_model_name))

In [None]:
# compare the size of the pruned model and the full-precision model
print('Size of the unstructured dynamically pruned model: ', get_gzipped_model_size(pruned_tflite_file_unstructured_dynamic))
print('Size of the full-precision model: ', get_gzipped_model_size('models/model_f32.tflite'))
print("The achieved compression ratio is %.2fx" % (get_gzipped_model_size('models/model_f32.tflite') / get_gzipped_model_size(pruned_tflite_file_unstructured_dynamic)))

In [None]:
# evaluate the model on the test set
pruned_tflite_file_unstructured_dynamic = "models/pruned_model_unstructured_dynamic.tflite"
pruned_tflite_model_type_unstructured_dynamic = "Pruned model unstructured dynamic"
evaluate_model(pruned_tflite_file_unstructured_dynamic, pruned_tflite_model_type_unstructured_dynamic)
print("Full-precision model accuracy is %.4f%% (Number of test samples=%d)" % (fp_test_acc * 100, len(test_images)))

In [None]:
# evaluate the model on the test set
pruned_tflite_file_unstructured_dynamic = "models/pruned_model_unstructured_dynamic.tflite"
pruned_tflite_model_type_unstructured_dynamic = "Pruned model unstructured dynamic"
pruned_tflite_pred_unstructured_dynamic = run_tflite_model(pruned_tflite_file_unstructured_dynamic, range(test_images.shape[0]))
cm = confusion_matrix(test_labels, pruned_tflite_pred_unstructured_dynamic);
plot_confusion_matrix(cm, classes, title='Confusion matrix, without normalization of the pruned model unstructured dynamic', cmap=plt.cm.Spectral.reversed())

In [None]:
# compute the accuracy of the model
pruned_accuracy_unstructured_dynamic = accuracy_score(test_labels, pruned_tflite_pred_unstructured_dynamic)
print('Pruned model accuracy is %.4f%% (Number of test samples=%d)' % (pruned_accuracy_unstructured_dynamic * 100, len(test_images)))
print('Full-precision model accuracy is %.4f%% (Number of test samples=%d)' % (fp_test_acc * 100, len(test_images)))

### Quantization-aware training with pruning

In [None]:
# PQAT
quant_aware_annotate_model = tfmot.quantization.keras.quantize_annotate_model(
              pruned_model_unstructured_for_export)

pruned_qat_model = tfmot.quantization.keras.quantize_apply(quant_aware_annotate_model,
                   tfmot.experimental.combine.Default8BitPrunePreserveQuantizeScheme())

pruned_qat_model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'])

pruned_qat_model.summary()

In [None]:
# train the model
pruned_qat_model.fit(
                    train_images,
                    train_labels,
                    epochs=10,
                    validation_data=(test_images, test_labels),
                    callbacks=[es]
)

In [None]:
# evaluate the model on the test set
pruned_qat_loss, pruned_qat_acc = pruned_qat_model.evaluate(test_images, test_labels, verbose=0)
print('Pruned QAT model loss: ', pruned_qat_loss)
print('Pruned QAT model accuracy: ', pruned_qat_acc)
print('Full-precision model accuracy: ', fp_test_acc)

In [None]:
# Convert the model
# The conversion must run in this cell: the statements below (and the
# following evaluation cells) rely on `pruned_qat_tflite_file` being defined
# and on the .tflite file existing on disk.
converter = tf.lite.TFLiteConverter.from_keras_model(pruned_qat_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
pruned_qat_tflite_model = converter.convert()

# Save the model
pruned_qat_tflite_file = 'models/pruned_qat_model.tflite'

with open(pruned_qat_tflite_file, 'wb') as f:
    f.write(pruned_qat_tflite_model)

print('Saved pruned QAT TFLite model to:', os.path.abspath(pruned_qat_tflite_file))

# write TFLite model to a C source (or header) file
c_model_name = 'pruned_qat_weather'

with open('cfiles/' + c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(pruned_qat_tflite_model, c_model_name))

# compare the size of the pruned model and the full-precision model
print('Size of the pruned QAT model: ', get_gzipped_model_size(pruned_qat_tflite_file))
print('Size of th QAT model: ', get_gzipped_model_size( 'models/qat_int8.tflite'))
print('Size of the full-precision model: ', get_gzipped_model_size('models/model_f32.tflite'))
print("The achieved compression ratio is %.2fx" % (get_gzipped_model_size('models/model_f32.tflite') / get_gzipped_model_size(pruned_qat_tflite_file)))

In [None]:
# evaluate the model on the test set
pruned_qat_tflite_file = "models/pruned_qat_model.tflite"
pruned_qat_tflite_model_type = "Pruned QAT model"
evaluate_model(pruned_qat_tflite_file, pruned_qat_tflite_model_type)
print("Full-precision model accuracy is %.4f%% (Number of test samples=%d)" % (fp_test_acc * 100, len(test_images)))

In [None]:
# evaluate the model on the test set
pruned_qat_tflite_file = "models/pruned_qat_model.tflite"
pruned_qat_tflite_model_type = "Pruned QAT model"
pruned_qat_tflite_pred = run_tflite_model(pruned_qat_tflite_file, range(test_images.shape[0]))
cm = confusion_matrix(test_labels, pruned_qat_tflite_pred);
plot_confusion_matrix(cm, classes, title='Confusion matrix, without normalization of the pruned QAT model', cmap=plt.cm.Spectral.reversed())

In [None]:
# compute the accuracy of the model
pruned_qat_accuracy = accuracy_score(test_labels, pruned_qat_tflite_pred)
print('Pruned QAT model accuracy is %.4f%% (Number of test samples=%d)' % (pruned_qat_accuracy * 100, len(test_images)))
print('Full-precision model accuracy is %.4f%% (Number of test samples=%d)' % (fp_test_acc * 100, len(test_images)))

# Result Summary

In [None]:
from prettytable import PrettyTable

# One (label, test accuracy, TFLite file) entry per model in the summary.
# Keeping the three values together guarantees the table columns and the plot
# stay aligned when a model is added or removed.
# The pruned QAT model is currently excluded; uncomment its entry to include it.
summary_entries = [
    ('full precision', fp_test_acc, 'models/model_f32.tflite'),
    ('PTQ', full_int8_accuracy, 'models/full_quant.tflite'),
    ('sPruned constant', pruned_accuracy, 'models/pruned_model.tflite'),
    ('uPruned constant', pruned_accuracy_unstructured, 'models/pruned_model_unstructured.tflite'),
    ('uPruned dynamic', pruned_accuracy_unstructured_dynamic, 'models/pruned_model_unstructured_dynamic.tflite'),
    # ('pruned QAT', pruned_qat_accuracy, 'models/pruned_qat_model.tflite'),
]

model_types = [entry[0] for entry in summary_entries]

# cut precision to 4 decimal places
model_accuracies = [round(entry[1], 4) for entry in summary_entries]

# divide the gzipped size by 1000 to get the size in KB
model_sizes = [round(get_gzipped_model_size(entry[2]) / 1000, 2) for entry in summary_entries]

# create a table with the results
table = PrettyTable()
table.field_names = ["Model type", "Accuracy", "Size (KB)"]
for table_row in zip(model_types, model_accuracies, model_sizes):
    table.add_row(list(table_row))
print(table)

# plot model on x axis and accuracy on y axis
# and make size of the points proportional to the size of the model
plt.figure(figsize=(15, 8))
sns.scatterplot(x=model_types, y=model_accuracies, size=model_sizes, sizes=(100, 1000), hue=model_types)

# The scatterplot registers legend entries for both hue (model type) and
# size. Keep only the entries whose label is a model type, instead of slicing
# a fixed number of entries, so the legend stays correct when the number of
# models changes.
handles, labels = plt.gca().get_legend_handles_labels()
by_label = {
    label: handle
    for label, handle in zip(labels, handles)
    if label in model_types
}
plt.legend(by_label.values(), by_label.keys(), title='Model type', loc='lower right')

# annotate the points with the size of the model (slightly above the marker)
for model_type, model_accuracy, model_size in zip(model_types, model_accuracies, model_sizes):
    plt.annotate(str(model_size) + 'kB', (model_type, model_accuracy + 0.0001))


plt.title('Accuracy vs model type')
plt.xlabel('Model type')
plt.ylabel('Accuracy')
plt.show()