In [2]:
import os
import pickle
import numpy as np
import pandas as pd
import neurokit2 as nk
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow as tf

from keras.utils import to_categorical
from sklearn.model_selection import KFold
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import clone_model
from tensorflow.keras.layers import Input, LSTM, Dense, Concatenate, Masking, GRU, Flatten, Conv1D, LSTM, Bidirectional
from tensorflow.keras.layers import MaxPooling1D, BatchNormalization, Dropout, ZeroPadding1D
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, precision_score, recall_score,roc_auc_score

# Report whether TensorFlow can see at least one GPU. `tf.test.is_gpu_available()`
# is deprecated; listing the physical GPU devices is the replacement TensorFlow
# itself recommends in its deprecation message.
len(tf.config.list_physical_devices('GPU')) > 0

Instructions for updating:
Use `tf.config.list_physical_devices('GPU')` instead.


True

In [290]:
# TODO: rename the function below to load_Beat_level

def load_BEAT_level(signal, r_peaks, samples_before=231, samples_after=469):
    """Cut a 1-D signal into fixed-length windows around its interior R-peaks.

    For every detected peak except the first and the last, the window
    ``signal[peak - samples_before : peak + samples_after]`` is extracted.
    Windows that would run past either end of the signal are dropped.

    Parameters
    ----------
    signal : sequence or 1-D array
        Samples to cut windows from.
    r_peaks : mapping
        Must contain the key ``'ECG_R_Peaks'`` holding the peak sample indices.
    samples_before, samples_after : int, optional
        Number of samples kept before / from the peak onwards. The defaults
        (231 and 469) give the 700-sample windows used throughout this notebook.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_beats, samples_before + samples_after)``. When no
        window qualifies the result is an empty ``(0, window)`` array, so it can
        still be stacked with the output of other calls.
    """
    window = samples_before + samples_after
    n_samples = len(signal)

    beats = []
    # The first and last peaks are skipped on purpose (same as the original loop
    # over range(1, len - 1)).
    for peak in r_peaks['ECG_R_Peaks'][1:-1]:
        start = peak - samples_before
        stop = peak + samples_after
        # Explicit bounds check instead of relying on a negative start index
        # wrapping around and producing a too-short slice.
        if start < 0 or stop > n_samples:
            continue
        beats.append(signal[start:stop])

    if not beats:
        # np.array([]) would have shape (0,), which cannot be vstack-ed with
        # (n, window) blocks from other recordings.
        return np.empty((0, window), dtype=np.asarray(signal).dtype)
    return np.array(beats)

def shuffle_data(data, labels):
    """Reorder `data` and `labels` with one shared random permutation.

    A single index permutation is drawn from NumPy's global random state and
    applied to both arrays along their first axis, so each sample keeps its
    own label. Returns the tuple ``(shuffled_data, shuffled_labels)``.
    """
    n_rows = data.shape[0]
    order = np.arange(n_rows)
    np.random.shuffle(order)  # in-place shuffle of the index vector

    shuffled_data = data[order]
    shuffled_labels = labels[order]
    return shuffled_data, shuffled_labels

