Dataframe creation

In [None]:
import pandas as pd
import pickle
import os
import numpy as np
import neurokit2 as nk
import scipy.stats as stats
import cvxEDA.src.cvxEDA as cvxEDA
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings('ignore')

In [None]:
df=pd.DataFrame()
# df should atleast contain columns as PID, EEG, PPG, arousal_category, arousal_category, unique_id (this is the id for that particular sample)

Binning

In [None]:
# binning for arousal into 0 and 1 for the dataset. 0 conveying low arousal and 1 conveying high arousal
# binning for valence into 0 and 1 for the dataset. 0 conveying negative valence and 1 conveying positive valence


In [None]:
raw_eda = df[['EDA','PID','unique_id','valence_category','arousal_category']]
print(raw_eda.head())

raw_ppg = df[['PPG','PID','unique_id','valence_category','arousal_category']]
print(raw_ppg.head())

Data Pre-processing for raw signals

1) winorization
2) nk.clean

In [None]:
from scipy.stats.mstats import winsorize

eda_values = raw_eda['EDA'].tolist()
print(f"Max EDA before Win: {max(eda_values)}")
print(f"Min EDA before Win: {min(eda_values)}")

raw_eda['EDA'] = winsorize(raw_eda['EDA'], limits=[0.01, 0.01]) 
#choosing the lower limit as we have relatively cleaned data

eda_values = raw_eda['EDA'].tolist()
print(f"Max EDA after Win: {max(eda_values)}")
print(f"Min EDA after Win: {min(eda_values)}")

sampling_freq_eda = None # sampling frequency for eda data
eda_data = raw_eda['EDA'].to_numpy()
eda_clean = nk.eda_clean(eda_data, sampling_rate=sampling_freq_eda ,method='neurokit')
x_eda = np.array(eda_clean).reshape(-1, 1)
raw_eda['EDA'] = x_eda.flatten()

eda_values = raw_eda['EDA'].tolist()
print(f"Max EDA after nk.clean: {max(eda_values)}")
print(f"Min EDA after nk.clean: {min(eda_values)}")
raw_eda

In [None]:
from scipy.stats.mstats import winsorize

ppg_values = raw_ppg['PPG'].tolist()
print(f"Max PPG before Win: {max(ppg_values)}")
print(f"Min PPG before Win: {min(ppg_values)}")

raw_ppg['PPG'] = winsorize(raw_ppg['PPG'], limits=[0.01, 0.01]) 
#choosing the lower limit as we have relatively cleaned data

ppg_values = raw_ppg['PPG'].tolist()
print(f"Max PPG after Win: {max(ppg_values)}")
print(f"Min PPG after Win: {min(ppg_values)}")

sampling_freq_ppg = # sampling frequency for ppg data
ppg_data = raw_ppg['PPG'].to_numpy()
ppg_clean = nk.ppg_clean(ppg_data, sampling_rate=sampling_freq_ppg)
x_ppg = np.array(ppg_clean).reshape(-1, 1)
raw_ppg['PPG'] = x_ppg.flatten()

ppg_values = raw_ppg['PPG'].tolist()
print(f"Max PPG after nk.clean: {max(ppg_values)}")
print(f"Min PPG after nk.clean: {min(ppg_values)}")

raw_ppg

EDA and PPG signals have been cleaned, save them into CASE Dataset folder

In [None]:
print(raw_eda.isnull().any())
print(raw_ppg.isnull().any())

In [None]:
raw_eda_final = (
    raw_eda
    .groupby('unique_id', sort=False)
    .agg(
        PID=('PID', 'first'),
        arousal_category=('arousal_category', 'first'),
        valence_category=('valence_category', 'first'),
        Data=('EDA', list),
    )
    .reset_index(drop=True)
)

# inspect
raw_eda_final

In [None]:
raw_eda_path = "<path to save the raw eda file>"
raw_eda_final.to_csv(raw_eda_path,index=False)

In [None]:
raw_ppg_final = (
    raw_ppg
    .groupby('unique_id', sort=False)
    .agg(
        PID=('PID', 'first'),
        arousal_category=('arousal_category', 'first'),
        valence_category=('valence_category', 'first'),
        Data=('PPG', list),
    )
    .reset_index(drop=True)
)

# inspect
raw_ppg_final

In [None]:
raw_ppg_path = "<path to save the raw ppg file>"
raw_ppg_final.to_csv(raw_ppg_path,index=False)

Window spliting

In [None]:
raw_eda_path = "<path to load the raw eda file>"
raw_ppg_path = "<path to load the raw ppg file>"
raw_eda_final = pd.read_csv(raw_eda_path)
raw_ppg_final = pd.read_csv(raw_ppg_path)

