# Lab 4a: Hardware Benchmarking (HW Accelerators Perspective)
## Hardware for Machine Learning Course

This notebook benchmarks different hardware platforms from an architecture perspective.
Part-1 covers:
1. Environment setup, Model and dataset preparation
2. CPU performance benchmarking
3. GPU performance benchmarking

Part-2 covers:
1. Model quantization to FP16 and INT8.
2. Evaluation and comparison among quantized models.

Part-3 covers:
1. Model pruning.
2. Pruning sparsity vs accuracy trade-offs.

The lab explores how a neural network model performs across different hardware platforms and how it can be optimized for specific deployment scenarios.

## PART-1: LATENCY BENCHMARKING
### ENVIRONMENT SETUP

First, we'll set up our environment by importing necessary libraries and checking available hardware.

In [None]:
# Include the needed libraries
import tensorflow as tf
import tensorflow_datasets as tfds
import time
import matplotlib.pyplot as plt
import numpy as np
from tensorflow.keras import layers, models, losses, optimizers, metrics
import os

# Detect an accelerator (TPU preferred, otherwise GPU) and build the matching
# tf.distribute strategy. The globals `accelerator_found`, `using_tpu`,
# `strategy` and `gpus` are set here for the rest of the notebook.
accelerator_found = False
using_tpu = False
strategy = None

# Check for TPU first (specifically for Colab)
try:
    # For Colab TPU, resolve and connect to the TPU cluster in one call
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect()
    print("TPU detected, initializing...")

    # Create TPU strategy
    strategy = tf.distribute.TPUStrategy(tpu)
    using_tpu = True
    accelerator_found = True
    print(f"TPU initialized with {strategy.num_replicas_in_sync} replicas")
    # get_master() returns the address of the TPU master, not a hardware model name
    print(f"TPU master address: {tpu.get_master()}")
except Exception as e:
    # Deliberately broad: any failure while probing simply means "no usable TPU";
    # the GPU check below decides whether the notebook can continue.
    print(f"TPU not available: {e}")

# Check for GPU (the list is collected even when a TPU is already in use)
gpus = tf.config.list_physical_devices('GPU')
if gpus and not using_tpu:
    # Only fall back to the GPU when no TPU strategy was created, so a working
    # TPU strategy is never overwritten.
    accelerator_found = True
    strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
    print(f"GPU detected: {gpus[0].name}")
elif not using_tpu:
    print("No accelerator detected!")

if not accelerator_found:
    raise RuntimeError("No accelerator (GPU or TPU) detected! Please choose a GPU or TPU runtime in Colab.")

### MODEL, DATASET PREPARATION, AND BUILDING HELPER FUNCTIONS
Now we'll create our model architectures and prepare the CIFAR-10 dataset for training and evaluation.

In [None]:
# CNN MODEL FACTORY
def create_cnn_model(num_classes=10):
    """Build the small two-block CNN classifier used throughout the lab.

    Args:
        num_classes: Width of the final softmax layer.

    Returns:
        An uncompiled ``Sequential`` model whose first layer declares a
        (32, 32, 3) input shape.
    """
    # Conv block 1: 32 filters, then 2x2 max pooling
    first_block = [
        layers.Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=(32, 32, 3)),
        layers.MaxPooling2D((2, 2)),
    ]

    # Conv block 2: 64 filters, then 2x2 max pooling
    second_block = [
        layers.Conv2D(64, (3, 3), padding='same', activation='relu'),
        layers.MaxPooling2D((2, 2)),
    ]

    # Classifier head: flatten -> dense(128) -> dropout -> softmax
    head = [
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(num_classes, activation='softmax'),
    ]

    return models.Sequential(first_block + second_block + head)

def load_and_prepare_cifar10(batch_size=32, train_fraction=0.1, val_fraction=0.1, seed=None):
    """
    Load CIFAR-10 and build batched train/validation/test pipelines.

    The training and validation sets are disjoint random subsets of the
    CIFAR-10 'train' split; the test set is the full 'test' split.

    Args:
        batch_size: Batch size for datasets (must be >= 1)
        train_fraction: Fraction of training data to use (0.0 to 1.0)
        val_fraction: Fraction of training data to use for validation (0.0 to 1.0)
        seed: Optional integer seed for the subset selection. When None
            (the default) NumPy's global random state is used.

    Returns:
        train_ds, val_ds, test_ds: tf.data.Dataset objects

    Raises:
        ValueError: If batch_size < 1, a fraction is outside [0, 1], or the
            two fractions together exceed 1 (the validation split would
            otherwise be silently truncated).
    """
    # Validate before doing any expensive loading
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    for name, fraction in (("train_fraction", train_fraction), ("val_fraction", val_fraction)):
        if not 0.0 <= fraction <= 1.0:
            raise ValueError(f"{name} must be within [0.0, 1.0], got {fraction}")
    # Small tolerance so float round-off in e.g. 0.7 + 0.3 is not rejected
    if train_fraction + val_fraction > 1.0 + 1e-9:
        raise ValueError(
            "train_fraction + val_fraction must not exceed 1.0, got "
            f"{train_fraction} + {val_fraction}"
        )

    # Define preprocessing function
    def preprocess(image, label):
        # Convert image to float32 and normalize to [0,1]
        image = tf.cast(image, tf.float32) / 255.0
        # Normalize with CIFAR-10 mean and std
        mean = tf.constant([0.4914, 0.4822, 0.4465])
        std = tf.constant([0.2470, 0.2435, 0.2616])
        image = (image - mean) / std
        return image, label

    # Load full CIFAR-10 dataset
    train_ds_full, test_ds = tfds.load(
        'cifar10',
        split=['train', 'test'],
        as_supervised=True,
        batch_size=-1  # Load all data at once for splitting
    )

    # Convert to numpy for easier subset creation
    train_images, train_labels = tfds.as_numpy(train_ds_full)
    test_images, test_labels = tfds.as_numpy(test_ds)

    # Calculate sizes for training subset
    total_train = len(train_images)
    train_size = int(total_train * train_fraction)
    val_size = int(total_train * val_fraction)

    # Randomly shuffle indices; a dedicated RandomState is used only when a
    # seed is supplied so the default behaviour (global RNG) is unchanged
    rng = np.random if seed is None else np.random.RandomState(seed)
    indices = rng.permutation(total_train)
    train_indices = indices[:train_size]
    val_indices = indices[train_size:train_size + val_size]

    # Create subsets
    train_images_subset = train_images[train_indices]
    train_labels_subset = train_labels[train_indices]
    val_images = train_images[val_indices]
    val_labels = train_labels[val_indices]

    print(f"Training samples: {len(train_images_subset)}")
    print(f"Validation samples: {len(val_images)}")
    print(f"Test samples: {len(test_images)}")

    # Create tf.data.Dataset objects
    train_ds = tf.data.Dataset.from_tensor_slices((train_images_subset, train_labels_subset))
    val_ds = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
    test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels))

    # Apply preprocessing
    train_ds = train_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    val_ds = val_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    test_ds = test_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)

    # Batch and prefetch
    train_ds = train_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    val_ds = val_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    test_ds = test_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return train_ds, val_ds, test_ds

