# Google Drive Setup

In [2]:
import sys
from google.colab import drive
drive.mount('/content/gdrive')
sys.path.append('/content/gdrive/My Drive/BYB Projects/BYB 2021 Fellows/Projects/Hacker Hand/code')

PROJECT_PATH = '/content/gdrive/My Drive/BYB Projects/BYB 2021 Fellows/Projects/Hacker Hand/code'

Mounted at /content/gdrive


# Required Libraries

In [3]:
import numpy as np
import os
import scipy.signal

from cycler import cycler
from matplotlib import pyplot as plt
from scipy.io import wavfile
from scipy.signal import butter,filtfilt, hilbert, decimate

import plotly.express as px
import plotly.graph_objects as go

# Environment Configuration

In [4]:
plt.style.use('default')

MAGENTA = '#EA00FF'
CYAN = '#04D9FF'
GREEN = '#39FF14'
YELLOW = '#CCFF00'
ORANGE = '#FFAD00'

# plt.rc('axes', prop_cycle=(cycler('color', [GREEN, YELLOW, CYAN, MAGENTA, ORANGE])))

# Helper Functions

In [5]:
def moving_avg(x, n):
    cumsum = np.cumsum(np.insert(x, 0, 0)) 
    return (cumsum[n:] - cumsum[:-n]) / float(n)

In [6]:
def moving_rms(fs, data, window_size_t = 0.050, window_increase_t = 0.025):
  '''
  Return running RMS values of an array of data.

  Parameters:
    fs: sampling frequency
    data: signal samples
    window_size_t: size of the RMS window in seconds
    window_increase: step size in seconds when sliding the window

  Returns:
    RMS values computed from each window
  '''
  
  data = data.astype('float32')

  window_size = int( window_size_t * fs) # samples
  window_increase = int( window_increase_t * fs) # samples
  
  n_windows = int( np.floor( ( len( data) - window_size) / window_increase))
  rms = []

  for i in range(0, n_windows):
    start_i =  i * window_increase
    end_i = start_i + window_size

    window_data = data[ start_i : end_i]
    
    rms_i = np.sqrt( np.mean( np.square( window_data)))
    rms.append( rms_i)
    
  return rms

In [7]:
def get_hysteretic_triggers( data, up_thresh, lo_thresh):
  # State Macros
  IDLE = 0
  RECORDING = 1
  
  # Initial Conditions
  state = IDLE
  next_state = IDLE

  # Output Array
  hyst_trigs = []

  for sample in data:
    
    state = next_state

    if state == IDLE:
      if sample > up_thresh:
        next_state = RECORDING
        hyst_trigs.append( True)
      else:
        next_state = IDLE
        hyst_trigs.append( False)
    
    else:
      if sample < lo_thresh:
        next_state = IDLE

      else:
        next_state = RECORDING
      
      hyst_trigs.append( False)

  return hyst_trigs

In [8]:
def plot_channels(data, n_channels, t):
  fig = go.Figure()
  for chan in range( n_channels):
    fig.add_trace( go.Scatter( x = t, 
                              y = data[:, chan],
                              mode = 'lines',
                              name = f'EMG {chan + 1}'))
  return fig

# Set Data Directory and List Contents

In [9]:
data_dir = os.path.join(PROJECT_PATH, 'data')
wav_files = [os.path.join(data_dir, filepath) for filepath in os.listdir(data_dir) if filepath.endswith('.wav')]

for index, wav_file in enumerate(wav_files):
    print(f'{index}: {wav_file[ len( data_dir) + 1:]}')

0: emg-test1.wav
1: raw-emg.wav
2: no-envolpe.wav
3: envelope.wav
4: protocol-0a-001.wav
5: protocol-0b-001.wav
6: env-spiker-pro.wav
7: raw-spiker-pro.wav


# Read and Plot Enveloped Data

In [10]:
wf = wav_files[ 6]
fs, env_data = wavfile.read( wf)

n_samples, n_channels = env_data.shape
t = np.arange( n_samples) / fs

