In [1]:
# FUNCTIONS

# Activity codes: 0=sedentary, 1=standing, 2=stepping, 2.1=cycling,
# 3.1=primary lying, 3.2=secondary lying, 4=non-wear, 5=travelling
def reassign_classes(classes):
  """
  Collapse the detailed activity codes into the coarser classes used for training.

  Cycling (2.1) becomes 2, both lying codes (3.1 and 3.2) become 3 and
  travelling (5.0) becomes 0. Every other value is left untouched.

  The sequence is modified in place and the same object is returned.
  """
  # (old code, new code) pairs, checked in order with ==
  remap = ((2.1, 2), (3.1, 3), (3.2, 3), (5.0, 0))
  for position in range(len(classes)):
    current = classes[position]
    for old_code, new_code in remap:
      if current == old_code:
        classes[position] = new_code
        break
  return classes

# Used to remove non-wear because it destroys the NN
def remove_classes(dataset, stack, class_to_remove):
  """
  Drop every sample whose label equals `class_to_remove`.

  Args:
    dataset: array of samples, indexed along its first axis.
    stack: array of labels, one per sample in `dataset`.
    class_to_remove: label value to filter out.

  Returns:
    (dataset, stack) with the matching samples removed from both.

  Raises:
    ValueError: if `dataset` and `stack` do not hold the same number of samples.
  """
  dataset = np.asarray(dataset)
  stack = np.asarray(stack)
  if len(dataset) != len(stack):
    raise ValueError(
        "dataset and stack must have the same length, got %d and %d"
        % (len(dataset), len(stack)))
  # Build the mask from the labels that were passed in, not from a
  # notebook-level global, so the function works for any label array.
  keep_idx = stack != class_to_remove
  return dataset[keep_idx], stack[keep_idx]

# Reshape the data for specific models
def reshape_set(dataset, new_shape):
  """
  Reshape every sample of `dataset` to `new_shape`, keeping the sample axis.

  Args:
    dataset: array whose first axis indexes the samples.
    new_shape: per-sample shape (list or tuple), without the sample axis.

  Returns:
    `dataset` reshaped to (n_samples, *new_shape).
  """
  # Build a fresh tuple instead of inserting into `new_shape`: mutating the
  # caller's list would make a reused shape grow by one axis on every call.
  target_shape = (dataset.shape[0],) + tuple(new_shape)
  return dataset.reshape(target_shape)

def one_hot_data(stack, num_classes=None):
  """
  One-hot encode a stack of class labels.

  Args:
    stack: array-like of whole-number class labels.
    num_classes: width of the encoding. When omitted it is inferred as the
      number of distinct labels in `stack`. Pass it explicitly when encoding
      several splits (e.g. train and test) so they all get the same width
      even if one split is missing a class.

  Returns:
    The tensor produced by `tf.one_hot` for the integer-cast labels.
  """
  if num_classes is None:
    num_classes = len(np.unique(stack))
  # tf.one_hot expects integer indices; labels that went through the
  # 2.1 / 3.1 / 3.2 remapping may still be stored as floats.
  indices = tf.cast(stack, tf.int32)
  return tf.one_hot(indices, num_classes)

def review_class_imbalance(y_train, y_test, labels=None):
  """
  Print the number of samples per class in the train and test labels.

  Args:
    y_train: array-like of training labels.
    y_test: array-like of test labels.
    labels: optional display names, ordered like the sorted unique label
      values. They are only used for a split that contains exactly
      len(labels) distinct values; otherwise the raw label values are
      printed, so a name is never attached to the wrong class.

  Returns:
    (unique_classes_train, unique_classes_test): the sorted unique label
    values of each split, as returned by np.unique.
  """
  # One np.unique call per split yields both the values and their counts
  unique_classes_train, counts_train = np.unique(y_train, return_counts=True)
  unique_classes_test, counts_test = np.unique(y_test, return_counts=True)

  def count_by_class(values, counts):
    # Explicit check instead of a bare `except:` around zip(None, ...)
    if labels is not None and len(labels) == len(values):
      names = list(labels)
    else:
      names = values.tolist()
    return dict(zip(names, counts.tolist()))

  print('Train Classes')
  print(count_by_class(unique_classes_train, counts_train))
  print('--------------')
  print('Test Classes')
  print(count_by_class(unique_classes_test, counts_test))
  print('--------------')
  return unique_classes_train, unique_classes_test

