In [None]:
import os
import re
import numpy as np
import pandas as pd
import scipy.signal
import tensorflow as tf
from tensorflow.python.ops import gen_audio_ops as audio_ops
from sklearn.metrics import classification_report, confusion_matrix
from tqdm import tqdm
import matplotlib.pyplot as plt
%matplotlib inline

___
## EMG Data Loading

In [None]:
# Sampling rate of the recordings in samples per second; event times given in
# seconds are multiplied by this value to obtain sample indices.
fs = 250

# Task names listed in label order: a task's position in this list is its
# integer class label.
task_label_map = {
    task_name: label
    for label, task_name in enumerate([
        'Angry', 'Chewing', 'Eye', 'Eye-Iso',
        'In-Iso', 'Jaw', 'L Gaze-L', 'L Gaze-R',
        'Out-Iso', 'Sad', 'Smile-Iso', 'Surprise',
        'Swallowing', 'Talk', 'Up Gaze', 'Wrinkle-Iso',
    ])
}

# Inverse lookup: integer class label -> task name.
rev_map = dict(zip(task_label_map.values(), task_label_map.keys()))

# Identifiers 'subject0' ... 'subject9'; also used as directory / file name parts.
subject_ids = ['subject{}'.format(idx) for idx in range(10)]

In [None]:
# Read each subject's Morning and Evening recordings and cut them into labeled
# activity segments.  Both dictionaries are keyed by subject ID; entry i of
# subject_labels[sid] is the integer label of entry i of subject_activities[sid].
subject_activities = {sid: [] for sid in subject_ids}
subject_labels = {sid: [] for sid in subject_ids}

data_dir = '../data/signal_data'
timestamp_dir = '../data/timestamps/'
emg_columns = ['EMG Channel 1', 'EMG Channel 2']
event_columns = ['Event Start (s)', 'Event Stop (s)', 'Task Label']

for sid in tqdm(subject_ids):
    for session in ('Morning', 'Evening'):
        signal_csv = os.path.join(data_dir, sid, '{}_{}_separated_signals.csv'.format(sid, session))
        # Transposed so that rows are channels and columns are samples.
        signal_data = pd.read_csv(signal_csv)[emg_columns].to_numpy().T

        events_csv = os.path.join(timestamp_dir, sid, '{}_{}_timestamps.csv'.format(sid, session))
        events = pd.read_csv(events_csv)[event_columns].to_numpy()

        # Event times are in seconds; multiplying by fs and truncating gives sample indices.
        session_segments = [signal_data[:, int(start*fs):int(stop*fs)] for start, stop, _ in events]
        session_labels = [task_label_map[task_name] for _, _, task_name in events]

        subject_activities[sid].extend(session_segments)
        subject_labels[sid].extend(session_labels)

In [None]:
# NOTE: this cell used to repeat the loading cell directly above line for line,
# which reset both dictionaries and re-read every CSV from disk without changing
# the result.  The redundant reload is dropped; the cell now only sanity-checks
# what the loading cell produced.
for sid in subject_ids:
    segments, labels = subject_activities[sid], subject_labels[sid]

    # Every segment must have exactly one label at the same list position.
    assert len(segments) == len(labels), \
        '{}: {} segments but {} labels'.format(sid, len(segments), len(labels))

    # A zero-length segment (event start == stop after truncation to samples)
    # carries no signal to crop or tile later on, so surface it here.
    n_empty = sum(seg.shape[-1] == 0 for seg in segments)
    print('{}: {} labeled segments ({} empty)'.format(sid, len(segments), n_empty))

___
## Extract Training and Test Data

Training and testing datasets are built from the previously loaded, labeled signal data.  Because deep learning models often require large datasets to learn generalizable functions, data augmentation is employed here in an effort to maximize the diversity seen in the training set.  Each time a signal segment is added to the training set, multiple random croppings of that segment are added as well.  No augmentation is applied to the testing set.

