In [115]:
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter, detrend, resample, find_peaks, filtfilt
from sklearn.decomposition import PCA






In [None]:
def  detect_ppg_peaks_auto(y, fs, max_hr=160, prom  = None, edge_sec=3.0, width_sec = 0.06):
    
    min_dist = int(fs * 60 / max_hr)
    n = len(y)
    edge = int(edge_sec * fs)
    lo = edge
    hi = n - edge
    if hi <= lo:
        lo, hi = 0, n
    
    y_win = y[lo: hi]
    width_min = int(width_sec * fs)
    # 自适应 prominence（用局部幅度的10%起步；可按需调到 0.05~0.2）
    if prom is None:
        amp = np.nanpercentile(y_win, 95) - np.nanpercentile(y_win, 5)
        prom = max(1e-6, 0.1 * float(amp))

    p_pos, _ = find_peaks( y_win,  distance=min_dist, prominence=prom, width=width_min)
    p_neg, _ = find_peaks(-y_win,  distance=min_dist, prominence=prom, width=width_min)
    if len(p_pos): p_pos = p_pos + lo
    if len(p_neg): p_neg = p_neg + lo
    return p_pos, p_neg # 两套峰：正峰、负峰



def butter_bandpass(lowcut, highcut, fs, order=3):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a

def bandpass_filter(data, lowcut, highcut, fs, order=3,zero_phase=True):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    #实时检测用 lfilter,否则造成相位延迟
    if zero_phase:
        y = filtfilt(b, a, data)
    else:
        y = lfilter(b, a, data)
    return y

def baseline(y):
    return np.mean(y)

def motion_removal(y):
    pca = PCA(n_components=1)
    y = y.reshape(-1, 1)
    principalComponents = pca.fit_transform(y)
    motion_removed = y - principalComponents
    return motion_removed.ravel()

def normalization(y):
    return (y - np.min(y)) / (np.max(y) - np.min(y))

def assess_quality(y): # T o  do
    return []

def artifact_removal(y, segments_to_discard): # T o do
    return y

def detect_heartbeats(y,fs):
    max_hr = 180.0 #按照最大心率来找最小峰距
    min_distance = int(fs * 60.0 / max_hr)
    prom = None
    amp = np.nanpercentile(y,95) - np.nanpercentile(y,5)
    prom = max(1e-6, 0.10 * float(amp))
    peaks, _ = find_peaks(y, distance=min_distance,prominence = prom) # adjust this distance accordingly
    return peaks

def preprocess_ppg_signal(x, fs, target_sampling_rate):
    results = {}

    y = bandpass_filter(x, 0.7, 5.0, fs, zero_phase=True)
    results['bandpass_filter'] = y.copy()

    y = y - baseline(y)
    results['baseline_removed'] = y.copy()

    y = motion_removal(y)
    results['motion_removed'] = y.copy()

    y = normalization(y)
    results['normalized'] = y.copy()

    y = resample(y, int(len(y) * target_sampling_rate/fs))

    segments_to_discard = assess_quality(y)
    y = artifact_removal(y, segments_to_discard)
    results['artifact_removed'] = y.copy()

    heartbeats = detect_heartbeats(y,fs = target_sampling_rate)
    results['heartbeats'] = heartbeats

    return y, results

# Using the same example PPG signal generation as before
# fs = 100.0  # sample rate in Hz
# T = 5.0    # seconds
# n = int(T * fs)  # total number of samples
# t = np.linspace(0, T, n, endpoint=False)
# a = 0.02
# f0 = 1.0
# x = 0.1 * np.sin(2 * np.pi * 1.0 * t)
# x += a * np.cos(2 * np.pi * 2.0 * t + .1)
# x += a * np.cos(2 * np.pi * 3.5 * t)
# x += 0.04 * np.random.randn(t.size)
DATA_DIR = Path("bidmc_dataset/bidmc-ppg-and-respiration-dataset-1.0.0/bidmc_csv")
REC_ID = "bidmc_53"
csv_path = DATA_DIR / f"{REC_ID}_Signals.csv"

signals = pd.read_csv(csv_path)
signals.rename(columns=lambda c: c.strip(), inplace=True)
upper_map = {c : c.upper() for c in signals.columns}
ppg_col = None
for orig, upper in upper_map.items():
    if upper == "PLETH":
        ppg_col = orig
        break

if ppg_col is None :
    raise ValueError(f"Not find PPG column.{list(signals.columns)}")

x = signals[ppg_col].astype(float).to_numpy()
fs = 125.0
t = np.arange(len(x)) / fs
target_fs = 125.0
processed_signal, results = preprocess_ppg_signal(x, fs, target_fs)


