Dorothée Morand-Grondin

Working on Ingrid Demoly research project - Physiological analysis

Second script: cardiac metrics at each marker

In [6]:
# libraries
import pandas as pd
import numpy as np
import unicodedata
import unicodedata

# build the list of subject IDs: "Subject01" through "Subject82" (zero-padded to two digits)
subjects = ["Subject%02d" % number for number in range(1, 83)]

Find the shortest interval between consecutive 'Debut' (stimulus onset) markers across participants.
This will serve as our reference duration for the induction period.

In [None]:

# For every subject, find the shortest time between two consecutive 'Debut'
# markers, then summarise those per-subject minima across the whole sample.
shortest_intervals = []  # one record per subject: {"subject", "shortest_interval"}

for subject in subjects:

    print(f'processing {subject}') # tracking

    # load data (marker labels are forced to strings)
    ECG_file = f"data/{subject}_ECG.csv"
    marker_file = pd.read_csv(ECG_file, dtype={"Marqueurs": str})
    marker_file = marker_file.rename(
        columns={"Time": "time", "EKG": "amplitude", "Marqueurs": "markers"})  # rename columns

    # keep only the 'Debut' rows; na=False drops the rows that carry no marker
    debut_df = marker_file[marker_file["markers"].str.startswith("Debut", na=False)]

    # time elapsed between each 'Debut' marker and the previous one, in file order
    # (the first row has no predecessor, hence the dropna)
    stimuli_durations = debut_df["time"].diff().dropna()

    if not stimuli_durations.empty:  # needs at least two 'Debut' markers
        shortest_intervals.append(
            {"subject": subject, "shortest_interval": stimuli_durations.min()})

# make this into a dataframe
# (explicit columns so the lookups below still work when no subject qualified)
shortest_df = pd.DataFrame(shortest_intervals, columns=["subject", "shortest_interval"])

# compute global stats
mean_val = shortest_df["shortest_interval"].mean()
median_val = shortest_df["shortest_interval"].median()
sd_val = shortest_df["shortest_interval"].std()

# print the details
print(f"\nOverall shortest interval across all participants: {shortest_df['shortest_interval'].min():.4f} sec")
print(f"Mean:   {mean_val:.4f} sec")
print(f"Median: {median_val:.4f} sec")
print(f"SD:     {sd_val:.4f} sec")


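Build the per-marker results dataframe: cardiac metrics for the window following each 'Debut' marker, plus a quality-control summary of interpolated RR intervals per participant.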

In [None]:

#WINDOW = shortest_df['shortest_interval'].min()
WINDOW = 6.1875 # shortcutting automatization - careful!
all_results = []
qc_summary = [] # will store quality control info per participant


def _strip_accents(label):
    """Return `label` without accents, with surrounding whitespace stripped.

    The string is NFD-decomposed and every character of Unicode category 'Mn'
    (the combining marks that carry the accents) is dropped.
    """
    return ''.join(
        c for c in unicodedata.normalize('NFD', label)
        if unicodedata.category(c) != 'Mn'
    ).strip()


def _rr_window_metrics(window_rr):
    """Compute the cardiac metrics for one array of RR intervals.

    Parameters
    ----------
    window_rr : numpy.ndarray
        RR intervals falling inside one stimulus window
        (presumably in seconds, given the 60 / RR conversion to BPM - TODO confirm).

    Returns
    -------
    dict
        Keys HR, IBI, SDNN, logSDNN, MSSD, MSD, RMSSD (in that order).
        A metric that cannot be computed is NaN: everything for an empty window,
        the successive-difference metrics for a single beat, and logSDNN when
        SDNN is 0 (instead of -inf plus a runtime warning).
    """
    metrics = dict.fromkeys(
        ("HR", "IBI", "SDNN", "logSDNN", "MSSD", "MSD", "RMSSD"), np.nan)
    if window_rr.size == 0:
        return metrics

    metrics["HR"] = (60 / window_rr).mean()   # HR (BPM)
    metrics["IBI"] = window_rr.mean()         # mean inter-beat interval
    sdnn = window_rr.std()                    # population SD (numpy default, ddof=0)
    metrics["SDNN"] = sdnn
    if sdnn > 0:
        metrics["logSDNN"] = np.log(sdnn)

    diffs = np.diff(window_rr)                # successive RR differences
    if diffs.size > 0:
        mssd = np.mean(diffs ** 2)
        metrics["MSSD"] = mssd
        metrics["MSD"] = np.mean(np.abs(diffs))
        metrics["RMSSD"] = np.sqrt(mssd)
    return metrics


for subject in subjects:

    print(f'computing {subject}') # tracking

    # load markers
    ecg_df = pd.read_csv(f"data/{subject}_ECG.csv", dtype={"Marqueurs": str}) # force markers to be strings
    ecg_df = ecg_df.rename(columns={"Time": "time", "EKG": "amplitude", "Marqueurs": "markers"}) # replace names

    # keep only starting markers ('debut'); na=False excludes rows without a marker
    debut_df = ecg_df[ecg_df["markers"].str.startswith("Debut", na=False)].copy()

    # emotion label: drop the 'Debut ' prefix, lowercase, remove accents
    # (so accented and unaccented spellings count as the same emotion)
    debut_df["emotion"] = (
        debut_df["markers"]
        .str.replace("Debut ", "", regex=False)
        .str.strip()
        .str.lower()
        .apply(_strip_accents))

    # load clean RR intervals
    rr_df = pd.read_csv(f"cleaned_data/{subject}_cleaned_ECG.csv")

    # quality check (whole recording)
    # NOTE(review): assumes "is_valid" is parsed as a boolean column - confirm
    is_interpolated = ~rr_df["is_valid"]
    percent_interp_total = is_interpolated.mean() * 100
    n_interpolated = is_interpolated.sum() # number of interpolated RR intervals

    # longest interpolated gap
    # label consecutive runs of valid / invalid rows: the id increases at every change of "is_valid"
    rr_df["block"] = (rr_df["is_valid"] != rr_df["is_valid"].shift()).cumsum()

    # summed RR duration of each interpolated run
    gap_durations = rr_df[is_interpolated].groupby("block")["rr_interval_interpol"].sum()
    longest_gap_sec = gap_durations.max() if len(gap_durations) > 0 else 0

    results = {"ID": subject}  # start with participant ID
    emotion_counts = {} # repetitions seen so far per emotion
    all_windows = [] # collect RR intervals inside the stimulus window

    for _, stim in debut_df.iterrows(): # for each emotion stimulus

        emotion = stim["emotion"]
        emotion_counts[emotion] = emotion_counts.get(emotion, 0) + 1 # add 1 each time
        name = f"{emotion}_{emotion_counts[emotion]}" # emotion + nb

        start = stim["time"] # the start of the stimulus
        end = start + WINDOW # the end of the stimulus

        # now isolate the data in this window (both bounds inclusive)
        window_rr_df = rr_df[
            (rr_df["rr_time_interpol"] >= start) &
            (rr_df["rr_time_interpol"] <= end)]

        all_windows.append(window_rr_df)  # store dataframe for quality control

        window_rr = window_rr_df["rr_interval_interpol"].to_numpy()  # use values for metrics

        # one column per metric: <emotion>_<repetition>_<metric>
        for metric, value in _rr_window_metrics(window_rr).items():
            results[f"{name}_{metric}"] = value

    # quality check (stimulus only)
    if len(all_windows) > 0:
        all_windows_df = pd.concat(all_windows, ignore_index=True)
        percent_interp_stimuli = (~all_windows_df["is_valid"]).mean() * 100
        n_interpolated_stimuli = (~all_windows_df["is_valid"]).sum()
        n_rr_stimuli_total = len(all_windows_df)
    else:
        # no 'Debut' marker for this subject: define every QC field explicitly so
        # the record below neither fails nor reuses the previous subject's counts
        percent_interp_stimuli = np.nan
        n_interpolated_stimuli = 0
        n_rr_stimuli_total = 0

    qc_summary.append({
        "Subject": subject,
        "Percent_interpolated_whole_recording": percent_interp_total,
        "Percent_interpolated_stimuli": percent_interp_stimuli,
        "n_interpolated_RR_whole": n_interpolated,
        "n_interpolated_RR_stimuli": n_interpolated_stimuli,
        "n_RR_total": len(rr_df),
        "n_RR_stimuli": n_rr_stimuli_total,
        "longest_interpolated_gap_sec": longest_gap_sec})

    all_results.append(results)

# convert to dataframe and save
qc_df = pd.DataFrame(qc_summary)
qc_df.to_csv("QC_interpolation_summary.csv", index=False)

print(qc_df.head())
print("QC table saved.")


df = pd.DataFrame(all_results)
df.to_excel("ECG_results_per_marker.xlsx", index=False)
print(df.head())


Metrics per emotion: average each metric over the repetitions of the same emotion.

In [None]:
import pandas as pd

# load data
df = pd.read_excel("ECG_results_per_marker.xlsx")

summary = []

# One output row per participant: every "<emotion>_<repetition>_<metric>" column
# is grouped under "<emotion>_<metric>" and averaged over the repetitions.
for _, row in df.iterrows():
    values_by_key = {}  # "<emotion>_<metric>" -> values of all repetitions

    # loop over all columns except ID
    for col in df.columns:
        if col == "ID":
            continue
        # split from the right: the last two fields are the repetition and the
        # metric, so an emotion label that itself contains "_" stays intact
        parts = col.rsplit("_", 2)
        if len(parts) < 3:
            continue
        emotion, _repetition, metric = parts
        values_by_key.setdefault(f"{emotion}_{metric}", []).append(row[col])

    # average over repetitions (missing values are ignored)
    subj = {"ID": row["ID"]}
    for key, values in values_by_key.items():
        subj[key] = pd.Series(values).mean(skipna=True)

    summary.append(subj)

# build and save
summary_df = pd.DataFrame(summary)
summary_df.to_excel("ECG_results_summary.xlsx", index=False)
print(summary_df.head())



Emotion-level regrouping: pool the RR intervals of all windows of the same emotion before computing the metrics.

In [None]:
# NOTE(review): this takes the window from `shortest_df` (the shortest-interval cell
# must have been run), whereas the per-marker cell hard-codes 6.1875 - confirm both agree.
WINDOW = shortest_df['shortest_interval'].min()
all_results = []

for subject in subjects:
    print(f'computing {subject}')

    # load ECG and markers (markers forced to strings)
    ecg_df = pd.read_csv(f"data/{subject}_ECG.csv", dtype={"Marqueurs": str})
    ecg_df = ecg_df.rename(columns={"Time": "time", "EKG": "amplitude", "Marqueurs": "markers"})

    # keep only start markers; na=False excludes rows without a marker
    debut_df = ecg_df[ecg_df["markers"].str.startswith("Debut", na=False)].copy()

    # emotion label: drop the 'Debut ' prefix, lowercase, remove accents
    # (NFD decomposition, then drop the combining marks of category 'Mn')
    debut_df["emotion"] = (
        debut_df["markers"]
        .str.replace("Debut ", "", regex=False)
        .str.strip()
        .str.lower()
        .apply(lambda s: ''.join(
            c for c in unicodedata.normalize('NFD', s)
            if unicodedata.category(c) != 'Mn'
        ).strip()))

    rr_df = pd.read_csv(f"cleaned_data/{subject}_cleaned_ECG.csv")

    results = {"ID": subject}

    # regrouping per emotion
    for emotion in debut_df["emotion"].unique():
        # RR intervals of each window of this emotion, one array per stimulus
        rr_windows = []
        for start in debut_df.loc[debut_df["emotion"] == emotion, "time"]:
            end = start + WINDOW
            in_window = (rr_df["rr_time_interpol"] >= start) & (rr_df["rr_time_interpol"] <= end)
            rr_windows.append(rr_df.loc[in_window, "rr_interval_interpol"].to_numpy())

        # level metrics (HR, IBI, SDNN) use all pooled intervals
        rr_concat = np.concatenate(rr_windows) if rr_windows else np.array([])
        # successive differences are taken inside each window only: the last beat of
        # one window and the first beat of the next are not adjacent in time, so a
        # difference across that boundary would be an artefact of the concatenation
        diffs = (np.concatenate([np.diff(w) for w in rr_windows])
                 if rr_windows else np.array([]))

        prefix = f"{emotion}_concat"
        # default every metric to NaN; only the computable ones are overwritten
        for metric in ("HR", "IBI", "SDNN", "logSDNN", "MSSD", "MSD", "RMSSD"):
            results[f"{prefix}_{metric}"] = np.nan

        if rr_concat.size > 0:
            results[f"{prefix}_HR"] = (60 / rr_concat).mean()
            results[f"{prefix}_IBI"] = rr_concat.mean()
            sdnn = rr_concat.std()  # population SD (numpy default, ddof=0)
            results[f"{prefix}_SDNN"] = sdnn
            if sdnn > 0:  # log(0) would give -inf and a runtime warning
                results[f"{prefix}_logSDNN"] = np.log(sdnn)

        if diffs.size > 0:
            mssd = np.mean(diffs ** 2)
            results[f"{prefix}_MSSD"] = mssd
            results[f"{prefix}_MSD"] = np.mean(np.abs(diffs))
            results[f"{prefix}_RMSSD"] = np.sqrt(mssd)

    all_results.append(results)

# save
df = pd.DataFrame(all_results)
df.to_excel("ECG_results_per_marker_concat.xlsx", index=False)
print(df.head())
