In [None]:
# @title 1. Universal Setup (Run All Compatible)
# @markdown This cell fixes Google Colab environment issues. Run this first.

import os
import sys
import subprocess

def is_colab():
    """Heuristically report whether this process looks like a Google Colab runtime.

    Returns:
        bool: True when the ``google.colab`` package is already loaded, when it
        is listed among the interpreter's built-in module names, or when a
        ``/content`` path exists on disk; False otherwise.
    """
    marker = 'google.colab'
    if marker in sys.modules:
        return True
    if marker in sys.builtin_module_names:
        return True
    # Last resort: filesystem probe (only reached when the module checks fail).
    return os.path.exists('/content')

def _numpy_version_in_subprocess():
    """Return NumPy's version string as seen by a fresh interpreter, or None.

    The probe runs in a child process so that a broken NumPy install cannot
    crash this kernel. stderr is discarded: merging it into stdout would let
    warning text corrupt the version string and make the parse fail.
    """
    try:
        raw = subprocess.check_output(
            [sys.executable, "-c", "import numpy; print(numpy.__version__)"],
            stderr=subprocess.DEVNULL,
        )
    except (subprocess.CalledProcessError, OSError):
        return None
    tokens = raw.decode(errors="replace").split()
    # The version is the last thing printed; ignore anything emitted before it.
    return tokens[-1] if tokens else None


def _major_version(version):
    """Return the leading integer of a dotted version string, or 0 if unusable."""
    try:
        return int(version.split('.')[0])
    except (AttributeError, ValueError):
        return 0  # Treat missing/unparseable versions as broken/old


