In [None]:
!pip install neurokit2 python-dotenv

In [None]:
!pip install --upgrade tensorflow

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import random
import seaborn as sns
from scipy import stats
from collections import Counter
import neurokit2 as nk
import matplotlib.pyplot as plt
import dotenv
import gspread

import sys
from google.auth import default
from google.colab import auth, drive


from tqdm import tqdm
from tqdm.notebook import tqdm

# Register tqdm's `progress_apply` helpers on pandas objects.
tqdm.pandas()

# Mount Google Drive so the .env file and the dataset folders are reachable.
drive.mount('/content/drive', force_remount=True)

# Load environment variables (MAIN_EXP_PATH, UTIL_SCRIPT_PATH, ...) from the .env file on Drive.
dotenv.load_dotenv('/content/drive/MyDrive/.env')

# Forwarded unchanged to write_features_in_csv by the extraction cells below.
DELETE_FEATURE_SAMPLE = False
# Each dataset procedure has its own folder: f'{MAIN_PROCESSED_PATH} {procedure}'.
DATASET_PROCEDURES = ["DP_1", "DP_2"]
# Number of augmented files per augmentation procedure; used to build the
# 'Aug-<procedure>-<n>_<signal>.csv' file names that are picked up for processing.
AUGMENTATION_SAMPLE_AMOUNT_BY_PROCEDURE = {"AP_2": 1, "AP_3": 1, "AP_5" : 9, "AP_6": 11, "AP_7": 11, "AP_8": 22, "AP_9": 22}
AUGMENTATION_PROCEDURES = AUGMENTATION_SAMPLE_AMOUNT_BY_PROCEDURE.keys()
MAIN_PATH = os.environ.get('MAIN_EXP_PATH')
if MAIN_PATH is None:
  # Without this check the processed path silently becomes "None(Processed)" and
  # every later file lookup fails with a misleading error.
  raise RuntimeError(
    "MAIN_EXP_PATH environment variable is not set; "
    "define it in /content/drive/MyDrive/.env before running this notebook."
  )
MAIN_PROCESSED_PATH = f"{MAIN_PATH}(Processed)"


In [None]:
# Put the directory holding the project's helper scripts at the front of the
# import search path, so the import of `analyze_and_transform_datasets` resolves.
util_script_path = os.environ.get('UTIL_SCRIPT_PATH')
if util_script_path is None:
  # Consider providing a default path or handling the error differently
  print("Warning: UTIL_SCRIPT_PATH environment variable is not set.")
else:
  sys.path.insert(0, util_script_path)

In [None]:
from analyze_and_transform_datasets import list_files_scandir, \
                                          get_formatted_values, \
                                          format_time_str, \
                                          get_sample_path, \
                                          read_datas_from_csv, \
                                          get_labels_df, \
                                          write_features_in_csv, \
                                          get_signal_metrics

In [None]:

# Per-signal registries: dataset procedure name -> list of files found for it.
all_EDA_infos = {}
all_EDA_Phasic_infos = {}
all_EDA_Tonic_infos = {}

# Working lists; after the loop below they hold the entries of the LAST
# dataset procedure that was scanned.
EDA_files = []
EDA_Phasic_files = []
EDA_Tonic_files = []

# Only augmented files ('Aug-<procedure>-<n>_<signal>.csv') are allowed;
# the plain 'EDA.csv' / 'EDA_Phasic.csv' / 'EDA_Tonic.csv' names are not in the lists.
augmented_prefixes = [
  f'Aug-{PROCEDURE_NAME}-{(idx + 1)}'
  for PROCEDURE_NAME in AUGMENTATION_PROCEDURES
  for idx in range(AUGMENTATION_SAMPLE_AMOUNT_BY_PROCEDURE[PROCEDURE_NAME])
]
allowed_EDA_files = [f'{prefix}_EDA.csv' for prefix in augmented_prefixes]
allowed_EDA_Phasic_files = [f'{prefix}_EDA_Phasic.csv' for prefix in augmented_prefixes]
allowed_EDA_Tonic_files = [f'{prefix}_EDA_Tonic.csv' for prefix in augmented_prefixes]

