Creating the environment

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sn
import pickle
from tensorflow.keras.utils import to_categorical
from keras.models import Model
from keras.layers import Input, Dense, LSTM, multiply, concatenate, Activation, Masking, Reshape
from keras.layers import Conv1D, BatchNormalization, GlobalAveragePooling1D, Permute, Dropout
from collections import Counter
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import LabelEncoder, StandardScaler

from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Bidirectional
from keras.layers import Dropout
from keras.layers import Flatten
import tensorflow as tf
from tensorflow.keras import layers
from sklearn.metrics import roc_auc_score, classification_report, roc_curve, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
#from bayes_opt import BayesianOptimization
# Do not truncate rows when a DataFrame/Series is displayed.
pd.set_option('display.max_rows', None)
# fix random seed for reproducibility
# NumPy global RNG (the same seed is set again in the data-splitting cell).
np.random.seed(7)
# TensorFlow seed, set through the TF1 compatibility API.
tf.compat.v1.random.set_random_seed(1234)

Building the functions

In [None]:
# Keep the historical name `SMOTE` for the sampler instance, but only instantiate
# while the name still refers to the class. Without the guard, re-running this cell
# fails with "TypeError: 'SMOTE' object is not callable" because the first run
# already replaced the class by an instance.
if isinstance(SMOTE, type):
    SMOTE = SMOTE()
# fold-accuracy plot
def plot_acc(num_folds, acc_per_fold):
    """Draw the validation accuracy of every fold as a line chart and show it.

    Args:
        num_folds: number of folds; sets the x-axis ticks to 1..num_folds.
        acc_per_fold: sequence of accuracies, plotted at x = 1, 2, ..., len(acc_per_fold).
    """
    plt.figure(num=None, figsize=(8, 6), dpi=80, facecolor='w', edgecolor='k')
    plt.style.use('ggplot')

    # Folds are numbered from 1 on the x-axis.
    fold_numbers = list(range(1, len(acc_per_fold) + 1))
    plt.plot(fold_numbers, acc_per_fold, '-r^', label='Validation acc')

    plt.xticks(np.arange(1, num_folds + 1, 1))
    plt.title('Validation accuracy per fold')
    plt.xlabel('Fold')
    plt.ylabel('Accuracy')
    plt.legend(loc='best')
    plt.show()

def earlystopping(min_delta, patience):
    """Build a Keras EarlyStopping callback that monitors the validation loss.

    Args:
        min_delta: forwarded to EarlyStopping(min_delta=...).
        patience: forwarded to EarlyStopping(patience=...).

    Returns:
        A tf.keras.callbacks.EarlyStopping instance configured with
        monitor='val_loss' and restore_best_weights=True.
    """
    return tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        min_delta=min_delta,
        patience=patience,
        restore_best_weights=True,
    )
def generate_generalization_metrics(fold_no, model_name, model_scores,
                                    loss_per_fold=None, acc_per_fold=None):
    """Record one fold's scores and print a running summary over all folds so far.

    Args:
        fold_no: number of the fold being reported (only used in the printout).
        model_name: currently unused; kept so existing calls keep working.
        model_scores: indexable whose element 0 is the loss and element 1 the accuracy.
        loss_per_fold: optional list collecting the loss of every fold. When omitted,
            the module-level list ``loss_per_fold`` is used and created on first use.
        acc_per_fold: optional list collecting the accuracy of every fold, with the
            same fallback to the module-level ``acc_per_fold``.

    Side effects:
        Appends the new loss/accuracy to the two lists and prints the summary.
    """
    # The original relied on two globals that had to be created by hand beforehand
    # and raised NameError otherwise; fall back to (and lazily create) them here.
    if loss_per_fold is None:
        loss_per_fold = globals().setdefault('loss_per_fold', [])
    if acc_per_fold is None:
        acc_per_fold = globals().setdefault('acc_per_fold', [])

    print('=' * 50)
    print(f'> Score for fold {fold_no}:\n'
          f'loss: {model_scores[0]:.2f}, '
          f'accuracy: {model_scores[1]:.2f}\n')
    loss_per_fold.append(model_scores[0])
    acc_per_fold.append(model_scores[1])

    print('>>> Summmary of scores per fold: <<<')
    for i in range(0, len(acc_per_fold)):
        print(f'Fold {i+1} - Loss: {loss_per_fold[i]:.2f}, Accuracy: {acc_per_fold[i]:.2f}')
    print('Average scores for all folds:\n'
          f'Accuracy: {np.mean(acc_per_fold):.2f} (+- {np.std(acc_per_fold):.2f})\n'
          f'Loss: {np.mean(loss_per_fold):.2f}\n'
          f'{"=" * 50}\n')