def norm_accel_data(x, x_minimum=0, x_maximum=255):
  """
  Min-max scale acceleration values so that [x_minimum, x_maximum] maps to [0, 1].

  Args:
    x: scalar or array of raw values.
    x_minimum: raw value that maps to 0 (default 0).
    x_maximum: raw value that maps to 1 (default 255).

  Returns:
    (x - x_minimum) / (x_maximum - x_minimum) as float(s). Inputs outside
    the given bounds are not clipped.

  Raises:
    ValueError: if x_maximum equals x_minimum (the scale would be undefined).
  """
  value_range = x_maximum - x_minimum
  if value_range == 0:
    raise ValueError("x_maximum and x_minimum must differ")
  return (x - x_minimum) / value_range

def process_epochs(x, y=None, shuffle=False):
  """
  Normalise a set of epochs and optionally shuffle it together with its labels.

  Args:
    x: raw acceleration data; it is passed through norm_accel_data.
    y: labels aligned with the first axis of `x`. Required when `shuffle`
      is True, ignored otherwise.
    shuffle: when True, `x` and `y` are reordered with the same random
      permutation of the first axis.

  Returns:
    shuffle=False: the normalised data as a tensor.
    shuffle=True: the tuple (shuffled_x, shuffled_y).

  Raises:
    ValueError: if shuffle is True but no labels were given.
  """
  # Fail early with a clear message instead of erroring inside tf.gather
  if shuffle and y is None:
    raise ValueError("process_epochs(shuffle=True) requires the labels `y`")

  print("Creating training data...")
  # Normalize the acceleration data
  x = norm_accel_data(x)
  # Convert data to a tensor
  data = tf.constant(x)
  if not shuffle:
    return data

  # One permutation of the sample indices, applied to both data and labels
  indices = tf.range(start=0, limit=tf.shape(data)[0], dtype=tf.int32)
  shuffled_indices = tf.random.shuffle(indices)
  shuffled_x = tf.gather(data, shuffled_indices)
  shuffled_y = tf.gather(y, shuffled_indices)
  return shuffled_x, shuffled_y

def show_confusion_matrix(validations, predictions, labels=None):
  """
  Plot a confusion matrix as an annotated heatmap.

  The matrix is computed with normalize='true', i.e. normalised over the
  true labels.

  Args:
    validations: true class ids.
    predictions: predicted class ids.
    labels: tick labels for both axes. Defaults to the notebook-level
      LABELS list, so existing two-argument calls behave as before.
  """
  if labels is None:
    # Looked up at call time; LABELS is defined in a later notebook cell
    labels = LABELS
  matrix = metrics.confusion_matrix(validations, predictions, normalize='true')
  plt.figure(figsize=(6, 4))
  sns.heatmap(matrix,
              cmap='coolwarm',
              linecolor='white',
              linewidths=1,
              xticklabels=labels,
              yticklabels=labels,
              annot=True)
  plt.title('Confusion Matrix')
  plt.ylabel('True Label')
  plt.xlabel('Predicted Label')
  plt.show()

def show_training(history):
  """
  Plot the training and validation curves for accuracy and for loss.

  Args:
    history: object whose `.history` dict holds the per-epoch series
      'accuracy', 'val_accuracy', 'loss' and 'val_loss' (as returned by
      model.fit in train_and_save_model).
  """
  # Same figure layout for both metrics. The 'val_*' series comes from the
  # validation split, so both legends say 'validation' (the accuracy plot
  # used to call it 'test').
  for metric in ('accuracy', 'loss'):
    plt.figure(figsize=(10, 4))
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title('model ' + metric)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

