<a href="https://colab.research.google.com/github/getodo/Thesis_Project/blob/master/ENGG4812_Neural_Network_Design.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals

import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import os

import tensorflow as tf
from tensorflow.keras import layers

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import scipy
from scipy import stats
import seaborn as sns
#import sklearn.metrics as metrics

from torchvision import datasets, models, transforms
from google.colab import files

In [None]:
# Open the Colab upload dialog so the training CSV can be placed in the runtime;
# `uploaded` keeps whatever files.upload() returns for the selected file(s).
uploaded = files.upload()

Saving hip_training_data.csv to hip_training_data.csv


In [None]:
# String names of the nine activity classes; later cells slice this list and use
# it as column headers for prediction frames.
class_names = ['0','1','2','3','4','5','6','7','8']
# NOTE(review): the upload cell's saved output shows hip_training_data.csv, but this
# line reads arm_training_data.csv from /content — confirm the intended file is there.
df_train = pd.read_csv('/content/arm_training_data.csv')
# Names of the three input columns and of the target column.
# NOTE(review): these two module-level lists appear unused in the visible cells
# (the setup_* functions define their own locals with the same names).
features = ['X', 'Y', 'Z']
label = ['Activity']

In [None]:
def setup_lstm_data(df_train, count, test_size=0.2, random_state=None):
  """Cut the recording into overlapping windows and split them for LSTM training.

  Rows whose ``Activity`` lies in ``count..8`` are dropped, so only the first
  ``count`` activity classes remain (the same rule ``setup_dense_data`` uses).
  The remaining rows are sliced into windows of 64 consecutive samples with a
  hop of 32 samples; each window is labelled with its most frequent activity.

  Args:
    df_train: DataFrame with columns ``X``, ``Y``, ``Z`` and ``Activity``.
    count: number of activity classes to keep (classes ``0..count-1``).
    test_size: fraction of windows held out for testing (default 0.2).
    random_state: optional seed forwarded to ``train_test_split`` so the split
      can be reproduced; ``None`` keeps the original unseeded behaviour.

  Returns:
    ``(X_train, X_test, y_train, y_test)`` where the X arrays are float32 with
    shape ``(n_windows, 64, 3)`` and the y arrays are float32 one-hot rows.
  """
  time_step = 64   # samples per window
  step = 32        # hop between consecutive window starts
  axes = ['X', 'Y', 'Z']
  n_activities = 9

  # Drop classes count..8. The previous loop removed `9 - x`, i.e. classes
  # 9..count+1, which left one class too many compared with the dense pipeline.
  df_train = df_train[~df_train['Activity'].isin(list(range(count, n_activities)))]

  # Rows are time steps and columns are axes, so every window slice is already
  # (time_step, 3). The previous code stacked [xs, ys, zs] as (3, time_step) and
  # then *reshaped* to (time_step, 3), which interleaves the axes instead of
  # transposing them.
  # NOTE(review): anything that feeds the trained model (e.g. firmware) must
  # supply windows in this (time, axis) layout.
  signal = df_train[axes].to_numpy(dtype=np.float32)
  activity = df_train['Activity'].to_numpy()

  window = []
  labels = []
  # `+ 1` keeps the final full window (the old bound skipped it).
  for i in range(0, len(signal) - time_step + 1, step):
    window.append(signal[i: i + time_step])
    # stats.mode returns an array on older SciPy and a scalar on newer SciPy;
    # atleast_1d makes the indexing valid for both.
    majority = stats.mode(activity[i: i + time_step]).mode
    labels.append(np.atleast_1d(majority)[0])

  # The reshape is a no-op for non-empty input and gives an empty result the
  # expected rank.
  windows = np.asarray(window, dtype=np.float32).reshape(-1, time_step, len(axes))
  labels = np.asarray(pd.get_dummies(labels), dtype=np.float32)
  X_train, X_test, y_train, y_test = train_test_split(
      windows, labels, test_size=test_size, random_state=random_state)
  return X_train, X_test, y_train, y_test

