In [1]:
from libemg import streamers, data_handler, filtering, gui, emg_predictor, feature_extractor, utils
import os
import json
import time
from os import walk

import numpy as np
from libemg.offline_metrics import OfflineMetrics

In [2]:
WINDOW_SIZE = 200 # 40
WINDOW_INC = 20
CLASSES = [0, 1, 2 , 3, 4]
REPS = [0, 1, 2, 3, 4 , 5]
STAGE = 4 # 0: collect data, 1: prepare model, 2: test band, 3: test model, 4: prepare emg model, 5: prepare emg imu ppg model


In [None]:
def get_training_data(folder_location, emg_regex, imu_regex, ppg_regex):
    # Dictionnaire pour stocker les features et labels
    feature_dic = {
        'training_features': None,
        'training_labels': None
    }

    # ----------------------- Traitement EMG --------------------------
    odh_emg = data_handler.OfflineDataHandler()
    odh_emg.get_data(folder_location=folder_location, regex_filters=emg_regex)
    windows_emg, metadata_emg = odh_emg.parse_windows(WINDOW_SIZE, WINDOW_INC)
    
    # Filtrage EMG
    fi_emg = filtering.Filter(2000)  # Fréquence d'échantillonnage EMG
    fi_emg.install_filters({"name": "notch", "cutoff": 60, "bandwidth": 3})
    fi_emg.install_filters({"name": "bandpass", "cutoff": [20, 450], "order": 4})
    fi_emg.install_filters({"name": "standardization"})
        
    filtered_emg = fi_emg.filter(windows_emg)
    
    # Extraction des caractéristiques EMG
    fe_emg = feature_extractor.FeatureExtractor()
    features_emg = fe_emg.extract_feature_group("LS4", filtered_emg)
    print("EMG features: ", features_emg)
    print("EMG feature LS shapes: ", features_emg['LS'].shape)
    print("EMG feature MSR shapes: ", features_emg['MSR'].shape)
    print("EMG feature MFL shapes: ", features_emg['MFL'].shape)
    print("EMG feature WAMP shapes: ", features_emg['WAMP'].shape)

    # ----------------------- Traitement IMU --------------------------
    odh_imu = data_handler.OfflineDataHandler()
    odh_imu.get_data(folder_location=folder_location, regex_filters=imu_regex)
    windows_imu, metadata_imu = odh_imu.parse_windows(WINDOW_SIZE, WINDOW_INC)
    
    # Filtrage IMU (ex: passe-bas pour mouvement)
    fi_imu = filtering.Filter(100)  # Fréquence d'échantillonnage IMU
    fi_imu.install_filters({"name": "lowpass", "cutoff": 20, "order": 4})
    fi_imu.install_filters({"name": "standardization"})
    filtered_imu = fi_imu.filter(windows_imu)
    
    # Extraction des caractéristiques IMU
    fe_imu = feature_extractor.FeatureExtractor()
    features_imu = fe_imu.extract_features(["WLPHASOR" , "DFTR" , "WENG" ,"RMSPHASOR"], filtered_imu)
    #print("IMU features: ", imu_features)
    print("IMU feature WLPHASOR shapes: ", features_imu['WLPHASOR'].shape)
    print("IMU feature DFTR shapes: ", features_imu['DFTR'].shape)
    print("IMU feature WENG shapes: ", features_imu['WENG'].shape)
    print("IMU feature RMSPHASOR shapes: ", features_imu['RMSPHASOR'].shape)
    

    # ----------------------- Traitement PPG --------------------------
    odh_ppg = data_handler.OfflineDataHandler()
    odh_ppg.get_data(folder_location=folder_location, regex_filters=ppg_regex)
    windows_ppg, metadata_ppg = odh_ppg.parse_windows(WINDOW_SIZE, WINDOW_INC)
    
    # Filtrage PPG (ex: passe-bande pour le pouls)
    fi_ppg = filtering.Filter(100)  # Fréquence d'échantillonnage PPG
    fi_ppg.install_filters({ "name": "bandpass", "cutoff": [0.66, 3] , "order": 4})
    fi_ppg.install_filters({ "name": "pca", "num_components": 2})
    fi_ppg.install_filters({"name": "standardization"})
    filtered_ppg = fi_ppg.filter(windows_ppg)
    
    # Extraction des caractéristiques PPG
    fe_ppg = feature_extractor.FeatureExtractor()
    features_ppg = fe_ppg.extract_features(["MPK", "WENG", "MEAN"], filtered_ppg)
    #print("PPG features: ", ppg_features)
    print("PPG feature MPK shapes: ", features_ppg['MPK'].shape)
    print("PPG feature WENG shapes: ", features_ppg['WENG'].shape)
    print("PPG feature MEAN shapes: ", features_ppg['MEAN'].shape)

    # ----------------- Fusion des caractéristiques -------------------
    # Vérification de l'alignement des fenêtres
    assert len(features_emg) == len(features_imu) == len(features_ppg), "Les fenêtres ne sont pas alignées."
    
    # Concaténation horizontale des caractéristiques
    combined_features = np.hstack((features_emg, features_imu, features_ppg))
    
    # Mise à jour du dictionnaire de retour
    feature_dic['training_features'] = combined_features
    print("Combined features : ", feature_dic['training_features'])
    feature_dic['training_labels'] = metadata_emg['classes']  # Labels supposés communs
    print("Labels shape: ", feature_dic['training_labels'].shape)
    
    return feature_dic, combined_features

