In [None]:
!pip install scikit-learn keras-tuner git+https://github.com/paulgavrikov/visualkeras.git
!python --version

In [None]:
import numpy as np
import matplotlib.pyplot as plt

import keras_tuner
import tensorflow as tf
import keras

from keras import layers
from keras.utils import timeseries_dataset_from_array, to_categorical
import visualkeras
from PIL import ImageFont
import logging
from enum import Enum
import itertools
import os
import pickle
import re
import random
import time

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
#from utils import load_gesture_samples, GestureNames, split_data_between_participants, normalize_dataset

In [None]:
print(tf.__version__)

print('1: ', tf.config.list_logical_devices())
print('2: ', tf.test.is_built_with_cuda())
print('3: ', tf.test.gpu_device_name())
print('4: ', tf.config.get_visible_devices())

print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [None]:
# Setup logger
logging.basicConfig(level=logging.DEBUG,
                    force = True)

log = logging.getLogger("CSE3000")
log.setLevel(logging.INFO)

In [None]:
# Utility functions cell
RANDOM_SEED = 1000


np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)


class Hand(Enum):
    right = "right_hand"
    left = "left_hand"


class GestureException(Exception):
    pass


class Gestures(Enum):
    SWIPE_LEFT = 0, 'swipe_left'
    SWIPE_RIGHT = 1, 'swipe_right'
    SWIPE_UP = 2, 'swipe_up'
    SWIPE_DOWN = 3, 'swipe_down'
    ROT_CW = 4, 'clockwise'
    ROT_CCW = 5, 'counter_clockwise'
    TAP = 6, 'tap'
    DOUBLE_TAP = 7, 'double_tap'
    ZOOM_IN = 8, 'zoom_in'
    ZOOM_OUT = 9, 'zoom_out'

    def __new__(cls, value, name):
        member = object.__new__(cls)
        member._value_ = value
        member.fullname = name
        return member

    def __int__(self):
        return self.value

    def __str__(self):
        return self.fullname

    @staticmethod
    def from_name(name: str):
        try:
            return next(g for g in Gestures if g.fullname == name)
        except StopIteration:
            raise GestureException("No gesture with name '%s' found..." % name)


class LoadGestureException(Exception):
    pass


def load_gesture_samples(gesture: Gestures, hand: Hand = Hand.right, skip_old_data: bool = True):
    result = []
    base_path = f"gestures_data/gestures/{gesture.fullname}/{hand.value}"
    log.debug("Loading gestures from base path: %s" % base_path)
    folder_items = os.listdir(base_path)

    # Filter on the .pickle extension
    filtered_ext = list(filter(lambda x: re.search(r'\.pickle$', x) is not None, folder_items))

    if len(filtered_ext) == 0:
        raise LoadGestureException("No gestures found in folder: %s" % base_path)

    for item in filtered_ext:
        r_match = re.match(r'candidate_(\w+).pickle$', item)
        if r_match is None:
            raise LoadGestureException("Incorrectly formatted data file name: %s" % item)

        candidate_id = r_match.group(1)
        with open(os.path.join(base_path, item), 'rb') as f:
            while True:
                try:
                    data_contents = pickle.load(f)

                    if isinstance(data_contents, dict):
                        if 'target_gesture' in data_contents:
                            # Data v3
                            # print(data_contents)
                            data_contents['gesture'] = Gestures.from_name(data_contents['target_gesture'])
                            # data_contents['all_data'] = data_contents['data']
                            # print(type(data_contents['data']))
                            # data_contents['data'] = list(map(lambda x: x['data'], data_contents['data']))
                            result.append(data_contents)
                        else:
                            # Data v2
                            data_contents['gesture'] = Gestures.from_name(data_contents['gesture'])
                            if not skip_old_data:
                                result.append(data_contents)
                    else:
                        # Data loader v1
                        data = {
                            'data': data_contents,
                            'gesture': gesture,
                            'candidate': candidate_id
                        }
                        if not skip_old_data:
                            result.append(data)
                except EOFError:
                    break

    return result


