In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import numpy as np
import pandas as pd
import scipy
from IPython.display import clear_output
import scipy.stats as stats
import heartpy
import json

In [None]:

DIR_WESAD="/content/drive/MyDrive/BasicMobileLab/content/data/WESAD"
DIR_SAVING_DATA= "/content/drive/MyDrive/BasicMobileLab/content/data/Dataset/" 

# ECG 데이터 추출

In [None]:
def peak_pos(x : np.array,threshhold: float):
  """ 
  신호 x로부터 ECG 피크를 감지한다. 감지된 피크는 float형 임계값 이상이다.
  피크 감지 이전에, 신호는 5개의 평균값으로 convolution을 이용하여 smoothen 된다.
  피크의 인덱스를 반환한다. 
  """
  assert len(x.shape)==1 # x.shape의 길이가 1이면 그대로 코드 진행
  x=(x-np.mean(x))/np.std(x) # x = (x-평균)/표준편차
  smoothen=[] # 
  conv=np.array([0.2,0.2,0.2,0.2,0.2]) # convolution 해줄 array(window size 마다 곱해줌으로써 신호 smoothing)
  smoothen=np.convolve(x,conv, mode='same') # convolution
  baseline=float(np.mean(smoothen)) # smooth한 신호를 baseline으로
  peakindex=-1 # 초기화
  peakindices=[]
  peakvalue=-100 # 초기화

  for index in range(0,len(smoothen)): # 0부터 데이터 끝나는 길이까지 반복
    value=smoothen[index] # smoothen 된 값을 받아옴

    if value>baseline: # 신호의 값이 baseline보다 클 때

      if peakvalue ==-100 or value>peakvalue : # peakvalue가 -100이거나  value>peakvalue라면
        peakindex=index # peakindex에 해당 인덱스 저장
        peakvalue=value # peakvalue에 해당 value 저장

    if value<baseline and peakindex!=-1 and peakvalue>threshhold: # 값이 baseline보다 작고 peakindex가 -1이고 peakvalue가 임계값보다 큰 경우
      peakindices.append(peakindex) # peak indicator에 peakindex 추가
      peakindex=-1 # 다시 peakindex -1로 초기화
      peakvalue=-100 # 다시 peakvalue -100으로 초기화

  if peakindex!=-1 and peakvalue>threshhold: # peakindex가 -1이 아니고 peakvalue가 임계값보다 크면
    peakindices.append(peakindex) # peak indicator에 peakindex 추가

  return np.array(peakindices) # peak indicator 반환


def TINN(x:np.array): # 
  """ https://cinc.org/archives/2003/pdf/481.pdf
      보간법(interpolation): 
      데이터들(xi)로부터, 주어진 데이터를 만족하는 근사 함수(f(x))를 구하고,  이 식을 이용하여 주어진 변수에 대한 함수 값을 구하는 일련의 과정.
      예를 들어, (0, 0), (1, 10), (2, 20)이 주어졌을 때, 이들에 대한 근사 함수를 f(x) = 10x로 구하고, 1.5에 대한 함수 값으로 15를 구하는 것.
      TINN score를 계산하기 위해 모든 triangular interpolation을 계산한다.
      또한 주어진 ECG 신호에 대한 모든 interbeats times를 포함하는 array x 에서 HRV index를 계산한다.

      축은 가우스 분포의 최대값인 추축의 오른쪽과 왼쪽에 각각 2개의 부분으로 나뉜다.
      TINN 점수 계산은 WESAD Dataset 논문에 정의되어 있으며, 이를 계산하려면 interbeats times의 가우스 분포와 가장 근사한 triangular interpolation이 필요하다.
      triangular interpolation은 가우시안 분포의 최대값에서 만나고 x축의 전반부의 N과 x축 후반부의 M에서 x축을 교차하는 2개의 선으로 정의된다.
      Thus inside ]N;M[ the interpolation function != 0
      Outside of ]N;M[ the interpolation function equals 0.
  """

  kernel = stats.gaussian_kde(x) # x array와 근사한 가우시안 분포를 위한 커널 생성
  absi=np.linspace(np.min(x),np.max(x),len(x)) # 인터비트 분포의 x축 계산(최소 인터비트 시간에서 최대 인터비트 시간까지)
  val=kernel.evaluate(absi) # 가우시안 분포를 생성된 x축에 맞춤
  ecart=absi[1]-absi[0] # Space between 2 values on the axis # 축의 두 값 사이의 거리
  maxind=np.argmax(val) # 가우스 분포(val array)가 최대인 인덱스 선택
  max_pos=absi[maxind]  # 가우스 분포가 최대인 interbeat time(abscissa)
  maxvalue=np.amax(val) # 가우스 분포의 최대값
  N_abs=absi[0:maxind+1] # x축의 전반부
  M_abs=absi[maxind:] # x축의 후반부
  HRVindex=len(x)/maxvalue # HRVindex = 신호 길이/가우스 분포 최대값
  err_N=[]
  err_M=[]

  for i in range(0,len(N_abs)-1): # 0부터 x축 전반부까지
    N=N_abs[i]
    slope=(maxvalue)/(max_pos-N) # 경사각 = 가우스분포 최대값/가우스 분포가 최대인 interbeat time-x축 전반부의 한 값
    D=val[0:maxind+1] # x축 전반부 가우스 분포에 fit 한 값
    q=np.clip(slope*ecart*np.arange(-i,-i+maxind+1),0,None) # x축의 전반부에 대한 Triangular interpolation
    diff=D-q 
    err=np.multiply(diff,diff) # err 에 diff^2 저장
    err1=np.delete(err,-1) # err에서 -1 제거하고 err1에 저장
    err2=np.delete(err, 0) # err에서 0 제거하고 err2에 저장
    errint=(err1+err2)/2
    errtot=np.linalg.norm(errint) # triangular interpolation과 x축의 전반부 가우스 분포 사이의 오차 영역
    err_N.append((errtot,N,N_abs,q))
  
  for i in range(1,len(M_abs)): # 1부터 x축 후반부까지
    M=M_abs[i]
    slope=(maxvalue)/(max_pos-M)
    D=val[maxind:]
    q=np.clip(slope*ecart*np.arange(-i,len(D)-i),0,None) # x축의 후반부에 대한 Triangular interpolation
    diff=D-q
    err=np.multiply(diff,diff)
    err1=np.delete(err,-1)
    err2=np.delete(err, 0)
    errint=(err1+err2)/2
    errtot=np.linalg.norm(errint) # triangular interpolation과 x축의 후반부 가우스 분포 사이의 오차 영역
    err_M.append((errtot,M,M_abs,q))

  return (err_N,err_M,absi,val,HRVindex) # 반환

