# EE P 596 - TinyML - Assignment 3
### Due: 11:59 pm (PST) on June 5 (Wed), 2024 via Canvas

In [None]:
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from tensorflow import keras
from sklearn.metrics import accuracy_score, mean_squared_error, classification_report

## Q1: Load the Dataset and Train Test Split (10 points)

In [None]:
# Task 1: Load the dataset "IMDB_Dataset.csv" to a dataframe named "df"
df = pd.read_csv('IMDB_Dataset.csv')
df. head()

In [None]:
# Task 2: Get features X from column "review" and get labels y from column "sentiment"
# convert the labels from text to integers such that positive = 1 and negative = 0

X = df['review']
y = df['sentiment'].apply(lambda x: 1 if x == 'positive' else 0)

# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

## Q2: Data Pre-Processing (10 points)

In [None]:
# Task 1:
# (a) Use TfidfVectorizer from sklearn.feature_extraction.text and initialize it ONLY using 
#     max_features = 100 and n_gram_range = (1,1) <---- 1-gram features
# (b) Use the initialized TfidfVectorizer in tfidf to obtain TF-IDF features of X_train and 
#     X_test
# Hint: https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfVectorizer.html

tfidf = TfidfVectorizer(max_features=100, ngram_range=(1, 1))
X_train_tfidf = tfidf.fit_transform(X_train)
X_test_tfidf = tfidf.transform(X_test)

In [None]:
# Task 2:
# (a) Use TfidfVectorizer from sklearn.feature_extraction.text and initialize it ONLY using 
#     max_features = 100 and n_gram_range = (2,2) <----- 2-gram features
# (b) Use the initialized TfidfVectorizer in tfidf to obtain TF-IDF features of X_train and 
#     X_test

tfidf_bigrams = TfidfVectorizer(max_features=100, ngram_range=(2, 2))
X_train_tfidf_bigrams = tfidf_bigrams.fit_transform(X_train)
X_test_tfidf_bigrams = tfidf_bigrams.transform(X_test)

## Q3: Create a Keras model equivalent for Logistic Regression (5 points)

