In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import pandas as pd
from tensorflow.keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from keras.layers import Embedding, Dense, LSTM, Dropout
from keras.models import Sequential
from keras.regularizers import l2
import tensorflow_model_optimization as tfmot
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import os

# LSTM subclass registered with Keras serialization that declares its prunable weights
@tf.keras.utils.register_keras_serializable(package="Custom", name="PrunableLSTM")
class PrunableLSTM(LSTM, tfmot.sparsity.keras.PrunableLayer):
    """LSTM layer that implements ``PrunableLayer`` by exposing its cell kernels."""

    def get_prunable_weights(self):
        """Return the cell's input kernel and recurrent kernel, in that order."""
        lstm_cell = self.cell
        return [lstm_cell.kernel, lstm_cell.recurrent_kernel]

# Function to look up the on-disk size of a saved model
def get_model_size(model_name):
    """Return the size in KB of ``<model_name>.keras``, or ``None`` if the file is absent."""
    saved_file = f"{model_name}.keras"
    if not os.path.exists(saved_file):
        return None
    return os.path.getsize(saved_file) / 1024  # bytes -> KB

# Function to read the last-epoch accuracies from a training history
def get_model_accuracy(history):
    """Return ``(train_accuracy, val_accuracy)`` taken from the final recorded epoch."""
    recorded = history.history
    return recorded['accuracy'][-1], recorded['val_accuracy'][-1]

# Function to plot training/validation curves for one model
def plot_model_performance(history, model_name):
    """Draw accuracy and loss curves side by side for a single training run.

    Args:
        history: Object whose ``history`` mapping holds the ``accuracy``,
            ``val_accuracy``, ``loss`` and ``val_loss`` series.
        model_name: Text used as the prefix of both panel titles.
    """
    # (train key, validation key, label) for each panel, left to right
    panels = (
        ("accuracy", "val_accuracy", "Accuracy"),
        ("loss", "val_loss", "Loss"),
    )

    plt.figure(figsize=(12, 4))
    for position, (train_key, val_key, label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[train_key], label=f"Train {label}")
        plt.plot(history.history[val_key], label=f"Validation {label}")
        plt.title(f"{model_name} - {label}")
        plt.xlabel("Epochs")
        plt.ylabel(label)
        plt.legend()

    plt.show()

In [None]:
def load_and_preprocess_data(file_path, max_words=10000, maxlen=66):
    """Read a CSV with ``text`` and ``label`` columns and return padded, split arrays.

    Args:
        file_path: Path of the CSV file to read.
        max_words: ``num_words`` limit given to the tokenizer.
        maxlen: Length every tokenised sequence is padded/truncated to.

    Returns:
        ``(x_train, x_val, x_test, y_train, y_val, y_test, num_classes)`` where
        the labels are one-hot encoded and ``num_classes`` is the number of
        distinct labels seen by the encoder.
    """
    frame = pd.read_csv(file_path)
    texts = frame["text"]

    # Tokenise the text column and pad to a fixed length
    text_tokenizer = Tokenizer(num_words=max_words)
    text_tokenizer.fit_on_texts(texts)
    features = pad_sequences(text_tokenizer.texts_to_sequences(texts), maxlen=maxlen)

    # Integer-encode the labels, then one-hot them
    encoder = LabelEncoder()
    targets = to_categorical(encoder.fit_transform(frame["label"]))

    # 70% train; the remaining 30% is halved into validation and test
    x_train, x_holdout, y_train, y_holdout = train_test_split(
        features, targets, test_size=0.3, random_state=42
    )
    x_val, x_test, y_val, y_test = train_test_split(
        x_holdout, y_holdout, test_size=0.5, random_state=42
    )

    class_count = len(encoder.classes_)
    return x_train, x_val, x_test, y_train, y_val, y_test, class_count

# Load and preprocess data.
# The resulting splits and num_classes are module-level globals that the
# training, quantization and evaluation cells below read directly.
file_path = "sample_data/emotions.csv"
x_train, x_val, x_test, y_train, y_val, y_test, num_classes = load_and_preprocess_data(file_path)

