In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os, sys
sys.path.append('../Shared')
import processing, Model, Visualization, modules
from sklearn.model_selection import train_test_split

def get_label_from_filename(fname):
    for cls, label in class_to_label.items():
        if cls in fname:
            return label
    return None  # if no class found

def get_data_all_at_once(SUB, data_lst, label_lst):
    dataset, labels = [], []

    for l, d in zip(label_lst, data_lst):
        data = pd.read_csv(base_path + SUB + "/" + d)
        data = data.iloc[:, 1:]   # 첫 column(time) 제거
        label = get_label_from_filename(l)
        labels.append(np.array([label] * data.shape[0]))
        dataset.append(data)
        #print("\t", l, d, " ====> ", np.array([label] * data.shape[0]).shape, data.shape)

    D = np.concatenate(dataset, axis=0)
    L = np.concatenate(labels, axis=0)
    #print(D.shape, L.shape)

    balanced_idx = modules.Downsample_to_balance_class(L)
    X_emg_bal, y_bal = D[balanced_idx], L[balanced_idx]
    print("\t Balanced EMG:", X_emg_bal.shape, "y:", y_bal.shape)

    num_channels = X_emg_bal.shape[1]

    X_tmp, y_tmp = processing.sliding_window_with_labels(X=X_emg_bal, y=y_bal,  window_size=window_size, step_size=step_size)  # (num_windows, win_len, ch)

    all_X, all_y = modules.get_X_y_ZC_only(X_tmp, y_tmp)
    all_y = modules.y_change_to_int(all_y)
    all_X = all_X.reshape(-1, num_channels, 1)

    return all_X, all_y

def get_data(sub_info, data_lst, label_lst):
    dataset, labels = [], []

    for l, d in zip(label_lst, data_lst):
        data = pd.read_csv(base_path + sub_info + "/" + d)
        data = data.iloc[:, 1:]   # 첫 column(time) 제거
        label = get_label_from_filename(l)
        labels.append(np.array([label] * data.shape[0]))
        dataset.append(data)
        #print("\t", l, d, " ====> ", np.array([label] * data.shape[0]).shape, data.shape)

    D = np.concatenate(dataset, axis=0)
    L = np.concatenate(labels, axis=0)
    #print(D.shape, L.shape)

    balanced_idx = modules.Downsample_to_balance_class(L)
    X_emg_bal, y_bal = D[balanced_idx], L[balanced_idx]
    print("\t Balanced EMG:", X_emg_bal.shape, "y:", y_bal.shape)

    return X_emg_bal, y_bal


def sliding_window(X,y):
    num_channels = X.shape[1]
    X_tmp, y_tmp = processing.sliding_window_with_labels(X=X, y=y,  window_size=window_size, step_size=step_size)  # (num_windows, win_len, ch)

    all_X, all_y = modules.get_X_y_ZC_only(X_tmp, y_tmp)
    all_y = modules.y_change_to_int(all_y)
    #all_X = all_X.reshape(-1, num_channels, 1)

    return all_X, all_y

In [2]:
classes = ['STC', 'WAK', 'STDUP', 'SITDN', 'UPS', 'DNS', 'KLFT', 'TPTO', 'LLF', 'LLB', 'LLS', 'KLCL', 'HS', 'TO', 'LUGF', 'LUGB'] #16 classes
class_to_label = {cls: i for i, cls in enumerate(classes)}

Features = ['mav', 'var', 'zc', 'iemg', 'wl', 'wamp', 'mavs', 'rms', 'ssc', 'msq', 'v3', 'ld', 'dabs', 'mfl', 'mpr', 'mnf', 'psr', 'arc1', 'arc2', 'arc3', 'arc4', 'cc1', 'cc2', 'cc3', 'cca', 'dwtc1', 'dwtc2', 'dwtpc1', 'dwtpc2', 'dwtpc3']

base_path = 'D:/Data/Gait-EMG/SIAT/SIAT_LLMD20230404/'
Sub_lst_data = [f"Sub{i:02d}"+"/Data/" for i in range(1, 41)]
Sub_lst_label = [f"Sub{i:02d}"+"/Labels/" for i in range(1, 41)]
window_size, step_size = 400, 20

# Data Store
- To reduce time so that I don't have to run the sliding window/ feature extraction everytime