In [None]:
num_steps = 100
def lstm_data_transform(x_data, y_data, num_steps= num_steps):
    """Cut the data into sliding windows for LSTM training.

    Window ``i`` holds rows ``i .. i + num_steps - 1`` of ``x_data`` and is paired
    with ``y_data[i + num_steps]``, i.e. the target of the row that immediately
    follows the window. A window is only produced while that following row exists.

    Args:
        x_data: array indexed by row (``x_data.shape[0]`` rows).
        y_data: targets indexed by the same row numbers as ``x_data``.
        num_steps: window length; the default is bound when the function is defined.

    Returns:
        ``(x_array, y_array)``: the stacked windows and their targets as NumPy arrays.
    """
    total_rows = x_data.shape[0]
    # Start positions whose target row (start + num_steps) is still inside the data.
    window_starts = [start for start in range(total_rows)
                     if start + num_steps < total_rows]

    windows = [x_data[start:start + num_steps] for start in window_starts]
    targets = [y_data[start + num_steps] for start in window_starts]

    return np.array(windows), np.array(targets)

Reading the data

In [None]:
# Load combined1.csv and list the distinct subject IDs it contains.
data = pd.read_csv('combined1.csv')
data.Subject.unique()

In [None]:
# Number of rows recorded for each subject.
data.Subject.value_counts()

Visualization and correlation analysis

In [None]:
# Pairwise correlation between the X, Y and Z columns, drawn as an annotated heatmap.
# Uses the `sn` seaborn alias imported at the top of the notebook instead of
# re-importing seaborn under a second alias in the middle of the analysis.
corr = data[['X', 'Y', 'Z']].corr()

f, ax = plt.subplots(figsize=(10, 8))
sn.heatmap(corr, annot=True,
    cmap='Blues',
    vmin=-1.0, vmax=1.0,   # fixed colour scale over the full correlation range
    square=True, ax=ax);

In [None]:
# Line plot of a 200-row excerpt (rows 50000-50199) of the X, Y and Z columns.
excerpt = data[['X', 'Y', 'Z']].iloc[50000:50200]
excerpt_ax = excerpt.plot(figsize=(10, 5))

excerpt_ax.set_xlabel("Time (1/60 S)")
excerpt_ax.set_ylabel("Acceleration");

Data splitting

In [None]:
np.random.seed(7)
tf.compat.v1.random.set_random_seed(1234)

# Subject-wise split: rows are assigned to train / validation / test by subject ID.
train_subjects = [
    'C16', 'C17', 'C19', 'C20', 'C21', 'C22', 'C23', 'C25', 'C31', 'C33',
    'C34', 'C35', 'C36', 'C50', 'C57',
    'P527', 'P436', 'P440', 'P445', 'P513', 'P507', 'P432', 'P441', 'P488', 'P433',
    'P484', 'P483', 'P482', 'P528', 'P466', 'P487',
]
valid_subjects = ['C37', 'C39', 'C40', 'C41', 'P469', 'P462', 'P444', 'P460']
test_subjects = [
    'C11', 'C49', 'C53', 'C48', 'C38', 'C43',
    'P459', 'P458', 'P457', 'P450', 'P473', 'P477', 'P503',
]

subject_ids = data['Subject']
dataset_train = data[subject_ids.isin(train_subjects)]
dataset_valid = data[subject_ids.isin(valid_subjects)]
dataset_test = data[subject_ids.isin(test_subjects)]

In [None]:
# Subjects that actually ended up in each split (train, validation, test).
for split_frame in (dataset_train, dataset_valid, dataset_test):
    print(split_frame.Subject.unique())

In [None]:
# Guard against subject leakage: no subject may appear in more than one split.
# The original cell only displayed the train-vs-test overlap; here every pair of
# splits is checked and a violation stops the notebook.
train_ids = set(dataset_train.Subject)
valid_ids = set(dataset_valid.Subject)
test_ids = set(dataset_test.Subject)
assert train_ids.isdisjoint(valid_ids), f"subjects in both train and validation: {train_ids & valid_ids}"
assert train_ids.isdisjoint(test_ids), f"subjects in both train and test: {train_ids & test_ids}"
assert valid_ids.isdisjoint(test_ids), f"subjects in both validation and test: {valid_ids & test_ids}"

# Same display as before: array([False]) means no training row belongs to a test subject.
dataset_train.Subject.isin(dataset_test.Subject).unique()