In [None]:
# EDA
no_split = None # <number of splits to be done for each data sample>
raw_eda_window = pd.DataFrame(columns=["PID", "arousal_category", "valence_category", "Data"])

for index, row in raw_eda_final.iterrows():
    pid = row['PID']
    ac = row['arousal_category']
    vc = row['valence_category']
    eda_bs = row['Data']
    parts = eda_bs.split() 
    eda_bs = [float(part.strip("[], ")) for part in parts if part.strip("[], ")]
    eda_l = len(eda_bs)
    # print(len(eda_bs))
    for s in range(0,no_split):
        eda_temp = eda_bs[int((eda_l/no_split)*s):int((eda_l/no_split)*(s+1))]
        # print(len(eda_temp))
        new_row = {"PID": pid, "arousal_category": ac, "valence_category": vc, "Data": eda_temp}
        new_row_df = pd.DataFrame([new_row])
        raw_eda_window = pd.concat([raw_eda_window, new_row_df], ignore_index=True)
    # break
    
raw_eda_window
    

In [None]:
# PPG
no_split = None # <number of splits to be done for each data sample>
raw_ppg_window = pd.DataFrame(columns=["PID", "arousal_category", "valence_category", "Data"])

for index, row in raw_ppg_final.iterrows():
    pid = row['PID']
    ac = row['arousal_category']
    vc = row['valence_category']
    ppg_bs = row['Data']
    parts = ppg_bs.split() 
    ppg_bs = [float(part.strip("[], ")) for part in parts if part.strip("[], ")]
    ppg_l = len(ppg_bs)
    # print(len(eda_bs))
    for s in range(0,no_split):
        ppg_temp = ppg_bs[int((ppg_l/no_split)*s):int((ppg_l/no_split)*(s+1))]
        # print(len(eda_temp))
        new_row = {"PID": pid, "arousal_category": ac, "valence_category": vc, "Data": ppg_temp}
        new_row_df = pd.DataFrame([new_row])
        raw_ppg_window = pd.concat([raw_ppg_window, new_row_df], ignore_index=True)
    # break
    
raw_ppg_window

Feature Extraction on these raw files

In [None]:
fet_eda = ['ku_eda','sk_eda','dynrange','slope','variance','entropy','insc','fd_mean','max_scr','min_scr','nSCR','meanAmpSCR','meanRespSCR','sumAmpSCR','sumRespSCR']
fet_ppg = ['BPM', 'PPG_Rate_Mean', 'HRV_MedianNN','HRV_Prc20NN', 'HRV_MinNN', 'HRV_HTI', 'HRV_TINN', 'HRV_LF', 'HRV_VHF','HRV_LFn', 'HRV_HFn', 'HRV_LnHF', 
            'HRV_SD1SD2', 'HRV_CVI', 'HRV_PSS','HRV_PAS', 'HRV_PI', 'HRV_C1d', 'HRV_C1a', 'HRV_DFA_alpha1','HRV_MFDFA_alpha1_Width', 'HRV_MFDFA_alpha1_Peak',
            'HRV_MFDFA_alpha1_Mean', 'HRV_MFDFA_alpha1_Max','HRV_MFDFA_alpha1_Delta', 'HRV_MFDFA_alpha1_Asymmetry', 'HRV_ApEn','HRV_ShanEn', 'HRV_FuzzyEn', 'HRV_MSEn', 
            'HRV_CMSEn', 'HRV_RCMSEn','HRV_CD', 'HRV_HFD', 'HRV_KFD', 'HRV_LZC']

In [None]:
# cvxEDA
def eda_stats(y):
    Fs = 15.625
    yn = (y - y.mean()) / y.std()
    [r, p, t, l, d, e, obj] = cvxEDA.cvxEDA(yn, 1. / Fs)
    return [r, p, t, l, d, e, obj]

def shannon_entropy(window):
    p = np.abs(window) / np.sum(np.abs(window))
    return -np.sum(p * np.log2(p + 1e-10))

def first_derivative(signal):
    if len(signal) > 1:
        time_values = np.arange(len(signal))
        first_derivative = np.gradient(signal, time_values)
        return first_derivative
    else:
        return np.array([])


def second_derivative(signal):
    fd = first_derivative(signal)
    time_values = np.arange(len(fd))
    second_derivative = np.gradient(first_derivative)
    return second_derivative


def calculate_integral(window):
    a = np.sum(np.abs(window))
    return a

