In [None]:
import numpy as np
import pandas as pd
import os
import neurokit2 as nk
import cvxEDA.src.cvxEDA as cvxEDA
import scipy.stats as stats
import scipy.io
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import plotly.express as px
import plotly.figure_factory as ff
from sklearn.preprocessing import MinMaxScaler
import warnings
from sklearn.manifold import TSNE
warnings.filterwarnings('ignore')

# Sampling rate of the EDA signal; passed as `sampling_rate` to the NeuroKit calls further down.
# NOTE(review): presumably in Hz - confirm against the recording device.
sampling_freq = 15.625

In [None]:
# Load the raw EDA table. Later cells read its 'CMA', 'Video ID', 'Participant ID',
# 'Gender' and 'EDA' columns.
raw_eda = pd.read_csv("../Data_files/EDA.csv")
raw_eda

In [None]:
# Keep only the rows tagged with one of the four stimulus conditions or 'Baseline'.
# .copy() makes final_df an independent frame, so adding columns to it later does not
# trigger pandas' SettingWithCopyWarning (currently hidden by the blanket warnings filter).
final_df = raw_eda[raw_eda['CMA'].isin(['HVLA', 'LVLA', 'LVHA', 'HVHA', 'Baseline'])].copy()

In [None]:
final_df

In [None]:
# Numeric video id: the integer that follows the last 'V' in 'Video ID' (e.g. 'V12' -> 12).
final_df['Video_ID_number'] = final_df['Video ID'].map(lambda tag: int(tag.split('V')[-1]))
final_df.to_csv("../Data_files/Stimuli_EDA.csv", index=False)

In [None]:
final_df

In [None]:
def label_prep(merged_df):
    """Attach self-report ratings and derived class labels to a feature table.

    Reads ``../Data_files/VADS.csv`` and, for every row of ``merged_df``, copies
    the ``Valence``, ``Arousal``, ``Dominance`` and ``significance`` values of the
    first VADS row whose ``Participant ID`` equals the row's ``Participant ID``
    and whose ``Video ID`` equals the row's ``Video_ID_number``.  Rows without a
    matching VADS entry are left untouched.  The function then adds:

    * ``arousal_category`` / ``valence_category`` - the rating binned into
      0 (rating in [1, 3]) or 1 (rating in (3, 5]);
    * ``taskwiselabel`` - 1 for the ``HVHA`` / ``HVLA`` conditions, 0 for
      ``Baseline`` / ``LVLA`` / ``LVHA``;
    * ``three_class_label`` - see ``three_class_mapping`` in the body.

    Parameters
    ----------
    merged_df : pandas.DataFrame
        Needs the columns ``Participant ID``, ``Video_ID_number`` and ``CMA``.
        It is **modified in place**; pass ``df.copy()`` to keep the original.

    Returns
    -------
    pandas.DataFrame
        The very object that was passed in, with the label columns added.
        The two ``*_category`` columns are plain ``int`` when every row could
        be binned; if some rows have a missing or out-of-range rating they are
        returned as nullable ``Int64`` with ``<NA>`` in those rows (the
        previous implementation raised ``ValueError`` in that situation).
    """
    vads = pd.read_csv("../Data_files/VADS.csv")  # self-report ratings
    print("reading vad")

    rating_columns = ['Valence', 'Arousal', 'Dominance', 'significance']

    # Index the ratings once by (participant, video) instead of scanning the whole
    # table for every row.  keep='first' mirrors "take the first matching row".
    ratings_by_key = (
        vads.drop_duplicates(subset=['Participant ID', 'Video ID'], keep='first')
        .set_index(['Participant ID', 'Video ID'])[rating_columns]
        .to_dict('index')
    )

    # Add the rating columns to the original data frame.
    for index, row in tqdm(merged_df.iterrows(), total=len(merged_df)):
        ratings = ratings_by_key.get((row['Participant ID'], row['Video_ID_number']))
        if ratings is None:
            continue  # no rating recorded for this participant/video pair
        for column in rating_columns:
            merged_df.at[index, column] = ratings[column]

    print("binning...")
    bins = [1, 3, 5]  # bin edges: [1, 3] and (3, 5]
    labels = [0, 1]   # 0 = low (1-3), 1 = high (4-5)

    def _binned(ratings):
        """Bin a rating column into 0/1; fall back to nullable Int64 if any row is unbinned."""
        as_float = pd.cut(ratings, bins=bins, labels=labels, include_lowest=True).astype(float)
        if as_float.isna().any():
            return as_float.astype('Int64')
        return as_float.astype(int)

    merged_df['arousal_category'] = _binned(merged_df['Arousal'])
    merged_df['valence_category'] = _binned(merged_df['Valence'])

    n_unbinned = int(merged_df[['arousal_category', 'valence_category']].isna().any(axis=1).sum())
    if n_unbinned:
        print(f"warning: {n_unbinned} row(s) have a missing or out-of-range rating; category left as <NA>")

    print("mapping")
    # Two-class label: the HV* conditions -> 1, Baseline and the LV* conditions -> 0.
    mapping = {
        'Baseline': 0,
        'LVLA': 0,
        'LVHA': 0,
        'HVHA': 1,
        'HVLA': 1,
    }
    merged_df['taskwiselabel'] = merged_df['CMA'].map(mapping)

    # NOTE(review): LVHA -> 0, HVLA -> 2 and everything else (Baseline, LVLA, HVHA) -> 1.
    # The rationale for this grouping is not evident from the code - confirm it is intended.
    three_class_mapping = {
        'Baseline': 1,
        'LVLA': 1,
        'LVHA': 0,
        'HVHA': 1,
        'HVLA': 2,
    }
    merged_df['three_class_label'] = merged_df['CMA'].map(three_class_mapping)
    return merged_df