print('''\n_______________________________
FILE SUMMARY:
-------------------------------
|| File Name: {}
|| Raw Data Channels: {}
|| Raw Data Samples: {}
|| Sampling Frequency: {} Hz
|| Recording Length: {:.3f} s
-------------------------------
_______________________________\n'''.format( wf[len( data_dir) + 1:],
                                            n_channels,
                                            n_samples, 
                                            fs, 
                                            t[-1]))

TARGET_FREQ = 500 #Hz
down_factor = int( fs / TARGET_FREQ)
denv_data = decimate( env_data, down_factor, axis = 0)
down_fs = fs / down_factor
down_t = np.arange( len(denv_data)) / down_fs

print('''\n_______________________________
DOWNSAMPLING
-------------------------------
|| Starting Frequency: {} ===>> Target Frequency: {}
|| Raw Data Sample Count: {} ===>> New Sample Count: {}
|| Raw Sampling Frequency: {} Hz ===>> New Sampling Frequency {} Hz
|| Recording Length: {:.3f} s ===>> New Recording Length {:.3f} s
-------------------------------
_______________________________\n'''.format( fs, TARGET_FREQ,
                                            n_samples, len(denv_data),
                                            fs, down_fs,
                                            t[-1], down_t[-1]))

'''
# Remove DC offset and normalize data range using arbitrary powers of 2
raw_data = ( raw_data + ( 2**14)) / ( 2**16)

# Set all negative values to the smallest positive value for each channel
for chan in range(0, n_channels):
  min_val = np.amin( np.abs( raw_data[:, chan]))
  outlier_check = raw_data[:, chan] < 0
  outlier_locations = np.where( outlier_check)[0]

  for location in outlier_locations:
    raw_data[location, chan] = min_val
'''
'''

env_fig = plot_channels( denv_data, n_channels, down_t)
'''
env_fig.update_layout( title_text = 'Raw EMG data',
                       title_font_size = 30,
                       xaxis_title = 'Time [s]',
                       yaxis_title = 'Arbitrary A/D Units',
                       template = 'plotly_white')
