In [None]:
import os
import pandas as pd
import numpy as np
import itertools
import json
from google.cloud import storage
# Matplot libraries
from matplotlib import pyplot as plt
# SkLearn libraries
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import ShuffleSplit, GroupShuffleSplit, LeavePGroupsOut, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix
# Tensorflow libraries
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
def read_csv(features, dirPath):
    """Load the CSV file in ``dirPath`` whose file name contains ``features``.

    Parameters
    ----------
    features : str
        Substring looked for in the file names of ``dirPath``.
    dirPath : str
        Directory that is searched (not recursively).

    Returns
    -------
    pandas.DataFrame
        Parsed contents of the matching file. If several names match, the
        last one in ``os.listdir`` order is used.

    Raises
    ------
    FileNotFoundError
        If no file name in ``dirPath`` contains ``features``.
    """
    matches = [name for name in os.listdir(dirPath) if features in name]
    if not matches:
        raise FileNotFoundError(
            "No file containing {!r} found in {!r}".format(features, dirPath)
        )

    # Build the full path so loading does not depend on the current working
    # directory being dirPath.
    csv_path = os.path.join(dirPath, matches[-1])
    return pd.read_csv(csv_path, delimiter=',', encoding='utf-8')

In [None]:
# Change root directory
# NOTE(review): absolute, machine-specific path - adjust it before running on another machine.
dirPath = r'C:\Users\Andreas\Desktop\Thesis\Data'
# Relative paths used after this point resolve against dirPath.
os.chdir(dirPath)
# Load the CSV in dirPath whose file name contains 'raw_stat'.
df = read_csv('raw_stat', dirPath)

In [None]:
def prepare_df(sensors_to_drop, df):
    """Return ``df`` without the columns that belong to the given sensors.

    A column is removed when its name matches one of the patterns in
    ``sensors_to_drop`` (checked with ``Index.str.contains``). The input
    frame itself is not modified.
    """
    remaining = df
    for pattern in sensors_to_drop:
        # Boolean mask over the columns that are still present
        is_sensor_col = remaining.columns.str.contains(pattern)
        remaining = remaining.drop(columns=remaining.columns[is_sensor_col])
    return remaining

In [None]:
def _sequences_to_tensor(data, seq_len):
    """Convert a frame of JSON-encoded sequences into a scaled 3D array.

    The 'Subject' column is dropped. Every remaining cell is parsed with
    ``json.loads`` into a list of numbers and written into a zero-padded row
    of length ``seq_len``. Each column is then standardised with the mean and
    standard deviation of its whole padded matrix, the columns are stacked on
    a new last axis, and every sample is min-max scaled on its own.
    """
    data = data.drop(columns=['Subject'])

    channels = []
    for col in data.columns:
        padded = np.zeros((data.shape[0], seq_len))
        for i, replist in enumerate(data[col]):
            values = [float(x) for x in json.loads(replist)]
            # Sequences must not be longer than seq_len, shorter ones keep
            # their trailing zeros.
            padded[i, :len(values)] = values
        # Standardise once per column, after all rows are filled. The
        # statistics include the zero padding.
        channels.append((padded - padded.mean()) / padded.std())

    # stack the 2D arrays along a new axis to create a 3D numpy array
    stacked = np.stack(channels, axis=2)

    # apply MinMaxScaler() to each sample individually
    scaler = MinMaxScaler()
    return np.array([scaler.fit_transform(sample) for sample in stacked])


def split_3Ddataset(exercise, df, seq_len=3500, random_state=None):
    """Create subject-wise train/validation/test sets of sequence data.

    Rows of ``df`` whose ``Exercise`` equals ``exercise`` are split with
    ``GroupShuffleSplit(test_size=0.05)`` grouped by 'Subject': first into
    test and remainder, then the remainder into train and validation, so a
    subject never appears in more than one split.

    Parameters
    ----------
    exercise : str
        Value of the 'Exercise' column to keep.
    df : pandas.DataFrame
        Must contain 'Exercise', 'Subject' and 'label'; every other column
        holds JSON-encoded lists of numbers.
    seq_len : int, optional
        Length every sequence is zero-padded to (default 3500).
    random_state : int or None, optional
        Seed forwarded to ``GroupShuffleSplit`` for reproducible splits.

    Returns
    -------
    tuple
        ``(X_train, X_val, X_test, Y_train, Y_val, Y_test)`` where the ``X_*``
        are arrays of shape (n_samples, seq_len, n_features) and the ``Y_*``
        are one-hot DataFrames produced by ``pd.get_dummies``.
    """
    model = df.loc[df.Exercise == exercise]

    # Drop unnecessary columns
    model = model.drop(columns=['Exercise'])

    # Features, labels and groups for the subject-wise split
    y = model['label'].astype('category')
    X = model.drop(columns=['label'])
    groups = model['Subject'].astype('category')

    # A single split per stage is enough; each call draws one random partition.
    gss = GroupShuffleSplit(n_splits=1, test_size=0.05, random_state=random_state)

    new_index, test_index = next(gss.split(X, y, groups))
    X_new, X_test = X.iloc[new_index, :], X.iloc[test_index, :]
    y_new, y_test = y.iloc[new_index], y.iloc[test_index]

    groups_new = X_new['Subject'].astype('category')
    train_index, val_index = next(gss.split(X_new, y_new, groups_new))
    X_train, X_val = X_new.iloc[train_index, :], X_new.iloc[val_index, :]
    y_train, y_val = y_new.iloc[train_index], y_new.iloc[val_index]

    X_train = _sequences_to_tensor(X_train, seq_len)
    X_test = _sequences_to_tensor(X_test, seq_len)
    X_val = _sequences_to_tensor(X_val, seq_len)

    Y_train = pd.get_dummies(y_train)
    Y_val = pd.get_dummies(y_val)
    Y_test = pd.get_dummies(y_test)

    return X_train, X_val, X_test, Y_train, Y_val, Y_test
    