def split_data_between_participants(data, ratio = 0.7):
    lb_candidate = lambda x: x['candidate']

    # For itertools.groupby to work we need to sort the data first
    data.sort(key=lambda x: x['candidate'])

    participants = set(map(lb_candidate, data))
    amount_measurements = len(data)
    amount_participants = len(participants)

    log.debug("Participants: %s" % participants)
    log.info("Got dataset for %d participants with %d measurements total" % (amount_participants, amount_measurements))

    amount_train = int(amount_measurements * 0.7)
    amount_test = amount_measurements - amount_train
    log.info("Estimating %d measurements for training and %d measurements for test (ratio: %0.1f)" % (amount_train, amount_test, ratio))

    train_data = []
    train_data_outcomes = []

    test_data = []
    test_data_outcomes = []

    train_candidates = set()
    test_candidates = set()

    # Group the data per participant as that is the recommended method for training models
    for participant, d in itertools.groupby(data, lb_candidate):
        d_list = list(d)

        for data_point in d_list:
            if len(train_data) < amount_train:
                train_candidates.add(participant)
                train_data.append(data_point['data'])
                train_data_outcomes.append(data_point['gesture'])
            else:
                test_candidates.add(participant)
                test_data.append(data_point['data'])
                test_data_outcomes.append(data_point['gesture'])
                # test_data.extend([p['data'] for p in d_list])
                # test_data_outcomes.extend([p['gesture'].value for p in d_list])

    log.info("Train candidates: %s\tTest candidates: %s" % (train_candidates, test_candidates))

    return (np.array(train_data), np.array(train_data_outcomes)), (np.array(test_data), np.array(test_data_outcomes))

def normalize_dataset(data):
    """Watch out this function might not normalize the data as expected, further research required"""
    normalized = []
    for graph in data:
        scaler = MinMaxScaler(feature_range=(0, 1))
        reshaped = scaler.fit_transform(graph.reshape(-1, graph.shape[-1])).reshape(graph.shape)
        normalized.append(reshaped)
    return np.array(normalized)

def normalize_test(data):
    normalized = []
    for measurement in data:
        mean = measurement.mean()
        std = measurement.std()
        normalized_measurement = (measurement - mean) / std

        normalized.append(normalized_measurement)
    return normalized

def plot_history(history):
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    acc = history.history['acc']
    val_acc = history.history['val_acc']
    mae = history.history['mae']
    val_mae = history.history['val_mae']

    epochs = range(1, len(loss) + 1)

    fig, axs = plt.subplots(1, 3, figsize=(25, 5))

    axs[0].plot(epochs, loss, 'g.', label='Training Loss')
    axs[0].plot(epochs, val_loss, 'c.', label='Validation Loss')
    axs[0].set_xlabel('Epochs')
    axs[0].set_ylabel('Loss')
    axs[0].legend()

    axs[1].plot(epochs, mae, 'g.', label='Training MAE')
    axs[1].plot(epochs, val_mae, 'c.', label='Validation MAE')
    axs[1].set_xlabel('Epochs')
    axs[1].set_ylabel('MAE')
    axs[1].legend()

    axs[2].plot(epochs, acc, 'g.', label='Training Accuracy')
    axs[2].plot(epochs, val_acc, 'c.', label='Validation Accuracy')
    axs[2].set_xlabel('Epochs')
    axs[2].set_ylabel('Accuracy')
    axs[2].legend()

    fig.savefig('output_figures/history_plot.svg')
    fig.show()

def compile_model(model: keras.Model):
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc', 'mae'])

def kfold_cross_validation(model: keras.Model, features: np.ndarray, labels: np.ndarray, num_folds: int = 5):
    kfold = KFold(num_folds, shuffle=True, random_state=RANDOM_SEED)

    fold_num = 1
    acc_per_fold = []
    loss_per_fold = []
    confusion_per_fold = []

    for train, test in kfold.split(features, labels):
        fold_model = keras.models.clone_model(model)
        log.info("Fold No. %d" % fold_num)
        compile_model(fold_model)
        history = fold_model.fit(features[train], labels[train], batch_size=32, epochs=200, verbose=2)
        scores = fold_model.evaluate(features[test], labels[test], verbose=2)
        predictions = np.argmax(fold_model.predict(features[test]), axis=1)
        acc_per_fold.append(scores[1] * 100)
        loss_per_fold.append(scores[0])
        # confusion_per_fold.append(confusion_matrix(labels[test], predictions))

        fold_num += 1



    return acc_per_fold, loss_per_fold, confusion_per_fold