def show_model_results(model_to_test, data_to_test, validation):
  """
  Run a model on a dataset, then show its confusion matrix and print the
  classification report against the true labels.

  Args:
    model_to_test: model exposing `.predict`.
    data_to_test: input data handed to `.predict`.
    validation: true class ids to compare the predictions with.
  """
  class_scores = model_to_test.predict(data_to_test)
  # The predicted class is the index of the highest score in each row
  predicted_classes = np.argmax(class_scores, axis=1)

  show_confusion_matrix(validation, predicted_classes)
  print('------------')
  report = classification_report(validation, predicted_classes)
  print(report)

def train_and_save_model(model_to_train, X_train, y_train, model_name,
                         epochs=50, batch_size=32, validation_split=0.2,
                         callbacks=None,
                         model_dir='/content/drive/MyDrive/Documents/Work/UOS Post-Doc/Amputee Posture Classification/models/'):
  """
  Compile, fit and save a Keras model, plotting its training curves.

  Args:
    model_to_train: Keras model to compile and fit.
    X_train: training inputs.
    y_train: one-hot training targets (the loss is categorical cross-entropy).
    model_name: file stem; the model is saved as `<model_dir>/<model_name>.h5`.
    epochs: number of training epochs (default 50).
    batch_size: batch size passed to fit (default 32).
    validation_split: value passed to fit as `validation_split` (default 0.2).
    callbacks: optional list of Keras callbacks, e.g.
      [tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=10)].
    model_dir: directory the .h5 file is written to.
  """
  import os  # local import: this cell is defined before the notebook's import cell

  model_to_train.compile(loss='categorical_crossentropy',
                         optimizer='adam',
                         metrics=['accuracy'])

  history = model_to_train.fit(X_train,
                               y_train,
                               epochs=epochs,
                               validation_split=validation_split,
                               batch_size=batch_size,
                               callbacks=callbacks,
                               verbose=1)

  show_training(history)

  # With the default model_dir this is the same path as before
  filename = os.path.join(model_dir, model_name + '.h5')

  # save the model
  model_to_train.save(filename)

In [2]:
!pip install pyyaml h5py
import numpy as np
import tensorflow as tf
print("TF version:", tf.__version__)

# Import more tools
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import sklearn.metrics as metrics

# Check for GPU availability
print("GPU", "available 🤗" if tf.config.list_physical_devices("GPU") else "not available 😟")

print('------------')

from tensorflow.python.client import device_lib
device_lib.list_local_devices()

TF version: 2.4.1
GPU available 🤗
------------


[name: "/device:CPU:0"
 device_type: "CPU"
 memory_limit: 268435456
 locality {
 }
 incarnation: 9291396848035930299, name: "/device:GPU:0"
 device_type: "GPU"
 memory_limit: 14674281152
 locality {
   bus_id: 1
   links {
   }
 }
 incarnation: 5003797569849956474
 physical_device_desc: "device: 0, name: Tesla T4, pci bus id: 0000:00:04.0, compute capability: 7.5"]

In [3]:
from google.colab import drive
drive.mount('/content/drive')

# Every input file lives in the same Drive folder; name it once instead of
# repeating the absolute path in each np.load call.
DATA_DIR = '/content/drive/MyDrive/Documents/Work/UOS Post-Doc/Amputee Posture Classification/data/'

# Each numbered set comes as a data array plus a matching array of class labels
engineering_set1 = np.load(DATA_DIR + '1_pure_engineering_set.npy')
posture_classes1 = np.load(DATA_DIR + '1_pure_engineering_set_classes.npy')
engineering_set2 = np.load(DATA_DIR + '2_pure_engineering_set.npy')
posture_classes2 = np.load(DATA_DIR + '2_pure_engineering_set_classes.npy')
engineering_set3 = np.load(DATA_DIR + '3_pure_engineering_set.npy')
posture_classes3 = np.load(DATA_DIR + '3_pure_engineering_set_classes.npy')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# Join the three loaded sets (and their labels) end to end along the first axis
engineering_set = np.concatenate([engineering_set1, engineering_set2, engineering_set3])
posture_classes = np.concatenate([posture_classes1, posture_classes2, posture_classes3])

