In [1]:
import sys
import os
import csv
import threading
from queue import Queue
import numpy as np
import pandas as pd
import scipy.signal as signal
from scipy import signal as sig
from scipy.stats import pearsonr
from scipy.signal.windows import gaussian
from scipy.signal import butter, filtfilt, iirnotch, find_peaks, welch
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('TkAgg')
from sklearn.decomposition import FastICA
from sklearn.preprocessing import StandardScaler

In [2]:
# Heart-rate analysis window. It is the default `ws` of cal_hr, which multiplies
# it by the sampling rate to get the window length in samples (so presumably
# expressed in seconds -- confirm).
window_size = 20

# Sampling rates of the four modalities, passed as `fps` to generate_vSignal and
# cal_hr; ecg_fps is also used as `fs` by notch_filter and baseline_filter.
# Presumably in Hz -- confirm against the acquisition setup.
ecg_fps = 2000
ppg_fps = 50
bcg_fps = 500
rPPG_fps = 30

# Minimum peak height handed to find_peaks (through generate_vSignal) for the ECG.
ecg_peak_height = 0.1

# `mode` argument given to np.convolve by the *_smooth functions.
convolve_mode = 'same'

# Dataset root: every sub-directory of it is processed as one recording set.
# NOTE(review): absolute, machine-specific path -- adjust before running elsewhere.
ROOT_PATH = r"F:\ZJXU-HR-V1-130"
# CSV file that receives the per-folder evaluation metrics.
OUTPUT_PATH = os.path.join(ROOT_PATH, 'result.csv')

In [3]:
# ECG信号处理

def bandpass_filter(signal, order=4, lowcut=0.5, highcut=50, fs=None):
    """Apply a zero-phase Butterworth band-pass filter to a 1-D signal.

    The original author's note says this stage is meant to suppress EMG
    (muscle) interference in the ECG.

    Args:
        signal: 1-D sequence of samples.
        order: Order passed to ``scipy.signal.butter``.
        lowcut: Lower cut-off frequency, in the same unit as ``fs``.
        highcut: Upper cut-off frequency, in the same unit as ``fs``.
        fs: Sampling rate. When ``None`` the notebook-level ``ecg_fps`` is
            used, so the filter stays consistent with the other ECG stages
            instead of carrying its own hard-coded rate.

    Returns:
        numpy.ndarray with the filtered signal, same length as the input.
    """
    if fs is None:
        # Resolved at call time so a changed config cell is picked up.
        fs = ecg_fps
    nyquist = 0.5 * fs  # Nyquist frequency
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    # filtfilt runs the filter forward and backward, cancelling the phase shift.
    return filtfilt(b, a, signal)

# 陷波滤波器设计
def notch_filter(signal, quality=30, freq=50, fs=None):
    """Remove a narrow frequency band (mains hum) with a zero-phase IIR notch.

    Args:
        signal: 1-D sequence of samples.
        quality: Quality factor passed to ``scipy.signal.iirnotch``; larger
            values give a narrower notch.
        freq: Frequency to remove, in the same unit as ``fs``. Defaults to
            50 (power-line interference, per the original comment).
        fs: Sampling rate. When ``None`` the notebook-level ``ecg_fps`` is used.

    Returns:
        numpy.ndarray with the filtered signal, same length as the input.
    """
    if fs is None:
        # Resolved at call time so a changed config cell is picked up.
        fs = ecg_fps
    nyquist = 0.5 * fs
    # iirnotch is called without fs=, so it expects the notch frequency
    # normalised to the Nyquist frequency.
    normalized_freq = freq / nyquist
    b, a = iirnotch(normalized_freq, quality)
    return filtfilt(b, a, signal)