In [None]:
def train_with_strategy(strategy, train_ds, val_ds, num_epochs=5):
    """Train a fresh CNN under ``strategy`` and report timing and throughput.

    The model is created and compiled inside ``strategy.scope()`` and then
    trained one epoch at a time so each epoch can be timed separately.
    Only the ``fit`` call is timed; validation runs after the timer stops so
    the reported samples/second reflects training work only. The first
    epoch's time may also include one-off setup cost.

    Args:
        strategy: A tf.distribute strategy providing ``scope()``.
        train_ds: Batched training dataset.
        val_ds: Batched validation dataset.
        num_epochs: Number of epochs to train (must be >= 1).

    Returns:
        (model, total_training_time, avg_samples_per_second, final_val_acc)

    Raises:
        ValueError: If ``num_epochs`` is less than 1.
    """
    if num_epochs < 1:
        raise ValueError(f"num_epochs must be at least 1, got {num_epochs}")

    print(f"Training with strategy: {strategy}")

    with strategy.scope():
        # Create model
        model = create_cnn_model(num_classes=10)

        # Compile model
        model.compile(
            optimizer=optimizers.Adam(learning_rate=0.001),
            loss=losses.SparseCategoricalCrossentropy(from_logits=False),
            metrics=['accuracy']
        )

        # Print model summary
        model.summary()

    # The dataset size does not change between epochs, so count it once
    # (outside any timed region) instead of re-iterating it every epoch.
    samples_per_epoch = sum(1 for _ in train_ds.unbatch())

    # Initialize timing variables
    total_training_time = 0.0
    total_samples_processed = 0
    epoch_metrics = []

    # Custom training loop for timing
    for epoch in range(num_epochs):
        # Time the training pass only; perf_counter is monotonic
        start_time = time.perf_counter()
        history = model.fit(
            train_ds,
            epochs=1,
            verbose=0
        )
        epoch_time = time.perf_counter() - start_time

        # Validation is measured separately so it does not skew throughput
        val_loss, val_acc = model.evaluate(val_ds, verbose=0)

        train_loss = history.history['loss'][0]
        train_acc = history.history['accuracy'][0]

        # Update totals
        total_training_time += epoch_time
        total_samples_processed += samples_per_epoch

        # Calculate samples per second
        samples_per_second = samples_per_epoch / epoch_time

        # Store metrics
        epoch_metrics.append({
            'epoch': epoch + 1,
            'train_loss': train_loss,
            'train_acc': train_acc,
            'val_loss': val_loss,
            'val_acc': val_acc,
            'epoch_time': epoch_time,
            'samples_per_second': samples_per_second
        })

        print(f'Epoch {epoch + 1}/{num_epochs}:')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')
        print(f'Epoch Time: {epoch_time:.2f}s, Samples/sec: {samples_per_second:.2f}')
        print('-' * 50)

    # Calculate overall metrics
    avg_samples_per_second = total_samples_processed / total_training_time
    final_val_acc = epoch_metrics[-1]['val_acc']

    print("\nOverall Training Summary:")
    print(f"Total Training Time: {total_training_time:.2f}s")
    print(f"Average Samples/second: {avg_samples_per_second:.2f}")
    print(f"Final Validation Accuracy: {final_val_acc:.4f}")

    return model, total_training_time, avg_samples_per_second, final_val_acc

def test_model(model, test_ds):
    """Evaluate ``model`` on ``test_ds`` and print loss, accuracy and wall time.

    Returns:
        (test_loss, test_acc) as produced by ``model.evaluate``.
    """
    began_at = time.time()
    evaluation = model.evaluate(test_ds, verbose=0)
    elapsed = time.time() - began_at

    # evaluate() yields exactly two values here: loss and accuracy
    test_loss, test_acc = evaluation

    report_lines = (
        f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}',
        f'Test Time: {elapsed:.2f}s',
    )
    for line in report_lines:
        print(line)

    return test_loss, test_acc

def _select_profiling_device(device_name):
    """Return the TensorFlow device string to profile on for a models_dict key."""
    if device_name == 'cpu':
        return '/CPU:0'
    # Any other key is treated as "accelerator": use whatever is available
    if tf.config.list_physical_devices('GPU'):
        return '/GPU:0'
    if tf.config.list_physical_devices('TPU'):
        return '/TPU:0'
    print(f"  Warning: No accelerator found, using CPU for {device_name} profiling")
    return '/CPU:0'


def _wait_for_result(outputs):
    """Block until ``outputs`` is computed by copying every tensor to host memory."""
    for tensor in tf.nest.flatten(outputs):
        tensor.numpy()


