In [1]:
import numpy as np
import pandas as pd
import scipy as sp
import scipy.signal
import matplotlib.pyplot as plt
import os
import sklearn
import mne
import tensorflow as tf

In [2]:
# Import data
directory = 'Data'
participants = ['Aharon', 'Conor', 'John']
directory_2 = 'One\Thalmic Labs MyoMyo'
frames = []
for participant in participants:
    for count, filename in enumerate(os.listdir(os.path.join(directory, participant, directory_2))):
        df = pd.read_csv(os.path.join(directory, participant, directory_2, filename))
        df.drop(['Timestamp','Device ID','Warm?','Sync','Arm','Timestamp.1','Pose','Locked','RSSI','Device_Time'], axis=1, inplace=True)
        
        df.drop(['Orientation_W'], axis=1, inplace=True)
        
        df['Participant'] = [participant] * df.shape[0]
        filename = filename.split('_')
        if len(filename) == 3:
            df['Class'] = [filename[0]] * df.shape[0]
            df['Trial'] = [filename[1]] * df.shape[0]
        else:
            df['Class'] = [filename[0] + ' ' + filename[1]] * df.shape[0]
            df['Trial'] = [filename[2]] * df.shape[0]
        frames.append(df)
# data = pd.concat(frames)

In [3]:
# Preprocess EMU
def filteremg(emg, notch=60, quality=60, sfreq=200, high_band=20, low_band=95):
    """
    emg: EMG data
    high: high-pass cut off frequency
    low: low-pass cut off frequency
    sfreq: sampling frequency
    """
    # Zero mean emg signal
    emg = emg - emg.mean()
    
    # normalise cut-off frequencies to sampling frequency
    high_band = high_band/(sfreq/2)
    low_band = low_band/(sfreq/2)
    
    
    # create bandpass filter for EMG
    b1, a1 = sp.signal.butter(4, [high_band,low_band], btype='bandpass', analog=True)
    
    # process EMG signal: filter EMG
    emg_filtered = sp.signal.filtfilt(b1, a1, emg)    
    
    # process EMG signal: rectify
    emg_rectified = abs(emg_filtered)
    
    # create lowpass filter and apply to rectified signal to get EMG envelope
    notch = notch/(sfreq)
    b2, a2 = sp.signal.iirnotch(notch, quality, fs=sfreq)
    emg_envelope = sp.signal.lfilter(b2, a2, emg_rectified)
    
    return emg_envelope

emg_keys = ['EMG_' + str(i) for i in range(1, 9)]
imu_keys = ['Orientation_X','Orientation_Y','Orientation_Z','Gyro_X','Gyro_Y','Gyro_Z','Acc_X', 'Acc_Y', 'Acc_Z']

for frame in frames:
    frame[emg_keys] = frame[emg_keys].apply(filteremg, raw=True)

    # for channel in range(1,9):
    #     f, Pxx_den = sp.signal.periodogram(frame['EMG_' + str(channel)], 200)
    #     plt.figure()
    #     plt.semilogy(f, Pxx_den)
    #     plt.title('EMG_' + str(channel) + ' PSD')
    #     plt.ylim([1e-7, 1e2])
    #     plt.xlabel('frequency [Hz]')
    #     plt.ylabel('PSD [V**2/Hz]')
    #     plt.show()
    #         plt.savefig(directory + 'EMG_' + str(channel) + ' PSD.png')
    #     display('EMG_' + str(channel) + ' max power:', max(Pxx_den))
    for col in emg_keys:
        frame[col] = frame[col].rolling(20).mean() # how many samples should I make my window?
#     for channel in range(1,9):
#         plt.figure()
#         ax = frame['EMG_' + str(channel)].plot()
#         plt.title('EMG_' + str(channel) + ' Filtered')
#         plt.ylabel('mVolts')
#         plt.xlabel('Time')
    #         plt.savefig(directory + 'EMG_' + str(channel) + ' Filtered.png')

In [4]:
# Preprocess IMU
for frame in frames:
    for col in imu_keys:
        frame[col] = frame[col].rolling(5).mean()  # how many samples should I make my window?
    # for channel in ['X', 'Y', 'Z']:
    #     plt.figure()
    #     frame['Acc_' + channel].plot()
    #     plt.title('Acc_' + str(channel))
    #     plt.ylabel('g')
    #     plt.xlabel('Time')

In [5]:
# Feature Extraction
# Root Mean Square
def get_RMS(signal):
    ans = np.sqrt(np.average(np.square(signal)))
    return ans

# # Integral
# def get_IL(signal):
#     ans = np.sum(np.absolute(signal))
#     return ans

# Waveform Length
def get_WL(signal):
    ans = np.log(sum([np.absolute(signal[i+1] - signal[i]) for i in range(len(signal)-1)]))
    return ans

# Mean Absolute Value
def get_MAV(signal):
    ans = np.average(np.absolute(signal))
    return ans

# Slope Sign Change
# def get_SSC(signal):
#     ans = 0
#     for i in range(len(signal)-1):
#         if 
#     return ans

# Variance
def get_VAR(signal):
    ans = np.var(signal)
    return ans

