### SVM 로드 -> `ocsvm_loaded`로 사용

In [None]:
# SVM 로드
import joblib

loaded = joblib.load("ppg_ocsvm_model_1203.joblib")
ocsvm_loaded = loaded["model"]
scaler_loaded = loaded["scaler"]

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
import scipy.signal as signal
from scipy.interpolate import interp1d
import math
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
plt.rcParams['figure.figsize'] = (12, 4)
plt.rcParams['axes.grid'] = True

# `prepare_time_axis(df)`
```
입력: df(컬럼에 t_us 포함)
출력: df(컬럼에 time_s 추가), fs_est(추정 주파수)
```

In [None]:
# @title
def prepare_time_axis(df):
    """pd의 df를 인자로 받아서 copy후 time_s컬럼에 기존 t_us (마이크로초 단위)를 초단위 변환"""
    df = df.copy()

    # t_us: microseconds → seconds
    df["time_s"] = df["t_us"] * 1e-6

    # 샘플링 주파수 추정
    t = df["time_s"].values
    dt = np.diff(t) # np.diff(t)는 연속된 원소의 차이를 구하는 함수
    # df는 연속된 시간 값들의 차이(Δt)만 모아놓은 1차원 배열이 됨

    dt_med = np.median(dt) # 간격들의 중앙값 활용
    fs_est = 1.0 / dt_med # 추정된fs = (1 / 샘플 간격)

    print(f"추정된 샘플링 주파수 ≈ {fs_est:.2f} Hz")

    return df, fs_est

# `design_bandpass()`
```
입력: fs (주파수), low (하한 컷오프(Hz)), high (상한 컷오프(Hz)). order (필터 차수)
출력: b, a (bandpass filter 계수)
```

In [None]:
# low_cut  = 0.5   # Hz
# high_cut = 6.0   # Hz
# order = 2

In [None]:
# @title
def design_bandpass(fs, low, high, order=2):
    """샘플링 주파수, 상/하한 컷오프 주파수, 필터 차수(기본 2) 입력받아서,
    설계된 대역통과 필터의 계수 반환 -> signal.filtfilt(b, a, x)로 실제 신호에 적용"""
    nyq = 0.5 * fs
    low_n  = low / nyq
    high_n = high / nyq
    b, a = signal.butter(order, [low_n, high_n], btype='band')
    return b, a

# `apply_bandpass()`
```
입력: df, b, a (필터 계수), ppg_col (필터링 대상 컬럼명, 기본='ppg')
출력: df (ppg_filt 컬럼 추가)
```

In [None]:
# @title
def apply_bandpass(df, b, a, ppg_col="ppg"):
    """컬럼명(ppg)을 전달받아서 실제로 대역폭 필터를 적용해서..."""
    # PPG 신호(df[ppg_col])를 numpy 배열로 가져옴
    x = df[ppg_col].values.astype(float)
    x_filt = signal.filtfilt(b, a, x)   # filtfilt(b, a, x)로 필터 적용

    # 새로운 df 생성, 새 컬럼으로 필터링된 신호를 추가
    df_filt = df.copy()
    df_filt[ppg_col + "_filt"] = x_filt
    return df_filt # 새로운 데이터프레임 (ppg_filt 컬럼 추가) 반환

# `detect_systolic_peaks()`
```
입력: time_s (초 단위 시간축 배열), ppg, fs, min_hr_bpm, max_hr_bpm (심박수 범위, 기본값 설정됨)
출력: peaks (peak 인덱스 배열), props (peak 특성(dict))
```

In [None]:
# @title
def detect_systolic_peaks(time_s, ppg, fs, min_hr_bpm=40, max_hr_bpm=180):
    """시간 축(초), ppg 신호, 샘플링 주파수, 허용 가능한 최대, 최소 심박수를 입력받고
    """
    # 최대 심박수를 기준으로 peak간의 최소(minimum?) 간격을 설정
    min_distance_sec = 60.0 / max_hr_bpm   # Ex. HR이 180이면 60/180 ≈ 최소 간격 0.33s
    min_distance_samples = int(min_distance_sec * fs) # 신호는 sample 단위 -> 샘플 개수로 바꿔야
    # PPG에서 peak(심박)를 찾을 때, 심박수가 너무 가까이 두 번 찍히는 건 잘못된 peak
    # 위의 예제에서 각 beat 사이 간격 = 1/3초 = "0.33초로 제한"

    # prominence는 대략 signal range의 일부로 설정
    # 잡음 peak를 제외하기 위한 임계치? -> 두드러지는 값 설정?
    prom = 0.1 * (np.max(ppg) - np.min(ppg))

    # signal.find_peaks() 사용하여 봉우리 감지
    peaks, props = signal.find_peaks(
        ppg,
        distance=min_distance_samples,
        prominence=prom
    )

    print(f"Detected {len(peaks)} systolic peaks.")
    return peaks, props