def profile_inference(models_dict, test_ds, batch_sizes=(1, 32, 64), num_iterations=100):
    """
    Profile inference performance on CPU and accelerator for different batch sizes.

    For every model and batch size the model is warmed up for 10 calls and
    then timed over ``num_iterations`` calls on a fixed batch taken from
    ``test_ds``. The last output is fetched to host before the timer stops,
    so the measurement covers the device work rather than only the time
    needed to enqueue it.

    Args:
        models_dict: Dictionary with keys 'cpu' and 'accelerator' containing trained models
        test_ds: tf.data.Dataset for test dataset
        batch_sizes: Iterable of batch sizes to profile
        num_iterations: Number of inference iterations for accurate timing (>= 1)

    Returns:
        Dictionary with profiling results, keyed by device name then batch size

    Raises:
        ValueError: If ``num_iterations`` < 1 or ``test_ds`` yields no batch.
    """
    if num_iterations < 1:
        raise ValueError(f"num_iterations must be at least 1, got {num_iterations}")

    # Get a fixed batch from test dataset
    fixed_inputs = None
    for batch in test_ds.take(1):
        fixed_inputs, _ = batch
    if fixed_inputs is None:
        raise ValueError("test_ds is empty; cannot build a batch for profiling")

    results = {
        'cpu': {},
        'accelerator': {}
    }

    for device_name, model in models_dict.items():
        print(f"\nProfiling inference on {device_name.upper()}...")

        device = _select_profiling_device(device_name)
        print(f"  Using device: {device}")

        # One tf.function per model (not per batch size): re-creating a
        # tf.function inside a loop forces needless retracing.
        @tf.function
        def infer_step(x):
            return model(x, training=False)

        for batch_size in batch_sizes:
            # Prepare batch
            if batch_size <= fixed_inputs.shape[0]:
                inputs = fixed_inputs[:batch_size]
            else:
                # Repeat to create larger batch
                repeats = (batch_size + fixed_inputs.shape[0] - 1) // fixed_inputs.shape[0]
                inputs = tf.tile(fixed_inputs, [repeats, 1, 1, 1])[:batch_size]

            with tf.device(device):
                # Warm-up (also triggers tracing for this input shape)
                for _ in range(10):
                    outputs = infer_step(inputs)
                # Make sure warm-up work is finished before the timer starts
                _wait_for_result(outputs)

                start_time = time.perf_counter()
                for _ in range(num_iterations):
                    outputs = infer_step(inputs)
                # Fetching the last output forces pending device work to
                # complete before the clock is read
                _wait_for_result(outputs)
                total_time_seconds = time.perf_counter() - start_time

            total_time_ms = total_time_seconds * 1000

            # Calculate metrics
            avg_time_per_batch_ms = total_time_ms / num_iterations
            samples_per_second = (batch_size * num_iterations) / total_time_seconds

            # setdefault tolerates models_dict keys other than the two preset ones
            results.setdefault(device_name, {})[batch_size] = {
                'avg_inference_time_ms': avg_time_per_batch_ms,
                'samples_per_second': samples_per_second,
                'total_time_seconds': total_time_seconds,
                'num_iterations': num_iterations,
                'device_used': device
            }

            print(f"  Batch size {batch_size}:")
            print(f"    Device: {device}")
            print(f"    Avg inference time: {avg_time_per_batch_ms:.2f} ms")
            print(f"    Samples/second: {samples_per_second:.2f}")

    # Print comparison summary
    print("\n" + "="*70)
    print("INFERENCE PROFILING SUMMARY")
    print("="*70)
    print(f"{'Batch Size':<12} {'Metric':<20} {'CPU':<15} {'Accelerator':<15} {'Speedup':<10}")
    print("-"*70)

    for batch_size in batch_sizes:
        cpu_stats = results['cpu'].get(batch_size)
        acc_stats = results['accelerator'].get(batch_size)
        if cpu_stats is None or acc_stats is None:
            # A comparison row needs both sides; skip when one was not profiled
            continue

        cpu_time = cpu_stats['avg_inference_time_ms']
        acc_time = acc_stats['avg_inference_time_ms']
        speedup_time = cpu_time / acc_time if acc_time > 0 else float('inf')

        cpu_throughput = cpu_stats['samples_per_second']
        acc_throughput = acc_stats['samples_per_second']
        speedup_throughput = acc_throughput / cpu_throughput if cpu_throughput > 0 else float('inf')

        print(f"{batch_size:<12} {'Time (ms)':<20} {cpu_time:<15.2f} {acc_time:<15.2f} {speedup_time:<10.2f}x")
        print(f"{batch_size:<12} {'Samples/sec':<20} {cpu_throughput:<15.2f} {acc_throughput:<15.2f} {speedup_throughput:<10.2f}x")
        print("-"*70)

    return results

def plot_inference_results(profile_results):
    """
    Draw CPU-vs-accelerator bar charts and print a comparison table.

    Args:
        profile_results: Dictionary returned by profile_inference() function;
            the batch sizes are taken from its 'cpu' entry.
    """
    cpu_stats = profile_results['cpu']
    acc_stats = profile_results['accelerator']
    batch_sizes = list(cpu_stats.keys())

    def metric_series(stats, key):
        # One value per batch size, in batch_sizes order
        return [stats[bs][key] for bs in batch_sizes]

    # Extract data
    cpu_times = metric_series(cpu_stats, 'avg_inference_time_ms')
    acc_times = metric_series(acc_stats, 'avg_inference_time_ms')
    cpu_throughput = metric_series(cpu_stats, 'samples_per_second')
    acc_throughput = metric_series(acc_stats, 'samples_per_second')

    # Speedups: latency is cpu/acc, throughput is acc/cpu
    time_speedups = [slow / fast for slow, fast in zip(cpu_times, acc_times)]
    throughput_speedups = [fast / slow for slow, fast in zip(cpu_throughput, acc_throughput)]

    # Create figure with subplots
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
    fig.suptitle('Inference Performance: CPU vs Accelerator', fontsize=16, fontweight='bold')

    x = np.arange(len(batch_sizes))
    width = 0.35
    tick_labels = [f'BS={bs}' for bs in batch_sizes]

    def draw_grouped_bars(ax, cpu_values, acc_values, y_label, title, value_format):
        # Paired CPU/accelerator bars for each batch size
        cpu_bars = ax.bar(x - width/2, cpu_values, width, label='CPU', color='skyblue', edgecolor='navy')
        acc_bars = ax.bar(x + width/2, acc_values, width, label='Accelerator', color='lightcoral', edgecolor='darkred')

        ax.set_xlabel('Batch Size', fontsize=12)
        ax.set_ylabel(y_label, fontsize=12)
        ax.set_title(title, fontsize=14)
        ax.set_xticks(x)
        ax.set_xticklabels(tick_labels)
        ax.legend()
        ax.grid(True, alpha=0.3)

        # Value label centred above each bar
        for group in (cpu_bars, acc_bars):
            for bar in group:
                height = bar.get_height()
                ax.text(bar.get_x() + bar.get_width()/2., height,
                        format(height, value_format), ha='center', va='bottom', fontsize=9)

    # Plot 1: Inference Time (bar chart)
    draw_grouped_bars(ax1, cpu_times, acc_times,
                      'Inference Time (ms)', 'Average Inference Time per Batch', '.1f')

    # Plot 2: Throughput (bar chart)
    draw_grouped_bars(ax2, cpu_throughput, acc_throughput,
                      'Samples/Second', 'Throughput', '.0f')

    plt.tight_layout()
    plt.show()

    # Print detailed summary table
    heavy_rule = "="*70
    light_rule = "-"*70
    print("\n" + heavy_rule)
    print("DETAILED INFERENCE PERFORMANCE COMPARISON")
    print(heavy_rule)
    print(f"{'Batch Size':<12} {'Metric':<20} {'CPU':<15} {'Accelerator':<15} {'Speedup':<10}")
    print(light_rule)

    table_rows = zip(batch_sizes, cpu_times, acc_times, time_speedups,
                     cpu_throughput, acc_throughput, throughput_speedups)
    for bs, t_cpu, t_acc, t_gain, thr_cpu, thr_acc, thr_gain in table_rows:
        print(f"{bs:<12} {'Time (ms)':<20} {t_cpu:<15.2f} {t_acc:<15.2f} {t_gain:<10.2f}x")
        print(f"{bs:<12} {'Samples/sec':<20} {thr_cpu:<15.0f} {thr_acc:<15.0f} {thr_gain:<10.2f}x")
        print(light_rule)

