# Dataset Preprocessing
In this section, the dataset is put taken from the Google drive, parsed and flattened and finally shuffled so that it will be useful for the neural network. To not overflow Colab's GPU RAM, a portion of the training data is used. 

In [1]:
# Put HMP_Dataset in drive to access it within Colab
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import pandas as pd
import numpy as np
import os
from sklearn.preprocessing import MinMaxScaler
from random import shuffle
import tensorflow as tf
import random as rn
from keras.optimizers import Adam, SGD
from scipy.signal import medfilt

from keras.models import Sequential
from keras.layers import GRU, Input, Reshape
from keras.layers.core import Dense, Activation, Dropout
from keras import regularizers

In [3]:
# Source for this block: https://github.com/xtianmcd/accelerometer_rnn_explorations/blob/master/accelerometer_rnn.py

def get_filepaths(mainfolder):
    """
    Searches a folder for all unique files and compile a dictionary of their paths.
    Parameters
    --------------
    mainfolder: the filepath for the folder containing the data
    Returns
    --------------
    training_filepaths: file paths to be used for training
    testing_filepaths:  file paths to be used for testing
    """
    training_filepaths = {}
    testing_filepaths  = {}
    folders = os.listdir(mainfolder)
    for folder in folders:
        fpath = mainfolder + "/" + folder
        if os.path.isdir(fpath) and "MODEL" not in folder:
            filenames = os.listdir(fpath)
            for filename in filenames[:int(round(0.8*len(filenames)))]:
                fullpath = fpath + "/" + filename
                training_filepaths[fullpath] = folder
            for filename1 in filenames[int(round(0.8*len(filenames))):]:
                fullpath1 = fpath + "/" + filename1
                testing_filepaths[fullpath1] = folder
    return training_filepaths, testing_filepaths

def get_labels(mainfolder):
    """ Creates a dictionary of labels for each unique type of motion """
    labels = {}
    label = 0
    for folder in os.listdir(mainfolder):
        fpath = mainfolder + "/" + folder
        if os.path.isdir(fpath) and "MODEL" not in folder:
            labels[folder] = label
            label += 1
    return labels

def get_data(fp, labels, folders, norm, std, center, med):
    """
    Creates a dataframe for the data in the filepath and creates a one-hot
    encoding of the file's label
    """
    data = pd.read_csv(filepath_or_buffer=fp, sep=' ', names = ["X", "Y", "Z"])
    if norm and not std:
        normed_data = norm_data(data)
    elif std and not norm:
        stdized_data = std_data(data)
    elif center and not norm and not std:
        cent_data = subtract_mean(data)
    elif med:
        normed_data = data
        normed_data['X'] = medfilt(data['X'], kernel_size=3)
        normed_data['Y'] = medfilt(data['Y'], kernel_size=3)
        normed_data['Z'] = medfilt(data['Z'], kernel_size=3)
    else:
        normed_data = data

    one_hot = np.zeros(14)
    file_dir = folders[fp]
    label = labels[file_dir]
    one_hot[label] = 1
    return normed_data, one_hot, label

# Normalizes the data by removing the mean

def subtract_mean(input_data):
    # Subtract the mean along each column
    centered_data = input_data - input_data.mean()
    return centered_data


def norm_data(data):
    """
    Normalizes the data.
    For normalizing each entry, y = (x - min)/(max - min)
    """
    c_data = subtract_mean(data)
    mms = MinMaxScaler()
    mms.fit(c_data)
    n_data = mms.transform(c_data)
    return n_data

def standardize(data):
    c_data = subtract_mean(data)
    std_data = c_data/ pd.std(c_data)
    return std_data

def vectorize(normed):
    """
    Uses a sliding window to create a list of (randomly-ordered) 300-timestep
    sublists for each feature.
    """
    sequences = [normed[i:i+96] for i in range(len(normed)-96)]
    shuffle(sequences)
    sequences = np.array(sequences)
    return sequences

