In [None]:
!pip install biobss --upgrade

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from scipy.signal import butter, iirnotch, lfilter
from os import listdir
from os.path import isfile, join, isdir
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal
import scipy.ndimage
import scipy.stats as stats

import csv
import pickle
import pandas as pd

import biobss

from tqdm import tqdm

from google.colab import drive

drive.mount('/content/drive')
folder_name="WESAD"
dataset_path= "/content/drive/Shareddrives/EPO4_C2/"+folder_name
subject = 'S2'

# to get the list of directories in the specified directory.
onlydirs = [f for f in listdir(dataset_path) if isdir(join(dataset_path, f))]

In [None]:
class read_data_of_one_subject:
    """Read data from WESAD dataset"""
    def __init__(self, path, subject):
        self.keys = ['label', 'subject', 'signal']
        self.signal_keys = ['wrist', 'chest']
        self.chest_sensor_keys = ['ACC', 'ECG', 'EDA', 'EMG', 'Resp', 'Temp']
        self.wrist_sensor_keys = ['ACC', 'BVP', 'EDA', 'TEMP']
        #os.chdir(path)
        #os.chdir(subject)
        with open(path+'/' + subject +'/'+subject + '.pkl', 'rb') as file:
            data = pickle.load(file, encoding='latin1')
        self.data = data

    def get_labels(self):
        return self.data[self.keys[0]]

    def get_wrist_data(self):
        """"""
        #label = self.data[self.keys[0]]
        assert subject == self.data[self.keys[1]]
        signal = self.data[self.keys[2]]
        wrist_data = signal[self.signal_keys[0]]
        #wrist_ACC = wrist_data[self.wrist_sensor_keys[0]]
        #wrist_ECG = wrist_data[self.wrist_sensor_keys[1]]
        return wrist_data

    def get_chest_data(self):
        """"""
        signal = self.data[self.keys[2]]
        chest_data = signal[self.signal_keys[1]]
        return chest_data

In [None]:
# Object instantiation
obj_data = {}
 
# Accessing class attributes and method through objects
obj_data[subject] = read_data_of_one_subject(dataset_path, subject)

In [None]:
chest_data_dict = obj_data[subject].get_chest_data()
chest_dict_length = {key: len(value) for key, value in chest_data_dict.items()}
print(chest_dict_length)

In [None]:
# Get labels
labels = obj_data[subject].get_labels()
baseline = np.asarray([idx for idx,val in enumerate(labels) if val == 1])
plt.plot(labels)

In [None]:
eda_base=chest_data_dict['EDA'][baseline,0]
sampling_rate=700
# cut a smaller window
eda=eda_base[:200*sampling_rate]

t=np.arange(0,eda.size*(1/sampling_rate),(1/sampling_rate))
t=t[:eda.size]

plt.figure(figsize=(12,4))
plt.plot(t,eda)
plt.xlabel('$Time (s)$') 
plt.ylabel('$EDA$') 
print(np.max(t))

In [None]:
eda_f = biobss.edatools.filter_eda(eda, sampling_rate)

plt.figure(figsize=(12,4))
plt.plot(t,eda_f)
plt.xlabel('$Time (s)$') 
plt.ylabel('$EDA$') 
print(np.max(t))

In [None]:
eda = biobss.edatools.eda_decompose(eda_f, sampling_rate=sampling_rate)
tonic = eda.iloc[:,0]
phasic = eda.iloc[:,1]
print(eda)

In [None]:
#Features from EDA signal
tonic_phase_feat = biobss.edatools.from_decomposed(phasic, tonic, sampling_rate)
eda_feat = biobss.edatools.from_signal(eda_f, sampling_rate)
eda_freq_feat = biobss.edatools.eda_freq_features(eda)
eda_hjorth_feat = biobss.edatools.eda_hjorth_features(eda)
eda_sig_feat = biobss.edatools.eda_signal_features(eda)
eda_stat_feat = biobss.edatools.eda_stat_features(eda)

#Features from tonic
tonic_feat = biobss.edatools.from_scl(tonic)

#Features from phasic
phasic_feat = biobss.edatools.from_scr(phasic)

In [None]:
print(tonic_phase_feat)

In [None]:
#field_names= ['scr_mean', 'scr_std', 'scr_max', 'scr_min', 'scr_range', 'scr_kurtosis', 'scr_skew', 'scr_momentum', 'scr_activity', 'scr_complexity', 'scr_mobility', 'scr_rms', 'scr_acr_length', 'scr_integral', 'scr_average_power',
 #             'scr_f1sc', 'scr_f2sc', 'scr_f3sc', 'scr_Energy', 'scr_Entropy', 'scr_max_freq', 'scl_mean', 'scl_std', 'scl_max', 'scl_min', 'scl_range', 'scl_kurtosis', 'scl_skew', 'scl_momentum']

In [None]:
############ lowpass filtering
# Parameters
order = 4
frequency = 5
sampling_rate = 100
frequency = frequency / (sampling_rate / 2)  # Normalize frequency to Nyquist Frequency (Fs/2).

