In [14]:
import os
os.environ['TF_USE_LEGACY_KERAS'] = "1"
import tensorflow as tf
from tensorflow.keras import layers
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tensorflow_model_optimization.python.core.quantization.keras.quantizers import Quantizer
from tensorflow_model_optimization.python.core.quantization.keras.quantize_config import QuantizeConfig
from tensorflow_model_optimization.python.core.quantization.keras.quantizers import LastValueQuantizer, MovingAverageQuantizer
from tensorflow_model_optimization.python.core.quantization.keras.quantize import quantize_annotate_layer, quantize_apply
from tensorflow_model_optimization.quantization.keras import quantize_apply, quantize_scope
import tensorflow_model_optimization as tfmot
from tensorflow.python.profiler import model_analyzer
from tensorflow.python.profiler import option_builder


In [15]:
# Generate the synthetic regression data set: each target is the sum of its five features.
# Seed NumPy's global generator first so this data, and every later np.random draw in the
# notebook, is identical after Restart Kernel -> Run All.
SEED = 42
np.random.seed(SEED)

X = np.random.rand(10000, 5)
y = np.sum(X, axis=1)
print(np.shape(X))
print(np.shape(y))

(10000, 5)
(10000,)


In [16]:
class FixedRangeQuantizer(Quantizer):
    """Quantizer that clamps its input to the interval [0, range].

    The upper bound is held in a non-trainable layer weight that starts at 6.0.
    """

    def build(self, tensor_shape, name, layer):
        """Register the upper-bound weight on `layer` and return it under 'range_var'."""
        weight_name = name + '_range'
        upper_bound = layer.add_weight(
            name=weight_name,
            initializer=tf.keras.initializers.Constant(6.0),
            trainable=False,
        )
        return {'range_var': upper_bound}

    def __call__(self, inputs, training, weights, **kwargs):
        """Return `inputs` clipped to [0.0, weights['range_var']]; `training` is not used."""
        upper_bound = weights['range_var']
        return tf.keras.backend.clip(inputs, 0.0, upper_bound)

    def get_config(self):
        """Return an empty config: the class takes no constructor arguments."""
        config = {}
        return config

In [17]:


class CustomLayerQuantizeConfig(QuantizeConfig):
    """8-bit quantization config used for the annotated Dense layers.

    The kernel is quantized with a symmetric LastValueQuantizer, the activation
    with an asymmetric MovingAverageQuantizer, and the layer output with a
    symmetric MovingAverageQuantizer.
    """

    def get_weights_and_quantizers(self, layer):
        """Return the (weight, quantizer) pairs to quantize: the kernel only.

        The bias is deliberately not listed. set_quantize_weights() installs a
        single tensor (the kernel), so a bias entry here would only create
        quantizer variables whose result is thrown away.
        """
        return [
            (layer.kernel, LastValueQuantizer(num_bits=8, symmetric=True, narrow_range=False, per_axis=False)),
        ]

    def get_activations_and_quantizers(self, layer):
        """Return the (activation, quantizer) pair for the layer's activation."""
        return [(layer.activation, MovingAverageQuantizer(num_bits=8, symmetric=False, narrow_range=False, per_axis=False))]

    def set_quantize_weights(self, layer, quantize_weights):
        """Install the quantized kernel; the list order matches get_weights_and_quantizers()."""
        layer.kernel = quantize_weights[0]

    def set_quantize_activations(self, layer, quantize_activations):
        """Install the quantized activation; the list order matches get_activations_and_quantizers()."""
        layer.activation = quantize_activations[0]

    def get_output_quantizers(self, layer):
        """Return the quantizer applied to the layer's output.

        NOTE(review): together with the activation quantizer above, the output
        appears to be fake-quantized twice (asymmetric, then symmetric) --
        confirm this is intended.
        """
        return [MovingAverageQuantizer(num_bits=8, symmetric=True, narrow_range=False, per_axis=False)]

    def get_config(self):
        """Return an empty config: the class takes no constructor arguments."""
        return {}