아래는 수축기 피크 사이를 35샘플로 조정한 `detect_systolic_peak()` 함

In [None]:
# @title
def detect_systolic_peaks(time_s, ppg, fs, min_hr_bpm=40, max_hr_bpm=180):
    """시간 축(초), ppg 신호, 샘플링 주파수, 허용 가능한 최대, 최소 심박수를 입력받고
    """
    min_distance_sec = 0.35 # 35개 샘플로 제한
    min_distance_samples = int(min_distance_sec * fs)

    prom = 0.1 * (np.max(ppg) - np.min(ppg))

    # signal.find_peaks() 사용하여 봉우리 감지
    peaks, props = signal.find_peaks(
        ppg,
        distance=min_distance_samples,
        prominence=prom
    )

    print(f"Detected {len(peaks)} systolic peaks.")
    return peaks, props

# `find_local_valley()`
```
입력: ppg, center_idx (검색 중심 index = peak idx), search_radius_samples (탐색 반경, 자동 설정)
출력: valley_idx
```

In [None]:
# @title
def find_local_valley(ppg, center_idx, search_radius_samples):
    # 신호 범위를 넘기지 않도록 범위 조절
    # [center_idx - r, center_idx + r] 구간만 살펴봄
    start = max(center_idx - search_radius_samples, 0)
    end = min(center_idx + search_radius_samples, len(ppg)-1)
    local_segment = ppg[start:end+1]

    # 구간에서 최소값 위치 찾기
    local_min_idx = np.argmin(local_segment)

    # 전체 ppg 기준의 인덱스로 변환
    valley_idx = start + local_min_idx
    return valley_idx # valley 인덱스 반환

# `find_foot_points()`
```
입력: time_s, ppg, peak (peak 인덱스 배열), fs
출력: start_indices (각 피크의 시작index 리스트), end_indices (각 피크의 끝 index 리스트)
```

In [None]:
# @title
def find_foot_points(time_s, ppg, peaks, fs):
    """
    각 peak 이전의 valley만 찾고,
    다음 심주기의 시작점 = 현재 심주기의 끝점이 되도록 연속성 보장
    """
    Ts_min, Ts_max = 0.15, 0.26

    valleys = []

    for i, pk in enumerate(peaks):
        Ts_min_samp = int(Ts_min * fs)
        Ts_max_samp = int(Ts_max * fs)

        # Find valley before peak
        start_min = max(pk - Ts_max_samp, 0)
        start_max = max(pk - Ts_min_samp, 0)

        if start_max <= start_min:
            valleys.append(None)
        else:
            local_seg = ppg[start_min:start_max+1]
            local_min_idx = np.argmin(local_seg)
            valley_idx = start_min + local_min_idx
            valleys.append(valley_idx)

    # 각 심주기를 valley[i] → peak[i] → valley[i+1]로 정의
    start_indices = []
    end_indices = []

    for i in range(len(peaks)):
        if valleys[i] is None:
            start_indices.append(None)
            end_indices.append(None)
            continue

        start_idx = valleys[i]

        # 마지막 peak인 경우
        if i == len(peaks) - 1:
            # 마지막 peak 이후 일정 구간에서 valley 찾기
            Te_min_samp = int(0.44 * fs)
            Te_max_samp = int(0.74 * fs)
            end_min = min(peaks[i] + Te_min_samp, len(ppg)-1)
            end_max = min(peaks[i] + Te_max_samp, len(ppg)-1)

            if end_max > end_min:
                local_seg = ppg[end_min:end_max+1]
                local_min_idx = np.argmin(local_seg)
                end_idx = end_min + local_min_idx
            else:
                end_idx = None
        else:
            # 다음 valley를 end point로 사용
            end_idx = valleys[i+1]

        start_indices.append(start_idx)
        end_indices.append(end_idx)

    return start_indices, end_indices