In [None]:
# Positional column selection: columns 1 and 2 are the model inputs, column 3 the label.
# NOTE(review): positions depend on the column order of combined1.csv - confirm they
# are the intended columns.
feature_cols = [1, 2]
label_cols = slice(3, 4)  # a slice keeps the label two-dimensional, shape (n, 1)

x_train1 = pd.DataFrame(dataset_train.iloc[:, feature_cols].values)
x_valid1 = pd.DataFrame(dataset_valid.iloc[:, feature_cols].values)
x_test1 = pd.DataFrame(dataset_test.iloc[:, feature_cols].values)

y_train1 = pd.DataFrame(dataset_train.iloc[:, label_cols].values)
y_valid1 = pd.DataFrame(dataset_valid.iloc[:, label_cols].values)
y_test1 = pd.DataFrame(dataset_test.iloc[:, label_cols].values)

# Report the shape of every split.
for shape_label, frame in (
    ('x_train1.shape', x_train1),
    ('x_valid1.shape', x_valid1),
    ('x_test1.shape', x_test1),
    ('y_train1.shape', y_train1),
    ('y_valid1.shape', y_valid1),
    ('y_test1.shape', y_test1),
):
    print(shape_label, frame.shape)

Building the final model using the hyperparameters obtained from tuning

In [None]:
# Encode the class labels as integers. The encoder is fitted on the training labels
# only and reused for validation and test. The labels are flattened first because
# LabelEncoder expects a 1-D array and emits a DataConversionWarning for an (n, 1) column.
le = LabelEncoder()
y_train1 = le.fit_transform(np.ravel(y_train1))
y_valid1 = le.transform(np.ravel(y_valid1))
y_test1 = le.transform(np.ravel(y_test1))

# Standardise the features with statistics taken from the training split only.
scaler_x = StandardScaler()
x_train1_sc = scaler_x.fit_transform(x_train1)
x_valid1_sc = scaler_x.transform(x_valid1)
x_test1_sc = scaler_x.transform(x_test1)

# Labels are not scaled; the *_sc names only keep the notation parallel to the features.
# (scikit-learn already returns NumPy arrays, so no extra np.array() copies are needed.)
y_train1_sc = y_train1
y_valid1_sc = y_valid1
y_test1_sc = y_test1

# Cut every split into sliding windows of num_steps rows.
# NOTE(review): the windows slide over the rows of all subjects in a split, so windows
# that straddle the boundary between two subjects mix their samples - confirm this is intended.
num_steps = 100
# training set
(x_train1_transformed,
 y_train1_transformed) = lstm_data_transform(x_train1_sc, y_train1_sc, num_steps=num_steps)
assert x_train1_transformed.shape[0] == y_train1_transformed.shape[0]
# validation set
(x_valid1_transformed,
 y_valid1_transformed) = lstm_data_transform(x_valid1_sc, y_valid1_sc, num_steps=num_steps)
assert x_valid1_transformed.shape[0] == y_valid1_transformed.shape[0]
# test set
(x_test1_transformed,
 y_test1_transformed) = lstm_data_transform(x_test1_sc, y_test1_sc, num_steps=num_steps)
assert x_test1_transformed.shape[0] == y_test1_transformed.shape[0]

# Shuffle the training windows (features and labels with the same permutation).
shuffler = np.random.permutation(len(x_train1_transformed))
x_train1_transformed = x_train1_transformed[shuffler]
y_train1_transformed = y_train1_transformed[shuffler]


# Hyperparameters of the final model.
num_steps = 100      # window length: time steps per input sample
num_features = 2     # input columns per time step
max_epochs = 1       # NOTE: with a single epoch, early stopping (patience below) can never trigger
patience = 7
min_delta = 1e-5
batch_size = 4
# Adam constants. In this notebook the optimizer is passed as the string 'adam',
# so these three values are informational and not handed to Keras.
Beta1 = 0.9
Beta2 = 0.999
Epsilon = 1e-8       # was `10^-8`: `^` is bitwise XOR in Python, which evaluates to -14
optimizer = 'adam'


# Two stacked LSTM layers (4 units each) with dropout in between, followed by a
# single sigmoid unit for the binary output.
model_1 = Sequential([
    LSTM(4, activation='tanh', input_shape=(num_steps, num_features), return_sequences=True),
    Dropout(0.2),
    LSTM(units=4, return_sequences=False),
    Dense(1, activation='sigmoid'),
])

# compile
model_1.compile(
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
    optimizer=optimizer,
    metrics=['accuracy'],
)