def best_TINN(x:np.array):
  """
    가우스 분포에 대해 가장 근사한 triangular interpolation 함수를 갖는 N과 M을 선택하고 반환
    N; M; the TINN score = M-N ; and the HRV index
  """
  err_N,err_M,_,_,HRVindex=TINN(x)
  N=np.argmin(np.array(err_N,dtype=object)[:,0]) # 오차 영역이 최소인 N
  M=np.argmin(np.array(err_M,dtype=object)[:,0]) # 오차 영역이 최소인 M
  absN=err_N[N][1]
  absM=err_M[M][1]
  return float(absN),float(absM),float(absM-absN),HRVindex # 반환

# 시간 영역 분석
def num_compare_NN50(x,i):
  """
    NN50: 연속적인 NN간격의 차이가 50 ms를 초과하는 NN간격
    주어진 HRV interval x[i]에 대해 50ms 이상 차이가 나는 HRV interval의 개수 셈
  """
  ref=x[i]
  k=0
  diff=np.absolute(x-ref) # 주어진 HRV interval에서 현재 신호의 값 빼고 절댓값 씌워서 오차값으로
  k+=np.sum(np.where(diff>0.05,1,0)) # 오차값이 0.05보다 크면 1, 그렇지 않으면 0
  return k # 반환

def compare_NN50(x):
  """
    주어진 HRV interval x[i]에 대해 50ms 이상 차이가 나는 HRV interval의 개수와 퍼센티지 반환
  """
  k=0
  for i in range(0,len(x)): # x 길이만큼 반복
    k+=num_compare_NN50(x,i) # 50ms 이상 차이나는 HRV interval 총 개수
  if k==0:
    k=1
  return k,(k/(len(x)*len(x))) # 

# 주파수 영역에서 분석 시작

