In [1]:
import os
import pandas as pd
import numpy  as np
from scipy.fft    import dct, idct
from scipy.signal import butter, savgol_filter, ellip, filtfilt, detrend
from matplotlib   import pylab as plt

## Loading data

In [2]:
# paths were the raw data is
root_path = '../../01_Data/'
data_path = os.path.join(root_path, 'Labeled')
emg_path  = os.path.join(data_path, 'EMG')

# paths were the preprocessed data should be
out_path = os.path.join(root_path, 'Processed')
emg_out_path = os.path.join(out_path, 'EMG')

# check if the output folders exist and create them if necessary
for folder in [out_path, emg_out_path]:
    if not os.path.exists(folder):
        os.mkdir(folder)

# list all emg and eda files (ignoring the hidden macos file)
emg_files = [x for x in os.listdir(emg_path) if not '.DS_Store' in x]

## Preprocessing

In [None]:
# Reshape a numpy array 'a' of shape (n, x) to form shape((n - window_size), window_size, x))
def rolling_window(a, window, step_size):
    shape = a.shape[:-1] + (a.shape[-1] - window + 1 - step_size + 1, window)
    strides = a.strides + (a.strides[-1] * step_size,)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

In [None]:
# all 3 functions taken from matlab implementation after Anne Grübler
def notch_filter(data, notch, width, fs):
    fa = (notch - width) / (fs/2)
    fb = (notch + width) / (fs/2)
    Wn = np.array([fa, fb])
    order = 4
    b, a = butter(order, Wn, btype='stop')
    return filtfilt(b, a, data)

def elliptic_filter(data, flow, fhigh, fs):
    Wn    = np.array([flow, fhigh]) * (2/fs)
    order = 4
    maxRipple      = 0.1
    minAttenuation = 40
    b, a  = ellip(order, maxRipple, minAttenuation, Wn, btype='stop')
    return filtfilt(b, a, data)

def local_filter(data, notch, width, flow, fhigh, fs):
    signal = data
    if (len(notch) > 1):
        for kk in range(len(notch)):
            signal = notch_filter(signal, notch[kk], width[kk], fs)
    else:
        signal = notch_filter(signal, notch, width, fs)
    signal = elliptic_filter(signal, flow, fhigh, fs)
    return detrend(signal)

In [None]:
fs      = 1000 # emg sampling frequency
f_notch = np.array([50, 100, 150, 200, 250, 300]) # notch frequencies to filter out
width   = np.ones(len(f_notch)) * 3;              # width/2 of each notch
cutlow  = 5    # lower cut-off frequency for elliptic filter
cuthigh = 350  # upper cut-off frequency for elliptic filter

for file in emg_files:
    filepath = os.path.join(emg_path, file)
    df       = pd.read_csv(filepath)

    # filter and save it into the data frame loaded
    df.ch1 = local_filter(df.ch1, f_notch, width, cutlow, cuthigh, fs)
    df.ch2 = local_filter(df.ch2, f_notch, width, cutlow, cuthigh, fs)
    df.ch3 = local_filter(df.ch3, f_notch, width, cutlow, cuthigh, fs)
    df.ch4 = local_filter(df.ch4, f_notch, width, cutlow, cuthigh, fs)

    # save data frame with pre-processed data to new file
    df.to_csv(os.path.join(emg_out_path, file), index=False)