# 滤-基线漂移
def baseline_filter(data, fs=ecg_fps):
    """Subtract a median-filter baseline estimate from the signal.

    The baseline is estimated by two cascaded median filters whose kernels
    span ``0.2 * fs`` and ``0.6 * fs`` samples (each bumped to the next odd
    number, as ``medfilt`` requires an odd kernel size).

    Args:
        data: 1-D sequence of samples.
        fs: Sampling rate used to size the two median kernels.

    Returns:
        list: The input minus the estimated baseline, as a plain Python list.
    """
    def _odd_kernel(fraction):
        # Kernel length for the given fraction of fs, forced to be odd.
        size = int(round(fraction * fs))
        return size if size % 2 else size + 1

    samples = np.array(data)
    baseline = samples
    for fraction in (0.2, 0.6):
        baseline = sig.medfilt(baseline, kernel_size=_odd_kernel(fraction))
    return (samples - baseline).tolist()

def ecg_smooth(signal, window_len=100, window='hanning'):
    """Smooth the ECG by convolving it with a numpy window function.

    Args:
        signal: 1-D sequence of samples.
        window_len: Number of taps of the smoothing window.
        window: Name of a numpy window factory (looked up with ``getattr(np, ...)``).

    Returns:
        The convolved signal cut to the input length, or the input object
        itself when it is shorter than ``window_len``.

    Note:
        The window is used as-is (not divided by its sum), so the output
        amplitude is scaled by the window's area.
    """
    n_samples = len(signal)
    if n_samples >= window_len:
        kernel = getattr(np, window)(window_len)
        convolved = np.convolve(signal, kernel, mode=convolve_mode)
        return convolved[:n_samples]
    # Too short to smooth: hand the input back untouched.
    return signal

In [4]:
# PPG信号处理
def ppg_smooth(signal, window_len=25, window='hanning'):
    """Smooth the PPG by convolving it with a numpy window function.

    Args:
        signal: 1-D sequence of samples.
        window_len: Number of taps of the smoothing window.
        window: Name of a numpy window factory (looked up with ``getattr(np, ...)``).

    Returns:
        The convolved signal cut to the input length, or the input object
        itself when it is shorter than ``window_len``. The window is not
        normalised, so the output is scaled by the window's area.
    """
    total = len(signal)
    if window_len > total:
        # Not enough samples for one full window.
        return signal
    taps = getattr(np, window)(window_len)
    return np.convolve(signal, taps, mode=convolve_mode)[:total]

In [5]:
# BCG信号处理
def bcg_smooth(signal, window_len=50, window='hanning'):
    """Smooth the BCG by convolving it with a numpy window function.

    Args:
        signal: 1-D sequence of samples.
        window_len: Number of taps of the smoothing window.
        window: Name of a numpy window factory (looked up with ``getattr(np, ...)``).

    Returns:
        The convolved signal cut to the input length, or the input object
        itself when it is shorter than ``window_len``. The window is not
        normalised, so the output is scaled by the window's area.
    """
    sample_count = len(signal)
    if sample_count < window_len:
        return signal  # shorter than the window: nothing to smooth
    window_fn = getattr(np, window)
    filtered = np.convolve(signal, window_fn(window_len), mode=convolve_mode)
    return filtered[:sample_count]


In [6]:
# rPPG信号处理
def rPPG_smooth(signal, window_len=5, window='hanning'):
    """Smooth the rPPG trace by convolving it with a numpy window function.

    Args:
        signal: 1-D sequence of samples.
        window_len: Number of taps of the smoothing window.
        window: Name of a numpy window factory (looked up with ``getattr(np, ...)``).

    Returns:
        The convolved signal cut to the input length, or the input object
        itself when it is shorter than ``window_len``. The window is not
        normalised, so the output is scaled by the window's area.
    """
    length = len(signal)
    long_enough = not (window_len > length)
    if long_enough:
        weights = getattr(np, window)(window_len)
        result = np.convolve(signal, weights, mode=convolve_mode)
        return result[:length]
    return signal

