   # The Ultimate Compression Pipeline ~ Ajay Maheshwari

In [14]:
! pip install -q tensorflow-model-optimization

In [15]:
import tensorflow as tf
import tf_keras as keras

import numpy as np
import tempfile
import zipfile
import os

In [16]:
def get_gzipped_model_size(model):
  with tempfile.NamedTemporaryFile(suffix=".h5") as temp_file:  
    model.save(temp_file.name)

    _, zipped_file = tempfile.mkstemp('.zip')
    with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
      f.write(temp_file.name)
    
    # print(f"Zipped model is saved at: {zipped_file}")

    x = os.path.getsize(zipped_file)
    
    os.remove(zipped_file)
    # print(f"Temporary zip file removed: {zipped_file}")
        
    return x / 1000


def print_model_weights_sparsity(model):
    for layer in model.layers:
        if isinstance(layer, keras.layers.Wrapper):
            weights = layer.trainable_weights
        else:
            weights = layer.weights
        for weight in weights:
            if "kernel" not in weight.name or "centroid" in weight.name:
                continue
            weight_size = weight.numpy().size
            zero_num = np.count_nonzero(weight == 0)
            print(
                f"{weight.name}: {zero_num/weight_size:.2%} sparsity ",
                f"({zero_num}/{weight_size})",
            )


def get_gzipped_model_size2(file):

  _, zipped_file = tempfile.mkstemp('.zip')
  with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
    f.write(file)

  return os.path.getsize(zipped_file)/1000


def eval_model(interpreter):
  input_index = interpreter.get_input_details()[0]["index"]
  output_index = interpreter.get_output_details()[0]["index"]

  prediction_digits = []
  for i, test_image in enumerate(test_images):
    test_image = np.expand_dims(test_image, axis=0).astype(np.float32)
    interpreter.set_tensor(input_index, test_image)

    interpreter.invoke()

    output = interpreter.tensor(output_index)
    digit = np.argmax(output()[0])
    prediction_digits.append(digit)

  prediction_digits = np.array(prediction_digits)
  accuracy = (prediction_digits == test_labels).mean()
  return accuracy

model_acc = []
model_sz = []



datagen = ImageDataGenerator(
    rotation_range=10,      # Random rotation up to 10 degrees
    width_shift_range=0.1,  # Random horizontal shift by up to 10% of the width
    height_shift_range=0.1, # Random vertical shift by up to 10% of the height
    horizontal_flip=True,   # Random horizontal flip
    vertical_flip=True      # Random vertical flip
)

# Fit the ImageDataGenerator on the training data
datagen.fit(train_images)

NameError: name 'ImageDataGenerator' is not defined

## Creating a Base Model - 1.0

In [None]:
# Load MNIST dataset
mnist = keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Normalize the input image so that each pixel value is between 0 to 1.
train_images = train_images / 255.0
test_images  = test_images / 255.0

model = keras.Sequential([
  keras.layers.InputLayer(input_shape=(28, 28)),
  keras.layers.Reshape(target_shape=(28, 28, 1)),
  keras.layers.Conv2D(filters=12, kernel_size=(3, 3),
                         activation=tf.nn.relu),
  keras.layers.MaxPooling2D(pool_size=(2, 2)),
  keras.layers.Flatten(),
  keras.layers.Dense(10)
])

opt = keras.optimizers.Adam(learning_rate=1e-3)