### PREPARING BENCHMARKING FOR BOTH THE CPU AND THE GPU

In [None]:
# Main execution: shared settings and datasets for all benchmarks below
num_epochs = 3

# CIFAR-10 class names, in label order
classes = "airplane automobile bird cat deer dog frog horse ship truck".split()

print("Loading and preparing CIFAR-10 dataset...")
train_ds, val_ds, test_ds = load_and_prepare_cifar10(batch_size=32, train_fraction=0.1, val_fraction=0.1)

print(f"Classes: {classes}")

### BENCHMARKING THE CPU TRAINING
Measuring baseline performance on CPU before accelerator comparison

In [None]:
# CPU baseline: train with a single-device strategy pinned to the CPU
print("\n" + "="*60, "TRAINING ON CPU", "="*60, sep="\n")
cpu_strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
cpu_model, cpu_time, cpu_samples_per_sec, cpu_val_acc = train_with_strategy(
    strategy=cpu_strategy,
    train_ds=train_ds,
    val_ds=val_ds,
    num_epochs=num_epochs,
)

# Evaluate the CPU-trained model on the test set
print("\nTesting CPU model:")
cpu_test_loss, cpu_test_acc = test_model(model=cpu_model, test_ds=test_ds)

### BENCHMARKING THE ACCELERATOR TRAINING 
Evaluating training speed and throughput on GPU/TPU hardware

In [None]:
# Accelerator run: train with the strategy chosen during environment setup
print("\n" + "="*60, "TRAINING ON ACCELERATOR", "="*60, sep="\n")
accelerator_model, accelerator_time, accelerator_samples_per_sec, accelerator_val_acc = train_with_strategy(
    strategy=strategy,
    train_ds=train_ds,
    val_ds=val_ds,
    num_epochs=num_epochs,
)

# Evaluate the accelerator-trained model on the test set
print("\nTesting Accelerator model:")
accelerator_test_loss, accelerator_test_acc = test_model(model=accelerator_model, test_ds=test_ds)

### BENCHMARKING THE CPU AND THE ACCELERATOR INFERENCE
Comparing inference latency and throughput across hardware platforms

In [None]:
# Profile inference for both trained models, keyed by the device they represent
models_for_profiling = dict(cpu=cpu_model, accelerator=accelerator_model)
profile_results = profile_inference(
    models_for_profiling,
    test_ds,
    batch_sizes=[1, 32, 64],
    num_iterations=100,
)

# Visualize and tabulate the measurements
plot_inference_results(profile_results)

## PART-2: MODEL QUANTIZATION
In this part we will quantize the trained model and then benchmark its size and accuracy.

### IMPORTING NEEDED LIBRARIES AND BUILDING HELPER FUNCTIONS

In [None]:
# Load the saved CNN model
from tensorflow.keras.models import load_model
import pandas as pd
import seaborn as sns
import os

# Save the accelerator model first, then reload it as the quantization baseline
accelerator_model.save('accelerator_model.h5')
print("Accelerator model saved as 'accelerator_model.h5'")
original_model = load_model('accelerator_model.h5')


def _dataset_to_numpy(dataset):
    """Collect a batched (images, labels) dataset into two NumPy arrays.

    Whole batches are gathered and concatenated once, which avoids a Python
    loop iteration (and a tensor-to-NumPy conversion) per individual sample.
    """
    image_batches = []
    label_batches = []
    for images, labels in dataset.as_numpy_iterator():
        image_batches.append(images)
        label_batches.append(labels)
    if not image_batches:
        # np.concatenate rejects an empty list; return empty arrays instead
        return np.array([]), np.array([])
    return np.concatenate(image_batches), np.concatenate(label_batches)


# test_ds is a batched tf.data.Dataset, so the arrays have to be extracted
print("\nExtracting test data for evaluation...")
x_test, y_test = _dataset_to_numpy(test_ds)

print(f"Test data shape: {x_test.shape}")
print(f"Test labels shape: {y_test.shape}")

# Similarly, extract training data for the representative dataset
x_train, y_train = _dataset_to_numpy(train_ds)

print(f"Training data shape: {x_train.shape}")