def get_freq_features_ecg(x):
  """ 
    FFT를 계산하여 HRV 신호(interbeats times)의 주파수 features를 반환하고, HRV의 평균이 샘플링 period로 사용되는 푸리에 주파수를 계산한다.
  """
  mean=np.mean(x) # 신호의 평균값
   # scipy를 이용하여 푸리에 변환(numpy보다 계산 속도 빠름)
  yf=np.array(scipy.fft.fft(x-mean)) # (신호-평균)을 푸리에 변환 하여 배열에 저장
  xf=scipy.fft.fftfreq(len(x),mean)[0:len(x)//2] # 샘플 spacing의 단위당 주기로 frequency bin centers 플로트 배열을 반환.
  psd=(2/len(yf))*np.abs(yf)[0:len(x)//2] # 
  fmean=np.mean(xf)
  fstd=np.std(xf)
  sumpsd=np.sum(psd)
  return fmean,fstd,sumpsd # 반환

def get_data_ecg(x):
  """
    peak list를 계산하기 위해 HeartPy 패키지를 사용하여 ECG 신호 x의 feature를 수집(이전의 peak detection function이 아님).
  """
  working,mes=heartpy.process(x,700) # 700Hz로 샘플링된 주파수
  peak=working["peaklist"] # 
  periods=np.array([(peak[i+1]-peak[i])/700 for i in range(0,len(peak)-1)])
  frequency=1/periods # 주기(period) 를 이용하여 주파수(700Hz) 구함
  meanfreq = np.mean(frequency) # 주파수 평균
  stdfreq = np.std(frequency) # 주파수 표준편차
  HRV=np.array([(peak[i]-peak[i-1])/700 for i in range(1,len(peak))]) # 
  _,_,T,HRVindex=best_TINN(HRV) 
  num50,p50=compare_NN50(HRV)
  meanHRV=np.mean(HRV)
  stdHRV=np.std(HRV)
  rmsHRV=np.sqrt(np.mean(HRV**2))
  fmean,fstd,sumpsd=get_freq_features_ecg(HRV)
  return np.array([meanfreq,stdfreq,T,HRVindex,num50,p50,meanHRV,stdHRV,rmsHRV,fmean,fstd,sumpsd])

def slice_per_label(labelseq,flabel,time_window,step):
  # 레이블 별로  slice 하는 함수
  """
  Return a list of index i of the ECG signal that in [i;i+pts_windows] the label (= emotionnal state of the user) is constant.
  The window is defined by the user by time_window (s) and flabel (freq of sampling for the label, eg 700Hz) (Hz).
  """
  pts_window=time_window*flabel
  taken_indices=[]
  conv=np.array([1 for i in range(0,pts_window)])
  for i in range(0,len(labelseq)-pts_window,flabel*step):   #Sliding 5s window, step 1s
    extr=labelseq[i:i+pts_window]
    res=np.sum(np.multiply(extr,conv))
    l=labelseq[i]
    if l in [1,2,3,4]:
      condition=l*pts_window==res
      if condition==True:
        taken_indices.append((i,l))
  return taken_indices

def get_ecg_f_from_file(path):
  """
  모든 ECG feature를 추출(주어진 파일의 ECG 신호의 각 윈도우에서)
  """
  id=[]
  labels=[]
  features=[]
  taken_indices=[]
  fecg=700
  i=0
  discard=0
  discardbis=0
  pts_window=20*700
  df = pd.read_pickle(path)
  label=df['label']
  print("openned")
  indice_neutral=np.where(label==1)[0]        #Find where is the neutral phase for baseline
  taken_indices=slice_per_label(label,700,20,1)   #get all indices of the start of a slice of the sequence with cst label
  print("indiced")
  ECG_neutral = np.array(df["signal"]["chest"]["ECG"][indice_neutral[0]:indice_neutral[-1]*700][:,0]) #Baseline
  features_neutral=get_data_ecg(ECG_neutral)        #Baseline extraction
  for x in taken_indices:
    i+=1
    if i%100==0:
      clear_output(wait=True)
      print(path)
      print(i/len(taken_indices))
    indice = x[0]
    try:
      ECG=np.array(df['signal']['chest']['ECG'][(indice*fecg)//700:(pts_window+indice)*fecg//700][:,0])
      result=np.divide(get_data_ecg(ECG),features_neutral)
      if not (np.isinf(result).any() or np.isnan(result).any()):
        features.append(result.tolist())
        labels.append(int(x[1]))
      else : 
        discard+=1
    except KeyboardInterrupt:
      print(1/0)
    except : 
      discardbis+=1
  print("reject because infinit : " +str(discard))
  print("reject because error in algorithm : " +str(discardbis))
  return features,id,labels,discard,discardbis,ECG_neutral

NameError: ignored

# Extract ECG

In [None]:
""" The extracted features and label (id is not used anymore) are stored in a dictionnary for each subject and each dictionnary is written
in a json file named with the number of the subject
"""

dir_wesad=" "
l= os.listdir(DIR_WESAD) 
del l[l.index('wesad_readme.pdf')]
try :
  del l[l.index('wesad_readme.pdf')]
except :
  pass
i=0
for name in l:
  i+=1
  data_w={}
  path =str(DIR_WESAD+name+"/"+name+".pkl")
  print(name)
  print(i/len(l))
  print(len(l))
  X,Y,Z,discard,discardbis,neutr=get_ecg_f_from_file(path)
  data_w["id"]=Y
  data_w["label"]=Z
  data_w["features"]=X
  with open(DIR_SAVING_DATA+"WESADECG_"+name+".json", 'w') as f:
    json.dump(data_w, f)