In [236]:
def hr_1hz_from_peaks_time(peaks, fs,t0_sig = 0.0, t0_num = 0.0, T_num = None):
    if len(peaks) < 2: 
        return None
    ibi = np.diff(peaks) / fs           # 秒
    hr  = 60.0 / ibi                    # bpm
    tmid = (peaks[1:] + peaks[:-1]) / (2*fs) + float(t0_sig)
    tmid_num = tmid - float(t0_num)
    m = tmid_num >= 0
    if not np.any(m):
        return None
    sec = np.floor(tmid_num[m]).astype(int)
    s = pd.Series(hr[m]).groupby(sec).mean()
    if T_num is not None:
        s = s.reindex(range(int(T_num)))
    return s

def mae_on_overlap(est_series, ref):
    if est_series is None:
        return np.inf
    est = est_series.values
    mask = ~np.isnan(est) & ~np.isnan(ref)
    if not mask.any():
        return np.inf
    return float(np.mean(np.abs(est[mask] - ref[mask])))



num = pd.read_csv(DATA_DIR / f"{REC_ID}_Numerics.csv")
num.columns = [c.strip() for c in num.columns]
ref_col = "HR" if "HR" in num.columns else None
if ref_col:
    ref = num[ref_col].astype(float).values
    t0_sig = float(signals['Time [s]'].iloc[0]) if 'Time [s]' in signals.columns else 0.0
    t0_num = float(num['Time [s]'].iloc[0]) if 'Time [s]' in num.columns else 0.0
    T_num  = len(ref)
p1, p2 = detect_ppg_peaks_auto(processed_signal, fs=target_fs, max_hr=160, prom=None)
hr1 = hr_1hz_from_peaks_time(p1, fs=target_fs, t0_sig=t0_sig, t0_num=t0_num, T_num=T_num)
hr2 =  hr_1hz_from_peaks_time(p2, fs=target_fs, t0_sig=t0_sig, t0_num=t0_num, T_num=T_num)
mae1 = mae_on_overlap(hr1, ref)
mae2 = mae_on_overlap(hr2, ref)
if mae1 <= mae2:
    peaks = p1
    chosen = 'positive'
    best_mae= mae1
else:
    peaks = p2
    chosen = 'negative'
    best_mae = mae2

print(f"Chosen polarity: {chosen},  MAE={best_mae:.2f} bpm")
results['heartbeats'] = peaks
# def eval_record(data_dir, rec_id,fs = 125.0):
#     try:
#         sig_path = Path(data_dir) / f"{rec_id}_Signals.csv"
#         signals = pd.read_csv(sig_path)
#         signals.rename(columns=lambda c: c.strip(), inplace=True)
#         upper_map = {c : c.upper() for c in sig.columns}
#         ppg_col = None
#         for orig, upper in upper_map.items():
#             if upper == "PLETH":
#                 ppg_col = orig
#                 break

#         if ppg_col is None :
#             raise ValueError(f"Not find PPG column.{list(signals.columns)}")

#         x = signals[ppg_col].astype(float).to_numpy()
#         t0_sig f= loat(sig['Time [s]'].iloc[0]) if 'Time [s]' in sig.columns else 0.0
#         num_path = Path(data_dir) / f"{rec_id}_Numerics.csv"
#         num = pd.read_csv(num_path)
#         num.columns = [c.strip() for c in num.columns]
#         ref_col = "HR" if "HR" in num.columns else None
#         if ref_col:
#         ref = num[ref_col].astype(float).values
#         t0_num = float(num['Time [s]'].iloc[0]) if 'Time [s]' in num.columns else 0.0
#         T_num  = len(ref)
#         bands = [()(0.8,4.0),(0.7,5.0)]
#         best = (np.inf, None, None, None, None, None)  # mae, pol, max_hr, coef, (lo,hi), peaks
#     except Exception as e:
#         return{"rec" : rec_id,"note": f "error:{e}"}

Chosen polarity: positive,  MAE=22.65 bpm


In [None]:


# Plotting


plot_titles = ["Original PPG Signal", 
               "Bandpass Filter",
                "Baseline Removed",
               "Motion Artifact Removal (PCA)",
                "Normalization",
                "Artifact Removal"]

plot_data = [x,
              results['bandpass_filter'],
            results['baseline_removed'],
             results['motion_removed'],
            results['normalized'], 
            results['artifact_removed']]

for i, (title, data) in enumerate(zip(plot_titles, plot_data), 1):
    plt.figure(figsize=(14, 15))
    plt.subplot(len(plot_titles), 1, i)
    plt.plot(t, data)
    plt.title(title)
    plt.xlabel('Time (s)')
    plt.ylabel('Signal value')
    plt.tight_layout()
    plt.grid()
    plt.show()

# 可视化心跳峰（在最终序列上）
plt.figure(figsize=(14, 3))
plt.plot(t, results['artifact_removed'])
peak = results["heartbeats"]
if len(peak):
    # 峰的时间坐标 = 索引 / target_fs；此处 target_fs==fs，可直接用 t[peaks]
    plt.plot(t[peak], results['artifact_removed'][peak], 'r.', label='peaks')
plt.title("Detected Heartbeats")
plt.xlabel("Time (s)")
plt.ylabel("Amplitude")
plt.grid(True)
plt.legend()
plt.show()