if is_colab():
    print("üåê Running in Google Colab. Validating environment...")

    # Check NumPy version without importing it into the main process to avoid crashing if it's broken
    np_version = _numpy_version_in_subprocess()
    major_v = _major_version(np_version)

    if major_v < 2:
        print(f"‚è´ Upgrading environment (Found NumPy {np_version or 'broken'})...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "--upgrade", "numpy>=2.0", "tensorflow-model-optimization", "pandas", "matplotlib", "tabulate"])

        # Only restart when the upgrade is actually visible to a fresh interpreter;
        # otherwise every "Run All" would reinstall and kill the kernel again.
        if _major_version(_numpy_version_in_subprocess()) >= 2:
            print("\n‚ö†Ô∏è RESTARTING SESSION: A runtime reset is required to apply the NumPy upgrade.")
            print("Click 'Run All' again after the restart is complete (usually takes 5 seconds).")
            # Signal 9 (SIGKILL) terminates this kernel process so the runtime restarts.
            os.kill(os.getpid(), 9)
        else:
            print("WARNING: NumPy 2.x is still not importable after the upgrade; continuing without a restart.")

    # Importing tensorflow_model_optimization pulls in TensorFlow, so request quiet
    # native logging before that first import (an existing user setting is kept).
    os.environ.setdefault('TF_CPP_MIN_LOG_LEVEL', '3')

    # Ensure TF MOT exists.
    try:
        import tensorflow_model_optimization
    except ImportError:
        print("üì¶ Installing experiment libraries...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "tensorflow-model-optimization", "pandas", "matplotlib", "tabulate"])

print("‚úÖ Environment Ready!")

# 🧪 The Ultimate Quantization Benchmark: Research to Production

[!["Open In Colab"](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/adiel2012/model-size-reduction/blob/main/experiment_framework.ipynb)

## 📖 Overview
This notebook provides a unified experimentation framework to compare the major quantization milestones from 2022 to 2026. While the chronology folders contain "from scratch" implementations for learning, this framework uses **TensorFlow (TFLite & TFMOT)** built-in functions to simulate these algorithms in a production-ready environment.

### Algorithms Compared
1.  **Baseline (FP32)**: The uncompressed reference model.
2.  **LLM.int8() style**: Dynamic Range Quantization (Weight INT8).
3.  **GPTQ / AWQ style**: Full Integer Quantization (Calibrated INT8).
4.  **NF4 / HQQ style**: 4-bit Weight-only Quantization.
5.  **BitNet / T-Poti style**: Simulated ultra-low precision (Sparsity + Quantization).

---

In [None]:
# 2. Initialize TensorFlow and Checks
import os
import warnings

# Silence warnings.
# These must be configured BEFORE TensorFlow is imported: anything TensorFlow
# logs or warns about while it is being imported is emitted before any later
# statement in this cell runs. (If TensorFlow was already imported earlier in
# this kernel, the log-level variable may no longer take effect.)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings('ignore', category=DeprecationWarning)

import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_model_optimization as tfmot

print("üöÄ TensorFlow version:", tf.__version__)

# Report whether the first GPU device is visible to TensorFlow.
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
    print('‚ö†Ô∏è GPU not found! Benchmarking on CPU will be slower.')
else:
    print('‚úÖ Found GPU at: {}'.format(device_name))

## ⚙️ Running the Experiment
We will now programmatically convert the model using different strategies and measure the results.

In [None]:
# 3. Benchmark Logic
# Accumulator for the experiment: one metrics dict (as returned by
# run_benchmark) is appended per benchmarked model variant further down.
results = []

def create_benchmark_model():
    """Build the small convolutional classifier shared by every benchmark variant.

    Returns:
        An uncompiled ``tf.keras.Sequential`` model that takes 28x28 inputs,
        reshapes them to 28x28x1, applies two ReLU convolutions (with one max
        pooling step in between) and ends in a 10-unit Dense layer with no
        activation.
    """
    layers = tf.keras.layers
    stack = [
        layers.InputLayer(input_shape=(28, 28)),
        # Add the single channel axis expected by Conv2D.
        layers.Reshape(target_shape=(28, 28, 1)),
        layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu'),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Conv2D(filters=64, kernel_size=(3, 3), activation='relu'),
        layers.Flatten(),
        layers.Dense(10),
    ]
    return tf.keras.Sequential(stack)

# Build and compile the FP32 reference model. The final layer emits raw logits,
# hence from_logits=True in the loss.
base_model = create_benchmark_model()
base_model.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])

# Load data
mnist = tf.keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()
# Scale pixel values to [0, 1] as float32.
train_images = train_images.astype(np.float32) / 255.0
test_images = test_images.astype(np.float32) / 255.0

# Train the reference model before it is converted. Without this step every
# variant is benchmarked with randomly initialised weights, so the reported
# accuracy would not reflect the effect of quantization at all.
BASELINE_EPOCHS = 1  # Raise for a better-trained baseline at the cost of runtime.
BASELINE_BATCH_SIZE = 128
base_model.fit(
    train_images,
    train_labels,
    epochs=BASELINE_EPOCHS,
    batch_size=BASELINE_BATCH_SIZE,
    verbose=2,
)

def representative_data_gen():
    """Yield calibration samples for the TFLite converter.

    Walks over the first 100 entries of ``train_images`` (or all of them when
    fewer are available). Each yielded item is a one-element list holding a
    float32 array that keeps a leading batch axis of size 1.
    """
    limit = min(100, train_images.shape[0])
    index = 0
    while index < limit:
        # Slice (rather than index) so the batch dimension is preserved.
        single = train_images[index:index + 1]
        yield [single.astype(np.float32)]
        index += 1


def _match_input_rank(images, expected_shape):
    """Append a trailing channel axis when the model wants rank-4 input but images are rank-3."""
    if expected_shape is not None and len(expected_shape) == 4 and images.ndim == 3:
        return np.expand_dims(images, -1)
    return images


def _to_model_dtype(images, input_detail):
    """Convert float images to the interpreter's input dtype.

    For integer input tensors that carry quantization parameters with a
    non-zero scale, values are quantized as ``round(x / scale + zero_point)``
    and clipped to the representable range of the dtype, so out-of-range
    values saturate instead of wrapping around. Otherwise a plain cast is used.
    """
    dtype = input_detail['dtype']
    quant = input_detail.get('quantization', ())
    if np.issubdtype(dtype, np.integer) and len(quant) >= 2 and quant[0] != 0:
        scale, zero_point = quant[0], quant[1]
        limits = np.iinfo(dtype)
        quantized = np.round(images / scale + zero_point)
        return np.clip(quantized, limits.min, limits.max).astype(dtype)
    return images.astype(dtype)


def run_benchmark(model_content, name):
    """Measure size, latency and accuracy of a serialized TFLite model.

    Args:
        model_content: The TFLite flatbuffer as bytes.
        name: Label for this variant; also used as the stem of the
            ``<name>.tflite`` file written to the working directory.

    Returns:
        dict with keys ``"Algorithm"``, ``"Size (KB)"``, ``"Latency (ms)"``
        and ``"Accuracy (%)"``. Latency is the mean over 200 single-sample
        invocations; accuracy is computed on up to the first 500 entries of
        the module-level ``test_images`` / ``test_labels``.

    Raises:
        ValueError: If ``test_images`` contains no samples.
    """
    file_name = f"{name}.tflite"
    with open(file_name, "wb") as f:
        f.write(model_content)
    size_kb = os.path.getsize(file_name) / 1024

    interpreter = tf.lite.Interpreter(model_content=model_content)
    interpreter.allocate_tensors()
    input_detail = interpreter.get_input_details()[0]
    input_idx = input_detail['index']
    output_idx = interpreter.get_output_details()[0]['index']

    total = min(500, test_images.shape[0])
    if total == 0:
        raise ValueError("test_images is empty; nothing to benchmark")

    # Prepare all evaluation inputs once, in the exact shape/dtype the
    # interpreter expects, so warmup, timing and accuracy share the same
    # (correctly quantized) data.
    eval_inputs = _to_model_dtype(
        _match_input_rank(test_images[:total], input_detail.get('shape', None)),
        input_detail,
    )
    sample_input = eval_inputs[0:1]

    # Warmup
    interpreter.set_tensor(input_idx, sample_input)
    interpreter.invoke()

    # Latency: time only set_tensor + invoke; dtype conversion is done above.
    runs = 200
    start = time.perf_counter()
    for _ in range(runs):
        interpreter.set_tensor(input_idx, sample_input)
        interpreter.invoke()
    latency_ms = (time.perf_counter() - start) / runs * 1000.0

    # Accuracy: argmax over the raw model output for each sample.
    correct = 0
    for i in range(total):
        interpreter.set_tensor(input_idx, eval_inputs[i:i + 1])
        interpreter.invoke()
        prediction = np.argmax(interpreter.get_tensor(output_idx))
        if prediction == int(test_labels[i]):
            correct += 1

    accuracy = (correct / total) * 100
    return {"Algorithm": name, "Size (KB)": size_kb, "Latency (ms)": latency_ms, "Accuracy (%)": accuracy}

print("üöÄ Starting experiments...")

# Execution
# Each variant builds a fresh converter from the Keras model, applies its own
# settings, and appends run_benchmark's metrics dict to `results`.

# Variant 1: plain conversion with no optimizations (reference).
conv = tf.lite.TFLiteConverter.from_keras_model(base_model)
results.append(run_benchmark(conv.convert(), "Baseline_FP32"))

# Variant 2: default optimizations without a representative dataset.
conv = tf.lite.TFLiteConverter.from_keras_model(base_model)
conv.optimizations = [tf.lite.Optimize.DEFAULT]
results.append(run_benchmark(conv.convert(), "LLM_int8_Dynamic"))

# Variant 3: calibrated conversion restricted to int8 builtin ops, with int8
# input and output tensors.
conv = tf.lite.TFLiteConverter.from_keras_model(base_model)
conv.optimizations = [tf.lite.Optimize.DEFAULT]
conv.representative_dataset = representative_data_gen
conv.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
conv.inference_input_type = tf.int8
conv.inference_output_type = tf.int8
results.append(run_benchmark(conv.convert(), "GPTQ_AWQ_FullInt"))

# Variant 4: default optimizations plus the converter's experimental quantizer flag.
# NOTE(review): this private flag does not by itself request 4-bit weights -
# confirm the intended 4-bit configuration for this variant.
conv = tf.lite.TFLiteConverter.from_keras_model(base_model)
conv.optimizations = [tf.lite.Optimize.DEFAULT]
try:
    conv._experimental_new_quantizer = True
except AttributeError:
    # Best effort: fall back to the default quantizer, but say so.
    print("Experimental quantizer flag unavailable; using the default quantizer.")
results.append(run_benchmark(conv.convert(), "NF4_HQQ_4bit"))

# Variant 5: magnitude pruning followed by default optimizations.
PRUNE_FINETUNE_SAMPLES = 6400  # Training samples used for the short pruning pass.
PRUNE_BATCH_SIZE = 64

prune_low_magnitude = tfmot.sparsity.keras.prune_low_magnitude
# Prune an independent copy so the fine-tuning below cannot alter base_model.
model_to_prune = tf.keras.models.clone_model(base_model)
model_to_prune.set_weights(base_model.get_weights())
pruned_model = prune_low_magnitude(model_to_prune, tfmot.sparsity.keras.ConstantSparsity(0.5, 0))
pruned_model.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))
# The pruning wrappers only zero out weights during training steps driven by
# the UpdatePruningStep callback; stripping the wrappers without such a pass
# would leave the weights exactly as they were.
pruned_model.fit(
    train_images[:PRUNE_FINETUNE_SAMPLES],
    train_labels[:PRUNE_FINETUNE_SAMPLES],
    batch_size=PRUNE_BATCH_SIZE,
    epochs=1,
    callbacks=[tfmot.sparsity.keras.UpdatePruningStep()],
    verbose=0,
)
# Strip pruning wrappers before TFLite conversion
pruned_model = tfmot.sparsity.keras.strip_pruning(pruned_model)
conv = tf.lite.TFLiteConverter.from_keras_model(pruned_model)
conv.optimizations = [tf.lite.Optimize.DEFAULT]
results.append(run_benchmark(conv.convert(), "BitNet_TPoti_Extreme"))