for DATASET_PROCEDURE in DATASET_PROCEDURES:
  procedure_dir = f'{MAIN_PROCESSED_PATH} {DATASET_PROCEDURE}'
  # Fresh lists per procedure; list_files_scandir receives them as its second
  # argument and its return value is unused, so it presumably fills them in place.
  EDA_files, EDA_Phasic_files, EDA_Tonic_files = [], [], []
  scan_jobs = (
    (allowed_EDA_files, EDA_files),
    (allowed_EDA_Phasic_files, EDA_Phasic_files),
    (allowed_EDA_Tonic_files, EDA_Tonic_files),
  )
  for allowed_names, found_files in scan_jobs:
    list_files_scandir(allowed_names, found_files, procedure_dir, MAIN_PATH, True)
  total_found = len(EDA_files) + len(EDA_Phasic_files) + len(EDA_Tonic_files)
  print(f'Number of files will be used in procedure {DATASET_PROCEDURE}: {total_found}')
  all_EDA_infos[DATASET_PROCEDURE] = EDA_files
  all_EDA_Phasic_infos[DATASET_PROCEDURE] = EDA_Phasic_files
  all_EDA_Tonic_infos[DATASET_PROCEDURE] = EDA_Tonic_files

### Extracting Physiological Metrics

#### - Electrodermal Activity (EDA)

In [None]:
# Pick one dataset procedure at random, then one EDA file entry from THAT
# procedure. The sample must come from `all_EDA_infos[random_procedure]`:
# `EDA_files` only holds the last scanned procedure, which may differ from
# `random_procedure` and would then yield a path that does not exist.
random_procedure = random.sample(DATASET_PROCEDURES, 1)[0]
# Kept as a one-element list because later cells read `random_samples[0]`.
random_samples = random.sample(all_EDA_infos[random_procedure], 1)

In [None]:


# Build '<processed> <procedure>/<diagnose_result>/<sample_name>/<game_name>/<file_name>'
# for the randomly chosen sample.
sample_info = random_samples[0]
sample_path = '/'.join([
  MAIN_PROCESSED_PATH + ' ' + random_procedure,
  sample_info['diagnose_result'],
  sample_info['sample_name'],
  sample_info['game_name'],
  sample_info['file_name'],
])
print(f'Read from : {sample_path}')
df = pd.read_csv(sample_path)
# 'time' is treated as seconds written with a decimal comma. Convert to real
# numbers first: passing numeric *strings* together with `unit=` to
# pd.to_datetime is deprecated, and `.astype(str)` keeps this working when
# read_csv has already parsed the column as numeric.
time_seconds = pd.to_numeric(df['time'].astype(str).str.replace(',', '.', regex=False))
df['time'] = pd.to_datetime(time_seconds, unit='s')
# Index by timestamp so time-window slices via `.loc` work. (The former
# `df.index_col = 'time'` only set a stray attribute — `index_col` is a
# read_csv argument, not a DataFrame setting.)
df.index = df['time']
df.head()

In [None]:

# Visual check on the first 10 seconds of the sample: mark zero crossings,
# positive peaks and negative peaks, then the peaks found by nk.eda_peaks.
window_start = df['time'].values[0]
window_end = window_start + np.timedelta64(10, 's')
df_values = df.loc[window_start:window_end, 'values']
if not df_values.empty:
  signal = df_values.values
  zero_crossings = nk.signal_zerocrossings(signal)
  info = nk.signal_findpeaks(signal)
  # Negating the signal turns its troughs into peaks.
  info2 = nk.signal_findpeaks(-signal)
  nk.events_plot([info2["Peaks"], info["Peaks"], zero_crossings], signal)
  _ , info3 = nk.eda_peaks(signal, sampling_rate=4)
  nk.events_plot([info3["SCR_Peaks"]], signal)

In [None]:
def _count_events(find_events, failure_message):
  """Return ``len(find_events())``, or NaN when the computation raises.

  Best-effort on purpose: a failing metric must not abort feature extraction,
  so the exception is printed together with ``failure_message`` and NaN is
  returned instead.
  """
  try:
    return len(find_events())
  except Exception as e:
    print(e)
    print(failure_message)
    return np.nan


def get_eda_metrics(df_values, sample_rate = 100):
  """Count zero crossings, positive peaks and negative peaks of an EDA window.

  Args:
    df_values: signal samples; a pandas Series or any array-like.
    sample_rate: unused here; kept so all metric callbacks share one signature.

  Returns:
    dict with keys 'eda_zero_cross', 'eda_positive_peak' and
    'eda_negative_peak'; a value is NaN when its computation failed.
  """
  signal = np.asarray(df_values)
  return {
    'eda_zero_cross': _count_events(
      lambda: nk.signal_zerocrossings(signal),
      "EDA zero crossings cannot calculated"),
    'eda_positive_peak': _count_events(
      lambda: nk.signal_findpeaks(signal)["Peaks"],
      "EDA positive peaks cannot calculated"),
    # Peaks of the negated signal are the troughs of the original one.
    'eda_negative_peak': _count_events(
      lambda: nk.signal_findpeaks(-signal)["Peaks"],
      "EDA negative peaks cannot calculated"),
  }