# eda_data_with_labels = label_prep(final_df)
# eda_data_with_labels.to_csv("EDA_data_with_labels.csv")

Statistical features

In [None]:
# cvxEDA
def eda_stats(y, fs=15.625):
    """Run the cvxEDA decomposition on a z-scored EDA signal.

    Parameters
    ----------
    y : array-like with ``mean()`` / ``std()`` methods
        Raw EDA samples.  The signal is standardised (zero mean, unit standard
        deviation) before decomposition; a constant signal therefore divides
        by zero.
    fs : float, optional
        Sampling rate of ``y``.  ``1 / fs`` is handed to cvxEDA as the sampling
        interval.  Defaults to 15.625, the value this notebook uses.

    Returns
    -------
    list
        ``[r, p, t, l, d, e, obj]`` exactly as returned by ``cvxEDA.cvxEDA``.
        This notebook treats ``r`` as the phasic (SCR) and ``t`` as the tonic
        (SCL) component; see the cvxEDA documentation for the other outputs.
    """
    yn = (y - y.mean()) / y.std()
    r, p, t, l, d, e, obj = cvxEDA.cvxEDA(yn, 1. / fs)
    return [r, p, t, l, d, e, obj]

def shannon_entropy(window):
    """Shannon entropy (base 2) of the absolute values of ``window``, normalised to sum to 1.

    A constant 1e-10 is added inside the logarithm so zero-valued samples do not
    produce ``log2(0)``.
    """
    magnitude = np.abs(window)
    weights = magnitude / np.sum(magnitude)
    log_weights = np.log2(weights + 1e-10)
    return -np.sum(weights * log_weights)

def first_derivative(signal):
    """Numerical first derivative of ``signal`` with respect to the sample index.

    Returns an empty array when ``signal`` has fewer than two samples.
    """
    n_samples = len(signal)
    if n_samples <= 1:
        return np.array([])
    sample_positions = np.arange(n_samples)
    return np.gradient(signal, sample_positions)


def second_derivative(signal):
    """Numerical second derivative of ``signal`` with respect to the sample index.

    Computed as the gradient of ``first_derivative(signal)``.  Returns an empty
    array when the first derivative has fewer than two samples (i.e. ``signal``
    itself has fewer than two), matching ``first_derivative``'s convention.
    """
    fd = first_derivative(signal)
    if len(fd) < 2:
        return np.array([])
    # Differentiate the derivative values (the original passed the function object here).
    return np.gradient(fd)