In [5]:
# Main Processing
# Merge the sub-classes, then drop class 4 (non-wear) from data and labels
posture_classes = reassign_classes(posture_classes)
engineering_set, posture_classes = remove_classes(
    engineering_set, posture_classes, class_to_remove=4)

X_train, X_test, y_train, y_test = train_test_split(
    engineering_set, posture_classes, test_size=0.2, random_state=42)

y_train_hot, y_test_hot = one_hot_data(y_train), one_hot_data(y_test)

# The reshaped variants are built from the raw splits, before X_train and
# X_test are replaced by their normalised tensors below.
X_train_reshaped1, X_test_reshaped1 = (
    reshape_set(X_train, [1, 5, 59, 3]), reshape_set(X_test, [1, 5, 59, 3]))
X_train_reshaped2, X_test_reshaped2 = (
    reshape_set(X_train, [5, 1, 59, 3]), reshape_set(X_test, [5, 1, 59, 3]))

# Normalise every variant and convert it to a tensor
X_train, X_test = process_epochs(X_train), process_epochs(X_test)
X_train_reshaped1, X_test_reshaped1 = (
    process_epochs(X_train_reshaped1), process_epochs(X_test_reshaped1))
X_train_reshaped2, X_test_reshaped2 = (
    process_epochs(X_train_reshaped2), process_epochs(X_test_reshaped2))

Creating training data...
Creating training data...
Creating training data...
Creating training data...
Creating training data...
Creating training data...


In [6]:
# Display names for the classes, in the order of the sorted class ids
LABELS = ['Sedentary', 'Standing', 'Stepping', 'Lying']

unique_classes_train, unique_classes_test = review_class_imbalance(
    y_train, y_test, labels=LABELS)

# Per-sample input shapes (first axis dropped) for each model variant,
# and the number of output units (one per training class)
INPUT_SHAPE = X_train.shape[1:]
INPUT_SHAPE_RESHAPED_1 = X_train_reshaped1.shape[1:]
INPUT_SHAPE_RESHAPED_2 = X_train_reshaped2.shape[1:]
OUTPUT_SHAPE = len(unique_classes_train)

print('input shape:', INPUT_SHAPE)
print('input shape reshaped:', INPUT_SHAPE_RESHAPED_1)
print('input shape reshaped:', INPUT_SHAPE_RESHAPED_2)
print('output shape:', OUTPUT_SHAPE)

Train Classes
{'Sedentary': 54798, 'Standing': 35903, 'Stepping': 11478, 'Lying': 54909}
--------------
Test Classes
{'Sedentary': 13744, 'Standing': 8982, 'Stepping': 2876, 'Lying': 13670}
--------------
input shape: (295, 3)
input shape reshaped: (1, 5, 59, 3)
input shape reshaped: (5, 1, 59, 3)
output shape: 4


In [7]:
# Models from - https://machinelearningmastery.com/how-to-develop-rnn-models-for-human-activity-recognition-time-series-classification/
# All three models end in a Dense(100, relu) layer followed by a softmax layer
# with OUTPUT_SHAPE units (printed as 4 in the cell above).

# Plain LSTM on the un-reshaped epochs; INPUT_SHAPE was printed as (295, 3).
mlm_lstm_model = tf.keras.Sequential([
                                      tf.keras.layers.LSTM(100, input_shape=INPUT_SHAPE),
                                      tf.keras.layers.Dropout(0.5),
                                      tf.keras.layers.Dense(100, activation='relu'),
                                      tf.keras.layers.Dense(OUTPUT_SHAPE, activation='softmax')],
                                      name='MLM-LSTM-Model')

