In [1]:
import os
import numpy as np
import pandas as pd
from google.colab import drive
import h5py #mat 열어보기위한.

# Mount Google Drive into the Colab filesystem at /content/drive.
drive.mount('/content/drive')

# Location of the .mat file that is opened with h5py in the next cell.
MAT_PATH = '/content/drive/MyDrive/Colab Notebooks/p044036.mat'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## **mat 형태의 파일에서 최상위 키를 꺼내 SW에 저장 -> SW에서 각 값들을 변수들에 꺼내서 저장함**

In [2]:
mat = h5py.File(MAT_PATH, 'r') # open the .mat file read-only; the handle stays open because later cells dereference through it
SW  = mat['Subj_Wins'] # top-level key that holds the values used in this notebook

# Dataset handles only; the entries are references that are resolved later via `mat`.
ds_PPG = SW['PPG_F']
ds_ECG = SW['ECG_F']
ds_SBP = SW['SegSBP']
ds_DBP = SW['SegDBP']

ds_PPG.shape # the other datasets are presumably laid out the same way (1 row, N columns) — not checked here

(1, 1696)

In [3]:
# Number of segments, read from the dataset instead of being hard-coded.
# ds_PPG has shape (1, N); for this file N is 1696.
N = ds_PPG.shape[1]

# **mat에서 데이터를 직접 꺼내면 실제 값이 아니라 위치를 가리키는 참조(reference)가 나오므로, 실제 데이터를 꺼내오는 함수를 구현한 뒤 for문에서 각 list에 값을 append**

In [4]:
def read_cell_vector(h5file, dataset, idx):
    """Resolve the ``idx``-th reference of a cell dataset to its actual data.

    ``dataset[0, idx]`` is used as a key into ``h5file``; the object stored
    there is copied into a NumPy array and its length-1 axes are dropped.

    Args:
        h5file: Container indexed as ``h5file[ref]`` (the open .mat file).
        dataset: 2-D dataset of references, indexed as ``dataset[0, idx]``.
        idx: Column of the reference to resolve.

    Returns:
        np.ndarray with all singleton dimensions squeezed out.
    """
    target = h5file[dataset[0, idx]]
    return np.array(target).squeeze()

# read_cell_vector dereferences the pointer-like entries of the .mat datasets;
# the loop below uses it to put the actual values into the lists.

ppg_list, ecg_list, sbp_list, dbp_list = [], [], [], []

for i in range(N):
    # Waveforms: flattened to 1-D arrays.
    ppg = read_cell_vector(mat, ds_PPG, i).ravel()
    ecg = read_cell_vector(mat, ds_ECG, i).ravel()
    # Labels: first element of the resolved entry, as a Python float.
    sbp = float(np.ravel(read_cell_vector(mat, ds_SBP, i))[0])
    dbp = float(np.ravel(read_cell_vector(mat, ds_DBP, i))[0])

    ppg_list.append(ppg)
    ecg_list.append(ecg)
    sbp_list.append(sbp)
    dbp_list.append(dbp)

# *데이터 프레임 형태로 저장*

In [5]:
# One row per segment: two object columns holding the waveform arrays and two
# float label columns.
df = pd.DataFrame({
    'ppg': ppg_list,   # ndarray(1250,)
    'ecg': ecg_list,   # ndarray(1250,)
    'sbp': sbp_list,   # float
    'dbp': dbp_list    # float
})

# Quick sanity checks: row count, waveform length of the first row, label statistics.
print("rows:", len(df))
print("ppg/ecg 길이 예:", df.loc[0, 'ppg'].shape, df.loc[0, 'ecg'].shape)
print("\n")
print("SBP/DBP 통계:")
print(df[['sbp','dbp']].describe().round(3))

print("\n샘플 3개:")
print(df.head(3))

# NaN check covers the label columns only (the waveform columns are not inspected).
has_nan = df[['sbp','dbp']].isna().any().any()
print("\nNaN:", has_nan)
print("SBP 범위:", float(np.min(df['sbp'])), "→", float(np.max(df['sbp'])))
print("DBP 범위:", float(np.min(df['dbp'])), "→", float(np.max(df['dbp'])))