In [None]:
combined = []

# skip_gestures = [Gestures.ZOOM_IN, Gestures.ZOOM_OUT, Gestures.DOUBLE_TAP, Gestures.ROT_CW, Gestures.ROT_CCW]
skip_gestures = []
# only_gestures = [Gestures.SWIPE_UP, Gestures.SWIPE_DOWN, Gestures.SWIPE_RIGHT, Gestures.SWIPE_LEFT, Gestures.TAP]
filtered_gestures = filter(lambda x: x not in skip_gestures, Gestures)
filtered_gestures_list = list(filtered_gestures)

right_samples_count = 0
left_samples_count = 0
for g in filtered_gestures_list:
    right_samples = load_gesture_samples(g, hand=Hand.right)
    left_samples = load_gesture_samples(g, hand=Hand.left)
    right_samples_count += len(right_samples)
    left_samples_count += len(left_samples)
    combined.extend(right_samples)
    combined.extend(left_samples)

log.info("Got %d Right hand measurements and %d Left hand measurements" % (right_samples_count, left_samples_count))

# Deterministic shuffle
random.Random(4).shuffle(combined)
(x_train, y_train), (x_test, y_test) = split_data_between_participants(combined)

print(x_train.shape)
print(y_train.shape)

print(x_test.shape)
print(y_test.shape)

x_train_normalized = normalize_dataset(x_train)
x_test_normalized = normalize_dataset(x_test)

y_train = to_categorical(y_train, len(Gestures))
y_test = to_categorical(y_test, len(Gestures))

# Hardcoded for now, so fix
# x_train_normalized = x_train_normalized.reshape((-1, 100, 3, 1))
# x_test_normalized = x_test_normalized.reshape((-1, 100, 3, 1))

# For CNN
input_shape = (25, 4, 3)
x_train_reshaped = x_train_normalized.reshape((-1, 25, 4, 3))
x_test_reshaped = x_test_normalized.reshape((-1, 25, 4, 3))

print(x_train_normalized.shape)
print(x_test_normalized.shape)

# Combine for K-Fold
combined_x = np.concatenate([x_train_normalized, x_test_normalized])
combined_y = np.concatenate([y_train, y_test])

print(combined_x.shape)
print(combined_y.shape)


In [None]:
# Visualisation for CNN
one_sample = x_train_normalized[200]
plt.plot(one_sample)
plt.savefig('output_figures/sample_graph.svg')
plt.show()

In [None]:
# reshaped_sample = one_sample.reshape((100, 3))
reshaped_sample = np.rot90(one_sample, k=1)
print(one_sample.shape)
print(reshaped_sample.shape)
# plt.plot(one_sample[:,0])
plt.imshow(1 - reshaped_sample, cmap='Greys', interpolation='nearest', aspect='auto')
plt.yticks([])
plt.savefig('output_figures/sample_heatmap.svg')
plt.show()

In [None]:
print(x_train_normalized[0][:,:,:1].shape)
one_sample = x_train_normalized[200]
image_data = np.concatenate([one_sample[:,:,0], one_sample[:,:,1], one_sample[:,:,2]], axis=1)
plt.imshow(1-image_data, cmap='Greys', aspect=1)
plt.savefig('output_figures/convolutional_sample.svg')
plt.show()

In [None]:
# Show all training data
train_amount = len(x_train)
fig, axs = plt.subplots(1, train_amount, figsize=(40, 4))
for i in range(train_amount):
    axs[i].plot(x_train[i])
    axs[i].annotate(Gestures(y_train[i]).fullname, (0,0), (0, -40), xycoords='axes fraction', textcoords='offset points', va='top')