# CNN front-end wrapped in TimeDistributed, feeding an LSTM.
# INPUT_SHAPE_RESHAPED_1 was printed as (1, 5, 59, 3).
# NOTE(review): with a leading axis of length 1, the TimeDistributed wrappers and
# the LSTM presumably see a single step per sample - confirm this is intended.
mlm_cnn_lstm_model = tf.keras.Sequential([
                                          tf.keras.layers.TimeDistributed(tf.keras.layers.Conv1D(filters=64, kernel_size=3, activation='relu'), input_shape=INPUT_SHAPE_RESHAPED_1),
                                          tf.keras.layers.TimeDistributed(tf.keras.layers.Conv1D(filters=64, kernel_size=3, activation='relu')),
                                          tf.keras.layers.TimeDistributed(tf.keras.layers.Dropout(0.5)),
                                          # NOTE(review): the Conv1D layers are followed by MaxPooling2D, not
                                          # MaxPooling1D - confirm that pooling over two axes is intended.
                                          tf.keras.layers.TimeDistributed(tf.keras.layers.MaxPooling2D(pool_size=2)),
                                          tf.keras.layers.TimeDistributed(tf.keras.layers.Flatten()),
                                          tf.keras.layers.LSTM(100),
                                          tf.keras.layers.Dropout(0.5),
                                          tf.keras.layers.Dense(100, activation='relu'),
                                          tf.keras.layers.Dense(OUTPUT_SHAPE, activation='softmax')],
                                          name='MLM-CNN-LSTM-Model')

# Convolutional LSTM; INPUT_SHAPE_RESHAPED_2 was printed as (5, 1, 59, 3).
# The ConvLSTM2D output is flattened before the dense classifier head.
mlm_convlstm_model = tf.keras.Sequential([
                                          tf.keras.layers.ConvLSTM2D(filters=64, kernel_size=(1,3), activation='relu', input_shape=INPUT_SHAPE_RESHAPED_2),
                                          tf.keras.layers.Dropout(0.5),
                                          tf.keras.layers.Flatten(),
                                          tf.keras.layers.Dense(100, activation='relu'),
                                          tf.keras.layers.Dense(OUTPUT_SHAPE, activation='softmax')],
                                          name='MLM-ConvLSTM-Model')

In [None]:
# Training and saving the models
# Only the CNN-LSTM model is trained in this run; the other two calls are kept
# commented out so they can be re-enabled.

#train_and_save_model(mlm_lstm_model, X_train, y_train_hot, 'mlm_lstm_model')
train_and_save_model(model_to_train=mlm_cnn_lstm_model,
                     X_train=X_train_reshaped1,
                     y_train=y_train_hot,
                     model_name='mlm_cnn_lstm_model')
#train_and_save_model(mlm_convlstm_model, X_train_reshaped2, y_train_hot, 'mlm_convlstm_model')

Epoch 1/50


In [None]:
# Turn the one-hot test targets back into class ids for the reports
valid = np.argmax(y_test_hot, axis=1)

# ---------- LSTM Model ----------
#mlm_lstm_model_loaded = tf.keras.models.load_model('/content/drive/MyDrive/Documents/Work/UOS Post-Doc/Amputee Posture Classification/models/mlm_lstm_model.h5')
#mlm_lstm_model_loaded.summary()
#show_model_results(mlm_lstm_model_loaded, X_test, valid)

print('------------------------------------------------------------------------------------------------')

# ---------- CNN LSTM Model ----------
# Reload the saved model from Drive and evaluate it on the matching reshaped test set
mlm_cnn_lstm_model_loaded = tf.keras.models.load_model(
    filepath='/content/drive/MyDrive/Documents/Work/UOS Post-Doc/Amputee Posture Classification/models/mlm_cnn_lstm_model.h5')
mlm_cnn_lstm_model_loaded.summary()
show_model_results(model_to_test=mlm_cnn_lstm_model_loaded,
                   data_to_test=X_test_reshaped1,
                   validation=valid)

print('------------------------------------------------------------------------------------------------')

# ---------- Conv LSTM Model ----------
#mlm_convlstm_model_loaded = tf.keras.models.load_model('/content/drive/MyDrive/Documents/Work/UOS Post-Doc/Amputee Posture Classification/models/mlm_convlstm_model.h5')
#mlm_convlstm_model_loaded.summary()
#show_model_results(mlm_convlstm_model_loaded, X_test_reshaped2, valid)