def calculate_avg_power(window):
    avg_power = np.mean(np.square(np.abs(window)))
    return avg_power

def calculate_arc_length(window):
    diff_signal = np.diff(window)
    arc_length = np.sum(np.sqrt(1 + np.square(diff_signal)))
    return arc_length

def slope(window):
    if len(window) > 1:
        time_values = np.arange(len(window))
        slope, _ = np.polyfit(time_values, window, 1)
        return slope
    else:
        return np.nan

In [None]:
eda_fet = pd.DataFrame()

for index, row in raw_eda_window.iterrows():
    row_pi = {}
    row_pi['PID'] = row['PID']
    row_pi['arousal_category'] = row['arousal_category']
    row_pi['valence_category'] = row['valence_category']
    raw_eda = row['Data']
    x = np.array(raw_eda)
    eda_phasic = nk.eda_phasic(x, sampling_freq_eda)
    scr = np.array(eda_phasic['EDA_Phasic'])
    scl = np.array(eda_phasic['EDA_Tonic'])
    x_axis = np.linspace(0, scl.shape[0]/sampling_freq_eda, scl.shape[0])
    row_pi['mean'] = np.mean(x) # Mean
    row_pi['std'] = np.std(x) # Standard Deviation
    row_pi['min'] = np.min(x) # Minimum
    row_pi['max'] = np.max(x) # Maximum
    row_pi['median_eda'] = np.quantile(x,0.5) #median
    row_pi['ku_eda'] = stats.kurtosis(x) #kurtosis
    row_pi['sk_eda'] = stats.skew(x) #skewness
    row_pi['dynrange'] = x.max()/x.min()#dynamic range
    row_pi['slope'] = np.polyfit(x_axis,scl,1)[0] #slope
    row_pi['variance'] = np.var(x) # Variance
    row_pi['entropy'] = shannon_entropy(x) # Shannon Entropy
    row_pi['insc'] = calculate_integral(x) # insc
    fd = first_derivative(x)
    row_pi['fd_mean'] = np.mean(fd)
    row_pi['fd_std'] = np.std(fd)
    
    row_pi['max_scr'] = np.max(scr) #min
    row_pi['min_scr'] = np.min(scr) #max
    row_pi['mean_scr'] = np.mean(scr) # Mean
    row_pi['sd_scr'] = np.std(scr) # Standard Deviation

    _, info = nk.eda_peaks(scr, sampling_freq_eda) #scr peak
    peaks = info['SCR_Peaks']
    amplitude = info['SCR_Amplitude']
    recovery = info['SCR_RecoveryTime']
    
    row_pi['nSCR'] = len(info['SCR_Peaks']) / (x.shape[0]/sampling_freq_eda/60) #to get the number of peaks per minute
    row_pi['aucSCR'] = np.trapz(scr)
    row_pi['meanAmpSCR'] = np.nanmean(amplitude)
    row_pi['maxAmpSCR'] = np.nanmax(amplitude)
    row_pi['meanRespSCR'] = np.nanmean(recovery)
    row_pi['sumAmpSCR'] = np.nansum(amplitude) / (x.shape[0]/sampling_freq_eda/60) # per minute
    row_pi['sumRespSCR'] = np.nansum(recovery) / (x.shape[0]/sampling_freq_eda/60) # per minute

    #scl features
    row_pi['max_scl'] = np.max(scl) #min
    row_pi['min_scl'] = np.min(scl) #max
    row_pi['mean_scl'] = np.mean(scl) # Mean
    row_pi['sd_scl'] = np.std(scl) # Standard Deviation


    new_row = pd.DataFrame(row_pi , index=[0])
    eda_fet = pd.concat([eda_fet, new_row], ignore_index=True)
    
    
eda_fet


In [None]:
eda_fet = eda_fet[["PID","arousal_category","valence_category",'mean', 'std', 'min', 'max', 'median_eda', 'ku_eda', 'sk_eda',
       'dynrange', 'slope', 'variance', 'entropy', 'insc', 'fd_mean', 'fd_std',
       'max_scr', 'min_scr', 'mean_scr', 'sd_scr', 'nSCR', 'aucSCR',
       'meanAmpSCR', 'maxAmpSCR', 'meanRespSCR', 'sumAmpSCR', 'sumRespSCR',
       'max_scl', 'min_scl', 'mean_scl', 'sd_scl']]

In [None]:
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Define the columns you want to scale
columns_to_scale = eda_fet.columns.difference(["PID","arousal_category","valence_category"])