'''

env_fig.show()


_______________________________
FILE SUMMARY:
-------------------------------
|| File Name: env-spiker-pro.wav
|| Raw Data Channels: 3
|| Raw Data Samples: 420509
|| Sampling Frequency: 3333 Hz
|| Recording Length: 126.165 s
-------------------------------
_______________________________


_______________________________
DOWNSAMPLING
-------------------------------
|| Starting Frequency: 3333 ===>> Target Frequency: 500
|| Raw Data Sample Count: 420509 ===>> New Sample Count: 70085
|| Raw Sampling Frequency: 3333 Hz ===>> New Sampling Frequency 555.5 Hz
|| Recording Length: 126.165 s ===>> New Recording Length 126.164 s
-------------------------------
_______________________________



# Moving RMS Feature on Envelopped Data



In [None]:
emg_1 = moving_rms(fs, raw_data[:, 0])
emg_2 = moving_rms(fs, raw_data[:, 1])
emg_3 = moving_rms(fs, raw_data[:, 2])

emg_t = np.add( emg_1, emg_2)
emg_t = np.add( emg_t, emg_3)

plt.plot( emg_1, alpha = 0.25)
plt.plot( emg_2, alpha = 0.75)
plt.plot( emg_3, alpha = 0.75)
plt.plot( emg_t, alpha = 0.75)

plt.grid( )
plt.xlabel( '$i^{th}$ Window')
plt.ylabel( 'RMS Amplitude')
plt.legend( [ 'EMG 1', 'EMG 2', 'EMG 3', 'EMG Total'])
plt.title( 'Moving RMS Over Raw Data')

plt.show()

# RMS Histogram

In [None]:
n_bins = 75

plt.hist( emg_1, bins = n_bins, alpha= 0.5)
plt.hist( emg_2, bins = n_bins, alpha= 0.5)
plt.hist( emg_3, bins = n_bins, alpha= 0.5)
plt.hist( emg_t, bins = n_bins, alpha = 0.5)
plt.grid( )

plt.xlabel( 'RMS Amplitude')
plt.ylabel( 'Sample Count')
plt.legend( [ 'EMG 1', 'EMG 2', 'EMG 3', 'EMG Total'])
plt.title( 'Histogram of Moving RMS Over Raw Data')

plt.show()

# Thresold Crossing Locations

In [None]:
threshold = 0.10

threshold_check = emg_t > threshold 
rms_triggers = np.where(threshold_check)[0]

print(type(emg_t))
print(emg_t.shape)

# plot triggers in RMS data
plt.plot( emg_1, alpha = 0.5)
plt.plot( emg_2, alpha = 0.5)
plt.plot( emg_3, alpha = 0.5)
plt.vlines( rms_triggers, ymin = 0, ymax = 0.5, color = 'r', alpha = 0.25)

plt.grid()
plt.xlabel( '$i^{th} Window$')
plt.ylabel( 'RMS Amplitude')
plt.legend( [ 'Channel 1', 'Channel 2', 'Channel 3'])
plt.title( "Envelope-EMG RMS Data")
plt.show()

# transform back to sample(?) domain
window_step = int(0.025 * fs)
raw_triggers = [ i * window_step for i in rms_triggers]

plt.plot( raw_data, alpha = 0.5)
plt.vlines( raw_triggers, ymin = 0, ymax = 0.5, color = 'r', alpha = 0.25)

plt.grid()
plt.xlabel( '$i^{th}$ Sample')
plt.ylabel( 'Amplitude [mV](?)')
plt.legend( [ 'Channel 1', 'Channel 2', 'Channel 3'])
plt.title( "Envelope-EMG Data at {} Hz".format( fs))
plt.show()

# Schmitt Threshold Locations

In [None]:
threshold = 0.10

threshold_check = get_hysteretic_triggers(emg_t, threshold, 0.75*threshold)
rms_triggers = np.where(threshold_check)[0]

print('''\n_______________________________
-------------------------------
|| Events Detexted: {}
-------------------------------
_______________________________\n'''.format(len(rms_triggers)))

# plot triggers in RMS data
plt.plot( emg_1, alpha = 0.5)
plt.plot( emg_2, alpha = 0.5)
plt.plot( emg_3, alpha = 0.5)
plt.vlines( rms_triggers, ymin = 0, ymax = 0.5, color = 'r', alpha = 0.25)

plt.grid()
plt.xlabel( '$i^{th} Window$')
plt.ylabel( 'RMS Amplitude')
plt.legend( [ 'Channel 1', 'Channel 2', 'Channel 3'])
plt.title( "Envelope-EMG RMS Data")
plt.show()

# transform back to sample(?) domain
window_step = int(0.025 * fs)
raw_triggers = [ i * window_step for i in rms_triggers]

plt.plot( raw_data, alpha = 0.5)
plt.vlines( raw_triggers, ymin = 0, ymax = 0.5, color = 'r', alpha = 0.25)

plt.grid()
plt.xlabel( '$i^{th}$ Sample')
plt.ylabel( 'Amplitude [mV](?)')
plt.legend( [ 'Channel 1', 'Channel 2', 'Channel 3'])
plt.title( "Envelope-EMG Data at {} Hz".format( fs))
plt.show()

# Extract Thresholds for Each Finger

In [None]:
F1_LIM = 18 * fs # sec * Hz = samples
F2_LIM = 29 * fs #
F3_LIM = 44 * fs #

raw_triggers = np.asarray(raw_triggers)
f1_triggers = raw_triggers[ raw_triggers < F1_LIM]

f2_triggers = [ x for x in raw_triggers if x > F1_LIM and x < F2_LIM]
f2_triggers = np.asarray(f2_triggers)

f3_triggers = raw_triggers[ raw_triggers > F2_LIM]

print(len(raw_triggers))
print(len(f1_triggers))
print(len(f2_triggers))
print(len(f3_triggers))

# "ERP" Plot




In [None]:
PRE_TRIGGER_SEC = 0.5
POST_TRIGGER_SEC = 0.5

post_trigger = int( fs * PRE_TRIGGER_SEC)
pre_trigger = int( fs * POST_TRIGGER_SEC)


for chan in range(n_channels):
  plt.subplot(3, n_channels, chan + 1)
  for trigger in f1_triggers:
    plt.plot( raw_data[ trigger - pre_trigger : trigger + post_trigger, chan], color='#87EBB0', alpha = 0.35)
  plt.vlines( pre_trigger, ymin = 0, ymax = 0.5, color = 'r')
  plt.grid()
  plt.axhline(threshold)
  plt.title(" F1 Channel {}".format(chan+1))

for chan in range(n_channels):
  plt.subplot(3, n_channels, 3 + chan + 1)
  for trigger in f2_triggers:
    plt.plot( raw_data[ trigger - pre_trigger : trigger + post_trigger, chan], color='c', alpha = 0.35)
  plt.vlines( pre_trigger, ymin = 0, ymax = 0.5, color = 'r')
  plt.grid()
  plt.axhline(threshold)
  plt.title(" F2 Channel {}".format(chan+1))

for chan in range(n_channels):
  plt.subplot(3, n_channels, 6 + chan + 1)
  for trigger in f3_triggers:
    plt.plot( raw_data[ trigger - pre_trigger : trigger + post_trigger, chan], color='#FA9836', alpha = 0.35)
  plt.vlines( pre_trigger, ymin = 0, ymax = 0.5, color = 'r')
  plt.grid()
  plt.axhline(threshold)
  plt.title(" F3 Channel {}".format(chan+1))

fig = plt.gcf()
fig.set_size_inches(20, 12)
plt.show()

# Channel Means



In [None]:
PRE_TRIGGER_SEC = 0.5
POST_TRIGGER_SEC = 0.5

post_trigger = int( fs * PRE_TRIGGER_SEC)
pre_trigger = int( fs * POST_TRIGGER_SEC)


for chan in range(n_channels):
  plt.subplot(1, n_channels, chan + 1)
  mean_signal = []
  for trigger in f1_triggers:
    epoch_data = raw_data[ trigger - pre_trigger : trigger + post_trigger, chan]
    mean_signal.append(epoch_data)
  mean_signal = np.asarray(mean_signal)
  mean_signal = np.mean(mean_signal, axis=0)
  plt.plot(mean_signal, color='#87EBB0', label = 'F1 Mean')
  

for chan in range(n_channels):
  plt.subplot(1, n_channels, chan + 1)
  mean_signal = []
  for trigger in f2_triggers:
    epoch_data = raw_data[ trigger - pre_trigger : trigger + post_trigger, chan]
    mean_signal.append(epoch_data)
  mean_signal = np.asarray(mean_signal)
  mean_signal = np.mean(mean_signal, axis=0)
  plt.plot(mean_signal, color='c', label = 'F2 Mean')
  

for chan in range(n_channels):
  plt.subplot(1, n_channels, chan + 1)
  mean_signal = []
  for trigger in f3_triggers:
    epoch_data = raw_data[ trigger - pre_trigger : trigger + post_trigger, chan]
    mean_signal.append(epoch_data)
  mean_signal = np.asarray(mean_signal)
  mean_signal = np.mean(mean_signal, axis=0)
  plt.plot(mean_signal, color='#FA9836', label = 'F3 Mean')
  
  plt.vlines( pre_trigger, ymin = 0, ymax = 0.20, color = 'r')
  plt.grid()
  plt.axhline(threshold)
  plt.title("Channel {}".format(chan+1))
  plt.legend()


fig = plt.gcf()
fig.set_size_inches( 18, 6)
plt.show()

# Output Event Markers

In [None]:
f1_trigger_sec = f1_triggers / fs
marker_id = 1

f = open("envelope-finger-{}.txt".format(marker_id), "w")

f.write("# Marker IDs can be arbitrary strings.\n")
f.write("# Marker ID,\tTime (in s)")

for marker in f1_trigger_sec:
  f.write("{},\t{}\n".format(marker_id, marker))

f.close()