def calculate_integral(window):
    """Return the sum of the absolute sample values of ``window``."""
    return np.abs(window).sum()

def calculate_avg_power(window):
    """Return the mean of the squared absolute sample values of ``window``."""
    magnitude = np.abs(window)
    return np.square(magnitude).mean()

def calculate_arc_length(window):
    """Length of the polyline through ``window`` assuming unit spacing between samples.

    Each consecutive pair contributes ``sqrt(1 + delta**2)``.
    """
    steps = np.diff(window)
    segment_lengths = np.sqrt(np.square(steps) + 1)
    return segment_lengths.sum()

def slope(window):
    """Slope of the least-squares straight line fitted to ``window`` against the sample index.

    Returns ``np.nan`` when ``window`` has fewer than two samples.
    """
    if len(window) <= 1:
        return np.nan
    positions = np.arange(len(window))
    gradient, _intercept = np.polyfit(positions, window, 1)
    return gradient

Segments: one per participant–video pair

In [None]:
# Split the recording into one DataFrame per (participant, video) pair,
# keyed "<Participant ID>_<Video ID>" (e.g. '10_V1').
df_list = {}
video_ids = final_df['Video ID'].unique()
for participant_id in final_df['Participant ID'].unique():
    participant_rows = final_df[final_df['Participant ID'] == participant_id]
    for video_id in video_ids:
        segment = participant_rows[participant_rows['Video ID'] == video_id]
        if segment.empty:
            # This participant has no rows for this video; an empty segment would
            # break the feature extraction further down.
            continue
        df_list[f"{participant_id}_{video_id}"] = segment

In [None]:
df_list['10_V1']

In [None]:
sampling_freq = 15.625


def extract_eda_features(segment, sampling_rate):
    """Compute one feature row for a single participant-video segment.

    Parameters
    ----------
    segment : pandas.DataFrame
        Non-empty rows of one participant/video pair with the columns ``EDA``,
        ``Participant ID``, ``Video ID``, ``Gender``, ``CMA`` and
        ``Video_ID_number``.  The identifier values are taken from the first row.
    sampling_rate : float
        Sampling rate forwarded to the NeuroKit functions.

    Returns
    -------
    dict
        Feature name -> value, followed by the five identifier columns.
    """
    # Clean the signal, then split it into phasic (SCR) and tonic (SCL) parts with NeuroKit.
    x = np.array(nk.eda_clean(segment['EDA'].to_numpy(), sampling_rate=sampling_rate, method='biosppy'))
    components = nk.eda_phasic(x, sampling_rate)
    scr = np.array(components['EDA_Phasic'])
    scl = np.array(components['EDA_Tonic'])

    duration_min = x.shape[0] / sampling_rate / 60  # used for the per-minute rates
    x_axis = np.linspace(0, scl.shape[0] / sampling_rate, scl.shape[0])
    fd = first_derivative(x)

    _, info = nk.eda_peaks(scr, sampling_rate)  # SCR peak detection
    amplitude = info['SCR_Amplitude']
    recovery = info['SCR_RecoveryTime']

    # np.trapz is deprecated since NumPy 2.0 in favour of np.trapezoid; use whichever exists.
    trapezoid = getattr(np, 'trapezoid', None) or np.trapz

    first_row = segment.iloc[0]
    return {
        # features of the cleaned EDA signal
        'mean': np.mean(x),
        'std': np.std(x),
        'min': np.min(x),
        'max': np.max(x),
        'median_eda': np.quantile(x, 0.5),
        'ku_eda': stats.kurtosis(x),
        'sk_eda': stats.skew(x),
        'dynrange': x.max() / x.min(),
        # NOTE(review): the slope is fitted on the tonic component, not on x - confirm intended.
        'slope': np.polyfit(x_axis, scl, 1)[0],
        'variance': np.var(x),
        'entropy': shannon_entropy(x),
        'insc': calculate_integral(x),
        'fd_mean': np.mean(fd),
        'fd_std': np.std(fd),

        # phasic (SCR) features
        'max_scr': np.max(scr),
        'min_scr': np.min(scr),
        'mean_scr': np.mean(scr),
        'sd_scr': np.std(scr),
        'nSCR': len(info['SCR_Peaks']) / duration_min,  # peaks per minute
        'aucSCR': trapezoid(scr),  # unit sample spacing
        'meanAmpSCR': np.nanmean(amplitude),
        # nanmax raises on an empty array, i.e. when no SCR peak was detected
        'maxAmpSCR': np.nanmax(amplitude) if np.size(amplitude) else np.nan,
        'meanRespSCR': np.nanmean(recovery),
        'sumAmpSCR': np.nansum(amplitude) / duration_min,   # per minute
        'sumRespSCR': np.nansum(recovery) / duration_min,   # per minute

        # tonic (SCL) features
        'max_scl': np.max(scl),
        'min_scl': np.min(scl),
        'mean_scl': np.mean(scl),
        'sd_scl': np.std(scl),

        # identifiers
        'Participant ID': first_row['Participant ID'],
        'Video ID': first_row['Video ID'],
        'Gender': first_row['Gender'],
        'CMA': first_row['CMA'],
        'Video_ID_number': first_row['Video_ID_number'],
    }


