In [None]:
import os
import gzip
import tempfile
import pickle
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv1D, MaxPooling1D, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
import tensorflow_model_optimization as tfmot
def divide_samples(data, states, division):
    """Split every row of ``data`` into ``division`` consecutive shorter rows.

    NOTE: ``divide_samples`` is defined a second time further down in this
    cell; the later definition is the one in effect once the cell has run.

    Returns a ``(new_data, new_states)`` tuple: ``new_data`` has ``division``
    times as many rows as ``data`` (each ``1/division`` as wide), and every
    entry of ``states`` is repeated ``division`` times to stay aligned.
    """
    n_rows, n_cols = data.shape[0], data.shape[1]
    # Width of one piece; the reshape below raises if pieces do not tile a row exactly.
    piece_width = int(n_cols / division)
    pieces = data.reshape((n_rows * division, piece_width))
    piece_states = np.repeat(states, division)
    return pieces, piece_states

def get_gzipped_model_size(file):
    """Return the size in bytes of ``file`` after gzip compression.

    The file is compressed into a temporary archive, the archive is measured,
    and the archive is removed again before returning.

    Args:
        file: Path of the (model) file to compress.

    Returns:
        Size of the gzipped copy, in bytes.

    Raises:
        OSError: If ``file`` cannot be read or the temporary archive cannot
            be written.
    """
    # mkstemp returns an already-open OS-level descriptor; close it right away
    # so it is not leaked (the archive is reopened by path below).
    # The '.zip' suffix is kept as-is: gzip stores the archive's base name in
    # its header, so changing the suffix would change the reported size.
    fd, zipped_file = tempfile.mkstemp('.zip')
    os.close(fd)
    try:
        with gzip.open(zipped_file, 'wb') as archive, open(file, 'rb') as model_file:
            # Stream in chunks so large models are not held in memory at once.
            for chunk in iter(lambda: model_file.read(1 << 20), b''):
                archive.write(chunk)
        return os.path.getsize(zipped_file)
    finally:
        # Always remove the temporary archive, even if compression failed.
        os.remove(zipped_file)

def representative_data_gen():
    """Yield representative input samples for TFLite conversion.

    Reads the notebook-level ``X_train`` when called, casts it to float32 and
    yields at most 100 single-row batches, each wrapped in a one-element list.
    """
    float_inputs = X_train.astype(np.float32)
    single_row_batches = tf.data.Dataset.from_tensor_slices(float_inputs).batch(1)
    for sample_batch in single_row_batches.take(100):
        yield [sample_batch]