In [None]:
for SUB, LAB in zip(Sub_lst_data, Sub_lst_label):

    data_lst = os.listdir(base_path + SUB)
    label_lst = os.listdir(base_path + LAB)
    X_target, y_target = get_data(SUB, data_lst, label_lst)
    #X_target, y_target = modules.random_downsample_Num(X_target, y_target, n_keep=1000)
    print(f"Sub: {SUB} Start ==> (X, y:) {X_target.shape, y_target.shape} \n======================================================\n")
    X_store, y_store = sliding_window(X_target, y_target)
    pd.DataFrame(X_store).to_csv(f"./stored_dataset/X_{SUB[:-6]}.csv", index=False)
    pd.DataFrame(y_store).to_csv(f"./stored_dataset/y_{SUB[:-6]}.csv", index=False)

	 Balanced EMG: (434624, 25) y: (434624,)
Sub: Sub01/Data/ Start ==> (X, y:) ((434624, 25), (434624,)) 

	 Balanced EMG: (326464, 25) y: (326464,)
Sub: Sub02/Data/ Start ==> (X, y:) ((326464, 25), (326464,)) 



# Baseline - LOSO

In [None]:
###나중에 여기부터 for (Subject 단위) - 지금부터 1 subject 시작
ACC_lst = []
for SUB, LAB in zip(Sub_lst_data, Sub_lst_label):
    data_lst = os.listdir(base_path + SUB)
    label_lst = os.listdir(base_path + LAB)
    X_target, y_target = get_data(SUB, data_lst, label_lst)
    X_target, y_target = modules.random_downsample_Num(X_target, y_target, n_keep=1000)
    print(f"Sub: {SUB} Start ==> (X, y:) {X_target.shape, y_target.shape} \n======================================================\n")

    X_lst, y_lst, Sub_acc = [], [], []
    for SUB2, LAB2 in zip(Sub_lst_data, Sub_lst_label):
        if SUB == SUB2:
            continue
        else:
            data_lst2 = os.listdir(base_path + SUB2)
            label_lst2 = os.listdir(base_path + LAB2)
            X_tmp, y_tmp = get_data(SUB2, data_lst2, label_lst2)
            X_tmp, y_tmp = modules.random_downsample_Num(X_tmp, y_tmp, n_keep=5000)
            X_lst.append(X_tmp)
            y_lst.append(y_tmp)
            X, y = np.concatenate(X_lst, axis=0), np.concatenate(y_lst, axis=0)

            print("\t Cumulated : ", X.shape, y.shape)

            model = Model.build_model_1D(input_shape=X.shape[1:], num_classes=len(np.unique(y)))
            history = model.fit(X, y, epochs=50, batch_size=512, verbose=0)
            loss, acc = model.evaluate(X_target, y_target, verbose=0)
            print(f"\t Subject {SUB2} - inter subject acc => {acc*100:.2f}%")
            Sub_acc.append(acc)
    ACC_lst.append(Sub_acc)

# Inter-subject : Accumulate

In [None]:
ACC_lst = []
for SUB, LAB in zip(Sub_lst_data, Sub_lst_label):
    print(f"Sub: {SUB} Start\n==================================\n")
    data_lst = os.listdir(base_path + SUB)
    label_lst = os.listdir(base_path + LAB)
    X_target, y_target = get_data(SUB, data_lst, label_lst)

    X_lst, y_lst, Sub_acc = [], [], []
    for SUB2, LAB2 in zip(Sub_lst_data, Sub_lst_label):
        if SUB == SUB2:
            continue
        else:
            data_lst2 = os.listdir(base_path + SUB2)
            label_lst2 = os.listdir(base_path + LAB2)
            X_tmp, y_tmp = get_data(SUB2, data_lst2, label_lst2)
            X_lst.append(X_tmp)
            y_lst.append(y_tmp)
            X, y = np.concatenate(X_lst, axis=0), np.concatenate(y_lst, axis=0)
            print("\t Cumulated : ", X.shape, y.shape)

            model = Model.build_model_1D(input_shape=X.shape[1:], num_classes=len(np.unique(y)))
            history = model.fit(X, y, epochs=50, batch_size=512, verbose=0)
            loss, acc = model.evaluate(X_target, y_target, verbose=0)
            print(f"\t Subject {SUB2} - inter subject acc => {acc*100:.2f}%")
            Sub_acc.append(acc)
    ACC_lst.append(Sub_acc)

# Few-shot adaptation