def get_scl_metrics(df_values, sample_rate = 100):
  """Count zero crossings, positive peaks and negative peaks of an SCL window.

  Args:
    df_values: signal samples; a pandas Series or any array-like.
    sample_rate: unused here; kept so all metric callbacks share one signature.

  Returns:
    dict with keys 'scl_zero_cross', 'scl_positive_peak' and
    'scl_negative_peak'; a value is NaN when its computation failed.
  """
  signal = np.asarray(df_values)
  return {
    'scl_zero_cross': _count_events(
      lambda: nk.signal_zerocrossings(signal),
      "SCL zero crossings cannot calculated"),
    'scl_positive_peak': _count_events(
      lambda: nk.signal_findpeaks(signal)["Peaks"],
      "SCL positive peaks cannot calculated"),
    # Peaks of the negated signal are the troughs of the original one.
    'scl_negative_peak': _count_events(
      lambda: nk.signal_findpeaks(-signal)["Peaks"],
      "SCL negative peaks cannot calculated"),
  }


def get_scr_metrics(df_values, sample_rate = 100):
  """Count zero crossings and nk.eda_peaks detections of an SCR window.

  Unlike the EDA/SCL variants, peaks are detected with ``nk.eda_peaks`` and
  ``sample_rate`` is forwarded to it as ``sampling_rate``.

  Args:
    df_values: signal samples; a pandas Series or any array-like.
    sample_rate: sampling rate handed to ``nk.eda_peaks``.
      NOTE(review): the default is 100 while the cells in this notebook use
      sampling_rate=4 — confirm the caller of this callback passes the rate.

  Returns:
    dict with keys 'scr_zero_cross', 'scr_positive_peak' and
    'scr_negative_peak'; a value is NaN when its computation failed.
  """
  signal = np.asarray(df_values)
  return {
    'scr_zero_cross': _count_events(
      lambda: nk.signal_zerocrossings(signal),
      "SCR zero crossings cannot calculated"),
    'scr_positive_peak': _count_events(
      lambda: nk.eda_peaks(signal, sampling_rate=sample_rate)[1]["SCR_Peaks"],
      "SCR positive peaks cannot calculated"),
    # Running the detector on the negated signal counts downward deflections.
    'scr_negative_peak': _count_events(
      lambda: nk.eda_peaks(-signal, sampling_rate=sample_rate)[1]["SCR_Peaks"],
      "SCR negative peaks cannot calculated"),
  }



In [None]:
# Try the shared metric extractor on the EDA sample loaded above, with
# get_eda_metrics supplying the three EDA-specific features.
# (10 is presumably the interval length also used by the extraction loops — verify.)
get_signal_metrics(
  df,
  10,
  'EDA',
  [],
  special_features=['eda_zero_cross', 'eda_positive_peak', 'eda_negative_peak'],
  get_special_metrics_callback=get_eda_metrics,
  sampling_rate=4,
)

In [None]:

# Load the phasic component stored next to the randomly chosen EDA sample.
# NOTE(review): this reads the plain 'EDA_Phasic.csv', not the
# 'Aug-..._EDA_Phasic.csv' counterpart of the chosen file — confirm that is intended.
sample_info = random_samples[0]
sample_path = '/'.join([
  MAIN_PROCESSED_PATH + ' ' + random_procedure,
  sample_info['diagnose_result'],
  sample_info['sample_name'],
  sample_info['game_name'],
  'EDA_Phasic.csv',
])
print(sample_path)
df2 = pd.read_csv(sample_path)
# 'time' is treated as seconds written with a decimal comma. Convert to real
# numbers first: passing numeric *strings* together with `unit=` to
# pd.to_datetime is deprecated, and `.astype(str)` keeps this working when
# read_csv has already parsed the column as numeric.
time_seconds = pd.to_numeric(df2['time'].astype(str).str.replace(',', '.', regex=False))
df2['time'] = pd.to_datetime(time_seconds, unit='s')
# Index by timestamp so time-window slices via `.loc` work. (The former
# `df2.index_col = 'time'` only set a stray attribute — `index_col` is a
# read_csv argument, not a DataFrame setting.)
df2.index = df2['time']
df2.head()

In [None]:

