This notebook analyzes a single recording of EMG data from a test subject's biceps. 

# Step 1: Import and Format Data

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
in_data = '/home/cta/Documents/College/Skywalker Legacy/Sample Data/simple_emg_test.txt' # File containing raw EMG readings
df = pd.read_csv(in_data, header=4) # Read file and extract relevant data
emg_data = df[" EXG Channel 0"]
emg_data.rename({" EXG Channel 0":"c0"})
emg_data = emg_data.to_frame('c0')
time = np.linspace(1/200,10756/200,10756) # Add time series based on pre-existing knowledge about data properties
emg_data['time'] = time

This is what the EMG data looks like before filtration.

In [None]:
plt.rcParams["figure.figsize"] = (25,10)
plt.plot(emg_data['time'], emg_data['c0'])
plt.xlabel("Time (s)")
plt.ylabel("Potential Difference (uV)")

# Step 2: Filter Data

In [None]:
fs = 200 # Sample rate = 200 Hz. Pre-existing knowledge.

https://scipy-cookbook.readthedocs.io/items/ButterworthBandpass.html

In [None]:
# Define a bandpass filter to exclude low- and high-frequency components

from scipy.signal import butter, lfilter

def butter_bandpass(lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a


def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    y = lfilter(b, a, data)
    return y

In [None]:
# Initialize bandpass filter, run EMG data through it, and add result to dataframe
lowcut = 5
highcut = 50
emg_butter = butter_bandpass_filter(emg_data['c0'],lowcut,highcut,fs,order=2)
emg_data['c0_filtered'] = emg_butter

In [None]:
plt.plot(emg_data['time'], emg_data['c0_filtered'])

https://neurobb.com/t/openbci-why-are-1-50hz-bandpass-and-60hz-notch-filters-both-applied-by-default/23/2

# Step 3: Extract Features and Classify Segments

In [None]:
# calculates root mean square feature for a given data segment
def calc_rms(data):
    sq_sum = 0
    for i in range(0, len(data)):
        sq_sum += data[i]**2
    sq_sum /= len(data)
    sq_sum **= (1/2)
    return sq_sum

In [None]:
# calculates waveform length feature (as described in several articles on EMG) for a given data segment
def calc_wvlen(data):
    wv_len = 0
    for i in range(1,len(data)):
        wv_len += abs(data[i]-data[i-1])
    return wv_len

In [None]:
# label dataset based on pre-existing measurements (the muscle was either clenched or relaxed. It started relaxed, was clenched at t = 25.13 s, and relaxed again at t = 39.7 s)
t_clench = 25.13
t_clench -= 0.6 # I'm fudging the data a little because I don't think we labeled it right. Obviously, this is bad practice in real situations
t_relax = 39.7
t_stop = 53.97
offset = emg_data['time'].to_numpy()[-1] - t_stop
t_stop_true = t_stop+offset

In [None]:
t_stop_true

### Point generating function

In [None]:
import random
import math

Removed the "ambiguous" classification

In [None]:
# takes in a start time and an end time (bounds) and returns the EMG data over that time segment. If no bounds are given, it generates some at random based on the window size.
def gen_point(emg_data,bounds=None,w_size=50, return_bounds=False):
    if bounds == None:
        t_start = math.floor(random.random()*(len(emg_data)-w_size+1)) # Generates number in range [0,10706]
        bounds = (t_start,t_start+w_size-1)
    time_seg = emg_data['time'][bounds[0]:bounds[1]+1].to_numpy()
    emg_seg = emg_data['c0_filtered'][bounds[0]:bounds[1]+1].to_numpy()
    
    #label = 'ambiguous'
    label = 'error'
    if bounds[1]>t_stop_true*fs:
        label = 'out_of_bounds'
    elif bounds[1]<t_clench*fs or bounds[1]>t_relax*fs: # If the window is outside of the muscle activation time, label it
        label = 'relax'
    elif t_clench*fs<=bounds[1] and bounds[1]<=t_relax*fs: # If the window is within the muscle activation time, label it
        label = 'active'
        
    if label == 'error':
        print('error')
    if return_bounds:   
        return emg_seg, label, bounds
    else:
        return emg_seg, label

### Display of data

In [None]:
plt.rcParams["figure.figsize"] = (25,10)
plt.plot(emg_data['time'], emg_data['c0_filtered'])
plt.axvline(x=t_clench,color='red')
plt.axvline(x=t_relax,color='red')

The real-deal code that does ML on this dataset

### General functions for feature extraction

In [None]:
# normalizes a given array based on its own elements
def norm_arr_simple(arr,yardstick=None):
    arr = np.array(arr)
    if yardstick is None:
        yardstick = max(abs(arr))
    new_arr = np.empty(np.shape(arr))
    for i in range(0,len(arr)):
        new_arr[i] = arr[i]/yardstick
    return new_arr

In [None]:
# given a dataset and an array of functions that compute features from datasets of that type, computes normalized features for the given dataset
def extract_norm_features(data,f_arr,ys_arr=None):
    data_features = np.empty((len(data),len(f_arr)))
    for i in range(0,len(data)):
        for j in range(0,len(f_arr)):
            data_features[i][j] = f_arr[j](data[i])
    for i in range(0,len(data_features[0])):
        if ys_arr is None:
            data_features[:,i] = norm_arr_simple(data_features[:,i])
        else:
            data_features[:,i] = norm_arr_simple(data_features[:,i],yardstick=ys_arr[i])
    return data_features

In [None]:
from sklearn.model_selection import train_test_split
from sklearn import svm

### Gets points for classification

In [None]:
# get points for classification

ws = 100 # window size (units of 5 ms; ws = 100 --> window size of 0.5 seconds)
n_points = 96 # number of points

X = np.zeros((n_points,ws))
y = np.empty(n_points,dtype=object)
bounds = np.empty((n_points,2))

for i in range(0,n_points): # generates random points of the given window size
    emg_seg, label, b = gen_point(emg_data,w_size=ws,return_bounds=True)
    X[i] = emg_seg
    y[i] = label
    bounds[i] = b

### Compute maximum RMS and waveform length for windows of the chosen size

In [None]:
rms_max = -1
for i in range(0,len(emg_data['c0_filtered'])-ws):
    new_rms = calc_rms(emg_data['c0_filtered'][i:i+ws].to_numpy())
    rms_max = max(rms_max,abs(new_rms))

wvlen_max = -1
for i in range(0,len(emg_data['c0_filtered'])-ws):
    new_wvlen = calc_wvlen(emg_data['c0_filtered'][i:i+ws].to_numpy())
    wvlen_max = max(wvlen_max,abs(new_wvlen))

### Get training and testing data

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=24) # divide into training and testing data

