In [21]:
import os

import pandas as pd
import scipy
import numpy as np
import matplotlib.pyplot as plt

import sys
sys.path.append("../Share")
import config, utils, baseline


from scipy.signal import lfilter, medfilt
from scipy.signal import hilbert
from sklearn.decomposition import PCA

class EMGFeatureExtractor:
    def __init__(self, feat_mean, feat_std, filter_b, filter_a, Norm_bool):
        self.feat_mean = feat_mean
        self.feat_std = feat_std
        self.filter_b = filter_b
        self.filter_a = filter_a
        self.buffer = None  # to be set externally
        self.normalization = Norm_bool


    def extract_features(self, win_size=600, win_step=120, feat_exclude=60):
        buf = lfilter(self.filter_b, self.filter_a, self.buffer, axis=1)
        nch, len_x = buf.shape
        n_steps = (len_x - win_size) // win_step + 1

        features = np.zeros((nch, 14, n_steps))
        for i in range(n_steps):
            x = buf[:, i*win_step:i*win_step+win_size]
            features[:, :, i] = self.extract_feature_win(x)

        if self.normalization:
            features = (features - self.feat_mean[:, :, np.newaxis]) / self.feat_std[:, :, np.newaxis]

        if features.shape[2] > feat_exclude:
            features = features[:, :, feat_exclude-1:]  # <-- FIXED HERE
        return features


    def extract_tail_features(self, feat_num, win_size=600, win_step=120):
        n_samples = win_size + (feat_num - 1) * win_step
        if self.buffer.shape[1] < n_samples:
            return None

        buf = self.buffer[:, -n_samples:]
        buf = lfilter(self.filter_b, self.filter_a, buf, axis=1)

        nch, len_x = buf.shape
        n_steps = (len_x - win_size) // win_step + 1
        features = np.zeros((nch, 14, n_steps))

        for i in range(n_steps):
            x = buf[:, i*win_step:i*win_step+win_size]
            features[:, :, i] = self.extract_feature_win(x)

        features = (features - self.feat_mean[:, :, np.newaxis]) / self.feat_std[:, :, np.newaxis]
        return features


    def filter_features(self, features):
        reshaped = features.reshape(features.shape[0]*features.shape[1], -1).T
        pca = PCA(n_components=1)
        pcomp = pca.fit_transform(reshaped).squeeze()
        pcomp = (pcomp - np.mean(pcomp)) / np.std(pcomp)
        med = medfilt(pcomp, kernel_size=5)

        if np.mean(med[:40]) > 0:
            med = -med

        # RMS envelope approximation
        window_size = 3
        rms_env = np.sqrt(np.convolve(med**2, np.ones(window_size)/window_size, mode='same'))
        med = (med - rms_env) - 0.5
        return med


    def extract_feature_win(self, x):
        len_x = x.shape[1]
        sum_x = np.sum(x, axis=1)
        mean_x = sum_x / len_x
        ssq_x = np.sum(x**2, axis=1)
        std_x = np.sqrt((ssq_x - 2*sum_x*mean_x + len_x*mean_x**2)/(len_x-1))
        diff_x = np.diff(x, axis=1)

        zc = np.mean(np.sign(x[:,1:]) != np.sign(x[:,:-1]), axis=1)
        ssc = np.mean(np.sign(diff_x[:,1:]) != np.sign(diff_x[:,:-1]), axis=1)
        wl = np.mean(np.abs(diff_x), axis=1)
        wamp = np.mean(np.abs(np.diff(x, axis=1)) > std_x[:, np.newaxis], axis=1)
        mab = np.mean(np.abs(x), axis=1)
        msq = ssq_x / len_x
        rms = np.sqrt(msq)
        v3 = np.cbrt(np.mean(x**3, axis=1))
        lgdec = np.exp(np.mean(np.log(np.abs(x) + 1), axis=1))
        dabs = np.sqrt(np.mean(diff_x**2, axis=1))
        mfl = np.log(np.sqrt(np.mean(diff_x**2, axis=1)) + 1)
        mpr = np.mean(x > std_x[:, np.newaxis], axis=1)
        mid = x.shape[1] // 2
        mavs = np.mean(np.abs(x[:, mid:]), axis=1) - np.mean(np.abs(x[:, :mid]), axis=1)

        weight = np.ones_like(x)
        weight[:, :int(0.25*len_x)] = 0.5
        weight[:, int(0.75*len_x):] = 0.5
        wmab = np.mean(weight * np.abs(x), axis=1)

        return np.stack([zc, ssc, wl, wamp, mab, msq, rms, v3, lgdec, dabs, mfl, mpr, mavs, wmab], axis=1)

from scipy.signal import cheby2

