## Reading Data

In [None]:
import os


def read_emg_csv(parent_folder, folder, subfolder):
    path = os.path.join(parent_folder, folder, "Myo", subfolder, "EMG.csv")
    if os.path.exists(path):
        df = pd.read_csv(path)
        # markers = df[' Marker'].unique()
        # # print(markers)
        # # count = df[' Marker'].value_counts()
        # # print(count)
        data = {}
        myo_elctrodes = [' Electrode 1', ' Electrode 2', ' Electrode 3',' Electrode 4', ' Electrode 5', ' Electrode 6', ' Electrode 7',' Electrode 8',]
        for i , (dir, s,e) in enumerate(zip(['Y' , 'Z', 'X'],[1000, 2000, 3000], [1001, 2001, 3001])):
            start_row_indices = df[df[' Marker'] == s].index
            end_row_indices = df[df[' Marker'] == e].index
            data[f'TskDir_{dir}'] = []
            for start, end in zip(start_row_indices, end_row_indices):
                data[f'TskDir_{dir}'].append(df.iloc[start + 1: end][myo_elctrodes].values)
                  
        # print(f"Successfully read EMG.csv in {os.path.join(parent_folder, folder, 'Myo', subfolder)}")
        return data
    else:
        # print(f"EMG.csv not found in {os.path.join(parent_folder, folder, 'Myo', subfolder)}")
        return None
    
parent_folder_path = '/Users/alireza/Desktop/ISMAR/Experimental_Database'
subfolder_names = ["ControllerAndPen", "TwoControllers", "TwoHand"]
EMG_Dataset = {}
for folder in os.listdir(parent_folder_path):
    if os.path.isdir(os.path.join(parent_folder_path, folder)):
        EMG_Dataset[f'{folder}'] = {}
        for subfolder in subfolder_names:
            EMG_Dataset[f'{folder}'][f'{subfolder}'] = read_emg_csv(parent_folder_path, folder, subfolder)
            
print(EMG_Dataset.keys())       
print(EMG_Dataset['P11_Data'].keys())
print(EMG_Dataset['P11_Data']['TwoControllers'].keys())

## Required Functions

In [None]:
import pywt
from scipy.stats import moment
from scipy.signal import welch
import numpy as np


#================================================================================
#                            [Willison Amplitude]
#================================================================================
def cal_willison_amplitude(emg_data, threshold=0.1):
    'emg shape: [pnts, channel]'
    diff_data = np.diff(emg_data, axis=0)
    wamp = np.sum(np.abs(diff_data) > threshold, axis=0)
    return wamp

#================================================================================
#                            [Simple Square Integral]
#================================================================================
def cal_simple_square_integral(emg_data):
    'emg shape: [pnts, channel]'
    ssi = np.sum(emg_data**2, axis=0)
    return ssi
#================================================================================
#                            [Integrated EMG]
#================================================================================
def cal_integrated_emg(emg_data):
    'emg shape: [pnts, channel]'
    iemg = np.sum(np.abs(emg_data), axis=0)
    return iemg

#================================================================================
#                            [Mean Absolute Value]
#================================================================================

def cal_mav(emg_data):
    'emg shape: [pnts, channel]'
    mav = np.mean(np.abs(emg_data), axis=0)
    return mav

#================================================================================
#                            [Root Mean Square]
#================================================================================
def cal_root_mean_square(emg_data):
    'emg shape: [pnts, channel]'
    rms = np.sqrt(np.mean(emg_data**2, axis=0))
    return rms

#================================================================================
#                            [Median Frequency]
#================================================================================
def cal_median_frequency(emg_data, fs=200, nperseg=200):
    'emg shape: [pnts, channel]'
    mdf = []
    for elec_idx in range(emg_data.shape[1]):
        electrode_data = emg_data[:, elec_idx]
        freqs, Pxx = welch(electrode_data, fs, nperseg=nperseg)
        cumsum = np.cumsum(Pxx)
        median_freq = freqs[np.where(cumsum >= cumsum[-1] / 2)[0][0]]
        mdf.append(median_freq)
    mdf = np.array(mdf)
    return mdf