In [None]:
# Show all test data
test_amount = len(x_test)
fig, axs = plt.subplots(1, test_amount, figsize=(20, 4))
for i in range(test_amount):
    axs[i].plot(x_test[i])
    axs[i].annotate(f'Gesture: {Gestures(y_test[i]).fullname}', (0,0), (0, -40), xycoords='axes fraction', textcoords='offset points', va='top')

In [None]:
# Show normalized test data
test_amount = len(x_test_normalized)
fig, axs = plt.subplots(1, test_amount, figsize=(20, 4))
for i in range(test_amount):
    axs[i].plot(x_test_normalized[i])

In [None]:
# Implement data windowing
# SLICES = 5
#
# single_source = np.array(train[0], dtype=np.uint16)
# single_targets = np.full((single_source.shape[0], 1, 1), train_outcome[0])
# print(single_source.shape)
#
# train_ds = timeseries_dataset_from_array(data=single_source, targets=single_targets, sequence_length=SLICES, sequence_stride=SLICES)
#
# for example_inputs, example_labels in train_ds.take(1):
#     print(f'Inputs shape (batch, time, features): {example_inputs.shape}')
#     print(f'Targets shape (batch, time, features): {example_labels.shape}')
#
#
# single_validation_source = np.array(test[0], dtype=np.uint16)
# single_validation_targets = np.full((single_validation_source.shape[0], 1, 1), test_outcome[0])
#
# validation_ds = keras.utils.timeseries_dataset_from_array(data=single_validation_source, targets=single_validation_targets, sequence_length=SLICES, sequence_stride=SLICES)


In [None]:
# train_ds = timeseries_dataset_from_array(data=x_train, targets=y_train, sequence_length=SLICES, sequence_stride=SLICES)
# validation_ds = timeseries_dataset_from_array(data=x_test, targets=y_test, sequence_length=SLICES, sequence_stride=SLICES)

In [None]:
# print(dir(train_ds))
# print(train_ds.element_spec)

In [None]:
USE_GPU = False
device = "/device:GPU:0" if USE_GPU else "/device:CPU:0"

def build_cnn_model(shape):
    model = keras.Sequential()

    # model_lstm.add(layers.LSTM(units=128, input_shape=[1, 3]))
    model.add(layers.Input(shape=shape, name="sensor_image"))
    model.add(layers.ZeroPadding2D(padding=(0, 2)))
    model.add(layers.Conv2D(8, kernel_size=(3, 3), strides=(1, 1), activation="relu"))
    model.add(layers.Conv2D(16, kernel_size=(2, 2), strides=(1, 1), activation="relu"))
    model.add(layers.Conv2D(16, kernel_size=(2, 2), activation="relu"))
    model.add(layers.MaxPooling2D(pool_size=(3, 1)))
    model.add(layers.Conv2D(32, kernel_size=(5, 1), activation="relu"))
    model.add(layers.Flatten())
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(len(Gestures), activation="softmax", name="predictions"))

    return model


In [None]:
model_cnn = build_cnn_model(input_shape)
model_cnn.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc', 'mae'])
model_cnn.summary()
start = time.perf_counter()
simple_history = model_cnn.fit(x_train_reshaped, y_train, epochs=200, batch_size=2, validation_data=(x_test_reshaped, y_test))

end = time.perf_counter()
print("Training took: %0.1f seconds" % (end - start))

# Training times:
| GPU   | CPU   |
|-------|-------|
| 18.5s | 12.3s |

In [None]:
plot_history(simple_history)

In [None]:
score = model_cnn.evaluate(x_test_normalized, y_test, verbose=0)
print("Test loss:", score[0])
print("Test accuracy:", score[1])

In [None]:
y_pred = model_cnn.predict(x_test_reshaped)
ConfusionMatrixDisplay.from_predictions(np.argmax(y_test, axis=1), np.argmax(y_pred, axis=1), normalize='true', cmap='Blues', display_labels=Gestures, xticks_rotation='vertical')