In [None]:
# Task: Define the Keras model equivalent for Logistic Regression
# i.e, add ONE dense layer with 1 output neuron, sigmoid activation, and use input_dim to 
#      set input dimentions 
def create_keras_model(input_dim):
    model = keras.Sequential([
        keras.layers.Dense(1, activation='sigmoid', input_dim=input_dim),
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

## Q4: Train Two models (10 points)

In [None]:
# Task 1: Train the Keras model 1 on 1-gram TF-IDF data
#         Use 10 epochs, 32 batch size, 0.2 validation split
model1 = create_keras_model(X_train_tfidf.shape[1])
model1.fit(X_train_tfidf.toarray(), y_train, epochs=10, batch_size=32, validation_split=0.2)

In [None]:
# Task 2: Train the Keras model 2 on 2-gram TF-IDF data
#         Use 10 epochs, 32 batch size, 0.2 validation split
model2 = create_keras_model(X_train_tfidf_bigrams.shape[1])
model2.fit(X_train_tfidf_bigrams.toarray(), y_train, epochs=10, batch_size=32, validation_split=0.2)

## Q5: Quantize the model 1 and model 2 using Post Training Integer Quantization  (10 points)

In [None]:
import tensorflow as tf

# Task 1: Quantize model 1 using Post Training Integer Quantization

# Convert the Keras model1 to TensorFlow Lite format with integer quantization
converter = tf.lite.TFLiteConverter.from_keras_model(model1)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

def representative_data_gen():
    for input_value in X_train_tfidf.toarray().astype(np.float32):
        yield [input_value]

converter.representative_dataset = representative_data_gen
tflite_model1 = converter.convert()

# Save the quantized model
with open('model1_quantized.tflite', 'wb') as f:
    f.write(tflite_model1)

In [None]:
# Task 2: Quantize model 1 using Post Training Integer Quantization

# Convert the Keras model to TensorFlow Lite format with integer quantization
converter = tf.lite.TFLiteConverter.from_keras_model(model2)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

def representative_data_gen():
    for input_value in X_train_tfidf_bigrams.toarray().astype(np.float32):
        yield [input_value]

converter.representative_dataset = representative_data_gen
tflite_model2 = converter.convert()

# Save the quantized model
with open('model2_quantized.tflite', 'wb') as f:
    f.write(tflite_model2)


## Q6: Define a thrid model to aggregate the outputs from first two models and train it usinf Quantization-Aware Integer Quantization  (15 points)

In [None]:
import tensorflow_model_optimization as tfmot

# Define the Keras model equivalent for Logistic Regression for QAT
def create_keras_model_QAT(input_dim):
    model = keras.Sequential([
        tfmot.quantization.keras.quantize_annotate_layer(keras.layers.Dense(1, activation='sigmoid', input_dim=input_dim),
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

# Model 3: Combined Sentiment Analysis using another DNN
model1_pred = model1.predict(X_train_tfidf.toarray())
model2_pred = model2.predict(X_train_tfidf_bigrams.toarray())

X_train_combined = np.column_stack((model1_pred, model2_pred))

model3 = create_keras_model_QAT(X_train_combined.shape[1])

In [None]:
# Apply MinMaxScaler
scaler = MinMaxScaler()
X_train_combined_scaled = scaler.fit_transform(X_train_combined)

# Apply quantization-aware training
quantize_model = tfmot.quantization.keras.quantize_apply(model3)

quantize_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the quantization-aware model
quantize_model.fit(X_train_combined_scaled, y_train, epochs=10, batch_size=32, validation_split=0.2)

# Convert the trained model to TensorFlow Lite format with QAT
converter = tf.lite.TFLiteConverter.from_keras_model(quantize_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

def representative_data_gen():
    for input_value in X_train_combined_scaled.astype(np.float32):
        yield [input_value]

converter.representative_dataset = representative_data_gen
tflite_model3_qat = converter.convert()

# Save the quantized model
with open('model3_qat_quantized.tflite', 'wb') as f:
    f.write(tflite_model3_qat)

## Q7: Evaluate the three Quntized Models (30 points)

In [None]:
# Load the quantized model1
interpreter_model1 = tf.lite.Interpreter(model_path='model1_quantized.tflite')
interpreter_model1.allocate_tensors()

# Get input and output details
input_details_model1 = interpreter_model1.get_input_details()
output_details_model1 = interpreter_model1.get_output_details()

# Scale inputs to int8 range
def scale_to_int8(input_data, input_scale, input_zero_point):
    return (input_data / input_scale + input_zero_point).astype(np.int8)

# Evaluate Model 1
def evaluate_model1():
    y_pred1 = []
    input_scale, input_zero_point = input_details_model1[0]['quantization']
    for i in range(len(X_test_tfidf.toarray())):
        input_data = np.expand_dims(X_test_tfidf.toarray()[i].astype(np.float32), axis=0)
        input_data_int8 = scale_to_int8(input_data, input_scale, input_zero_point)
        interpreter_model1.set_tensor(input_details_model1[0]['index'], input_data_int8)
        interpreter_model1.invoke()
        output = interpreter_model1.get_tensor(output_details_model1[0]['index'])
        y_pred1.append((output[0] > 0.5).astype(int))
    return y_pred1

y_pred1_quantized = evaluate_model1()
print("Model 1 - Quantized Sentiment Analysis")
print("Accuracy:", accuracy_score(y_test, y_pred1_quantized))
print(classification_report(y_test, y_pred1_quantized))


In [None]:
# Load the quantized model2
interpreter_model2 = tf.lite.Interpreter(model_path='model2_quantized.tflite')
interpreter_model2.allocate_tensors()

# Get input and output details
input_details_model2 = interpreter_model2.get_input_details()
output_details_model2 = interpreter_model2.get_output_details()

# Evaluate Model 2
def evaluate_model2():
    y_pred2 = []
    input_scale, input_zero_point = input_details_model2[0]['quantization']
    for i in range(len(X_test_tfidf_bigrams.toarray())):
        input_data = np.expand_dims(X_test_tfidf_bigrams.toarray()[i].astype(np.float32), axis=0)
        input_data_int8 = scale_to_int8(input_data, input_scale, input_zero_point)
        interpreter_model2.set_tensor(input_details_model2[0]['index'], input_data_int8)
        interpreter_model2.invoke()
        output = interpreter_model2.get_tensor(output_details_model2[0]['index'])
        y_pred2.append((output[0] > 0.5).astype(int))
    return y_pred2

y_pred2_quantized = evaluate_model2()
print("\nModel 2 - Quantized Sentiment Analysis")
print("Accuracy:", accuracy_score(y_test, y_pred2_quantized))
print(classification_report(y_test, y_pred2_quantized))


In [None]:
# Load the quantized model3 (QAT)
interpreter_model3 = tf.lite.Interpreter(model_path='model3_qat_quantized.tflite')
interpreter_model3.allocate_tensors()

# Get input and output details
input_details_model3 = interpreter_model3.get_input_details()
output_details_model3 = interpreter_model3.get_output_details()

# Prepare test data
X_test_tfidf_dense = X_test_tfidf.toarray()
X_test_tfidf_bigrams_dense = X_test_tfidf_bigrams.toarray()

model1_probs_test = model1.predict(X_test_tfidf_dense)
model2_probs_test = model2.predict(X_test_tfidf_bigrams_dense)

X_test_combined = np.column_stack((model1_probs_test, model2_probs_test))
X_test_combined_scaled = scaler.transform(X_test_combined)

# Evaluate Model 3
def evaluate_model3():
    y_pred3 = []
    input_scale, input_zero_point = input_details_model3[0]['quantization']
    for i in range(len(X_test_combined_scaled)):
        input_data = np.expand_dims(X_test_combined_scaled[i].astype(np.float32), axis=0)
        input_data_int8 = scale_to_int8(input_data, input_scale, input_zero_point)
        interpreter_model3.set_tensor(input_details_model3[0]['index'], input_data_int8)
        interpreter_model3.invoke()
        output = interpreter_model3.get_tensor(output_details_model3[0]['index'])
        y_pred3.append((output[0] > 0.5).astype(int))
    return y_pred3

y_pred3_quantized = evaluate_model3()
print("\nModel 3 (QAT) - Quantized Sentiment Analysis")
print("Accuracy:", accuracy_score(y_test, y_pred3_quantized))
print(classification_report(y_test, y_pred3_quantized))

## Q7: Breifly discuss the observed Results and how would you implement this on Arduino Nano BLE (10 points)