# Détection des FOG

# I. Chargement des fichiers

In [1]:
import ezc3d
from pyomeca import Analogs, Markers
import os
import json
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import butter, filtfilt

In [None]:

# Obtenez le répertoire imus6_subjects7
repertoire_imus6_subjects7 = 'C:/Users/antho/Documents/MEMOIRE_M2/CODE_STAGE_M2/data_article/raw/imus6_subjects7'

# Chargez les données à partir du fichier Excel en utilisant le chemin relatif
pt1_visit_24_tbc_walklr_0_trial_2 = pd.read_excel(os.path.join(repertoire_imus6_subjects7, 'pt1_visit_24_tbc_walklr_0_trial_2.xlsx'))
pt1_visit_24_tbc_walklr_0_trial_4 = pd.read_excel(os.path.join(repertoire_imus6_subjects7, 'pt1_visit_24_tbc_walklr_0_trial_4.xlsx'))

# Supprimez les colonnes contenant des valeurs manquantes pour avoir les colonnes avec les IMUs actifs
pt1_visit_24_tbc_walklr_0_trial_2.dropna(axis=1, inplace=True)
pt1_visit_24_tbc_walklr_0_trial_4.dropna(axis=1, inplace=True)

In [7]:
fichier_brut = "C:/Users/antho/Documents/MEMOIRE_M2/CODE_STAGE_M2/DATA_FOG/LE_LIEVRE_Emmanuel_1971_03_19_LEEM1971/2023-05-26/2023-05-26_overlay_detectFOG/Video_overlay_3.c3d"
c3d = ezc3d.c3d(fichier_brut)

labels = c3d['parameters']['ANALOG']['LABELS']['value']
frequence = c3d['parameters']['ANALOG']['RATE']['value'][0]
data = c3d['data']['analogs']

# Parcours des labels et organisation des données
def associer_labels_et_data(labels,data):
    fusion_label_data = {}
    for i, label in enumerate(labels):
        fusion_label_data[label] = data[0][i, :]
    return fusion_label_data

fusion_label_data=associer_labels_et_data(labels,data)

def filtrer_labels(fusion_label_data):
    labels_filtre = []
    labels_data_filtre = {}
    for label, valeurs in fusion_label_data.items():
        if 'ACC' in label or 'GYRO' in label:
            labels_filtre.append(label)
            labels_data_filtre[label]=valeurs
    return labels_filtre, labels_data_filtre

labels_filtre, labels_data_filtre = filtrer_labels(fusion_label_data)
print(labels_data_filtre)