def create_cheby2_bandpass(fs, low_cutoff, high_cutoff, order=4, rs=30):
    nyq = fs / 2
    low = max(1, low_cutoff) / nyq
    high = min(0.99 * fs / 2, high_cutoff) / nyq
    b, a = cheby2(order, rs, [low, high], btype='bandpass')
    return b, a

#filter_b, filter_a = create_cheby2_bandpass(fs, lower_cutoff, upper_cutoff)

1. Sync up to the Data_ADC and Data_Cls
2. Extract features
3. Normalization problem
    - Without Norm.
4.

In [2]:
fs = round(10e6 / 2048)  # 4883 Hz
lower_cutoff = 100
upper_cutoff = 600
filter_b, filter_a = cheby2(4, 30, [lower_cutoff / (fs/2), upper_cutoff / (fs/2)], btype='bandpass')

feat_mean_1ch = np.array([0.1, 0.1, 2.5, 0.0, 11.0, 229.0, 13.8, -11.0, 9.0, 3.0, 1.5, 0.0, 0.0, 2.8])
feat_std_1ch = np.array([0.02, 0.05, 0.65, 0.02, 4.43, 303.9, 6.85, 12.18, 2.87, 0.87, 0.21, 0.04, 6.68, 1.12])
feat_mean = np.tile(feat_mean_1ch, (4, 1))
feat_std = np.tile(feat_std_1ch, (4, 1))

extractor = EMGFeatureExtractor(feat_mean, feat_std, filter_b, filter_a, Norm_bool=False)
trainer = baseline.TremorModelTrainer(config, subject="Hunmin")

In [3]:
#path = 'C:/Users/hml76/OneDrive/문서/MATLAB/Data_Hunmin/Exp_2025-07-02-v4/E9AD0E7DCC2B/raw/1/'
#mat = scipy.io.loadmat(path+'Data_20250702_141831.mat')
#raw_emg_data = mat['Data_ADC']
#extractor.buffer = raw_emg_data  # shape: (4, 215040)
#features = extractor.extract_features()
#print(features.shape)  # → (4, 14, 1787)

In [19]:
data_files = config.dataset_sub_H
default_path = config.default_path_sub_H

baseline_K = ['1', '6', '10', '14', '18', '22', '26', '30', '34', '38']
for K in baseline_K:
    X_train_all, y_train_all, X_test_all, y_test_all = [], [], [], []
    ACC_all = []

    for idx, session_info in enumerate(data_files):
    #for idx, session_info in enumerate(['Exp_2025-06-27-v1/E9AD0E7DCC2B/']):
        #print(f"Dataset {idx + 1}/{len(data_files)} - Session {session_info}\n{'='*40}")
        path = os.path.join(default_path, f'{session_info}raw/')
        features, class_labels = [], []
        for c_idx, c in enumerate(config.classes_5):
            raw_data = os.listdir(path+c)
            mat = scipy.io.loadmat(path+c+raw_data[0])
            extractor.buffer = mat['Data_ADC']
            class_labels.append(mat['Data_Cls'].reshape(-1))
            features_per_cls = extractor.extract_features()
            features_per_cls = np.transpose(features_per_cls, (2, 0, 1))  # shape: (1729, 4, 14)
            features.append(features_per_cls)
            #print(features_per_cls.shape, mat['Data_Cls'].reshape(-1).shape)

        print(session_info)
        X = np.concatenate(features, axis=0)
        y = np.concatenate(class_labels, axis=0)

        if X.shape[0] != y.shape[-1]:
            print(f"Incorrect shape between features and Class: {X.shape} and {y.shape}, {session_info}")
            break

        if idx < K:
            X_train, y_train, X_test, y_test = utils.split_data(X, y, ratio=0.9)
            X_train_all.append(X_train)
            y_train_all.append(y_train)
            X_test_all.append(X_test)
            y_test_all.append(y_test)

            # Concatenate all so far
            X_train_stacked = np.concatenate(X_train_all, axis=0)
            y_train_stacked = np.concatenate(y_train_all, axis=0)
            X_test_stacked = np.concatenate(X_test_all, axis=0)
            y_test_stacked = np.concatenate(y_test_all, axis=0)
            acc=0
        elif idx == K:
            print(X_train_stacked.shape, y_train_stacked.shape)
            acc, pre_trained_CNN = trainer.train_multiple_dataset(X_train, y_train, X_test, y_test)
        else:
            #Test with current data
            X = np.expand_dims(X, axis=-1)
            acc = pre_trained_CNN.evaluate(X, y, verbose=0)[1]
            print(f"\t Accuracy on unseen dataset {idx+1}: {acc * 100:.4f}%")
        ACC_all.append(acc)
    pd.DataFrame(ACC_all)