# `compute_pulse_width()`, `compute_ab_ratio()`
```
입력: time_s, ppg, start_idx, peak_idx, end_idx
출력: Pw (Pulse Width (float) 또는 None), Ab_over_Aa (float 또는 None)
```

In [None]:
# @title
def compute_pulse_width(time_s, ppg, start_idx, peak_idx, end_idx):
    """PPG 한 주기에서 Pulse Width(PW) 계산
    초단위 시간축 배열, ppg 신호, 주기 시작/끝/수축기 피크 인덱스를 받아서 계산"""
    if start_idx is None or end_idx is None:
        return None

    v_start = ppg[start_idx]
    v_peak  = ppg[peak_idx]
    As = v_peak - v_start
    if As <= 0: # Peak가 valley보다 낮으면 → 비정상 → None 반환
        return None

    # valley와 peak 절반 높이 정의
    half_level = v_start + As * 0.5

    # start_idx → peak_idx 구간 잘라서 seg1으로 정의
    seg1 = ppg[start_idx:peak_idx+1]
    t1 = time_s[start_idx:peak_idx+1]

    # find index where seg1 crosses half_level
    try:
        # 절반 높이보다 커지는 시점?
        idx_before = np.where(seg1 < half_level)[0]
        idx_after = np.where(seg1 >= half_level)[0]
        # simple approach: first index above half_level
        if len(idx_before) > 0 and len(idx_after) > 0:
            i1 = idx_after[0] # 딱 절반 높이(half_level까지 올라왔을 때)
        else:
            i1 = None
    except:
        i1 = None

    # Half-level 하강 시점 찾기
    seg2 = ppg[peak_idx:end_idx+1]
    t2 = time_s[peak_idx:end_idx+1]

    try:
        idx_after2 = np.where(seg2 < half_level)[0]
        if len(idx_after2) > 0:
            i2 = idx_after2[0]
        else:
            i2 = None
    except:
        i2 = None

    if (i1 is None) or (i2 is None):
        return None

    # i1, i2가 전부 탐지 되었을 때 list에 저장
    t_half1 = t1[i1]
    t_half2 = t2[i2]
    Pw = t_half2 - t_half1
    if Pw <= 0:
        return None
    return Pw

In [None]:
# @title
def compute_ab_ratio(time_s, ppg, start_idx, peak_idx, end_idx):
    if start_idx is None or end_idx is None:
        return None

    seg = ppg[start_idx:end_idx+1]
    t_seg = time_s[start_idx:end_idx+1]

    # second derivative
    d1 = np.gradient(seg, np.mean(np.diff(t_seg)))
    d2 = np.gradient(d1, np.mean(np.diff(t_seg)))

    # we expect 'a' near systolic upstroke, 'b' early diastole
    # split segment into two halves as rough approximation
    mid_idx = len(seg) // 2

    # find max in first half (a-wave)
    a_region = d2[:mid_idx]
    if len(a_region) == 0:
        return None
    a_idx_local = np.argmax(a_region)
    Aa = a_region[a_idx_local]

    # find max in second half (b-wave) - could be smaller
    b_region = d2[mid_idx:]
    if len(b_region) == 0:
        return None
    b_idx_local = np.argmax(b_region)
    Ab = b_region[b_idx_local]

    if Aa == 0:
        return None

    return Ab / Aa

# `extract_pulse_features()`
```
입력: df, fs, ppg_col (PPG 컬럼명='ppg_filt), label (라벨 (옵션))
출력: features_df (심주기마다 특징값 담긴 DataFrame), (peaks, start_indices, end_indices)
```