{'Left_Rectus Femoris_ACC_X': array([-0.02392578, -0.02392578, -0.02392578, ..., -0.09130859,
       -0.09130859, -0.09130859]), 'Left_Rectus Femoris_ACC_Y': array([-1.00292969, -1.00292969, -1.00292969, ..., -0.34423828,
       -0.34423828, -0.34423828]), 'Left_Rectus Femoris_ACC_Z': array([0.02099609, 0.02099609, 0.02099609, ..., 0.05957031, 0.05957031,
       0.05957031]), 'Left_Rectus Femoris_GYRO_X': array([ 22.43902397,  22.43902397,  22.43902397, ..., -34.02439117,
       -34.02439117, -34.02439117]), 'Left_Rectus Femoris_GYRO_Y': array([ 11.34146404,  11.34146404,  11.34146404, ..., -70.9146347 ,
       -70.9146347 , -70.9146347 ]), 'Left_Rectus Femoris_GYRO_Z': array([  0.12195122,   0.12195122,   0.12195122, ..., 103.04878235,
       103.04878235, 103.04878235]), 'Left_Vastus Lateralis_ACC_X': array([-0.125     , -0.125     , -0.125     , ...,  0.08251953,
        0.08251953,  0.08251953]), 'Left_Vastus Lateralis_ACC_Y': array([-1.00927734, -1.00927734, -1.00927734, ..., -0.7

In [4]:
json_data = {}
for key, value in labels_data_filtre.items():
    parts = key.split('_')
    sensor = parts[1]
    side = parts[0]
    measure = parts[2]
    axis = parts[3]

        # Étape de vérification pour la partie side de la clé
    print("Side:", side)

        # Étape de vérification pour la partie sensor de la clé
    print("Sensor:", sensor)

        # Étape de vérification pour la partie measure de la clé
    print("Measure:", measure)

        # Étape de vérification pour la partie axis de la clé
    print("Axis:", axis)

    if sensor not in json_data:
        json_data[sensor] = {}

    if side not in json_data[sensor]:
        json_data[sensor][side] = {}

    if measure not in json_data[sensor][side]:
        json_data[sensor][side][measure] = {}

    json_data[sensor][side][measure][axis] = value.tolist()

# Conversion du dictionnaire en JSON avec les indentations spécifiées
json_output = json.dumps(json_data, indent=4)

Side: Left
Sensor: Rectus Femoris
Measure: ACC
Axis: X
Side: Left
Sensor: Rectus Femoris
Measure: ACC
Axis: Y
Side: Left
Sensor: Rectus Femoris
Measure: ACC
Axis: Z
Side: Left
Sensor: Rectus Femoris
Measure: GYRO
Axis: X
Side: Left
Sensor: Rectus Femoris
Measure: GYRO
Axis: Y
Side: Left
Sensor: Rectus Femoris
Measure: GYRO
Axis: Z
Side: Left
Sensor: Vastus Lateralis
Measure: ACC
Axis: X
Side: Left
Sensor: Vastus Lateralis
Measure: ACC
Axis: Y
Side: Left
Sensor: Vastus Lateralis
Measure: ACC
Axis: Z
Side: Left
Sensor: Vastus Lateralis
Measure: GYRO
Axis: X
Side: Left
Sensor: Vastus Lateralis
Measure: GYRO
Axis: Y
Side: Left
Sensor: Vastus Lateralis
Measure: GYRO
Axis: Z
Side: Left
Sensor: Tibialis Anterior
Measure: ACC
Axis: X
Side: Left
Sensor: Tibialis Anterior
Measure: ACC
Axis: Y
Side: Left
Sensor: Tibialis Anterior
Measure: ACC
Axis: Z
Side: Left
Sensor: Tibialis Anterior
Measure: GYRO
Axis: X
Side: Left
Sensor: Tibialis Anterior
Measure: GYRO
Axis: Y
Side: Left
Sensor: Tibialis An

# Fonction json final

In [2]:
import numpy as np
import ezc3d
import json

def recuperer_evenements(file_path):
    c3d = ezc3d.c3d(file_path)
    events = c3d['parameters']['EVENT']['LABELS']['value']
    frames = c3d['parameters']['EVENT']['TIMES']['value']
    events_dict = {}
    for event, frame in zip(events, frames):
        events_dict[event] = frame
    return events_dict


def associer_labels_et_data(file_path):
    c3d = ezc3d.c3d(file_path)
    labels = c3d['parameters']['ANALOG']['LABELS']['value']
    frequence = c3d['parameters']['ANALOG']['RATE']['value'][0]
    data = c3d['data']['analogs']
    nb_frame = len(data[0][0])
    temps = np.linspace(0., nb_frame / frequence, num=nb_frame)
    fusion_label_data = {}
    for i, label in enumerate(labels):
        fusion_label_data[label] = data[0][i, :]
    return fusion_label_data, temps   # Dictionnaire avec tous les labels associés aux données

def filtrer_labels(fusion_label_data):
    labels_filtre = []
    labels_data_filtre = {}
    for label, valeurs in fusion_label_data.items():
        if 'ACC' in label or 'GYRO' in label:
            labels_filtre.append(label)
            labels_data_filtre[label] = valeurs
    return labels_filtre, labels_data_filtre # Dictionnaire avec les labels filtrés et les données associées

def creer_structure_json(labels_data_filtre, patient_id, date_de_naissance, medicaments, temps, events_dict):
    json_data = {
        "metadata": {
            "Détails du patient": {
                "Identifiant": patient_id,
                "Date de naissance": date_de_naissance,
                "Medicaments": medicaments
            },
            "Temps": temps.tolist()
        }
    }
    
    for key, value in labels_data_filtre.items():
        parts = key.split('_')
        sensor = parts[1]
        side = parts[0]
        measure = parts[2]
        axis = parts[3]
        
        if sensor not in json_data:
            json_data[sensor] = {}

        if side not in json_data[sensor]:
            json_data[sensor][side] = {}

        if measure not in json_data[sensor][side]:
            json_data[sensor][side][measure] = {}

        json_data[sensor][side][measure][axis] = value.tolist()
    
        # Ajouter les événements FOG
    json_data["FOG"] = {
        "Debut": events_dict["FOG_begin"].tolist(),
        "Fin": events_dict["FOG_end"].tolist()
    }
    
    
    return json_data

    


def creation_json_grace_c3d(file_path, patient_id, date_de_naissance, medicaments, output_path):
    events_dict = recuperer_evenements(file_path) 
    fusion_label_data, temps = associer_labels_et_data(file_path)
    labels_filtre, labels_data_filtre = filtrer_labels(fusion_label_data)
    json_structure = creer_structure_json(labels_data_filtre, patient_id, date_de_naissance, medicaments, temps, events_dict)
    
    with open(output_path, "w") as fichier_json:
        json.dump(json_structure, fichier_json, indent=4)

# Utilisation de la fonction pour traiter le fichier C3D et générer le fichier JSON
fichier_brut = "C:/Users/antho/Documents/MEMOIRE_M2/CODE_STAGE_M2/DATA_FOG/LE_LIEVRE_Emmanuel_1971_03_19_LEEM1971/2023-05-26/2023-05-26_overlay_detectFOG/Video_overlay_3.c3d"
patient_id = 1234
date_de_naissance = 45
medicaments = "ON"
chemin_sortie = "C:/Users/antho/Documents/MEMOIRE_M2/CODE_STAGE_M2/DATABASE/video_overlay_3.json"

creation_json_grace_c3d(fichier_brut, patient_id, date_de_naissance, medicaments, chemin_sortie)


In [4]:
# Charger le fichier JSON
chemin_fichier_json = "C:/Users/antho/Documents/MEMOIRE_M2/CODE_STAGE_M2/DATABASE/video_overlay_3.json"
with open(chemin_fichier_json, "r") as fichier_json:
    donnees_patient = json.load(fichier_json)

# Accéder aux données spécifiques
accelerometre_left_Z_rectus_femoris = donnees_patient["Rectus Femoris"]["Left"]["GYRO"]["Z"]


# II. Filtrage des signaux

In [None]:
'''from scipy.signal import butter, filtfilt
# Fonction pour concevoir le filtre Butterworth
def butterworth_filter(data, cutoff_frequency, sampling_rate, order=8):
    nyquist_frequency = 0.5 * sampling_rate
    normal_cutoff = cutoff_frequency / nyquist_frequency
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    y = filtfilt(b, a, data)
    return y

# Fréquence de coupure et taux d'échantillonnage
cutoff_frequency = 20  # Hz
sampling_rate = 1 / (pt1_visit_24_tbc_walklr_0_trial_2['time'].iloc[1] - pt1_visit_24_tbc_walklr_0_trial_2['time'].iloc[0])  # Calcul du taux d'échantillonnage

# Colonnes à exclure du filtre
columns_to_exclude = ['time', 'subject_ID', 'freeze_label']

# Colonnes à filtrer
columns_to_filter = [col for col in pt1_visit_24_tbc_walklr_0_trial_2.columns if col not in columns_to_exclude]

# Appliquer le filtre à toutes les colonnes sauf celles exclues
for column in columns_to_filter:
    filtered_data = butterworth_filter(pt1_visit_24_tbc_walklr_0_trial_2[column], cutoff_frequency, sampling_rate)
    pt1_visit_24_tbc_walklr_0_trial_2[f'filtered_{column}'] = filtered_data'''

# III. Fenêtrage de nos données

In [None]:
import numpy as np

def decouper_en_fenetres(signal, taille_fenetre, decalage):
    """
    Découper le signal en fenêtres temporelles avec un chevauchement de 80%, autrement dit un décalage de 20% entre chaque fenêtre.

    """
    taille_signal = len(signal)
    taille_fenetre_echantillons = int(taille_fenetre * taux_echantillonnage)
    decalage_fenetre = int(decalage * taille_fenetre_echantillons)

    fenetres = []

    debut = 0
    fin = taille_fenetre_echantillons

    while fin <= taille_signal:
        fenetre = signal[debut:fin]
        fenetres.append(fenetre)

        # Mise à jour des indices pour la prochaine fenêtre
        debut = debut + decalage_fenetre
        fin = fin + decalage_fenetre

    return fenetres

# Utilisation de la fonction pour découper le signal en fenêtres
taux_echantillonnage = 148 # en Hz
taille_fenetre = 3  # en secondes
decalage = 0.2  # en %, soit 0.6s


fenetres = decouper_en_fenetres(pt1_visit_24_tbc_walklr_0_trial_2, taille_fenetre, decalage)
    
print("Nombre de fenêtres:", len(fenetres))
print("Taille de chaque fenêtre:", len(fenetres[0]), "frames")
print("Taille du décalage: ", int(decalage * (taille_fenetre * taux_echantillonnage))," frames")

# Pour vérifier si toutes les fenêtres respectent le bon format
for i, fenetre in enumerate(fenetres[0:10]):
    print("Taille de la fenêtre",i+1,":", len(fenetres[i+1]), "frames")
    
'''Attention : Il manque 82 frames dans nos données finales, car nous avons un décalage inférieur à 88 frames pour générer une dernière fenêtre.'''

# III. Visualisation graphique 

In [None]:
start_time = 0
end_time = 1000

def detect_and_plot_fog_periods(data, start_time, end_time, signals, title):
    # Trouver les indices où le label change de 0 à 1 (début du FOG)
    start_indices = np.where(np.diff(data['freeze_label']) == 1)[0] + 1

    # Trouver les indices où le label change de 1 à 0 (fin du FOG)
    end_indices = np.where(np.diff(data['freeze_label']) == -1)[0] + 1

    # Filtrer les données en fonction des temps spécifiés
    data_time_filtered = data[(data['time'] >= start_time) & (data['time'] <= end_time)]

    # Créer des sous-graphiques pour chaque axe
    fig, axs = plt.subplots(len(signals), figsize=(12, 6 * len(signals)), sharex=True)

    colors = ['royalblue', 'forestgreen', 'darkorange']

    for i, (signal, color) in enumerate(zip(signals, colors)):
        # Tracer les signaux inertiométriques sur chaque sous-graphique
        axs[i].plot(data_time_filtered['time'], data_time_filtered[signal], label="Signal", color=color)

        # Ajouter des lignes verticales pour marquer le début et la fin de chaque période de FOG
        for start, end in zip(start_indices, end_indices):
            axs[i].axvline(data_time_filtered['time'].iloc[start], color='r', linestyle='--', label='Début de FOG')
            axs[i].axvline(data_time_filtered['time'].iloc[end], color='g', linestyle='--', label='Fin de FOG')
        
        # Colorier la zone entre les lignes verticales
            axs[i].fill_between(data_time_filtered['time'].iloc[start:end + 1],
                0, 1, color='gray', alpha=0.5, transform=axs[i].get_xaxis_transform())
        
        # Paramètres des sous-graphiques
        axs[i].set(ylabel='Accélération (m/s)')
        axs[i].set_title(f'{signal}')
        
        # Créer une légende pour chaque sous-graphique
        handles, labels = axs[i].get_legend_handles_labels()
        by_label = dict(zip(labels, handles))
        axs[i].legend(by_label.values(), by_label.keys(), loc='upper left')

    # Paramètres communs à tous les sous-graphiques
    fig.suptitle(title)
    axs[-1].set(xlabel='Temps')

    plt.show()

# Application de la fonction

#Tracer les signaux provenant de l'accéléromètre du pied droit
signals_acc_lumbar_to_plot = ['imu_foot_r_ax', 'imu_foot_r_ay', 'imu_foot_r_az']
detect_and_plot_fog_periods(pt1_visit_24_tbc_walklr_0_trial_4, start_time, end_time, signals_acc_lumbar_to_plot, 'Détection des Périodes de FOG des signaux provenant de l\'accéléromètre du pied droit')

#Tracer les signaux provenant du gyroscope du pied droit
signals_gyr_lumbar_to_plot = ['imu_foot_r_gx', 'imu_foot_r_gy', 'imu_foot_r_gz']
detect_and_plot_fog_periods(pt1_visit_24_tbc_walklr_0_trial_4, start_time, end_time, signals_gyr_lumbar_to_plot, 'Détection des Périodes de FOG des signaux provenant du gyroscope du pied droit')

#Tracer les signaux provenant de l'accéléromètre du pied gauche
signals_acc_lumbar_to_plot = ['imu_foot_l_ax', 'imu_foot_l_ay', 'imu_foot_l_az']
detect_and_plot_fog_periods(pt1_visit_24_tbc_walklr_0_trial_4, start_time, end_time, signals_acc_lumbar_to_plot, 'Détection des Périodes de FOG des signaux provenant de l\'accéléromètre du pied droit')

#Tracer les signaux provenant du gyroscope du pied gauche
signals_gyr_lumbar_to_plot = ['imu_foot_l_gx', 'imu_foot_l_gy', 'imu_foot_l_gz']
detect_and_plot_fog_periods(pt1_visit_24_tbc_walklr_0_trial_4, start_time, end_time, signals_gyr_lumbar_to_plot, 'Détection des Périodes de FOG des signaux provenant du gyroscope du pied droit')

# III. Extraction des caractéristiques du signal

# 1. Indice de gel (Moore et al, 2008)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import welch

'''
# Fonction pour calculer l'Indice de Gel (Freezing Index)
def calculate_freezing_index(signal, fs):
    # Paramètres de la méthode de Moore et al. (2008)
    fb_band = (3, 8)  # Bande de gel de la marche en Hz
    lb_band = (0.5, 3)  # Bande de locomotion en Hz
    window_size = int(6 * fs)  # Fenêtre temporelle de 6 secondes

    # Découper le signal en segments de 6 secondes
    num_segments = len(signal) // window_size
    segments = np.array_split(signal, num_segments)

    freezing_index_values = []

    for segment in segments:
        # Calcul de la densité spectrale de puissance (PSD) avec la méthode de Welch
        f, psd = welch(segment, fs=fs, nperseg=window_size)

        # Extraire les indices de fréquence pour les bandes de gel et de locomotion
        fb_indices = np.where((f >= fb_band[0]) & (f <= fb_band[1]))[0]
        lb_indices = np.where((f >= lb_band[0]) & (f <= lb_band[1]))[0]

        # Calculer l'Indice de Gel (Freezing Index)
        freezing_index = np.sum(psd[fb_indices]) / np.sum(psd[lb_indices])
        freezing_index_values.append(freezing_index)

    return np.array(freezing_index_values)

#Application de la fonction
fs = 50  # Fréquence d'échantillonnage en Hz
time = np.arange(0, len(pt1_visit_24_tbc_walklr_0_trial_2['imu_lumbar_ax']), 1) / fs
acceleration_signal = pt1_visit_24_tbc_walklr_0_trial_2['imu_lumbar_ax']

freezing_index_values = calculate_freezing_index(acceleration_signal, fs)

# Tracer l'Indice de Gel sur le temps
plt.figure(figsize=(12, 6))
plt.plot(time[:len(freezing_index_values)], freezing_index_values, label='Indice de Gel (FI)')
plt.xlabel('Temps (s)')
plt.ylabel('Indice de Gel (FI)')
plt.title('Analyse fréquentielle des signaux d\'accéléromètre - Méthode de Moore et al. (2008)')
plt.legend()
plt.grid(True)
plt.show() '''