In [7]:
# 生成虚拟信号
def generate_vSignal(smoothed_signal, fps, peak_height=None):
    """Build a "virtual" signal: one cosine cycle between consecutive peaks.

    Peaks are located with ``find_peaks`` (minimum spacing ``fps * 0.5``
    samples, optional minimum ``peak_height``). The returned signal equals 1
    at every detected peak and completes exactly one cosine period between
    two neighbouring peaks.

    Samples before the first peak continue the first period backwards, and
    samples after the last peak continue the last period forwards, so the
    cosine stays phase-locked to the detected peaks at both ends. (Stretching
    a full cycle over the leading or trailing remainder would move the first
    maximum away from the first detected peak and invent a beat period for
    the tail.)

    Args:
        smoothed_signal: 1-D sequence of samples.
        fps: Sampling rate; half of it is the minimum distance between peaks.
        peak_height: Optional ``height`` argument for ``find_peaks``.

    Returns:
        numpy.ndarray of floats with the same length as the input; all zeros
        when fewer than two peaks are found.
    """
    samples = np.asarray(smoothed_signal)
    n_samples = len(samples)
    peaks, _ = find_peaks(samples, height=peak_height, distance=fps * 0.5)

    # Always float: zeros_like would inherit an integer dtype from integer
    # input and truncate the cosine values.
    vSignal = np.zeros(n_samples, dtype=float)
    if len(peaks) < 2:
        return vSignal

    def _cosine(indices, anchor, period):
        # Cosine with a maximum at `anchor` and the given period (in samples).
        return np.cos(2 * np.pi * (indices - anchor) / period)

    # Leading part: from the signal start up to the second peak, using the
    # first peak-to-peak period and anchored on the first peak.
    first, second = peaks[0], peaks[1]
    vSignal[:second + 1] = _cosine(np.arange(0, second + 1), first, second - first)

    # Interior periods.
    for start, end in zip(peaks[1:-1], peaks[2:]):
        vSignal[start:end + 1] = _cosine(np.arange(start, end + 1), start, end - start)

    # Trailing part: extrapolate the last measured period to the signal end.
    last = peaks[-1]
    vSignal[last:] = _cosine(np.arange(last, n_samples), last, last - peaks[-2])
    return vSignal

# 归一化
def normalize(data):
    """Min-max scale ``data`` into the range [0, 1].

    Args:
        data: Array-like of numbers (lists are accepted).

    Returns:
        numpy.ndarray with ``(data - min) / (max - min)``. A constant input
        has no range to scale by, so it yields an array of zeros instead of
        dividing by zero.
    """
    values = np.asarray(data)
    min_val = np.min(values)
    span = np.max(values) - min_val
    if span == 0:
        return np.zeros_like(values, dtype=float)
    return (values - min_val) / span

In [8]:
# 心率计算函数
def rr_hr(signal, fps, last_bpm):
    """Estimate heart rate (beats per minute) from the mean peak-to-peak interval.

    Args:
        signal: 1-D sequence of samples.
        fps: Sampling rate used to convert sample indices to seconds.
        last_bpm: Previous estimate. When it is greater than 0 the result is
            blended as ``0.9 * last_bpm + 0.1 * new_estimate``.

    Returns:
        The (possibly blended) estimate; NaN when fewer than two peaks are
        found.
    """
    n_samples = len(signal)
    time_axis = np.linspace(0, n_samples / fps, n_samples, endpoint=False)
    peak_idx = find_peaks(signal)[0]
    # Seconds between successive peaks.
    intervals = np.diff(time_axis[peak_idx])
    # No interval means no rate: propagate NaN.
    mean_rate = 1 / intervals.mean() if intervals.size > 0 else np.nan
    bpm = mean_rate * 60
    if last_bpm > 0:
        # Exponential smoothing against the previous window.
        return (last_bpm * 0.9) + (bpm * 0.1)
    return bpm