rows: 1696
ppg/ecg 길이 예: (1250,) (1250,)


SBP/DBP 통계:
            sbp       dbp
count  1696.000  1696.000
mean    114.151    65.836
std       5.786     2.773
min      96.942    55.989
25%     109.731    63.749
50%     114.145    65.965
75%     118.730    67.514
max     126.806    73.455

샘플 3개:
                                                 ppg  \
0  [0.102875708056771, 0.09054385313944432, 0.078...   
1  [0.7937207803338165, 0.7754172660644632, 0.756...   
2  [0.41167803992789, 0.45914175538516705, 0.5083...   

                                                 ecg         sbp        dbp  
0  [0.07822516569462085, 0.08121745378862912, 0.0...  125.542373  66.147942  
1  [0.1407351166777705, 0.16467849835858261, 0.19...  124.780405  65.793008  
2  [0.07667023630053167, 0.0856216305788385, 0.07...  123.412770  65.102678  

NaN: False
SBP 범위: 96.94159152233816 → 126.80580723837063
DBP 범위: 55.98929895452683 → 73.45502027844395


# **블록 인덱스 만들기**

In [6]:
import numpy as np

def assert_no_overlap(*idx_groups):
    """Assert that no index value occurs more than once across the given groups.

    Args:
        *idx_groups: Index collections (lists or arrays) that must be disjoint.

    Raises:
        AssertionError: with message "overlap detected" if any value repeats,
            either across groups or inside a single group.
    """
    merged = np.concatenate([np.asarray(group) for group in idx_groups])
    n_repeated = len(merged) - len(np.unique(merged))
    assert n_repeated == 0, "overlap detected"

In [7]:
import numpy as np

N = len(df)
n_folds = 4

# Split N rows into n_folds contiguous blocks; the first `rem` blocks get one extra row.
base, rem = divmod(N, n_folds)
block_sizes = [base + 1] * rem + [base] * (n_folds - rem)

# Cumulative boundaries: block k covers the sequential indices [bounds[k], bounds[k + 1]).
bounds = np.cumsum([0] + block_sizes)
blocks = [np.arange(lo, hi) for lo, hi in zip(bounds[:-1], bounds[1:])]

lens = list(map(len, blocks))
print("block sizes:", lens, "| sum =", sum(lens))
# Every block must be able to supply a 300-row train/val span plus a 100-row test span.
assert all(size >= 400 for size in lens), "블록 길이가 400 미만인 것이 있습니다 (300+100 분할 불가)."
print("first block head:", blocks[0][:5], "| last block tail:", blocks[-1][-5:])


block sizes: [424, 424, 424, 424] | sum = 1696
first block head: [0 1 2 3 4] | last block tail: [1691 1692 1693 1694 1695]


In [8]:
tvt_blocks = []

trainval_k = 300  # rows per block used for train + validation
test_k = 100      # rows per block held out for test (taken from the block's tail)
val_size = 50     # last rows of the train/val span used for validation
needed = trainval_k + test_k

assert 0 <= val_size <= trainval_k, f"val_size {val_size} must be within [0, {trainval_k}]"

for b in blocks:
    bsz = len(b)
    assert bsz >= needed, f"block len {bsz} < {needed}"

    # Both spans are anchored at the end of the block: the last test_k rows are
    # test, the trainval_k rows before them are train/val. Any earlier rows of
    # the block are left unused.
    end = b[-1] + 1
    test_idx  = np.arange(end - test_k, end)
    trainval  = np.arange(end - test_k - trainval_k, end - test_k)

    # Split at an explicit position. Negative slicing (trainval[-val_size:])
    # would return the whole span as validation when val_size == 0.
    split = len(trainval) - val_size
    train_idx = trainval[:split]
    val_idx   = trainval[split:]

    # Overlap check
    assert_no_overlap(train_idx, val_idx, test_idx)

    tvt_blocks.append((train_idx, val_idx, test_idx))

print([tuple(map(len, x)) for x in tvt_blocks])

[(250, 50, 100), (250, 50, 100), (250, 50, 100), (250, 50, 100)]


In [9]:
# Per-fold (train, val, test) sizes, displayed as the cell output.
[(len(tr), len(va), len(te)) for tr, va, te in tvt_blocks]

[(250, 50, 100), (250, 50, 100), (250, 50, 100), (250, 50, 100)]

In [10]:
# Second name for the same list object (no copy); the cells below use folds_tvt.
folds_tvt = tvt_blocks
# Number of folds and per-fold (train, val, test) sizes.
len(folds_tvt), [tuple(map(len, x)) for x in folds_tvt]

(4, [(250, 50, 100), (250, 50, 100), (250, 50, 100), (250, 50, 100)])

# **평탄화** -> 회귀 입력 방식에 맞추기 위해서


In [11]:
import numpy as np

def XY_from_df(df, idx):
    """Build flat regression inputs and two-column targets for the rows in ``idx``.

    Args:
        df: DataFrame with array-valued 'ppg' and 'ecg' columns and scalar
            'sbp' and 'dbp' columns.
        idx: Positional row indices (used with ``.iloc``).

    Returns:
        X: float32 array of shape (n, len(ppg) + len(ecg)), each row being the
           PPG waveform followed by the ECG waveform.
        Y: float32 array of shape (n, 2) with columns [sbp, dbp].
    """
    rows = df.iloc[list(idx)]
    # Stack the per-row waveform arrays into (n, length) matrices.
    waves = [np.stack(rows[name].tolist()) for name in ('ppg', 'ecg')]
    X = np.concatenate(waves, axis=1).astype('float32')
    Y = np.column_stack([rows['sbp'].to_numpy(), rows['dbp'].to_numpy()]).astype('float32')
    return X, Y

# Flat arrays for the first fold only; these feed the Ridge baseline below.
tr, va, te = folds_tvt[0]
Xtr, Ytr = XY_from_df(df, tr)
Xva, Yva = XY_from_df(df, va)
Xte, Yte = XY_from_df(df, te)


# Shapes of all six arrays, displayed as the cell output.
Xtr.shape, Ytr.shape, Xva.shape, Yva.shape, Xte.shape, Yte.shape

((250, 2500), (250, 2), (50, 2500), (50, 2), (100, 2500), (100, 2))

# **입출력 차원 체크**

굳이 해야되는지 모르겠음..

In [12]:
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_error

# Standardise every input feature with statistics fitted on the training split
# only, then apply the same fitted scaler to validation and test.
sx = StandardScaler()
Xtr_std = sx.fit_transform(Xtr)
Xva_std = sx.transform(Xva)
Xte_std = sx.transform(Xte)

# Ridge baseline predicting both target columns jointly.
reg = Ridge(alpha=1.0, random_state=42)
reg.fit(Xtr_std, Ytr)

# Each split is predicted exactly once.
tr_pred = reg.predict(Xtr_std)
va_pred = reg.predict(Xva_std)
te_pred = reg.predict(Xte_std)

# Names read by the MAE report; they alias the arrays above rather than
# running predict() a second time.
pred_va, pred_te = va_pred, te_pred

def mae_report(y, p):
    """Mean absolute error per target column, plus the mean of the two.

    Args:
        y: Ground truth, indexed as ``y[:, 0]`` (SBP) and ``y[:, 1]`` (DBP).
        p: Predictions with the same column layout as ``y``.

    Returns:
        Tuple ``(mae_sbp, mae_dbp, (mae_sbp + mae_dbp) / 2)``.
    """
    sbp_err = mean_absolute_error(y[:, 0], p[:, 0])
    dbp_err = mean_absolute_error(y[:, 1], p[:, 1])
    average = (sbp_err + dbp_err) / 2
    return sbp_err, dbp_err, average

# Ridge baseline errors on the first fold's validation and test splits,
# printed as (SBP, DBP, average of the two), rounded to 3 decimals.
mae_va = mae_report(Yva, pred_va)
mae_te = mae_report(Yte, pred_te)
print("VAL MAE (SBP, DBP, AVG):", [round(x,3) for x in mae_va])
print("TEST MAE(SBP, DBP, AVG):", [round(x,3) for x in mae_te])


VAL MAE (SBP, DBP, AVG): [7.271, 2.65, 4.961]
TEST MAE(SBP, DBP, AVG): [8.869, 3.019, 5.944]


# **df -> 텐서로 변환(n, 2, 1250) 형태**

한 세그먼트의 PPG(1250)와 ECG(1250)를 (n, 2, 1250) 형태로 쌓음: n = 샘플 개수, 2 = 채널(PPG, ECG), 1250 = 세그먼트 길이. 라벨 y는 (n, 2) 형태이며 열 순서는 [SBP, DBP].

In [13]:
import numpy as np
import torch

def to_tensor_3d(df, idx):
    """Build channel-stacked input tensors and target tensors for the rows in ``idx``.

    Args:
        df: DataFrame with array-valued 'ppg' and 'ecg' columns and scalar
            'sbp' and 'dbp' columns.
        idx: Positional row indices (used with ``.iloc``).

    Returns:
        X: float32 tensor of shape (n, 2, length); channel 0 is PPG, channel 1 is ECG.
        Y: float32 tensor of shape (n, 2) with columns [sbp, dbp].
    """
    rows = df.iloc[list(idx)]
    # One (n, length) matrix per signal, stacked along a new channel axis.
    channels = [np.stack(rows[name].tolist()) for name in ('ppg', 'ecg')]
    X = np.stack(channels, axis=1).astype('float32')

    labels = [rows['sbp'].to_numpy(), rows['dbp'].to_numpy()]
    Y = np.column_stack(labels).astype('float32')  # (n,2)

    return torch.from_numpy(X), torch.from_numpy(Y)


# Tensors for the first fold only.
# NOTE: Ytr / Yva / Yte are rebound here; they previously held the NumPy arrays
# returned by XY_from_df and now hold torch tensors.
tr_idx, va_idx, te_idx = folds_tvt[0]
Xtr3, Ytr = to_tensor_3d(df, tr_idx)
Xva3, Yva = to_tensor_3d(df, va_idx)
Xte3, Yte = to_tensor_3d(df, te_idx)
Xtr3.shape, Ytr.shape

(torch.Size([250, 2, 1250]), torch.Size([250, 2]))

# **데이터셋 포맷**

학습할때 쓰려고 미리 배치 뽑기 편하게 설정

In [14]:
import torch
from torch.utils.data import Dataset, DataLoader

class SegDataset(Dataset):
    """Pairs inputs ``X`` with targets ``Y`` along their first axis.

    Both containers are stored as given (no copy); item ``i`` is the pair
    ``(X[i], Y[i])`` and the dataset length is ``X.shape[0]``.
    """

    def __init__(self, X, Y):
        self.X, self.Y = X, Y

    def __len__(self):
        n_samples = self.X.shape[0]
        return n_samples

    def __getitem__(self, i):
        sample, target = self.X[i], self.Y[i]
        return sample, target

In [15]:
bs = 64
# Datasets wrap the first-fold tensors exactly as returned by to_tensor_3d
# (no normalisation has been applied to Xtr3 / Xva3 / Xte3 at this point).
train_ds = SegDataset(Xtr3, Ytr)
val_ds   = SegDataset(Xva3, Yva)
test_ds  = SegDataset(Xte3, Yte)

# Sample order is kept as-is for all three loaders (shuffle=False).
train_loader = DataLoader(train_ds, batch_size=bs, shuffle=False)
val_loader   = DataLoader(val_ds,   batch_size=bs, shuffle=False)
test_loader  = DataLoader(test_ds,  batch_size=bs, shuffle=False)

# Pull one batch to check the batch shapes.
xb, yb = next(iter(train_loader))
xb.shape, yb.shape

(torch.Size([64, 2, 1250]), torch.Size([64, 2]))

In [16]:
import torch

def fit_channel_stats(X):
    """Per-channel mean and population standard deviation of a 3-D tensor.

    Statistics are reduced over dims 0 and 2, leaving one value per entry of
    dim 1. The standard deviation uses ``unbiased=False`` and is clamped to at
    least 1e-8 so it can be used as a divisor.

    Args:
        X: Tensor indexed as (sample, channel, time).

    Returns:
        ``(mean, std)``, each of shape ``(X.shape[1],)``.
    """
    reduce_dims = (0, 2)
    channel_mean = X.mean(dim=reduce_dims)
    channel_std = X.std(dim=reduce_dims, unbiased=False)
    return channel_mean, channel_std.clamp_min(1e-8)

def apply_channel_norm(X, mean, std):
    """Normalise ``X`` channel-wise: subtract ``mean`` and divide by ``std``.

    ``mean`` and ``std`` get a leading and a trailing singleton axis so that
    1-D per-channel vectors broadcast against ``X`` along dim 1.

    Args:
        X: Tensor indexed as (sample, channel, time).
        mean: Per-channel means, e.g. from ``fit_channel_stats``.
        std: Per-channel standard deviations, e.g. from ``fit_channel_stats``.

    Returns:
        A new tensor with the same shape as ``X``.
    """
    shift = mean.unsqueeze(0).unsqueeze(2)
    scale = std.unsqueeze(0).unsqueeze(2)
    return (X - shift) / scale

# First fold again: rebuild the tensors, then normalise all three splits with
# statistics fitted on the training split only.
tr_idx, va_idx, te_idx = folds_tvt[0]
Xtr3, Ytr = to_tensor_3d(df, tr_idx)
Xva3, Yva = to_tensor_3d(df, va_idx)
Xte3, Yte = to_tensor_3d(df, te_idx)

m, s = fit_channel_stats(Xtr3)
Xtr3n = apply_channel_norm(Xtr3, m, s)
Xva3n = apply_channel_norm(Xva3, m, s)
Xte3n = apply_channel_norm(Xte3, m, s)

# Per-channel (PPG, ECG) mean and std of the training split, displayed as the cell output.
m.tolist(), s.tolist()

([0.4485641419887543, 0.1287144422531128],
 [0.2546645402908325, 0.15269353985786438])

# **모델**

CNN 1D CONV + GAP + MLP 구성

입력 파형이 시계열 -> CNN 씀

GAP -> CNN이 뽑은 특징을 압축하고 평균으로 요약해줌

MLP -> SBP/DBP 두개의 상관이 큼 그래서 동시에 예측하면 -> 두 값이 같은 정보를 공유하면서 예측이됨.


In [17]:
import torch
import torch.nn as nn

class CNNRegressor(nn.Module):
    """1-D CNN that maps a two-channel waveform to two regression outputs.

    Pipeline: three Conv1d+ReLU stages -> dropout -> global average pooling
    over time -> two-layer MLP head producing [SBP, DBP].
    """

    def __init__(self):
        super().__init__()

        # Kernel sizes 7/5/3 with paddings 3/2/1 at stride 1 keep the sequence
        # length unchanged through the convolutional stack.
        feature_layers = [
            nn.Conv1d(2, 32, kernel_size=7, padding=3),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Conv1d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Dropout(0.1),
        ]
        self.net = nn.Sequential(*feature_layers)

        # Averages over the time axis: (B,64,T) -> (B,64,1)
        self.gap = nn.AdaptiveAvgPool1d(1)

        regressor_layers = [
            nn.Flatten(),       # (B,64,1) -> (B,64)
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 2),   # [SBP, DBP]
        ]
        self.head = nn.Sequential(*regressor_layers)

    def forward(self, x):
        """Map ``x`` of shape (B,2,T) to predictions of shape (B,2)."""
        features = self.net(x)       # (B,64,T)
        pooled = self.gap(features)  # (B,64,1)
        return self.head(pooled)     # (B,2)

# Instantiate once so the layer structure is shown as the cell output.
model = CNNRegressor()
model


CNNRegressor(
  (net): Sequential(
    (0): Conv1d(2, 32, kernel_size=(7,), stride=(1,), padding=(3,))
    (1): ReLU()
    (2): Conv1d(32, 64, kernel_size=(5,), stride=(1,), padding=(2,))
    (3): ReLU()
    (4): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
    (5): ReLU()
    (6): Dropout(p=0.1, inplace=False)
  )
  (gap): AdaptiveAvgPool1d(output_size=1)
  (head): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=64, out_features=32, bias=True)
    (2): ReLU()
    (3): Linear(in_features=32, out_features=2, bias=True)
  )
)

# **학습루프 지정 + 4 Fold loop**

In [18]:
import torch, numpy as np
from torch.utils.data import DataLoader

# Seed the RNGs so weight initialisation and dropout are repeatable between
# runs; 42 matches the random_state given to the Ridge baseline.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

device   = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
E        = 100   # maximum number of epochs
patience = 10    # epochs without validation improvement before a fold stops
bs, lr, wd = 64, 1e-3, 1e-4  # batch size, Adam learning rate, Adam weight decay
criterion = torch.nn.MSELoss()

@torch.no_grad()
def eval_mse_mae(model, loader):
    """Evaluate ``model`` on ``loader`` and return sample-averaged MSE and MAE.

    For every sample the squared / absolute error is first averaged over the
    output columns (dim 1); those per-sample values are then averaged over all
    samples seen. The model is put in eval mode and left there.

    Args:
        model: Module whose parameters determine the device used for the batches.
        loader: Iterable of ``(inputs, targets)`` batches.

    Returns:
        Tuple ``(mse, mae)`` of Python floats.
    """
    model.eval()
    target_device = next(model.parameters()).device

    sq_total = 0.0
    abs_total = 0.0
    seen = 0
    for inputs, targets in loader:
        inputs = inputs.to(target_device)
        targets = targets.to(target_device)
        err = model(inputs) - targets
        sq_total += err.pow(2).mean(dim=1).sum().item()
        abs_total += err.abs().mean(dim=1).sum().item()
        seen += inputs.size(0)

    return sq_total / seen, abs_total / seen


def run_epoch(model, loader, train, optimizer=None, loss_fn=None):
    """Run one pass over ``loader`` and return the sample-weighted mean loss.

    Args:
        model: Module to train or evaluate. Batches are moved to the device of
            its parameters, the same rule ``eval_mse_mae`` uses.
        loader: Iterable of ``(inputs, targets)`` batches.
        train: If true, the model is put in train mode and ``optimizer`` is
            stepped on every batch; otherwise the model is put in eval mode and
            gradients are disabled.
        optimizer: Optimizer over ``model``'s parameters; required when
            ``train`` is true.
        loss_fn: Loss callable ``loss_fn(pred, target)``. Defaults to the
            module-level ``criterion``.

    Returns:
        Mean loss per sample (Python float), weighted by batch size over the
        samples actually iterated.

    Raises:
        ValueError: if ``train`` is true and no optimizer is given.
    """
    if train and optimizer is None:
        raise ValueError("optimizer is required when train=True")
    if loss_fn is None:
        loss_fn = criterion  # notebook-level default loss

    if train:
        model.train()
    else:
        model.eval()

    model_device = next(model.parameters()).device
    total, seen = 0.0, 0
    with torch.set_grad_enabled(bool(train)):
        for xb, yb in loader:
            xb, yb = xb.to(model_device), yb.to(model_device)
            loss = loss_fn(model(xb), yb)
            if train:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            # Weight each batch by its size so a short last batch is not over-counted.
            n_batch = xb.size(0)
            total += loss.item() * n_batch
            seen += n_batch
    return total / seen

# One entry per fold: model, optimizer, loaders, and early-stopping bookkeeping.
states = []

for (tr_idx, va_idx, te_idx) in folds_tvt:
    assert_no_overlap(tr_idx, va_idx, te_idx)

    # Normalisation statistics come from this fold's training split only and
    # are reused for its validation and test splits.
    Xtr, Ytr = to_tensor_3d(df, tr_idx)
    Xva, Yva = to_tensor_3d(df, va_idx)
    Xte, Yte = to_tensor_3d(df, te_idx)
    m, s     = fit_channel_stats(Xtr)
    Xtr = apply_channel_norm(Xtr, m, s)
    Xva = apply_channel_norm(Xva, m, s)
    Xte = apply_channel_norm(Xte, m, s)


    # Not shuffled.
    tr_loader = DataLoader(SegDataset(Xtr, Ytr), batch_size=bs, shuffle=False)
    va_loader = DataLoader(SegDataset(Xva, Yva), batch_size=bs, shuffle=False)
    te_loader = DataLoader(SegDataset(Xte, Yte), batch_size=bs, shuffle=False)

    # Fresh model and optimizer for every fold.
    model = CNNRegressor().to(device)
    optim = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=wd)

    states.append({
        "model": model, "optim": optim,
        "tr_loader": tr_loader, "va_loader": va_loader, "te_loader": te_loader,
        # best_val / best_state track the lowest validation MSE and its weights;
        # bad counts epochs without improvement; done marks an early-stopped fold.
        "best_val": float('inf'), "best_state": None,
        "bad": 0, "done": False
    })

In [19]:
# Folds are trained in lock-step: each epoch trains every still-active fold
# once, then prints one line containing all folds' validation metrics.
for ep in range(1, E+1):
    line = [f"Epoch {ep:03d}"]
    for i, st in enumerate(states, 1):
        if st["done"]:
            line.append(f"[Fold {i}: done]")
            continue

        _ = run_epoch(st["model"], st["tr_loader"], train=True, optimizer=st["optim"])
        va_mse, va_mae = eval_mse_mae(st["model"], st["va_loader"])

        line.append(f"[Fold {i}: MSE {va_mse:.2f} | MAE {va_mae:.2f}]")

        # An epoch counts as an improvement only if validation MSE drops by more than 1e-6.
        # NOTE: if this comparison is never true (e.g. va_mse is NaN), best_state stays None.
        if va_mse + 1e-6 < st["best_val"]:
            st["best_val"]  = va_mse
            st["bad"]       = 0
            # Snapshot the weights on CPU; clone() detaches the copy from later updates.
            st["best_state"] = {k: v.cpu().clone() for k, v in st["model"].state_dict().items()}
        else:
            st["bad"] += 1
            if st["bad"] >= patience:
                st["done"] = True

    print(" ".join(line))
    if all(st["done"] for st in states):
        print("all folds early-stopped")
        break

Epoch 001 [Fold 1: MSE 9084.61 | MAE 92.35] [Fold 2: MSE 7874.35 | MAE 85.11] [Fold 3: MSE 9157.94 | MAE 92.04] [Fold 4: MSE 9116.22 | MAE 91.91]
Epoch 002 [Fold 1: MSE 9055.43 | MAE 92.21] [Fold 2: MSE 7856.11 | MAE 85.02] [Fold 3: MSE 9137.91 | MAE 91.95] [Fold 4: MSE 9103.73 | MAE 91.84]
Epoch 003 [Fold 1: MSE 8988.15 | MAE 91.90] [Fold 2: MSE 7815.34 | MAE 84.81] [Fold 3: MSE 9095.26 | MAE 91.76] [Fold 4: MSE 9078.49 | MAE 91.70]
Epoch 004 [Fold 1: MSE 8841.10 | MAE 91.19] [Fold 2: MSE 7727.41 | MAE 84.36] [Fold 3: MSE 9007.10 | MAE 91.37] [Fold 4: MSE 9024.82 | MAE 91.42]
Epoch 005 [Fold 1: MSE 8548.14 | MAE 89.76] [Fold 2: MSE 7554.53 | MAE 83.45] [Fold 3: MSE 8839.84 | MAE 90.62] [Fold 4: MSE 8915.60 | MAE 90.82]
Epoch 006 [Fold 1: MSE 8021.55 | MAE 87.12] [Fold 2: MSE 7239.97 | MAE 81.76] [Fold 3: MSE 8539.03 | MAE 89.24] [Fold 4: MSE 8709.72 | MAE 89.68]
Epoch 007 [Fold 1: MSE 7124.57 | MAE 82.34] [Fold 2: MSE 6702.16 | MAE 78.74] [Fold 3: MSE 8026.06 | MAE 86.79] [Fold 4: MSE

In [22]:
import torch, numpy as np

@torch.no_grad()
def fold_test_metrics(st):
    """Evaluate one fold's best checkpoint on that fold's test loader.

    Loads ``st["best_state"]`` into ``st["model"]`` (the model keeps those
    weights afterwards), switches it to eval mode, and computes errors over
    every sample yielded by ``st["te_loader"]``.

    Args:
        st: Fold state dict with keys "model", "best_state" and "te_loader".

    Returns:
        Tuple ``(mae_avg, mse_sbp, mse_dbp)`` where ``mae_avg`` is the mean of
        the SBP and DBP mean absolute errors (output columns 0 and 1).
    """
    net = st["model"]
    net.load_state_dict(st["best_state"])
    net.eval()
    net_device = next(net.parameters()).device

    # Collect prediction errors batch by batch, then reduce once per column.
    residuals = []
    for xb, yb in st["te_loader"]:
        batch_err = net(xb.to(net_device)) - yb.to(net_device)
        residuals.append(batch_err.cpu())
    err = torch.cat(residuals)
    sbp_err, dbp_err = err[:, 0], err[:, 1]

    mae_sbp = sbp_err.abs().mean().item()
    mae_dbp = dbp_err.abs().mean().item()
    mse_sbp = (sbp_err ** 2).mean().item()
    mse_dbp = (dbp_err ** 2).mean().item()
    return (mae_sbp + mae_dbp) / 2.0, mse_sbp, mse_dbp

# Test metrics per fold, each evaluated with that fold's best-validation weights.
fold_mae, fold_mse_sbp, fold_mse_dbp = [], [], []
for i, st in enumerate(states, 1):
    mae_avg, mse_sbp, mse_dbp = fold_test_metrics(st)
    fold_mae.append(mae_avg); fold_mse_sbp.append(mse_sbp); fold_mse_dbp.append(mse_dbp)
    print(f"Fold {i} | MAE {mae_avg:.3f} | MSE(SBP) {mse_sbp:.3f} | MSE(DBP) {mse_dbp:.3f}")

# Mean ± sample standard deviation (ddof=1) across folds.
print("\n== Summary ==")
print(f"MAE (평균)      = {np.mean(fold_mae):.3f} ± {np.std(fold_mae, ddof=1):.3f}")
print(f"MSE SBP (평균)  = {np.mean(fold_mse_sbp):.3f} ± {np.std(fold_mse_sbp, ddof=1):.3f}")
print(f"MSE DBP (평균)  = {np.mean(fold_mse_dbp):.3f} ± {np.std(fold_mse_dbp, ddof=1):.3f}")

Fold 1 | MAE 4.009 | MSE(SBP) 50.916 | MSE(DBP) 8.914
Fold 2 | MAE 6.181 | MSE(SBP) 62.044 | MSE(DBP) 50.468
Fold 3 | MAE 4.322 | MSE(SBP) 50.547 | MSE(DBP) 11.475
Fold 4 | MAE 3.469 | MSE(SBP) 22.862 | MSE(DBP) 15.602

== Summary ==
MAE (평균)      = 4.495 ± 1.178
MSE SBP (평균)  = 46.592 ± 16.696
MSE DBP (평균)  = 21.615 ± 19.432