(1724, 4, 14) (1724,)
(1729, 4, 14) (1729,)
(1724, 4, 14) (1724,)
(1724, 4, 14) (1724,)
(1724, 4, 14) (1724,)
Exp_2025-05-27/E8331D05289A/
(1729, 4, 14) (1729,)
(1724, 4, 14) (1724,)
(1720, 4, 14) (1720,)
(1724, 4, 14) (1724,)
(1724, 4, 14) (1724,)
Exp_2025-06-18/E9AD0E7DCC2B/
(1729, 4, 14) (1729,)
(1729, 4, 14) (1729,)
(1724, 4, 14) (1724,)
(1729, 4, 14) (1729,)
(1677, 4, 14) (1677,)
Exp_2025-06-20-v1/E9AD0E7DCC2B/
(1720, 4, 14) (1720,)
(1724, 4, 14) (1724,)
(1729, 4, 14) (1729,)
(1724, 4, 14) (1724,)
(1716, 4, 14) (1716,)
Exp_2025-06-20-v2/E9AD0E7DCC2B/
(1592, 4, 14) (1592,)
(1191, 4, 14) (1191,)
(1494, 4, 14) (1494,)
(1686, 4, 14) (1686,)
(1724, 4, 14) (1724,)
Exp_2025-06-20-v3/E9AD0E7DCC2B/
(1729, 4, 14) (1729,)
(1729, 4, 14) (1729,)
(1720, 4, 14) (1720,)
(1724, 4, 14) (1724,)
(1729, 4, 14) (1729,)
Exp_2025-06-20-v4/E9AD0E7DCC2B/
(1724, 4, 14) (1724,)
(1711, 4, 14) (1711,)
(1724, 4, 14) (1724,)
(1724, 4, 14) (1724,)
(1707, 4, 14) (1707,)
Exp_2025-06-20-v5/E9AD0E7DCC2B/
(1724, 4, 14

In [24]:
ACC_all = [1,2,3]
pd.DataFrame(ACC_all).to_csv(f'../Results/Baseline_results_train_with_{K}data_noNorm_H.csv')



In [None]:
def get_dataset(path, classes, show_labels):
    for c_idx, c in enumerate(classes):
        raw_data = os.listdir(path+c)
        if len(raw_data) == 1:
            mat = scipy.io.loadmat(path+c+raw_data[0])
        else:
            print(f"There is more than one dataset - check - {path} - {classes}")

        if c_idx == 0:
            #print(f"Import matlab raw dataset - Matlab file Keys: {mat.keys()}")
            pass


        if c_idx == 0:
            feature_set = np.transpose(mat['Data_ADC'], (2, 0, 1))  # shape: (1729, 4, 14)
            labels = mat['Data_Cls'].flatten()  #0 or class (either one)

        elif c_idx > 0:
            feature_set_added = np.transpose(mat['Data_ADC'], (2, 0, 1))
            labels_addend = mat['Data_Cls'].flatten()  #0 or class (either one)

            feature_set = np.concatenate([feature_set, feature_set_added], axis=0)
            labels = np.concatenate([labels, labels_addend], axis=0)
        else:
            print("Error in c_idx")
            break

    return feature_set, labels

In [None]:
def get_dataset(path, classes, show_labels):
    for c_idx, c in enumerate(classes):
        raw_data = os.listdir(path+c)
        if len(raw_data) == 1:
            mat = scipy.io.loadmat(path+c+raw_data[0])
        else:
            print(f"There is more than one dataset - check - {path} - {classes}")

        if c_idx == 0:
            #print(f"Import matlab raw dataset - Matlab file Keys: {mat.keys()}")
            pass

        if mat['Data_Cls'].shape[-1] != mat['Data_ADC'].shape[-1]:
            print(f"Label and dataset do not match! : {len(mat['Data_Cls'])}, {len(mat['Data_Fea'])}")
            break

        if c_idx == 0:
            feature_set = np.transpose(mat['Data_ADC'], (2, 0, 1))  # shape: (1729, 4, 14)
            labels = mat['Data_Cls'].flatten()  #0 or class (either one)

        elif c_idx > 0:
            feature_set_added = np.transpose(mat['Data_ADC'], (2, 0, 1))
            labels_addend = mat['Data_Cls'].flatten()  #0 or class (either one)

            feature_set = np.concatenate([feature_set, feature_set_added], axis=0)
            labels = np.concatenate([labels, labels_addend], axis=0)
        else:
            print("Error in c_idx")
            break

    if show_labels:
        plt.figure(figsize=(8,3))
        plt.plot(labels)
        plt.ylabel('Class label', fontsize=13)
        plt.xlabel('Samples', fontsize=13)
        plt.grid(True, which='both', linestyle='--')
        plt.show()

    return feature_set, labels

In [None]:
default_path = config.default_path_sub_H
dataset_info = config.dataset_sub_H

path = os.path.join(default_path, f'{session_info}/raw/')

feature_set, labels = get_dataset(path, classes, show_labels=False)
X_train, y_train, X_test, y_test = utils.split_data(feature_set, labels, ratio=train_ratio)