In [None]:
def setup_dense_data(df_train, count, test_size=0.2, random_state=None):
  """Build per-sample features/labels for the dense model and split them.

  Rows whose ``Activity`` lies in ``count..8`` are dropped, so only the first
  ``count`` activity classes remain. Each remaining row becomes one sample.

  Args:
    df_train: DataFrame with columns ``X``, ``Y``, ``Z`` and ``Activity``.
    count: number of activity classes to keep (classes ``0..count-1``).
    test_size: fraction of rows held out for testing (default 0.2, the value
      that used to be hard-coded).
    random_state: optional seed forwarded to ``train_test_split`` so the split
      can be reproduced; ``None`` keeps the original unseeded behaviour.

  Returns:
    ``(X_train, X_test, y_train, y_test)``; the X arrays have three columns
    (X, Y, Z) and the y arrays are single-column arrays of activity ids.
  """
  n_activities = 9
  dropped = list(range(count, n_activities))
  kept = df_train[~df_train['Activity'].isin(dropped)]

  # Double brackets keep the label 2-D (n, 1), as the original code produced.
  features = kept[['X', 'Y', 'Z']].to_numpy()
  label = kept[['Activity']].to_numpy()

  X_train, X_test, y_train, y_test = train_test_split(
      features, label, test_size=test_size, random_state=random_state)
  return X_train, X_test, y_train, y_test

In [None]:
def setup_dense_model(neu, out):
  """Build and compile the fully connected per-sample classifier.

  The network takes 3 input features, runs them through six ReLU layers of
  ``neu`` units each and ends in a softmax layer with ``out`` units. It is
  compiled with sparse categorical cross-entropy (integer class labels), the
  Adam optimizer and an accuracy metric.
  """
  hidden_layers = 6
  model = tf.keras.Sequential()
  # Only the first layer declares the input shape.
  model.add(tf.keras.layers.Dense(neu, activation=tf.nn.relu, input_shape=(3,)))
  for _ in range(hidden_layers - 1):
    model.add(tf.keras.layers.Dense(neu, activation=tf.nn.relu))
  model.add(tf.keras.layers.Dense(out, activation=tf.nn.softmax))

  model.compile(
      loss='sparse_categorical_crossentropy',
      optimizer='adam',
      metrics=['accuracy'],
  )
  return model

In [None]:
def setup_lstm_model(out):
  """Build and compile the windowed bidirectional-LSTM classifier.

  Input windows have shape (64, 3). A bidirectional LSTM with 40 units per
  direction is followed by a Flatten layer and a softmax layer with ``out``
  units. The model is compiled with categorical cross-entropy (one-hot
  labels), the Adam optimizer and an accuracy metric.
  """
  model = tf.keras.Sequential()
  recurrent = tf.keras.layers.Bidirectional(
      tf.keras.layers.LSTM(40),
      input_shape=(64, 3),
  )
  model.add(recurrent)
  model.add(tf.keras.layers.Flatten())
  model.add(tf.keras.layers.Dense(out, activation = "softmax"))

  model.compile(
      loss='categorical_crossentropy',
      optimizer='adam',
      metrics=['accuracy'],
  )
  return model

In [None]:
def train_dense_model(neu, out, epochs=10, data=None):
  """Create the dense model, prepare its data and fit it.

  Args:
    neu: number of units in each hidden layer.
    out: number of activity classes (output units and classes kept in the data).
    epochs: training epochs (default 10, the value that used to be hard-coded).
    data: training DataFrame; when omitted, the notebook-level ``df_train`` is
      used, as before.

  Returns:
    ``(model, X_train, X_test, y_train, y_test)``.
  """
  if data is None:
    data = df_train
  model = setup_dense_model(neu, out)
  X_train, X_test, y_train, y_test = setup_dense_data(data, out)
  model.fit(X_train, y_train, epochs=epochs)
  return model, X_train, X_test, y_train, y_test

In [None]:
def train_lstm_model(out, epochs=25, data=None):
  """Create the LSTM model, prepare its windowed data and fit it.

  Args:
    out: number of activity classes (output units and classes kept in the data).
    epochs: training epochs (default 25, the value that used to be hard-coded).
    data: training DataFrame; when omitted, the notebook-level ``df_train`` is
      used, as before.

  Returns:
    ``(model, X_train, X_test, y_train, y_test)``.
  """
  if data is None:
    data = df_train
  model = setup_lstm_model(out)
  X_train, X_test, y_train, y_test = setup_lstm_data(data, out)
  model.fit(X_train, y_train, epochs=epochs)
  return model, X_train, X_test, y_train, y_test