def cal_hr(signal, fps, ws=None):
    """Compute a heart-rate series with a sliding window.

    A window of ``fps * ws`` samples is advanced by ``fps`` samples per step;
    each window is passed to ``rr_hr`` together with the previous estimate
    (seeded with 60).

    Args:
        signal: 1-D sequence of samples.
        fps: Sampling rate (integer number of samples per step).
        ws: Window length as a multiple of ``fps``. When ``None`` the
            notebook-level ``window_size`` is read at call time, so edits to
            the config cell take effect without re-defining this function.

    Returns:
        numpy.ndarray with one estimate per window position; empty when the
        signal is shorter than one window.
    """
    if ws is None:
        ws = window_size
    window_len = fps * ws
    hr = []
    last_bpm = 60
    # "+ 1" keeps the final full window, the one starting at
    # len(signal) - window_len; without it a signal of exactly one window
    # length produced no estimate at all.
    for start in range(0, len(signal) - window_len + 1, fps):
        bpm = rr_hr(signal[start:start + window_len], fps, last_bpm)
        hr.append(bpm)
        last_bpm = bpm
    return np.array(hr)

In [9]:
def process_file(folder):
    """Compute the heart-rate series of one recording folder for all four modalities.

    Reads ``ECG_EEG_RSP.csv``, ``ppg.csv``, ``bcg.csv`` and ``rPPG_bigger.csv``
    from ``ROOT_PATH/folder``. Every modality is smoothed, converted to a
    virtual cosine signal with ``generate_vSignal`` and turned into a
    heart-rate series with ``cal_hr``. The ECG is additionally band-pass,
    notch and baseline filtered before smoothing.

    Args:
        folder: Folder name (or path) that is joined onto ``ROOT_PATH``.

    Returns:
        tuple: ``(ECG_hr, PPG_hr, BCG_hr, rPPG_hr)``, the arrays returned by
        ``cal_hr`` for each modality.
    """
    def _path(filename):
        # Location of one recording file inside this folder.
        return os.path.join(ROOT_PATH, folder, filename)

    # ECG: only the ECG column is loaded, like the other single-column reads below.
    ECG_df = pd.read_csv(_path('ECG_EEG_RSP.csv'), header=0, usecols=['ECG, X, RSPEC-R'])
    ECGs = ECG_df.values.flatten()
    ecg_filtered = baseline_filter(notch_filter(bandpass_filter(ECGs)))
    ecg_smoothed = ecg_smooth(ecg_filtered)
    vECG = generate_vSignal(ecg_smoothed, ecg_fps, ecg_peak_height).flatten()
    ECG_hr = cal_hr(vECG, ecg_fps)

    # PPG
    PPGs = pd.read_csv(_path('ppg.csv'), header=0, usecols=['ppg']).values.flatten()
    ppg_smoothed = ppg_smooth(PPGs)
    vPPG = generate_vSignal(ppg_smoothed, ppg_fps).flatten()
    PPG_hr = cal_hr(vPPG, ppg_fps)

    # BCG
    BCGs = pd.read_csv(_path('bcg.csv'), header=0, usecols=['bcg']).values.flatten()
    bcg_smoothed = bcg_smooth(BCGs)
    vBCG = generate_vSignal(bcg_smoothed, bcg_fps).flatten()
    BCG_hr = cal_hr(vBCG, bcg_fps)

    # rPPG: combine the colour channels as G/R + G/B.
    # NOTE(review): zeros in the R or B column produce inf/NaN here.
    rPPG_df = pd.read_csv(_path('rPPG_bigger.csv'))
    R_col = rPPG_df['R'].values.flatten()
    G_col = rPPG_df['G'].values.flatten()
    B_col = rPPG_df['B'].values.flatten()
    rPPGs = G_col/R_col + G_col/B_col
    rPPG_smoothed = rPPG_smooth(rPPGs)
    vRPPG = generate_vSignal(rPPG_smoothed, rPPG_fps).flatten()
    rPPG_hr = cal_hr(vRPPG, rPPG_fps)

    return ECG_hr, PPG_hr, BCG_hr, rPPG_hr



