In [11]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import os
import zipfile

Original model without weight clustering

In [12]:
# Load the MNIST dataset (train/test images and labels)
mnisi = keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnisi.load_data()

# Normalize: scale the pixel values by 1/255
train_images = train_images / 255.0
test_images = test_images / 255.0

# Define the model: one small conv layer followed by a linear classifier
model = keras.Sequential(
    [
        keras.layers.InputLayer(input_shape=(28, 28)),
        # Conv2D needs an explicit channel axis.
        keras.layers.Reshape(target_shape=(28, 28, 1)),
        keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'),
        keras.layers.MaxPool2D(pool_size=(2, 2)),
        keras.layers.Flatten(),
        # No activation here: the layer emits logits (see from_logits=True below).
        keras.layers.Dense(10),
    ]
)

# Train the model, holding out 10% of the training data for validation
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)
model.fit(train_images, train_labels, validation_split=0.1, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x28e50b66808>

In [27]:
# Evaluate the baseline model on the test set and save it
_, baseline_model_accuracy = model.evaluate(test_images, test_labels, verbose=0)
print('Baseline test accuracy:', baseline_model_accuracy)

# Save the model without optimizer state. The path is kept in a variable
# because the size-comparison cell later reads it back.
model_without_cluster = './model/original_without_cluster.h5'
# Saving fails if the target directory is missing, so create it first.
os.makedirs(os.path.dirname(model_without_cluster), exist_ok=True)
tf.keras.models.save_model(model, model_without_cluster, include_optimizer=False)

Baseline test accuracy: 0.9819999933242798


Model with weight clustering

In [14]:
import tensorflow_model_optimization as tfmot

# Short aliases for the clustering API entry points.
CentroidInitialization = tfmot.clustering.keras.CentroidInitialization
cluster_weights = tfmot.clustering.keras.cluster_weights

# Clustering configuration: 16 clusters with LINEAR centroid initialization.
clustering_params = {
    'number_of_clusters': 16,
    'cluster_centroids_init': CentroidInitialization.LINEAR,
}

# Use a smaller learning rate for fine-tuning the clustered model.
opt = tf.keras.optimizers.Adam(learning_rate=1e-5)

# Cluster the whole (already trained) baseline model.
clustered_model = cluster_weights(model, **clustering_params)

clustered_model.compile(
    optimizer=opt,
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

clustered_model.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
cluster_reshape_1 (ClusterWe (None, 28, 28, 1)         0         
_________________________________________________________________
cluster_conv2d_1 (ClusterWei (None, 26, 26, 12)        244       
_________________________________________________________________
cluster_max_pooling2d_1 (Clu (None, 13, 13, 12)        0         
_________________________________________________________________
cluster_flatten_1 (ClusterWe (None, 2028)              0         
_________________________________________________________________
cluster_dense_1 (ClusterWeig (None, 10)                40586     
Total params: 40,830
Trainable params: 20,442
Non-trainable params: 20,388
_________________________________________________________________


In [15]:
# Fine-tune the weight-clustered model: a single epoch with large batches,
# holding out 10% of the training data for validation.
clustered_model.fit(
    train_images, train_labels,
    batch_size=500, epochs=1, validation_split=0.1,
)



<tensorflow.python.keras.callbacks.History at 0x28e532e8a88>

In [16]:
# Evaluate the fine-tuned clustered model on the test set; evaluate() returns
# (loss, accuracy) here and only the accuracy is kept.
_, clustered_model_accuracy = clustered_model.evaluate(
  test_images, test_labels, verbose=0)

# `baseline_model_accuracy` is produced by the baseline evaluation cell.
# NOTE(review): that cell's execution counter (In [27]) is later than this
# one's (In [16]), so the two recorded baseline numbers likely come from
# different runs -- re-run the notebook top to bottom before comparing.
print('Baseline test accuracy:', baseline_model_accuracy)
print('Clustered test accuracy:', clustered_model_accuracy)

Baseline test accuracy: 0.9803000092506409
Clustered test accuracy: 0.9776999950408936


In [17]:
# Save the model: strip the clustering wrappers first so only the plain
# Keras layers are written to disk.
final_model = tfmot.clustering.keras.strip_clustering(clustered_model)

clustered_keras_file = './model/original_with_cluster.h5'
# Saving fails if the target directory is missing, so create it first.
os.makedirs(os.path.dirname(clustered_keras_file), exist_ok=True)
print('Saving clustered model to: ', clustered_keras_file)
tf.keras.models.save_model(final_model, clustered_keras_file,
                           include_optimizer=False)

Saving clustered model to:  ./model/original_with_cluster.h5


In [19]:
# Save as a compressible model: convert the stripped clustered model to TFLite
clustered_tflite_file = './model/clustered_mnist.tflite'
converter = tf.lite.TFLiteConverter.from_keras_model(final_model)
tflite_clustered_model = converter.convert()
# open(..., 'wb') raises FileNotFoundError if the directory is missing.
os.makedirs(os.path.dirname(clustered_tflite_file), exist_ok=True)
with open(clustered_tflite_file, 'wb') as f:
  f.write(tflite_clustered_model)
print('Saved clustered TFLite model to:', clustered_tflite_file)

INFO:tensorflow:Assets written to: C:\Users\wendy\AppData\Local\Temp\tmp6hi33c5n\assets


INFO:tensorflow:Assets written to: C:\Users\wendy\AppData\Local\Temp\tmp6hi33c5n\assets


Saved clustered TFLite model to: ./model/clustered_mnist.tflite


In [25]:
import tempfile
def get_gzipped_model_size(file):
  """Return the compressed size of `file` in bytes.

  The file is written into a temporary zip archive using DEFLATE compression
  and the size of that archive is returned. The archive is removed before
  returning, so nothing is left behind in the temp directory.

  Args:
    file: Path of the file to measure.

  Returns:
    Size of the temporary zip archive, in bytes.
  """
  import os
  import zipfile

  # mkstemp returns an already-open OS-level descriptor along with the path.
  # Close it immediately: ZipFile reopens the path itself, and a dangling
  # descriptor would be leaked on every call.
  fd, zipped_file = tempfile.mkstemp('.zip')
  os.close(fd)
  try:
    with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
      f.write(file)
    return os.path.getsize(zipped_file)
  finally:
    # The archive exists only to be measured; do not leave it on disk.
    os.remove(zipped_file)

In [28]:
# Convert the stripped clustered model to TFLite again, this time with the
# default converter optimizations enabled.
converter = tf.lite.TFLiteConverter.from_keras_model(final_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quant_model = converter.convert()

# mkstemp returns an open descriptor together with the path. Write through
# that descriptor so it is closed by the `with` block instead of being leaked.
tflite_fd, quantized_and_clustered_tflite_file = tempfile.mkstemp('.tflite')
with os.fdopen(tflite_fd, 'wb') as f:
  f.write(tflite_quant_model)

# Compare the compressed size of the baseline .h5 with the clustered and
# quantized .tflite model.
print('Saved quantized and clustered TFLite model to:', quantized_and_clustered_tflite_file)
print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size(model_without_cluster)))
print("Size of gzipped clustered and quantized TFlite model: %.2f bytes" % (get_gzipped_model_size(quantized_and_clustered_tflite_file)))

INFO:tensorflow:Assets written to: C:\Users\wendy\AppData\Local\Temp\tmp5mmrdm21\assets


INFO:tensorflow:Assets written to: C:\Users\wendy\AppData\Local\Temp\tmp5mmrdm21\assets


Saved quantized and clustered TFLite model to: C:\Users\wendy\AppData\Local\Temp\tmp14fjakoh.tflite
Size of gzipped baseline Keras model: 1760.00 bytes
Size of gzipped clustered and quantized TFlite model: 9828.00 bytes


In [29]:
# Check the accuracy of the TFLite model
def eval_model(interpreter, images=None, labels=None):
  """Run a TFLite interpreter over a set of images and return its accuracy.

  Args:
    interpreter: An interpreter whose tensors are already allocated. Its first
      input receives one image at a time (with a leading batch axis, cast to
      float32) and its first output is read back after each invoke().
    images: Iterable of images to classify. Defaults to the notebook-level
      `test_images` when omitted.
    labels: Ground-truth labels aligned with `images`. Defaults to the
      notebook-level `test_labels` when omitted.

  Returns:
    Fraction of images whose arg-max output equals the label.
  """
  # Fall back to the notebook globals so existing eval_model(interpreter)
  # calls keep working unchanged.
  if images is None:
    images = test_images
  if labels is None:
    labels = test_labels

  input_index = interpreter.get_input_details()[0]["index"]
  output_index = interpreter.get_output_details()[0]["index"]
  # The accessor is loop-invariant; it is *called* after each invoke() to read
  # the current output buffer.
  output = interpreter.tensor(output_index)

  # Run predictions on every image in the dataset.
  prediction_digits = []
  for i, test_image in enumerate(images):
    if i % 1000 == 0:
      print('Evaluated on {n} results so far.'.format(n=i))
    # Pre-processing: add batch dimension and convert to float32 to match with
    # the model's input data format.
    test_image = np.expand_dims(test_image, axis=0).astype(np.float32)
    interpreter.set_tensor(input_index, test_image)

    # Run inference.
    interpreter.invoke()

    # Post-processing: remove batch dimension and find the digit with the
    # highest score.
    digit = np.argmax(output()[0])
    prediction_digits.append(digit)

  print('\n')
  # Compare prediction results with ground truth labels to calculate accuracy.
  prediction_digits = np.array(prediction_digits)
  accuracy = (prediction_digits == np.asarray(labels)).mean()
  return accuracy

In [30]:
# Build the interpreter from the in-memory quantized model produced by
# converter.convert(). `model_content` must be the serialized model bytes.
# NOTE(review): the TypeError recorded under this cell shows it was invoked
# with './model/clustered_mnist.tflite' (a path string) where bytes are
# required, so that output appears to be stale from an earlier version of this
# cell -- re-run to refresh it.
interpreter = tf.lite.Interpreter(model_content=tflite_quant_model)
interpreter.allocate_tensors()

# eval_model prints progress and returns the accuracy over the test set.
test_accuracy = eval_model(interpreter)

# Report the TFLite accuracy next to the accuracy of the clustered Keras model
# (computed by clustered_model.evaluate in an earlier cell).
print('Clustered and quantized TFLite test_accuracy:', test_accuracy)
print('Clustered TF test accuracy:', clustered_model_accuracy)

TypeError: CreateWrapperFromBuffer(): incompatible function arguments. The following argument types are supported:
    1. (arg0: bytes, arg1: List[str]) -> tflite::interpreter_wrapper::InterpreterWrapper
    2. (arg0: bytes, arg1: List[str], arg2: List[Callable[[int], None]]) -> tflite::interpreter_wrapper::InterpreterWrapper

Invoked with: './model/clustered_mnist.tflite', [], []