# Adapted to work at beat level
def load_data(subjects):
    """Build a shuffled beat-level ECG dataset from the given subjects.

    For each subject the pickle ``Data + subject + '/' + subject + '.pkl'`` is
    read, the chest ECG is split by the recording's ``label`` vector, R-peaks
    are detected with neurokit2 and fixed-length beat windows are cut out with
    ``load_BEAT_level``.

    Relies on the module-level names ``Data`` (dataset root) and
    ``sampling_rate``.

    Parameters
    ----------
    subjects : iterable of str
        Subject identifiers such as ``'S2'``.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Beat windows and their int32 class ids, shuffled together.
        Class ids: 0 = amuse (label 3), 1 = normal (label 1),
        2 = stress (label 2).
    """
    # (class name, code in the recording's label vector, class id emitted here)
    class_specs = (('amuse', 3, 0), ('normal', 1, 1), ('stress', 2, 2))
    beats_by_class = {name: [] for name, _, _ in class_specs}

    for subject in subjects:
        record = pd.read_pickle(Data + subject + '/' + subject + '.pkl')
        chest_ecg = record['signal']['chest']['ECG']
        condition = record['label']

        # Samples belonging to each condition, flattened to 1-D.
        segments = {
            name: chest_ecg[np.where(condition == code)[0]].flatten()
            for name, code, _ in class_specs
        }
        # nk.ecg_peaks returns (signals, info); only the info dict is needed.
        peaks = {
            name: nk.ecg_peaks(segments[name], sampling_rate=sampling_rate,
                               show=False, method='neurokit')[1]
            for name, _, _ in class_specs
        }
        for name, _, _ in class_specs:
            beats_by_class[name].append(load_BEAT_level(segments[name], peaks[name]))

    # Stack the per-subject blocks into one array per class (NumPy arrays
    # rather than lists before concatenating).
    stacked = [np.vstack(beats_by_class[name]) for name, _, _ in class_specs]
    targets = [
        np.full(block.shape[0], class_id, dtype=np.int32)
        for block, (_, _, class_id) in zip(stacked, class_specs)
    ]

    return shuffle_data(np.concatenate(stacked), np.concatenate(targets))


# Plot 
def Plot_lr_curve(history):
    """Plot the per-epoch accuracy and loss curves recorded during training.

    Parameters
    ----------
    history : object with a ``history`` dict
        Must contain the keys ``'accuracy'`` and ``'loss'``. If
        ``'val_accuracy'`` / ``'val_loss'`` are also present (i.e. the model
        was fitted with validation data) they are drawn on the same axes.

    Each metric gets its own 5x5 figure. The title mentions validation only
    when a validation curve is actually plotted.
    """
    metrics = history.history
    # Look both series up first so a missing key fails before anything is drawn.
    curves = [
        ('accuracy', 'Accuracy', metrics['accuracy']),
        ('loss', 'Loss', metrics['loss']),
    ]

    for key, name, train_values in curves:
        val_values = metrics.get('val_' + key)

        plt.figure(figsize=(5, 5))
        plt.plot(train_values, label='Training ' + name)
        if val_values is not None:
            plt.plot(val_values, label='Validation ' + name)
        plt.legend()
        plt.xlabel('epoch')
        plt.ylabel(name)
        if val_values is not None:
            plt.title('Training and Validation ' + name)
        else:
            plt.title('Training ' + name)
        plt.show()

def Plot_confusion_matrix(true_d, pred):
    """Print classification metrics and draw the confusion matrix.

    Parameters
    ----------
    true_d : array-like, shape (n_samples, n_columns)
        Ground truth, one row per sample; the arg-max of each row is taken as
        the true class id (e.g. one-hot rows).
    pred : array-like, shape (n_samples, n_columns)
        Model scores, one row per sample; the arg-max of each row is taken as
        the predicted class id.

    Prints accuracy and macro-averaged precision, recall, specificity and F1,
    then the scikit-learn classification report, and finally shows the
    confusion matrix for the three classes Amuse / Normal / Stress.
    """
    class_names = ['Amuse', 'Normal', 'Stress']
    class_ids = list(range(len(class_names)))

    true_label = np.argmax(np.asarray(true_d), axis=1)
    pred_label = np.argmax(np.asarray(pred), axis=1)

    # Fixing the label set keeps the matrix 3x3 even when a class is missing
    # from this particular split.
    confusion = confusion_matrix(true_label, pred_label, labels=class_ids)

    # Accuracy
    accuracy = accuracy_score(true_label, pred_label)
    print('accuracy : ', np.round(accuracy,3))

    # Precision
    precision = precision_score(true_label, pred_label, average='macro')
    print('precision : ', np.round(precision,3))

    # Recall
    recall = recall_score(true_label, pred_label,average='macro')
    print('recall : ', np.round(recall,3))

    # Specificity: TN / (TN + FP) per class (one-vs-rest), macro-averaged.
    # The previous version called recall_score here and so repeated recall.
    true_pos = np.diag(confusion)
    false_pos = confusion.sum(axis=0) - true_pos
    false_neg = confusion.sum(axis=1) - true_pos
    true_neg = confusion.sum() - true_pos - false_pos - false_neg
    negatives = true_neg + false_pos
    per_class_specificity = np.divide(
        true_neg, negatives,
        out=np.zeros(len(class_ids), dtype=float),
        where=negatives > 0,  # a class with no negatives contributes 0
    )
    specificity = per_class_specificity.mean()
    print('specificity : ',np.round(specificity,3))

    # F1 Score
    F1_score = f1_score(true_label, pred_label,average='macro')
    print('F1_score : ', np.round(F1_score,3))

    report = classification_report(true_label, pred_label,
                                   labels=class_ids, target_names=class_names)
    print(report)

    disp = ConfusionMatrixDisplay(confusion, display_labels=class_names)
    disp.plot(cmap=plt.cm.Blues)
    plt.show()