In [None]:
# @title
def extract_pulse_features(df, fs, ppg_col="ppg_filt", label=None):
    time_s = df["time_s"].values
    ppg = df[ppg_col].values.astype(float)

    peaks, _ = detect_systolic_peaks(time_s, ppg, fs)
    start_indices, end_indices = find_foot_points(time_s, ppg, peaks, fs)  # 수정된 함수 사용

    feature_rows = []
    dropped = 0

    for i, pk in enumerate(peaks):
        s_idx = start_indices[i]
        e_idx = end_indices[i]

        # 필터링 조건
        if (s_idx is None) or (e_idx is None):
            dropped += 1
            continue
        if s_idx >= pk or pk >= e_idx:
            dropped += 1
            continue

        # 나머지 특징 추출 코드는 동일
        t_start = time_s[s_idx]
        t_peak = time_s[pk]
        t_end = time_s[e_idx]

        As = ppg[pk] - ppg[s_idx]
        if As <= 0:
            dropped += 1
            continue

        Tc = t_peak - t_start
        if Tc < 0.1 or Tc > 0.5:
            dropped += 1
            continue

        Pw = compute_pulse_width(time_s, ppg, s_idx, pk, e_idx)

        if i < len(peaks) - 1:
            Pi = time_s[peaks[i+1]] - t_peak
        else:
            Pi = None

        if (Pi is not None) and (As > 0):
            Pi_over_As = Pi / As
        else:
            Pi_over_As = None

        Ab_over_Aa = compute_ab_ratio(time_s, ppg, s_idx, pk, e_idx)
        if Ab_over_Aa is not None and Ab_over_Aa > 3.0:
            dropped += 1
            continue

        feature_rows.append({
            "t_peak": t_peak,
            "As": As,
            "Pw": Pw,
            "Pi_over_As": Pi_over_As,
            "Tc": Tc,
            "Ab_over_Aa": Ab_over_Aa,
            "start_idx": s_idx,
            "peak_idx": pk,
            "end_idx": e_idx,
            "label": label
        })

    features_df = pd.DataFrame(feature_rows)
    print(f"{dropped}개의 심주기 오류 발생으로 Drop")
    print(f"Extracted {len(features_df)} pulses with features.")

    return features_df, (peaks, start_indices, end_indices)

# `extract_pulse_waveforms()`
```
입력:
출력: np array ((N, seq_len) shape의 pulse)
```
> 심주기 단위로 잘라서 150으로 리샘플링

In [None]:
# @title
def extract_pulse_waveforms(df_signal, features_df, seq_len=150):
    """
    Extract pulse waveforms using start_idx and end_idx from features_df.
    Resample each pulse to fixed length seq_len.
    """
    pulses = []
    for _, row in features_df.iterrows():
        start_time = int(row["start_idx"])
        end_time = int(row["end_idx"])

        x = df_signal["ppg_filt"].values[start_time:end_time+1] # if "ppg_filt" in df_signal.columns else df_signal["ppg_filt_smaf"].values[start_time:end_time+1]

        # Resample to seq_len
        old_indices = np.linspace(0, 1, len(x))
        new_indices = np.linspace(0, 1, seq_len)
        x_resampled = np.interp(new_indices, old_indices, x)
        pulses.append(x_resampled)

    print(f"Extracted {len(pulses)} pulses.")
    return np.array(pulses)  # shape: (N, seq_len)

# 시각화 코드 (숨김코드 실행)
* `plot_ppg(df, fs, title, duration_sec=8)`: ppg 원 신호 그래프 출력
* `plot_filter_response(b, a, fs, title)`: 필터가 억제하는 대역폭 그래프
* `plot_before_after(df, col_raw, col_filt, title, duration_sec=8)`: 필터 통과 전후 ppg 그래프

In [None]:
# @title
def plot_ppg(df, fs, title, duration_sec=8):
    """df, 샘플링 주파수(추정값), 제목, 구간 길이를 인자로 받아서 ppg 그래프 출력"""
    # .values를 활용해 각 컬럼 값들을 numpy 배열로 가져옴
    t = df["time_s"].values # 가로축은 시간축
    x = df["ppg"].values # 세로축은 ppg 시그널로 plot 생성

    t0 = t[0]
    # duration_sec을 기준으로 필요한 범위만 선택
    mask = (t - t0) <= duration_sec

    print(f"\nPlotting: {title}")
    # 샘플 개수가 duration_sec × fs 근접해야 정상
    print(f"Using samples = {mask.sum()} (approx {duration_sec} seconds)")

    plt.figure()
    plt.plot(t[mask] - t0, x[mask])
    plt.title(title)
    plt.xlabel("Time (s)")
    plt.ylabel("PPG (raw)")
    plt.show()