def scale_participant(df):
    # Replace inf, -inf with NaN
    df[columns_to_scale] = df[columns_to_scale].replace([np.inf, -np.inf], np.nan)
    
    # Replace NaN values with 0
    df[columns_to_scale] = df[columns_to_scale].fillna(0)
    
    # Apply scaling
    df[columns_to_scale] = scaler.fit_transform(df[columns_to_scale])
    return df

# Instantiate the scaler
scaler = MinMaxScaler()

# Apply the scaling function to each participant's data
eda_fet = eda_fet.groupby('PID').apply(scale_participant).reset_index(drop=True)

eda_fet

In [None]:
fet_eda_path = "<path to save the features extracted from eda>"
eda_fet.to_csv(fet_eda_path,index=False)

In [None]:
print(f"No of High arousal Sample: {eda_fet['arousal_category'].tolist().count(1)}")
print(f"No of Low arousal Sample: {eda_fet['arousal_category'].tolist().count(0)}")
print(f"No of Negative Valence Sample: {eda_fet['valence_category'].tolist().count(0)}")
print(f"No of Positive Valence Sample: {eda_fet['valence_category'].tolist().count(1)}")

In [None]:
ppg_fet = pd.DataFrame()
for index, row in raw_ppg_window.iterrows():
    row_pi = {}
    row_pi['PID'] = row['PID']
    row_pi['arousal_category'] = row['arousal_category']
    row_pi['valence_category'] = row['valence_category']
    raw_ppg = row['Data']
    ppg_clean = nk.ppg_clean(raw_ppg, sampling_rate=sampling_freq_ppg)
    
    if len(ppg_clean) > 10 * sampling_freq_ppg:  # Ensure at least 10 seconds of data
        ppg_signals, ppg_info = nk.ppg_process(ppg_clean, sampling_rate=sampling_freq_ppg)
        
        # Perform the analysis
        analyze_df = nk.ppg_analyze(ppg_signals, sampling_rate=sampling_freq_ppg)
        
        # Update your data with analysis results
        row_pi.update(analyze_df.iloc[0])
        heart_rate = ppg_signals['PPG_Rate']
        bpm = heart_rate.mean()
        # print(bpm)
        row_pi['BPM'] = bpm
        row_pi = pd.DataFrame(row_pi , index=[0])
        ppg_fet = pd.concat([ppg_fet, row_pi], ignore_index=True)

    else:
        print(f"Signal too short for analysis at index")
        continue
    
print(ppg_fet.keys())
columns_with_nan = ppg_fet.columns[ppg_fet.isna().any()].tolist()
print(columns_with_nan)

ppg_fet

In [None]:
pulse_values = ppg_fet['PPG_Rate_Mean'].tolist()
hrv_values = ppg_fet['HRV_RMSSD'].tolist()

print(f"Max Pulse Rate for a window: {max(pulse_values)}")
print(f"Min Pulse Rate for a window: {min(pulse_values)}")

print(f"Max HRV RMSSD for a window: {max(hrv_values)}")
print(f"Min HRV RMSSD for a window: {min(hrv_values)}")

In [None]:
import numpy as np
from sklearn.preprocessing import MinMaxScaler

relevant_features_ppg = ["PID","valence_category","arousal_category",'BPM', 'PPG_Rate_Mean', 'HRV_MedianNN',
       'HRV_Prc20NN', 'HRV_MinNN', 'HRV_HTI', 'HRV_TINN', 'HRV_LF', 'HRV_VHF',
       'HRV_LFn', 'HRV_HFn', 'HRV_LnHF', 'HRV_SD1SD2', 'HRV_CVI', 'HRV_PSS',
       'HRV_PAS', 'HRV_PI', 'HRV_C1d', 'HRV_C1a', 'HRV_DFA_alpha1',
       'HRV_MFDFA_alpha1_Width', 'HRV_MFDFA_alpha1_Peak',
       'HRV_MFDFA_alpha1_Mean', 'HRV_MFDFA_alpha1_Max',
       'HRV_MFDFA_alpha1_Delta', 'HRV_MFDFA_alpha1_Asymmetry', 'HRV_ApEn',
       'HRV_ShanEn', 'HRV_FuzzyEn', 'HRV_MSEn', 'HRV_CMSEn', 'HRV_RCMSEn',
       'HRV_CD', 'HRV_HFD', 'HRV_KFD', 'HRV_LZC']

ppg_fet = ppg_fet[relevant_features_ppg]

# Define the columns you want to scale
columns_to_scale = ppg_fet.columns.difference(["PID","arousal_category","valence_category"])