In [None]:
def split_stats_dataset(exercise, df, random_state=None):
    """Create subject-wise train/validation/test sets of statistical features.

    Rows of ``df`` whose ``Exercise`` equals ``exercise`` are kept, the text
    labels are mapped to integer class codes, and the rows are split with
    ``GroupShuffleSplit(test_size=0.2)`` grouped by 'Subject': first into
    test and remainder, then the remainder into train and validation.
    Features are returned unscaled.

    Parameters
    ----------
    exercise : str
        Value of the 'Exercise' column to keep.
    df : pandas.DataFrame
        Must contain 'Exercise', 'Subject' and 'label'; all other columns are
        used as features.
    random_state : int or None, optional
        Seed forwarded to ``GroupShuffleSplit`` for reproducible splits.

    Returns
    -------
    tuple
        ``(X_train, X_val, X_test, y_train, y_val, y_test, events)``. The
        first six are numpy arrays; ``events`` holds the unique original
        (un-mapped) labels of the selected exercise.
    """
    # Text label -> class code. Labels that are not listed are left unchanged.
    label_codes = {
        'Correct repetition': 0,
        'Knees caving inwards': 1,
        'Feet on the same line': 1,
        'Insufficient ROM': 1,
        'Hips rising before torso': 2,
        'Feet too far': 2,
        'Knee touching the ground': 2,
        'Back rounding': 3,
        'Feet too close': 3,
        'Leaning forward': 3,
        'Back arching': 4,
        'Stride too short': 4,
    }

    dataset = df.loc[df.Exercise == exercise]
    dataset = dataset.reset_index(drop=True)

    # Keep the original label names for reporting
    events = dataset.label.unique()

    # Rename labels to ints. Only the label column is touched, and
    # infer_objects() makes the conversion to an integer dtype explicit
    # instead of relying on replace() downcasting implicitly.
    dataset = dataset.assign(
        label=dataset['label'].replace(label_codes).infer_objects()
    )

    # Groups for cross validation
    groups = dataset['Subject'].astype('category')

    # Features and labels
    y = dataset['label']
    X = dataset.drop(columns=['Exercise', 'label'])

    # Split dataset
    gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=random_state)

    new_index, test_index = next(gss.split(X, y, groups))
    X_new, X_test = X.iloc[new_index, :], X.iloc[test_index, :]
    y_new, y_test = y.iloc[new_index], y.iloc[test_index]

    X_new = X_new.reset_index(drop=True)
    y_new = y_new.reset_index(drop=True)
    groups_new = X_new['Subject'].astype('category')

    train_index, val_index = next(gss.split(X_new, y_new, groups_new))
    X_train, X_val = X_new.iloc[train_index, :], X_new.iloc[val_index, :]
    y_train, y_val = y_new.iloc[train_index], y_new.iloc[val_index]

    # 'Subject' was only needed for grouping; it is not a feature.
    X_train = X_train.drop(columns=['Subject']).to_numpy()
    X_val = X_val.drop(columns=['Subject']).to_numpy()
    X_test = X_test.drop(columns=['Subject']).to_numpy()

    y_train = y_train.to_numpy()
    y_val = y_val.to_numpy()
    y_test = y_test.to_numpy()

    return X_train, X_val, X_test, y_train, y_val, y_test, events