def plot_filter_response(b, a, fs, title):
    """구해진 필터 계수 b, a와 샘플링 주파수, 제목을 전달받아서
    필터가 어떤 주파수를 통과시키고, 어떤 주파수를 억제하는지 그래프 출력"""
    w, h = signal.freqz(b, a, worN=1024)
    freqs = w * fs / (2 * np.pi)

    plt.figure(figsize=(8,4))
    plt.plot(freqs, 20 * np.log10(np.abs(h)))
    plt.title(f"Frequency Response: {title}")
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Magnitude (dB)")
    plt.grid(True)
    plt.xlim(0, fs/2)
    plt.show()

def plot_before_after(df, col_raw, col_filt, title, duration_sec=8):
    """df, 원본 신호와 필터 통과 신호, 제목, 구간 길이 입력받아서 시각화"""
    t = df["time_s"].values # 시간 컬럼
    x_raw  = df[col_raw].values
    x_filt = df[col_filt].values

    # 시간 배열을 생성하고 시작부터 duration_sec만큼 데이터 선택(mask)
    t0 = t[0]
    mask = (t - t0) <= duration_sec

    print(f"\nPlotting before/after for: {title}")
    print(f"Samples used: {mask.sum()}")

    # 같은 구간에서 원본 신호와 필터 신호를 하나의 그래프에 출력
    plt.figure(figsize=(12,5))
    plt.plot(t[mask] - t0, x_raw[mask],  label="Raw",  alpha=0.5)
    plt.plot(t[mask] - t0, x_filt[mask], label="Filtered", linewidth=2)
    plt.title(title)
    plt.xlabel("Time (s)")
    plt.ylabel("PPG")
    plt.legend()
    plt.show()

### CSV 준비

In [None]:
# =============================
# 1. File paths
# =============================

path1 = "/content/1204_tae_static_1.csv"
path2 = "/content/1204_tae_static_2.csv"

# =============================
# 2. Load CSV
# =============================
# pd 데이터프레임화
df1 = pd.read_csv(path1)
df2 = pd.read_csv(path2)

print("\ncsv1 head:")
display(df1.head())

print("\ncsv2 head:")
display(df2.head())

In [None]:
# =============================
# 3. 초 단위 시간축 변환 및 시각화
# =============================
# (t_us → seconds) + estimate fs
# 시간 축 정리 및 샘플링 주파수 추정

df1, fs_df1 = prepare_time_axis(df1)
df2, fs_df2 = prepare_time_axis(df2)

# plot_ppg(df, fs, title, duration_sec=8)
plot_ppg(df1, fs_df1, "csv1 PPG")
plot_ppg(df2, fs_df2, "csv2 PPG")

In [None]:
# =====================
# design_bandpass() -> 출력은 필터링 계수 a, b
# =====================
import scipy.signal as signal

low_cut  = 0.5   # Hz
high_cut = 6.0   # Hz
order = 2

b_df1, a_df1 = design_bandpass(fs_df1, low_cut, high_cut, order)
b_df2, a_df2 = design_bandpass(fs_df2, low_cut, high_cut, order)

print("Filter designed.")

In [None]:
# 필터 억제 그래프 출력

# plot_filter_response(b_df1, a_df1, fs_df1, "Band-pass 0.5–6 Hz (csv1 fs)")
# plot_filter_response(b_df2, a_df2, fs_df2, "Band-pass 0.5–6 Hz (csv2 fs)")

In [None]:
# =====================
# 실제 필터 적용
# =====================

df1_filt = apply_bandpass(df1, b_df1, a_df1)
df2_filt = apply_bandpass(df2, b_df2, a_df2)

print("Filtering finished.")
print("Columns in df1:", df1_filt.columns.tolist())
print("Columns in df2:", df2_filt.columns.tolist())

In [None]:
# =====================
# 필터 통과 전후 시각화1
# =====================
plot_before_after(
    df1_filt,
    col_raw="ppg",
    col_filt="ppg_filt",
    title="df1 PPG",
    duration_sec=5
)

plot_before_after(
    df2_filt,
    col_raw="ppg",
    col_filt="ppg_filt",
    title="df2 PPG",
    duration_sec=5
)