# Accuracy/size evaluation helper for Keras-style models
def evaluate_model(model, x_test, y_test, model_name="Model"):
    """Report a model's test accuracy and size.

    Size is the summed byte count of ``model.weights`` when that attribute
    exists; otherwise the model is saved to a temporary .h5 file and the
    file size is used.

    Returns:
        dict with 'model_name', 'accuracy' (percent) and 'model_size_mb'.
    """
    if hasattr(model, 'weights'):
        # In-memory footprint of all weight tensors
        size_bytes = sum(w.numpy().nbytes for w in model.weights)
    else:
        # Fallback: measure the serialized model on disk, then clean up
        scratch_path = 'temp_model.h5'
        model.save(scratch_path)
        size_bytes = os.path.getsize(scratch_path)
        os.remove(scratch_path)

    # Evaluate model accuracy
    loss, accuracy = model.evaluate(x_test, y_test, verbose=0)

    accuracy_pct = accuracy * 100
    size_mb = size_bytes / 1024 / 1024

    print(f"{model_name} Evaluation:")
    print(f"  Accuracy: {accuracy_pct:.2f}%")
    print(f"  Model Size: {size_mb:.2f} MB")

    return dict(model_name=model_name, accuracy=accuracy_pct, model_size_mb=size_mb)

# Baseline: accuracy and size of the reloaded Keras model
original_results = evaluate_model(
    original_model, x_test, y_test, model_name="Original Model"
)

# Calibration data generator for TensorFlow Lite post-training quantization
def representative_dataset():
    """Yield up to five single-sample float32 batches from the start of x_train.

    Each yielded item is a one-element list holding a slice that keeps the
    leading batch dimension.
    """
    sample_count = min(5, len(x_train))
    for start in range(sample_count):
        # Slice (rather than index) so the batch dimension is preserved
        single_sample = x_train[start:start + 1]
        yield [single_sample.astype(np.float32)]

### TensorFlow Lite Conversion

First, we'll convert our model to TensorFlow Lite format, which is optimized for mobile and edge devices.

In [None]:
# Plain TFLite conversion: no optimization flags are set on this converter
tflite_model = tf.lite.TFLiteConverter.from_keras_model(original_model).convert()

# Write the converted model to disk so its file size can be measured
with open('original_model.tflite', 'wb') as f:
    f.write(tflite_model)

# Baseline TFLite size, reused by the size comparisons in later cells
tflite_model_size = os.path.getsize('original_model.tflite')
print(f"\nTFLite model size: {tflite_model_size / 1024 / 1024:.2f} MB")

# Float16 variant: default optimizations with float16 as the supported type
converter = tf.lite.TFLiteConverter.from_keras_model(original_model)
converter.target_spec.supported_types = [tf.float16]
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_fp16_model = converter.convert()

### Float16 Quantization

Next, we'll apply Float16 quantization, which reduces the precision of weights from float32 to float16, potentially reducing model size by up to 50%.

In [None]:
# Save the float16-quantized model
with open('quantized_fp16_model.tflite', 'wb') as f:
    f.write(tflite_fp16_model)

# Check the size of the quantized TFLite model
tflite_fp16_model_size = os.path.getsize('quantized_fp16_model.tflite')
print(f"Float16 quantized TFLite model size: {tflite_fp16_model_size / 1024 / 1024:.2f} MB")
# Reduction is the share of the original size that was removed, in percent
# (the plain ratio original/quantized is a factor, not a percentage)
print(f"Size reduction: {(1 - tflite_fp16_model_size / tflite_model_size) * 100:.2f}%")


### INT8 Quantization

Now, we'll apply full integer quantization, which converts weights and activations to 8-bit integers. This is one of the most aggressive forms of quantization and can result in a significant size reduction.

