In [45]:
import os
import numpy as np
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout
from sklearn.model_selection import LeaveOneGroupOut
from sklearn.preprocessing import LabelEncoder
from keras.utils import to_categorical
from tensorflow.keras.models import load_model
from sklearn.metrics import classification_report, accuracy_score

# TensorFlow 로그 레벨 설정
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' 

# GPU 설정
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(f"Failed to set memory growth: {e}")
else:
    print("No GPU devices found. Running on CPU.")


In [46]:
def load_data(directory):
    """
    데이터를 로드하고, 레이블과 파일 이름을 포함하여 반환하는 함수

    Parameters:
        directory (str): 데이터가 저장된 디렉토리 경로

    Returns:
        X (np.ndarray): 입력 데이터
        y (np.ndarray): 레이블
        participants (np.ndarray): 참여자 정보
        file_names (np.ndarray): 각 샘플의 파일 이름
    """
    # Positive(긍정)과 Negative(부정) 데이터를 저장할 리스트 초기화
    files_positive = []
    files_negative = []

    # 디렉토리에 있는 파일들을 탐색
    for file in os.listdir(directory):
        # 파일 이름에 '_positive_'가 포함되어 있으면 positive 파일 리스트에 추가
        if '_positive_' in file:
            files_positive.append(os.path.join(directory, file))
        # 파일 이름에 '_negative_'가 포함되어 있으면 negative 파일 리스트에 추가
        elif '_negative_' in file:
            files_negative.append(os.path.join(directory, file))

    # positive 또는 negative 파일이 하나도 없을 경우 예외를 발생시킴
    if not files_positive or not files_negative:
        raise ValueError(f"No files found. Positive: {files_positive}, Negative: {files_negative}")

    # positive와 negative 파일의 데이터를 읽어서 각각 리스트에 저장
    positive_data = [np.load(file) for file in files_positive]  # positive 파일 데이터 로드
    negative_data = [np.load(file) for file in files_negative]  # negative 파일 데이터 로드

    # positive와 negative 데이터를 합쳐서 X 데이터로 생성
    # axis=0은 데이터를 샘플 축(행)으로 결합함
    X = np.concatenate(positive_data + negative_data, axis=0)

    # y 레이블 생성
    # positive 데이터의 개수만큼 1로 된 레이블 생성 후
    # negative 데이터의 개수만큼 0으로 된 레이블을 추가
    y = np.array([1] * len(np.concatenate(positive_data)) + [0] * len(np.concatenate(negative_data)))

    # 참여자 정보(participants) 및 파일 이름(file_names) 생성
    participants = []
    file_names = []
    for file in files_positive + files_negative:
        # 파일 이름에서 참여자 ID 추출 (파일 이름 예: "001_positive_data.npy")
        participant_id = os.path.basename(file).split('_')[0]
        data = np.load(file)
        num_samples = data.shape[0]
        participants.extend([participant_id] * num_samples)
        file_names.extend([os.path.basename(file)] * num_samples)

    # 리스트를 NumPy 배열로 변환
    participants = np.array(participants)
    file_names = np.array(file_names)

    # 최종적으로 데이터(X), 레이블(y), 참여자 정보(participants), 파일 이름(file_names)를 반환
    return X, y, participants, file_names

In [47]:
def create_cnn_model(input_shape, num_classes, save_path):
    """
    CNN 모델을 생성, 컴파일하고 .h5 파일로 저장하는 함수

    Parameters:
        input_shape (tuple): 입력 데이터의 형태 (예: (timesteps, features))
        num_classes (int): 출력 클래스의 수 (예: 감정 레이블의 개수)
        save_path (str): 생성된 모델을 저장할 .h5 파일 경로

    Returns:
        model (Sequential): 생성된 CNN 모델
    """
    # Sequential 모델 생성
    model = Sequential([
        # 1D Convolutional Layer: 32개의 필터, kernel_size=3, 활성화 함수 ReLU
        # input_shape 지정 (첫 번째 레이어에서만 필요)
        Conv1D(32, kernel_size=3, activation='relu', input_shape=input_shape),

        # MaxPooling Layer: 풀링 크기(pool_size)=2, strides=1
        MaxPooling1D(pool_size=2, strides=1),

        # 1D Convolutional Layer: 64개의 필터, kernel_size=3, 활성화 함수 ReLU
        Conv1D(64, kernel_size=3, activation='relu'),

        # MaxPooling Layer: 풀링 크기(pool_size)=2, strides=1
        MaxPooling1D(pool_size=2, strides=1),

        # Flatten Layer: 1D 데이터를 1차원으로 펼침
        Flatten(),

        # Fully Connected Layer: 128개의 뉴런, 활성화 함수 ReLU
        Dense(128, activation='relu'),

        # Dropout Layer: 50%의 뉴런을 무작위로 비활성화 (overfitting 방지)
        Dropout(0.5),

        # Output Layer: 클래스 수에 따라 Softmax 활성화 함수 사용
        Dense(num_classes, activation='softmax')
    ])

    # 모델 컴파일: 최적화 알고리즘, 손실 함수, 평가지표 설정
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    # 모델 저장
    model.save(save_path)
    print(f"Model saved to {save_path}")

    return model  # 생성된 모델 반환