def scale_participant(df):
    # Replace inf, -inf with NaN
    df[columns_to_scale] = df[columns_to_scale].replace([np.inf, -np.inf], np.nan)
    # Replace NaN values with 0
    df[columns_to_scale] = df[columns_to_scale].fillna(0)
    # Apply scaling
    df[columns_to_scale] = scaler.fit_transform(df[columns_to_scale])
    return df

# Instantiate the scaler
scaler = MinMaxScaler()

# Apply the scaling function to each participant's data
ppg_fet = ppg_fet.groupby('PID').apply(scale_participant).reset_index(drop=True)

ppg_fet

In [None]:
fet_ppg_path = "<path to save the features extracted from ppg>"
ppg_fet.to_csv(fet_ppg_path,index=False)

In [None]:
raw_eda_path = "<path to save the raw data from eda>"
raw_ppg_path = "<path to save the raw data from ppg>"
raw_ppg_window.to_csv(raw_ppg_path,index=False)
raw_eda_window.to_csv(raw_eda_path,index=False)

In [None]:
combined_fet = pd.concat([ppg_fet, eda_fet], axis=1)
combined_fet = combined_fet.loc[:, ~combined_fet.columns.duplicated()]
combined_fet

In [None]:
fet_com_path = "<path to save the fetaures extracted from EDA and PPG as combined>"
combined_fet.to_csv(fet_com_path,index=False)

Features and Raw Signals Data saved. We have created as possible window split for each category of data in order to increase data points while training. 

Data Visualization

1) TSNE
2) UMAP

In [None]:
import sys
import os

# Get the absolute path to the "Scripts" folder
current_dir = os.getcwd()
scripts_dir = os.path.join(current_dir, "..", "Scripts")
sys.path.append(scripts_dir)

from visualize import visualize

# from import visualize

fet_eda_path = "<path to load the features for eda data>"
fet_ppg_path = "<path to load the features for ppg data>"
fet_com_path = "<path to load the features for eda+ppg data>"

output_folder = "<path to output folder for results>"

# EDA
visualize(input_path = fet_eda_path, output_folder = output_folder + "EDA_")

# PPG
visualize(input_path = fet_ppg_path, output_folder = output_folder + "PPG_")

# Combined
visualize(input_path = fet_com_path, output_folder = output_folder + "Combined_")

3) In-distribution (ID) datapoints and uncertinity in the features

In [None]:
# Feature Confidence Intervals (CIs)

import sys
import os
# Get the absolute path to the "Scripts" folder
current_dir = os.getcwd()
scripts_dir = os.path.join(current_dir, "..", "Scripts")
sys.path.append(scripts_dir)
import pandas as pd
import numpy as np
from data_suite import Data_SUITE
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")

def train_and_predict(X_train, X_test, features):
    ds = Data_SUITE(
        copula_type='vine',
        n_copula_samples=2000,
        representer='pca',
        rep_dim=10
    )
    
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    ds.fit(X_train_scaled)
    
    X_test_scaled = scaler.transform(X_test)
    conformal_dict, _ = ds.predict(X_test_scaled)
    
    inconsistent = {}
    uncertainty = {}
    feature_ranges = X_train.max(axis=0) - X_train.min(axis=0)
    
    for feat_idx in conformal_dict:
        feat_name = features.columns[feat_idx]
        lower = conformal_dict[feat_idx]['min']
        upper = conformal_dict[feat_idx]['max']
        test_vals = X_test_scaled[:, feat_idx]
        inconsistent[feat_name] = np.mean((test_vals < lower) | (test_vals > upper))
        ci_width = upper - lower
        uncertainty[feat_name] = np.median(ci_width / feature_ranges[feat_idx])
    
    return inconsistent, uncertainty


def CI(data, strat):
    identifiers = data[['PID','arousal_category','valence_category']]
    if strat == 'arousal_category':
        features = data.drop(columns=['PID','valence_category'])
    else:
        features = data.drop(columns=['PID','arousal_category'])
    
    # Train-test split (stratify by original distribution)
    X_train_1, X_test_1 = train_test_split(features, test_size=0.5, random_state=42, stratify=features[[strat]])
    X_train_2, X_test_2 = X_test_1, X_train_1 
    
    inconsistent_1, uncertainty_1 = train_and_predict(X_train_1, X_test_1, features)
    inconsistent_2, uncertainty_2 = train_and_predict(X_train_2, X_test_2, features)

    # Compute average values
    avg_inconsistent = {k: (inconsistent_1.get(k, 0) + inconsistent_2.get(k, 0)) / 2 for k in set(inconsistent_1) | set(inconsistent_2)}
    avg_uncertainty = {k: (uncertainty_1.get(k, 0) + uncertainty_2.get(k, 0)) / 2 for k in set(uncertainty_1) | set(uncertainty_2)}
    
    return avg_inconsistent