# train
early_stop_cb = earlystopping(min_delta, patience)
model_1_history = model_1.fit(
    x_train1_transformed,
    y_train1_transformed,
    validation_data=(x_valid1_transformed, y_valid1_transformed),
    epochs=max_epochs,
    batch_size=batch_size,
    callbacks=[early_stop_cb],
)
# evaluate: [loss, accuracy] on the test windows (accuracy is the metric set in compile()).
model_1_scores = model_1.evaluate(x_test1_transformed, y_test1_transformed, verbose=0)

# predict: threshold the sigmoid output at 0.5; cast to int so the predictions have
# the same dtype as the encoded labels.
test_predict = model_1.predict(x_test1_transformed)
y_pred = (test_predict > 0.5).astype(int)

# Confusion Matrix
# labels=[0, 1] pins the matrix to 2x2 even when only one class occurs; without it
# the 2x2 DataFrame below fails on a 1x1 matrix. confusion_matrix and accuracy_score
# are already imported at the top of the notebook.
cm = confusion_matrix(y_test1_transformed, y_pred, labels=[0, 1])
print(cm)
test_accuracy = accuracy_score(y_test1_transformed, y_pred)

df_cm = pd.DataFrame(cm, range(2), range(2))
sn.set(font_scale=1.4) # for label size
sn.heatmap(df_cm, annot=True, annot_kws={"size": 16}) # font size
plt.title("Accuracy: %.2f%%" % (test_accuracy*100))
plt.xlabel("Predictions")
plt.ylabel("Actual")
plt.show()

model_1_acc = model_1_scores[1]
print(model_1_acc)

In [None]:
# Subject ID of every test window. lstm_data_transform pairs window k with row
# k + num_steps, so the first num_steps rows of the split have no window. Using
# num_steps (instead of a hard-coded 100) keeps this aligned if the window length changes.
s = dataset_test[['Subject']].iloc[num_steps:].reset_index(drop=True)

In [None]:
# One row per test window: predicted class, true class and the window's subject.
pred_column = y_pred.reshape(len(y_pred), 1)
actual_column = y_test1_transformed.reshape(len(y_test1_transformed), 1)
window_results = pd.DataFrame(np.concatenate((pred_column, actual_column), axis=1))

final = pd.concat([window_results, s], axis=1)
final.columns = ['Prediction','Actual','Subject']
final.shape

In [None]:
# Subject-level vote: average the window values per subject, then call a subject
# positive when more than half of its windows were predicted positive.
subject_means = final.groupby('Subject', as_index=False).mean()
final = subject_means.assign(Prediction=subject_means['Prediction'] > 0.5)

In [None]:
# Show the subject-level table with the boolean vote converted to 0/1.
final = final.astype({'Prediction': int})
final

In [None]:
# Chunk-level evaluation: group consecutive windows into blocks of `nrow`, take a
# majority vote per block, and compare it with the true label of the block's first window.
# (The unused `from keras.backend import repeat` import was dropped.)
final = pd.DataFrame(np.concatenate((y_pred.reshape(len(y_pred),1), y_test1_transformed.reshape(len(y_test1_transformed),1)),1))
final.columns = ['predict','test']

