In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, GRU, Dropout, BatchNormalization, Conv1D, MaxPooling1D, Flatten, Input, Reshape, Conv2D, ReLU, MaxPool2D, Masking
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from tensorflow.keras.preprocessing.sequence import pad_sequences
import ast

# Load and preprocess the data
# Replace 'train.csv' with the actual path to your dataset
data = pd.read_csv('dataset/train.csv', header = None, converters = {
    3: ast.literal_eval,
    4: ast.literal_eval,
    5: ast.literal_eval
}, skiprows = 1)

df = pd.DataFrame()

df['acc_x'] = data[3]
df['acc_y'] = data[4]
df['acc_z'] = data[5]
df['gesture'] = data[2]

#remove invalid rows
df.drop(df.loc[df['acc_x']==0].index, inplace=True)
df.drop(df.loc[df['acc_y']==0].index, inplace=True)
df.drop(df.loc[df['acc_z']==0].index, inplace=True)

df = df.dropna()

# Convert the lists into arrays
acc_x = df['acc_x'].values
acc_y = df['acc_y'].values
acc_z = df['acc_z'].values

# Combine all axes into a sequence of shape (timesteps, features)
sequences = [np.array([x, y, z]).T for x, y, z in zip(acc_x, acc_y, acc_z)]

# Pad sequences to the length of the longest sequence
padded_sequences = pad_sequences(sequences, maxlen = 32, padding='post', dtype='float32')

# Encode labels
labels = df['gesture'].values
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(labels)
categorical_labels = to_categorical(encoded_labels)


2025-01-04 15:37:20.110588: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-01-04 15:37:20.132109: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-01-04 15:37:20.132130: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-01-04 15:37:20.132704: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-01-04 15:37:20.136649: I tensorflow/core/platform/cpu_feature_guar

In [2]:
#DATA AUGMENTATION preprocessing

#adding noise
def add_noise(data, noise_level=0.05):
    return (data + np.random.normal(0, noise_level, data.shape)).astype(np.float32)

# Original data: `x_train` (accelerometer sequences), `y_train` (labels)

padded_sequences = np.concatenate((padded_sequences, add_noise(padded_sequences)))
categorical_labels = np.concatenate((categorical_labels, categorical_labels))




def scale_data(data, scaling_factor=0.1):
    """
    Scale the data by a random factor.
    Args:
        data: Numpy array of shape (time_steps, 3).
        scaling_factor: Max scaling factor variation.
    Returns:
        Scaled data.
    """
    factor = 1 + np.random.uniform(-scaling_factor, scaling_factor)
    return (data * factor).astype(np.float32)

padded_sequences = np.concatenate((padded_sequences, scale_data(padded_sequences)))
categorical_labels = np.concatenate((categorical_labels, categorical_labels))





In [3]:
# Split the data
X_train, X_validation, y_train, y_validation = train_test_split(
    padded_sequences, categorical_labels, test_size=0.2, random_state=42
)

X_train, X_test, y_train, y_test = train_test_split(
    X_train, y_train, test_size=0.25, random_state=42)


In [4]:
import optuna
import tensorflow as tf

def objective(trial):
    # Define hyperparameters to be tuned
    filters1 = trial.suggest_int('filters1', 1, 32) # Example range
    filters2 = trial.suggest_int('filters2', 1, 32)
    lstm_units = trial.suggest_int('lstm_units', 1, 32)
    dropout1 = trial.suggest_float('dropout1', 0.0, 1.0)
    dropout2 = trial.suggest_float('dropout2', 0.0, 1.0)
    #dense_units = trial.suggest_int('dense_units', 1, 16)
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-3, log=True) # Log scale for LR

    input_shape = (32, 3) # Your input shape
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Conv1D(filters=filters1, kernel_size=3, strides=1, activation='relu', input_shape=input_shape))
    model.add(tf.keras.layers.Conv1D(filters=filters2, kernel_size=3, strides=1, activation='relu'))
    model.add(tf.keras.layers.Dropout(dropout1))
    model.add(tf.keras.layers.MaxPooling1D(pool_size=2))
    model.add(tf.keras.layers.LSTM(lstm_units))
    model.add(tf.keras.layers.Dropout(dropout2))
    #model.add(tf.keras.layers.Dense(dense_units, activation='relu'))
    model.add(tf.keras.layers.Dense(len(label_encoder.classes_), activation="softmax"))

    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    # Train the model (using a validation set is crucial)
    model.fit(X_train, y_train, epochs=50, validation_data=(X_validation, y_validation), verbose=0) # reduced epochs for faster trials
    _, val_accuracy = model.evaluate(X_validation, y_validation, verbose=0) # Evaluate on validation set

    return val_accuracy # Return the metric to be maximized