In [18]:
# Float baseline network: 5 inputs -> 1024 -> 512 -> 256 -> 128 -> 64 -> 1, ReLU on every layer.
# Only the first layer carries the input shape; the remaining layers differ only in width.
model2 = tf.keras.Sequential(
    [layers.Dense(1024, activation='relu', input_shape=(5,))]
    + [layers.Dense(width, activation='relu') for width in (512, 256, 128, 64, 1)]
)

In [19]:
# Same architecture as the float baseline, but every Dense layer is annotated with its own
# CustomLayerQuantizeConfig instance so quantize_apply can make it quantization-aware.
model = tf.keras.Sequential(
    [
        quantize_annotate_layer(
            layers.Dense(1024, activation='relu', input_shape=(5,)),
            quantize_config=CustomLayerQuantizeConfig(),
        )
    ]
    + [
        quantize_annotate_layer(
            layers.Dense(width, activation='relu'),
            quantize_config=CustomLayerQuantizeConfig(),
        )
        for width in (512, 256, 128, 64, 1)
    ]
)

# The custom config class is passed to quantize_scope under its own name while the
# annotated model is converted.
with quantize_scope({'CustomLayerQuantizeConfig': CustomLayerQuantizeConfig}):
    quant_aware_model = quantize_apply(model)

In [20]:
epochs = 10

# Derive the pruning horizon from the real number of optimizer steps instead of assuming
# 100 steps per epoch. The fit call in this notebook does not pass batch_size, so Keras'
# default of 32 applies: 10000 samples -> 313 steps per epoch.
BATCH_SIZE = 32
steps_per_epoch = (len(X) + BATCH_SIZE - 1) // BATCH_SIZE  # ceiling division
end_step = epochs * steps_per_epoch

pruning_params = {
    'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
        initial_sparsity=0.0,    # start training with 0% sparsity
        final_sparsity=0.50,     # end training with 50% sparsity
        begin_step=0,            # when to start pruning
        end_step=end_step        # when to end pruning (last training step)
    )
}

# Wrap the original model
model2 = tfmot.sparsity.keras.prune_low_magnitude(model2, **pruning_params)

In [None]:
# Both models solve a regression task (predict the sum of the inputs), so accuracy -- a
# classification metric -- carries no information here. Track mean absolute error instead.
model2.compile(optimizer='adam', loss='mse', metrics=['mae'])

# UpdatePruningStep is passed as a callback only for the pruned model.
callbacks = [
    tfmot.sparsity.keras.UpdatePruningStep(),
]
model2.fit(X, y, callbacks=callbacks, epochs=epochs)

# The quantization-aware model is trained on the same data for the same number of epochs.
quant_aware_model.compile(optimizer='adam', loss='mse', metrics=['mae'])
quant_aware_model.fit(X, y, epochs=epochs)

Epoch 1/10




Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tf_keras.src.callbacks.History at 0x770b103aae00>

In [22]:
# Rebind model2 to the result of tfmot's strip_pruning. Presumably this is the plain Keras
# model without the wrappers added by prune_low_magnitude -- confirm against the tfmot docs.
model2 = tfmot.sparsity.keras.strip_pruning(model2)

In [28]:
# Spot check: draw one random input row and compare its true sum with both models' predictions.
vals = np.random.rand(1, 5)
result = vals.sum()

prediction = quant_aware_model.predict(vals)   # quantization-aware model
prediction2 = model2.predict(vals)             # pruned model

# One value per line: ground truth first, then the two predictions.
print(result, prediction, prediction2, sep="\n")

1.549348416433205
[[1.5461711]]
[[1.547096]]


In [24]:
# Pick 100 distinct row positions of X (sampling without replacement) and keep those rows
# as the calibration subset.
index = np.random.choice(len(X), size=100, replace=False)
x_random = X[index]
def representative_data_gen(samples=None):
    """Yield calibration inputs one row at a time.

    Args:
        samples: optional 2-D array-like of calibration rows. When omitted, the
            notebook-level `x_random` subset is used.

    Yields:
        A one-element list holding a float32 array of shape (1, n_features):
        a single row with a leading batch dimension of size 1.
    """
    if samples is None:
        samples = x_random
    samples = np.asarray(samples, dtype=np.float32)
    # Iterate over the rows that actually exist instead of a hard-coded count, so a
    # smaller or larger calibration subset never produces empty or missing batches.
    for row in range(len(samples)):
        # Slicing (rather than indexing) keeps the batch dimension: shape (1, n_features).
        yield [samples[row:row + 1]]