In [None]:
# Train the dense model (40 units per hidden layer) on the first
# `activity_count` activities (at most 9).
activity_count = 9
model, X_train, X_test, y_train, y_test = train_dense_model(40, activity_count)
# Class-name strings matching the trained classes; later used as column headers.
temp_class = class_names[0:activity_count]

# Export the trained model in SavedModel format, then evaluate on the held-out
# split (loss plus the accuracy metric the model was compiled with).
# NOTE(review): the LSTM training cell below exports to the same './saved_model' path.
export_dir = './saved_model'
tf.saved_model.save(model, export_dir=export_dir)
results = model.evaluate(X_test, y_test)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
INFO:tensorflow:Assets written to: ./saved_model/assets


INFO:tensorflow:Assets written to: ./saved_model/assets




In [None]:
# Train the LSTM model on all 9 activities.
# NOTE(review): this rebinds `model`, `X_train`, `X_test`, `y_train`, `y_test` and
# `temp_class` from the dense training cell, so later cells act on whichever
# training cell ran last.
model, X_train, X_test, y_train, y_test = train_lstm_model(9)
temp_class = class_names[0:9]

# Export in SavedModel format (same directory the dense training cell writes to).
export_dir = './saved_model'
tf.saved_model.save(model, export_dir=export_dir)

In [None]:
# Evaluate the current `model` on the current held-out split; both names are
# rebound by each training cell, so this reflects the most recently run one.
results = model.evaluate(X_test, y_test)



In [None]:
# Per-class scores for every test sample, wrapped in a DataFrame with default
# integer column labels (one column per output unit).
y_pred = model.predict(X_test)
df_y_pred = pd.DataFrame(y_pred)

In [None]:
# For each row take the column label with the highest score; with the default
# integer columns this is the predicted class index.
max_index = df_y_pred.idxmax(axis=1)
list_max_index = max_index.values.tolist()
# NOTE(review): the bare expression below displays the entire prediction list.
list_max_index

In [None]:
# Confusion matrix of true labels vs. predicted class indices.
# NOTE(review): presumably only valid after the dense training cell — the LSTM
# pipeline returns one-hot y_test, which would need an argmax first; confirm.
cf_matrix = confusion_matrix(y_test, list_max_index)

In [None]:
# Confusion matrix with an explicit class order so rows/columns always cover
# every class, even one missing from the test split. The label list previously
# skipped class 5 and contained a non-existent class 9; it is now derived from
# `class_names` (classes 0..8).
matrix = confusion_matrix(y_test, list_max_index, labels=list(range(len(class_names))))
print('Confusion matrix : \n',matrix)

In [None]:
# Human-readable tick labels for the nine classes.
# NOTE(review): presumably listed in class-index order 0..8 — verify against the dataset.
labels = ['Sit', 'Sit Cross-Legged', 'Stand', 'Prone', 'Walking', 'Running', 'Star jump', 'Push up', 'Plank']
plt.figure(figsize = (15,8))
sns.set(font_scale=1.2)
# Raw counts are annotated in each cell (annot=True with an empty format string).
ax = sns.heatmap(cf_matrix, xticklabels=labels, yticklabels=labels, cmap='Blues', fmt='', annot=True)
# NOTE(review): the title says "Hip Model" while the load cell reads the arm CSV — confirm.
ax.set_title('Activity Tracker Confusion Matrix Hip Model\n', fontsize=18);
ax.set_xlabel('\nPredicted Values', fontsize=18)
ax.set_ylabel('Actual Values', fontsize=18);

In [None]:
# Predicted class for each test sample: the index of the highest score in the
# corresponding row of the model output.
temp_pred = model.predict(X_test)
y_pred = [np.argmax(temp_pred[x]) for x in range(len(y_test))]

In [None]:
# Dump every predicted class index.
# NOTE(review): this prints the whole list; consider showing only a slice.
print(y_pred)

