# Imports...

In [None]:
import tensorflow as tf
import sklearn as sk
from sklearn import model_selection
import numpy as np

import data_loading
import data_visualisation
from model_constructor import ModelConstructor, ModelName

# Ensure same seed on every run in order to get consistent results

In [None]:
# The below is necessary for starting Numpy generated random numbers
# in a well-defined initial state.
np.random.seed(1337)

# The below is necessary for starting core Python generated random numbers
# in a well-defined state.
# python_random.seed(123)

# The below set_seed() will make random number generation
# in the TensorFlow backend have a well-defined initial state.
# For further details, see:
# https://www.tensorflow.org/api_docs/python/tf/random/set_seed
tf.random.set_seed(5)

# We also specify a random seed for sklearn, used for KFold cross validation
sklearn_random_seed = 5354

# Some compatibility checks

In [None]:
print(f"TF version: {tf.__version__}")
print(f"Logical devices: {tf.config.list_logical_devices()}")
print(f"TF built with CUDA: {tf.test.is_built_with_cuda()}")
print(f"Visible devices: {tf.config.get_visible_devices()}")
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

tf.debugging.set_log_device_placement(True)

# Load all data that was collected

In [None]:
# This gets all data and puts them into a single dictionary with the following structure:
# {
#     "candidate_id": {
#         "gesture_name": [
#             [sample1],
#             [sample2],
#             ...
#         ],
#         ...
#     },
#     ...
# }
all_data_per_candidate = data_loading.load_gestures_grouped_per_candidate(use_left_hand=True, use_right_hand=True, split_candidate_per_hand=True)

all_candidates = list(all_data_per_candidate.keys())

# Possibly remove unwanted candidates
# all_candidates.remove("E1")
# all_candidates.remove("E6")

print("All candidates:", all_candidates)
# np.random.shuffle(all_candidates)
# print("Shuffled candidates:", all_candidates)

# training_candidates = all_candidates[:int(len(all_candidates) * training_ratio)]
# testing_candidates = all_candidates[int(len(all_candidates) * training_ratio):]

# split_per_candidate = True

# if not split_per_candidate:
#     features, labels = data_loading.get_data_and_labels_from_candidates(all_data_per_candidate)
#     training_data, testing_data, training_labels, testing_labels = sk.model_selection.train_test_split(features, labels, test_size=0.25, random_state=69)
# else:
#     training_data, training_labels = data_loading.get_data_and_labels_from_candidates(training_candidates, all_data_per_candidate)
#     testing_data, testing_labels = data_loading.get_data_and_labels_from_candidates(testing_candidates, all_data_per_candidate)

# print("Training candidates:", training_candidates)
# print("Testing candidates:", testing_candidates)

# Plot one sample to check pre-processing
# try:
#     data_visualisation.plot_data_as_image(training_data[0], label=training_labels[0])
#     data_visualisation.plot_data_as_graph(training_data[0], title=training_labels[0])
# except Exception as e:
#     print("Error plotting data as image: ", e)
#     pass

# # training_data = np.expand_dims(training_data, axis=3)
# # testing_data = np.expand_dims(testing_data, axis=3)

# # Verify that the data and labels are the same shape
# print("")
# print("Training data shape:", training_data.shape)
# print("Training data[0] shape:", training_data[0].shape)
# print("")
# print("Training labels shape:", training_labels.shape)
# print("Training labels[0]:", training_labels[0])
# # print("All training labels:", training_labels)

# if len(testing_data) > 0:
#     print("Testing data shape:", testing_data.shape)
#     # print("Testing data[0] shape:", testing_data[0].shape)
#     print("")
#     print("Testing labels shape:", testing_labels.shape)
#     # print("Testing labels[0]:", testing_labels[0])
# else:
#     print("No testing data")

# Specifying our model and training parameters

In [None]:
batch_size = 256 #128
epochs = 512
curr_model = ModelName.BEERNET_LITE#_EXPERIMENTAL
include_preprocessing_layers = True

# Trying out various input shapes
# input_shape = np.expand_dims(training_data[0], axis=2).shape
# input_shape = (100, 3, 1)
# input_shape = (50, 2, 3)
# input_shape = (25, 4, 3)
input_shape = (20, 5, 3)
# input_shape = (10, 10, 3)
print("Input shape:", input_shape)