In [5]:
study = optuna.create_study(direction='maximize')  # 'maximize' for accuracy, 'minimize' for loss
study.optimize(objective, n_trials=100)  # Number of trials (hyperparameter combinations)

[I 2025-01-04 15:37:34,347] A new study created in memory with name: no-name-f1321cdf-476e-4756-9742-4cc9a522504e
[W 2025-01-04 15:37:41,716] Trial 0 failed with parameters: {'filters1': 3, 'filters2': 5, 'lstm_units': 3, 'dropout1': 0.0964506018800988, 'dropout2': 0.2664138192241987, 'learning_rate': 4.389093796193573e-05} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/home/amroset/anaconda3/envs/microcontrollers/lib/python3.10/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
  File "/tmp/ipykernel_51112/2312999532.py", line 29, in objective
    model.fit(X_train, y_train, epochs=50, validation_data=(X_validation, y_validation), verbose=0) # reduced epochs for faster trials
  File "/home/amroset/anaconda3/envs/microcontrollers/lib/python3.10/site-packages/keras/src/utils/traceback_utils.py", line 65, in error_handler
    return fn(*args, **kwargs)
  File "/home/amroset/anaconda3/envs/m

KeyboardInterrupt: 

In [5]:
print("Number of finished trials: ", len(study.trials))
print("Best trial:")
trial = study.best_trial
print(" Value: ", trial.value)
print(" Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

NameError: name 'study' is not defined

In [6]:
def create_hybrid_model(input_shape):
    """Creates an hybrid model based on the provided specifications.

    Args:
        input_shape: Tuple representing the input shape (timesteps, features).

    Returns:
        A Keras Model instance.
    """
    model = tf.keras.Sequential()

    # Convolutional Layers
    model.add(tf.keras.layers.Conv1D(filters=32, kernel_size=3, strides=1, activation='relu', input_shape=input_shape))
    model.add(tf.keras.layers.Conv1D(filters=16, kernel_size=3, strides=1, activation='relu'))

    # Dropout and MaxPooling
    model.add(tf.keras.layers.Dropout(0.086))
    model.add(tf.keras.layers.MaxPooling1D(pool_size=2))

    # LSTM Layer
    model.add(tf.keras.layers.LSTM(32))  # LSTM neurons
    model.add(tf.keras.layers.Dropout(0.2))
    
    # Dropout and Dense Layer
    #model.add(tf.keras.layers.Dropout(0.2))
    #model.add(tf.keras.layers.Dense(20, activation='relu')) # You might need to change the activation depending on your task.

    # Output Layer (Add this based on your problem)
    # Example for classification with 'num_classes' output classes:
    model.add(tf.keras.layers.Dense(20, activation='softmax'))
    # Example for regression with a single output value:
    # model.add(tf.keras.layers.Dense(1))

    return model

In [7]:
# Example usage:
input_shape = (32, 3)  # Example: 32 timesteps, 3 features. Replace with your actual shape.
model = create_hybrid_model(input_shape)

# Compile the model
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0007)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy']) # Replace loss if needed

# Print model summary
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_2 (Conv1D)           (None, 30, 32)            320       
                                                                 
 conv1d_3 (Conv1D)           (None, 28, 16)            1552      
                                                                 
 dropout_2 (Dropout)         (None, 28, 16)            0         
                                                                 
 max_pooling1d_1 (MaxPoolin  (None, 14, 16)            0         
 g1D)                                                            
                                                                 
 lstm_1 (LSTM)               (None, 32)                6272      
                                                                 
 dropout_3 (Dropout)         (None, 32)                0         
                                                      

