In [2]:
import scipy.io as sio
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import matplotlib.mlab as mlab


In [3]:
class SignalFeature():
    # signal basic parameters
    rest_original_signal = []   # original signal in 1 row []
    active_original_signal = [] # original signal in 1 row []
    fs = 0

    rest_signal_len = 0     # original signal length
    active_signal_len = 0   # original signal length
    # data segment list, 2D list
    # initiate in signal_segment function
    rest_signal_segment = []
    active_signal_segment = []
    # init function
    def __init__(self, rest_signal, active_signal, sample_freq):
        # input signal: 1D list []
        self.rest_original_signal = rest_signal
        self.active_original_signal = active_signal
        fs = sample_freq

        self.rest_signal_len = len(rest_signal)
        self.active_signal_len = len(active_signal)
        pass
    # signal split function
    def signal_segment(self, window_len, step_len):
        'split active/rest signal using sliding window'
        # reset segment list
        self.rest_signal_segment = []
        self.active_signal_segment = []
        # split rest signal
        if self.rest_signal_len < window_len:
            print("Rest signal length is below the window length")
            self.rest_signal_segment.append(self.rest_original_signal)
        else:
            for i in range((self.rest_signal_len - window_len)//step_len + 1):
                self.rest_signal_segment.append(self.rest_original_signal[i*step_len : i*step_len + window_len])
                pass
            pass

        # split active signal
        if self.active_signal_len < window_len:
            print("active signal length is below the window length")
            self.active_signal_segment.append(self.active_original_signal)
        else:
            for i in range((self.active_signal_len - window_len)//step_len + 1):
                self.active_signal_segment.append(self.active_original_signal[i*step_len : i*step_len + window_len])
                pass
            pass

    pass


class FMGFeatures(SignalFeature):
    def __init__(self, rest_signal, active_signal, sample_freq):
        super().__init__(rest_signal, active_signal, sample_freq)
        pass

    def rest_mean(self):
        result_list = []
        for i in self.rest_signal_segment:
            result_list.append(sum(i)/len(i))
            pass
        return result_list
    
    def active_mean(self):
        result_list = []
        for i in self.active_signal_segment:
            result_list.append(sum(i)/len(i))
            pass
        return result_list

    def FMG_increase(self):
        'calculate FMG increase: (active - rest) / rest'
        result_list = []
        # calculate average FMG value in rest segment
        rest_mean_FMG = sum(self.rest_original_signal) / self.rest_signal_len
        # calculate results
        for i in self.active_signal_segment:
            result_list.append((sum(i)/len(i) - rest_mean_FMG)/rest_mean_FMG)
            pass
        return result_list
    pass


class sEMGFeatures(SignalFeature):
    def __init__(self, rest_signal, active_signal, sample_freq):
        super().__init__(rest_signal, active_signal, sample_freq)
        pass

    def time_features(self):
        # calculate average absolute value of rest sEMG
        result_list = []
        rest_mean_sEMG = sum([abs(i) for i in self.rest_original_signal])/self.rest_signal_len
        for i in self.active_signal_segment:
            result_list.append((sum([abs(num) for num in i])/len(i) - rest_mean_sEMG)/rest_mean_sEMG)
        return result_list

    def freq_fratures(self):
        result_list = []
        for data in self.active_signal_segment:
            # calculate psd specture
            pxx, f = plt.psd(data, NFFT = 256, Fs = self.fs, Fc = 0, detrend = mlab.detrend_none,
                            window = mlab.window_hanning, noverlap = 0, pad_to = None, 
                            sides = 'default', scale_by_freq = None, return_line = None)
            plt.close()
            '''
            scipy.signal.welch(x, fs=1.0, window='hann', nperseg=None, noverlap=None, nfft=None, detrend='constant', return_onesided=True, scaling='density', axis=- 1, average='mean')
            '''
            # med frequency
            N = len(f)
            # calculate (psd curve) integration
            MSUM = [0]
            for i in range(1, N, 1):
                MSUM.append(MSUM[i - 1] + pxx[i - 1] * (f[i] - f[i - 1]))
            
            diff = []
            for i in range(0, N, 1):
                diff.append(MSUM[i] - MSUM[N-1]/2)
            for i in range(N):
                if diff[i] <= 0 and diff[i + 1] >= 0:
                    mf_x1= i
                    mf_x2 = i + 1
                    break
            # linear interpolation based mf calculation
            mf = (f[mf_x1]*diff[mf_x2] - f[mf_x2]*diff[mf_x1])/(diff[mf_x2] - diff[mf_x1])
            
            # average power frequency
            FSUM = [0]
            for i in range(1, N, 1):
                FSUM.append(FSUM[i - 1] + f[i] * pxx[i - 1] * (f[i] - f[i - 1]))
            mpf = FSUM[N - 1]/MSUM[N - 1]
            # power = FSUM[N - 1]
            # power_time = sum([num*num for num in data])
            result_list.append([mf, mpf])
            pass
        return result_list
    pass

In [4]:
# load all experimental data in signal_df 
# file path format: 'D:\\folder path\\subject name\\label_(rest/active).mat'
file_folder_path = 'D:\code\data\iFEMG_data_set'
subjects = ['\zpk1', '\zpk2', '\zpk3', '\zpk4', '\zpk5', '\zpk6']
mat_name = ['d0', 'd2', 'd5', 'd7', 'dm']

signal_df = pd.DataFrame(columns = ('subject_name', 'rest_signal', 'active_signal', 'label', 'sensor_channel'))
# rest/active signal: [:, 0]FMG, [:, 1]sEMG

for i in subjects:
    for j in mat_name:
        # print('processing: ', i + j)
        # build data frame, every row has a unique label
        signal_df = signal_df.append({'subject_name' : i,
                                    'rest_signal': sio.loadmat(file_folder_path + i + '\\' + j + '_rest.mat')[j + '_rest'],
                                    'active_signal': sio.loadmat(file_folder_path + i + '\\' + j + '_active.mat')[j + '_active'], 
                                    'label' : j,
                                    'sensor_channel' : 'bicps_br'}, ignore_index=True)
    pass

In [10]:
'''
subjects
force level
sensor channel
FMG + sEMG + ultrasound
'''
# 使用boolean值索引出某一名受试者的实验数据
data = signal_df.loc[signal_df.loc[:, 'subject_name'] == '\zpk1']

FMG = SignalFeature(data.loc[0, 'rest_signal'][:, 0], data.loc[0, 'active_signal'][:, 0], 1223)
sEMG = SignalFeature(data.loc[0, 'rest_signal'][:, 1], data.loc[0, 'active_signal'][:, 1], 1223)
FMG.signal_segment(1223, 500)
sEMG.signal_segment(1223, 500)

temp = []


<class 'pandas.core.frame.DataFrame'>
0
83


In [13]:
for row in data.itertuples():
    print(row.rest_signal[:, 0])

[88. 88. 88. ... 88. 88. 88.]
[106. 106. 106. ...  88.  88.  88.]
[  0.   0. 106. ... 106. 106. 106.]
[115. 115. 115. ... 106. 106. 106.]
[79. 79. 79. ... 79. 79. 79.]


In [15]:
print(data.info)

<bound method DataFrame.info of   subject_name                                        rest_signal  \
0        \zpk1  [[88.0, 311.8869405619067], [88.0, 283.8395630...   
1        \zpk1  [[106.0, -95.733998252022], [106.0, -159.97312...   
2        \zpk1  [[0.0, -115.96887136551932], [0.0, -118.489493...   
3        \zpk1  [[115.0, -152.71052914404424], [115.0, -188.92...   
4        \zpk1  [[79.0, -122.4545388065741], [79.0, -73.193093...   

                                       active_signal label sensor_channel  
0  [[203.0, -295.63014277918353], [203.0, -287.21...    d0       bicps_br  
1  [[283.0, -118.90756361272028], [283.0, -653.43...    d2       bicps_br  
2  [[300.0, -92.69143911160802], [300.0, 925.2660...    d5       bicps_br  
3  [[265.0, -1168.724526078126], [265.0, -240.337...    d7       bicps_br  
4  [[265.0, 868.3033449275356], [265.0, 800.74755...    dm       bicps_br  >