In [None]:
def create_model(input_dim, output_dim, input_length, num_classes, use_pruning=False, use_clustering=False, pruning_schedule=None):
    """Build and compile the Embedding -> LSTM -> Dropout -> Dense classifier.

    Args:
        input_dim: Vocabulary size passed to the Embedding layer.
        output_dim: Embedding vector size.
        input_length: Sequence length passed to the Embedding layer.
        num_classes: Number of softmax output units.
        use_pruning: Wrap the Embedding, LSTM and Dense layers with
            ``prune_low_magnitude``. Takes precedence over ``use_clustering``.
        use_clustering: Wrap the same three layers with ``cluster_weights``
            (16 clusters, k-means++ centroid initialisation).
        pruning_schedule: Schedule handed to every pruning wrapper. When
            ``None`` the argument is omitted so the wrapper's own default applies.

    Returns:
        A compiled ``Sequential`` model (categorical cross-entropy, Adam,
        accuracy metric).
    """
    if use_pruning:
        # Forward the schedule to all three wrappers, not only the LSTM, so the
        # whole model follows the schedule the caller asked for. An explicit
        # None is dropped rather than passed through.
        prune_kwargs = {} if pruning_schedule is None else {"pruning_schedule": pruning_schedule}

        def wrap(layer):
            return tfmot.sparsity.keras.prune_low_magnitude(layer, **prune_kwargs)

        # The pruning path uses the subclass that declares its prunable weights.
        lstm_cls = PrunableLSTM
    elif use_clustering:
        clustering_params = {
            'number_of_clusters': 16,
            'cluster_centroids_init': tfmot.clustering.keras.CentroidInitialization.KMEANS_PLUS_PLUS
        }

        def wrap(layer):
            return tfmot.clustering.keras.cluster_weights(layer, **clustering_params)

        lstm_cls = LSTM
    else:
        def wrap(layer):
            return layer

        lstm_cls = LSTM

    # One architecture definition shared by all three variants; Dropout is never wrapped.
    model = Sequential()
    model.add(wrap(Embedding(input_dim=input_dim, output_dim=output_dim, input_length=input_length)))
    model.add(wrap(lstm_cls(64, kernel_regularizer=l2(0.001))))
    model.add(Dropout(0.5))
    model.add(wrap(Dense(num_classes, activation="softmax")))

    model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    return model

# Function to train the model with pruning and clustering options
def train_model(model, x_train, y_train, x_val, y_val, use_pruning=False, use_clustering=False,
                epochs=50, log_dir='/tmp/pruning_logs'):
    """Fit ``model`` on the training split and return the fit history.

    Args:
        model: Compiled model exposing a Keras-style ``fit`` method.
        x_train, y_train: Training inputs and targets.
        x_val, y_val: Validation inputs and targets, passed as ``validation_data``.
        use_pruning: When true, attach tfmot's ``UpdatePruningStep`` and
            ``PruningSummaries`` callbacks.
        use_clustering: Accepted for call-site symmetry; it does not change
            how the model is fitted.
        epochs: Number of training epochs (defaults to the previous fixed 50).
        log_dir: Directory ``PruningSummaries`` writes to when pruning is on.

    Returns:
        Whatever ``model.fit`` returns.
    """
    callbacks = []
    if use_pruning:
        callbacks = [
            tfmot.sparsity.keras.UpdatePruningStep(),
            tfmot.sparsity.keras.PruningSummaries(log_dir=log_dir),
        ]
    return model.fit(
        x_train,
        y_train,
        epochs=epochs,
        validation_data=(x_val, y_val),
        callbacks=callbacks,
    )

In [None]:
def quantize_model(model, model_name):
    """Convert ``model`` to TFLite, write ``<model_name>.tflite`` and report its size.

    The converter runs with ``tf.lite.Optimize.DEFAULT`` and is allowed to use
    both built-in TFLite ops and selected TensorFlow ops.

    Returns:
        ``(path, size_kb)`` for the file that was written.
    """
    tflite_converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_converter.optimizations = [tf.lite.Optimize.DEFAULT]
    tflite_converter.target_spec.supported_ops = [
        tf.lite.OpsSet.TFLITE_BUILTINS,
        tf.lite.OpsSet.SELECT_TF_OPS,
    ]
    tflite_bytes = tflite_converter.convert()

    output_path = f"{model_name}.tflite"
    with open(output_path, "wb") as out_file:
        out_file.write(tflite_bytes)

    size_kb = os.path.getsize(output_path) / 1024  # bytes -> KB
    return output_path, size_kb