In [8]:
# Train the model
history = model.fit(
    X_train, y_train,
    validation_data=(X_validation, y_validation),
    epochs=50,
    batch_size=32,
    verbose=1
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [9]:
# Evaluate the model
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")

# Save the model
model.save('models/hybrid_gesture_classification_model.h5')

# Decode predicted labels for interpretability
predicted_classes = label_encoder.inverse_transform(np.argmax(model.predict(X_test), axis=1))


Test Loss: 0.009661395102739334, Test Accuracy: 0.9978070259094238


  saving_api.save_model(




In [19]:
#Inference
from tensorflow.keras.models import load_model

# Load the trained model
model = load_model('models/hybrid_gesture_classification_model.h5')
model.summary()

print(padded_sequences)

# Load the test data
test_data = pd.read_csv('dataset/test.csv', header=None, converters={
    2: ast.literal_eval,
    3: ast.literal_eval,
    4: ast.literal_eval
}, skiprows=1)

# Preprocess the test data
df_test = pd.DataFrame()
df_test['acc_x'] = test_data[2]
df_test['acc_y'] = test_data[3]
df_test['acc_z'] = test_data[4]

# Ensure all data is consistent (dropping invalid or zero entries)
df_test.drop(df_test.loc[df_test['acc_x'] == 0].index, inplace=True)
df_test.drop(df_test.loc[df_test['acc_y'] == 0].index, inplace=True)
df_test.drop(df_test.loc[df_test['acc_z'] == 0].index, inplace=True)

df_test = df_test.dropna()

# Extract accelerometer data (acc_x, acc_y, acc_z) for the test set
acc_x = df_test['acc_x'].values
acc_y = df_test['acc_y'].values
acc_z = df_test['acc_z'].values

test_sequences = [np.array([x, y, z]).T for x, y, z in zip(acc_x, acc_y, acc_z)]

# Pad sequences to the length of the longest sequence
padded_test_sequences = pad_sequences(test_sequences, padding='post', dtype='float32')

# Make predictions
predictions = model.predict(padded_test_sequences)

# Decode predictions to gesture classes
predicted_classes = np.argmax(predictions, axis=1)

# If you used a LabelEncoder for training, decode the classes
# Replace 'label_encoder' with your encoder used during training
gesture_labels = label_encoder.inverse_transform(predicted_classes)

df_out = pd.DataFrame()
df_out['id'] = test_data[0]
df_out['gesture'] = gesture_labels


# Save the results
df_out.to_csv('cnn_test_predictions.csv', index=False)

print("Predictions saved to test_predictions.csv")

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_4 (Conv1D)           (None, 30, 32)            320       
                                                                 
 conv1d_5 (Conv1D)           (None, 28, 16)            1552      
                                                                 
 dropout_4 (Dropout)         (None, 28, 16)            0         
                                                                 
 max_pooling1d_2 (MaxPoolin  (None, 14, 16)            0         
 g1D)                                                            
                                                                 
 lstm_2 (LSTM)               (None, 32)                6272      
                                                                 
 dropout_5 (Dropout)         (None, 32)                0         
                                                      

In [22]:
def representative_data_gen():
  for input_value in tf.data.Dataset.from_tensor_slices(padded_sequences).batch(1).take(32):
    yield [input_value]
        
# Ensure input shape is fixed
model.build(input_shape=(None, 32, 3))  # Example: Fixed length 32 timesteps, 3 features

converter = tf.lite.TFLiteConverter.from_keras_model(model)
# Apply integer quantization
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]  # Ensure compatibility
converter.inference_input_type = tf.uint8  # Optional: Set input type
converter.inference_output_type = tf.uint8  # Optional: Set output type

tflite_model = converter.convert()

# Save the model
with open('cnn_gesture_classification_model_integer.tflite', 'wb') as f:
    f.write(tflite_model)

print("Model with full integer quantization saved as gesture_classification_model_integer.tflite")

<_TakeDataset element_spec=TensorSpec(shape=(None, 32, 3), dtype=tf.float32, name=None)>
INFO:tensorflow:Assets written to: /tmp/tmpt4y6dw0x/assets


INFO:tensorflow:Assets written to: /tmp/tmpt4y6dw0x/assets
2024-12-31 09:46:24.165038: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:378] Ignored output_format.
2024-12-31 09:46:24.165065: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:381] Ignored drop_control_dependency.
2024-12-31 09:46:24.165191: I tensorflow/cc/saved_model/reader.cc:83] Reading SavedModel from: /tmp/tmpt4y6dw0x
2024-12-31 09:46:24.170301: I tensorflow/cc/saved_model/reader.cc:51] Reading meta graph with tags { serve }
2024-12-31 09:46:24.170325: I tensorflow/cc/saved_model/reader.cc:146] Reading SavedModel debug info (if present) from: /tmp/tmpt4y6dw0x
2024-12-31 09:46:24.183523: I tensorflow/cc/saved_model/loader.cc:233] Restoring SavedModel bundle.
2024-12-31 09:46:24.230063: I tensorflow/cc/saved_model/loader.cc:217] Running initialization op on SavedModel bundle at path: /tmp/tmpt4y6dw0x
2024-12-31 09:46:24.253775: I tensorflow/cc/saved_model/loader.cc:316] SavedModel