#================================================================================
#                            [Mean Frequency]
#================================================================================
def cal_mean_frequency(emg_data, fs=200, nperseg=200):
    'emg shape: [pnts, channel]'
    mnf = []
    for elec_idx in range(emg_data.shape[1]):
        f, Pxx = welch(emg_data[:, elec_idx], fs, nperseg=nperseg)
        mnf.append(np.sum(f * Pxx) / np.sum(Pxx))
    mnf = np.array(mnf)
    return mnf

#================================================================================
#                            [Peak Frequency]
#================================================================================
def cal_peak_frequency(emg_data, fs=200, nperseg=200):
    'emg shape: [pnts, channel]'
    pkf = []
    for ch_idx in range(emg_data.shape[1]):
        f, Pxx = welch(emg_data[:, ch_idx], fs, nperseg=nperseg)
        pkf.append(f[np.argmax(Pxx)])
    pkf = np.array(pkf)
    return pkf

#================================================================================
#                            [Mean Power]
#================================================================================
def cal_mean_power(emg_data, fs=200, nperseg =200):
    mnp = []
    for ch_idx in range(emg_data.shape[1]):
        f, Pxx = welch(emg_data[:, ch_idx], fs, nperseg=nperseg)
        mnp.append(np.mean(Pxx))
    
    mnp = np.array(mnp)
    return mnp

#================================================================================
#                            [Total Power]
#================================================================================
def cal_total_power(emg_data, fs=200, nperseg=200):
    ttp = []
    for ch_idx in range(emg_data.shape[1]):
        f, Pxx = welch(emg_data[:, ch_idx], fs, nperseg=nperseg)
        ttp.append(np.sum(Pxx))
    ttp = np.array(ttp)

    return ttp

#================================================================================
#                            [Waveform Lenght]
#================================================================================
def cal_waveform_length(emg_data):
    'emg shape: [pnts, channel]'
    wl = np.sum(np.abs(np.diff(emg_data, axis=0)), axis=0)
    return wl

#================================================================================
#                            [Sample Entropy]
#================================================================================
def cal_sample_entropy(emg_data, m=2, r=0.2, ent=None):
    'emg shape: [pnts, channel]'
    sentropy = []
    for elec_idx in range(emg_data.shape[1]):
        se = ent.sample_entropy(emg_data[:, elec_idx], m, r)
        sentropy.append(se[-1])  # Use the last value (m = 2)
    return np.array(sentropy)

#================================================================================
#                            [Slop Sign Change]
#================================================================================
def cal_slope_sign_changes(emg_data):
    'emg shape: [pnts, channel]'
    diff_data = np.diff(emg_data, axis=0)
    ssc = np.sum((diff_data[:-1, :] * diff_data[1:, :]) < 0, axis=0)
    return ssc

#================================================================================
#                            [Zero Crossing Rate]
#================================================================================
def cal_zero_crossing_rate(emg_data):
    'emg shape: [pnts, channel]'
    zcr = np.sum((emg_data[:-1, :] * emg_data[1:, :]) < 0, axis=0)
    return zcr

#================================================================================
#                            [Hjorth]
#================================================================================
def cal_hjorth_parameters(emg_data):
    'emg shape: [pnts, channel]'
    first_derivative = np.diff(emg_data, axis=0)
    second_derivative = np.diff(first_derivative, axis=0)

    activity = np.mean(emg_data**2, axis=0)
    mobility = np.mean(first_derivative**2, axis=0) / activity
    complexity = (np.mean(second_derivative**2, axis=0) / np.mean(first_derivative**2, axis=0)) / mobility

    return activity, mobility, complexity

#================================================================================
#                            [Spectral Moments]
#================================================================================
def cal_spectral_moments(emg_data, fs=200, nperseg=200):
    'emg shape: [pnts, channel]'
    mean_frequency, variance_frequency  = [], []
    
    for elec_idx in range(emg_data.shape[1]):
        f, Pxx = welch(emg_data[:, elec_idx], fs=fs, nperseg=nperseg)
        mean_freq = np.sum(f * Pxx) / np.sum(Pxx)
        mean_frequency.append(mean_freq)
        variance_frequency.append(moment(Pxx, moment=2))
        
    mean_frequency = np.array(mean_frequency)
    variance_frequency = np.array(variance_frequency)

    return mean_frequency, variance_frequency