plt.title("Confusion Matrix CNN")
plt.savefig('output_figures/confusion_matrix_cnn.svg', bbox_inches="tight")

plt.show()

In [None]:
# K-Fold validation
model_cnn = build_cnn_model(input_shape)
(acc_per_fold, loss_per_fold, confusion_per_fold) = kfold_cross_validation(model_cnn, combined_x, combined_y, num_folds=5)

In [None]:
print(acc_per_fold)
print(loss_per_fold)
print("Acc: %.3f std: %.3f" % (np.average(acc_per_fold), np.std(acc_per_fold)))
print(np.average(loss_per_fold))

In [None]:
print(x_test_normalized.shape)

for i in range(x_test_normalized.shape[0]):
    test_sample = np.expand_dims(x_test_normalized[i], -4)
    test_prediction = model_cnn.predict(test_sample)
    print("Prediction: %s, actual: %s" % (Gestures(np.argmax(test_prediction)), Gestures(np.argmax(y_test[i]))))

In [None]:
# Split data visualisation
print(x_train_normalized.shape)
SEQUENCE_LENGTH = 4

for measurement_x, measurement_y in zip(x_train_normalized, y_train):

    # Lock the measurement for now:
    measurement_x = x_train_normalized[200]
    measurement_y = y_train[200]
    print(measurement_x.shape)
    print(measurement_y.shape)
    fig, axs = plt.subplots(1, 2, figsize=(20, 5))
    axs[0].plot(measurement_x)
    timeseries_train_ds = timeseries_dataset_from_array(measurement_x, measurement_y, sequence_length=SEQUENCE_LENGTH, shuffle=True)

    for inputs, targets in timeseries_train_ds:
        print(inputs.shape)
        print(targets.shape)

        print("Input length is: %d" % len(inputs))
        for i in range(len(inputs)):
            axs[1].plot(np.arange(SEQUENCE_LENGTH * i, SEQUENCE_LENGTH * (i + 1)), inputs[i])
            # axs[1].plot(inputs[i])
            # plt.plot(np.arange(10 * i, 10 * (i + 1)), np.arange(1, step=0.1))


    fig.show()
    break

In [None]:
BATCH_SIZE = 10
SEQUENCE_LENGTH = 100

# Split video input visualisation
measurement_x = x_train_normalized[200]
measurement_y = y_train[200]

fig, axs = plt.subplots(1, 2, figsize=(20, 5))
axs[0].plot(measurement_x)

split_x = np.array(np.array_split(measurement_x, BATCH_SIZE))
for i in range(len(split_x)):
    axs[1].plot(np.arange((SEQUENCE_LENGTH/BATCH_SIZE) * i, (SEQUENCE_LENGTH/BATCH_SIZE) * (i + 1)), split_x[i])

print(split_x.shape)
fig.show()

In [None]:
USE_GPU = False
device = "/device:GPU:0" if USE_GPU else "/device:CPU:0"

def build_lstm_stateless_model(lstm_units, dense_1_units, dropout):
    # https://towardsdatascience.com/time-series-classification-for-human-activity-recognition-with-lstms-using-tensorflow-2-and-keras-b816431afdff
    model = keras.Sequential(name="LSTM_Stateless")

    # Subject to change as we split up the data in fragments:
    # Stateless?
    model.add(layers.Input(shape=(100, 3), name="sensor_image"))
    model.add(layers.LSTM(units=lstm_units, name="lstm"))
    # Stateful?
    # model_lstm_stateless.add(layers.Input(batch_shape=(100, 3, 1), name="sensor_image"))
    # model_lstm_stateless.add(layers.LSTM(units=128, stateful=True, name="lstm"))

    # Extra
    if dropout:
        model.add(layers.Dropout(rate=0.25, name="dropout"))

    model.add(layers.Dense(units=dense_1_units, activation='relu', name="dense_1"))

    # Output stage
    model.add(layers.Dense(len(Gestures), activation="softmax", name="predictions"))
    # model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc', 'mae'])
    model.summary()
    return model