print("‚úÖ All experiments complete!")

## 📊 Results & Visualization

In [None]:
# Collect the per-variant metric dicts into a single table.
df = pd.DataFrame(results)
if df.empty:
    raise ValueError("No benchmark results to display; run the benchmark cell first.")

print("\n--- Final Comparison Table ---")
print(df.to_markdown(index=False))

fig, (ax1, ax3) = plt.subplots(2, 1, figsize=(12, 10))

# Top panel: model size as bars (left axis) with latency as a line (right axis).
ax1.set_ylabel('Size (KB)', color='tab:red')
ax1.bar(df['Algorithm'], df['Size (KB)'], color='tab:red', alpha=0.3)
# Rotate the existing tick labels instead of replacing them, so the labels
# stay tied to the tick positions created by the categorical bar plot.
ax1.tick_params(axis='x', labelrotation=30)
ax2 = ax1.twinx()
ax2.set_ylabel('Latency (ms)', color='tab:blue')
ax2.plot(df['Algorithm'], df['Latency (ms)'], color='tab:blue', marker='o')
ax1.set_title('Size vs Latency')

# Bottom panel: accuracy per variant.
ax3.set_ylabel('Accuracy (%)', color='tab:green')
ax3.bar(df['Algorithm'], df['Accuracy (%)'], color='tab:green', alpha=0.5)
# Zoom in near the lowest accuracy, but never show a negative percentage range.
ax3.set_ylim(max(0.0, df['Accuracy (%)'].min() - 2), 100)
ax3.tick_params(axis='x', labelrotation=30)
ax3.set_title('Accuracy')

fig.tight_layout()
plt.show()