ConverterError: /home/amroset/anaconda3/envs/microcontrollers/lib/python3.10/runpy.py:196:1: error: 'tf.TensorListReserve' op requires element_shape to be static during TF Lite transformation pass
    return _run_code(code, main_globals, None,
^
<unknown>:0: note: loc(fused["StatefulPartitionedCall:", "StatefulPartitionedCall"]): called from
/home/amroset/anaconda3/envs/microcontrollers/lib/python3.10/runpy.py:196:1: error: failed to legalize operation 'tf.TensorListReserve' that was explicitly marked illegal
    return _run_code(code, main_globals, None,
^
<unknown>:0: note: loc(fused["StatefulPartitionedCall:", "StatefulPartitionedCall"]): called from
<unknown>:0: error: Lowering tensor list ops is failed. Please consider using Select TF ops and disabling `_experimental_lower_tensor_list_ops` flag in the TFLite converter object. For example, converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]\n converter._experimental_lower_tensor_list_ops = False


In [8]:
import os

# Convert the model to TFLite without quantization
converter = tf.lite.TFLiteConverter.from_keras_model(model)
fp_tflite_model = converter.convert()

# Save the model to disk
open("cnn_model_f32.tflite", "wb").write(fp_tflite_model)

# Show the model size for the non-quantized HDF5 model
fp_h5_in_kb = os.path.getsize('models/gesture_classification_model.h5') / 1024
print("HDF5 Model size without quantization: %d KB" % fp_h5_in_kb)

# Show the model size for the non-quantized TFLite model
fp_tflite_in_kb = os.path.getsize('cnn_model_f32.tflite') / 1024
print("TFLite Model size without quantization: %d KB" % fp_tflite_in_kb)

# Determine the reduction in model size
print("\nReduction in file size by a factor of %f" % (fp_h5_in_kb / fp_tflite_in_kb))

INFO:tensorflow:Assets written to: /tmp/tmpkqzhqnjh/assets


INFO:tensorflow:Assets written to: /tmp/tmpkqzhqnjh/assets


HDF5 Model size without quantization: 442 KB
TFLite Model size without quantization: 35 KB

Reduction in file size by a factor of 12.375984


2024-12-30 00:37:52.289228: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:378] Ignored output_format.
2024-12-30 00:37:52.289244: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:381] Ignored drop_control_dependency.
2024-12-30 00:37:52.289344: I tensorflow/cc/saved_model/reader.cc:83] Reading SavedModel from: /tmp/tmpkqzhqnjh
2024-12-30 00:37:52.290940: I tensorflow/cc/saved_model/reader.cc:51] Reading meta graph with tags { serve }
2024-12-30 00:37:52.290955: I tensorflow/cc/saved_model/reader.cc:146] Reading SavedModel debug info (if present) from: /tmp/tmpkqzhqnjh
2024-12-30 00:37:52.295056: I tensorflow/cc/saved_model/loader.cc:233] Restoring SavedModel bundle.
2024-12-30 00:37:52.340014: I tensorflow/cc/saved_model/loader.cc:217] Running initialization op on SavedModel bundle at path: /tmp/tmpkqzhqnjh
2024-12-30 00:37:52.352383: I tensorflow/cc/saved_model/loader.cc:316] SavedModel load for tags { serve }; Status: success: OK. Took 63038 m

In [9]:
interpreter = tf.lite.Interpreter(model_content=tflite_model)
interpreter.allocate_tensors()

input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

# Show the model size for the 8-bit quantized TFLite model
tflite_quant_in_kb = os.path.getsize('models/cnn_gesture_classification_model_integer.tflite') / 1024
print("TFLite Model size with 8-bit quantization: %d KB" % tflite_quant_in_kb)


input:  <class 'numpy.uint8'>
output:  <class 'numpy.uint8'>
TFLite Model size with 8-bit quantization: 14 KB


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


In [10]:
# Helper function to run inference on a TFLite model

test_sequence_indices = range(X_test.shape[0])

print(y_test)