In [None]:
fet_eda_path = "<path to load the features for eda data>"
eda_fet = pd.read_csv(fet_eda_path)
ci_eda_ar = CI(data = eda_fet, strat = "arousal_category")
ci_eda_va = CI(data = eda_fet, strat = "valence_category")

fet_ppg_path = "<path to load the features for ppg data>"
ppg_fet = pd.read_csv(fet_ppg_path)
ci_ppg_ar = CI(data = ppg_fet, strat = "arousal_category")
ci_ppg_va = CI(data = ppg_fet, strat = "valence_category")

fet_com_path =  "<path to load the features for eda+ppg data>"
com_fet = pd.read_csv(fet_com_path)
ci_com_ar = CI(data = com_fet, strat = "arousal_category")
ci_com_va = CI(data = com_fet, strat = "valence_category")

In [None]:
import pprint

di_path = "<path to save the Feature Confidence Intervals output>"
with open(di_path, "w") as f:
    # Write each dictionary with a blank line in between
    f.write("The CIs for Combined's Features keeping stratify as Valence\n")
    pprint.pprint(ci_com_va, stream=f)
    f.write("\n")
    
    f.write("The CIs for Combined's Features keeping stratify as Arousal\n")
    pprint.pprint(ci_com_ar, stream=f)
    f.write("\n")
    
    f.write("The CIs for PPG's Features keeping stratify as Valence\n")
    pprint.pprint(ci_ppg_va, stream=f)
    f.write("\n")
    
    f.write("The CIs for PPG's Features keeping stratify as Arousal\n")
    pprint.pprint(ci_ppg_ar, stream=f)
    f.write("\n")
    
    f.write("The CIs for EDA's Features keeping stratify as Valence\n")
    pprint.pprint(ci_eda_va, stream=f)
    f.write("\n")
    
    f.write("The CIs for EDA's Features keeping stratify as Arousal\n")
    pprint.pprint(ci_eda_ar, stream=f)
    f.write("\n")


In [None]:
print("The features which are highly variable(inconsistent) wrt to Valence in Combined")
print({k: f"{v:.1%}" for k,v in ci_com_va.items() if v > 0.10})

print("The features which are highly variable(inconsistent) wrt to Arousal in Combined")
print({k: f"{v:.1%}" for k,v in ci_com_ar.items() if v > 0.10})

print("The features which are highly variable(inconsistent) wrt to Valence in PPG")
print({k: f"{v:.1%}" for k,v in ci_ppg_va.items() if v > 0.10})

print("The features which are highly variable(inconsistent) wrt to Arousal in PPG")
print({k: f"{v:.1%}" for k,v in ci_ppg_ar.items() if v > 0.10})

print("The features which are highly variable(inconsistent) wrt to Valence in EDA")
print({k: f"{v:.1%}" for k,v in ci_eda_va.items() if v > 0.10})

print("The features which are highly variable(inconsistent) wrt to Arousal in EDA")
print({k: f"{v:.1%}" for k,v in ci_eda_ar.items() if v > 0.10})

4. Data distribution - per window - HVR SD1SD2, EDA MEAN - Histogram

In [None]:
import matplotlib.pyplot as plt
import numpy as np
fet_eda_path = "<path to load the features for eda data>"
eda_fet = pd.read_csv(fet_eda_path)
fet_ppg_path = "<path to load the features for ppg data>"
ppg_fet = pd.read_csv(fet_ppg_path)
fet_com_path = "<path to load the features for eda+ppg data>"
com_fet = pd.read_csv(fet_com_path)
# ----- Plot for EDA Means -----
eda_mean_path = "<path to save the results>"
eda_means = eda_fet['mean'].tolist()  # List of float values

plt.figure(figsize=(8, 6))
plt.hist(eda_means, bins=20, edgecolor='black', alpha=0.75)
plt.title('Distribution of EDA Means')
plt.xlabel('EDA Mean')
plt.ylabel('Windows')

# Create x-ticks using np.arange. Rounding min and max to integers.
ticks = np.arange(int(np.floor(min(eda_means))), int(np.ceil(max(eda_means))), 1)
plt.xticks(ticks)

plt.grid(axis='y', alpha=0.75)
plt.savefig(eda_mean_path)
plt.close()

# ----- Plot for HRV_SD1SD2 -----
hrv_sd1sd2_path = "<path to save the resultsF"
hrv_sd1sd2 = ppg_fet['HRV_SD1SD2'].tolist()  # List of float values