[0, 0, 5, 0, 4, 3, 3, 0, 3, 1, 4, 2, 0, 4, 1, 7, 8, 1, 8, 8, 4, 1, 5, 4, 5, 4, 2, 2, 3, 4, 1, 3, 8, 4, 3, 0, 4, 3, 7, 8, 8, 5, 6, 2, 4, 5, 3, 1, 1, 3, 6, 2, 0, 0, 5, 4, 0, 3, 1, 0, 3, 7, 8, 8, 3, 3, 1, 8, 8, 5, 0, 5, 4, 3, 0, 2, 3, 0, 1, 5, 1, 2, 5, 0, 8, 6, 0, 1, 5, 4, 1, 1, 1, 3, 2, 5, 1, 0, 4, 8, 6, 6, 4, 5, 3, 3, 8, 0, 0, 5, 1, 5, 0, 2, 3, 2, 0, 6, 8, 8, 3, 0, 4, 4, 4, 2, 0, 3, 0, 5, 8, 8, 8, 3, 4, 8, 8, 2, 4, 4, 3, 4, 5, 0, 5, 4, 1, 3, 3, 0, 4, 8, 4, 8, 1, 5, 1, 3, 2, 3, 5, 1, 8, 2, 1, 6, 5, 0, 3, 4, 8, 8, 2, 2, 0, 5, 8, 0, 4, 0, 1, 6, 2, 6, 0, 5, 1, 4, 4, 2, 4, 5, 8, 1, 0, 5, 2, 2, 1, 4, 0, 3, 2, 8, 4, 5, 4, 5, 6, 2, 1, 4, 3, 4, 0, 1, 2, 2, 8, 5, 3, 6, 4, 1, 4, 5, 4, 3, 6, 7, 0, 4, 3, 8, 4, 5, 4, 4, 4, 0, 7, 1, 1, 3, 0, 7, 1, 3, 0, 2, 2, 6, 0, 7, 4, 3, 6, 8, 0, 1, 1, 1, 5, 2, 2, 2, 4, 8, 8, 8, 4, 0, 1, 3, 0, 5, 0, 8, 2, 8, 0, 6, 2, 5, 1, 4, 4, 3, 0, 5, 8, 2, 1, 4, 2, 1, 5, 2, 1, 2, 2, 5, 8, 4, 0, 5, 8, 1, 0, 4, 5, 1, 0, 8, 2, 4, 3, 1, 8, 8, 1, 4, 3, 8, 6, 6, 0, 0, 7, 0, 4, 5, 5, 

In [None]:
# Confusion matrix for the arg-max predictions, plotted as a share of all test samples.
cf_matrix = confusion_matrix(y_test, y_pred)
# Tick labels for the nine classes (fixed a stray comma in the 'Stand' label).
# NOTE(review): presumably listed in class-index order 0..8 — verify against the dataset.
labels = ['Sit', 'SitCross', 'Stand', 'Prone', 'Walk', 'Run', 'Starjump', 'Pushup', 'Plank']
plt.figure(figsize = (15,8))
# Dividing by the grand total makes every cell a fraction of the whole test set
# (not a per-class rate); fmt='.2%' renders those fractions as percentages.
ax = sns.heatmap(cf_matrix/np.sum(cf_matrix), xticklabels=labels, yticklabels=labels, cmap='Blues', fmt='.2%', annot=True)
ax.set_title('Activity Tracker Confusion Matrix Arm Model\n');
ax.set_xlabel('\nPredicted Values')
ax.set_ylabel('Actual Values ');


In [None]:
# Convert the current Keras `model` to a TensorFlow Lite flatbuffer.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.target_spec.supported_ops = [
  tf.lite.OpsSet.TFLITE_BUILTINS, # enable TensorFlow Lite ops.
  tf.lite.OpsSet.SELECT_TF_OPS # enable TensorFlow ops.
]
# Uncomment to quantise the model (doesn't impact accuracy at the moment)
#converter.optimizations = [tf.lite.Optimize.DEFAULT]

tflite_model = converter.convert()
# Save the model to disk; the context manager guarantees the handle is flushed
# and closed before the file size is read below.
with open("model.tflite", "wb") as tflite_file:
  tflite_file.write(tflite_model)

tf_model_size = os.path.getsize("model.tflite")
print("Basic model is %d bytes" % tf_model_size)

INFO:tensorflow:Assets written to: /tmp/tmp3vf0p31p/assets


INFO:tensorflow:Assets written to: /tmp/tmp3vf0p31p/assets


Basic model is 38288 bytes