def build_inputs(files_list, accel_labels, file_label_dict, norm_bool, std_bool, center_bool, med_bool):
    X_seq    = []
    y_seq    = []
    labels = []
    for path in files_list:
        normed_data, target, target_label = get_data(path, accel_labels, file_label_dict, norm_bool, std_bool, center_bool, med_bool)
        input_list = vectorize(normed_data)
        for inputs in range(len(input_list)):
            X_seq.append(input_list[inputs])
            y_seq.append(list(target))
            labels.append(target_label)
    X_ = np.array(X_seq)
    y_ = np.array(y_seq)
    return X_, y_, labels

In [4]:
mainpath = "/content/drive/MyDrive/HMP_Dataset"

activity_labels                  = get_labels(mainpath)
training_dict, testing_dict      = get_filepaths(mainpath)
training_files                   = list(training_dict.keys())
testing_files                    = list(testing_dict.keys())

print(activity_labels)

# build training inputs and labels
X_train, y_train, train_labels = build_inputs(
    training_files,
    activity_labels,
    training_dict,
    False, False, False, True)
# build tesing inputs and labels
X_test, y_test, test_labels    = build_inputs(
    training_files,
    activity_labels,
    training_dict,
    False, False, False, True)

# alternate train and test datasets without filtering
X_train_raw, y_train_raw, train_labels_raw = build_inputs(
    training_files,
    activity_labels,
    training_dict,
    False, False, False, False)

X_test_raw, y_test_raw, test_labels_raw    = build_inputs(
    training_files,
    activity_labels,
    training_dict,
    False, False, False, False)


{'Use_telephone': 0, 'Standup_chair': 1, 'Liedown_bed': 2, 'Sitdown_chair': 3, 'Pour_water': 4, 'Walk': 5, 'Drink_glass': 6, 'Descend_stairs': 7, 'Climb_stairs': 8, 'Eat_meat': 9, 'Getup_bed': 10, 'Eat_soup': 11, 'Comb_hair': 12, 'Brush_teeth': 13}


In [5]:
# Randomize the train and test data
train_indices = np.random.permutation(len(X_train))
X_train = X_train[train_indices]
y_train = y_train[train_indices]
train_labels = np.array(train_labels)[train_indices].tolist()

test_indices = np.random.permutation(len(X_test))
X_test = X_test[test_indices]
y_test = y_test[test_indices]
test_labels = np.array(test_labels)[test_indices].tolist()

train_indices_raw = np.random.permutation(len(X_train_raw))
X_train_raw = X_train_raw[train_indices_raw]
y_train_raw = y_train_raw[train_indices_raw]
train_labels_raw = np.array(train_labels_raw)[train_indices_raw].tolist()

test_indices_raw = np.random.permutation(len(X_test_raw))
X_test_raw = X_test_raw[test_indices_raw]
y_test_raw = y_test_raw[test_indices_raw]
test_labels_raw = np.array(test_labels_raw)[test_indices_raw].tolist()

In [6]:
# Take one part of the training data so the Colab GPU ram doesn't overflow
X_train.shape
X_test.shape

X_train = X_train[:50000, :, :]
X_test = X_test[:50000, :, :]

y_train = y_train[:50000]
y_test = y_test[:50000]

test_labels = test_labels[:50000]

X_train_raw = X_train_raw[:50000, :, :]
X_test_raw = X_test_raw[:50000, :, :]

y_train_raw = y_train_raw[:50000]
y_test_raw = y_test_raw[:50000]

test_labels_raw = test_labels_raw[:50000]

In [7]:
X_train_flat = X_train.reshape(X_train.shape[0], -1)
y_train_flat = y_train.reshape(y_train.shape[0], -1)
X_test_flat = X_test.reshape(X_test.shape[0], -1)
y_test_flat = y_test.reshape(y_test.shape[0], -1)

# Network Definition and Training
In this section, the simplest neural network described in the paper is constructed and trained using the dataset processed in the previous part. This network takes in a moving window of 3 seconds. There are 32 samples per second, and every second of samples contains 32 points for each of the 3 axes, or 96 points per second. So, the input layer takes in a flattened array of 96 x 3 = 288 integers.

