In [21]:
import os
import re
import numpy as np
import scipy.io
import scipy.signal as signal
from sklearn.model_selection import KFold
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from tensorflow.keras import layers, models, optimizers, regularizers
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from tensorflow.keras import metrics
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score,  accuracy_score
from sklearn.metrics import roc_auc_score
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

In [22]:
# GPU 메모리 설정 코드 추가
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        print(e)

In [23]:
import os

# ADHD 및 Control 파일 디렉토리 설정
adhd_save_dir = r"C:\Users\gsubi\OneDrive\Desktop\EEG_project\ADHD_epochs"
control_save_dir = r"C:\Users\gsubi\OneDrive\Desktop\EEG_project\Control_epochs"

# 파일명에서 피험자 ID와 에포크 번호를 추출하는 함수 (언더바 앞은 모두 피험자 ID로 설정)
def extract_subject_epoch(file_name):
    # 파일명을 먼저 확장자를 제외한 부분으로 나눔
    base_name = os.path.splitext(file_name)[0]
    
    # 언더바('_')로 파일명을 나누어 피험자 ID와 에포크 번호를 추출
    if '_' in base_name:
        parts = base_name.split('_')
        subject_id = parts[0]  # 언더바 앞의 부분이 피험자 ID
        epoch_num = parts[1][1:]  # 'e' 다음의 숫자가 에포크 번호
        return subject_id, epoch_num
    return None, None

# ADHD와 Control 디렉토리에서 파일 읽기
adhd_files = os.listdir(adhd_save_dir)
control_files = os.listdir(control_save_dir)

adhd_subjects = {}
control_subjects = {}

# ADHD 파일들의 피험자 ID별로 그룹화
for file in adhd_files:
    subject_id, epoch_num = extract_subject_epoch(file)
    if subject_id:
        if subject_id not in adhd_subjects:
            adhd_subjects[subject_id] = []
        adhd_subjects[subject_id].append(os.path.join(adhd_save_dir, file))

# Control 파일들의 피험자 ID별로 그룹화
for file in control_files:
    subject_id, epoch_num = extract_subject_epoch(file)
    if subject_id:
        if subject_id not in control_subjects:
            control_subjects[subject_id] = []
        control_subjects[subject_id].append(os.path.join(control_save_dir, file))

print(f"ADHD subjects: {adhd_subjects.keys()}")
print(f"Control subjects: {control_subjects.keys()}")


ADHD subjects: dict_keys(['v10p', 'v12p', 'v14p', 'v15p', 'v173', 'v177', 'v179', 'v181', 'v183', 'v18p', 'v190', 'v196', 'v198', 'v19p', 'v1p', 'v200', 'v204', 'v206', 'v209', 'v20p', 'v213', 'v215', 'v219', 'v21p', 'v227', 'v22p', 'v231', 'v234', 'v236', 'v238', 'v244', 'v246', 'v24p', 'v250', 'v254', 'v25p', 'v263', 'v265', 'v270', 'v274', 'v279', 'v27p', 'v284', 'v286', 'v288', 'v28p', 'v29p', 'v30p', 'v31p', 'v32p', 'v33p', 'v34p', 'v35p', 'v36p', 'v37p', 'v38p', 'v39p', 'v3p', 'v40p', 'v6p', 'v8p'])
Control subjects: dict_keys(['v107', 'v108', 'v109', 'v110', 'v111', 'v112', 'v113', 'v114', 'v115', 'v116', 'v117', 'v118', 'v120', 'v121', 'v123', 'v125', 'v127', 'v129', 'v131', 'v133', 'v134', 'v138', 'v140', 'v143', 'v147', 'v149', 'v151', 'v297', 'v298', 'v299', 'v300', 'v302', 'v303', 'v304', 'v305', 'v306', 'v307', 'v308', 'v309', 'v310', 'v41p', 'v42p', 'v43p', 'v44p', 'v45p', 'v46p', 'v47p', 'v48p', 'v49p', 'v50p', 'v51p', 'v52p', 'v53p', 'v54p', 'v55p', 'v56p', 'v57p', 'v58

In [24]:
def create_cnn_model(input_shape):
    model = models.Sequential()

    # 첫 번째 Residual Block
    
    model.add(layers.Conv2D(64, (3, 3), padding='same', input_shape=input_shape, name='conv1'))
    model.add(layers.BatchNormalization(name='batch_norm1'))
    model.add(layers.ReLU(name='relu1'))
    model.add(layers.MaxPooling2D((2, 2), name='maxpool1'))
    model.add(layers.Dropout(0.3, name='dropout1'))

    # 두 번째 Residual Block
    model.add(layers.Conv2D(128, (3, 3), padding='same', name='conv2'))
    model.add(layers.BatchNormalization(name='batch_norm2'))
    model.add(layers.ReLU(name='relu2'))
    model.add(layers.MaxPooling2D((2, 2), name='maxpool2'))
    model.add(layers.Dropout(0.4, name='dropout2'))

    # 세 번째 Residual Block
    model.add(layers.Conv2D(256, (3, 3), padding='same', name='conv3'))
    model.add(layers.BatchNormalization(name='batch_norm3'))
    model.add(layers.ReLU(name='relu3'))
    model.add(layers.MaxPooling2D((2, 2), name='maxpool3'))
    model.add(layers.Dropout(0.5, name='dropout3'))

    # 네 번째 Residual Block
    model.add(layers.Conv2D(512, (3, 3), padding='same', name='conv4'))
    model.add(layers.BatchNormalization(name='batch_norm4'))
    model.add(layers.ReLU(name='relu4'))
    model.add(layers.MaxPooling2D((2, 2), name='maxpool4'))
    model.add(layers.Dropout(0.5, name='dropout4'))

    # Fully Connected Layer
    model.add(layers.Flatten(name='flatten'))
    model.add(layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.001), name='fc1'))
    model.add(layers.Dropout(0.5, name='dropout5'))

    # Output Layer
    model.add(layers.Dense(1, activation='sigmoid', name='output'))

    optimizer = optimizers.Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])

    return model