In [None]:
def CNN(X_train, X_val, X_test, Y_train, Y_val, Y_test):
    """Train a 1D convolutional classifier and report its test performance.

    The network is Conv1D -> Dropout -> Dense -> Flatten -> softmax Dense. It
    is trained with early stopping on validation accuracy while the best
    model is checkpointed to 'best_cnn'. Evaluation uses the in-memory model
    as it is when training stops; the checkpoint is not reloaded here.

    Parameters
    ----------
    X_train, X_val, X_test : array-like
        3D inputs; axes 1 and 2 of ``X_train`` define the network input shape.
    Y_train, Y_val, Y_test : pandas.DataFrame
        One-hot encoded labels; the column names are used as class names.

    Returns
    -------
    None
        Results are shown as learning curves, a confusion matrix plot and a
        printed classification report.
    """
    class_names = Y_test.columns

    input_shape = (X_train.shape[1], X_train.shape[2])
    output_size = len(Y_train.columns)

    cnn = Sequential()
    cnn.add(Conv1D(filters=20, kernel_size=10, activation='relu', input_shape=input_shape))
    cnn.add(Dropout(0.05))
    cnn.add(Dense(30, activation='relu'))
    cnn.add(Flatten())
    cnn.add(Dense(output_size, activation='softmax'))
    cnn.summary()

    # Keep the best model (by validation accuracy) on disk
    checkpoint_path = 'best_cnn'
    checkpoint = keras.callbacks.ModelCheckpoint(checkpoint_path, monitor='val_accuracy',
                                verbose=1, save_best_only=True, save_weights_only=False, mode='auto')

    # Add Early Stopping
    early_stopping_callback = EarlyStopping(monitor = 'val_accuracy', mode = 'max', patience = 20)
    # Compile Model
    cnn.compile(loss = 'categorical_crossentropy', optimizer = 'Adam', metrics =  ['accuracy'])
    # Train Model
    cnn_history = cnn.fit(x = X_train, y = Y_train, verbose=0,
                        epochs = 200, batch_size = 10,
                        validation_data=(X_val, Y_val),
                        callbacks = [checkpoint, early_stopping_callback])

    print('Learning rate plots: ')
    plot_learning(cnn_history)

    print('Test evaluation metrics: ')
    loss, accuracy = cnn.evaluate(X_test, Y_test)
    print(loss, accuracy)

    Y_pred = cnn.predict(X_test)
    pred_idx = np.argmax(Y_pred, axis=1)
    true_idx = np.argmax(np.array(Y_test), axis=1)
    # Convert the prediction result to class name
    Y_pred_label = [class_names[i] for i in pred_idx]
    Y_true_label = [class_names[i] for i in true_idx]
    # Plot the confusion matrix. Passing every class index keeps the matrix
    # aligned with class_names even when a class is absent from both the test
    # labels and the predictions.
    cm = confusion_matrix(true_idx, pred_idx, labels=np.arange(len(class_names)))
    plot_confusion_matrix(cm, class_names)
    # Print the classification report
    print('\nClassification Report:\n', classification_report(Y_true_label, Y_pred_label))