# 1DCNN Model

In [291]:
# 1-D CNN classifier for single beat windows.
# Input: (700, 1) -- 700 samples (load_BEAT_level cuts 231 + 469 = 700-sample
# windows) with one channel. Output: softmax over 3 classes.
# Layout: six pairs of Conv1D layers with growing filter counts
# (16 -> 32 -> 64 -> 128 -> 256 -> 512); kernel size 10 for the first four
# pairs and 3 for the last two. The first five pairs are followed by
# MaxPooling1D(2); pairs 2, 4 and 6 are followed by BatchNormalization and
# Dropout(0.5).
CNN_1Dmodel = tf.keras.Sequential([
    # Pair 1: 16 filters, kernel 10
    Conv1D(16, 10, activation='relu', input_shape=(700, 1)),
    Conv1D(16, 10,activation='relu'),
    MaxPooling1D(2),
    
    
    # Pair 2: 32 filters, kernel 10 (+ batch norm and dropout)
    Conv1D(32, 10,activation='relu'),
    Conv1D(32, 10,activation='relu'),
    MaxPooling1D(2),
    BatchNormalization(),
    Dropout(0.5),
    
    
    # Pair 3: 64 filters, kernel 10
    Conv1D(64, 10,activation='relu'),
    Conv1D(64, 10,activation='relu'),
    MaxPooling1D(2),
    
    # Pair 4: 128 filters, kernel 10 (+ batch norm and dropout)
    Conv1D(128, 10,activation='relu'),
    Conv1D(128, 10,activation='relu'),
    MaxPooling1D(2),
    BatchNormalization(),
    Dropout(0.5),
    
    # Pair 5: 256 filters, kernel 3
    Conv1D(256, 3,activation='relu'),
    Conv1D(256, 3,activation='relu'),
    MaxPooling1D(2),
    
    # Pair 6: 512 filters, kernel 3 (no pooling; + batch norm and dropout)
    Conv1D(512, 3,activation='relu'),
    Conv1D(512, 3,activation='relu'),
    BatchNormalization(),
    Dropout(0.5),
    # Collapse the (steps, channels) feature map into one vector for the dense head
    Flatten(),
    
    # Classification head: two LeakyReLU dense layers, then a 3-way softmax
    Dense(512, activation='LeakyReLU'),
    Dense(1024, activation='LeakyReLU'),
    Dense(3, activation='softmax')
])

# Print the layer-by-layer output shapes and parameter counts
CNN_1Dmodel.summary()

Model: "sequential_10"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_78 (Conv1D)          (None, 691, 16)           176       
                                                                 
 conv1d_79 (Conv1D)          (None, 682, 16)           2576      
                                                                 
 max_pooling1d_34 (MaxPoolin  (None, 341, 16)          0         
 g1D)                                                            
                                                                 
 conv1d_80 (Conv1D)          (None, 332, 32)           5152      
                                                                 
 conv1d_81 (Conv1D)          (None, 323, 32)           10272     
                                                                 
 max_pooling1d_35 (MaxPoolin  (None, 161, 32)          0         
 g1D)                                                

# K-Fold (1D-CNN)