In [None]:
#eda_data = np.empty((0,196639)) 
#eda_time_data = np.empty((0,196639)) 
#
#for i in range(56):
#  sub_ind = i
#  # read eda file
#  csvpath = dataset_path+'/'+onlydirs[sub_ind]+'/BitalinoGSR.txt'
#  # load the txt file
#  data = np.loadtxt(csvpath,dtype='str')
#  # eda recordings
#  eda = data[:,0].astype(float)[:196639][np.newaxis] # slice all the data to the length of the shortest one
#  eda_data = np.append(eda_data,eda, axis = 0)
#  # timestamps
#  eda_time = data[:,1].astype(float)[:196639][np.newaxis] # slice all the data to the length of the shortest one
#  eda_time_data = np.append(eda_time, eda_time_data, axis = 0)



In [None]:
# Filtering
b, a = scipy.signal.butter(order, frequency)
eda_lp = scipy.signal.filtfilt(b, a, eda)

################################### Smoothing
# hybrid method
# step 1
size= int(0.75 * sampling_rate)
eda_sm0 = scipy.ndimage.uniform_filter1d(eda_lp, size, mode='nearest')

# step 2
# window
kernel="parzen"
window = scipy.signal.get_window(kernel, size)
w = window / window.sum()

eda_sm_l= np.empty((0,196639)) 

# Extend signal edges to avoid boundary effects.
signal_edge_l = eda_sm0[:0]
signal_edge_r = eda_sm0[:-1]

for i in range(15):
  pad_l = np.full(shape=38, fill_value= signal_edge_l[i], dtype= float) # pad with 38 times the first signal value
  pad_r = np.full(shape=38, fill_value= signal_edge_r[i], dtype= float) # pad with 38 times the last signal value

  eda_sm0_pad = np.concatenate((pad_l, eda_sm0[i], pad_r)) # pad the signal at begin and end
 
  # Compute moving average.
  eda_sm = np.convolve(w, eda_sm0_pad)
  eda_sm = eda_sm[size:-size]
  eda_sm_l = np.append(eda_sm_l, eda_sm[np.newaxis], axis = 0)


In [None]:
################## decompose 
# Electrodermal Activity (EDA) into Phasic and Tonic components.
# Phasic
order=5
freqs=[0.05 / (sampling_rate)]
sos = scipy.signal.butter(order, freqs, btype='highpass', output="sos")
phasic= scipy.signal.sosfiltfilt(sos, eda_sm_l)

#Tonic
sos = scipy.signal.butter(order, freqs, btype='lowpass', output="sos")
tonic= scipy.signal.sosfiltfilt(sos, eda_sm_l)

In [None]:
#fig, ax = plt.subplots(56, 1, figsize=(20, 100))
#
#for i in range(56):
#  eda_data_plot = eda_data[i]
#  eda_sm_plot = eda_sm_l[i]
#  phasic_plot = phasic[i]
#  tonic_plot = tonic[i]
#  t=np.arange(0,eda_time_data[i].size*0.01,0.01)
#  t=t[:eda.size]
#
#  ax[i].plot(t,eda_sm_plot)
#  ax[i].plot(t,phasic_plot)
#  ax[i].plot(t,tonic_plot)
#  # labels and titles
#  ax[i].set_xlabel('$Time (s)$') 
#  ax[i].set_ylabel('$EDA$') 
#
#  # saving the figure as png
#  #plt.savefig('eda.png', dpi=300)

In [None]:
onset_l = np.empty((0,56)) 
hrt_time_l = np.empty((0,56)) 

for i in range(56): 
  peaks, properties = scipy.signal.find_peaks(phasic[i])
  heights, left_bases, right_bases = scipy.signal.peak_prominences(phasic[i], peaks)
  widths, width_heights, left_ips, __ = scipy.signal.peak_widths(phasic[i], peaks, rel_height=0.9)
  __, __, __, right_ips = scipy.signal.peak_widths(phasic[i], peaks, rel_height=0.5)

  # find the indices with an amplitude larger that 0.1
  keep = np.full(len(peaks), True)
  amplitude_min=0.1
  keep[heights<amplitude_min] = False

  # only keep those 
  peaks=peaks[keep]
  heights=heights[keep]
  widths=widths[keep]
  left_bases = left_bases[keep]
  right_bases = right_bases[keep]
  width_heights = width_heights[keep]
  left_ips = left_ips[keep]
  right_ips = right_ips[keep]

  onset = left_ips.astype(int) / sampling_rate
  onset_index = left_ips.astype(int)
  onset_l = np.append(onset_l,onset[:56][np.newaxis], axis = 0)
  #print("Onset time is\n",onset)

  hrt = right_ips.astype(int) / sampling_rate
  hrt_index = right_ips.astype(int)
  hrt_time = (hrt_index - peaks) / sampling_rate
  hrt_time_l = np.append(hrt_time_l,hrt_time[:56][np.newaxis], axis = 0)
  #print("Half recovery time is\n",hrt_time)

  ### plot
  #t=np.arange(0,eda_time_data[i].size*0.01,0.01)
  #t=t[:eda.size]
#
  #plt.figure(figsize=(25,2))
  #plt.plot(t,phasic[i],label='phasic')
  #plt.plot(t[peaks],phasic[i,peaks],'o',label='peaks')
  #plt.plot(onset, phasic[i,onset_index],'x',label='onset')
  #plt.plot(hrt, phasic[i,hrt_index],'x',label='half recovery')
  ## labels and titles
  #plt.xlabel('$Time (s)$') 
  #plt.ylabel('$EDA$') 
  #plt.legend()

In [None]:
mean = np.mean(hrt_time_l, axis= 1)
min = np.amin(hrt_time_l, axis= 1)
max = np.amax(hrt_time_l, axis= 1)
print(mean)
print(min)
print(max)