In [None]:
def _print_io_details(input_details, output_details):
  """Print name, shape and dtype of the first input and first output tensor."""
  print("== Input details ==")
  print("name:", input_details[0]['name'])
  print("shape:", input_details[0]['shape'])
  print("type:", input_details[0]['dtype'])

  print("\n== Output details ==")
  print("name:", output_details[0]['name'])
  print("shape:", output_details[0]['shape'])
  print("type:", output_details[0]['dtype'])


# Load the flatbuffer from the same relative path the conversion cell wrote it
# to (the previous absolute '/content/...' path only resolved on Colab).
tflite_interpreter = tf.lite.Interpreter(model_path="model.tflite")

input_details = tflite_interpreter.get_input_details()
output_details = tflite_interpreter.get_output_details()
_print_io_details(input_details, output_details)


# Resize only the input so the whole test set runs as one batch. The shape is
# taken from X_test itself, so it is no longer tied to a (n, 3) input.
# The output tensor is not resized here: the old call passed the output index
# with a hard-coded width of 4, which matches no model in this notebook;
# allocate_tensors() sizes the output from the resized input.
tflite_interpreter.resize_tensor_input(input_details[0]['index'], X_test.shape)
tflite_interpreter.allocate_tensors()

input_details = tflite_interpreter.get_input_details()
output_details = tflite_interpreter.get_output_details()

# The interpreter input is fed as float32.
df_X_test = X_test.astype('float32')

_print_io_details(input_details, output_details)

In [None]:
# Run the whole float32 test set through the TFLite interpreter in one call.
tflite_interpreter.set_tensor(input_details[0]['index'], df_X_test)
tflite_interpreter.invoke()

# Read the output tensor back: one row of class scores per test sample.
tflite_model_predictions = tflite_interpreter.get_tensor(output_details[0]['index'])
print("Prediction results shape:", tflite_model_predictions.shape)

# Label the score columns with the class-name strings of the trained classes.
tflite_pred_dataframe = pd.DataFrame(tflite_model_predictions, columns=temp_class)


In [None]:
# The score columns are labelled with class-name strings, so the arg-max label
# of each row is converted to an int class id.
df_max_index = tflite_pred_dataframe.idxmax(axis=1)
list_max_index = [int(name) for name in df_max_index.values.tolist()]

# Number of test samples whose predicted class differs from the true label.
false = sum(1 for i in range(len(y_test)) if y_test[i] != list_max_index[i])

# Accuracy of the TFLite model on the test split.
print((len(y_test) - false)/len(y_test))

In [None]:
# convert tensorflow lite model to hex array to be flashed to controller
def hex_to_c_array(hex_data, var_name):
  """Render a byte sequence as the text of a C header file.

  The header contains an include guard, an ``unsigned int <var_name>_len``
  holding the byte count and an ``unsigned char <var_name>[]`` array with the
  bytes written as ``0x..`` literals, twelve per line. It is used to turn the
  TensorFlow Lite model into an array that can be flashed to the controller.

  Args:
    hex_data: iterable of integers in 0..255 with a length (e.g. ``bytes``).
    var_name: C identifier used for the array, its length and the guard macro.

  Returns:
    The complete header source as a string.
  """
  # A single guard macro for #ifndef, #define and #endif. The #define used to
  # be emitted in lower case, so the guard never matched and gave no protection
  # against double inclusion.
  guard = var_name.upper() + '_H'
  total = len(hex_data)

  hex_array = []
  for i, val in enumerate(hex_data):
    hex_str = format(val, '#04x')
    if (i + 1) < total:
      hex_str += ','          # no trailing comma after the last byte
    if (i + 1) % 12 == 0:
      hex_str += '\n '        # wrap after every twelfth value
    hex_array.append(hex_str)

  c_str = '#ifndef ' + guard + '\n'
  c_str += '#define ' + guard + '\n\n'
  c_str += '\nunsigned int ' + var_name + '_len = ' + str(total) + ';\n'
  c_str += 'unsigned char ' + var_name + '[] = {'
  c_str += '\n ' + ' '.join(hex_array) + '\n};\n\n'
  c_str += '#endif //' + guard

  return c_str

In [None]:
# write TFLite model to a C source file
c_model_name = 'arm_lite_model'
with open(c_model_name + '.h', 'w') as file:
  file.write(hex_to_c_array(tflite_model, c_model_name))