# Max Power
def get_MAXP(signal):
    f, Pxx_den = sp.signal.periodogram(signal, 200)
    ans = max(Pxx_den)
    return ans

# Range of Values
def get_dRange(signal):
    ans = max(signal) - min(signal)
    return ans

# Max Value
def get_MAX(signal):
    ans = max(signal)
    return ans

def extract_features(dataframe, emg_channels, or_channels, gyro_channels, acc_channels):
    feat_dict = {}
    for chan in emg_channels:
        feat_dict[chan + '_RMS'] = get_RMS(dataframe[chan].dropna().reset_index(drop=True))
#         feat_dict[chan + '_IL'] = get_IL(dataframe[chan].dropna().reset_index(drop=True))
        feat_dict[chan + '_WL'] = get_WL(dataframe[chan].dropna().reset_index(drop=True))
        feat_dict[chan + '_MAV'] = get_MAV(dataframe[chan].dropna().reset_index(drop=True))
        feat_dict[chan + '_VAR'] = get_VAR(dataframe[chan].dropna().reset_index(drop=True))
        feat_dict[chan + '_MAXP'] = get_MAXP(dataframe[chan].dropna().reset_index(drop=True))
        feat_dict[chan + '_MAX'] = get_MAX(dataframe[chan].dropna().reset_index(drop=True))
    for chan in or_channels:
        feat_dict[chan + '_dOrient'] = get_dRange(dataframe[chan].dropna().reset_index(drop=True))
    for chan in gyro_channels:
        feat_dict[chan + '_dGyro'] = get_dRange(dataframe[chan].dropna().reset_index(drop=True))
    for chan in or_channels + gyro_channels + acc_channels:
        feat_dict[chan + '_VAR'] = get_VAR(dataframe[chan].dropna().reset_index(drop=True))
    for chan in acc_channels:
        feat_dict[chan + '_MAX'] = get_MAX(dataframe[chan].dropna().reset_index(drop=True))
    return feat_dict

or_keys = imu_keys[:4]
gyro_keys = imu_keys[4:7]
acc_keys = imu_keys[7:]

feature_df = pd.DataFrame()
for frame in frames:
    feats = extract_features(frame, emg_keys, or_keys, gyro_keys, acc_keys)
    feats['Participant'] = frame['Participant']
    feats['Class'] = frame['Class']
    feats['Trial'] = frame['Trial']
    feature_df = pd.concat([feature_df, pd.DataFrame(feats, index=[0])], ignore_index=True)

In [6]:
# Setup Data for ML
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, Activation
from keras.utils import np_utils
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

train_cols = list(feature_df)[:-3]

classes = np.unique(feature_df['Class'])
class_to_int = {c:i for i, c in enumerate(classes)}
y_cols = ['y_' + str(i) for i in range(len(classes))]
one_hot_matrix = np.zeros((len(feature_df), len(classes)))
for i, c in enumerate(feature_df['Class']):
    index = class_to_int[c]
    one_hot_matrix[i, index] = 1
y_df = pd.DataFrame(one_hot_matrix, columns=y_cols)
feature_df = pd.concat([feature_df,y_df], axis=1)

feature_train, feature_test = train_test_split(feature_df, train_size=0.8, test_size=0.2, shuffle=True)

x = feature_train.loc[:,train_cols]
min_max_scalar = MinMaxScaler()
x_train = min_max_scalar.fit_transform(x)
x_test = min_max_scalar.transform(feature_test.loc[:,train_cols])
y_train = feature_train.loc[:,y_cols].values
y_test = feature_test.loc[:,y_cols].values

In [7]:
# Machine Learning
# print(x_train.shape)
model = Sequential([
    Input(shape=(66,)),
    Dense(128, activation='relu'),
    Dropout(0.1),
    Dense(64, activation='relu'),
    Dropout(0.1),
    Dense(15, activation='softmax')
])

model.compile(optimizer='adam',
             loss='categorical_crossentropy',
             metrics=['accuracy','mse'])

model.fit(x_train, y_train, epochs=100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100


Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78/100
Epoch 79/100
Epoch 80/100
Epoch 81/100
Epoch 82/100
Epoch 83/100
Epoch 84/100
Epoch 85/100
Epoch 86/100
Epoch 87/100
Epoch 88/100
Epoch 89/100
Epoch 90/100
Epoch 91/100
Epoch 92/100
Epoch 93/100
Epoch 94/100
Epoch 95/100
Epoch 96/100
Epoch 97/100
Epoch 98/100
Epoch 99/100
Epoch 100/100


<keras.callbacks.History at 0x1fc35df0f50>

In [8]:
model.evaluate(x_test, y_test)



[0.42161476612091064, 0.8833333253860474, 0.014269727282226086]

In [None]:
predicted = model.predict(x_test)
predicted = np.argmax(predicted, axis=1)+1
true = np.argmax(y_test, axis=1)+1
display(pd.DataFrame(classification_report(true, predicted, output_dict=True)).transpose())

cm = confusion_matrix(true, predicted)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()

In [None]:
model.save("Aharon_Original_NN_Model")