In [None]:
#@title Copyright 2022 HCAIM.

# Statements related to copyright can be added here

# Materials for this exercise are derived from the listed sources
  #  Lab work created by Dr Rosario Catelli
  #  Third Party Resources:
    # https://colab.research.google.com/github/tensorflow/model-optimization/blob/master/tensorflow_model_optimization/g3doc/guide/pruning/pruning_with_keras.ipynb


In [None]:

#@title HCAIM Practical Details

Practical_Title = "Model Compression" #@param {type:"string"}
Module = "C" #@param ["A", "B", "C", "D"]
Focus = "Future AI/Learning" #@param {type:"string"}
Topic = "Pruning and Quantization" #@param {type:"string"}
Solution_Available = "No" #@param ["Yes", "No", "NA"] {allow-input: true}
Duration_in_minutes = 150 #@param {type:"slider", min:120, max:180, step:10}

## Learning Outcomes:

  * Understand how to implement techniques of model compression
  * Grasp the advantages of pruning and quantization
  * Become familiar with high-level frameworks
  
  

## Lecturer Notes

  * Instruction on **Quiz**: answer in the empty text cell below each quiz.
  

## Instructions/Advice to students:

  * This is individual work
  * Complete the listed tasks within the allocated time
  * Submit all documentation related to practical on Moodle
  * First complete the easy tasks
  
  

## **Pruning and Quantization**


In [None]:
Allocated_time_in_minutes = 70 #@param {type:"slider", min:0, max:70, step:5}

In this example we will see how to structure a neural network with Keras/TensorFlow and apply magnitude-based *weight pruning*. In detail:

1. Prepare the dataset, create the model, train, evaluate and save the model for later usage
2. TensorFlow Model Optimization Toolkit: model preparation for pruning
3. Switching from TF to TFLite
4. Adding quantization
5. Persistence of accuracy from TF to TFLite

## **Task 1 - Prepare the dataset, create the model, train, evaluate and save the model for later usage**

The dataset used for training and pruning the model is
[MNIST](https://keras.io/api/datasets/mnist/), a dataset of handwritten digits.

Please note the procedure would be equivalent for any other dataset, e.g. [CIFAR-10](https://keras.io/api/datasets/cifar10/) (with a suitable choice of models).

In [None]:
from tensorflow import keras

In [None]:
# For the sake of simplicity, the MNIST dataset is already available within Keras, so we just need to load it properly.

# load_data() returns two tuples of NumPy arrays: (x_train, y_train), (x_test, y_test).
# We call them (train_images, train_labels), (test_images, test_labels).
#   x_train: uint8 array of grayscale image data with shape (60000, 28, 28). Pixel values range from 0 to 255.
#   y_train: uint8 array of digit labels (integers in range 0-9) with shape (60000,).
#   x_test: uint8 array of grayscale image data with shape (10000, 28, 28). Pixel values range from 0 to 255.
#   y_test: uint8 array of digit labels (integers in range 0-9) with shape (10000,).
(train_images, train_labels), (test_images, test_labels) = keras.datasets.mnist.load_data()

# Normalize the input images so that each pixel value is between 0 and 1: this helps to make the network more "stable" during learning.
train_images = train_images / 255.0
test_images = test_images / 255.0



Define the model architecture using the Sequential approach given by TensorFlow.

"Model" groups layers into an object with training and inference features.

In [None]:
import tensorflow as tf

In [None]:
model = keras.Sequential([

  keras.layers.InputLayer(input_shape=(28, 28)),  # Layer to be used as an entry point into a Network:
                                                  # input_shape parameter: Shape tuple (not including the batch axis), or TensorShape instance (not including the batch axis).

  keras.layers.Reshape(target_shape=(28, 28, 1)),   # Layer that reshapes inputs into the given shape.
                                                    # target_shape: Tuple of integers, does not include the samples dimension (batch size).

  keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'), # 2D convolution layer (e.g. spatial convolution over images): this layer creates a convolution kernel that is convolved with the layer input to produce a tensor of outputs
                                                                          # filters: Integer, the dimensionality of the output space (i.e. the number of output filters in the convolution).
                                                                          # kernel_size: An integer or tuple/list of 2 integers, specifying the height and width of the 2D convolution window. Can be a single integer to specify the same value for all spatial dimensions.
                                                                          # activation: Activation function to use. If you don't specify anything, no activation is applied.
                                                                          # 'relu': With default values, this returns the standard ReLU activation: max(x, 0), the element-wise maximum of 0 and the input tensor.

  keras.layers.MaxPool2D(pool_size=(2, 2)), # Max pooling operation for 2D spatial data.
                                            # Downsamples the input along its spatial dimensions (height and width) by taking the maximum value over an input window (of size defined by pool_size) for each channel of the input.
                                            # pool_size: Integer or tuple of 2 integers, window size over which to take the maximum. (2, 2) will take the max value over a 2x2 pooling window. If only one integer is specified, the same window length will be used for both dimensions.

  keras.layers.Flatten(), # Flattens the input. Does not affect the batch size.

  keras.layers.Dense(units=10)  # Just your regular densely-connected NN layer.
                                # Dense implements the operation: output = activation(dot(input, kernel) + bias) where activation is the element-wise activation function passed as the activation argument, kernel is a weights matrix created by the layer, and bias is a bias vector created by the layer (only applicable if use_bias is True). These are all attributes of Dense.
                                # Activation function to use. If you don't specify anything, no activation is applied (ie. "linear" activation: a(x) = x).
])

Train the digit classification model.

The ***compile*** method configures the model for training.

The ***fit*** method trains the model for a fixed number of epochs (iterations on a dataset). About the batch_size hyper-parameter:
* batch_size: Integer or None. Number of samples per gradient update. If unspecified, batch_size will default to 32. Do not specify the batch_size if your data is in the form of datasets, generators, or keras.utils.Sequence instances (since they generate batches).
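
To make the effect of `batch_size` concrete, the sketch below works out how many gradient updates one epoch performs. It assumes the 60,000 MNIST training images and a `validation_split` of 0.1 as used in this notebook; the variable names are illustrative only.

```python
import math

num_samples = 60000          # MNIST training images
validation_split = 0.1       # fraction held out for validation
batch_size = 32              # Keras default when batch_size is not specified

num_val = int(num_samples * validation_split)         # samples set aside for validation
num_train = num_samples - num_val                     # samples used for gradient updates
steps_per_epoch = math.ceil(num_train / batch_size)   # the last batch may be smaller

print(num_train, steps_per_epoch)
```

A larger `batch_size` means fewer, less noisy updates per epoch; a smaller one means more updates, each computed from fewer samples.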

In [None]:
model.compile(

    optimizer='adam', # String (name of optimizer) or optimizer instance.

    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), # Loss function. May be a string (name of loss function), or a tf.keras.losses.Loss instance.

    metrics=['accuracy']  # List of metrics to be evaluated by the model during training and testing. Each of these can be a string (name of a built-in function), a function or a tf.keras.metrics.Metric instance.

    )
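
Because the final `Dense` layer has no activation, the model outputs raw scores (logits), which is why the loss is created with `from_logits=True`. The following is a minimal pure-Python sketch of what such a loss computes for a single sample. The function name and the logits are made up for illustration; this is not the Keras implementation.

```python
import math

def sparse_categorical_crossentropy_from_logits(logits, label):
    """Cross-entropy of one sample: softmax over the logits, then -log p[label]."""
    m = max(logits)                                   # subtract the max for numerical stability
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    probs = [e / total for e in exps]                 # softmax probabilities
    return -math.log(probs[label])

logits = [2.0, 0.5, -1.0]   # hypothetical scores for 3 classes
loss_correct = sparse_categorical_crossentropy_from_logits(logits, label=0)
loss_wrong = sparse_categorical_crossentropy_from_logits(logits, label=2)
print(loss_correct, loss_wrong)
```

The loss is small when the highest logit belongs to the true class and grows as the true class receives a lower score.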

In [None]:
model.summary()
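
The parameter counts reported by the summary can be checked by hand. The sketch below assumes the Keras defaults for this architecture (no padding and stride 1 for `Conv2D`, stride equal to the pool size for `MaxPool2D`); the variable names are illustrative.

```python
# Conv2D: one 3x3 kernel per (input channel, filter) pair, plus one bias per filter.
conv_params = 3 * 3 * 1 * 12 + 12

# A 3x3 convolution without padding shrinks 28x28 to 26x26; 2x2 max pooling halves it to 13x13.
conv_out = 28 - 3 + 1
pooled = conv_out // 2
flat_features = pooled * pooled * 12

# Dense: one weight per (input feature, unit) pair, plus one bias per unit.
dense_params = flat_features * 10 + 10

total_params = conv_params + dense_params
print(conv_params, flat_features, dense_params, total_params)
```

Almost all parameters sit in the `Dense` layer, so that is where pruning has the most to remove.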

In [None]:
model_history = model.fit(

    x=train_images, # Input data

    y=train_labels, # Target data

    epochs=4, # Integer. Number of epochs to train the model. An epoch is an iteration over the entire x and y data provided (unless the steps_per_epoch flag is set to something other than None). Note that in conjunction with initial_epoch, epochs is to be understood as "final epoch". The model is not trained for a number of iterations given by epochs, but merely until the epoch of index epochs is reached.

    validation_split=0.1, # Float between 0 and 1. Fraction of the training data to be used as validation data. The model will set apart this fraction of the training data, will not train on it,
                          # and will evaluate the loss and any model metrics on this data at the end of each epoch. The validation data is selected from the last samples in the x and y data provided, before shuffling.
)

The ***evaluate*** method returns the loss value & metrics values for the model in test mode.

Computation is done in batches. About the batch_size hyper-parameter:


* batch_size: Integer or None. Number of samples per batch of computation. If unspecified, batch_size will default to 32. Do not specify the batch_size if your data is in the form of a dataset, generators, or keras.utils.Sequence instances (since they generate batches).


In [None]:
import tempfile

In [None]:
_, baseline_model_accuracy = model.evaluate(x=test_images, y=test_labels)  # The loss value is thrown away, while x are input data and y are target data.

print('Baseline test accuracy:', baseline_model_accuracy)

_, keras_file = tempfile.mkstemp('.h5') # tempfile.mkstemp returns a tuple containing (1) an OS-level handle to an open file (thrown away in this case) and (2) the absolute pathname of that file.
tf.keras.models.save_model(model=model, filepath=keras_file, include_optimizer=False)  # Saves a model as a TensorFlow SavedModel or HDF5 file:
# The SavedModel and HDF5 file contains:
# - the model's configuration (topology)
# - the model's weights
# - the model's optimizer's state (if any)
# Thus models can be reinstantiated in the exact same state, without any of the code used for model definition or training.

print('Saved baseline model to:', keras_file)

## **Task 2 - TensorFlow Model Optimization Toolkit: model preparation for pruning**

The TensorFlow Model Optimization Toolkit is a suite of tools for optimizing ML models for deployment and execution. Among many uses, the toolkit supports techniques used to:
* Reduce latency and inference cost for cloud and edge devices (e.g. mobile, IoT).
* Deploy models to edge devices with restrictions on processing, memory, power-consumption, network usage, and model storage space.
* Enable execution on and optimize for existing hardware or new special purpose accelerators.

In [None]:
! pip install -q tensorflow-model-optimization

In [None]:
import tensorflow_model_optimization as tfmot

You will apply pruning to the whole model and see this in the model summary.

In this example, you start the model with 50% sparsity (50% zeros in weights)
and end with 80% sparsity.
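
To illustrate what "magnitude-based" means, here is a minimal pure-Python sketch that zeroes the fraction of weights with the smallest absolute value. The function `prune_by_magnitude` and the weight values are hypothetical; the toolkit itself applies masks layer by layer during training rather than in one shot.

```python
def prune_by_magnitude(weights, sparsity):
    """Return a copy of `weights` with the smallest-magnitude fraction set to zero."""
    num_to_prune = int(round(sparsity * len(weights)))
    # Indices ordered from smallest to largest absolute value.
    order = sorted(range(len(weights)), key=lambda i: abs(weights[i]))
    pruned = list(weights)
    for i in order[:num_to_prune]:
        pruned[i] = 0.0
    return pruned

weights = [0.8, -0.05, 0.3, -1.2, 0.01, 0.6, -0.2, 0.1, 0.9, -0.4]  # hypothetical layer weights
half_sparse = prune_by_magnitude(weights, 0.5)
very_sparse = prune_by_magnitude(weights, 0.8)
print(half_sparse)
print(very_sparse)
```

At 50% sparsity five of the ten weights survive; at 80% only the two largest in magnitude remain.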

In [None]:
import numpy as np

In [None]:
prune_low_magnitude = tfmot.sparsity.keras.prune_low_magnitude  # Modify a tf.keras layer or model to be pruned during training.

# This function wraps a tf.keras model or layer with pruning functionality which sparsifies the layer's weights during training.
# For example, using this with 50% sparsity will ensure that 50% of the layer's weights are zero.

# The function accepts either a single keras layer (subclass of tf.keras.layers.Layer), list of keras layers or a Sequential or Functional tf.keras model and handles them appropriately.
# If it encounters a layer it does not know how to handle, it will throw an error. While pruning an entire model, even a single unknown layer would lead to an error.

# Compute end step to finish pruning after 2 epochs.
batch_size = 128
epochs = 2
validation_split = 0.1  # 10% of training set will be used for validation set.

num_images = train_images.shape[0] * (1 - validation_split) # This is 90% of training set (i.e. 54000)
end_step = np.ceil(num_images / batch_size).astype(np.int32) * epochs # This will be used as stop point for pruning

# Define model for pruning using PolynomialDecay: pruning will be scheduled with a PolynomialDecay function.
pruning_params = {
      'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.50, # Sparsity (%) at which pruning begins.
                                                               final_sparsity=0.80,   # Sparsity (%) at which pruning ends.
                                                               begin_step=0,          # Step at which to begin pruning.
                                                               end_step=end_step)     # Step at which to end pruning.
}

model_for_pruning = prune_low_magnitude(model, **pruning_params)  # ** unpacks keyword arguments

# `prune_low_magnitude` requires a recompile (equal to model.compile)
model_for_pruning.compile(optimizer='adam',
                          loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                          metrics=['accuracy'])

model_for_pruning.summary()
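
The schedule above can be sketched in plain Python to see how the target sparsity grows from 50% to 80%. This is an approximation under stated assumptions: the exponent `power=3` is assumed to be the default of `PolynomialDecay`, the function name is hypothetical, and the toolkit's schedule also updates the masks only every few steps (its `frequency` argument), which is ignored here.

```python
import math

def polynomial_decay_sparsity(step, initial_sparsity, final_sparsity,
                              begin_step, end_step, power=3):
    """Target sparsity at `step`; `power=3` is assumed here as the default exponent."""
    step = min(max(step, begin_step), end_step)        # clamp to the pruning window
    progress = (step - begin_step) / (end_step - begin_step)
    return final_sparsity + (initial_sparsity - final_sparsity) * (1.0 - progress) ** power

# Same arithmetic as the cell above: 54000 training images, batches of 128, 2 epochs.
end_step = math.ceil(54000 / 128) * 2

for step in (0, end_step // 2, end_step):
    print(step, round(polynomial_decay_sparsity(step, 0.50, 0.80, 0, end_step), 4))
```

With a cubic exponent most of the sparsity increase happens early, leaving the later steps for the network to recover accuracy.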

**Quiz**

Look at the model summary: the number of parameters is about double the number reported by `model.summary()` for the baseline model. Why?

*Your answer here*

Fine-tune with pruning for two epochs.

`tfmot.sparsity.keras.UpdatePruningStep` is required during training, and `tfmot.sparsity.keras.PruningSummaries` provides logs for tracking progress and debugging.

In [None]:
logdir = tempfile.mkdtemp() # tempfile.mkdtemp() returns the absolute pathname of a new temporary directory.

callbacks = [
  tfmot.sparsity.keras.UpdatePruningStep(), # Keras callback which updates pruning wrappers with the optimizer step. This callback must be used when training a model which needs to be pruned. Not doing so will throw an error.
  tfmot.sparsity.keras.PruningSummaries(log_dir=logdir),  # A Keras callback for adding pruning summaries to tensorboard. Logs the sparsity(%) and threshold at a given iteration step.
]

model_for_pruning.fit(train_images, train_labels, # Input data, Target data
                  batch_size=batch_size,  # Integer or None. Number of samples per gradient update. If unspecified, batch_size will default to 32. Do not specify the batch_size if your data is in the form of datasets, generators, or keras.utils.Sequence instances (since they generate batches).
                  epochs=epochs, validation_split=validation_split,
                  callbacks=callbacks)  # List of keras.callbacks.Callback instances. List of callbacks to apply during training.

For this example, there is minimal loss in test accuracy after pruning, compared to the baseline.

In [None]:
_, model_for_pruning_accuracy = model_for_pruning.evaluate(x=test_images, y=test_labels)  # Loss value is thrown away

print('Baseline test accuracy:', baseline_model_accuracy)
print('Pruned test accuracy:', model_for_pruning_accuracy)

## **Task 3 - Switching from TF to TFLite: adding quantization**

Both `tfmot.sparsity.keras.strip_pruning` and applying a standard compression algorithm (e.g. via gzip) are necessary to see the compression
benefits of pruning.

*   `strip_pruning` is necessary since it removes every tf.Variable that pruning only needs during training, which would otherwise add to model size during inference
*   Applying a standard compression algorithm is necessary since the serialized weight matrices are the same size as they were before pruning. However, pruning makes most of the weights zeros, which is
added redundancy that algorithms can utilize to further compress the model.
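
The second point can be checked without any model. The sketch below serializes 10,000 made-up float32 weights, zeroes the 80% with the smallest magnitude, and compares the compressed sizes. It uses `zlib` (DEFLATE, the algorithm behind gzip) from the standard library; the data and names are assumptions for illustration.

```python
import random
import struct
import zlib

random.seed(0)  # fixed seed so the run is repeatable

# 10,000 hypothetical weights.
dense = [random.gauss(0.0, 1.0) for _ in range(10000)]

# Zero out the 80% of weights with the smallest magnitude (what pruning does).
threshold = sorted(abs(w) for w in dense)[int(0.8 * len(dense))]
sparse = [w if abs(w) >= threshold else 0.0 for w in dense]

def compressed_size(values):
    raw = struct.pack('%df' % len(values), *values)   # serialize as float32
    return len(raw), len(zlib.compress(raw))          # zlib uses DEFLATE, like gzip

dense_raw, dense_zipped = compressed_size(dense)
sparse_raw, sparse_zipped = compressed_size(sparse)
print('dense :', dense_raw, '->', dense_zipped)
print('sparse:', sparse_raw, '->', sparse_zipped)
```

Both arrays occupy the same number of bytes before compression; only after compression does the sparse one become smaller.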

First, create a compressible model for TensorFlow.

In [None]:
model_for_export = tfmot.sparsity.keras.strip_pruning(model_for_pruning)  # Strip pruning wrappers from the model. Once a model has been pruned to required sparsity, this method can be used to restore the original model with the sparse weights.

_, pruned_keras_file = tempfile.mkstemp('.h5')  # tempfile.mkstemp returns a tuple containing an OS-level handle to an open file (thrown away in this case) and the absolute pathname of that file.
tf.keras.models.save_model(model_for_export, pruned_keras_file, include_optimizer=False)  # Saves a model as a TensorFlow SavedModel or HDF5 file:
# The SavedModel and HDF5 file contains:
# - the model's configuration (topology)
# - the model's weights
# - the model's optimizer's state (if any)
# Thus models can be reinstantiated in the exact same state, without any of the code used for model definition or training.
print('Saved pruned Keras model to:', pruned_keras_file)

Then, create a compressible model for TFLite.

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)  # Converts a TensorFlow model into TensorFlow Lite model.
                                                                        # from_keras_model: creates a TFLiteConverter object from a Keras model.
pruned_tflite_model = converter.convert() # Runs the conversion and returns the TFLite model in serialized (bytes) form.

_, pruned_tflite_file = tempfile.mkstemp('.tflite') # Already seen!

with open(pruned_tflite_file, 'wb') as f:
  f.write(pruned_tflite_model)

print('Saved pruned TFLite model to:', pruned_tflite_file)

Define a helper function that compresses a model file and measures its compressed size. It writes a ZIP archive with DEFLATE, the same algorithm that gzip uses.

In [None]:
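def get_gzipped_model_size(file):
  # Returns the size, in bytes, of the model file after DEFLATE compression (the algorithm used by gzip).
  import os
  import zipfile

  fd, zipped_file = tempfile.mkstemp('.zip')
  os.close(fd)  # Close the OS-level handle; ZipFile reopens the path itself.
  with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
    f.write(file)

  return os.path.getsize(zipped_file)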

Compare the sizes: the pruned Keras model compresses to a smaller file than the baseline, and the pruned TFLite model is smaller still.

In [None]:
print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size(keras_file)))
print("Size of gzipped pruned Keras model: %.2f bytes" % (get_gzipped_model_size(pruned_keras_file)))
print("Size of gzipped pruned TFLite model: %.2f bytes" % (get_gzipped_model_size(pruned_tflite_file)))

You can apply post-training quantization to the pruned model for additional benefits.
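
The basic idea behind weight quantization can be sketched in a few lines: store each weight as an 8-bit integer plus one shared scale factor, instead of a 32-bit float. The functions and values below are hypothetical and show a simple symmetric scheme; the TFLite converter uses its own scheme, which differs in its details.

```python
def quantize_int8(weights):
    """Symmetric per-tensor quantization: map floats to integers in [-127, 127]."""
    max_abs = max(abs(w) for w in weights)
    scale = max_abs / 127.0 if max_abs > 0 else 1.0
    quantized = [int(round(w / scale)) for w in weights]
    return quantized, scale

def dequantize(quantized, scale):
    """Recover approximate float values from the integers and the shared scale."""
    return [q * scale for q in quantized]

weights = [0.8, -0.05, 0.3, -1.2, 0.0, 0.6]   # hypothetical float weights
quantized, scale = quantize_int8(weights)
restored = dequantize(quantized, scale)
max_error = max(abs(w - r) for w, r in zip(weights, restored))
print(quantized, scale, max_error)
```

Each weight now needs one byte instead of four, at the cost of a rounding error of at most half the scale. Weights that pruning set to zero stay exactly zero, so the two techniques combine well.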

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)  # Converts a TensorFlow model into TensorFlow Lite model.
                                                                        # from_keras_model: creates a TFLiteConverter object from a Keras model.
converter.optimizations = [tf.lite.Optimize.DEFAULT]  # Enum defining the optimizations to apply when generating a tflite model.
                                                      # DEFAULT Default optimization strategy that quantizes model weights. Enhanced optimizations are gained
                                                      # by providing a representative dataset that quantizes biases and activations as well.
                                                      # Converter will do its best to reduce size and latency, while minimizing the loss in accuracy.
quantized_and_pruned_tflite_model = converter.convert() # Runs the conversion and returns the TFLite model in serialized (bytes) form.

_, quantized_and_pruned_tflite_file = tempfile.mkstemp('.tflite') # Already seen

with open(quantized_and_pruned_tflite_file, 'wb') as f:
  f.write(quantized_and_pruned_tflite_model)

print('Saved quantized and pruned TFLite model to:', quantized_and_pruned_tflite_file)

print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size(keras_file)))
print("Size of gzipped pruned and quantized TFLite model: %.2f bytes" % (get_gzipped_model_size(quantized_and_pruned_tflite_file)))

Now, define a helper function to evaluate the TF Lite model on the test dataset.

In [None]:
import numpy as np

def evaluate_model(interpreter):

  input_index = interpreter.get_input_details()[0]["index"] # Gets model input tensor details.
  # Returns a list in which each item is a dictionary with details about an input tensor.
  # Each dictionary contains the following fields that describe the tensor:
  # name: The tensor name.
  # index: The tensor index in the interpreter.
  # shape: The shape of the tensor.
  # and others...

  output_index = interpreter.get_output_details()[0]["index"] # Gets model output tensor details.
  # Returns a list in which each item is a dictionary with details about an output tensor.
  # The dictionary contains the same fields as described for get_input_details().

  # Run predictions on every image in the "test" dataset.
  prediction_digits = []
  for i, test_image in enumerate(test_images):
    if i % 1000 == 0:
      print('Evaluated on {n} results so far.'.format(n=i))
    # Pre-processing: add batch dimension and convert to float32 to match with the model's input data format.
    test_image = np.expand_dims(test_image, axis=0).astype(np.float32)  # Expand the shape of an array.
                                                                        # Insert a new axis that will appear at the axis position in the expanded array shape.
                                                                        # e.g. here test_image.shape = (28, 28), so np.expand_dims(test_image, axis=0) has shape (1, 28, 28)

    interpreter.set_tensor(input_index, test_image) # Sets the value of the input tensor.
                                                    # Note this copies data in value.

    # Run inference. Invoke the interpreter.
    interpreter.invoke()  # Be sure to set the input sizes, allocate tensors and fill values before calling this.

    # Post-processing: remove batch dimension and find the digit with highest
    # probability.
    output = interpreter.tensor(output_index) # Returns a function that can return a new numpy array pointing to the internal TFLite tensor state at any point.
    digit = np.argmax(output()[0])  # Returns the indices of the maximum values along an axis.
    prediction_digits.append(digit)

  print('\n')
  # Compare prediction results with ground truth labels to calculate accuracy.
  prediction_digits = np.array(prediction_digits)
  accuracy = (prediction_digits == test_labels).mean()
  return accuracy
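
The post-processing at the end of the helper (pick the class with the highest score, then compare with the labels) can be illustrated without TFLite. The outputs and labels below are made up; the helper `argmax` is a pure-Python stand-in for `np.argmax`.

```python
def argmax(values):
    """Index of the largest value (the first one wins on ties)."""
    return max(range(len(values)), key=lambda i: values[i])

# Hypothetical raw outputs (logits) for three images and three classes.
outputs = [[0.1, 2.3, -0.5],
           [1.7, 0.2, 0.4],
           [-0.3, 0.0, 0.9]]
labels = [1, 0, 1]

predictions = [argmax(row) for row in outputs]
accuracy = sum(p == t for p, t in zip(predictions, labels)) / len(labels)
print(predictions, accuracy)
```

Since the model outputs logits rather than probabilities, the argmax is taken over raw scores; the winning class is the same either way.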

Run the evaluation:

In [None]:
interpreter = tf.lite.Interpreter(model_content=quantized_and_pruned_tflite_model)  # Interpreter interface for running TensorFlow Lite models.

interpreter.allocate_tensors()  # It allocates memory.

test_accuracy = evaluate_model(interpreter) # It passes the interpreter of the TFLite model for evaluation to the helper function created before.

print('Pruned and quantized TFLite test_accuracy:', test_accuracy)
print('Pruned TF test accuracy:', model_for_pruning_accuracy)

The difference in accuracy between the pruned model in TensorFlow and the pruned and quantized model in TensorFlow Lite is minimal!