In [None]:
# Quantize the model to int8 (full integer quantization)
converter = tf.lite.TFLiteConverter.from_keras_model(original_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# Calibration samples for the converter
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.float32  # Keep input as float32 for easier evaluation
converter.inference_output_type = tf.float32  # Keep output as float32
tflite_int8_model = converter.convert()

# Save the int8 quantized model
with open('quantized_int8_model.tflite', 'wb') as f:
    f.write(tflite_int8_model)

# Check the size of the int8 quantized TFLite model
tflite_int8_model_size = os.path.getsize('quantized_int8_model.tflite')
print(f"Int8 quantized TFLite model size: {tflite_int8_model_size / 1024 / 1024:.2f} MB")
# Reduction is the share of the original size that was removed, in percent
# (the plain ratio original/quantized is a factor, not a percentage)
print(f"Size reduction: {(1 - tflite_int8_model_size / tflite_model_size) * 100:.2f}%")


### Evaluating TFLite Models

Now, let's evaluate the performance of our quantized TFLite models to see how quantization affects accuracy.

In [None]:
# Function to evaluate TFLite model accuracy
def evaluate_tflite_model(tflite_model_path, x_test, y_test):
    """Evaluate a TFLite model on the test dataset, one sample at a time.

    Args:
        tflite_model_path: Path to the .tflite file to load.
        x_test: Test inputs; sample i is taken as the slice ``x_test[i:i+1]``.
        y_test: Labels, compared against the argmax of the model output.

    Returns:
        dict with 'model_name' (file basename), 'accuracy' (percent) and
        'model_size_mb' (size of the file on disk).

    Raises:
        ValueError: If ``x_test`` is empty or its length differs from ``y_test``.
    """
    num_samples = len(x_test)
    if num_samples == 0:
        raise ValueError("x_test is empty; nothing to evaluate")
    if len(y_test) != num_samples:
        raise ValueError(
            f"x_test and y_test must have the same length, got {num_samples} and {len(y_test)}"
        )

    # Load TFLite model
    interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
    interpreter.allocate_tensors()

    # Get input and output tensors (only the first of each is used)
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    input_info = input_details[0]
    output_info = output_details[0]

    print(f"\nEvaluating {tflite_model_path}")
    print(f"Input details: {input_info['dtype']}, shape: {input_info['shape']}")
    print(f"Output details: {output_info['dtype']}, shape: {output_info['shape']}")

    # Loop-invariant tensor metadata, looked up once
    input_index = input_info['index']
    input_shape = input_info['shape']
    input_dtype = input_info['dtype']
    output_index = output_info['index']
    output_dtype = output_info['dtype']

    # A non-zero scale on an integer tensor means values must be (de)quantized
    input_scale, input_zero_point = input_info['quantization']
    quantize_input = input_scale != 0 and np.issubdtype(input_dtype, np.integer)
    if quantize_input:
        input_limits = np.iinfo(input_dtype)

    output_scale, output_zero_point = output_info['quantization']
    dequantize_output = output_scale != 0 and np.issubdtype(output_dtype, np.integer)

    # Test model on all test data
    correct_predictions = 0

    for i in range(num_samples):
        # Slice keeps the batch dimension, e.g. (1, 32, 32, 3)
        test_image = np.asarray(x_test[i:i+1])
        true_label = y_test[i]

        if quantize_input:
            # Round to nearest and clip to the dtype range; a bare astype()
            # would truncate toward zero and wrap on overflow
            quantized = np.round(test_image / input_scale + input_zero_point)
            test_image = np.clip(quantized, input_limits.min, input_limits.max).astype(input_dtype)
        else:
            # For float models, ensure float32
            test_image = test_image.astype(np.float32)

        # Ensure correct shape
        test_image = np.reshape(test_image, input_shape)

        # Set the input tensor
        interpreter.set_tensor(input_index, test_image)

        # Run inference
        interpreter.invoke()

        # Get the output tensor
        output = interpreter.get_tensor(output_index)

        if dequantize_output:
            # Map integer outputs (int8 or uint8) back to real values
            output = (output.astype(np.float32) - output_zero_point) * output_scale

        # Predicted label is the index of the highest score for the single sample
        predicted_label = np.argmax(output[0])

        if predicted_label == true_label:
            correct_predictions += 1

        # Print progress occasionally
        if (i + 1) % 1000 == 0:
            print(f"  Processed {i + 1}/{num_samples} samples...")

    # Calculate accuracy
    accuracy = correct_predictions / num_samples

    # Get model size
    model_size = os.path.getsize(tflite_model_path)

    print(f"\nTFLite Model: {tflite_model_path}")
    print(f"  Accuracy: {accuracy * 100:.2f}%")
    print(f"  Model Size: {model_size / 1024 / 1024:.2f} MB")
    print(f"  Correct predictions: {correct_predictions}/{num_samples}")

    return {
        'model_name': os.path.basename(tflite_model_path),
        'accuracy': accuracy * 100,
        'model_size_mb': model_size / 1024 / 1024
    }

# Run the TFLite evaluation for the plain, float16 and int8 model files
print("\n" + "="*60, "EVALUATING TFLITE MODELS", "="*60, sep="\n")

tflite_original_results = evaluate_tflite_model(
    tflite_model_path='original_model.tflite', x_test=x_test, y_test=y_test
)
tflite_fp16_results = evaluate_tflite_model(
    tflite_model_path='quantized_fp16_model.tflite', x_test=x_test, y_test=y_test
)
tflite_int8_results = evaluate_tflite_model(
    tflite_model_path='quantized_int8_model.tflite', x_test=x_test, y_test=y_test
)


### Comparing Quantization Results
Now let's compare all our models to understand the trade-offs between model size and accuracy.

In [None]:
# Gather the four result dicts (Keras baseline first) into one table
quantization_results = [original_results, tflite_original_results, tflite_fp16_results, tflite_int8_results]
quantization_df = pd.DataFrame(quantization_results)

# Print the table without the DataFrame index column
print("\n" + "="*60, "QUANTIZATION RESULTS SUMMARY", "="*60, sep="\n")
print(quantization_df.to_string(index=False))

# Side-by-side bar charts: accuracy (left) and model size (right)
plt.figure(figsize=(14, 5))

# Left panel: accuracy per model
plt.subplot(1, 2, 1)
bars1 = plt.bar(
    quantization_df['model_name'],
    quantization_df['accuracy'],
    color=['blue', 'green', 'orange', 'red'],
)
plt.title('Accuracy Comparison', fontsize=14, fontweight='bold')
plt.ylabel('Accuracy (%)')
plt.xlabel('Model')
plt.xticks(rotation=45, ha='right')
plt.ylim([0, 105])  # headroom so the value labels fit above the bars

# Value label centred above each accuracy bar
for bar in bars1:
    height = bar.get_height()
    label_x = bar.get_x() + bar.get_width()/2.
    plt.text(label_x, height, f'{height:.1f}%', ha='center', va='bottom')

# Right panel: model size per model
plt.subplot(1, 2, 2)
bars2 = plt.bar(
    quantization_df['model_name'],
    quantization_df['model_size_mb'],
    color=['blue', 'green', 'orange', 'red'],
)
plt.title('Model Size Comparison', fontsize=14, fontweight='bold')
plt.ylabel('Model Size (MB)')
plt.xlabel('Model')
plt.xticks(rotation=45, ha='right')

# Value label centred above each size bar
for bar in bars2:
    height = bar.get_height()
    label_x = bar.get_x() + bar.get_width()/2.
    plt.text(label_x, height, f'{height:.2f} MB', ha='center', va='bottom')

plt.tight_layout()
plt.show()

# Scatter plot of accuracy against model size, one point per model
plt.figure(figsize=(10, 6))

colors = ['blue', 'green', 'orange', 'red']
sizes = [100, 100, 100, 100]  # not read by the plotting calls below

# One labelled marker per model, coloured by row position
for i, row in quantization_df.iterrows():
    point = (row['model_size_mb'], row['accuracy'])
    plt.scatter(*point, s=200, color=colors[i], label=row['model_name'], alpha=0.7)

# Dashed line joining the models in table order
plt.plot(
    quantization_df['model_size_mb'],
    quantization_df['accuracy'],
    'k--',
    alpha=0.3,
    label='Trade-off curve',
)

# Name each point, offset slightly from the marker
for i, row in quantization_df.iterrows():
    point = (row['model_size_mb'], row['accuracy'])
    plt.annotate(
        row['model_name'],
        point,
        xytext=(10, 5),
        textcoords='offset points',
        fontsize=10,
        fontweight='bold',
    )

plt.title('Accuracy vs. Model Size Trade-off', fontsize=14, fontweight='bold')
plt.xlabel('Model Size (MB)')
plt.ylabel('Accuracy (%)')
plt.grid(True, alpha=0.3)
plt.legend()
plt.tight_layout()
plt.show()

# Text summary: accuracy, size and size reduction relative to the first row
print("\n" + "="*60, "PERFORMANCE SUMMARY", "="*60, sep="\n")
print(f"{'Model':<25} {'Accuracy (%)':<15} {'Size (MB)':<15} {'Size Reduction':<15}")
print("-"*60)

for i, row in quantization_df.iterrows():
    if i != 0:
        # Percent of the baseline (row 0) size that this model saves
        reduction = (1 - row['model_size_mb'] / quantization_df.iloc[0]['model_size_mb']) * 100
        size_reduction = f"{reduction:.1f}%"
    else:
        size_reduction = "0% (baseline)"

    print(f"{row['model_name']:<25} {row['accuracy']:<15.2f} {row['model_size_mb']:<15.2f} {size_reduction:<15}")

### Quantization Summary

In this section, we've applied different quantization techniques to our CNN model:

1. **TensorFlow Lite Conversion**: Converting to TFLite format without quantization
2. **Float16 Quantization**: Reducing weight precision from 32-bit to 16-bit floating point
3. **INT8 Quantization**: Full integer quantization with 8-bit weights and activations

We've observed how each technique affects model size and accuracy. The results show that quantization can significantly reduce model size with minimal impact on accuracy, making it an effective technique for deploying models to resource-constrained environments like mobile devices and edge hardware.

## PART-3: MODEL PRUNING

In this section, we'll explore model pruning and its effects on model size and accuracy.

In [None]:
# Import tensorflow_model_optimization again to ensure it's available
try:
  import tensorflow_model_optimization as tfmot
  from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
  from tensorflow.keras.models import Model
except:
  !pip install tensorflow-model-optimization
  import tensorflow_model_optimization as tfmot
  from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
  from tensorflow.keras.models import Model

### Manual Pruning Implementation

We'll implement a manual pruning approach that removes weights based on their magnitude. This is a common approach where weights with small absolute values are considered less important and set to zero.

In [None]:
# Define a function to create a prunable model
def create_prunable_cnn_model(input_shape, num_classes):
    """Build and compile a small functional-API CNN classifier.

    The network is two (Conv2D 3x3 relu, same padding -> 2x2 max-pool) stages
    with 32 and 64 filters, followed by Flatten, a 128-unit relu Dense layer,
    Dropout(0.3) and a softmax output layer.

    Args:
        input_shape: Shape handed to the Keras ``Input`` layer.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The Keras ``Model``, compiled with the adam optimizer, sparse
        categorical cross-entropy loss and an accuracy metric.
    """
    inputs = Input(shape=input_shape)

    # Convolutional feature extractor: one conv + pool stage per filter count.
    features = inputs
    for filters in (32, 64):
        features = Conv2D(filters, kernel_size=(3, 3), activation='relu', padding='same')(features)
        features = MaxPooling2D(pool_size=(2, 2))(features)

    # Classifier head.
    hidden = Flatten()(features)
    hidden = Dense(128, activation='relu')(hidden)
    hidden = Dropout(0.3)(hidden)
    outputs = Dense(num_classes, activation='softmax')(hidden)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
    )
    return model