# Collect the rows in a list and build the frame once; concatenating inside the loop is quadratic.
feature_rows = []
for segment in tqdm(df_list.values(), desc="EDA features"):
    if segment.empty:
        continue  # nothing recorded for this participant/video pair
    feature_rows.append(extract_eda_features(segment, sampling_freq))

out_df = pd.DataFrame(feature_rows)
out_df

In [None]:
# Find columns with NaN values
columns_with_nan = out_df.columns[out_df.isna().any()].tolist()
columns_with_nan

In [None]:
print(out_df.columns.shape) # the extraction loop writes 29 feature columns + 5 identifier columns
out_df.columns

In [None]:
out_df

In [None]:
# label_prep adds its columns to the frame it receives, so hand it a copy:
# out_df stays a pure feature table and re-running this cell gives the same result.
eda_data_with_labels = label_prep(out_df.copy())
eda_data_with_labels

In [None]:
# Save the labelled feature table (features are not yet scaled at this point).
# index=False is not passed, so the DataFrame index is written as an unnamed first column.
eda_data_with_labels.to_csv("../Data_files/EDA_data_labels.csv")

In [None]:
# Identifier / condition columns, kept apart from the numeric features
eda_label = out_df.loc[:, ['Participant ID', 'Video ID', 'Gender', 'CMA', 'Video_ID_number']]
eda_label

In [None]:
# The 29 numeric feature columns, in the order the extraction loop produced them
eda_fet = out_df.loc[:, [
    'mean', 'std', 'min', 'max', 'median_eda', 'ku_eda', 'sk_eda',
    'dynrange', 'slope', 'variance', 'entropy', 'insc', 'fd_mean', 'fd_std',
    'max_scr', 'min_scr', 'mean_scr', 'sd_scr', 'nSCR', 'aucSCR',
    'meanAmpSCR', 'maxAmpSCR', 'meanRespSCR', 'sumAmpSCR', 'sumRespSCR',
    'max_scl', 'min_scl', 'mean_scl', 'sd_scl',
]]
eda_fet

Feature Normalisation

In [None]:
# Min-max scale every feature column, keeping eda_fet's column names and index.
# Building the frame straight from the scaler output guarantees float columns; the
# previous version pre-allocated an empty (object-dtype) frame and filled it afterwards.
# NOTE(review): the scaler is fitted on all rows; if a train/test split follows in
# another notebook, fit it on the training rows only.
scaler_eda = MinMaxScaler()
eda_fet_scaled = pd.DataFrame(
    scaler_eda.fit_transform(eda_fet),
    columns=eda_fet.columns,
    index=eda_fet.index,
)
eda_fet_scaled