def run_tflite_model(tflite_file, test_image_indices):
  global X_test

  # Initialize the interpreter
  interpreter = tf.lite.Interpreter(model_path=str(tflite_file))
  interpreter.allocate_tensors()

  input_details = interpreter.get_input_details()[0]
  output_details = interpreter.get_output_details()[0]

  predictions = []
  for i, test_sequence_index in enumerate(test_sequence_indices):
    test_sequence = X_test[test_sequence_index]
    test_label = y_test[test_sequence_index]

    if (test_sequence_index % 100 == 0):
      print("Evaluated on %d sequences." % test_sequence_index)

    # Check if the input type is quantized, then rescale input data to uint8
    if input_details['dtype'] == np.uint8:
      input_scale, input_zero_point = input_details["quantization"]
      test_sequence = test_sequence / input_scale + input_zero_point

    test_sequence = np.expand_dims(test_sequence, axis=0).astype(input_details["dtype"])
    interpreter.set_tensor(input_details["index"], test_sequence)
    interpreter.invoke()
    output = interpreter.get_tensor(output_details["index"])[0]

    predictions.append(np.argmax(output, axis=-1))

  return predictions


# Helper function to evaluate a TFLite model on all images
def evaluate_model(tflite_file, model_type):
  global X_test
  global y_test

  test_sequence_indices = range(X_train.shape[0])
  predictions = run_tflite_model(tflite_file, test_sequence_indices)
    
  labels = np.argmax(y_test, axis=1)

  accuracy = (np.sum(labels == predictions) * 100) / len(X_test)

  print('%s model accuracy is %.4f%% (Number of test samples=%d)' % (
      model_type, accuracy, len(X_test)))

    