def build_tuning_lstm_stateless_model(hp):
    lstm_units = hp.Choice('units', [32, 64, 128, 256, 512, 1024])
    dense_1_units = hp.Choice('units', [16, 32, 64, 128, 256, 512, 1024])
    dropout = hp.Boolean("dropout")
    model = build_lstm_stateless_model(lstm_units, dense_1_units, dropout)
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc', 'mae'])
    return model


In [None]:
with tf.device(device):
    start = time.perf_counter()
    tuner = keras_tuner.RandomSearch(build_tuning_lstm_stateless_model, objective='val_loss', max_trials=20)
    tuner.search(x_train_normalized, y_train, epochs=300, batch_size=16, shuffle=False, validation_data=(x_test_normalized, y_test))

    end = time.perf_counter()
    print("Tuning took %0.1f seconds" % (end - start))


In [None]:
tuner.get_best_models()[0]

In [None]:
USE_GPU = True
device = "/device:GPU:0" if USE_GPU else "/device:CPU:0"

with tf.device(device):
    start = time.perf_counter()
    # Do we need to reset the states inbetween?

    model_lstm_stateless = build_lstm_stateless_model(128, 256, True)
    model_lstm_stateless.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc', 'mae'])

    lstm_stateless_history = model_lstm_stateless.fit(x_train_normalized, y_train, epochs=300, batch_size=16, shuffle=False, validation_data=(x_test_normalized, y_test))

    end = time.perf_counter()
    print("Training took %0.1f seconds" % (end - start))


In [None]:
plot_history(lstm_stateless_history)

In [None]:
score = model_lstm_stateless.evaluate(x_test_normalized, y_test, verbose=0)
print("Test loss:", score[0])
print("Test accuracy:", score[1])

In [None]:
font = ImageFont.truetype("RobotoSlab-VariableFont_wght.ttf", 32)
visualkeras.layered_view(model_lstm_stateless, font=font, min_xy=160, scale_z=1, spacing=50, legend=True, to_file='output_figures/visual_lstm_stateless.png').show()

In [None]:
y_pred = model_lstm_stateless.predict(x_test_normalized)
ConfusionMatrixDisplay.from_predictions(np.argmax(y_test, axis=1), np.argmax(y_pred, axis=1), normalize='pred', cmap='Blues', display_labels=Gestures, xticks_rotation='vertical')

plt.title("Confusion Matrix LSTM")
plt.savefig('output_figures/confusion_matrix_lstm.svg', bbox_inches="tight")

plt.show()

In [None]:
print("================= Running on training data (Toy Example): =================")
for i in range(x_train_normalized.shape[0]):
    test_sample = np.expand_dims(x_train_normalized[i], -4)
    test_prediction = model_lstm_stateless.predict(test_sample)
    print("Prediction: %s, actual: %s" % (Gestures(np.argmax(test_prediction)), Gestures(np.argmax(y_train[i]))))

print("\n\n================= Running on testing data: =================")
for i in range(x_test_normalized.shape[0]):
    test_sample = np.expand_dims(x_test_normalized[i], -4)
    test_prediction = model_lstm_stateless.predict(test_sample)
    print("Prediction: %s, actual: %s" % (Gestures(np.argmax(test_prediction)), Gestures(np.argmax(y_test[i]))))

In [None]:
# K-Fold validation
model = build_lstm_stateless_model(128, 256, True)
(acc_per_fold, loss_per_fold, confusion_per_fold) = kfold_cross_validation(model, combined_x, combined_y, num_folds=10)

In [None]:
print(acc_per_fold)
print("Acc: %.3f std: %.3f" % (np.average(acc_per_fold), np.std(acc_per_fold)))
print(np.average(loss_per_fold))