#================================================================================
#                            [Wavelet Transform]
#================================================================================

def cal_wavelet_transform(emg_data, wavelet='db4'):
    'emg shape: [pnts, channel]'

    wavelet_coeffs_list = pywt.wavedec(emg_data, wavelet, axis=0)

    return wavelet_coeffs_list

#================================================================================
#                            [Deterended Fluctuations Analysis]
#================================================================================

def cal_detrended_fluctuation_analysis(emg_data):
    'emg shape: [pnts, channel]'
    dfa = []
    for elec_idx in range(emg_data.shape[1]):
        dfa.append(nolds.dfa(emg_data[:, elec_idx]))
    return np.array(dfa)

#================================================================================
#                            [Windowing]
#================================================================================

def window_emg_data(emg_data, fs, window_length, overlap=0):
    'emg shape: [pnts, channel]'
    num_samples, num_electrodes = emg_data.shape
    window_length = int(window_length*fs)
    overlap = int(overlap * fs)
    step_size = window_length - overlap
    num_windows = (num_samples - overlap) // step_size

    windowed_emg_data = np.empty((window_length, num_windows, num_electrodes))

    for win_idx in range(num_windows):
        start_sample = win_idx * step_size
        end_sample = start_sample + window_length
        windowed_emg_data[:, win_idx, :] = emg_data[start_sample:end_sample, :]
    return windowed_emg_data

#================================================================================
#                            [Plot EMG]
#================================================================================

def plot_emg(data_list, fig_size=(12, 8), scale=1.01, ylim=None):
    'data is a list of inputs with shape of [pnts, channel]'
    
    channel= data_list[0].shape[1]
    channel_max = 0.0
    fig, ax = plt.subplots(channel, 1, sharex=True, figsize=fig_size)
    for i in range(channel):
        for j in range(len(data_list)):
            data = data_list[j]
            ax[i].plot(data[:, i], linewidth=0.5)
            if ylim is None:
                channel_max_cur = np.max(np.abs(data[:,i])) * scale
                if channel_max_cur>channel_max:
                    channel_max = channel_max_cur
                ax[i].set_ylim(-channel_max, channel_max)
            else:
                ax[i].set_ylim(ylim[0], ylim[1])
            ax[i].set_ylabel(f'Channel {i+1}')
            if i == channel-1:
                ax[i].set_xlabel('Time')
    plt.tight_layout()
    plt.show()
    

#================================================================================
#                            [EMG Trend]
#================================================================================ 

def channel_trend(data, fs=200, regressor='linear'):
    'data shape: [pnts, channel]'
    time = np.arange(0, len(data))
    num_channels = data.shape[1]
    
    regression_models = []
    for i in range(num_channels):
        channel_data = data[:, i].reshape(-1, 1)
        time_reshaped = time.reshape(-1, 1)
        
        if regressor=='linear':
            # linear regressor
            model = LinearRegression()
            model.fit(time_reshaped, channel_data)
            regression_models.append(model)
            
        elif regressor=='svr':
            # Support vector regressor 
            model = SVR(kernel='rbf', C=1e3, gamma=0.1)
            model.fit(time_reshaped, channel_data.ravel())
            regression_models.append(model)
            
        elif regressor=='rf':
            # RF regressor 
            model = RandomForestRegressor(n_estimators=100, random_state=0)
            model.fit(time_reshaped, channel_data.ravel())
            regression_models.append(model)
        else:
            raise 'TypeError' "current regressors are 'linear' | 'svr' | 'rf'"

    # Visualize the fitted regression curves
    # fitted_curve = np.array([model.predict(time_reshaped).squeeze() for model in regression_models])
    # plot_emg([data, fitted_curve.T])
    if num_channels == 1:
        fig, ax = plt.subplots(figsize=(10, 5))
        axs = [ax]
    else:
        fig, axs = plt.subplots(num_channels, 1, figsize=(10, 5 * num_channels))
    
    # fig, axs = plt.subplots(num_channels, 1, figsize=(10, 5 * num_channels))
    for i, model in enumerate(regression_models):
        channel_data = data[:, i]
        fitted_curve = model.predict(time_reshaped)

        axs[i].plot(time, channel_data, label='EMG Channel {}'.format(i+1))
        axs[i].plot(time, fitted_curve, label='Fitted Regression Curve', linestyle='--')
        axs[i].set_xlabel('Time (s)')
        axs[i].set_ylabel('Amplitude')
        axs[i].legend()

    plt.tight_layout()
    plt.show() 