[[0. 0. 1. ... 0. 0. 0.]
 [0. 0. 1. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]


In [11]:
import pathlib
# this might take a few minutes (~ 1- 2 minutes)
# if it takes longer than that, I suggest to 
# restart the runtime and try again
# if the issue still persists, restart your computer
tflite_model_quant_int8_file = pathlib.Path('models/cnn_gesture_classification_model_integer.tflite')
tflite_model_quant_int8_model_type = "Full Post-Quantized INT8"

evaluate_model(tflite_model_quant_int8_file, tflite_model_quant_int8_model_type)

input_details = interpreter.get_input_details()[0]
input_scale, input_zero_point = input_details["quantization"]

print(input_scale)
print(input_zero_point)




Evaluated on 0 sequences.
Evaluated on 100 sequences.
Evaluated on 200 sequences.
Evaluated on 300 sequences.
Evaluated on 400 sequences.
Evaluated on 500 sequences.
Evaluated on 600 sequences.
Evaluated on 700 sequences.
Evaluated on 800 sequences.
Evaluated on 900 sequences.
Evaluated on 1000 sequences.
Evaluated on 1100 sequences.
Evaluated on 1200 sequences.
Evaluated on 1300 sequences.
Evaluated on 1400 sequences.
Evaluated on 1500 sequences.
Evaluated on 1600 sequences.
Evaluated on 1700 sequences.
Evaluated on 1800 sequences.
Evaluated on 1900 sequences.
Evaluated on 2000 sequences.
Evaluated on 2100 sequences.
Evaluated on 2200 sequences.
Full Post-Quantized INT8 model accuracy is 95.9211% (Number of test samples=2280)
0.026128482073545456
119


In [12]:
# we plot also the confusion matrix of the quantized model
tflite_model_quant_int8_pred = run_tflite_model(tflite_model_quant_int8_file, range(y_test.shape[0]))

# compute the accuracy of the quantized model
from sklearn.metrics import accuracy_score

labels = np.argmax(y_test, axis=1)

full_int8_accuracy = accuracy_score(labels, tflite_model_quant_int8_pred)
print("Full-precision model accuracy is %.4f%% (Number of test samples=%d)" % (test_accuracy * 100, len(y_test)))
print("Quantized model accuracy is %.4f%% (Number of test samples=%d)" % (full_int8_accuracy * 100, len(y_test)))

Evaluated on 0 sequences.
Evaluated on 100 sequences.
Evaluated on 200 sequences.
Evaluated on 300 sequences.
Evaluated on 400 sequences.
Evaluated on 500 sequences.
Evaluated on 600 sequences.
Evaluated on 700 sequences.
Evaluated on 800 sequences.
Evaluated on 900 sequences.
Evaluated on 1000 sequences.
Evaluated on 1100 sequences.
Evaluated on 1200 sequences.
Evaluated on 1300 sequences.
Evaluated on 1400 sequences.
Evaluated on 1500 sequences.
Evaluated on 1600 sequences.
Evaluated on 1700 sequences.
Evaluated on 1800 sequences.
Evaluated on 1900 sequences.
Evaluated on 2000 sequences.
Evaluated on 2100 sequences.
Evaluated on 2200 sequences.
Full-precision model accuracy is 97.5000% (Number of test samples=2280)
Quantized model accuracy is 95.9211% (Number of test samples=2280)


In [13]:
# Function: Convert some hex value into an array for C programming
def hex_to_c_array(hex_data, var_name):

    c_str = ''

    # Create header guard
    c_str += '#ifndef ' + var_name.upper() + '_H\n'
    c_str += '#define ' + var_name.upper() + '_H\n\n'

    # Add array length at top of file
    c_str += '\nstatic const unsigned int ' + var_name + '_len = ' + str(len(hex_data)) + ';\n'

    # Declare C variable
    c_str += 'static const unsigned char ' + var_name + '[] = {'
    hex_array = []
    for i, val in enumerate(hex_data) :

        # Construct string from hex
        hex_str = format(val, '#04x')

        # Add formatting so each line stays within 80 characters
        if (i + 1) < len(hex_data):
            hex_str += ','
        if (i + 1) % 12 == 0:
            hex_str += '\n '
        hex_array.append(hex_str)

    # Add closing brace
    c_str += '\n ' + format(' '.join(hex_array)) + '\n};\n\n'

    # Close out header guard
    c_str += '#endif //' + var_name.upper() + '_H'

    return c_str

In [14]:
c_model_name = 'q8_cnn'
# check if dir 'cfiles' exists, if not create it
if not os.path.exists('cfiles'):
    os.makedirs('cfiles')
# Write TFLite model to a C source (or header) file
with open('cfiles/' + c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(tflite_model, c_model_name))

In [15]:
input_details = interpreter.get_input_details()[0]
input_scale, input_zero_point = input_details["quantization"]

# save the test data as numpy arrays
np.save('x_test_gestures.npy', (X_test / input_scale + input_zero_point).astype(np.uint8))
np.save('y_test_gestures.npy', (y_test.astype(np.uint8)))

# print the location of the files
print('Test image data location: ', os.path.abspath('x_test_gestures.npy'))
print('Test labels location: ', os.path.abspath('y_test_gestures.npy'))

Test image data location:  /home/amroset/Machine Learning on Microcontrollers/Project/x_test_gestures.npy
Test labels location:  /home/amroset/Machine Learning on Microcontrollers/Project/y_test_gestures.npy


In [16]:
X_test_try = (X_test[14] / input_scale + input_zero_point).astype(np.uint8)

print(input_scale)
print(input_zero_point)
print(X_test[19])
print(np.argmax(y_test[14]))

0.026128482073545456
119
[[-1.2017318   0.16807531  0.15167339]
 [-1.2530056   0.16807531  0.15167339]
 [-1.1504579   0.16807531  0.15167339]
 [-1.1504579   0.20704933  0.28464743]
 [-0.27880186  1.3762691   0.7832997 ]
 [-0.02243226  2.5844631   0.1849169 ]
 [ 0.38775876 -1.6637025  -0.11427449]
 [-1.6631967  -1.8585724  -1.7432058 ]
 [ 0.02884166  0.20704933 -2.2086146 ]
 [ 2.0285232  -0.10474256  2.3789873 ]
 [ 0.38775876 -0.6893526   1.1157348 ]
 [ 0.7979498  -0.6503786  -0.04778747]
 [ 0.95177156 -0.10474256 -0.08103098]
 [ 0.69540197  0.3239714  -0.4467094 ]
 [ 0.6441284  -0.06576855 -0.24724855]
 [ 0.7979498  -0.06576855 -0.31373534]
 [ 0.          0.          0.        ]
 [ 0.          0.          0.        ]
 [ 0.          0.          0.        ]
 [ 0.          0.          0.        ]
 [ 0.          0.          0.        ]
 [ 0.          0.          0.        ]
 [ 0.          0.          0.        ]
 [ 0.          0.          0.        ]
 [ 0.          0.          0.        ]


In [17]:
def write_test_sample_to_header(X_test_try, output_file):
    """
    Extract a test sample from X_test and write it to a data.h file.
    
    Args:
        X_test (np.ndarray): The test dataset.
        sample_index (int): Index of the sample to extract.
        output_file (str): Path to the output .h file.
    """
    
    # Flatten the sample if it is multi-dimensional
    flattened_sample = X_test_try.flatten()
    
    # Generate the C array string
    c_array = ", ".join(map(str, flattened_sample))
    
    # Determine the shape of the original sample
    original_shape = X_test_try.shape
    
    # Write to the header file
    with open(output_file, "w") as file:
        file.write("// Auto-generated header file with test data\n")
        file.write(f"#ifndef DATA_H\n#define DATA_H\n\n")
        file.write(f"#define SAMPLE_SIZE {len(flattened_sample)}\n")
        file.write(f"const float test_sample[SAMPLE_SIZE] = {{ {c_array} }};\n")
        file.write(f"// Original shape: {original_shape}\n")
        file.write("\n#endif // DATA_H\n")

# Example usage
output_file = "data.h"
write_test_sample_to_header(X_test_try, output_file)

In [18]:
from qkeras import *
import gc

def get_cnn_quantized_model(num_classes):

        gc.collect()
        keras.backend.clear_session()
    
    
        #qkeras model

        quantized_model = keras.Sequential([
                        keras.layers.Input((32,3)),
                        keras.layers.Reshape((8,4,3)),
                        #QActivation("quantized_bits(16)"),
                        QConv2D(filters=8, kernel_size = (5,5), padding = "same", kernel_quantizer="quantized_bits(8)"),
                        keras.layers.BatchNormalization(),
                        QActivation("quantized_relu(8)"),
                        QConv2D(filters=8, kernel_size = (5,5), padding = "same", kernel_quantizer="quantized_bits(8)"),
                        keras.layers.BatchNormalization(),
                        QActivation("quantized_relu(8)"),
                        keras.layers.MaxPool2D(pool_size = 2),
                        keras.layers.Flatten(),
                        QDense(32, kernel_quantizer="quantized_bits(8)"),
                        keras.layers.BatchNormalization(),
                        QActivation("quantized_relu(8)"),
                        QDense(num_classes, kernel_quantizer="quantized_bits(8)"),
                        keras.layers.Activation('softmax'),
                        ])

        return quantized_model
    

In [19]:
qmodel = get_cnn_quantized_model(len(label_encoder.classes_))

qmodel.compile(loss="categorical_crossentropy", optimizer='adam', metrics = ['accuracy'])

qmodel.summary()



Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 reshape (Reshape)           (None, 8, 4, 3)           0         
                                                                 
 q_conv2d (QConv2D)          (None, 8, 4, 8)           608       
                                                                 
 batch_normalization (Batch  (None, 8, 4, 8)           32        
 Normalization)                                                  
                                                                 
 q_activation (QActivation)  (None, 8, 4, 8)           0         
                                                                 
 q_conv2d_1 (QConv2D)        (None, 8, 4, 8)           1608      
                                                                 
 batch_normalization_1 (Bat  (None, 8, 4, 8)           32        
 chNormalization)                                       

In [93]:
# Train and evaluate the quantization aware model
es = [
        tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1, min_delta=0.0001, mode='auto', cooldown=0, min_lr=0)
                
    ]
qmodel.fit(
                  X_train,
                  y_train,
                  epochs=20,
                  validation_data=(X_validation, y_validation),
                  callbacks=[es]
              )



Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.src.callbacks.History at 0x723596bff4f0>

In [94]:
# evaluate the model on the test set
quant_loss, quant_acc = qmodel.evaluate(X_test, y_test, verbose=0)
print('Quantization aware training loss: ', quant_loss)
print('Quantization aware training accuracy: ', quant_acc)
qmodel.save('qat_cnn_gesture_classification_model.h5')

Quantization aware training loss:  0.040925752371549606
Quantization aware training accuracy:  0.9929824471473694


  saving_api.save_model(


In [95]:
# convert the QAT model to a fully quantized model using TFLite

def representative_data_gen():
  for input_value in tf.data.Dataset.from_tensor_slices(X_train).batch(1).take(100):
    yield [input_value]

converter = tf.lite.TFLiteConverter.from_keras_model(qmodel)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
# Ensure that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# Set the input and output tensors to uint8 (APIs added in r2.3)
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

tflite_model_quant_int8_qat = converter.convert()

INFO:tensorflow:Assets written to: /tmp/tmp4yqthdn9/assets


INFO:tensorflow:Assets written to: /tmp/tmp4yqthdn9/assets
2024-12-29 19:17:45.985105: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:378] Ignored output_format.
2024-12-29 19:17:45.985124: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:381] Ignored drop_control_dependency.
2024-12-29 19:17:45.985230: I tensorflow/cc/saved_model/reader.cc:83] Reading SavedModel from: /tmp/tmp4yqthdn9
2024-12-29 19:17:45.988489: I tensorflow/cc/saved_model/reader.cc:51] Reading meta graph with tags { serve }
2024-12-29 19:17:45.988500: I tensorflow/cc/saved_model/reader.cc:146] Reading SavedModel debug info (if present) from: /tmp/tmp4yqthdn9
2024-12-29 19:17:45.997999: I tensorflow/cc/saved_model/loader.cc:233] Restoring SavedModel bundle.
2024-12-29 19:17:46.059358: I tensorflow/cc/saved_model/loader.cc:217] Running initialization op on SavedModel bundle at path: /tmp/tmp4yqthdn9
2024-12-29 19:17:46.086235: I tensorflow/cc/saved_model/loader.cc:316] SavedModel