In [48]:
# 데이터 로드 및 평가 함수
def evaluate_saved_model(data_directory, model_save_path):
    # 데이터 디렉토리에서 테스트 데이터 로드
    print("Loading test data...")
    X_test = []
    y_test = []

    for file_name in os.listdir(data_directory):
        if file_name.endswith(".npy"):
            data_path = os.path.join(data_directory, file_name)
            data = np.load(data_path, allow_pickle=True)  # allow_pickle=True로 수정

            label = 1 if "positive" in file_name else 0
            X_test.append(data)
            y_test.append(label)

    # 데이터 리스트를 동일한 길이로 패딩
    max_channels = max(data.shape[0] if data.ndim > 2 else 1 for data in X_test)
    max_timesteps = max(data.shape[-1] if data.ndim > 1 else len(data) for data in X_test)

    X_test_padded = []

    for data in X_test:
        if data.ndim == 3:
            padded_data = np.pad(data, ((0, max_channels - data.shape[0]), (0, 0), (0, max_timesteps - data.shape[-1])), mode='constant')
        elif data.ndim == 2:
            padded_data = np.pad(data, ((0, max_channels - data.shape[0]), (0, max_timesteps - data.shape[-1])), mode='constant')
        else:
            padded_data = np.pad(data, (0, max_timesteps - len(data)), mode='constant')
        X_test_padded.append(padded_data)

    # numpy 배열로 변환
    X_test = np.array(X_test_padded)
    y_test = np.array(y_test)

    print("Original X_test shape:", X_test.shape)

    # 데이터 재구성
    try:
        target_timesteps = 100  # 모델에서 요구하는 타임스텝 크기
        target_features = 64  # 모델에서 요구하는 피처 크기

        reshaped_X_test = X_test.reshape(X_test.shape[0], target_timesteps, target_features)
        reshaped_y_test = np.repeat(y_test, reshaped_X_test.shape[0] // y_test.shape[0])

        print("Reshaped X_test shape:", reshaped_X_test.shape)
        print("Reshaped y_test shape:", reshaped_y_test.shape)
    except ValueError as e:
        print(f"Reshape Error: {e}")
        return

    # 저장된 모델 로드
    print("Loading model from:", model_save_path)
    model = load_model(model_save_path)

    # 모델 평가
    print("Evaluating the model...")
    loss, accuracy = model.evaluate(reshaped_X_test, reshaped_y_test, verbose=1)
    print(f"Test Loss: {loss}, Test Accuracy: {accuracy}")

    # 모델 예측
    predictions = model.predict(reshaped_X_test)
    predicted_classes = (predictions > 0.5).astype(int)

    for i, (actual, predicted) in enumerate(zip(reshaped_y_test, predicted_classes)):
        print(f"Sample {i}: Actual: {actual}, Predicted: {predicted[0]}")


In [49]:
def main():
    """
    메인 함수: CNN 모델 생성, 저장 및 평가 실행
    """
    # 데이터 디렉토리 및 모델 저장 경로 설정
    data_directory = '/home/bcml1/2025_EMOTION/DEAP_EEG/ch_BPF'  # 데이터를 저장한 디렉토리 경로
    model_save_path = 'cnn_intra_model.h5'  # 모델 저장 경로

    # 모델 생성 및 저장
    print("Creating and saving the model...")
    input_shape = (100, 64)  # 예시 입력 데이터 형태 (timesteps, features)
    num_classes = 2  # 예시 클래스 개수
    create_cnn_model(input_shape, num_classes, model_save_path)

    # 저장된 모델 평가
    print("\nEvaluating the saved model...")
    evaluate_saved_model(data_directory,model_save_path)

if __name__ == "__main__":
    main()




Creating and saving the model...
Model saved to cnn_intra_model.h5

Evaluating the saved model...
Loading test data...
Original X_test shape: (20, 22, 8, 5120)
Reshaped X_test shape: (160, 22, 1, 5120)
Reshaped y_test shape: (160,)
Loading model from: cnn_intra_model.h5
Evaluating the model...


ValueError: Exception encountered when calling Sequential.call().

[1mInvalid input shape for input Tensor("data:0", shape=(32, 22, 1, 5120), dtype=float32). Expected shape (None, 100, 64), but input has incompatible shape (32, 22, 1, 5120)[0m

Arguments received by Sequential.call():
  • inputs=tf.Tensor(shape=(32, 22, 1, 5120), dtype=float32)
  • training=False
  • mask=None