In [25]:
# One TFLite converter per trained model.
quantconverter = tf.lite.TFLiteConverter.from_keras_model(quant_aware_model)
converter2 = tf.lite.TFLiteConverter.from_keras_model(model2)

# Both converters get the same settings: default optimizations, the int8 builtin op set,
# and the shared calibration generator.
for _conv in (converter2, quantconverter):
    _conv.optimizations = [tf.lite.Optimize.DEFAULT]
    _conv.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    _conv.representative_dataset = representative_data_gen

# Convert the quantization-aware model first, then the pruned one.
quantlite = quantconverter.convert()
lite2 = converter2.convert()

INFO:tensorflow:Assets written to: /tmp/tmpul7emx20/assets


INFO:tensorflow:Assets written to: /tmp/tmpul7emx20/assets


INFO:tensorflow:Assets written to: /tmp/tmpycq8nebm/assets


W0000 00:00:1739700183.757227    4748 tf_tfl_flatbuffer_helpers.cc:365] Ignored output_format.
W0000 00:00:1739700183.757239    4748 tf_tfl_flatbuffer_helpers.cc:368] Ignored drop_control_dependency.
2025-02-16 11:03:03.757328: I tensorflow/cc/saved_model/reader.cc:83] Reading SavedModel from: /tmp/tmpul7emx20
2025-02-16 11:03:03.759394: I tensorflow/cc/saved_model/reader.cc:52] Reading meta graph with tags { serve }
2025-02-16 11:03:03.759405: I tensorflow/cc/saved_model/reader.cc:147] Reading SavedModel debug info (if present) from: /tmp/tmpul7emx20
2025-02-16 11:03:03.776146: I tensorflow/cc/saved_model/loader.cc:236] Restoring SavedModel bundle.
2025-02-16 11:03:03.838784: I tensorflow/cc/saved_model/loader.cc:220] Running initialization op on SavedModel bundle at path: /tmp/tmpul7emx20
2025-02-16 11:03:03.855031: I tensorflow/cc/saved_model/loader.cc:466] SavedModel load for tags { serve }; Status: success: OK. Took 97704 microseconds.
fully_quantize: 0, inference_type: 6, input_i

In [26]:
# Create the output directory up front; open(..., "wb") does not create missing parent
# directories and would raise FileNotFoundError on a fresh checkout.
os.makedirs("testfolder", exist_ok=True)

# NOTE(review): the converter for model2 is also configured for int8 builtin ops, so the
# "32bit" file name likely refers to how model2 was trained rather than to the precision
# of the stored weights -- confirm.
with open("testfolder/8bit.tflite", "wb") as f:
    f.write(quantlite)

with open("testfolder/32bit.tflite", "wb") as f:
    f.write(lite2)

# Report the size of each saved model file in bytes (8-bit first, then 32-bit).
model_size = os.path.getsize('testfolder/8bit.tflite')
print(model_size)

model_size = os.path.getsize('testfolder/32bit.tflite')
print(model_size)

714344
761032


File sizes in bytes before LZMA compression (8bit.tflite, then 32bit.tflite):
714344
761032

In [27]:
import lzma

# Read the saved 8-bit model back, xz-compress it in memory, and write the compressed
# bytes next to the original file.
with open("testfolder/8bit.tflite", "rb") as src:
    model_data = src.read()

compressed_data = lzma.compress(model_data, format=lzma.FORMAT_XZ)

with open("testfolder/8bit.tflite.xz", "wb") as dst:
    dst.write(compressed_data)


In [29]:
# Same procedure for the model saved as 32bit.tflite: read it back, xz-compress it in
# memory, and write the compressed bytes next to the original file.
with open("testfolder/32bit.tflite", "rb") as src:
    model_data = src.read()

compressed_data = lzma.compress(model_data, format=lzma.FORMAT_XZ)

with open("testfolder/32bit.tflite.xz", "wb") as dst:
    dst.write(compressed_data)