## Data Preprocessing
* Working On Specific Participant

In [None]:
# participant here is "P02_Data"
x = EMG_Dataset['P02_Data']['TwoControllers']['TskDir_Y'][4]    # ['ControllerAndPen', 'TwoControllers', 'TwoHand']
x_f = butter_bandpass_filter(x, fs=200, lowcut=20, highcut=90, axis=0)
x_rect = np.abs(x_f)
# x_smooth = savgol_filter(x_rect, window_length=10, polyorder=2, axis=0, mode='interp')
maximum_voluntary_contraction = np.max(x_rect, axis=0)
x_norm = x_rect/maximum_voluntary_contraction
plot_emg([x], fig_size=(12,10))

x_win = window_emg_data(x_f, fs=200, window_length=0.15, overlap=0.075)

print(f'data shape: {x.shape} | windowed: {x_win.shape}')

## Track the fatigue based on features

In [None]:
x_features = []
for i in range(x_win.shape[1]):
    # x_features.append(cal_integrated_emg(x_win[:,i,:]))
    # x_features.append(cal_willison_amplitude(x_win[:,i,:],threshold=0.1))
    # x_features.append(cal_simple_square_integral(x_win[:,i,:]))
    # x_features.append(cal_mav(x_win[:,i,:]))
    # x_features.append(cal_mean_frequency(x_win[:,i,:], fs=200, nperseg=x_win.shape[0]))
    # x_features.append(cal_median_frequency(x_win[:,i,:], fs=200, nperseg=x_win.shape[0]))
    x_features.append(cal_root_mean_square(x_win[:,i,:]))
    # x_features.append(cal_sample_entropy(x_win[:,i,:]))
    # x_features.append(cal_slope_sign_changes(x_win[:,i,:]))
    # x_features.append(cal_spectral_moments(x_win[:,i,:]))
    # x_features.append(cal_zero_crossing_rate(x_win[:,i,:]))
    # x_features.append(cal_waveform_length(x_win[:,i,:]))
    
x_features = np.array(x_features)
print(x_features.shape)

input = np.mean(x_features, axis=1, keepdims=True)
input = x_features

channel_trend(input, regressor='linear')

## Evaluating the most Important EMG Electrodes 

In [None]:
pca = PCA(n_components=0.99999999)
pca.fit(x_features)

explained_variance_ratios = pca.explained_variance_ratio_
# print("Explained variance ratios: ", explained_variance_ratios)
components = pca.components_
print(len(components))
most_important_channels = np.argmax(np.abs(components), axis=1) + 1
# print("Most important channels for each principal component: ", most_important_channels)

fig, ax = plt.subplots()
bars = ax.bar(range(1, 8 + 1), explained_variance_ratios)
ax.set_xlabel('Principal Components')
ax.set_ylabel('Explained Variance Ratio')
for i, bar in enumerate(bars):
    height = bar.get_height()
    ax.annotate(f'Ch {most_important_channels[i]}',
                xy=(bar.get_x() + bar.get_width() / 2, height),
                xytext=(0, 3),  
                textcoords="offset points",
                ha='center', va='bottom')
plt.show()


## Next Paper Code

In [None]:
# from sklearn.model_selection import train_test_split
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.metrics import classification_report

# # Prepare the dataset (replace with your own data)
# labels = np.array([...])  # Array of task labels (X, Y, Z) corresponding to each time point in the EMG data
# X_train, X_test, y_train, y_test = train_test_split(emg_data, labels, test_size=0.2, random_state=42)

# # Train a RandomForest classifier
# clf = RandomForestClassifier(n_estimators=100, random_state=42)
# clf.fit(X_train, y_train)

# # Predict and evaluate the classifier on the test set
# y_pred = clf.predict(X_test)
# print(classification_report(y_test, y_pred))