In [None]:
# =====================
# 필터 통과 전후 시각화2
# =====================
def compare_hist(df, col_raw, col_filt, title):
    plt.figure(figsize=(10,4))
    plt.hist(df[col_raw].values,  bins=60, alpha=0.4, label="Raw")
    plt.hist(df[col_filt].values, bins=60, alpha=0.4, label="Filtered")
    plt.title(title)
    plt.xlabel("PPG value")
    plt.ylabel("Count")
    plt.legend()
    plt.show()

compare_hist(df1_filt, "ppg", "ppg_filt",   "Value distribution (csv1)")
compare_hist(df2_filt, "ppg", "ppg_filt", "Value distribution (csv2)")

# 5 개 특징점으로 SVM 학습 (12.03)

In [None]:
# =====================
# 필터 통과 전후 시각화2
# =====================
df1_features, (df1_peaks, df1_s_idx, df1_e_idx) = extract_pulse_features(
    df1_filt,
    fs_df1,
    ppg_col="ppg_filt",
    label="esther")

df2_features, (df2_peaks, df2_s_idx, df2_e_idx) = extract_pulse_features(
    df2_filt,
    fs_df2,
    ppg_col="ppg_filt",
    label="p1")

In [None]:
# =====================
# df2 features -> csv로 저장
# =====================
name = input("csv 파일 이름:")
df2_features.to_csv(f"features_{name}.csv", index=True)

# `plot_feature_hist()`
```
입력: (features_df, col, title)
출력: 히스토그램 분포 시각화
```

In [None]:
# @title
def plot_feature_hist(features_df, col, title):
    plt.figure(figsize=(6,4))
    vals = features_df[col].dropna().values
    plt.hist(vals, bins=40, alpha=0.7)
    plt.title(title)
    plt.xlabel(col)
    plt.ylabel("Count")
    plt.grid(True)
    plt.show()

In [None]:
plot_feature_hist(df1_features, "As", "Static - Systolic Amplitude (As)")
plot_feature_hist(df1_features, "Pw", "Static - Pulse Width (Pw)")
plot_feature_hist(esther_features, "Tc", "Static - Crest Time (Tc)")
plot_feature_hist(df1_features, "Pi_over_As", "Static - Pi/As")
plot_feature_hist(df1_features, "Ab_over_Aa", "Static - Ab_w / Aa_w")

## 분포에서 너무 떨어진 데이터들 Drop

In [None]:
# =====================
# 특징 분포에서 너무 떨어진 이상치 값들 제거
# =====================

cols = ['As', 'Pw', 'Pi_over_As', 'Ab_over_Aa']

def iqr_filter(df, cols, k=1.5):
    mask = np.ones(len(df), dtype=bool)
    for c in cols:
        q1 = df[c].quantile(0.25)
        q3 = df[c].quantile(0.75)
        iqr = q3 - q1
        low  = q1 - k * iqr
        high = q3 + k * iqr
        mask &= df[c].between(low, high)
    return mask

# df1_features 특징점들 분포를 보고, 너무 떨어진 값들 Drop
df_mask = iqr_filter(df1_features, cols)
df1_clean = df1_features[df_mask].copy()
print(df1_clean.shape)

# # df2에 적용
# df2_mask = iqr_filter(df2_features, cols)
# df2_clean = df2_features[df2_mask].copy()

In [None]:
# 특징점만 모아서 새 df 생성
cols = ['As', 'Pw', 'Pi_over_As', 'Ab_over_Aa']
df1_features_subset = df1_clean[cols].copy()
print(f"features_subset.shaep: {df1_features_subset.shape}")

# # df2에 적용
# df2_features_subset = df2_clean[cols].copy()
# print(df2_features_subset.shape)

In [None]:
# =====================
# 결측값 방지
# =====================
df1_features_subset = df1_features_subset.dropna().values
# df2_features_subset = df2_features_subset.dropna().values # 결측값 버림!!!

> 사용자 라벨(=0) 만들기

In [None]:
# ['label']을 만들어서 모두 0으로 채우기
df1_features_subset['label'] = 0
print(df1_features_subset[:10])

## SVM 교차 검증 학습

In [None]:
from sklearn.svm import OneClassSVM
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold

In [None]:
# =====================
# SVM 하이퍼파라미터 튜닝
# =====================

# 1) 사용자 데이터만 사용 (label == 0 이라고 가정)
user_mask = df1_features_subset['label'] == 0
X_user = df1_features_subset[user_mask].drop(columns=['label']).values  # (N, d)
print("전체 사용자 데이터:", X_user.shape)