plt.figure(figsize=(8, 6))
plt.hist(hrv_sd1sd2, bins=20, edgecolor='black', alpha=0.75)
plt.title('Distribution of HRV_SD1SD2')
plt.xlabel('HRV_SD1SD2')
plt.ylabel('Windows')

# Create x-ticks for HRV_SD1SD2
ticks = np.arange(int(np.floor(min(hrv_sd1sd2))), int(np.ceil(max(hrv_sd1sd2))), 1)
plt.xticks(ticks)

plt.grid(axis='y', alpha=0.75)
plt.savefig(hrv_sd1sd2_path)
plt.close()

Artifacts Detection on the Raw Data

EDA Artifacts

In [None]:
import sys
import os
# Get the absolute path to the "Scripts" folder
current_dir = os.getcwd()
scripts_dir = os.path.join(current_dir, "..", "Scripts")
sys.path.append(scripts_dir)

import eda_artifact

out_path = "<path to save the eda artifacts results>"
data_file_eda = "<path to load the raw eda data>"
window_size = None # the window size chosen
SAMPLE_RATE = None # sampling rate chosen
average_artifact, sd_artifact= eda_artifact.main(data_file = data_file_eda,window_size = window_size, out_path = out_path, SAMPLE_RATE = SAMPLE_RATE )
print(average_artifact, sd_artifact)

In [None]:
eda_art_df = pd.read_csv(out_path)
pid_list = list(set(eda_art_df['PID'].tolist()))
art_list_p = []
for i in pid_list: 
    art_list = eda_art_df[eda_art_df['PID'] == i]['Artifact (%)'].tolist()
    art_pid = [0 if j == "Not Applicable (not enough data)" else float(j) for j in art_list]
    art_list_p.extend(art_pid)
    art_pid = sum(art_pid)
    count_pid = eda_art_df['PID'].tolist().count(i)
    print(f"The total percent of artifacts in Participant ID: {i} is {art_pid/count_pid}")

In [None]:
average_artifact_p = sum(art_list_p)/len(art_list_p)
sd_artifact_p = np.std(art_list_p)

print(f"The average percent (across window) of EDA artifacts {average_artifact}%")
print(f"The standard deviation (across window) of EDA artifacts {sd_artifact}%")

print(f"The average percent (across participants) of EDA artifacts {average_artifact_p}%")
print(f"The standard deviation (across participants) of EDA artifacts {sd_artifact_p}%")

PPG Artifacts (only Motion)

In [None]:
import sys
import os
# Get the absolute path to the "Scripts" folder
current_dir = os.getcwd()
scripts_dir = os.path.join(current_dir, "..", "Scripts")
sys.path.append(scripts_dir)

import ppg_artifact

out_path = "<path to save the ppg artifacts results>"
data_file_ppg = "<path to load the raw ppg file>"
window_size = None # the window size chosen
ppg_artifact.find_ppg_artifact(input_path = data_file_ppg, output_path = out_path, window_size = window_size)

In [None]:
ppg_art_df = pd.read_csv(out_path)
pid_list = list(set(ppg_art_df['PID'].tolist()))
art_list_p = []
for i in pid_list: 
    art_list = ppg_art_df[ppg_art_df['PID'] == i]['Artifact (%)'].tolist()
    art_pid = [0 if j == "Not Applicable (not enough data)" else float(j) for j in art_list]
    art_list_p.extend(art_pid)
    art_pid = sum(art_pid)
    count_pid = ppg_art_df['PID'].tolist().count(i)
    print(f"The total percent of artifacts in Participant ID: {i} is {art_pid/count_pid}")

In [None]:
average_artifact_p = sum(art_list_p)/len(art_list_p)
sd_artifact_p = np.std(art_list_p)

print(f"The average percent (across participants) of PPG artifacts (motion) {average_artifact_p}%")
print(f"The standard deviation (across participants) of PPG artifacts (motion) {sd_artifact_p}%")

Normalizing (Z-Score) the Raw Signal to be used for Models

In [None]:
def downsample_list(lst, original_rate=1000, target_rate=256):
    factor = original_rate / target_rate
    return [lst[int(i * factor)] for i in range(int(len(lst) / factor))]

In [None]:
def downsample_dataframe(df, original_rate=1000, target_rate=256):
    df['Data'] = df['Data'].apply(lambda lst: downsample_list(lst, original_rate, target_rate))
    return df

In [None]:
raw_eda_path = "<path to the raw eda data file>"
raw_ppg_path = "<path to the raw ppg data file>"
raw_eda = pd.read_csv(raw_eda_path)
raw_ppg = pd.read_csv(raw_ppg_path)