In [292]:
# Subject-wise K-fold cross-validation of the 1D-CNN.
# Whole subjects are assigned to either the training or the test side of each
# fold, so no subject contributes beats to both.

# Initial settings
Data = "D:/Database/WESAD/"
sampling_rate = 700
n_split = 5 # num of Fold
random_state = 42
n_classes = 3 # amuse / normal / stress (class ids 0, 1, 2 from load_data)
subjects = np.array(['S2', 'S3', 'S4', 'S5', 'S6', 'S7', 'S8', 'S9', 'S10', 'S11', 'S13', 'S14', 'S15', 'S16', 'S17'])
accuracies = [] # Fold accuracy list
fold_i = 0

# Seed the global NumPy and TensorFlow generators so that the shuffling done by
# load_data and the weight initialisation of each cloned model are repeatable.
np.random.seed(random_state)
tf.random.set_seed(random_state)

# K-fold Code start
kf = KFold(n_splits=n_split, shuffle=True, random_state=random_state)

for train_index, test_index in kf.split(subjects):

    fold_i += 1
    print(fold_i, '번째 검증')

    train_subjects, test_subjects = subjects[train_index], subjects[test_index]

    # Train Data load
    train_data, train_labels = load_data(train_subjects)
    # Test Data load
    test_data, test_labels = load_data(test_subjects)

    # One-hot Encoding. num_classes is fixed so the encoding always has three
    # columns, even if the highest class id is absent from one side of a fold.
    train_y = to_categorical(train_labels, num_classes=n_classes)
    test_y = to_categorical(test_labels, num_classes=n_classes)

    print('Train:', train_index, 'Test', test_index)

    # Model Training: clone_model builds new layers (and therefore new weights)
    # from the template, so every fold starts from an untrained network.
    model_clone = clone_model(CNN_1Dmodel)
    model_clone.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['accuracy'])
    history = model_clone.fit(train_data, train_y, epochs=50, batch_size=32)
    # evaluate returns [loss, accuracy] for the metrics compiled above
    scores = model_clone.evaluate(test_data, test_y)
    Plot_lr_curve(history)
    pred = model_clone.predict(test_data)
    Plot_confusion_matrix(test_y, pred)

    accuracies.append(scores[1])


average_accuracy = np.mean(accuracies)
print('평균 정확도 : ', average_accuracy)

1 번째 검증
Train: [ 1  2  3  4  5  6  7  8 10 12 13 14] Test [ 0  9 11]
Epoch 1/50

KeyboardInterrupt: 

# LSTM Model

In [285]:
# Stacked-LSTM classifier for (700, 1) beat windows, assembled layer by layer.
LSTM_model = tf.keras.Sequential()

# Recurrent part: two sequence-returning LSTMs with dropout in between,
# followed by a final LSTM that returns only its last output.
LSTM_model.add(LSTM(128, return_sequences=True, input_shape=(700,1)))
LSTM_model.add(Dropout(0.5))
LSTM_model.add(LSTM(128, return_sequences=True))
LSTM_model.add(Dropout(0.5))
LSTM_model.add(LSTM(256))

# Classification head: two LeakyReLU dense layers, then a 3-way softmax.
LSTM_model.add(Dense(512, activation='LeakyReLU'))
LSTM_model.add(Dense(1024, activation='LeakyReLU'))
LSTM_model.add(Dense(3, activation='softmax'))

LSTM_model.summary()

Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_3 (LSTM)               (None, 700, 128)          66560     
                                                                 
 dropout_14 (Dropout)        (None, 700, 128)          0         
                                                                 
 lstm_4 (LSTM)               (None, 700, 128)          131584    
                                                                 
 dropout_15 (Dropout)        (None, 700, 128)          0         
                                                                 
 lstm_5 (LSTM)               (None, 256)               394240    
                                                                 
 dense_15 (Dense)            (None, 512)               131584    
                                                                 
 dense_16 (Dense)            (None, 1024)             

# LSTM 5-Fold

In [None]:
# Subject-wise K-fold cross-validation of the stacked-LSTM model.
# Whole subjects are assigned to either the training or the test side of each
# fold, so no subject contributes beats to both.