# 교차검증 전 test셋 분리
X_train_all, X_test = train_test_split(
    X_user, test_size=0.2, random_state=42
)
print("train:", X_train_all.shape, "test:", X_test.shape)

# 2) 하이퍼파라미터 후보
nu_list = [0.01, 0.03, 0.05, 0.07, 0.1]
gamma_list  = ['scale', 0.1, 0.01]

kf = KFold(n_splits=5, shuffle=True, random_state=42)

best_score  = -np.inf
best_params = None

def predict_with_threshold(scores, thr):
    # 1 = 정상, -1 = 이상
    return np.where(scores >= thr, 1, -1)

for nu in nu_list:
    for gamma in gamma_list:
        fold_scores = []

        for tr_idx, val_idx in kf.split(X_train_all):
            X_tr  = X_train_all[tr_idx]
            X_val = X_train_all[val_idx]

            # --- 각 fold마다 스케일러는 train fold 기준으로만 fit ---
            scaler_cv = StandardScaler()
            X_tr_sc  = scaler_cv.fit_transform(X_tr)
            X_val_sc = scaler_cv.transform(X_val)

            # --- 모델 학습 ---
            model = OneClassSVM(kernel='rbf', nu=nu, gamma=gamma)
            model.fit(X_tr_sc)

            # --- train fold 점수 분포로 threshold (하위 5%) 설정 ---
            train_scores = model.decision_function(X_tr_sc)
            thr = np.percentile(train_scores, 5)

            # --- val fold 평가 (모두 정상 데이터) ---
            val_scores = model.decision_function(X_val_sc)
            test_pred   = predict_with_threshold(val_scores, thr)

            # 정상으로 나온 비율 (높을수록 좋음)
            fold_scores.append((test_pred == 1).mean())

        mean_score = np.mean(fold_scores)
        print(f"nu={nu}, gamma={gamma}, cv_score={mean_score:.3f}")

        if mean_score > best_score:
            best_score  = mean_score
            best_params = {'nu': nu, 'gamma': gamma}

print("선택된 best_params:", best_params, "cv_score:", best_score)

In [None]:
# =====================
# train 데이터셋 전체로 학습
# =====================

# 3) 선택된 파라미터로 train 전체(X_train_all)에 최종 학습
scaler = StandardScaler()
X_train_sc = scaler.fit_transform(X_train_all)
# X_test_sc = scaler.transform(X_test)

ocsvm = OneClassSVM(kernel='rbf', **best_params)
ocsvm.fit(X_train_sc)

In [None]:
# =====================
# 훈련된 모델 저장
# =====================

import joblib

# 예시: 이미 학습이 끝난 객체들
# ocsvm : OneClassSVM 모델
# scaler: StandardScaler (fit 완료 상태)
# threshold: train_scores에서 구한 값

save_obj = {
    "model": ocsvm,
    "scaler": scaler,
}
model_name = input("모델 이름:")

joblib.dump(save_obj, f"{model_name}.joblib")
print("저장 완료")

In [None]:
np.set_printoptions(precision=6,   # 소수점 6자리까지
                    suppress=True) # e-05 같은 지수 표현 끄기

In [None]:
# =====================
# decision_function 점수 확인
# =====================

train_scores = ocsvm.decision_function(X_train_sc)  # (값이 클수록 정상)

X_test_sc = scaler.transform(X_test)
test_scores = ocsvm.decision_function(X_test_sc) # 동일한 스케일러 적용 후 점수 확인

print("train_scores min/max:", train_scores.min(), train_scores.max())
print("train_scores mean:", train_scores.mean())
print("========================")
print("val_scores min/max:", test_scores.min(), test_scores.max())
print("val_scores mean:", test_scores.mean())

In [None]:
print(test_scores)

In [None]:
test_pred = ocsvm.predict(X_test_sc) # 결정함수 값이 0보다 크면 1, 아니면 -1
print(test_pred)

## 공격자 데이터(df2) 피처로 점수 확인

In [None]:
df2_features_sc = scaler.transform(df2_features_subset)
print("스케일된 df2_features_sc")
print(df2_features_sc[:5])
print("========================")
print(f"피처 추출된 심주기 개수: {len(df2_features_sc)}")