def cal_metrics(ecg_hr, ppg_hr, bcg_hr, rPPG_hr):
    """Compute evaluation metrics of PPG, BCG and rPPG heart rate against ECG.

    All four series are first truncated to the length of the shortest one.

    Args:
        ecg_hr: Reference heart-rate array.
        ppg_hr: PPG heart-rate array.
        bcg_hr: BCG heart-rate array.
        rPPG_hr: rPPG heart-rate array.

    Returns:
        dict: Keys ``"<modality>-<metric>"`` for the modalities PPG, BCG and
        rPPG and the metrics MAE, SDE (standard deviation of the error), MSE,
        RMSE and PCC (Pearson correlation), ordered metric by metric.
    """
    common_len = min(map(len, (ecg_hr, ppg_hr, bcg_hr, rPPG_hr)))
    reference = ecg_hr[:common_len]
    estimates = {
        "PPG": ppg_hr[:common_len],
        "BCG": bcg_hr[:common_len],
        "rPPG": rPPG_hr[:common_len],
    }

    # One scorer per metric; each compares an estimate with the ECG reference.
    scorers = {
        "MAE": lambda est: np.mean(np.abs(reference - est)),
        "SDE": lambda est: np.std(reference - est),
        "MSE": lambda est: np.mean((reference - est)**2),
        "RMSE": lambda est: np.sqrt(np.mean((reference - est)**2)),
        "PCC": lambda est: pearsonr(reference, est)[0],
    }

    # Metric-major order keeps the key sequence used for the CSV header.
    return {
        f"{modality}-{metric}": score(estimate)
        for metric, score in scorers.items()
        for modality, estimate in estimates.items()
    }

In [None]:
# Number of worker threads processing folders in parallel.
NUM_WORKERS = 8

# `results` collects (folder, metrics) tuples; the lock guards appends from workers.
results_lock = threading.Lock()
results = []

task_queue = Queue()

def worker():
    """Process folders from ``task_queue`` until a ``None`` sentinel arrives.

    A failure in one folder is reported and skipped so the remaining folders
    are still processed. The sentinel makes the thread return instead of
    blocking on the queue forever, so re-running this cell does not pile up
    idle threads in the kernel.
    """
    while True:
        folder = task_queue.get()
        try:
            if folder is None:
                # Shutdown request; the finally clause still marks it done.
                return
            ecg_hr, ppg_hr, bcg_hr, rPPG_hr = process_file(folder)
            metrics = cal_metrics(ecg_hr, ppg_hr, bcg_hr, rPPG_hr)

            with results_lock:
                results.append((folder, metrics))

        except Exception as e:
            print(f"处理 {folder} 失败: {str(e)}")
        finally:
            task_queue.task_done()

# Every sub-directory of ROOT_PATH is one task.
entries = os.listdir(ROOT_PATH)
folders = [entry for entry in entries if os.path.isdir(os.path.join(ROOT_PATH, entry))]
for folder in folders:
    task_queue.put(folder)

workers = []
for _ in range(NUM_WORKERS):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    workers.append(t)

# One sentinel per worker, queued after all folders (the queue is FIFO), so
# every folder is handled before the threads shut down.
for _ in workers:
    task_queue.put(None)

task_queue.join()
for t in workers:
    t.join()

# Save the results to CSV
if results:
    header = ["Set"] + list(next(iter(results))[1].keys())

    # Sort by folder name so the output does not depend on thread timing.
    sorted_results = sorted(results, key=lambda x: x[0])

    with open(OUTPUT_PATH, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=header)
        writer.writeheader()

        for folder, metrics in sorted_results:
            row = {"Set": folder}
            row.update(metrics)
            writer.writerow(row)

    print(f"结果已保存到 {OUTPUT_PATH}")
else:
    print("没有有效结果需要保存")

