In [1]:
import h5py
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, BatchNormalization, ReLU, MaxPooling1D, Flatten, Dense, Dropout, LSTM
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error
import os
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# 1. 데이터 로드
csv_path = '/content/drive/MyDrive/Colab Notebooks/SBPDBP.csv'
df = pd.read_csv(csv_path)

In [4]:
# 2. 데이터 분할 (입력 X와 타겟 y)
ppg_cols = [col for col in df.columns if 'PPG' in col]
ecg_cols = [col for col in df.columns if 'ECG' in col]

X_ppg = df[ppg_cols].values
X_ecg = df[ecg_cols].values
y = df[['SBP', 'DBP']].values

In [5]:
# 3. 데이터 전처리
# 입력 데이터를 3차원 형태로 결합 (샘플 수, 타임스텝, 채널 수)
X = np.stack([X_ppg, X_ecg], axis=-1)
print(f"입력 데이터(X) shape: {X.shape}")
print(f"타겟 데이터(y) shape: {y.shape}")

입력 데이터(X) shape: (1696, 1250, 2)
타겟 데이터(y) shape: (1696, 2)


In [6]:
# 4. CNN-LSTM 모델
def create_cnnlstm_model(input_shape):
    inputs = Input(shape=input_shape)

    # CNN Block: 신호의 특징 추출
    x = Conv1D(filters=64, kernel_size=10, activation='relu')(inputs)
    x = Conv1D(filters=64, kernel_size=10, activation='relu')(x)
    x = MaxPooling1D(pool_size=3)(x)
    x = BatchNormalization()(x)

    # LSTM Block: 추출된 특징의 시간적 패턴 학습
    x = LSTM(100, return_sequences=True)(x)
    x = LSTM(100)(x)
    x = Dropout(0.5)(x)

    # Fully Connected Block
    x = Dense(100, activation='relu')(x)

    # 출력층(Output Layer): SBP, DBP 2개의 연속된 값을 예측
    outputs = Dense(2, activation='linear')(x)  # 회귀 문제이므로 linear 활성화 함수 사용

    model = Model(inputs=inputs, outputs=outputs)

    # 손실 함수와 평가 지표를 회귀에 맞게 변경 (MAE)
    model.compile(optimizer='adam', loss='mae', metrics=['mae'])

    return model

In [7]:
# 5. 순차적 4-폴드 교차 검증
num_folds, fold_size, train_size = 4, 400, 300
sbp_maes, dbp_maes = [], []

# 1600개 데이터만 사용
X = X[:1600]
y = y[:1600]

for i in range(num_folds):
    print(f"\n===== Fold {i+1}/{num_folds} 학습 및 평가 시작 =====")

    start_idx, end_idx = i * fold_size, (i + 1) * fold_size
    X_fold, y_fold = X[start_idx:end_idx], y[start_idx:end_idx]

    X_train, X_test = X_fold[:train_size], X_fold[train_size:]
    y_train, y_test = y_fold[:train_size], y_fold[train_size:]

    # 데이터 스케일링
    x_scaler, y_scaler = StandardScaler(), StandardScaler()

    # X 데이터 스케일링 (채널별로 적용되도록 reshape 후 복원)
    X_train_scaled = x_scaler.fit_transform(X_train.reshape(-1, X.shape[2])).reshape(X_train.shape)
    X_test_scaled = x_scaler.transform(X_test.reshape(-1, X.shape[2])).reshape(X_test.shape)

    # y 데이터 스케일링
    y_train_scaled = y_scaler.fit_transform(y_train)

    # 모델 생성 및 학습
    model = create_cnnlstm_model(input_shape=(X.shape[1], X.shape[2]))
    if i == 0:
        print("\n--- 모델 구조 ---")
        model.summary()  # 첫 번째 폴드에서만 구조 출력

    history = model.fit(
        X_train_scaled, y_train_scaled,
        epochs=50, batch_size=32, validation_split=0.2, verbose=0,
        callbacks=[tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)]
    )
    print(f"Fold {i+1}: 학습 완료 (Best epoch: {np.argmin(history.history['val_loss']) + 1})")

    # 예측 및 성능 평가
    y_pred_scaled = model.predict(X_test_scaled)
    y_pred = y_scaler.inverse_transform(y_pred_scaled)

    sbp_maes.append(mean_absolute_error(y_test[:, 0], y_pred[:, 0]))
    dbp_maes.append(mean_absolute_error(y_test[:, 1], y_pred[:, 1]))

    print(f"Fold {i+1} 결과 -> SBP MAE: {sbp_maes[-1]:.2f}, DBP MAE: {dbp_maes[-1]:.2f}")


===== Fold 1/4 학습 및 평가 시작 =====

--- 모델 구조 ---


Fold 1: 학습 완료 (Best epoch: 3)
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 328ms/step
Fold 1 결과 -> SBP MAE: 5.98, DBP MAE: 1.78

===== Fold 2/4 학습 및 평가 시작 =====
Fold 2: 학습 완료 (Best epoch: 1)
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 311ms/step
Fold 2 결과 -> SBP MAE: 7.54, DBP MAE: 3.90

===== Fold 3/4 학습 및 평가 시작 =====
Fold 3: 학습 완료 (Best epoch: 1)




[1m3/4[0m [32m━━━━━━━━━━━━━━━[0m[37m━━━━━[0m [1m0s[0m 422ms/step



[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 518ms/step
Fold 3 결과 -> SBP MAE: 6.30, DBP MAE: 2.30

===== Fold 4/4 학습 및 평가 시작 =====
Fold 4: 학습 완료 (Best epoch: 2)
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 312ms/step
Fold 4 결과 -> SBP MAE: 4.44, DBP MAE: 2.15


In [8]:
# 6. 최종 결과 분석
sbp_mean, sbp_std = np.mean(sbp_maes), np.std(sbp_maes)
dbp_mean, dbp_std = np.mean(dbp_maes), np.std(dbp_maes)

print("\n===== 최종 결과 요약 =====")
print(f"SBP MAE: {sbp_mean:.2f} ± {sbp_std:.2f}")
print(f"DBP MAE: {dbp_mean:.2f} ± {dbp_std:.2f}")
print("---------------------------")
print(f"SBP 목표 달성: {'성공' if (sbp_mean + sbp_std) <= 5 else '실패'} (결과: {sbp_mean + sbp_std:.2f})")
print(f"DBP 목표 달성: {'성공' if (dbp_mean + dbp_std) <= 5 else '실패'} (결과: {dbp_mean + dbp_std:.2f})")


===== 최종 결과 요약 =====
SBP MAE: 6.06 ± 1.11
DBP MAE: 2.53 ± 0.81
---------------------------
SBP 목표 달성: 실패 (결과: 7.17)
DBP 목표 달성: 성공 (결과: 3.35)