In [None]:
def get_random_crop(emg_data, cropped_len=750):
    '''
    Given an EMG signal segment of arbitrary length, output a
    fixed length signal segment representing the original signal.

    input:
      emg_data (ndarray): EMG signal segment shaped (channels, samples);
        any number of channels is accepted
      cropped_len (int): length (in samples) of output segment

    output:
      cropped_seg (ndarray): fixed length segment shaped
      (channels, cropped_len).  An input of exactly `cropped_len` samples
      is returned as is, a longer input yields a uniformly random
      contiguous window, and a shorter input is repeated end-to-end
      (tiled) and truncated to `cropped_len`.

    raises:
      ValueError: if the segment has no samples but `cropped_len` > 0
        (there is nothing to repeat)
    '''
    emg_data = np.array(emg_data)
    seg_len = emg_data.shape[-1]

    if seg_len == cropped_len:
        # Input segment is the desired length
        return emg_data

    if seg_len > cropped_len:
        # Input segment is longer than desired.  Select
        # a random crop of length `cropped_len` from it.
        max_idx = seg_len - cropped_len
        idx = np.random.randint(0, max_idx+1)
        return emg_data[:,idx:idx+cropped_len]

    # Input segment is shorter than desired.  Fill the output by repeating
    # the segment back-to-back; the last repetition is cut off at `cropped_len`.
    if seg_len == 0:
        # The previous copy loop advanced by seg_len per step and therefore
        # never terminated for an empty segment.
        raise ValueError('cannot build a {}-sample crop from an empty EMG segment'.format(cropped_len))

    n_repeats = -(-cropped_len // seg_len)  # ceil(cropped_len / seg_len)
    # Float output buffer sized from the input's own channel count
    # (previously hard-coded to 2 channels).
    cropped_seg = np.zeros((emg_data.shape[0], cropped_len))
    cropped_seg[:] = np.tile(emg_data, (1, n_repeats))[:,:cropped_len]
    return cropped_seg

In [None]:
# Acquire fixed-length activity segments for each action, normalize them per
# subject, and split them into a shuffled 80/20 train/test partition.
X_subject, y_subject = {}, {}

# Crop data to a 30 second segment
crop_len = fs*30

# Number of random croppings drawn per training segment
n_augmented_crops = 10

# Variable-length segments rescaled with their subject's statistics.  Entry i
# corresponds to row i of X_full, so training crops can be re-drawn from the
# full-length signal instead of from an already cropped window.
normalized_activities = []

for sid in subject_ids:
    # Cast to float so the normalization below cannot truncate integer-typed data.
    raw_segments = [np.asarray(act, dtype=float) for act in subject_activities[sid]]
    X = np.array([get_random_crop(seg, crop_len) for seg in raw_segments], dtype=float)

    # One mean / std per channel, pooled over all of this subject's cropped segments.
    ch_mean = X.mean(axis=(0, 2), keepdims=True)   # shape (1, channels, 1)
    ch_std = X.std(axis=(0, 2), keepdims=True)

    X_subject[sid] = (X - ch_mean)/ch_std
    y_subject[sid] = np.array(subject_labels[sid])

    # Apply the same statistics to the uncropped segments used for augmentation.
    normalized_activities.extend((seg - ch_mean[0])/ch_std[0] for seg in raw_segments)


X_full = np.vstack([X_subject[sid] for sid in subject_ids])
y_full = np.hstack([y_subject[sid] for sid in subject_ids])

# Shuffle before splitting.  The permutation used to be computed but never
# applied, so the split followed subject order (the last subjects formed the test set).
inds = np.random.permutation(X_full.shape[0])
cutoff = int(len(inds)*0.8)
train_inds, test_inds = inds[:cutoff], inds[cutoff:]

# Define Training Dataset (With Augmentation)
# Crops are drawn from the full-length normalized segments.  Re-cropping a
# window that is already `crop_len` long returns that same window every time,
# which made the augmented copies identical.  Segments shorter than `crop_len`
# are tiled deterministically by get_random_crop, so their copies still coincide.
X_train_augmented, y_train_augmented = [], []
for i in train_inds:
    for _ in range(n_augmented_crops):
        X_train_augmented.append(get_random_crop(normalized_activities[i], crop_len))
        y_train_augmented.append(y_full[i])
X_train, y_train = np.array(X_train_augmented), np.array(y_train_augmented)

# Define Testing Dataset (No Augmentation)
X_test, y_test = X_full[test_inds], y_full[test_inds]

___
## Model Instantiation, Fitting, and Evaluation

In [None]:
def extract_feature(data_in, frame_length=256, frame_step=32, log_floor=1e-30):
    '''
    Compute the base-10 log-magnitude spectrogram of a single signal.

    input:
      data_in (ndarray): signal samples; flattened to 1-D before processing
      frame_length (int): STFT window length in samples
      frame_step (int): hop between consecutive STFT windows in samples
      log_floor (float): smallest magnitude passed to the logarithm

    output:
      ndarray: log10 of the STFT magnitude, with singleton dimensions removed
    '''
    # Remove the linear trend so it does not dominate the lowest bins.
    detrended = scipy.signal.detrend(data_in.reshape(-1))
    spectrogram = tf.signal.stft(detrended, frame_length=frame_length,
                                 frame_step=frame_step, pad_end=False)
    magnitude = tf.abs(spectrogram)
    # An exactly-zero magnitude (e.g. a flat signal) would give log(0) = -inf,
    # which turns every later mean/std normalization into NaN.  Clamp first.
    magnitude = tf.maximum(magnitude, log_floor)
    log_magnitude = tf.math.log(magnitude)/np.log(10)
    return log_magnitude.numpy().squeeze()

def extract_spectrogram_feature(data, labels, frequency_range = (10,125)):
    '''
    Turn a batch of multi-channel signals into stacked spectrogram features.

    input:
      data (iterable): samples, each an iterable of 1-D channel signals
      labels: passed through unchanged
      frequency_range (tuple or None): (start, stop) slice indices applied to
        the third axis of the stacked feature array (spectrogram bin indices,
        not Hz); a falsy value keeps every bin

    output:
      (features, labels): features stacks samples on axis 0 and channels on
      the last axis, with each channel's extract_feature output in between
    '''
    per_sample = []
    for datum in tqdm(data):
        # One spectrogram per channel, with channels placed on the last axis.
        channel_specs = [extract_feature(channel[:,None]) for channel in datum]
        per_sample.append(np.stack(channel_specs, axis=-1))
    features = np.stack(per_sample, axis=0)

    if frequency_range:
        first_bin, stop_bin = frequency_range[0], frequency_range[1]
        features = features[:,:,first_bin:stop_bin,:]
    return (features, labels)

def normalize_spectrogram_feature(train_feature, valid_feature):
    '''
    Standardize two feature arrays with statistics taken from the first one.

    The mean and standard deviation are computed over axes 0 and 1 of
    `train_feature` (kept as singleton axes so they broadcast) and applied to
    both arrays, so `valid_feature` never contributes to the statistics.

    input:
      train_feature (ndarray): array the statistics are computed from
      valid_feature (ndarray): array normalized with those same statistics

    output:
      (normalized_train, normalized_valid, train_mean, train_std)
    '''
    reduce_axes = (0,1)
    train_mean = train_feature.mean(axis=reduce_axes, keepdims=True)
    train_std = train_feature.std(axis=reduce_axes, keepdims=True)

    def _standardize(feature):
        return (feature - train_mean)/train_std

    return _standardize(train_feature), _standardize(valid_feature), train_mean, train_std

# Replace the time-domain windows with spectrogram features; labels pass
# through unchanged.  NOTE: X_train / X_test are overwritten here, so running
# this part twice would feed spectrograms back into the extractor.
X_train, y_train = extract_spectrogram_feature(data=X_train, labels=y_train)
X_test, y_test = extract_spectrogram_feature(data=X_test, labels=y_test)

# Standardize both splits using statistics from the training features only.
(normalized_train_feature,
 normalized_test_feature,
 train_mean,
 train_std) = normalize_spectrogram_feature(X_train, X_test)

In [None]:
def get_model(input_shape, lr=0.01, loss='sparse_categorical_crossentropy', num_classes=16):
    '''
    Build and compile the convolutional classifier.

    input:
      input_shape (tuple): shape of one input sample (without the batch axis)
      lr (float): learning rate of the Adam optimizer
      loss (str or callable): loss passed to `model.compile`
      num_classes (int): number of softmax output units

    output:
      model (tf.keras.Sequential): compiled model reporting accuracy
    '''
    def l2_penalty():
        # A fresh regularizer instance per layer, as in the layer definitions below.
        return tf.keras.regularizers.l2(0.01)

    model = tf.keras.Sequential([
                tf.keras.layers.Conv2D(16, (5,5), strides=(2,2), padding='valid',
                                       activation='relu',
                                       kernel_regularizer=l2_penalty(),
                                       input_shape=input_shape),
                tf.keras.layers.Dropout(0.4),
                tf.keras.layers.Conv2D(32, (5,5), strides=(2,2), padding='valid',
                                       activation='relu',
                                       kernel_regularizer=l2_penalty()),
                tf.keras.layers.MaxPool2D(),
                tf.keras.layers.Flatten(),
                tf.keras.layers.Dropout(0.4),
                tf.keras.layers.Dense(32,
                                      kernel_regularizer=l2_penalty(),
                                      activation='relu'),
                tf.keras.layers.Dropout(0.4),
                tf.keras.layers.Dense(num_classes,
                                      kernel_regularizer=l2_penalty(),
                                      activation='softmax')
            ])

    # The optimizer used to be created but never handed to compile(), so `lr`
    # was ignored and Keras fell back to its default optimizer.
    optim = tf.keras.optimizers.Adam(learning_rate=lr)
    model.compile(optimizer=optim, loss=loss, metrics=['accuracy'])
    return model

In [None]:
# Build the network for the shape of one normalized training sample.
input_shape = normalized_train_feature.shape[1:]
model = get_model(input_shape, lr=0.0003)

# Checkpoint callback: the file is only overwritten when validation accuracy improves.
best_model_path = './cnn_model.h5'
model_checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    best_model_path,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
)