# Extract features from raw EMG data and make those the new points
f_list = [calc_rms, calc_wvlen]
yard_arr = [rms_max, wvlen_max]
X_train_points = extract_norm_features(X_train,f_list,ys_arr=yard_arr)
X_test_points = extract_norm_features(X_test,f_list,ys_arr=yard_arr)

# Convert y_train and y_test from string format ("relax", "ambiguous", "active") to integer format (0,1,2)
#num_dict = {"relax":0, "ambiguous":1,"active":2}
num_dict = {"relax":0,"active":1}
y_train_int = np.zeros(len(y_train),dtype=int)
for i in range(0,len(y_train)):
    y_train_int[i] = num_dict[y_train[i]]
y_test_int = np.zeros(len(y_test),dtype=int)
for i in range(0,len(y_test)):
    y_test_int[i] = num_dict[y_test[i]]
    
# labels points with colors based on their value
c_dict = {'relax':'blue','active':'orange','ambiguous':'red'}

col_arr_train = np.empty(np.shape(y_train),dtype='object') # labels training points with colors
for i in range(0,len(col_arr_train)):
    col_arr_train[i] = c_dict[y_train[i]]
    
col_arr_test = np.empty(np.shape(y_test),dtype='object') # labels test points with colors
for i in range(0,len(col_arr_test)):
    col_arr_test[i] = c_dict[y_test[i]]

### Train classifier

In [None]:
clf_1 = svm.LinearSVC().fit(X_train_points,y_train_int) # create classifier based on training data

In [None]:
# classifies points in testing data
pre = clf_1.predict(X_test_points)

In [None]:
clf_1.score(X_test_points,y_test_int) # scores accuracy of classifier on testing data

In [None]:
42/43

In [None]:
X_points = np.append(X_train_points,X_test_points,axis=0)