# Function to evaluate the TensorFlow Lite model
def evaluate_tflite_model(tflite_model_path, x_test, y_test):
    """Run a TFLite model one sample at a time and return its accuracy.

    Args:
        tflite_model_path: Path of the ``.tflite`` file to load.
        x_test: Sequence of input samples; each gets a leading batch axis of 1.
        y_test: One-hot targets aligned with ``x_test``.

    Returns:
        Fraction of samples whose arg-max prediction equals the arg-max target.

    Raises:
        ValueError: If ``x_test`` is empty.
    """
    if len(x_test) == 0:
        raise ValueError("x_test is empty; cannot compute accuracy")

    interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
    interpreter.allocate_tensors()
    input_info = interpreter.get_input_details()[0]
    output_info = interpreter.get_output_details()[0]

    per_sample_outputs = []
    for sample in x_test:
        # Cast to the dtype the converted model declares instead of assuming float32.
        batch = np.expand_dims(sample, axis=0).astype(input_info['dtype'])
        interpreter.set_tensor(input_info['index'], batch)
        interpreter.invoke()
        per_sample_outputs.append(interpreter.get_tensor(output_info['index']))

    # Concatenate along the batch axis rather than squeeze(): squeeze would drop
    # the batch axis for a single-sample test set and break argmax(axis=1).
    predictions = np.concatenate(per_sample_outputs, axis=0)
    predicted_classes = np.argmax(predictions, axis=1)
    expected_classes = np.argmax(y_test, axis=1)
    return np.mean(predicted_classes == expected_classes)

In [None]:
# Function to plot comparison of model sizes and accuracies
def plot_comparison(teacher_size, pruned_size, clustered_size,
                    teacher_train_acc, teacher_val_acc, pruned_train_acc, pruned_val_acc,
                    clustered_train_acc, clustered_val_acc,
                    quantized_teacher_accuracy, quantized_pruned_accuracy, quantized_clustered_accuracy):
    """Show bar charts comparing the teacher, pruned and clustered models.

    A three-panel matplotlib figure shows saved size, training accuracy and
    validation accuracy. A separate plotly bar chart shows the test accuracy
    of the quantized models. All arguments are per-model scalars in
    teacher / pruned / clustered order.
    """
    models = ['Teacher', 'Pruned Teacher', 'Clustered Teacher']

    # (title, y-axis label, values) for each matplotlib panel, left to right
    bar_panels = [
        ('Model Sizes (KB)', 'Size (KB)',
         [teacher_size, pruned_size, clustered_size]),
        ('Training Accuracies', 'Accuracy',
         [teacher_train_acc, pruned_train_acc, clustered_train_acc]),
        ('Validation Accuracies', 'Accuracy',
         [teacher_val_acc, pruned_val_acc, clustered_val_acc]),
    ]

    plt.figure(figsize=(14, 6))
    for slot, (panel_title, y_label, values) in enumerate(bar_panels, start=1):
        plt.subplot(1, 3, slot)
        sns.barplot(x=models, y=values)
        plt.title(panel_title)
        plt.ylabel(y_label)
    plt.tight_layout()
    plt.show()

    # Quantized test accuracies go in an interactive plotly chart
    quantized_accuracies = [quantized_teacher_accuracy, quantized_pruned_accuracy, quantized_clustered_accuracy]
    fig = px.bar(x=models, y=quantized_accuracies, title="Quantized Models' Test Accuracy", labels={'x':'Models', 'y':'Accuracy'})
    fig.show()