# Train the digit classification model
model.compile(optimizer=opt,
              loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

model.fit(
    train_images,
    train_labels,
    validation_split=0.1,
    epochs=10
)

### Evaluate the baseline model and save it for later usage

In [None]:
_, baseline_model_accuracy = model.evaluate(
    test_images, test_labels, verbose=0)

print('Baseline test accuracy:', baseline_model_accuracy)

sz = get_gzipped_model_size(model)
print("Base model size: ",  sz , ' KB' )

model_acc.append(baseline_model_accuracy)
model_sz.append(sz)

                         ---------- Checkpoint Point 1 ---------

## Pruning and then fine-tuning the model - 2.0

In [17]:
import tensorflow_model_optimization as tfmot

pruning_params = {
          'pruning_schedule': tfmot.sparsity.keras.ConstantSparsity(0.5, begin_step=0, frequency=100)     
}

callbacks = [
      tfmot.sparsity.keras.UpdatePruningStep()
]
    
    
pruned_model = model


prune_low_magnitude = tfmot.sparsity.keras.prune_low_magnitude

pruned_model = prune_low_magnitude(pruned_model, **pruning_params)

# learning rate for fine-tuning
opt = keras.optimizers.Adam(learning_rate=1e-5)


pruned_model.compile(
  loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
  optimizer=opt,
  metrics=['accuracy'])

# Fine-tune model
pruned_model.fit(
  train_images,
  train_labels,
  epochs=10,
  validation_split=0.1,
  callbacks=callbacks)

stripped_pruned_model = tfmot.sparsity.keras.strip_pruning(pruned_model)

print_model_weights_sparsity(stripped_pruned_model)

pruned_model = stripped_pruned_model





Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
conv2d_1/kernel:0: 50.00% sparsity  (54/108)
dense_1/kernel:0: 50.00% sparsity  (10140/20280)


In [18]:
pruned_model.compile(
      loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=opt,
      metrics=['accuracy'])


In [19]:
_, pruned_model_accuracy = pruned_model.evaluate(
    test_images, test_labels, verbose=0)

print('Pruned Model test accuracy:', pruned_model_accuracy)

sz = get_gzipped_model_size(pruned_model)
print("Stripped model size: ",  sz , ' KB' )

model_acc.append(pruned_model_accuracy)
model_sz.append(sz)


Pruned Model test accuracy: 0.9807000160217285
Stripped model size:  179.756  KB


                         ---------- Checkpoint Point 2 ---------

## Knowledge Distillation - 3.0

In [20]:
from keras import layers
from keras import ops
import numpy as np

class Distiller(keras.Model):
    def __init__(self, student, teacher):
        super().__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer,
        metrics,
        student_loss_fn,
        distillation_loss_fn,
        alpha=0.1,
        temperature=3,
    ):
        super().compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def compute_loss(
        self, x=None, y=None, y_pred=None, sample_weight=None, allow_empty=False
    ):
        teacher_pred = self.teacher(x, training=False)
        student_loss = self.student_loss_fn(y, y_pred)

        distillation_loss = self.distillation_loss_fn(
            ops.softmax(teacher_pred / self.temperature, axis=1),
            ops.softmax(y_pred / self.temperature, axis=1),
        ) * (self.temperature**2)

        loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss
        return loss

    def call(self, x):
        return self.student(x)

In [21]:
teacher = keras.Sequential(
    [
        keras.layers.InputLayer(input_shape=(28, 28)),
        keras.layers.Reshape(target_shape=(28, 28, 1)),
        keras.layers.Conv2D(filters=16, kernel_size=(3, 3), activation=tf.nn.relu),  # Increase filters
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Conv2D(filters=24, kernel_size=(3, 3), activation=tf.nn.relu),  # Add another layer
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Flatten(),
        keras.layers.Dense(units=128, activation=tf.nn.relu),  # Add a hidden layer
        keras.layers.Dense(10)
    ],
    name="teacher",
)


# teacher = keras.Sequential(
#     [
#           keras.layers.InputLayer(input_shape=(28, 28)),
#           keras.layers.Reshape(target_shape=(28, 28, 1)),
#           keras.layers.Conv2D(filters=12, kernel_size=(3, 3),
#                                  activation=tf.nn.relu),
#           keras.layers.MaxPooling2D(pool_size=(2, 2)),
#           keras.layers.Flatten(),
#           keras.layers.Dense(10)
#     ],
#     name="teacher",
# )


teacher.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)



In [22]:
teacher.fit(train_images, train_labels, epochs=12)
teacher.evaluate(test_images, test_labels)

Epoch 1/12
Epoch 2/12
Epoch 3/12
Epoch 4/12
Epoch 5/12
Epoch 6/12
Epoch 7/12
Epoch 8/12
Epoch 9/12
Epoch 10/12
Epoch 11/12
Epoch 12/12


[0.04396843910217285, 0.9896000027656555]

In [23]:
distiller = Distiller(student=pruned_model, teacher=teacher)
distiller.compile(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=10,
)




In [24]:
# Distill teacher se student
distiller.fit(train_images, train_labels, epochs=15)

distiller.evaluate(test_images, test_labels)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


0.9785000085830688

In [25]:
_, acc = distiller.student.evaluate(
    test_images, test_labels, verbose=0)

print('Distilled Model test accuracy:', acc)

sz = get_gzipped_model_size(distiller.student)
print("Distilled model size: ",  sz , ' KB' )

model_acc.append(acc)
model_sz.append(sz)

Distilled Model test accuracy: 0.9785000085830688
Distilled model size:  203.039  KB


                         ---------- Checkpoint Point 3 ---------

In [29]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

datagen = ImageDataGenerator(
    rotation_range=10,      
    width_shift_range=0.1,  
    height_shift_range=0.1, 
)