In [None]:
y_int = np.append(y_train_int,y_test_int)

## Plot classification results

In [None]:
from mlxtend.plotting import plot_decision_regions
fig = plt.figure(figsize=(8,8))

ax = plot_decision_regions(X_points, y_int, clf=clf_1,colors="blue,orange",X_highlight=X_test_points)
ax.set_xlim([0,1])
ax.set_ylim([0,1])

In [None]:
fig = plt.figure(figsize=(8,8))

ax = plot_decision_regions(X_train_points, y_train_int, clf=clf_1,colors="blue,orange")
ax.set_xlim([0,1])
ax.set_ylim([0,1])

In [None]:
fig = plt.figure(figsize=(8,8))

ax = plot_decision_regions(X_test_points, y_test_int, clf=clf_1,colors="blue,orange")
ax.set_xlim([0,1])
ax.set_ylim([0,1])

### Run everything repeatedly to find the best parameters

In [None]:
ws = 100 # window size (units of 5 ms; ws = 100 --> window size of 0.5 seconds)
n_points = 64 # number of points
mean_score = 0
n = 10
for i in range(0,n):
    X = np.zeros((n_points,ws))
    y = np.empty(n_points,dtype=object)
    bounds = np.empty((n_points,2))

    for i in range(0,n_points): # generates random points of the given window size
        emg_seg, label, b = gen_point(emg_data,w_size=ws,return_bounds=True)
        X[i] = emg_seg
        y[i] = label
        bounds[i] = b
        
    rms_max = -1
    for i in range(0,len(emg_data['c0_filtered'])-ws):
        new_rms = calc_rms(emg_data['c0_filtered'][i:i+ws].to_numpy())
        rms_max = max(rms_max,abs(new_rms))

    wvlen_max = -1
    for i in range(0,len(emg_data['c0_filtered'])-ws):
        new_wvlen = calc_wvlen(emg_data['c0_filtered'][i:i+ws].to_numpy())
        wvlen_max = max(wvlen_max,abs(new_wvlen))
        
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=24) # divide into training and testing data

    # Extract features from raw EMG data and make those the new points
    f_list = [calc_rms, calc_wvlen]
    yard_arr = [rms_max, wvlen_max]
    X_train_points = extract_norm_features(X_train,f_list,ys_arr=yard_arr)
    X_test_points = extract_norm_features(X_test,f_list,ys_arr=yard_arr)

    # Convert y_train and y_test from string format ("relax", "ambiguous", "active") to integer format (0,1,2)
    #num_dict = {"relax":0, "ambiguous":1,"active":2}
    num_dict = {"relax":0,"active":1}
    y_train_int = np.zeros(len(y_train),dtype=int)
    for i in range(0,len(y_train)):
        y_train_int[i] = num_dict[y_train[i]]
    y_test_int = np.zeros(len(y_test),dtype=int)
    for i in range(0,len(y_test)):
        y_test_int[i] = num_dict[y_test[i]]

    # labels points with colors based on their value
    c_dict = {'relax':'blue','active':'orange','ambiguous':'red'}

    col_arr_train = np.empty(np.shape(y_train),dtype='object') # labels training points with colors
    for i in range(0,len(col_arr_train)):
        col_arr_train[i] = c_dict[y_train[i]]

    col_arr_test = np.empty(np.shape(y_test),dtype='object') # labels test points with colors
    for i in range(0,len(col_arr_test)):
        col_arr_test[i] = c_dict[y_test[i]]
        
    clf_1 = svm.LinearSVC().fit(X_train_points,y_train_int) # create classifier based on training data
    s = clf_1.score(X_test_points,y_test_int) # scores accuracy of classifier on testing data
    mean_score += s/n

ws = 100; n_points = 128; mean_score = 0.9279

**ws = 100; n_points = 96; mean_score = 0.9219**

ws = 100; n_points = 64; mean_score = 0.9364

ws = 75; n_points = 128; mean_score = 0.9023

ws = 75; n_points = 96; mean_score = 0.8969

ws = 75; n_points = 64; mean_score = 0.9000

ws = 50; n_points = 128; mean_score = 0.8953

ws = 50; n_points = 96; mean_score = 0.8750

ws = 50; n_points = 64; mean_score = 0.8591