# 학습률 스케줄러 (ReduceLROnPlateau)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=0.00001, verbose=1)

# Early Stopping (과적합 방지)
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Model Checkpoint (최적의 가중치 저장)
checkpoint = ModelCheckpoint('best_model.h5', monitor='val_loss', save_best_only=True, mode='min')


In [25]:
# ADHD와 Control 피험자 분리
adhd_subjects_list = list(adhd_subjects.keys())
control_subjects_list = list(control_subjects.keys())

# ADHD는 1, Control은 0으로 라벨 설정
all_subjects = adhd_subjects_list + control_subjects_list
all_labels = [1] * len(adhd_subjects_list) + [0] * len(control_subjects_list)

In [26]:
def plot_training_history(history):
    # 훈련 및 검증 손실 시각화
    plt.figure(figsize=(12, 5))

    # Loss 그래프
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Loss over epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # Accuracy 그래프
    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='Train Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Accuracy over epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.show()

def plot_tsne(model, data, labels):
    # 모델의 중간 레이어에서 출력된 임베딩을 얻기 위해 flatten 레이어 전까지 모델을 자름
    layer_outputs = models.Model(inputs=model.input, outputs=model.get_layer('flatten').output)
    
    # 학습된 모델을 통해 데이터 임베딩 추출
    embeddings = layer_outputs.predict(data)
    
    # t-SNE 적용 (n_components=2로 2차원 공간에 시각화)
    tsne = TSNE(n_components=2, random_state=42)
    reduced_data = tsne.fit_transform(embeddings)

    # ADHD와 Control 데이터 구분하여 시각화
    adhd_idx = (labels == 1)
    control_idx = (labels == 0)

    plt.figure(figsize=(10, 6))
    plt.scatter(reduced_data[adhd_idx, 0], reduced_data[adhd_idx, 1], color='red', label='ADHD', alpha=0.5)
    plt.scatter(reduced_data[control_idx, 0], reduced_data[control_idx, 1], color='blue', label='Control', alpha=0.5)
    
    # x축과 y축 레이블 추가
    plt.xlabel('t-SNE Component 1')
    plt.ylabel('t-SNE Component 2')

    plt.title('t-SNE visualization of data embeddings')
    plt.legend()
    plt.show()


In [27]:
# 샘플링 주파수 설정 (128Hz)
fs = 128