# Initial settings
Data = "D:/Database/WESAD/"
sampling_rate = 700
n_split = 5 # num of Fold
random_state = 42
n_classes = 3 # amuse / normal / stress (class ids 0, 1, 2 from load_data)
subjects = np.array(['S2', 'S3', 'S4', 'S5', 'S6', 'S7', 'S8', 'S9', 'S10', 'S11', 'S13', 'S14', 'S15', 'S16', 'S17'])
accuracies = [] # Fold accuracy list
fold_i = 0

# Seed the global NumPy and TensorFlow generators so that the shuffling done by
# load_data and the weight initialisation of each cloned model are repeatable.
np.random.seed(random_state)
tf.random.set_seed(random_state)

# K-fold Code start
kf = KFold(n_splits=n_split, shuffle=True, random_state=random_state)

for train_index, test_index in kf.split(subjects):

    fold_i += 1
    print(fold_i, '번째 검증')

    train_subjects, test_subjects = subjects[train_index], subjects[test_index]

    # Train Data load
    train_data, train_labels = load_data(train_subjects)
    # Test Data load
    test_data, test_labels = load_data(test_subjects)

    # One-hot Encoding. num_classes is fixed so the encoding always has three
    # columns, even if the highest class id is absent from one side of a fold.
    train_y = to_categorical(train_labels, num_classes=n_classes)
    test_y = to_categorical(test_labels, num_classes=n_classes)

    print('Train:', train_index, 'Test', test_index)

    # Model Training: clone_model builds new layers (and therefore new weights)
    # from the template, so every fold starts from an untrained network.
    model_clone = clone_model(LSTM_model)
    model_clone.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['accuracy'])
    history = model_clone.fit(train_data, train_y, epochs=50, batch_size=32)
    # evaluate returns [loss, accuracy] for the metrics compiled above
    scores = model_clone.evaluate(test_data, test_y)
    Plot_lr_curve(history)
    pred = model_clone.predict(test_data)
    Plot_confusion_matrix(test_y, pred)

    accuracies.append(scores[1])


average_accuracy = np.mean(accuracies)
print('평균 정확도 : ', average_accuracy)

# CNN-LSTM1 Model

In [286]:
# CNN-LSTM classifier: the same convolutional feature extractor as the 1-D CNN,
# but instead of flattening, the resulting (steps, channels) feature map is fed
# to a stack of LSTMs as a sequence.
# Input: (700, 1) beat windows. Output: softmax over 3 classes.
CNN_LSTM_model = tf.keras.Sequential([
    # Pair 1: 16 filters, kernel 10
    Conv1D(16, 10, activation='relu', input_shape=(700, 1)),
    Conv1D(16, 10,activation='relu'),
    MaxPooling1D(2),

    # Pair 2: 32 filters, kernel 10 (+ batch norm and dropout)
    Conv1D(32, 10,activation='relu'),
    Conv1D(32, 10,activation='relu'),
    MaxPooling1D(2),
    BatchNormalization(),
    Dropout(0.5),

    # Pair 3: 64 filters, kernel 10
    Conv1D(64, 10,activation='relu'),
    Conv1D(64, 10,activation='relu'),
    MaxPooling1D(2),

    # Pair 4: 128 filters, kernel 10 (+ batch norm and dropout)
    Conv1D(128, 10,activation='relu'),
    Conv1D(128, 10,activation='relu'),
    MaxPooling1D(2),
    BatchNormalization(),
    Dropout(0.5),

    # Pair 5: 256 filters, kernel 3
    Conv1D(256, 3,activation='relu'),
    Conv1D(256, 3,activation='relu'),
    MaxPooling1D(2),

    # Pair 6: 512 filters, kernel 3 (no pooling; + batch norm and dropout)
    Conv1D(512, 3,activation='relu'),
    Conv1D(512, 3,activation='relu'),
    BatchNormalization(),
    Dropout(0.5),

    # Recurrent part. No input_shape here: only the first layer of a Sequential
    # model defines the input, and this LSTM actually receives the conv feature
    # map, not the raw (700, 1) window.
    LSTM(128, return_sequences=True),
    Dropout(0.5),
    LSTM(128, return_sequences=True),
    Dropout(0.5),
    LSTM(256),

    # Classification head: two LeakyReLU dense layers, then a 3-way softmax
    Dense(512, activation='LeakyReLU'),
    Dense(1024, activation='LeakyReLU'),
    Dense(3, activation='softmax')
])