In [None]:
def build_lstm_stateful_model(lstm_units, dense_1_units, dropout):
    # https://towardsdatascience.com/time-series-classification-for-human-activity-recognition-with-lstms-using-tensorflow-2-and-keras-b816431afdff
    model = keras.Sequential(name="LSTM_Stateful")

    # Subject to change as we split up the data in fragments:
    model.add(layers.Input(shape=(100, 3), name="sensor_image"))
    model.add(
        layers.Bidirectional(
            layers.LSTM(units=lstm_units, input_shape=(100, 3), name="lstm")
        )
    )

    # Extra
    if dropout:
        model.add(layers.Dropout(rate=0.25, name="dropout"))

    model.add(layers.Dense(units=dense_1_units, activation='relu', name="dense_1"))

    # Output stage
    model.add(layers.Dense(len(Gestures), activation="softmax", name="predictions"))
    model.summary()
    return model

In [None]:
USE_GPU = True
device = "/device:GPU:0" if USE_GPU else "/device:CPU:0"
print(x_train_normalized.shape)

with tf.device(device):
    model_lstm_stateful = build_lstm_stateful_model(128, 128, True)
    model_lstm_stateful.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc', 'mae'])

    start = time.perf_counter()
    # Do we need to reset the states inbetween?
    lstm_stateful_history = model_lstm_stateful.fit(x_train_normalized, y_train, epochs=200, batch_size=8, shuffle=False, validation_data=(x_test_normalized, y_test))

    end = time.perf_counter()
    print("Training took: %0.1f seconds" % (end - start))

In [None]:
plot_history(lstm_stateful_history)

In [None]:
score = model_lstm_stateful.evaluate(x_test_normalized, y_test, verbose=0)
print("Test loss:", score[0])
print("Test accuracy:", score[1])

In [None]:
# KFold
# K-Fold validation
model = build_lstm_stateful_model(128, 128, False)

(acc_per_fold, loss_per_fold, confusion_per_fold) = kfold_cross_validation(model, combined_x, combined_y, num_folds=5)

In [None]:
print(acc_per_fold)
print("Acc: %.3f std: %.3f" % (np.average(acc_per_fold), np.std(acc_per_fold)))
print(np.average(loss_per_fold))

In [None]:
y_pred = model_lstm_stateful.predict(x_test_normalized)
ConfusionMatrixDisplay.from_predictions(np.argmax(y_test, axis=1), np.argmax(y_pred, axis=1), normalize='pred', cmap='Blues', display_labels=Gestures, xticks_rotation='vertical')

plt.title("Confusion Matrix LSTM")
plt.savefig('output_figures/confusion_matrix_lstm.svg', bbox_inches="tight")

plt.show()

In [None]:
visualkeras.layered_view(model_lstm_stateful, legend=True).show()

In [None]:
# taps = list(filter(lambda x: Gestures(x[1]) == Gestures.TAP, zip(x_train_normalized, y_train)))
# TODO: Remove the tap gesture that is noise
taps = load_gesture_samples(Gestures.TAP, Hand.right)
zooms = load_gesture_samples(Gestures.ZOOM_OUT, Hand.right)
print(len(taps))
random_tap = random.choice(taps)
random_zoom = random.choice(zooms)
# print(random_plot['candidate'])
fig, axs = plt.subplots(1, 2, figsize=(20, 5))
axs[0].plot(random_tap['data'])
axs[1].plot(random_zoom['data'])
plt.show()

In [None]:
swipe_ups = load_gesture_samples(Gestures.SWIPE_UP, Hand.right)
random_select = random.randint(0, len(swipe_ups))
print("Randomly selected gesture: %d" % random_select)
selected_swipe_up = swipe_ups[14]['data']
scaler = MinMaxScaler((0, 1))
normalized_selected = scaler.fit_transform(selected_swipe_up)

plt.plot(normalized_selected)
plt.xlabel("Time")
plt.ylabel("Normalized photodiode reading")
plt.legend(['Top', 'BottomRight', 'BottomLeft'])
plt.savefig('output_figures/sample_graph.svg')
plt.show()

In [None]:
print("\n\n================= Running on testing data: =================")
y_pred = model_lstm_stateful.predict(x_test_normalized)
for actual, pred in zip(y_test, y_pred):
    pred = Gestures(np.argmax(pred))
    actual = Gestures(np.argmax(actual))
    if pred == Gestures.SWIPE_RIGHT:
        print("Prediction: %s, actual: %s" % (pred, actual))