# Number of classes is the number of gestures, which is the number of output neurons
num_classes = len(data_loading.GestureNames)

model: tf.keras.Model = ModelConstructor.get_model(model=curr_model, input_shape=input_shape, 
                                                   num_classes=num_classes, include_preprocessing=include_preprocessing_layers)

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), 
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

# Before K-fold cross validation, store the initial weights of the model, 
# so that we can train the model from scratch on the entire dataset after K-fold cross validation without recompiling the model
initial_weights = model.get_weights()

model.summary()

# Testing the model using K-Fold Cross Validation

In [None]:
# General training of the model
# model.fit(training_data, training_labels, batch_size=32, epochs=250)
# model.fit(training_data, training_labels, batch_size=64, epochs=500)

# Perform k-fold cross validation
k = min(6, len(all_candidates))

kfold = sk.model_selection.StratifiedKFold(n_splits=k, shuffle=True, random_state=sklearn_random_seed)
kfold_scores = []
confusion_matrices = []
histories = []

all_candidates = np.array(all_candidates)

# Since there is an imbalance in the dataset we make use of stratified k-fold cross validation
# Make a list of labels that correspond to the candidate's used hand. 
# If the candidate used their left hand, the label is 0, otherwise it is 1
# Currently, done by checking if the candidate's name contains "_L", probably not the best way
hand_labels = [0 if "_L" in candidate else 1 for candidate in all_candidates]

print(f"Training model with architecture: {curr_model.name} with {k}-fold cross validation")
print(f"Using {len(all_candidates)} candidates")
print("-----------------------------------")

for idx, (train, test) in enumerate(kfold.split(all_candidates, hand_labels)):
    print(f"Fold {idx + 1} of {k}. Training on: {train}, testing on {test}")
    print("Train candidates:", all_candidates[train], "Test candidates:", all_candidates[test])
    
    # Specifying the model and compiling it
    model = ModelConstructor.get_model(model=curr_model, input_shape=input_shape, 
                                       num_classes=num_classes, include_preprocessing=include_preprocessing_layers)
    
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), 
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
    
    # model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    #                 loss=tf.keras.losses.CategoricalCrossentropy(),
    #                 metrics=[tf.keras.metrics.CategoricalAccuracy()])
    
    # Getting the training and testing data
    train_features, train_labels = data_loading.get_data_and_labels_from_candidates(all_candidates[train], all_data_per_candidate, input_shape=input_shape)
    test_features, test_labels = data_loading.get_data_and_labels_from_candidates(all_candidates[test], all_data_per_candidate, input_shape=input_shape)

    # Converting the labels to integers, important for the loss function to work
    train_labels = data_loading.map_labels_to_integers(train_labels)
    test_labels = data_loading.map_labels_to_integers(test_labels)
    # train_labels = data_loading.map_labels_to_one_hot_encoding(train_labels)
    # test_labels = data_loading.map_labels_to_one_hot_encoding(test_labels)

    # Training the model
    history = model.fit(train_features, train_labels, batch_size=batch_size, epochs=epochs, verbose = 0)

    # Gathering results of this fold
    histories.append(history)
    score = model.evaluate(test_features, test_labels, verbose=0)
    print("Loss and accuracy:", score)
    kfold_scores.append(score)
    confusion_matrices.append(sk.metrics.confusion_matrix(test_labels, np.argmax(model.predict(test_features), axis=1)))

    print("-----------------------------------")


# Printing some of the validation results

In [None]:
print("K-fold scores:", kfold_scores)
print("Average k-fold score:", np.mean(kfold_scores, axis=0))
print("Std k-fold score:", np.std(kfold_scores, axis=0))

# Plot the confusion matrix
confusion_matrix = np.mean(confusion_matrices, axis=0)
data_visualisation.plot_confusion_matrix(confusion_matrix, [gesture.value for gesture in data_loading.GestureNames], normalize=True)

# Plot the training history by averaging the histories over the k-folds
loss = np.mean([history.history['loss'] for history in histories], axis=0)
accuracy = np.mean([history.history['sparse_categorical_accuracy'] for history in histories], axis=0)
# accuracy = np.mean([history.history['categorical_accuracy'] for history in histories], axis=0)
data_visualisation.plot_loss_and_accuracy(loss, accuracy)

# Training the model with the entire dataset