CNN_LSTM_model.summary()

Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_48 (Conv1D)          (None, 691, 16)           176       
                                                                 
 conv1d_49 (Conv1D)          (None, 682, 16)           2576      
                                                                 
 max_pooling1d_20 (MaxPoolin  (None, 341, 16)          0         
 g1D)                                                            
                                                                 
 conv1d_50 (Conv1D)          (None, 332, 32)           5152      
                                                                 
 conv1d_51 (Conv1D)          (None, 323, 32)           10272     
                                                                 
 max_pooling1d_21 (MaxPoolin  (None, 161, 32)          0         
 g1D)                                                 

In [None]:
# Subject-wise K-fold cross-validation of the CNN-LSTM1 model.
# Whole subjects are assigned to either the training or the test side of each
# fold, so no subject contributes beats to both.

# Initial settings
Data = "D:/Database/WESAD/"
sampling_rate = 700
n_split = 5 # num of Fold
random_state = 42
n_classes = 3 # amuse / normal / stress (class ids 0, 1, 2 from load_data)
subjects = np.array(['S2', 'S3', 'S4', 'S5', 'S6', 'S7', 'S8', 'S9', 'S10', 'S11', 'S13', 'S14', 'S15', 'S16', 'S17'])
accuracies = [] # Fold accuracy list
fold_i = 0

# Seed the global NumPy and TensorFlow generators so that the shuffling done by
# load_data and the weight initialisation of each cloned model are repeatable.
np.random.seed(random_state)
tf.random.set_seed(random_state)

# K-fold Code start
kf = KFold(n_splits=n_split, shuffle=True, random_state=random_state)

for train_index, test_index in kf.split(subjects):

    fold_i += 1
    print(fold_i, '번째 검증')

    train_subjects, test_subjects = subjects[train_index], subjects[test_index]

    # Train Data load
    train_data, train_labels = load_data(train_subjects)
    # Test Data load
    test_data, test_labels = load_data(test_subjects)

    # One-hot Encoding. num_classes is fixed so the encoding always has three
    # columns, even if the highest class id is absent from one side of a fold.
    train_y = to_categorical(train_labels, num_classes=n_classes)
    test_y = to_categorical(test_labels, num_classes=n_classes)

    print('Train:', train_index, 'Test', test_index)

    # Model Training: clone_model builds new layers (and therefore new weights)
    # from the template, so every fold starts from an untrained network.
    model_clone = clone_model(CNN_LSTM_model)
    model_clone.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['accuracy'])
    history = model_clone.fit(train_data, train_y, epochs=50, batch_size=32)
    # evaluate returns [loss, accuracy] for the metrics compiled above
    scores = model_clone.evaluate(test_data, test_y)
    Plot_lr_curve(history)
    pred = model_clone.predict(test_data)
    Plot_confusion_matrix(test_y, pred)

    accuracies.append(scores[1])


average_accuracy = np.mean(accuracies)
print('평균 정확도 : ', average_accuracy)

# CNN-LSTM2 Model