In [8]:
# Instantiate the simplest network described in the paper
with tf.device('/device:GPU:0'):
  model_alt = Sequential()
  model_alt.add(Input(shape=288,dtype=tf.int8, batch_size=1))
  model_alt.add(Dense(units=512, activation='relu'))
  model_alt.add(Dense(units=258, activation='relu'))
  model_alt.add(Dense(units=128, activation='relu'))
  model_alt.add(Dense(units=y_train.shape[1], activation='softmax'))
  optimizer = tf.keras.optimizers.experimental.Adagrad(learning_rate=0.01)
  model_alt.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

In [9]:
model_alt.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (1, 512)                  147968    
                                                                 
 dense_1 (Dense)             (1, 258)                  132354    
                                                                 
 dense_2 (Dense)             (1, 128)                  33152     
                                                                 
 dense_3 (Dense)             (1, 14)                   1806      
                                                                 
Total params: 315,280
Trainable params: 315,280
Non-trainable params: 0
_________________________________________________________________


The network plateaus at around 77% validation accuracy. It looks very promising, but real life performance is closer to 60%. This 60% was described in the research paper as the accuracy performance.

In [10]:
with tf.device('/device:GPU:0'):
  model_alt.fit(X_train_flat, y_train_flat, epochs = 30, validation_split = 0.2, batch_size = 32, verbose = 1)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


# TensorFlow to TensorFlow Lite Conversion

In this section, the model is converted from a TensorFlow model to a TFLite model that is compatible with the X-CUBE-AI toolchain. It is later written into a .tflite file that can be used to transfer the trained model onto the microcontroller.

In [14]:
# Convert the simplest model (model_alt).
from tensorflow.python.ops.numpy_ops import np_config

np_config.enable_numpy_behavior()
converter = tf.lite.TFLiteConverter.from_keras_model(model_alt)
tflite_model_quant = converter.convert()

In [15]:
interpreter = tf.lite.Interpreter(model_content=tflite_model_quant)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

input:  <class 'numpy.int8'>
output:  <class 'numpy.float32'>


In [16]:
open('mod.tflite', 'wb').write(tflite_model_quant)

1263672

# Converted Model Tests

In this section, a TFLite interpreter was instantiated so that the converted model could be loaded. Then, the same validation set was given to the converted data in order to see if the conversion was successful.

In [17]:
# Helper function to run inference on a TFLite model
def run_tflite_model(tflite_file, test_image_indices):
  global test_images

  # Initialize the interpreter
  interpreter = tf.lite.Interpreter(model_path=str(tflite_file))
  interpreter.allocate_tensors()

  input_details = interpreter.get_input_details()[0]
  output_details = interpreter.get_output_details()[0]

  predictions = np.zeros((len(test_image_indices),), dtype=np.int8)
  for i, test_image_index in enumerate(test_image_indices):
    test_image = X_test_flat[test_image_index]
    test_label = y_test_flat[test_image_index]

    # Check if the input type is quantized, then rescale input data to uint8

    test_image = np.expand_dims(test_image, axis=0).astype(input_details["dtype"])
    interpreter.set_tensor(input_details["index"], test_image)
    interpreter.invoke()
    output = interpreter.get_tensor(output_details["index"])[-1]

    predictions[i] = output.argmax()

  return predictions


In [21]:
# Helper function to evaluate a TFLite model on all images
def evaluate_model(tflite_file):
  global test_images
  global test_labels

  test_image_indices = range(X_test_flat.shape[0])
  predictions = run_tflite_model(tflite_file, test_image_indices)

  accuracy = (np.sum(test_labels== predictions) * 100) / len(X_test)

  print('Model accuracy is %.4f%% (Number of test samples=%d)' % (accuracy, len(X_test)))

In [22]:
evaluate_model('mod.tflite')

Model accuracy is 78.3280% (Number of test samples=50000)


It looks like the conversion was successful, and the model will fit on the on-chip flash memory.