In [None]:
def summarize_results():
    """Print a textual comparison of the three models and draw the comparison plots.

    Reads the notebook globals ``teacher_history``, ``pruned_history``,
    ``clustered_history`` and the three ``quantized_*_accuracy`` values, so the
    training and quantization cells must have been run first. Saved-model sizes
    come from ``get_model_size``; a missing file is reported as "N/A" instead
    of raising, and is plotted as NaN.
    """
    def _format_size(size_kb):
        # get_model_size returns None when the .keras file does not exist;
        # formatting None with ".2f" would raise TypeError.
        return "N/A (file not found)" if size_kb is None else f"{size_kb:.2f} KB"

    def _plot_size(size_kb):
        # NaN keeps the value numeric for plotting.
        # NOTE(review): confirm the installed seaborn renders NaN as a missing bar.
        return float("nan") if size_kb is None else size_kb

    teacher_size = get_model_size("teacher_model")
    pruned_size = get_model_size("pruned_teacher_model")
    clustered_size = get_model_size("clustered_teacher_model")

    teacher_train_acc, teacher_val_acc = get_model_accuracy(teacher_history)
    pruned_train_acc, pruned_val_acc = get_model_accuracy(pruned_history)
    clustered_train_acc, clustered_val_acc = get_model_accuracy(clustered_history)

    teacher_size_text = _format_size(teacher_size)
    pruned_size_text = _format_size(pruned_size)
    clustered_size_text = _format_size(clustered_size)

    # Derive the comparative statements from the measured numbers instead of
    # asserting a fixed outcome that may not match this run.
    found_sizes = {
        name: size
        for name, size in (
            ("Teacher Model", teacher_size),
            ("Pruned Teacher Model", pruned_size),
            ("Clustered Teacher Model", clustered_size),
        )
        if size is not None
    }
    if found_sizes:
        largest_name = max(found_sizes, key=found_sizes.get)
        largest_size_text = _format_size(found_sizes[largest_name])
        largest_line = f"The largest saved model is the **{largest_name}** at {largest_size_text}."
    else:
        largest_line = "No saved model files were found, so model sizes could not be compared."

    quantized_by_model = {
        "Teacher Model": quantized_teacher_accuracy,
        "Pruned Teacher Model": quantized_pruned_accuracy,
        "Clustered Teacher Model": quantized_clustered_accuracy,
    }
    best_quantized_name = max(quantized_by_model, key=quantized_by_model.get)
    best_quantized_acc = quantized_by_model[best_quantized_name]

    pruned_val_delta = pruned_val_acc - teacher_val_acc
    clustered_val_delta = clustered_val_acc - teacher_val_acc

    summary = f"""
    Model Comparison Summary:

    1. **Teacher Model:**
       - Size: {teacher_size_text}
       - Training Accuracy: {teacher_train_acc:.4f}
       - Validation Accuracy: {teacher_val_acc:.4f}

    2. **Pruned Teacher Model:**
       - Size: {pruned_size_text}
       - Training Accuracy: {pruned_train_acc:.4f}
       - Validation Accuracy: {pruned_val_acc:.4f}

    3. **Clustered Teacher Model:**
       - Size: {clustered_size_text}
       - Training Accuracy: {clustered_train_acc:.4f}
       - Validation Accuracy: {clustered_val_acc:.4f}

    4. **Quantized Models Test Accuracy:**
       - Teacher Model: {quantized_teacher_accuracy:.4f}
       - Pruned Teacher Model: {quantized_pruned_accuracy:.4f}
       - Clustered Teacher Model: {quantized_clustered_accuracy:.4f}

    Aim of the Project:
    The aim of this project was to explore and compare various model optimization techniques, specifically pruning and clustering, and to evaluate their impact on model size and performance. Pruning and clustering were applied to a base LSTM model, and the optimized models were further quantized to assess their effectiveness in reducing model size while maintaining accuracy.

    Comparative Explanation:
    - {largest_line}
    - The **Teacher Model** has a size of {teacher_size_text} and a validation accuracy of {teacher_val_acc:.4f}.
    - The **Pruned Teacher Model** has a size of {pruned_size_text} and a validation accuracy of {pruned_val_acc:.4f} ({pruned_val_delta:+.4f} relative to the teacher).
    - The **Clustered Teacher Model** has a size of {clustered_size_text} and a validation accuracy of {clustered_val_acc:.4f} ({clustered_val_delta:+.4f} relative to the teacher).
    - After quantization, the **{best_quantized_name}** achieved the highest test accuracy of {best_quantized_acc:.4f}.
    """
    print(summary)

    plot_comparison(_plot_size(teacher_size), _plot_size(pruned_size), _plot_size(clustered_size),
                    teacher_train_acc, teacher_val_acc, pruned_train_acc, pruned_val_acc,
                    clustered_train_acc, clustered_val_acc,
                    quantized_teacher_accuracy, quantized_pruned_accuracy, quantized_clustered_accuracy)