# Define a function to manually prune weights based on magnitude
def _zero_smallest_magnitudes(weight_matrix, num_to_prune):
    """Return a copy of weight_matrix with its num_to_prune smallest-magnitude entries zeroed."""
    flat = weight_matrix.flatten()  # flatten() copies, so the caller's array is untouched
    # argpartition with kth = num_to_prune - 1 places the indices of the
    # num_to_prune smallest magnitudes in the first num_to_prune positions.
    smallest = np.argpartition(np.abs(flat), num_to_prune - 1)[:num_to_prune]
    flat[smallest] = 0
    return flat.reshape(weight_matrix.shape)


def prune_weights(model, sparsity=0.5):
    """
    Prune model weights based on magnitude.

    The model is cloned first, so the model passed in is not modified. In every
    Conv2D and Dense layer of the clone, the ``int(size * sparsity)`` entries of
    the first weight array (the kernel) with the smallest absolute value are set
    to zero; the remaining weight arrays of the layer (e.g. the bias) are kept.

    Args:
        model: Keras model to prune
        sparsity: Target sparsity level (fraction of weights to be pruned),
            between 0 and 1 inclusive

    Returns:
        Pruned model (a compiled clone of ``model``)

    Raises:
        ValueError: If ``sparsity`` is outside the range [0, 1].
    """
    if not 0.0 <= sparsity <= 1.0:
        raise ValueError(f"sparsity must be between 0 and 1, got {sparsity!r}")

    print(f"\nApplying magnitude-based pruning with {sparsity:.0%} sparsity...")

    pruned_model = tf.keras.models.clone_model(model)
    pruned_model.set_weights(model.get_weights())

    # Prune eligible layers (Conv and Dense)
    for i, layer in enumerate(pruned_model.layers):
        if not isinstance(layer, (tf.keras.layers.Conv2D, tf.keras.layers.Dense)):
            continue

        weights = layer.get_weights()
        if not weights:
            # Layer holds no weight arrays, so there is nothing to prune.
            continue

        # Only prune the weight matrix, not the bias
        weight_matrix = weights[0]

        # Number of entries to zero out for the requested sparsity level
        k = int(weight_matrix.size * sparsity)
        if k <= 0:
            continue

        # Zero exactly the k smallest-magnitude entries. (Thresholding at the
        # element of rank k would drop k + 1 entries and fail when k == size.)
        pruned_weights = _zero_smallest_magnitudes(weight_matrix, k)

        # Update the weights
        weights[0] = pruned_weights
        layer.set_weights(weights)

        non_zero = np.count_nonzero(pruned_weights)
        total = pruned_weights.size
        print(f"  Layer {i} ({layer.name}): {non_zero}/{total} weights retained ({non_zero/total:.2%})")

    # clone_model returns an uncompiled model, so compile it before it is evaluated
    pruned_model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return pruned_model

# Hold out a random 10% of the training data for validation.
val_split = 0.1
val_size = int(len(x_train) * val_split)

# A single random permutation yields disjoint validation / training index sets.
indices = np.random.permutation(len(x_train))
val_indices, train_indices = indices[:val_size], indices[val_size:]

x_train_subset, y_train_subset = x_train[train_indices], y_train[train_indices]
x_val, y_val = x_train[val_indices], y_train[val_indices]

print(f"Training samples: {len(x_train_subset)}")
print(f"Validation samples: {len(x_val)}")

