In [None]:
import os
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
import random
import pandas as pd
import scipy
import time
from IPython.display import clear_output
import scipy.stats as stats
import heartpy
import random
import json

In [None]:
"""Please extract the files in WESAD.zip in a file that will contains S2,S3,...Sk folders of the subject

If you want to use a subject for testing (I have used the S17 for testing) I suggest you to move your S17 file to a Testing folder.
That is to say for eg : /content/data/WESAD contains S2,S3,...Sk folders but does not contain S17 folder
                        /content/data/Testing contains S17 folder

"""
DIR_TESTING_FOLDER="/content/data/Testing/" # Please complete the path to your file that contains subject test folder. eg : DIR_TESTING_FOLDER=/content/data/Testing/
DIR_SAVING_DATA_TESTING= "/content/data/Testing/" #Please complete the path where you want to save your data once treated
SUBJECT_USED_FOR_TESTING = "S17" #S17 for example

# Fonctions extraction ECG

In [3]:
def peak_pos(x : np.array,threshhold: float):
  assert len(x.shape)==1
  x=(x-np.mean(x))/np.std(x)
  smoothed=[]
  conv=np.array([0.2,0.2,0.2,0.2,0.2])
  smoothed=np.convolve(x,conv, mode='same')
  baseline=float(np.mean(smoothed))
  peakindex=-1
  peakindices=[]
  peakvalue=-100
  for index in range(0,len(smoothed)):
    value=smoothed[index]
    if value>baseline:
      if peakvalue ==-100 or value>peakvalue :
        peakindex=index
        peakvalue=value
    if value<baseline and peakindex!=-1 and peakvalue>threshhold:
      peakindices.append(peakindex)
      peakindex=-1
      peakvalue=-100
  if peakindex!=-1 and peakvalue>threshhold:
    peakindices.append(peakindex)
  return np.array(peakindices)


def TINN(x:np.array):
  kernel = stats.gaussian_kde(x)
  absi=np.linspace(np.min(x),np.max(x),len(x))
  val=kernel.evaluate(absi)
  ecart=absi[1]-absi[0]
  maxind=np.argmax(val)
  max_pos=absi[maxind]
  maxvalue=np.amax(val)
  N_abs=absi[0:maxind+1]
  M_abs=absi[maxind:]
  HRVindex=len(x)/maxvalue
  err_N=[]
  for i in range(0,len(N_abs)-1):
    N=N_abs[i]
    slope=(maxvalue)/(max_pos-N)
    D=val[0:maxind+1]
    q=np.clip(slope*ecart*np.arange(-i,-i+maxind+1),0,None)
    diff=D-q
    err=np.multiply(diff,diff)
    err1=np.delete(err,-1)
    err2=np.delete(err, 0)
    errint=(err1+err2)/2
    errtot=np.linalg.norm(errint)
    err_N.append((errtot,N,N_abs,q))
  err_M=[]
  for i in range(1,len(M_abs)):
    M=M_abs[i]
    slope=(maxvalue)/(max_pos-M)
    D=val[maxind:]
    q=np.clip(slope*ecart*np.arange(-i,len(D)-i),0,None)
    diff=D-q
    err=np.multiply(diff,diff)
    err1=np.delete(err,-1)
    err2=np.delete(err, 0)
    errint=(err1+err2)/2
    errtot=np.linalg.norm(errint)
    err_M.append((errtot,M,M_abs,q))
  return (err_N,err_M,absi,val,HRVindex)

def best_TINN(x:np.array):
  err_N,err_M,_,_,HRVindex=TINN(x)
  N=np.argmin(np.array(err_N,dtype=object)[:,0])
  M=np.argmin(np.array(err_M,dtype=object)[:,0])
  absN=err_N[N][1]
  absM=err_M[M][1]
  return float(absN),float(absM),float(absM-absN),HRVindex

def num_compare_NN50(x,i):
  ref=x[i]
  k=0
  diff=np.absolute(x-ref)
  k+=np.sum(np.where(diff>0.05,1,0))
  return k 

def compare_NN50(x):
  k=0
  for i in range(0,len(x)):
    k+=num_compare_NN50(x,i)
  if k==0:
    k=1
  return k,(k/(len(x)*len(x)))