def divide_samples(data, states, division):
    """Divide each sample into ``division`` equal-length sub-samples.

    Every row of ``data`` is cut into ``division`` consecutive pieces, each
    of which becomes a row of its own; the label of the original row is
    repeated for each of its pieces.

    Args:
        data: 2-D NumPy array with one sample per row.
        states: One label per row of ``data``.
        division: Positive integer number of pieces per sample. The row
            length of ``data`` must be a multiple of it.

    Returns:
        Tuple ``(new_data, new_states)`` where ``new_data`` has shape
        ``(rows * division, cols // division)`` and ``new_states`` holds
        ``rows * division`` labels.

    Raises:
        ValueError: If ``data`` is not 2-D, ``division`` is not positive,
            the row length is not divisible by ``division``, or ``states``
            does not contain one label per row.
    """
    if data.ndim != 2:
        raise ValueError(f"data must be 2-D, got {data.ndim} dimension(s)")
    n_samples, n_features = data.shape
    if division < 1:
        raise ValueError(f"division must be a positive integer, got {division!r}")
    if n_features % division != 0:
        raise ValueError(
            f"number of features ({n_features}) is not divisible by division ({division})"
        )
    if len(states) != n_samples:
        raise ValueError(
            f"states has {len(states)} entries but data has {n_samples} rows"
        )
    # Integer division avoids the float round-trip of int(a / b).
    new_data = data.reshape(n_samples * division, n_features // division)
    # Repeat each label so it lines up with the pieces cut from its row.
    new_states = np.repeat(states, division)
    return new_data, new_states

def evaluate_tflite_model(tflite_model, X, y):
    """Return the accuracy of a serialized TFLite model on ``(X, y)``.

    Each row of ``X`` is fed to a TFLite interpreter on its own, as a float32
    batch of size one, and the predicted class is the argmax of the first
    output tensor. ``y`` may hold class indices or (if it has more than one
    dimension) one-hot rows, which are collapsed with argmax before scoring.
    """
    tflite_runner = tf.lite.Interpreter(model_content=tflite_model)
    tflite_runner.allocate_tensors()
    in_slot = tflite_runner.get_input_details()[0]['index']
    out_slot = tflite_runner.get_output_details()[0]['index']

    def _run_one(row):
        # Add a leading batch axis and cast to float32 before invoking the model.
        single_batch = np.expand_dims(row, axis=0).astype(np.float32)
        tflite_runner.set_tensor(in_slot, single_batch)
        tflite_runner.invoke()
        return tflite_runner.get_tensor(out_slot)[0]

    raw_outputs = [_run_one(row) for row in X]
    predicted_classes = np.argmax(raw_outputs, axis=1)

    # Targets with more than one dimension are assumed to be one-hot encoded.
    true_classes = np.argmax(y, axis=1) if y.ndim > 1 else y
    return accuracy_score(true_classes, predicted_classes)

# Subject identifier; it is interpolated into the data and metadata file names loaded below.
subj = 'subject1'
# Directory containing the .npy / .pkl input files.
# NOTE(review): this looks like a placeholder -- point it at your local data folder.
path = '/path/to/tinyanesthesia/data'

## Model for Sleep (MA vs. SO states)
Here we train, quantize, and save the first model of the dual-model approach, which separates MA from SO states.

In [None]:
# --- Sleep recordings (used for the MA vs. SO model) ---
# NOTE(review): pickle.load can execute arbitrary code; only open metadata files you trust.
SleepData = np.load(f"{path}/SleepData_{subj}.npy")
with open(f"{path}/SleepData_metadata_{subj}.pkl", 'rb') as f:
    SleepData_metadata = pickle.load(f)

# --- Full recordings (used for the Sleep vs. non-Sleep model) ---
Data = np.load(f"{path}/Data_{subj}.npy")
with open(f"{path}/Data_metadata_{subj}.pkl", 'rb') as f:
    Data_metadata = pickle.load(f)

# Quick sanity check: array shape and the set of ground-truth states per dataset.
print("SleepData Shape:", SleepData.shape)
print("Unique States in SleepData:", np.unique(SleepData_metadata['GT']))
print("Data Shape:", Data.shape)
print("Unique States in Data:", np.unique(Data_metadata['GT']))

# Binary labels for the sleep data: 'MA' -> 0, every other state -> 1.
labels_SleepData = np.array(
    [int(state != 'MA') for state in SleepData_metadata['GT']]
)

# Binary labels for the full data: 'Sleep' -> 1, every other state -> 0.
labels_Data = np.array(
    [int(state == 'Sleep') for state in Data_metadata['GT']]
)

In [None]:
# Cut every sleep recording into `division` shorter windows (labels repeated to match).
division = 10
subset_data, subset_states = divide_samples(SleepData, labels_SleepData, division)
X, y = subset_data, subset_states

# Integer-encode the labels, then expand them to one-hot rows.
num_classes = 2
label_encoder = LabelEncoder()
y_integers = label_encoder.fit_transform(y)
states_encoded = to_categorical(y_integers, num_classes=num_classes)

# Stratified 80/20 train/test split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    states_encoded,
    test_size=0.2,
    random_state=42,
    stratify=states_encoded,
)

# Report the shapes of the four resulting arrays, one per line.
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape, sep="\n")

In [None]:
# Export one test window as a C array literal, so the on-device models can be
# exercised with a known input.
sample_index = 1151  # which row of the test split to export

# Keep the matching label in a variable; a bare `y_test[...]` expression in the
# middle of a cell is neither displayed nor stored.
y_test_sample = y_test[sample_index]
x_test_sample = X_test[sample_index]

# Values are rounded to the nearest integer because the C array is declared as int.
formatted_array = ', '.join(str(int(round(value))) for value in x_test_sample)
# The declared array length follows the sample instead of being hard-coded.
arduino_array = f"int x_test_sample[{len(x_test_sample)}] = {{{formatted_array}}};"

#print dummy data to test the dual-model approach
#print(arduino_array) 

In [None]:
# Small fully connected classifier: one hidden ReLU layer and a softmax output.
# Layer widths are read from the training arrays. The output layer needs one
# unit per class, i.e. one per column of the one-hot labels; counting the
# distinct values of the one-hot matrix would always give 2, whatever the
# number of classes.
model = Sequential([
    Dense(10, activation='relu', input_shape=(X_train.shape[1],)),
    Dense(y_train.shape[1], activation='softmax')
])

model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
history = model.fit(X_train, y_train, epochs=15, batch_size=128)

# Accuracy of the Keras model on the held-out split (one-hot -> class index).
y_test_integers = np.argmax(y_test, axis=1)
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
original_accuracy = accuracy_score(y_test_integers, y_pred_classes)


# Convert to TFLite with default optimizations and float16 as the supported type.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
quantized_tflite_model = converter.convert()

# Evaluate TFLite model
quantized_accuracy = evaluate_tflite_model(quantized_tflite_model, X_test, y_test)

# Print accuracies
print("Original Model Accuracy: {:.2f}%".format(original_accuracy * 100))
print("Quantized Model Accuracy: {:.2f}%".format(quantized_accuracy * 100))