In [289]:
# CNN-LSTM variant 2: a shallower but wider convolutional front end
# (three Conv1D pairs with 128 / 256 / 512 filters, kernel size 10) whose
# feature map is fed to a stack of LSTMs as a sequence.
# Input: (700, 1) beat windows. Output: softmax over 3 classes.
CNN_LSTM2 = tf.keras.Sequential([
    # Pair 1: 128 filters, kernel 10
    Conv1D(128, 10, activation='relu', input_shape=(700, 1)),
    Conv1D(128, 10,activation='relu'),
    MaxPooling1D(2),

    # Pair 2: 256 filters, kernel 10 (+ batch norm and dropout)
    Conv1D(256, 10,activation='relu'),
    Conv1D(256, 10,activation='relu'),
    MaxPooling1D(2),
    BatchNormalization(),
    Dropout(0.5),

    # Pair 3: 512 filters, kernel 10
    Conv1D(512, 10,activation='relu'),
    Conv1D(512, 10,activation='relu'),
    MaxPooling1D(2),

    # Recurrent part. No input_shape here: only the first layer of a Sequential
    # model defines the input, and this LSTM actually receives the conv feature
    # map, not the raw (700, 1) window.
    LSTM(128, return_sequences=True),
    Dropout(0.5),
    LSTM(128, return_sequences=True),
    Dropout(0.5),
    LSTM(256),

    # Classification head: two LeakyReLU dense layers, then a 3-way softmax
    Dense(512, activation='LeakyReLU'),
    Dense(1024, activation='LeakyReLU'),
    Dense(3, activation='softmax')
])

CNN_LSTM2.summary()

Model: "sequential_9"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_72 (Conv1D)          (None, 691, 128)          1408      
                                                                 
 conv1d_73 (Conv1D)          (None, 682, 128)          163968    
                                                                 
 max_pooling1d_31 (MaxPoolin  (None, 341, 128)         0         
 g1D)                                                            
                                                                 
 conv1d_74 (Conv1D)          (None, 332, 256)          327936    
                                                                 
 conv1d_75 (Conv1D)          (None, 323, 256)          655616    
                                                                 
 max_pooling1d_32 (MaxPoolin  (None, 161, 256)         0         
 g1D)                                                 

# CNN-LSTM 5-Fold

In [275]:
# Subject-wise K-fold cross-validation of the CNN-LSTM2 model.
# Whole subjects are assigned to either the training or the test side of each
# fold, so no subject contributes beats to both.

# Initial settings
Data = "D:/Database/WESAD/"
sampling_rate = 700
n_split = 5 # num of Fold
random_state = 42
n_classes = 3 # amuse / normal / stress (class ids 0, 1, 2 from load_data)
subjects = np.array(['S2', 'S3', 'S4', 'S5', 'S6', 'S7', 'S8', 'S9', 'S10', 'S11', 'S13', 'S14', 'S15', 'S16', 'S17'])
accuracies = [] # Fold accuracy list
fold_i = 0

# Seed the global NumPy and TensorFlow generators so that the shuffling done by
# load_data and the weight initialisation of each cloned model are repeatable.
np.random.seed(random_state)
tf.random.set_seed(random_state)

# K-fold Code start
kf = KFold(n_splits=n_split, shuffle=True, random_state=random_state)

for train_index, test_index in kf.split(subjects):

    fold_i += 1
    print(fold_i, '번째 검증')

    train_subjects, test_subjects = subjects[train_index], subjects[test_index]

    # Train Data load
    train_data, train_labels = load_data(train_subjects)
    # Test Data load
    test_data, test_labels = load_data(test_subjects)

    # One-hot Encoding. num_classes is fixed so the encoding always has three
    # columns, even if the highest class id is absent from one side of a fold.
    train_y = to_categorical(train_labels, num_classes=n_classes)
    test_y = to_categorical(test_labels, num_classes=n_classes)

    print('Train:', train_index, 'Test', test_index)

    # Model Training: clone_model builds new layers (and therefore new weights)
    # from the template, so every fold starts from an untrained network.
    model_clone = clone_model(CNN_LSTM2)
    model_clone.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['accuracy'])
    history = model_clone.fit(train_data, train_y, epochs=50, batch_size=32)
    # evaluate returns [loss, accuracy] for the metrics compiled above
    scores = model_clone.evaluate(test_data, test_y)
    Plot_lr_curve(history)
    pred = model_clone.predict(test_data)
    Plot_confusion_matrix(test_y, pred)

    accuracies.append(scores[1])


average_accuracy = np.mean(accuracies)
print('평균 정확도 : ', average_accuracy)