In [None]:
def prepareemgimuppgmodel():

    emg_regex_filters = [
        data_handler.RegexFilter(left_bound = "C_", right_bound="_R_", values = [str(i) for i in REPS], description='reps'),
        data_handler.RegexFilter(left_bound = "_R_", right_bound="_emg.csv", values = [str(i) for i in CLASSES], description='classes')

    ]
    imu_regex_filters = [
        data_handler.RegexFilter(left_bound = "C_", right_bound="_R_", values = [str(i) for i in REPS], description='reps'),
        data_handler.RegexFilter(left_bound = "_R_", right_bound="_imu.csv", values = [str(i) for i in CLASSES], description='classes')
    ]

    ppg_regex_filters = [
        data_handler.RegexFilter(left_bound = "C_", right_bound="_R_", values = [str(i) for i in REPS], description='reps'),
        data_handler.RegexFilter(left_bound = "_R_", right_bound="_ppg.csv", values = [str(i) for i in CLASSES], description='classes')
    ]

    emg_feature_dic_0 , emg_windows_0 = get_training_data(folder_location="data/S" + str("0") + "/", regex_filters= emg_regex_filters)
    emg_feature_dic_1 , emg_windows_1 = get_training_data(folder_location="data/S" + str("1") + "/", regex_filters= emg_regex_filters)
    emg_feature_dic_2 , emg_windows_2 = get_training_data(folder_location="data/S" + str("2") + "/", regex_filters= emg_regex_filters)
    emg_feature_dic_3 , emg_windows_3 = get_training_data(folder_location="data/S" + str("3") + "/", regex_filters= emg_regex_filters)
    emg_feature_dic_4 , emg_windows_4 = get_training_data(folder_location="data/S" + str("4") + "/", regex_filters= emg_regex_filters)

    imu_feature_dic_0 , imu_windows_0 = get_training_data(folder_location="data/S" + str("0") + "/", regex_filters= imu_regex_filters)
    imu_feature_dic_1 , imu_windows_1 = get_training_data(folder_location="data/S" + str("1") + "/", regex_filters= imu_regex_filters)
    imu_feature_dic_2 , imu_windows_2 = get_training_data(folder_location="data/S" + str("2") + "/", regex_filters= imu_regex_filters)
    imu_feature_dic_3 , imu_windows_3 = get_training_data(folder_location="data/S" + str("3") + "/", regex_filters= imu_regex_filters)
    imu_feature_dic_4 , imu_windows_4 = get_training_data(folder_location="data/S" + str("4") + "/", regex_filters= imu_regex_filters)

    ppg_feature_dic_0 , ppg_windows_0 = get_training_data(folder_location="data/S" + str("0") + "/", regex_filters= ppg_regex_filters)
    ppg_feature_dic_1 , ppg_windows_1 = get_training_data(folder_location="data/S" + str("1") + "/", regex_filters= ppg_regex_filters)
    ppg_feature_dic_2 , ppg_windows_2 = get_training_data(folder_location="data/S" + str("2") + "/", regex_filters= ppg_regex_filters)
    ppg_feature_dic_3 , ppg_windows_3 = get_training_data(folder_location="data/S" + str("3") + "/", regex_filters= ppg_regex_filters)
    ppg_feature_dic_4 , ppg_windows_4 = get_training_data(folder_location="data/S" + str("4") + "/", regex_filters= ppg_regex_filters)
    

    

    #Pre-processing
    #a standardization filter(EMG , IMU , PPG)
    #EMG data, application of a 60Hz notch filter + 20-450Hz band-pass filter
    #PPG data, application of a 0.66-3Hz band-pass filter +  a Principal Component Analysis (PCA) is applied to the Infrared and Red channels
    #IMU data, the approach uses the derivative of the 3-axis accelerometer data from the standardized signals

    # fi_emg = filtering.Filter(2000)
    
    # standardization_filter = { "name": "standardization"}
    # emg_notch_filter = { "name": "notch", "cutoff": 60}
    # emg_bandpass_filter = { "name": "bandpass", "cutoff": [20, 450]}

    # fi_emg.install_filters(standardization_filter)
    # fi_emg.install_filters(emg_notch_filter)
    # fi_emg.install_filters(emg_bandpass_filter)

    # fi_ppg = filtering.Filter(100)
    # ppg_bandpass_filter = { "name": "bandpass", "cutoff": [0.66, 3]}
    # pca_filter = { "name": "pca", "num_components": 2}
    # fi_ppg.install_filters(standardization_filter)
    # fi_ppg.install_filters(ppg_bandpass_filter)

    # fi_imu = filtering.Filter(100)

    fe = feature_extractor.FeatureExtractor()
    imu_features = fe.extract_features(["WLPHASOR" , "DFTR" , "WENG" ,"RMSPHASOR"], imu_windows)
    #print("IMU features: ", imu_features)
    print("IMU feature WLPHASOR shapes: ", imu_features['WLPHASOR'].shape)
    print("IMU feature DFTR shapes: ", imu_features['DFTR'].shape)
    print("IMU feature WENG shapes: ", imu_features['WENG'].shape)
    print("IMU feature RMSPHASOR shapes: ", imu_features['RMSPHASOR'].shape)
    ppg_features = fe.extract_features(["MPK", "WENG", "MEAN"], ppg_windows)
    #print("PPG features: ", ppg_features)
    print("PPG feature MPK shapes: ", ppg_features['MPK'].shape)
    print("PPG feature WENG shapes: ", ppg_features['WENG'].shape)
    print("PPG feature MEAN shapes: ", ppg_features['MEAN'].shape)
    #print("PPG features: ", ppg_features)

    import numpy as np
    combined_features = np.hstack([
        emg_features['features'],  # Extract 'features' from each dictionary
        imu_features['features'],
        ppg_features['features']
    ])