# Save the original model
model.save('model_sleep.h5')
original_model_size = get_gzipped_model_size('model_sleep.h5')
print("Size of gzipped original Keras model: %.2f Kbytes" % (original_model_size / 1024))

# Save the TFLite model.
# NOTE: the file name says "f32", but the converter above targets float16.
tflite_model_path = 'quantized_f32_model_sleep.tflite'
with open(tflite_model_path, 'wb') as f:
    f.write(quantized_tflite_model)

# Calculate and print the size of the TFLite model
# (no pruning is applied in this cell, despite the variable name).
quantized_pruned_model_size = get_gzipped_model_size(tflite_model_path)
print("Size of gzipped quantized TFLite model: %.2f Kbytes" % (quantized_pruned_model_size / 1024))

## Model to distinguish Sleep vs. Non-sleep states (AW vs. SO and MA)
Analogous to the previous case: we train, quantize, and save the second model of the dual-model approach.

In [None]:
# Cut every recording into `division` shorter windows (labels repeated to match).
division = 10
subset_data_data, subset_states_data = divide_samples(Data, labels_Data, division)
X, y = subset_data_data, subset_states_data

# Integer-encode the labels, then expand them to one-hot rows.
num_classes = 2
label_encoder = LabelEncoder()
y_integers = label_encoder.fit_transform(y)
states_encoded = to_categorical(y_integers, num_classes=num_classes)

# Stratified 80/20 train/test split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    states_encoded,
    test_size=0.2,
    random_state=42,
    stratify=states_encoded,
)

# Report the shapes of the four resulting arrays, one per line.
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape, sep="\n")

In [None]:
# Small fully connected classifier: one hidden ReLU layer and a softmax output.
# Layer widths are read from the training arrays. The output layer needs one
# unit per class, i.e. one per column of the one-hot labels; counting the
# distinct values of the one-hot matrix would always give 2, whatever the
# number of classes.
model2 = Sequential([
    Dense(4, activation='relu', input_shape=(X_train.shape[1],)),
    Dense(y_train.shape[1], activation='softmax')
])

model2.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
history = model2.fit(X_train, y_train, epochs=15, batch_size=128)

# Accuracy of the Keras model on the held-out split (one-hot -> class index).
y_test_integers = np.argmax(y_test, axis=1)
y_pred = model2.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
original_accuracy = accuracy_score(y_test_integers, y_pred_classes)

# Convert to TFLite with default optimizations and float16 as the supported type.
converter = tf.lite.TFLiteConverter.from_keras_model(model2)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
quantized_tflite_model = converter.convert()

# Evaluate TFLite model
quantized_accuracy = evaluate_tflite_model(quantized_tflite_model, X_test, y_test)

# Print accuracies
print("Original Model Accuracy: {:.2f}%".format(original_accuracy * 100))
print("Quantized Model Accuracy: {:.2f}%".format(quantized_accuracy * 100))

# Save the original model
model2.save('model_awake.h5')
original_model_size = get_gzipped_model_size('model_awake.h5')
print("Size of gzipped original Keras model: %.2f Kbytes" % (original_model_size / 1024))

# Save the TFLite model.
# NOTE: the file name says "f32", but the converter above targets float16.
tflite_model_path = 'quantized_f32_model_awake.tflite'
with open(tflite_model_path, 'wb') as f:
    f.write(quantized_tflite_model)

# Calculate and print the size of the TFLite model
# (no pruning is applied in this cell, despite the variable name).
quantized_pruned_model_size = get_gzipped_model_size(tflite_model_path)
print("Size of gzipped quantized TFLite model: %.2f Kbytes" % (quantized_pruned_model_size / 1024))

## Generate the Pretrained Model
Convert the chosen TFLite file into a C source file for TFLite Micro, which can be compiled into the firmware uploaded to your microcontroller.
More info at: https://colab.research.google.com/github/tinyMLx/colabs

In [None]:
# Path of the .tflite file to convert.
# NOTE(review): the training cells in this notebook save
# 'quantized_f32_model_sleep.tflite' and 'quantized_f32_model_awake.tflite';
# nothing visible here writes 'model.tflite', so set this to the file you want to deploy.
MODEL_TFLITE = 'model.tflite' # model of your choice
# Path of the generated C source file.
MODEL_TFLITE_MICRO = 'model.cc'
# Shell escape: dump the model bytes as a C array definition into the .cc file.
!xxd -i {MODEL_TFLITE} > {MODEL_TFLITE_MICRO}
# Text to search for in the generated file: the model path with '/' and '.' turned into '_'
# (presumably the array name that `xxd -i` derives from the path -- verify for paths
# containing other punctuation, which this replacement does not cover).
REPLACE_TEXT = MODEL_TFLITE.replace('/', '_').replace('.', '_')
# Shell escape: rename every occurrence of that text to g_model, editing the file in place.
!sed -i 's/'{REPLACE_TEXT}'/g_model/g' {MODEL_TFLITE_MICRO}