In [96]:
interpreter = tf.lite.Interpreter(model_content=tflite_model_quant_int8_qat)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)
# Save the quantized model to disk
open("gesture_qat_int8.tflite", "wb").write(tflite_model_quant_int8_qat)

# Show the model size for the 8-bit quantized TFLite model
tflite_quant_in_kb = os.path.getsize('models/gesture_qat_int8.tflite') / 1024
print("TFLite Model size with 8-bit quantization: %d KB" % tflite_quant_in_kb)

input:  <class 'numpy.uint8'>
output:  <class 'numpy.uint8'>
TFLite Model size with 8-bit quantization: 22 KB


In [97]:
c_model_name = 'qat8_gesture'
# Write TFLite model to a C source (or header) file
with open(c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(tflite_model_quant_int8_qat, c_model_name))

In [24]:
# this might take a few minutes (~ 1- 2 minutes)
# if it takes longer than that, I suggest to 
# restart the runtime and try again
# if the issue still persists, restart your computer
tflite_model_quant_int8_qat_file = pathlib.Path('models/no_qkeras_gesture_qat_int8.tflite')
tflite_model_quant_int8_qat_type = "Full QAT INT8"

evaluate_model(tflite_model_quant_int8_qat_file, tflite_model_quant_int8_qat_type)
print("Full-precision model accuracy is %.4f%% (Number of test samples=%d)" % (test_accuracy * 100, len(X_train)))

