In [1]:
! pip install -q tensorflow-model-optimization

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m240.6/240.6 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.3/17.3 MB[0m [31m43.4 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
tensorflow 2.12.0 requires numpy<1.24,>=1.22, but you have numpy 1.24.2 which is incompatible.
numba 0.56.4 requires numpy<1.24,>=1.18, but you have numpy 1.24.2 which is incompatible.[0m[31m
[0m

In [2]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import tempfile
import zipfile
import os
from keras.datasets import cifar10

## Train a tf.keras model for CIFAR without clustering

In [22]:
# Load cifar dataset
cifar = keras.datasets.cifar10
(train_images, train_labels), (test_images, test_labels) = cifar10.load_data()

# Normalize the input image so that each pixel value is between 0 to 1.
train_images = train_images.astype('float32') / 255.0
test_images  = test_images.astype('float32') / 255.0

# Define the model architecture.
model = keras.Sequential([
    keras.layers.InputLayer(input_shape=(32, 32,3)),
    keras.layers.Reshape(target_shape=(32, 32, 3)),
    keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation=tf.nn.relu),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Flatten(),
    keras.layers.Dense(10)
])

# Train the digit classification model
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

model.fit(
    train_images,
    train_labels,
    validation_split=0.1,
    epochs=10
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7fbd7f4c4dc0>

### Evaluate the baseline model and save it for later usage

In [23]:
_, baseline_model_accuracy = model.evaluate(
    test_images, test_labels, verbose=0)

print('Baseline test accuracy:', baseline_model_accuracy)

_, keras_file = tempfile.mkstemp('.h5')
print('Saving model to: ', keras_file)
tf.keras.models.save_model(model, keras_file, include_optimizer=False)

Baseline test accuracy: 0.5928000211715698
Saving model to:  /tmp/tmpqp7g5y1m.h5


## Fine-tune the pre-trained model with clustering

Apply the `cluster_weights()` API to a whole pre-trained model to demonstrate its effectiveness in reducing the model size after applying zip while keeping decent accuracy. For how best to balance the accuracy and compression rate for your use case, please refer to the per layer example in the [comprehensive guide](https://www.tensorflow.org/model_optimization/guide/clustering/clustering_comprehensive_guide).


### Define the model and apply the clustering API

Before you pass the model to the clustering API, make sure it is trained and shows some acceptable accuracy.

In [6]:
import tensorflow_model_optimization as tfmot

cluster_weights = tfmot.clustering.keras.cluster_weights
CentroidInitialization = tfmot.clustering.keras.CentroidInitialization

clustering_params = {
  'number_of_clusters': 16,
  'cluster_centroids_init': CentroidInitialization.LINEAR
}

# Cluster a whole model
clustered_model = cluster_weights(model, **clustering_params)

# Use smaller learning rate for fine-tuning clustered model
opt = tf.keras.optimizers.Adam(learning_rate=1e-5)

clustered_model.compile(
  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
  optimizer=opt,
  metrics=['accuracy'])

clustered_model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 cluster_reshape_1 (ClusterW  (None, 32, 32, 3)        0         
 eights)                                                         
                                                                 
 cluster_conv2d_1 (ClusterWe  (None, 30, 30, 12)       676       
 ights)                                                          
                                                                 
 cluster_max_pooling2d_1 (Cl  (None, 15, 15, 12)       0         
 usterWeights)                                                   
                                                                 
 cluster_flatten_1 (ClusterW  (None, 2700)             0         
 eights)                                                         
                                                                 
 cluster_dense_1 (ClusterWei  (None, 10)              

### Fine-tune the model and evaluate the accuracy against baseline

Fine-tune the model with clustering for 1 epoch.

In [7]:
# Fine-tune model
clustered_model.fit(
  train_images,
  train_labels,
  batch_size=500,
  epochs=1,
  validation_split=0.1)



<keras.callbacks.History at 0x7fbd7f64ba00>

For this example, there is minimal loss in test accuracy after clustering, compared to the baseline.

In [8]:
_, clustered_model_accuracy = clustered_model.evaluate(
  test_images, test_labels, verbose=0)

print('Baseline test accuracy:', baseline_model_accuracy)
print('Clustered test accuracy:', clustered_model_accuracy)

Baseline test accuracy: 0.5953999757766724
Clustered test accuracy: 0.5616999864578247


## Create **6x** smaller models from clustering

Both `strip_clustering` and applying a standard compression algorithm (e.g. via gzip) are necessary to see the compression benefits of clustering. 

First, create a compressible model for TensorFlow. Here, `strip_clustering` removes all variables (e.g. `tf.Variable` for storing the cluster centroids and the indices) that clustering only needs during training, which would otherwise add to model size during inference.

In [9]:
final_model = tfmot.clustering.keras.strip_clustering(clustered_model)

_, clustered_keras_file = tempfile.mkstemp('.h5')
print('Saving clustered model to: ', clustered_keras_file)
tf.keras.models.save_model(final_model, clustered_keras_file, 
                           include_optimizer=False)



Saving clustered model to:  /tmp/tmp9qgebgfe.h5


Then, create compressible models for TFLite. You can convert the clustered model to a format that's runnable on your targeted backend. TensorFlow Lite is an example you can use to deploy to mobile devices.

In [16]:
clustered_tflite_file = '/tmp/clustered_cifar.tflite'
converter = tf.lite.TFLiteConverter.from_keras_model(final_model)
tflite_clustered_model = converter.convert()
with open(clustered_tflite_file, 'wb') as f:
  f.write(tflite_clustered_model)
print('Saved clustered TFLite model to:', clustered_tflite_file)



Saved clustered TFLite model to: /tmp/clustered_cifar.tflite


Define a helper function to actually compress the models via gzip and measure the zipped size.

In [17]:
def get_gzipped_model_size(file):
  # It returns the size of the gzipped model in bytes.
  import os
  import zipfile

  _, zipped_file = tempfile.mkstemp('.zip')
  with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
    f.write(file)

  return os.path.getsize(zipped_file)

Compare and see that the models are **6x** smaller from clustering

In [18]:
print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size(keras_file)))
print("Size of gzipped clustered Keras model: %.2f bytes" % (get_gzipped_model_size(clustered_keras_file)))
print("Size of gzipped clustered TFlite model: %.2f bytes" % (get_gzipped_model_size(clustered_tflite_file)))

Size of gzipped baseline Keras model: 104080.00 bytes
Size of gzipped clustered Keras model: 16105.00 bytes
Size of gzipped clustered TFlite model: 15283.00 bytes


## Create an **8x** smaller TFLite model from combining weight clustering and post-training quantization

You can apply post-training quantization to the clustered model for additional benefits.

In [19]:
converter = tf.lite.TFLiteConverter.from_keras_model(final_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quant_model = converter.convert()

_, quantized_and_clustered_tflite_file = tempfile.mkstemp('.tflite')

with open(quantized_and_clustered_tflite_file, 'wb') as f:
  f.write(tflite_quant_model)

print('Saved quantized and clustered TFLite model to:', quantized_and_clustered_tflite_file)
print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size(keras_file)))
print("Size of gzipped clustered and quantized TFlite model: %.2f bytes" % (get_gzipped_model_size(quantized_and_clustered_tflite_file)))



Saved quantized and clustered TFLite model to: /tmp/tmpzeshlxoq.tflite
Size of gzipped baseline Keras model: 104080.00 bytes
Size of gzipped clustered and quantized TFlite model: 11738.00 bytes


## See the persistence of accuracy from TF to TFLite

Define a helper function to evaluate the TFLite model on the test dataset.

In [20]:
def eval_model(interpreter):
  input_index = interpreter.get_input_details()[0]["index"]
  output_index = interpreter.get_output_details()[0]["index"]

  # Run predictions on every image in the "test" dataset.
  prediction_digits = []
  for i, test_image in enumerate(test_images):
    if i % 1000 == 0:
      print('Evaluated on {n} results so far.'.format(n=i))
    # Pre-processing: add batch dimension and convert to float32 to match with
    # the model's input data format.
    test_image = np.expand_dims(test_image, axis=0).astype(np.float32)
    interpreter.set_tensor(input_index, test_image)

    # Run inference.
    interpreter.invoke()

    # Post-processing: remove batch dimension and find the digit with highest
    # probability.
    output = interpreter.tensor(output_index)
    digit = np.argmax(output()[0])
    prediction_digits.append(digit)

  print('\n')
  # Compare prediction results with ground truth labels to calculate accuracy.
  prediction_digits = np.array(prediction_digits)
  accuracy = (prediction_digits == test_labels).mean()
  return accuracy

You evaluate the model, which has been clustered and quantized, and then see the accuracy from TensorFlow persists to the TFLite backend.

In [21]:
interpreter = tf.lite.Interpreter(model_content=tflite_quant_model)
interpreter.allocate_tensors()

test_accuracy = eval_model(interpreter)

print('Clustered and quantized TFLite test_accuracy:', test_accuracy)
print('Clustered TF test accuracy:', clustered_model_accuracy)

Evaluated on 0 results so far.
Evaluated on 1000 results so far.
Evaluated on 2000 results so far.
Evaluated on 3000 results so far.
Evaluated on 4000 results so far.
Evaluated on 5000 results so far.
Evaluated on 6000 results so far.
Evaluated on 7000 results so far.
Evaluated on 8000 results so far.
Evaluated on 9000 results so far.


Clustered and quantized TFLite test_accuracy: 0.1
Clustered TF test accuracy: 0.5616999864578247