# Build and train the to-be-pruned model inside the distribution strategy scope.
print("\n" + "="*60)
print("TRAINING MODEL FOR PRUNING ON ACCELERATOR")
print("="*60)

with strategy.scope():
    # Fresh model dedicated to the pruning experiments.
    input_shape = (32, 32, 3)
    num_classes = 10
    cnn_model_for_pruning = create_prunable_cnn_model(input_shape, num_classes)

    # Train to convergence first; pruning is applied to the trained weights.
    print("\nTraining model before pruning...")
    history = cnn_model_for_pruning.fit(
        x_train_subset,
        y_train_subset,
        validation_data=(x_val, y_val),
        epochs=num_epochs,
        batch_size=128,
        verbose=1,
    )

# Define evaluation function
def prune_evaluate_model(model, x_test, y_test, model_name="Model"):
    """Report a model's accuracy and a size estimate based on its non-zero weights.

    The byte size of every array in ``model.weights`` is summed twice: once in
    full, and once counting only non-zero entries. The second figure is an
    estimate; nothing is actually re-encoded or saved.

    Args:
        model: Object exposing ``weights`` (items with ``.numpy()``) and
            ``evaluate(x, y, verbose=0)`` returning ``(loss, accuracy)``.
        x_test: Evaluation inputs, forwarded to ``model.evaluate``.
        y_test: Evaluation labels, forwarded to ``model.evaluate``.
        model_name: Label used in the printed report and the returned dict.

    Returns:
        Dict with keys ``model_name``, ``accuracy`` (percent),
        ``original_size_mb``, ``pruned_size_mb`` and ``estimated_sparsity``
        (fraction of weight bytes that are zero).
    """
    weight_arrays = [variable.numpy() for variable in model.weights]
    total_size = sum(arr.nbytes for arr in weight_arrays)
    nonzero_size = sum(np.count_nonzero(arr) * arr.itemsize for arr in weight_arrays)

    # Only the accuracy metric is reported; the loss value is discarded.
    _, accuracy = model.evaluate(x_test, y_test, verbose=0)

    accuracy_pct = accuracy * 100
    original_mb = total_size / 1024 / 1024
    pruned_mb = nonzero_size / 1024 / 1024
    zero_fraction = 1 - nonzero_size / total_size

    print(f"\n{model_name} Evaluation:")
    print(f"  Accuracy: {accuracy_pct:.2f}%")
    print(f"  Original Size: {original_mb:.2f} MB")
    print(f"  Estimated Pruned Size: {pruned_mb:.2f} MB")
    print(f"  Size Reduction: {zero_fraction * 100:.2f}%")

    return {
        'model_name': model_name,
        'accuracy': accuracy_pct,
        'original_size_mb': original_mb,
        'pruned_size_mb': pruned_mb,
        'estimated_sparsity': zero_fraction,
    }

# Baseline: evaluate the trained, unpruned model.
print(f"\n{'='*60}")
print("EVALUATING BEFORE PRUNING")
print("="*60)
pre_prune_results = prune_evaluate_model(
    cnn_model_for_pruning,
    x_test,
    y_test,
    model_name="Model Before Pruning",
)

# Sweep over sparsity targets; each run prunes a fresh clone of the trained model.
sparsity_levels = [0.3, 0.5, 0.7, 0.9]
pruning_results = []

for sparsity in sparsity_levels:
    print(f"\n{'='*60}")
    print(f"PRUNING WITH {sparsity:.0%} SPARSITY")
    print("="*60)

    # Prune at this sparsity level, then measure accuracy and estimated size.
    pruned_model = prune_weights(cnn_model_for_pruning, sparsity=sparsity)
    results = prune_evaluate_model(
        pruned_model,
        x_test,
        y_test,
        model_name=f"Model {sparsity:.0%} Pruned",
    )
    pruning_results.append(results)

# Summarize and visualize the pruning experiments
import matplotlib.pyplot as plt
import pandas as pd

# Baseline first, followed by one entry per sparsity level.
all_results = [pre_prune_results, *pruning_results]

# Tabular summary of every run.
pruning_df = pd.DataFrame(all_results)
print(f"\n{'='*60}")
print("PRUNING RESULTS SUMMARY")
print("="*60)
print(pruning_df.to_string(index=False))

# Two side-by-side panels: accuracy curve (left) and size reduction bars (right).
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Panel 1: accuracy as a function of sparsity (0 = unpruned baseline).
sparsity_values = [0, *sparsity_levels]
accuracy_values = [entry['accuracy'] for entry in all_results]

ax1.plot(sparsity_values, accuracy_values, 'bo-', linewidth=2, markersize=8)
ax1.set_title('Accuracy vs Pruning Sparsity', fontsize=14, fontweight='bold')
ax1.set_xlabel('Sparsity Level', fontsize=12)
ax1.set_ylabel('Accuracy (%)', fontsize=12)
ax1.set_xticks(sparsity_values)
ax1.grid(True, alpha=0.3)

# Panel 2: estimated size reduction per sparsity level; the baseline bar is fixed at 0.
size_reduction = [0, *(entry['estimated_sparsity'] * 100 for entry in pruning_results)]
bar_labels = [f"{s:.0%}" for s in sparsity_values]
ax2.bar(bar_labels, size_reduction,
        color=['blue', 'green', 'orange', 'red', 'purple'])
ax2.set_title('Model Size Reduction', fontsize=14, fontweight='bold')
ax2.set_xlabel('Sparsity Level', fontsize=12)
ax2.set_ylabel('Size Reduction (%)', fontsize=12)
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Per-model comparison table.
print(f"\n{'='*60}")
print("DETAILED PRUNING COMPARISON")
print("="*60)
print(f"{'Model':<20} {'Accuracy (%)':<15} {'Size (MB)':<15} {'Reduction':<15}")
print("-"*65)

# The row labelled 0 is the unpruned baseline; the others show their estimated sparsity.
for i, row in pruning_df.iterrows():
    if i != 0:
        reduction = f"{row['estimated_sparsity']*100:.1f}%"
    else:
        reduction = "0% (baseline)"
    print(f"{row['model_name']:<20} {row['accuracy']:<15.2f} {row['pruned_size_mb']:<15.2f} {reduction:<15}")