# 데이터를 불러오고 각 채널별로 PSD를 계산하는 함수
def load_data_with_psd(file_paths):
    data = []
    for file_path in file_paths:
        mat = scipy.io.loadmat(file_path)
        
        # 파일명에서 확장자를 제외한 기본 파일명 추출
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        
        # 기본 파일명을 그대로 키로 사용하여 데이터 로드
        if base_name in mat:
            raw_data = mat[base_name]
            
            num_channels = raw_data.shape[1]  # 19개 채널 (열 개수)
            psd_data = []
            
            # 각 채널별로 PSD 계산
            for channel in range(num_channels):
                channel_data = raw_data[:, channel]  # 각 채널 데이터 (2560 샘플)
                freqs, psd = signal.welch(channel_data, fs=fs, nperseg=128)  # nperseg을 샘플 길이에 맞게 설정
                psd_data.append(psd)
            
            psd_data = np.array(psd_data)
            
            # psd 데이터를 CNN이 처리할 수 있는 형태로 변환 (예: 64x65 크기로 재구성)
            if psd_data.shape[1] >= 64:  # 주파수 대역이 64 이상일 때만
                psd_reshaped = psd_data[:, :64].reshape(19, 64, 1)  # 채널 수를 유지한 채 19x64x1 크기로 변환
                data.append(psd_reshaped)
            else:
                raise ValueError(f"PSD data is too small for {base_name}. Expected at least 64 frequency bins.")
        else:
            raise KeyError(f"'{base_name}' key not found in {file_path}")
    
    return np.array(data)

In [28]:
# 평가 지표 저장 리스트
train_accuracy_list = []
accuracy_list = []
precision_adhd_list = []
precision_control_list = []
recall_adhd_list = []
recall_control_list = []
f1_adhd_list = []
f1_control_list = []
auc_list = []

In [None]:
# 초기화
all_data = []
all_labels = []

# 각 ADHD 피험자에 대해 PSD 데이터를 로드하고 all_data와 all_labels에 추가
for subject in adhd_subjects.keys():
    data = load_data_with_psd(adhd_subjects[subject])
    all_data.extend(data)
    all_labels.extend([1] * len(data))  # ADHD는 1로 라벨링

# 각 Control 피험자에 대해 PSD 데이터를 로드하고 all_data와 all_labels에 추가
for subject in control_subjects.keys():
    data = load_data_with_psd(control_subjects[subject])
    all_data.extend(data)
    all_labels.extend([0] * len(data))  # Control은 0으로 라벨링

# 데이터를 NumPy 배열로 변환
all_data = np.array(all_data)
all_labels = np.array(all_labels)

# 데이터의 길이 확인
print(f"Length of all_data: {len(all_data)}")
print(f"Length of all_labels: {len(all_labels)}")

# 학습 및 테스트 데이터 분할
x_train, x_test, y_train, y_test = train_test_split(all_data, all_labels, test_size=0.2, random_state=42)

# 입력 데이터의 형태에 따라 input_shape를 설정합니다.
input_shape = all_data.shape[1:]  # (19, 64, 1) 형태일 것으로 예상됩니다.

# CNN 모델 생성 및 학습
cnn_model = create_cnn_model(input_shape)
history = cnn_model.fit(x_train, y_train, epochs=10, validation_data=(x_test, y_test), verbose=1)

# 모델 평가
train_metrics = cnn_model.evaluate(x_train, y_train, verbose=1)
test_metrics = cnn_model.evaluate(x_test, y_test, verbose=1)

# 평가 결과 추출
train_loss, train_acc = train_metrics[0], train_metrics[1]
test_loss, test_acc = test_metrics[0], test_metrics[1]

# 예측 결과 가져오기
y_pred = cnn_model.predict(x_test)
y_pred_classes = (y_pred > 0.5).astype("int32")

# Confusion Matrix 기반의 Precision, Recall, F1-Score 계산
test_precision_adhd = precision_score(y_test, y_pred_classes, pos_label=1)
test_precision_control = precision_score(y_test, y_pred_classes, pos_label=0)
test_recall_adhd = recall_score(y_test, y_pred_classes, pos_label=1)
test_recall_control = recall_score(y_test, y_pred_classes, pos_label=0)
test_f1_adhd = f1_score(y_test, y_pred_classes, pos_label=1)
test_f1_control = f1_score(y_test, y_pred_classes, pos_label=0)

# AUC 계산
test_auc = roc_auc_score(y_test, y_pred)

# 성능 결과 출력
print("Model Performance without Cross-Validation")
print(f"Test Accuracy: {test_acc}")
print(f"Test Precision (ADHD): {test_precision_adhd}, Control: {test_precision_control}")
print(f"Test Recall (ADHD): {test_recall_adhd}, Control: {test_recall_control}")
print(f"Test F1-Score (ADHD): {test_f1_adhd}, Control: {test_f1_control}")
print(f"Test AUC: {test_auc}")


Length of all_data: 778
Length of all_labels: 778


TypeError: 'tuple' object is not callable