Evaluated on 0 sequences.
Evaluated on 100 sequences.
Evaluated on 200 sequences.
Evaluated on 300 sequences.
Evaluated on 400 sequences.
Evaluated on 500 sequences.
Evaluated on 600 sequences.
Evaluated on 700 sequences.
Evaluated on 800 sequences.
Evaluated on 900 sequences.
Evaluated on 1000 sequences.
Evaluated on 1100 sequences.
Evaluated on 1200 sequences.
Evaluated on 1300 sequences.
Evaluated on 1400 sequences.
Evaluated on 1500 sequences.
Evaluated on 1600 sequences.
Evaluated on 1700 sequences.
Evaluated on 1800 sequences.
Evaluated on 1900 sequences.
Evaluated on 2000 sequences.
Evaluated on 2100 sequences.
Evaluated on 2200 sequences.
Full QAT INT8 model accuracy is 97.7632% (Number of test samples=2280)
Full-precision model accuracy is 97.5000% (Number of test samples=6840)


In [23]:
# evaluate the model on the test set
tflite_model_quant_int8_qat_file = "models/no_qkeras_gesture_qat_int8.tflite"
tflite_model_quant_int8_qat_model_type = "Quantized aware training model"
tflite_model_quant_int8_qat_pred = run_tflite_model(tflite_model_quant_int8_qat_file, range(X_train.shape[0]))


Evaluated on 0 sequences.
Evaluated on 100 sequences.
Evaluated on 200 sequences.
Evaluated on 300 sequences.
Evaluated on 400 sequences.
Evaluated on 500 sequences.
Evaluated on 600 sequences.
Evaluated on 700 sequences.
Evaluated on 800 sequences.
Evaluated on 900 sequences.
Evaluated on 1000 sequences.
Evaluated on 1100 sequences.
Evaluated on 1200 sequences.
Evaluated on 1300 sequences.
Evaluated on 1400 sequences.
Evaluated on 1500 sequences.
Evaluated on 1600 sequences.
Evaluated on 1700 sequences.
Evaluated on 1800 sequences.
Evaluated on 1900 sequences.
Evaluated on 2000 sequences.
Evaluated on 2100 sequences.
Evaluated on 2200 sequences.


In [100]:
# compute the accuracy of the model
full_qat_int8_accuracy = accuracy_score(labels, tflite_model_quant_int8_qat_pred)
print('Full QAT INT8 accuracy is %.4f%% (Number of test samples=%d)' % (full_qat_int8_accuracy * 100, len(y_test)))
print('Full-precision model accuracy is %.4f%% (Number of test samples=%d)' % (test_accuracy * 100, len(y_test)))

Full QAT INT8 accuracy is 99.2544% (Number of test samples=2280)
Full-precision model accuracy is 97.9825% (Number of test samples=2280)