In [None]:
# Pairwise correlation between the scaled features
correlation_matrix = eda_fet_scaled.corr()

# A feature is flagged as redundant when its correlation with any *other* feature exceeds 0.95.
# NOTE(review): both members of such a pair end up in the list, so neither one is kept,
# and only positive correlations are considered - confirm both are intended.
redundant_features = list({
    other
    for feature in correlation_matrix.columns
    for other in correlation_matrix.index[
        (correlation_matrix[feature] > 0.95) & (correlation_matrix.index != feature)
    ]
})
print(redundant_features)
selected_features = [name for name in correlation_matrix.columns if name not in redundant_features]

# Correlations among the features that survived the filter
reduced_correlation_matrix = correlation_matrix.loc[selected_features, selected_features]

# Heatmap of the reduced matrix
plt.figure(figsize=(10, 10))
sns.set(font_scale=1)
sns.heatmap(
    reduced_correlation_matrix,
    annot=True, fmt=".2f", cmap='viridis',
    cbar=True, square=True, linewidths=0.5,
)
plt.title('Correlation Heatmap for EDA Features')
plt.show()

print(reduced_correlation_matrix.columns)

In [None]:
# Hard-coded subset of the scaled features used from here on.
# NOTE(review): presumably copied from the columns printed by the correlation filter;
# it will not follow if the data or the threshold changes - confirm.
eda_final_fet = eda_fet_scaled.loc[:, [
    'ku_eda', 'sk_eda', 'dynrange', 'slope', 'variance', 'entropy', 'insc',
    'fd_mean', 'max_scr', 'min_scr', 'nSCR', 'meanAmpSCR',
    'meanRespSCR', 'sumAmpSCR', 'sumRespSCR',
]]
eda_final_fet

In [None]:
# Put the identifier columns back next to the selected, scaled features (aligned on the index)
merged_df = pd.concat((eda_final_fet, eda_label), axis='columns')
merged_df

In [None]:
# Self-report table. The next cell reads its 'Participant ID', 'Video ID', 'Valence',
# 'Arousal', 'Dominance' and 'significance' columns.
vads = pd.read_csv("../Data_files/VADS.csv")
vads

In [None]:
# Copy each segment's self-report ratings from vads into merged_df.
# The ratings are indexed once by (participant, video) instead of scanning the whole
# table for every row; keep='first' mirrors "take the first matching row".
rating_columns = ['Valence', 'Arousal', 'Dominance', 'significance']
ratings_by_key = (
    vads.drop_duplicates(subset=['Participant ID', 'Video ID'], keep='first')
    .set_index(['Participant ID', 'Video ID'])[rating_columns]
    .to_dict('index')
)

for index, row in tqdm(merged_df.iterrows(), total=len(merged_df)):
    ratings = ratings_by_key.get((row['Participant ID'], row['Video_ID_number']))
    if ratings is None:
        continue  # no rating recorded for this participant/video pair
    for column in rating_columns:
        merged_df.at[index, column] = ratings[column]

merged_df

In [None]:
# label_prep adds the rating and label columns to merged_df itself and returns that same
# object, so eda_data_with_labels and merged_df refer to one DataFrame from here on.
eda_data_with_labels = label_prep(merged_df)
# index=False is not passed, so the DataFrame index is written as an unnamed first column.
eda_data_with_labels.to_csv("../Data_files/EDA_labels.csv")
eda_data_with_labels

In [None]:
# Two bins over the rating scale: [1, 3] -> 0 (low) and (3, 5] -> 1 (high)
bins = [1, 3, 5]
labels = [0, 1]

# Bin both rating columns ...
for rating_name, category_name in (('Arousal', 'arousal_category'), ('Valence', 'valence_category')):
    merged_df[category_name] = pd.cut(merged_df[rating_name], bins=bins, labels=labels, include_lowest=True)

# ... then turn the categorical result into plain integers
for category_name in ('arousal_category', 'valence_category'):
    merged_df[category_name] = merged_df[category_name].astype(int)
merged_df

In [None]:
# merged_df.to_csv("eda_stat_fet",index=False)