In [None]:
df2_score = ocsvm.decision_function(df2_features_sc)
print("val_scores min/max:", df2_score.min(), df2_score.max())
print("val_scores mean:", df2_score.mean())
print(df2_score.mean())
print("========================")
print(df2_score)

In [None]:
# @title
# --------------------
# 5) 커스텀 threshold 설정
#    예: train 데이터의 5% 지점(5번째 퍼센타일)을 이상치 경계로 사용
# --------------------
# threshold = np.percentile(train_scores, 5)   # 5%보다 낮으면 이상으로 간주

# threshold = np.percentile(train_scores, 5)
# print("threshold:", threshold)

# # 5) 진짜 test 셋 평가
# test_pred    = predict_with_threshold(test_scores, threshold)

# print("test에서 정상(1) 비율:", (test_pred == 1).mean())

# ppg 전처리 과정 (시각화 함께)

# `plot_pulses_with_markers()`
> (df, peaks, starts, ends, title, duration_sec=5)를 입력받아서 **피크, 벨리 시각화**

In [None]:
# @title
def plot_pulses_with_markers(df, peaks, starts, ends, title, duration_sec=5):
    time_s = df["time_s"].values
    ppg    = df["ppg_filt"].values.astype(float)

    t0 = time_s[0]
    mask = (time_s - t0) <= duration_sec

    plt.figure(figsize=(12,5))
    plt.plot(time_s[mask] - t0, ppg[mask], label="PPG filtered")

    # mark peaks and valleys in this window
    for pk, s_idx, e_idx in zip(peaks, starts, ends):
        if (s_idx is None) or (e_idx is None):
            continue
        t_pk = time_s[pk] - t0
        t_s  = time_s[s_idx] - t0
        t_e  = time_s[e_idx] - t0
        if t_pk < 0 or t_pk > duration_sec:
            continue

        plt.plot(t_pk, ppg[pk], "ro", label="Peak" if "Peak" not in plt.gca().get_legend_handles_labels()[1] else "")
        plt.plot(t_s,  ppg[s_idx], "go", label="Valley" if "Valley" not in plt.gca().get_legend_handles_labels()[1] else "")
        plt.plot(t_e,  ppg[e_idx], "go")

    plt.title(title)
    plt.xlabel("Time (s)")
    plt.ylabel("PPG (filtered)")
    plt.legend()
    plt.show()

In [None]:
plot_pulses_with_markers(
    df1_filt,
    df1_peaks,
    df1_s_idx,
    df1_e_idx,
    title="Static PPG with detected peaks/valleys",
    duration_sec=60
)

plot_pulses_with_markers(
    df2_filt,
    df2_peaks,
    df2_s_idx,
    df2_e_idx,
    title="Far-wrist PPG with detected peaks/valleys",
    duration_sec=60
)

# `plot_all_pulses()`
> (pulses, cols=10) 입력받아서 모든 심주기 찍어보기, 심주기 단위로 잘려서 150으로 리샘플된 np 배열

In [None]:
df1_pulses = extract_pulse_waveforms(df1_filt, df1_features, seq_len=150)
df2_pulses = extract_pulse_waveforms(df2_filt, df2_features, seq_len=150)

In [None]:
# @title
def plot_all_pulses(pulses, cols=10):
    """
    pulses: numpy array (N, seq_len)
    cols: subplot에서 한 행에 몇 개 출력할지
    """
    N, seq_len = pulses.shape

    rows = math.ceil(N / cols)

    fig, axes = plt.subplots(rows, cols, figsize=(cols*2, rows*2), sharex=True, sharey=True)
    axes = axes.flatten()

    for i in range(len(axes)):
        ax = axes[i]
        if i < N:
            ax.plot(pulses[i])
            ax.set_title(f"{i}", fontsize=8)
            ax.set_xticks([])
            ax.set_yticks([])
        else:
            ax.axis('off')   # 남는 subplot 숨기기

    plt.tight_layout()
    plt.show()

In [None]:
print(df1_pulses.shape)
plot_all_pulses(df1_pulses, cols=10)

### `outliers = [ ]` <- 여기에 제거할 인덱스 번호 입력

In [None]:
outliers = []  # 제거할 인덱스
static_clean = np.delete(df1_pulses, outliers, axis=0)
print(static_clean.shape)