In [None]:
# After K-fold cross validation, reset the weights of the model to the initial weights
model.set_weights(initial_weights)

all_data, all_labels = data_loading.get_data_and_labels_from_candidates(list(all_data_per_candidate.keys()), all_data_per_candidate, input_shape=input_shape)

# Print all candidates, for checking purposes
print("All candidates:", all_candidates)

# Print length of entire dataset
print("Length of all data:", all_data.shape)

# And now we can train the model on all the data (training + testing)
# model.fit(training_data, training_labels, batch_size=batch_size, epochs=epochs, verbose = 0)
# model.fit(testing_data, testing_labels, batch_size=batch_size, epochs=epochs, verbose = 0)
all_labels = data_loading.map_labels_to_integers(all_labels)
model.fit(all_data, all_labels, batch_size=batch_size, epochs=epochs, verbose = 0)


# history = model.history.history
# print("History:", history)

# Plot the loss and accuracy
# data_visualisation.plot_loss_and_accuracy(history['loss'], history['sparse_categorical_accuracy'])

In [None]:
%%script false
# i = 5
# # Needed because the model expects a batch of data, not a single sample
# reshaped_data = np.expand_dims(testing_data[i], axis=0)

# print("Data shape: ", reshaped_data.shape)

# print("Predictions: ", model.predict(reshaped_data))
# print("Actual label: ", gesture_classes[training_labels[i]])
# print("Predicted label: ", gesture_classes[np.argmax(model.predict(reshaped_data))])


# This is not needed if we use K-fold cross validation

predictions = model.predict(testing_data)

gesture_classes = list(data_loading.GestureNames)

for i in range(len(predictions)):
    with np.printoptions(precision=6, suppress=True):
        print(f"Actual: {gesture_classes[testing_labels[i]]}, Predicted: {gesture_classes[np.argmax(predictions[i])]}")
        print(f"Confidence of prediction: {np.max(predictions[i])}, All predictions: {predictions[i]}")

    # Plot sample if it is wrong
    if testing_labels[i] != np.argmax(predictions[i]):
        try:
            data_visualisation.plot_data_as_image(testing_data[i], label=f"Predicted: {gesture_classes[np.argmax(predictions[i])]}, Actual: {gesture_classes[testing_labels[i]]}")
            data_visualisation.plot_data_as_graph(testing_data[i], title=f"Predicted: {gesture_classes[np.argmax(predictions[i])]}, Actual: {gesture_classes[testing_labels[i]]}")
        except:
            pass

    print("")


# Exporting our model for TFLite

In [None]:
# %%script skip cell --no-raise-error

converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]

def representative_data_generator():
    for i in range(min(1000, len(all_data))):
        sample_data = np.random.rand(*input_shape)
        sample_data = np.expand_dims(sample_data, axis=0)
        # sample_data = np.expand_dims(sample_data, axis=3)
        yield [sample_data.astype(np.float32)]

# for data in representative_data_generator():
#     print(data)

converter.representative_dataset = representative_data_generator
# converter.target_spec.supported_ops = []
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8, tf.lite.OpsSet.SELECT_TF_OPS]
# converter._experimental_new_quantizer = True
converter.allow_custom_ops = True
# converter.target_spec.supported_types = [tf.int8]

# converter.inference_input_type = tf.float32
# converter.inference_output_type = tf.uint8

tflite_model = converter.convert()
open("converted_model.tflite", "wb").write(tflite_model)

print("Model converted to tflite")

# interpreter = tf.lite.Interpreter(model_content=tflite_model)

# print("input details:", interpreter.get_input_details())
# print("output details:", interpreter.get_output_details())
# print("input shape:", interpreter.get_input_details()[0]['shape'])

# Validating compressed model

In [None]:
# TODO: Test the tflite model

# Load the TFLite model
interpreter = tf.lite.Interpreter(model_path="converted_model.tflite")
interpreter.allocate_tensors()
signature = interpreter.get_signature_runner()

# Get one sample of data to test the model from all the data
# Use the first sample of data
i = 123
sample_data = np.expand_dims(all_data[i], axis=0)
# sample_data = all_data[i]
print("Label: ", all_labels[i])
output = signature(sensor_image=sample_data.astype(np.float32))
print("Output:", output)

# Converting the TFLite to C code

Run this command:

```
xxd -i converted_model.tflite > model_data.cc
```