In [None]:
def NN(exercise, df, min_accuracy=0.4, max_attempts=None):
    """Train a small dense classifier on the statistical features and export it to TFLite.

    A fresh split (via ``split_stats_dataset``) and a fresh model are created
    and trained repeatedly until the test accuracy is no longer below
    ``min_accuracy``. The confusion matrix of the last trained model is
    printed, and the model checkpointed in 'best_model' is converted and
    written to 'model.tflite' in the current working directory.

    NOTE(review): retraining until the *test* accuracy passes a threshold
    lets the test split influence model selection, so the reported test
    metrics are optimistic.

    Parameters
    ----------
    exercise : str
        Exercise to train on; forwarded to ``split_stats_dataset``.
    df : pandas.DataFrame
        Feature table; forwarded to ``split_stats_dataset``.
    min_accuracy : float, optional
        Test accuracy at which retraining stops (default 0.4).
    max_attempts : int or None, optional
        Upper bound on the number of training runs. ``None`` (default) keeps
        retrying without limit.

    Raises
    ------
    RuntimeError
        If ``max_attempts`` runs finish without reaching ``min_accuracy``.
    """
    n_o_classes = 5
    checkpoint_path = 'best_model'

    attempt = 0
    while True:
        attempt += 1
        X_train, X_val, X_test, y_train, y_val, y_test, events = split_stats_dataset(exercise, df)

        model = tf.keras.Sequential([
            tf.keras.layers.Dropout(0.05),
            tf.keras.layers.Dense(30, activation='relu'),
            tf.keras.layers.Dense(n_o_classes, activation='softmax')
        ])

        # Keep the best model (by validation accuracy) on disk
        checkpoint = keras.callbacks.ModelCheckpoint(checkpoint_path, monitor='val_accuracy',
                                    verbose=0, save_best_only=True, save_weights_only=False)

        # Add Early Stopping
        early_stopping_callback = EarlyStopping(monitor = 'val_accuracy', patience = 20, verbose=0)
        # Compile Model
        model.compile(loss = 'sparse_categorical_crossentropy', optimizer = 'Adam', metrics = ['accuracy'])
        # Train Model
        model_history = model.fit(x = X_train, y = y_train, verbose=0,
                            epochs = 100, batch_size = 10,
                            validation_data=(X_val, y_val),
                            callbacks = [checkpoint, early_stopping_callback])

        print('Test evaluation metrics: ')
        _, accuracy = model.evaluate(X_test, y_test)

        # Same stop condition as `while accuracy < threshold`.
        if not accuracy < min_accuracy:
            break
        if max_attempts is not None and attempt >= max_attempts:
            raise RuntimeError(
                'Test accuracy stayed below {} after {} attempts'.format(min_accuracy, attempt)
            )

    # Predicted class = index of the largest softmax output of each row
    y_pred = model.predict(X_test)
    numarray = np.argmax(y_pred, axis=1)

    cn = confusion_matrix(y_test, numarray)
    print(events)
    print(cn)

    # Convert the checkpointed model to TFLite
    MODEL_NAME = checkpoint_path
    converter = tf.lite.TFLiteConverter.from_saved_model(MODEL_NAME)

    converter.target_spec.supported_ops = [
        tf.lite.OpsSet.TFLITE_BUILTINS, # enable TensorFlow Lite ops.
        tf.lite.OpsSet.SELECT_TF_OPS # enable TensorFlow ops.
    ]
    converter.allow_custom_ops=True
    converter.experimental_new_converter =True
    tflite_model = converter.convert()
    # Context manager so the file is flushed and closed deterministically
    with open("model.tflite", "wb") as tflite_file:
        tflite_file.write(tflite_model)

    # storage_client = storage.Client()
    # bucket = storage_client.get_bucket('prevent-deloza')
    # blob = bucket.blob('models/model.tflite')
    # blob.upload_from_filename('model.tflite')

In [None]:
def plot_confusion_matrix(cm, classes):
    """Plot a row-normalised confusion matrix.

    Parameters
    ----------
    cm : array-like of shape (n_classes, n_classes)
        Raw confusion counts; rows are true labels, columns are predictions.
    classes : sequence
        Tick labels, one per class, in the same order as the rows of ``cm``.

    Each row is divided by its sum so a cell shows the fraction of that true
    class. Rows without any sample are shown as 0.00 instead of NaN.
    """
    plt.figure(figsize = (7,7))

    counts = np.asarray(cm, dtype='float')
    row_totals = counts.sum(axis=1, keepdims=True)
    # Only divide where the row has samples; other rows keep the zeros of `out`.
    cm = np.divide(counts, row_totals, out=np.zeros_like(counts), where=row_totals != 0)

    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion matrix')
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=55)
    plt.yticks(tick_marks, classes)

    # Write the normalised value into every cell
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], '.2f'),
              horizontalalignment="center",
              color="black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()

In [None]:
def plot_learning(history):
    """Draw training and validation curves side by side.

    ``history`` is expected to expose a ``history`` mapping with the keys
    'loss', 'val_loss', 'accuracy' and 'val_accuracy'. The left panel shows
    the loss, the right panel the accuracy.
    """
    _, axes = plt.subplots(1, 2, figsize=(20, 5))

    # (train key, validation key, panel title, y-axis label)
    panels = (
        ('loss', 'val_loss', 'Model Loss', 'loss'),
        ('accuracy', 'val_accuracy', 'Model Accuracy', 'accuracy'),
    )

    for ax, (train_key, val_key, title, ylabel) in zip(axes, panels):
        ax.plot(history.history[train_key], 'blue', label='Train')
        ax.plot(history.history[val_key], 'red', label='Validation')
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.set_xlabel('epoch')
        ax.legend(loc='upper right')

In [None]:
# df = read_csv('resampled_features')
# df = prepare_df(['Lumbar','Upper','RightLower'],df)
# X_train, X_val, X_test, Y_train, Y_val, Y_test = split_3Ddataset('Lunges',df)
# CNN(X_train, X_val, X_test, Y_train, Y_val, Y_test)

In [None]:
# Change root directory
# NOTE(review): absolute, machine-specific path - adjust it before running on another machine.
# Files that NN writes with relative paths ('best_model', 'model.tflite') end up in this folder.
dirPath = r'C:\Users\Andreas\Desktop\Thesis\Data\best'
os.chdir(dirPath)
# Remove every column whose name contains 'Upper' or 'Lower' from the frame loaded earlier.
inp = prepare_df(['Upper','Lower'],df)
# Train the dense network on the 'Squats' rows and export it to TFLite.
NN('Squats',inp)