In [None]:
def plot_tsne(eda_final_fet, valence_col, arousal_col, valence, arousal):
    """Embed the features with t-SNE and draw two scatter plots of the same embedding.

    Parameters
    ----------
    eda_final_fet : array-like of shape (n_samples, n_features)
        Feature matrix handed to ``TSNE.fit_transform``.
    valence_col, arousal_col : array-like of length n_samples
        Numeric labels used to colour the first and the second plot.  They are
        matched to the rows of ``eda_final_fet`` **by position**; a pandas index
        on them is ignored, so a filtered or re-ordered Series cannot silently
        end up on the wrong points.
    valence, arousal : str
        Names shown in the title of the first and the second plot.

    Returns
    -------
    None
        The figures are shown with ``plt.show()``.
    """
    # One embedding (fixed seed), reused for both colourings.
    tsne = TSNE(n_components=2, random_state=42)
    tsne_results = tsne.fit_transform(eda_final_fet)

    for label_values, label_name in ((valence_col, valence), (arousal_col, arousal)):
        colours = np.asarray(label_values)  # positional, not index-aligned

        plt.figure(figsize=(8, 6))
        scatter = plt.scatter(tsne_results[:, 0], tsne_results[:, 1], c=colours, cmap='viridis')
        plt.title(f't-SNE Plot of Features Colored by {label_name}')
        plt.xlabel('t-SNE1')
        plt.ylabel('t-SNE2')
        plt.colorbar(scatter, label='Label')
        plt.show()

plot_tsne(eda_final_fet, merged_df['valence_category'], merged_df['arousal_category'], 'valence', 'arousal')

In [None]:
# Binary encoding of the CMA tag: Baseline and the LV* tags -> 1, the HV* tags -> 0.
# (This is the inverse of the 'taskwiselabel' encoding produced by label_prep.)
mapping = {'Baseline': 1, 'LVLA': 1, 'LVHA': 1, 'HVHA': 0, 'HVLA': 0}
merged_df['CMA_numeric'] = merged_df['CMA'].map(mapping)

# 'stress' is 0 for Baseline and HVLA and 1 for every other CMA value.
merged_df['stress'] = (~merged_df['CMA'].isin(['Baseline', 'HVLA'])).astype('int64')
merged_df

In [None]:
plot_tsne(eda_final_fet, merged_df['CMA_numeric'], merged_df['stress'], 'CMA', 'stress')

In [None]:
# Save the scaled features together with every label column added to merged_df so far (index not written).
merged_df.to_csv("../Data_files/EDA_FINAL_FET.csv", index=False)

In [None]:
# Per-participant questionnaire table. The next cell reads its 'Participant ID',
# 'GHQ Score' and 'GHQ Category' columns.
ghq_p =pd.read_csv("../Data_files/GHQ-Personality_category_sheet.csv")
ghq_p

In [None]:
# Copy each participant's GHQ score and category into merged_df.
# Indexing ghq_p by participant once and mapping replaces the per-row table scan;
# keep='first' mirrors "take the first matching row". Participants that are not in
# ghq_p get NaN in both columns.
ghq_by_participant = (
    ghq_p.drop_duplicates(subset='Participant ID', keep='first')
    .set_index('Participant ID')
)
for ghq_column in ['GHQ Score', 'GHQ Category']:
    merged_df[ghq_column] = merged_df['Participant ID'].map(ghq_by_participant[ghq_column])

merged_df

In [None]:
# 0 when 'GHQ Category' is exactly 'Distressed', 1 for anything else (missing values included)
merged_df['ghq_category'] = merged_df['GHQ Category'].ne('Distressed').astype('int64')
merged_df

In [None]:
# 0 when 'Gender' is exactly 'Male', 1 for anything else (missing values included)
merged_df['Gender_category'] = merged_df['Gender'].ne('Male').astype('int64')
merged_df.columns

In [None]:
plot_tsne(eda_final_fet, merged_df['ghq_category'], merged_df['Gender_category'], 'ghq_category', 'Gender_category')