# NOTE(review): assumes blocks of 3600 consecutive windows line up with subjects - confirm.
nrow = 3600
final['pred'] = final.groupby(final.index // nrow)['predict'].transform('mean')
final['prediction_result'] = (final.pred > 0.5)
final['prediction'] = final['prediction_result'].astype(int)

# Keep the first row of every block; select columns by name rather than by position
# and reuse nrow instead of repeating the literal 3600.
final = final.iloc[::nrow][['test', 'prediction']]
yt = final['test']
yp = final['prediction']

# labels=[0, 1] keeps the matrix 2x2 even if only one class is present.
cm1 = confusion_matrix(yt, yp, labels=[0, 1])
print(cm1)
chunk_accuracy = accuracy_score(yt, yp)

df_cm1 = pd.DataFrame(cm1, range(2), range(2))
sn.set(font_scale=1.4) # for label size
sn.heatmap(df_cm1, annot=True, annot_kws={"size": 16}) # font size
plt.title("Accuracy: %.2f%%" % (chunk_accuracy*100))
plt.xlabel("Predictions")
plt.ylabel("Actual")
plt.show()

Testing the model on ActiGraph Acceleration

In [None]:
# Replace `data` with combined2.csv and keep only the listed evaluation subjects.
data = pd.read_csv('combined2.csv')

actigraph_subjects = ['s1007','s1003','s2015','s2021','s2012','s2038','s2014']
dataset_test = data[data['Subject'].isin(actigraph_subjects)]

# Positional columns: 0 and 2 are the two model inputs, 6 is the label (the slice
# keeps it two-dimensional).
# NOTE(review): positions depend on the column order of combined2.csv - confirm.
input_positions = [0, 2]
label_position = slice(6, 7)
x_test1 = dataset_test.iloc[:, input_positions].values
y_test1 = dataset_test.iloc[:, label_position].values

In [None]:
# Reuse the encoder and scaler fitted on the training split; nothing is re-fitted here.
# The labels are flattened first because LabelEncoder expects 1-D input and warns
# (DataConversionWarning) on an (n, 1) column.
y_test1 = le.transform(np.ravel(y_test1))

x_test1_sc = scaler_x.transform(x_test1)

# Labels are not scaled; the *_sc name only mirrors the feature naming.
# (scikit-learn already returns NumPy arrays, so no extra np.array() copies are needed.)
y_test1_sc = y_test1

In [None]:
# Cut the second test set into sliding windows of num_steps rows.
x_test1_transformed, y_test1_transformed = lstm_data_transform(
    x_test1_sc, y_test1_sc, num_steps=num_steps
)
assert len(x_test1_transformed) == len(y_test1_transformed)

In [None]:
# Window-level predictions: True where the sigmoid output exceeds 0.5.
test_predict = model_1.predict(x_test1_transformed)
y_pred = np.greater(test_predict, 0.5)

In [None]:
# Window-level confusion matrix on the second test set.
# Predictions are flattened and cast to int to match the encoded labels, and
# labels=[0, 1] keeps the matrix 2x2 even when only one class occurs (otherwise the
# 2x2 DataFrame below fails).
window_pred = np.ravel(y_pred).astype(int)
cm = confusion_matrix(y_test1_transformed, window_pred, labels=[0, 1])
print(cm)
window_accuracy = accuracy_score(y_test1_transformed, window_pred)

df_cm = pd.DataFrame(cm, range(2), range(2))
sn.set(font_scale=1.4) # for label size
sn.heatmap(df_cm, annot=True, annot_kws={"size": 16}) # font size
plt.title("Accuracy: %.2f%%" % (window_accuracy*100))
plt.xlabel("Predictions")
plt.ylabel("Actual")
plt.show()

In [None]:
# One row per window: predicted and true class.
final = pd.DataFrame(np.concatenate((y_pred.reshape(len(y_pred),1), y_test1_transformed.reshape(len(y_test1_transformed),1)),1))
final.columns = ['Predict','Actual']

final = final.reset_index()

# Columns 5 onwards (including Subject) of the row each window is labelled with.
# Window k is paired with row k + num_steps, so the first num_steps rows are skipped;
# using num_steps instead of a hard-coded 100 keeps this aligned with the windowing.
sub = pd.DataFrame(dataset_test.iloc[num_steps:,5:]).reset_index()
assert len(sub) == len(final), "windows and subject rows are misaligned"

final1 = pd.concat([final, sub], axis=1)

# Majority vote per subject over its window-level predictions.
final1['pred'] = final1.groupby('Subject')['Predict'].transform('mean')
final1['prediction_result'] = (final1.pred > 0.5)
final1['Prediction'] = final1['prediction_result'].astype(int)

In [None]:
# Collapse to one row per subject: mean of the voted prediction and of the true label.
final1 = (
    final1.loc[:, ['Subject', 'Prediction', 'Actual']]
    .groupby(['Subject'], as_index=False)
    .mean()
)

In [None]:
# Subject-level confusion matrix.
# After the per-subject mean both columns are floats; they must be exactly 0 or 1
# (one label per subject) before they can be used as class labels.
assert final1['Actual'].isin([0, 1]).all(), "a subject has windows with mixed true labels"
yt = final1['Actual'].astype(int)
yp = final1['Prediction'].astype(int)

# labels=[0, 1] keeps the matrix 2x2 even if only one class is present.
cm1 = confusion_matrix(yt, yp, labels=[0, 1])
print(cm1)
subject_accuracy = accuracy_score(yt, yp)

df_cm1 = pd.DataFrame(cm1, range(2), range(2))
sn.set(font_scale=1.4) # for label size
sn.heatmap(df_cm1, annot=True, annot_kws={"size": 16}) # font size
plt.title("Accuracy: %.2f%%" % (subject_accuracy*100))
plt.xlabel("Predictions")
plt.ylabel("Actual")
plt.show()

In [None]:
# Subject-level metrics. scikit-learn's metric functions take (y_true, y_pred);
# the original passed (yp, yt) to the three scores below, which swaps precision
# and recall.
print(classification_report(yt, yp))
print("Precision:{}".format(precision_score(yt, yp)))
print("Recall:{}".format(recall_score(yt, yp)))
print("F1 Score:{}".format((f1_score(yt, yp))))