print('Training the model ...')
history = model.fit(
    x=normalized_train_feature,
    y=y_train,
    batch_size=256,
    validation_split=0.2,
    epochs=100,
    callbacks=[model_checkpoint_cb],
    verbose=1,
)

In [None]:
# Restore the best checkpoint into a freshly built network and predict the
# test set.  This cell was a verbatim copy of the classification-report cell
# below; it now shows the confusion matrix instead of printing the same report twice.
saved_model = get_model(input_shape)
saved_model.load_weights(best_model_path)
y_pred = np.argmax(saved_model.predict(normalized_test_feature), axis=-1)

# Rows are true tasks, columns are predicted tasks.  Passing the full label
# set keeps the matrix square even if a class is absent from the test split.
class_ids = sorted(rev_map)
class_names = [rev_map[class_id] for class_id in class_ids]
pd.DataFrame(confusion_matrix(y_test, y_pred, labels=class_ids),
             index=class_names, columns=class_names)

In [None]:
# Per-class precision / recall / F1 on the held-out test set.
# `y_pred` comes from the evaluation cell directly above, which already
# restored the best checkpoint and predicted the test set; rebuilding the
# model and predicting again here only repeated that work.
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

           0       0.25      0.19      0.21        16
           1       0.94      0.85      0.89        20
           2       0.78      0.88      0.82        16
           3       1.00      0.18      0.30        17
           4       0.50      0.18      0.26        17
           5       0.88      0.82      0.85        17
           6       0.52      0.61      0.56        18
           7       0.40      0.35      0.38        17
           8       0.33      0.25      0.29        12
           9       0.28      0.55      0.37        20
          10       0.59      0.67      0.62        15
          11       0.11      0.12      0.12        16
          12       0.50      0.22      0.31         9
          13       0.57      0.67      0.62        18
          14       0.11      0.12      0.12         8
          15       0.12      0.20      0.15        15

    accuracy                           0.46       251
   macro avg       0.49   