train_images = np.expand_dims(train_images, axis=-1)
test_images = np.expand_dims(test_images, axis=-1)

datagen.fit(train_images)

augmented_images = []
augmented_labels = []
for X_batch, y_batch in datagen.flow(train_images, train_labels, batch_size=len(train_images), shuffle=False):
    augmented_images.append(X_batch)
    augmented_labels.append(y_batch)
    break  

# Concatenate the augmented images and labels
augmented_images = np.concatenate(augmented_images)
augmented_labels = np.concatenate(augmented_labels)

# Convert labels of original number 6 to 9
augmented_labels[train_labels == 6] = 9

# Check the shape of augmented dataset
print("Augmented train images shape:", augmented_images.shape)
print("Augmented train labels shape:", augmented_labels.shape)

Augmented train images shape: (60000, 28, 28, 1)
Augmented train labels shape: (60000,)


## Weight Clustering - 4.0

In [None]:
def print_model_weight_clusters(model):
    for layer in model.layers:
        if isinstance(layer, keras.layers.Wrapper):
            weights = layer.trainable_weights
        else:
            weights = layer.weights
        for weight in weights:
            # ignore auxiliary quantization weights
            if "quantize_layer" in weight.name:
                continue
            if "kernel" in weight.name:
                unique_count = len(np.unique(weight))
                print(
                    f"{layer.name}/{weight.name}: {unique_count} clusters "
                )

In [None]:
import tensorflow_model_optimization as tfmot
from tensorflow_model_optimization.python.core.clustering.keras.experimental import (
    cluster,
)

cluster_weights = tfmot.clustering.keras.cluster_weights
CentroidInitialization = tfmot.clustering.keras.CentroidInitialization

cluster_weights = cluster.cluster_weights

clustering_params = {
  'number_of_clusters': 8,
  'cluster_centroids_init': CentroidInitialization.KMEANS_PLUS_PLUS,
  'preserve_sparsity': True
}

sparsity_clustered_model = cluster_weights(distiller.student, **clustering_params)

sparsity_clustered_model.compile(optimizer='adam',
              loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

print('Train sparsity preserving clustering model:')
sparsity_clustered_model.fit(train_images, train_labels,epochs=6, validation_split=0.1)

In [None]:
stripped_clustered_model = tfmot.clustering.keras.strip_clustering(sparsity_clustered_model)

print("Model sparsity:\n")
print_model_weights_sparsity(stripped_clustered_model)

print("\nModel clusters:\n")
print_model_weight_clusters(stripped_clustered_model)

In [None]:
stripped_clustered_model.compile(optimizer=opt,
              loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

In [None]:
_, stripped_clustered_model_accuracy = stripped_clustered_model.evaluate(
    test_images, test_labels, verbose=0)

print('Clustered Model test accuracy:', stripped_clustered_model_accuracy)

sz = get_gzipped_model_size(stripped_clustered_model)
print("Clustered model size: ",  sz , ' KB' )

model_acc.append(stripped_clustered_model_accuracy)
model_sz.append(sz)

                         ---------- Checkpoint Point 4 ---------

## Quantization - 5.0

In [None]:
quant_aware_annotate_model = tfmot.quantization.keras.quantize_annotate_model(
              stripped_clustered_model)
quant_model = tfmot.quantization.keras.quantize_apply(
              quant_aware_annotate_model,
              tfmot.experimental.combine.Default8BitClusterPreserveQuantizeScheme(preserve_sparsity=True))

quant_model.compile(optimizer='adam',
              loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
print('Training after quantization model:')
quant_model.fit(train_images, train_labels, batch_size=128, epochs=3, validation_split=0.1)

In [None]:
print("Final Model clusters:")
print_model_weight_clusters(quant_model)
print("\nFinal Model sparsity:")
print_model_weights_sparsity(quant_model)

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(quant_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
final_tflite_model = converter.convert()
final_model_file = 'final_model.tflite'
# Save the model.
with open(final_model_file, 'wb') as f:
    f.write(final_tflite_model)


sz = get_gzipped_model_size2(final_model_file)
print("Final model size: ", sz, ' KB')

model_sz.append(sz)

In [None]:
interpreter = tf.lite.Interpreter(final_model_file)
interpreter.allocate_tensors()
 
final_test_accuracy = eval_model(interpreter)

print('Final test accuracy:', final_test_accuracy)

model_acc.append(final_test_accuracy)

                         ---------- Checkpoint Point 5 ---------

In [None]:
for i in range(len(model_acc)):
    print(f"Accuracy = { round(model_acc[i]*100,2)} with size = {model_sz[i]} KB ")

                         ------- Final Comparison Summary -------