In [None]:
# Train the teacher model (the unwrapped baseline).
# input_dim=10000 and input_length=66 repeat the max_words / maxlen defaults of
# load_and_preprocess_data; keep them in sync if either changes.
teacher_model = create_model(input_dim=10000, output_dim=32, input_length=66, num_classes=num_classes)
teacher_history = train_model(teacher_model, x_train, y_train, x_val, y_val)
# File name must match the stem that summarize_results passes to get_model_size.
teacher_model.save("teacher_model.keras")

# Define the pruning schedule: target sparsity goes from 0.0 to 0.5.
# NOTE(review): begin_step/end_step are fixed step counts; confirm they fall
# inside the number of optimizer steps this dataset produces during training.
pruning_schedule = tfmot.sparsity.keras.PolynomialDecay(
    initial_sparsity=0.0,
    final_sparsity=0.5,
    begin_step=2000,
    end_step=10000
)

# Train the pruned model.
# NOTE(review): create_model builds a fresh network here, so pruning starts from
# new weights rather than from the trained teacher -- confirm this is intended.
pruned_model = create_model(input_dim=10000, output_dim=32, input_length=66, num_classes=num_classes, use_pruning=True, pruning_schedule=pruning_schedule)
pruned_history = train_model(pruned_model, x_train, y_train, x_val, y_val, use_pruning=True)

# Strip pruning and save the final pruned model
# (file name must match the stem used in summarize_results).
final_pruned_model = tfmot.sparsity.keras.strip_pruning(pruned_model)
final_pruned_model.save("pruned_teacher_model.keras")



In [None]:
# Quantize the teacher model and measure its accuracy on the held-out test split.
# The *_accuracy globals are read by summarize_results; the *_size values are
# not referenced by any other visible cell.
quantized_teacher_model_path, quantized_teacher_size = quantize_model(teacher_model, "quantized_teacher_model")
quantized_teacher_accuracy = evaluate_tflite_model(quantized_teacher_model_path, x_test, y_test)

# Quantize the pruned model (the stripped version, without pruning wrappers)
quantized_pruned_model_path, quantized_pruned_size = quantize_model(final_pruned_model, "quantized_pruned_teacher_model")
quantized_pruned_accuracy = evaluate_tflite_model(quantized_pruned_model_path, x_test, y_test)



In [None]:
# Train the clustered model.
# NOTE(review): create_model builds a fresh network here, so clustering is
# applied to new weights rather than to the trained teacher -- confirm intended.
# train_model accepts use_clustering but does not act on it.
clustered_model = create_model(input_dim=10000, output_dim=32, input_length=66, num_classes=num_classes, use_clustering=True)
clustered_history = train_model(clustered_model, x_train, y_train, x_val, y_val, use_clustering=True)

# Strip clustering and save the final clustered model
# (file name must match the stem used in summarize_results).
final_clustered_model = tfmot.clustering.keras.strip_clustering(clustered_model)
final_clustered_model.save("clustered_teacher_model.keras")

# Quantize the clustered model
quantized_clustered_model_path, quantized_clustered_size = quantize_model(final_clustered_model, "quantized_clustered_teacher_model")
quantized_clustered_accuracy = evaluate_tflite_model(quantized_clustered_model_path, x_test, y_test)

# Summarize results and plot comparison.
# Depends on globals from the teacher, pruning and quantization cells above,
# so those cells must have run in this kernel first.
summarize_results()