def get_freq_features_ecg(x):
  mean=np.mean(x)
  yf=np.array(scipy.fft.fft(x-mean))
  xf=scipy.fft.fftfreq(len(x),mean)[0:len(x)//2]
  psd=(2/len(yf))*np.abs(yf)[0:len(x)//2]
  fmean=np.mean(xf)
  fstd=np.std(xf)
  sumpsd=np.sum(psd)
  return fmean,fstd,sumpsd #ULf,Lf,Hf,UHf,ratioLH,

def get_data_ecg(x):
  working,mes=heartpy.process(x,700)
  peak=working["peaklist"]
  periods=np.array([(peak[i+1]-peak[i])/700 for i in range(0,len(peak)-1)])
  frequency=1/periods
  meanfreq = np.mean(frequency)
  stdfreq = np.std(frequency)
  HRV=np.array([(peak[i]-peak[i-1])/700 for i in range(1,len(peak))])
  _,_,T,HRVindex=best_TINN(HRV)
  num50,p50=compare_NN50(HRV)
  meanHRV=np.mean(HRV)
  stdHRV=np.std(HRV)
  rmsHRV=np.sqrt(np.mean(HRV**2))
  fmean,fstd,sumpsd=get_freq_features_ecg(HRV)
  return np.array([meanfreq,stdfreq,T,HRVindex,num50,p50,meanHRV,stdHRV,rmsHRV,fmean,fstd,sumpsd]) #ULf,Lf,Hf,UHf,ratioLH

def slice_per_label(labelseq,flabel,time_window,step):
  pts_window=time_window*flabel
  taken_indices=[]
  conv=np.array([1 for i in range(0,pts_window)])
  for i in range(0,len(labelseq)-pts_window,flabel*step):   #Sliding 5s window, step 1s
    extr=labelseq[i:i+pts_window]
    res=np.sum(np.multiply(extr,conv))
    l=labelseq[i]
    if l in [1,2,3,4]:
      condition=l*pts_window==res
      if condition==True:
        taken_indices.append((i,l))
  return taken_indices

def get_ecg_f_from_file(path):
  id=[]
  labels=[]
  features=[]
  taken_indices=[]
  fecg=700
  i=0
  discard=0
  discardbis=0
  pts_window=20*700
  df = pd.read_pickle(path)
  label=df['label']
  print("openned")
  indice_neutral=np.where(label==1)[0]        #Find where is the neutral phase for baseline
  taken_indices=slice_per_label(label,700,20,1)   #get all indices of the start of a slice of the sequence with cst label
  print("indiced")
  ECG_neutral = np.array(df["signal"]["chest"]["ECG"][indice_neutral[0]:indice_neutral[-1]*700][:,0]) #Baseline
  features_neutral=get_data_ecg(ECG_neutral)        #Baseline extraction
  for x in taken_indices:
    i+=1
    if i%100==0:
      clear_output(wait=True)
      print(path)
      print(i/len(taken_indices))
    indice = x[0]
    try:
      ECG=np.array(df['signal']['chest']['ECG'][(indice*fecg)//700:(pts_window+indice)*fecg//700][:,0])
      result=np.divide(get_data_ecg(ECG),features_neutral)
      if not (np.isinf(result).any() or np.isnan(result).any()):
        features.append(result.tolist())
        labels.append(int(x[1]))
      else : 
        discard+=1
    except KeyboardInterrupt:
      print(1/0)
    except : 
      discardbis+=1
  print("reject because infinit : " +str(discard))
  print("reject because error in algorithm : " +str(discardbis))
  return features,id,labels,discard,discardbis,ECG_neutral

# Extract ECG

In [None]:
name=SUBJECT_USED_FOR_TESTING
data_w={}
path =str(DIR_TESTING_FOLDER+name+"/"+name+".pkl")
X,Y,Z,discard,discardbis,neutr=get_ecg_f_from_file(path)
data_w["id"]=Y
data_w["label"]=Z
data_w["features"]=X
with open(DIR_SAVING_DATA_TESTING+"WESADECG_"+name+".json", 'w') as f:
  json.dump(data_w, f)
  