# Visual check on the first 10 seconds of the phasic sample: compare the
# generic peak finder with nk.eda_peaks on the signal and on its negation.
window_start = df2['time'].values[0]
window_end = window_start + np.timedelta64(10, 's')
df_values = df2.loc[window_start:window_end, 'values']
if not df_values.empty:
  signal = df_values.values
  info = nk.signal_findpeaks(signal)
  _ , info2 = nk.eda_peaks(signal, sampling_rate=4)
  _ , info3 = nk.eda_peaks(-signal, sampling_rate=4)
  # One plot per detector: negated-signal peaks, regular peaks, generic peaks.
  for detected_events in (info3["SCR_Peaks"], info2["SCR_Peaks"], info["Peaks"]):
    nk.events_plot([detected_events], signal)

In [None]:
# Try the shared metric extractor on the phasic sample loaded above, with
# get_scr_metrics supplying the three SCR-specific features.
get_signal_metrics(
  df2,
  10,
  'SCR',
  [],
  special_features=['scr_zero_cross', 'scr_positive_peak', 'scr_negative_peak'],
  get_special_metrics_callback=get_scr_metrics,
  sampling_rate=4,
)

In [None]:
# Write EDA features for every collected EDA file of every dataset procedure.
# The former outer loop over AUGMENTATION_PROCEDURES never used its loop
# variable, so it only repeated the identical extraction once per augmentation
# procedure; the file lists already contain the files of all of them.
for DATASET_PROCEDURE in DATASET_PROCEDURES:
  for EDA_file in tqdm(all_EDA_infos[DATASET_PROCEDURE]):
    raw_dataframe = read_datas_from_csv(EDA_file, DATASET_PROCEDURE, MAIN_PROCESSED_PATH)
    for interval_value in [10, 30, 50, 70]:
      # Only the 10-unit interval is enabled at the moment; the others are skipped.
      if interval_value != 10:
        continue
      write_features_in_csv(
        EDA_file, raw_dataframe, interval_value, DATASET_PROCEDURE, 'EDA',
        MAIN_PROCESSED_PATH, DELETE_FEATURE_SAMPLE,
        special_features=['eda_zero_cross', 'eda_positive_peak', 'eda_negative_peak'],
        get_special_metrics_callback=get_eda_metrics,
        sampling_rate=4,
      )

#### - Tonic EDA (SCL, skin conductance level)

In [None]:

# Write SCL features for every collected tonic EDA file of every dataset procedure.
for DATASET_PROCEDURE in DATASET_PROCEDURES:
  tonic_files = all_EDA_Tonic_infos[DATASET_PROCEDURE]
  for EDA_Tonic_file_idx in tqdm(range(len(tonic_files))):
    EDA_Tonic_file = tonic_files[EDA_Tonic_file_idx]
    raw_dataframe = read_datas_from_csv(EDA_Tonic_file, DATASET_PROCEDURE, MAIN_PROCESSED_PATH)
    for interval_value in [10, 30, 50, 70]:
      # Only the 10-unit interval is enabled at the moment; the others are skipped.
      if interval_value != 10:
        continue
      write_features_in_csv(
        EDA_Tonic_file, raw_dataframe, interval_value, DATASET_PROCEDURE, 'SCL',
        MAIN_PROCESSED_PATH, DELETE_FEATURE_SAMPLE,
        special_features=['scl_zero_cross', 'scl_positive_peak', 'scl_negative_peak'],
        get_special_metrics_callback=get_scl_metrics,
        sampling_rate=4,
      )


#### - Phasic EDA (SCR, skin conductance response)

In [None]:

# Write SCR features for every collected phasic EDA file of every dataset procedure.
for DATASET_PROCEDURE in DATASET_PROCEDURES:
  phasic_files = all_EDA_Phasic_infos[DATASET_PROCEDURE]
  for EDA_Phasic_file_idx in tqdm(range(len(phasic_files))):
    EDA_Phasic_file = phasic_files[EDA_Phasic_file_idx]
    raw_dataframe = read_datas_from_csv(EDA_Phasic_file, DATASET_PROCEDURE, MAIN_PROCESSED_PATH)
    for interval_value in [10, 30, 50, 70]:
      # Only the 10-unit interval is enabled at the moment; the others are skipped.
      if interval_value != 10:
        continue
      write_features_in_csv(
        EDA_Phasic_file, raw_dataframe, interval_value, DATASET_PROCEDURE, 'SCR',
        MAIN_PROCESSED_PATH, DELETE_FEATURE_SAMPLE,
        special_features=['scr_zero_cross', 'scr_positive_peak', 'scr_negative_peak'],
        get_special_metrics_callback=get_scr_metrics,
        sampling_rate=4,
      )