In [None]:
def convert_str_to_list(temp_df):
    for index, row in temp_df.iterrows():
        eda_bs = row['Data']
        parts = eda_bs.split() 
        eda_bs = [float(part.strip("[], ")) for part in parts if part.strip("[], ")]
        temp_df.at[index, 'Data'] = eda_bs
        
    return temp_df

def normalize_list(lst, mean, std):
    return [(x - mean) / std for x in lst]
    
def z_score(temp_df):
    all_values = [val for sublist in temp_df['Data'] for val in sublist]
    mean = np.mean(all_values)
    std = np.std(all_values)
    temp_df['Data'] = temp_df['Data'].apply(lambda lst: normalize_list(lst, mean, std))
    return temp_df  

normalized_eda = pd.DataFrame()
normalized_ppg = pd.DataFrame()  

pid_list = list(set(raw_eda['PID'].tolist()))
for pid in pid_list:
    raw_eda_pid = raw_eda[raw_eda['PID'] == pid]
    raw_eda_pid = convert_str_to_list(temp_df=raw_eda_pid)
    raw_eda_pid = z_score(temp_df=raw_eda_pid)

    raw_ppg_pid = raw_ppg[raw_ppg['PID'] == pid]
    raw_ppg_pid = convert_str_to_list(temp_df=raw_ppg_pid)
    raw_ppg_pid = z_score(temp_df=raw_ppg_pid)

    normalized_eda = pd.concat([normalized_eda, raw_eda_pid], ignore_index=True)
    normalized_ppg = pd.concat([normalized_ppg, raw_ppg_pid], ignore_index=True)

In [None]:
normalized_eda.to_csv(raw_eda_path,index=False)
normalized_ppg.to_csv(raw_ppg_path,index=False)

In [None]:
raw_eda_path = "<path to the raw eda data file>"
raw_ppg_path = "<path to the raw ppg data file>"
raw_eda = pd.read_csv(raw_eda_path)
raw_ppg = pd.read_csv(raw_ppg_path)
raw_ppg

For Benchmarking on ML and DL models for Raw and Features of the a dataset, run the following command in the terminal. 

python3 Model_running.py --eda_fet_path <eda_fet_path> --ppg_fet_path <ppg_fet_path> --com_fet_path <com_det_path> --eda_raw_path <eda_raw_path> --ppg_raw_path <ppg_raw_path> --out_path_benchmark <out_path_benchmark> \

Then the following cell with the <out_path_benchmark> as read_csv

In [None]:
import pandas as pd
df = pd.read_csv("<Path to benchamarking outputs result>")

# Group by and compute mean and std
grouped = df.groupby(['Classification','Signal Type','Input Type', 'Model Name'])[['Test_Accuracy', 'Test_F1']].agg(['mean', 'std'])

# Format mean ± std for both Accuracy and F1 in same row, multiplied by 100
formatted = grouped.apply(
    lambda x: pd.Series({
        'Accuracy ± Std | F1 ± Std': 
        f"{x[('Test_Accuracy', 'mean')]*100:.3f} ± {x[('Test_Accuracy', 'std')]*100:.3f} | {x[('Test_F1', 'mean')]:.3f} ± {x[('Test_F1', 'std')]:.3f}"
    }), axis=1
)

print(formatted)

BenchMarking CLSP Models (zero-shot) on the Features of the Dataset

In [None]:
import sys
import os
# Get the absolute path to the "Scripts" folder
current_dir = os.getcwd()
scripts_dir = os.path.join(current_dir, "..", "Scripts")
sys.path.append(scripts_dir)
import clsp_com

CLSP Zero-Shot on Combined Features

In [None]:
print("Zero-shot on CLSP for Combined Features")

com_fet_path = "<Path to eda+ppg extracted features file>"
clsp_com.Arousal(com_path= com_fet_path)
clsp_com.Valence(com_path=com_fet_path)

CLSP Zero-Shot on PPG Features

In [None]:
import clsp_ppg

print("Zero-shot on CLSP for PPG Features")

ppg_fet_path = "<Path to ppg extracted features file>"
clsp_ppg.Arousal(ppg_path= ppg_fet_path)
clsp_ppg.Valence(ppg_path=ppg_fet_path)

CLSP Zero-shot for CLSP on EDA Features

In [None]:
import clsp_eda

print("Zero-shot on CLSP for EDA Features")

eda_fet_path = "<Path to eda extracted features file>"
clsp_eda.Arousal(eda_path= eda_fet_path)
clsp_eda.Valence(eda_path=eda_fet_path)