# # Get feature importances for each channel
# importances = clf.feature_importances_

# # Print the feature importances
# print("Feature importances: ", importances)

# # You can also visualize the feature importances as a bar chart
# fig, ax = plt.subplots()
# ax.bar(range(1, num_channels + 1), importances)
# ax.set_xlabel('Channels')
# ax.set_ylabel('Feature Importance')
# plt.show()

In [None]:
# num_participants = 10
# num_repetitions = 10
# num_tasks = 3
# num_channels = 8
# participant_rms = []
# for i, (participant_key, participant_val) in enumerate(EMG_Dataset.items()):
#     repetition_rms = []
#     for repetition in range(10):
#         task_rms = []
#         emg_data_participant = participant_val['TwoHand']    # ['ControllerAndPen', 'TwoControllers', 'TwoHand']
#         for dir_key, dir_val in emg_data_participant.items():
            
#             task_rms.append(cal_root_mean_square(dir_val[repetition]))
            
#         repetition_rms.append(np.array(task_rms))
#     participant_rms.append(np.array(repetition_rms))
# participant_rms = np.array(participant_rms)
    

# participants, repetitions, tasks, channels = np.meshgrid(range(num_participants), range(num_repetitions), range(num_tasks), range(num_channels), indexing='ij')
# rms_values = np.moveaxis(participant_rms, -1, -2)
# f_value, p_value = stats.f_oneway(*rms_values)

# print("ANOVA results:")
# print("F-value:", f_value)
# print("P-value:", p_value)
# print(p_value.shape)




In [None]:
# import numpy as np
# import scipy.stats as stats

# num_participants = 12
# num_repetitions = 10

# # Calculate the RMS values for each channel during each task for each participant and repetition
# participant_rms = {}
# for participant in range(num_participants):
#     repetition_rms = {}
#     for repetition in range(num_repetitions):
#         task_rms = {}
#         emg_data_participant = emg_data[participant]  # Replace with the appropriate indexing for your data
#         for task, (start_time, end_time) in task_intervals.items():
#             start_idx = int(start_time * fs)
#             end_idx = int(end_time * fs)
#             task_signal = emg_data_participant[start_idx:end_idx, :]
#             task_rms[task] = compute_rms(task_signal)
#         repetition_rms[repetition] = task_rms
#     participant_rms[participant] = repetition_rms

# # Perform a four-way ANOVA to compare the RMS values across participants, repetitions, channels, and tasks
# participants, repetitions, tasks, channels = np.meshgrid(range(num_participants), range(num_repetitions), range(len(task_intervals)), range(num_channels), indexing='ij')
# rms_values = np.array([[[[participant_rms[participant][repetition][task][channel] for channel in range(num_channels)] for task in task_intervals] for repetition in range(num_repetitions)] for participant in range(num_participants)])
# f_value, p_value = stats.f_oneway(*rms_values.T)

# print("ANOVA results:")
# print("F-value:", f_value)
# print("P-value:", p_value)

# # If the ANOVA test is significant, perform post-hoc tests (e.g., Tukey's HSD test)
# alpha = 0.05
# if p_value < alpha:
#     from statsmodels.stats.multicomp import pairwise_tukeyhsd

#     # Prepare the data for the Tukey's HSD test
#     rms_values_flat = rms_values.flatten()
#     participant_labels = np.repeat(participants.ravel(), len(task_intervals) * num_channels * num_repetitions)
#     repetition_labels = np.tile(np.repeat(repetitions.ravel(), len(task_intervals) * num_channels), num_participants)
#     channel_labels = np.tile(np.repeat(channels.ravel(), len(task_intervals)), num_participants * num_repetitions)
#     task_labels = np.tile(tasks.ravel(), num_channels * num_participants * num_repetitions)

#     # Perform the Tukey's HSD test
#     tukey_results = pairwise_tukeyhsd(rms_values_flat, np.stack((participant_labels, repetition_labels, task_labels, channel_labels), axis=1))
#     print("\nTukey's HSD test results:")
#     print(tukey_results)
# else:
#     print("\nNo significant difference between participants, repetitions, channels, and tasks.")
