In [None]:
import os
import glob
import numpy as np
import matplotlib.pyplot as plt
import sys
import pandas as pd
sys.path.append('/home/lizhijun/rppg-bio')

from methods.RGB_divide import *
from methods.optimal_combine import optimal_combine
from methods.CHROME_DEHAAN import CHROME_DEHAAN
from methods.PBV import PBV2
from methods.POS import POS_WANG
from methods.OMIT import OMIT
from methods.PCA import pca_bvp
from methods.matrix_opt import find_optimal_signal
from methods.LGI import LGI
from methods.ICA_POH import ICA_POH
from methods.RGB_divide import G
from modules.utils import sig_windowing,min_max_normalize
from modules.post_process import calculate_metric_per_segment,_calculate_SNR
from modules.post_process import cal_hr_spectrogram
from modules.utils import resample

from modules.freq import plot_stft,plot_freq,BPF,lowpass_filter,detrend



**Table of contents**<a id='toc0_'></a>    
- [导入文件](#toc1_)    
- [批处理](#toc2_)    
- [单个测试](#toc3_)    

<!-- vscode-jupyter-toc-config
	numbering=false
	anchor=true
	flat=false
	minLevel=1
	maxLevel=6
	/vscode-jupyter-toc-config -->
<!-- THIS CELL WILL BE REPLACED ON TOC UPDATE. DO NOT WRITE YOUR TEXT IN THIS CELL -->

# <a id='toc1_'></a>[导入文件](#toc0_)

In [None]:
hol_data_dir = '/home/lizhijun/rgb_ppg_holistic_PURE_UBFC/PURE1/'
ppg_dir = '/home/lizhijun/27ROI_ON_PURE/'


rgb_hol_files = glob.glob(os.path.join(hol_data_dir,'*RGBSignal.npy'))
rgb_ts_files = glob.glob(os.path.join(hol_data_dir,'*RGB_ts.npy'))
ppg_files = glob.glob(os.path.join(ppg_dir,'*_ppg.txt'))

ppg_files.sort()
rgb_hol_files.sort()
rgb_ts_files.sort()

def calculate_signal_correlation(signal1, signal2, plot=True):
    """
    计算两个信号之间的实际相关系数。

    参数:
    signal1 (numpy.ndarray): 第一个信号。
    signal2 (numpy.ndarray): 第二个信号。
    plot (bool): 是否绘制信号和互相关图。默认为 True。

    返回:
    float: 信号之间的相关系数。
    """
    # 计算互相关
    correlation = np.correlate(signal1 - np.mean(signal1), signal2 - np.mean(signal2), mode='full')
    lags = np.arange(-len(signal1) + 1, len(signal1))

    # 找到最大相关性的位置
    max_corr_index = np.argmax(np.abs(correlation))
    max_corr = correlation[max_corr_index]
    lag_at_max_corr = lags[max_corr_index]

    # 根据互相关计算相关系数
    correlation_coefficient = max_corr / (np.std(signal1) * np.std(signal2) * len(signal1))
    
    return correlation_coefficient

# <a id='toc2_'></a>[批处理](#toc0_)

In [None]:
import numpy as np
import scipy.signal
from datetime import datetime

# 假设ICA_POH, CHROME_DEHAAN, LGI, OMIT, POS_WANG 和 calculate_metric_per_segment 函数已定义
wsize = 20
results = []

methods = {
    'opt': find_optimal_signal,
    # 'ica': ICA_POH,
    # 'CHROME_DEHAAN': CHROME_DEHAAN,
    # 'LGI': LGI,
    # 'OMIT': OMIT,
    # 'POS': POS_WANG,
    # 'PBV':PBV2,   
    # 'Green':G,
    # 'PCA':pca_bvp
}

methods_list = list(methods.keys())  # 获取方法名列表

# 遍历所有文件对
for rgb_hol_path,rgb_ts, ppg_path in zip(rgb_hol_files, rgb_ts_files,ppg_files):
    # 加载RGB和PPG信号
    rgb_hol = np.load(rgb_hol_path)
    rgb_hol = np.squeeze(rgb_hol, axis=1)
    rgb_ts = np.load(rgb_ts)
    
    rgb_hol_res = np.array([resample(rgb_ts,rgb_hol[:,i],30) for i in range(rgb_hol.shape[1])]).T
    ppg = np.loadtxt(ppg_path)

    # 将信号分割成窗口
    ppg_wins, _ = sig_windowing(ppg, wsize=wsize, stride=1, fps=30)
    rgb_wins, _ = sig_windowing(rgb_hol_res, wsize=wsize, stride=1, fps=30)

    # 对于每种方法执行处理
    for method in methods_list:  # 使用方法名列表进行遍历
        hr_ppg_all = []
        hr_rppg_all = []
        snr_all = []
        corr_all = []
        acc_all = []
        cnt = 0

        for idx, rgb_win in enumerate(rgb_wins):
            # 使用当前方法计算rPPG信号
            # rppg_win = methods[method](rgb_win)  # 根据方法名从字典中获取对应的函数
            if method =='opt' or method=='chrom' or method=='pos' or method=='ica':
                rppg_win = methods[method](rgb_win,30)
            else:
                rppg_win = methods[method](rgb_win)
            hr_ppg, hr_rppg, snr = calculate_metric_per_segment(rppg_win, ppg_wins[idx])
            cnt += 1
            MIN_LEN = min(len(rppg_win), len(ppg_wins[idx]))
            corr = np.abs(calculate_signal_correlation(rppg_win[:MIN_LEN], ppg_wins[idx][:MIN_LEN]))
            if(np.abs(hr_rppg - hr_ppg) < 3):
                acc_all.append(1)
            hr_rppg_all.append(hr_rppg)
            hr_ppg_all.append(hr_ppg)
            snr_all.append(snr)
            corr_all.append(corr)

        # 计算整体指标
        hr_ppg_all = np.array(hr_ppg_all)
        hr_rppg_all = np.array(hr_rppg_all)
        SNR_all = np.array(snr_all)
        corr_all = np.array(corr_all)
        acc_all = np.array(acc_all)

        squared_errors = np.square(hr_ppg_all - hr_rppg_all)
        RMSE = np.sqrt(np.mean(squared_errors))
        MAE = np.mean(np.abs(hr_ppg_all - hr_rppg_all))
        SNR = np.mean(SNR_all)
        CORR = np.mean(corr_all)
        ACC = np.sum(acc_all)/len(rgb_wins)
        PCC = np.corrcoef(hr_ppg_all, hr_rppg_all)[0, 1]

        # 存储结果
        video_id = rgb_hol_path.split('/')[-1].split('_')[0]
        results.append([video_id, method, RMSE, MAE,SNR,CORR,ACC,PCC,hr_ppg_all,hr_rppg_all])

        print(f'{method} on {rgb_hol_path} finished')

# 如果需要将结果保存为DataFrame或CSV，可以使用pandas库
import pandas as pd
df_results = pd.DataFrame(results, columns=['vid_path', 'method', 'RMSE', 'MAE','SNR','CORR','ACC','PCC','hr_ppg_all','hr_rppg_all'])
# df_results.to_csv(f'PURE_{wsize}s_9_methods_{datetime.now():%Y-%m-%d}.csv', index=False)

In [None]:
df_results['RMSE'].mean()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# 读取数据
df_results = pd.read_csv('/home/lizhijun/my_project/PURE_resample_results_20s_9_methods_2025-10-18.csv')

# 指定方法顺序
method_order = ['Green', 'ica', 'PCA', 'CHROME_DEHAAN', 'PBV', 'POS', 'LGI', 'OMIT', 'opt']


df_results['method'] = pd.Categorical(df_results['method'], categories=method_order, ordered=True)

# 计算每个方法的统计指标
summary_stats = df_results.groupby('method').agg({
    'RMSE': ['mean'],
    'MAE': ['mean'],
    'SNR': ['mean'],
    'CORR': ['mean'],
    'ACC': ['mean'],
    'PCC': ['mean']
})

# 按指定顺序重新排列
summary_stats = summary_stats.reindex(method_order)

# 保存统计数据到 CSV
summary_stats.to_csv('/home/lizhijun/my_project/PURE_summary_stats_ordered.csv')

print("Summary Statistics:\n", summary_stats)

# 设置 Seaborn 样式
sns.set(style="whitegrid")

# 创建一个 1 行 6 列的子图
fig, axes = plt.subplots(nrows=1, ncols=6, figsize=(36, 6))

# 绘制箱线图时指定顺序
sns.boxplot(x='method', y='RMSE', data=df_results, ax=axes[0], order=method_order)
axes[0].set_title('Comparison of RMSE across Methods')

sns.boxplot(x='method', y='MAE', data=df_results, ax=axes[1], order=method_order)
axes[1].set_title('Comparison of MAE across Methods')

sns.boxplot(x='method', y='SNR', data=df_results, ax=axes[2], order=method_order)
axes[2].set_title('Comparison of SNR across Methods')

sns.boxplot(x='method', y='ACC', data=df_results, ax=axes[4], order=method_order)
axes[4].set_title('Comparison of ACC across Methods')

sns.boxplot(x='method', y='PCC', data=df_results, ax=axes[5], order=method_order)
axes[5].set_title('Comparison of PCC across Methods')

sns.boxplot(x='method', y='CORR', data=df_results, ax=axes[3], order=method_order)
axes[3].set_title('Comparison of CORR across Methods')

plt.tight_layout()
plt.show()


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# =========================
# 读取 CSV
# =========================
df = pd.read_csv('/home/lizhijun/my_project/PURE_resample_results_20s_9_methods_2025-10-18.csv')

# =========================
# 将空格分隔的字符串 '[65.0 66.0 67.0]' 转为列表
# =========================
def str_to_list(s):
    s = s.strip('[]')
    return np.fromstring(s, sep=' ').tolist()

df['hr_rppg_all'] = df['hr_rppg_all'].apply(str_to_list)
df['hr_ppg_all'] = df['hr_ppg_all'].apply(str_to_list)

# =========================
# 展开每条记录中的多个值
# =========================
all_data = []
for _, row in df.iterrows():
    method = row['method']
    hr_es_list = row['hr_rppg_all']
    hr_gt_list = row['hr_ppg_all']
    for es, gt in zip(hr_es_list, hr_gt_list):
        all_data.append([method, es, gt])

df_expanded = pd.DataFrame(all_data, columns=['method', 'hr_es', 'hr_gt'])

# =========================
# Bland-Altman 子图绘制
# =========================
methods = df_expanded['method'].unique()
n_methods = len(methods)

fig, axes = plt.subplots(3, 3, figsize=(18, 18))
axes = axes.flatten()

for i, method_name in enumerate(methods):
    df_method = df_expanded[df_expanded['method'] == method_name]
    
    x = df_method['hr_es']
    y = df_method['hr_gt']
    mean = np.mean([x, y], axis=0)
    diff = np.array(x) - np.array(y)
    md = np.mean(diff)
    sd = np.std(diff)
    
    ax = axes[i]
    ax.scatter(mean, diff, alpha=0.5)
    ax.axhline(md, color='red', linestyle='--', label='Mean Diff')
    ax.axhline(md + 1.96*sd, color='blue', linestyle='--', label='±1.96 SD')
    ax.axhline(md - 1.96*sd, color='blue', linestyle='--')
    ax.set_title(method_name)
    ax.set_xlabel('Mean HR')
    ax.set_ylabel('Diff (Est - GT)')
    ax.legend()

plt.tight_layout()
plt.savefig('bland_altman_all_methods.png', dpi=300)
plt.show()


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import seaborn as sns
import matplotlib.font_manager as fm

# font_path = "/home/lizhijun/fonts/SimSun.ttf"
font_path = '/usr/share/fonts/truetype/msttcorefonts/Times_New_Roman.ttf'

fm.fontManager.addfont(font_path)
prop = fm.FontProperties(fname=font_path, size=20)
plt.rcParams['font.family'] = prop.get_name()

# =========================
# 设置全局字体为 Times New Roman
# =========================
plt.rcParams['font.family'] = 'Times New Roman'
plt.rcParams['font.size'] = 15
plt.rcParams['axes.unicode_minus'] = False  # 防止负号显示异常

# =========================
# 1. 读取数据
# =========================
df = pd.read_csv('/home/lizhijun/my_project/PURE_results_20_9_methods_2025-10-17.csv')

# 定义指标
metrics = ['RMSE', 'MAE', 'SNR', 'ACC', 'CORR', 'PCC']

# =========================
# 2. 计算每个方法的平均指标
# =========================
summary_stats = df.groupby('method')[metrics].mean()

# =========================
# 3. 指标归一化（RMSE、MAE 为反向指标）
# =========================
summary_norm = summary_stats.copy()
scaler = MinMaxScaler()

for metric in metrics:
    values = summary_stats[metric].values.reshape(-1, 1)
    if metric in ['RMSE', 'MAE']:  # 越小越好 → 反向
        values = -values
    summary_norm[metric] = scaler.fit_transform(values).flatten()

# =========================
# 4. 重命名方法名称（在此自定义）
# =========================
rename_dict = {
    'CHROME_DEHAAN': 'CHROM',
    'ica': 'ICA',
    'opt': 'RQO',
    'Green':'GREEN'
    # 可根据你的 CSV 中的名称继续补充映射
}

summary_norm.rename(index=rename_dict, inplace=True)

# =========================
# 5. 绘制雷达图
# =========================

legend_order = ['GREEN', 'ICA', 'PCA', 'CHROM', 'PBV', 'POS', 'LGI', 'OMIT', 'RQO']  # 按你想显示的顺序
methods = [m for m in legend_order if m in summary_norm.index]


labels = metrics
num_vars = len(labels)
angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist()
angles += angles[:1]  # 闭合雷达图

# methods = summary_norm.index.tolist()
palette = sns.color_palette("tab10", n_colors=len(methods))

fig, ax = plt.subplots(figsize=(6, 4), subplot_kw=dict(polar=True))

for i, method in enumerate(methods):
    values = summary_norm.loc[method].tolist()
    values += values[:1]
    
    # 突出显示包含 RQO 的方法
    if 'RQO' in method.upper():
        ax.plot(angles, values, label=method, linewidth=3, color='red')
        ax.fill(angles, values, color='red', alpha=0.25)
    else:
        ax.plot(angles, values, label=method, linewidth=2, color=palette[i])
        ax.fill(angles, values, color=palette[i], alpha=0.2)

# 设置角度标签
ax.set_xticks(angles[:-1])
ax.set_xticklabels(labels, fontsize=12)

for label in ax.get_xticklabels():
    label.set_label(label.get_text())  # 保证文本存在
ax.tick_params(axis='x', pad=12)  

ax.set_yticklabels([]) 

# 设置图例在右侧居中

ax.legend(loc='center left', bbox_to_anchor=(1.15, 0.5), fontsize=12)

plt.tight_layout()
plt.savefig('/home/lizhijun/my_project/毕设论文画图/img/雷达图.svg')

# 取消标题（如需可启用）
# ax.set_title('Comparison of Methods Across Metric_


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# 方法名映射字典：原始方法名 -> 显示名称
method_name_map = {
    'CHROME_DEHAAN': 'CHROM',
    'ica': 'ICA',
    'opt': 'RQO',
    'Green':'GREEN',
    
    # 可以继续添加其他方法映射
}

def str_to_list(s):
    s = s.strip('[]')
    return np.fromstring(s, sep=' ').tolist()

# df['hr_rppg_all'] = df['hr_rppg_all'].apply(str_to_list)
# df['hr_ppg_all'] = df['hr_ppg_all'].apply(str_to_list)

# =========================
# 展开每条记录中的多个值
# =========================
all_data = []
for _, row in df.iterrows():
    method = row['method']
    hr_es_list = row['hr_rppg_all']
    hr_gt_list = row['hr_ppg_all']
    for es, gt in zip(hr_es_list, hr_gt_list):
        all_data.append([method, es, gt])

df_expanded = pd.DataFrame(all_data, columns=['method', 'hr_es', 'hr_gt'])

methods = df_expanded['method'].unique()
methods = ['Green', 'ica', 'PCA', 'CHROME_DEHAAN', 'PBV', 'POS', 'LGI', 'OMIT', 'opt']
n_methods = len(methods)

# 子图布局：3x3
fig, axes = plt.subplots(3, 3, figsize=(14, 14))
axes = axes.flatten()  # 展平方便索引

band = 5  # 误差带宽度 ±5 bpm
title_fontsize = 20
label_fontsize = 20
tick_fontsize = 20
legend_fontsize = 20

for i, method_name in enumerate(methods):
    ax = axes[i]
    df_method = df_expanded[df_expanded['method'] == method_name]
    x = df_method['hr_gt']
    y = df_method['hr_es']
    
    ax.scatter(x, y, alpha=0.5)
    
    min_val = min(min(x), min(y))
    max_val = max(max(x), max(y))
    
    # 中心线 y=x
    ax.plot([min_val, max_val], [min_val, max_val], 'r--', label='y=x')
    
    # 误差带
    ax.fill_between(
        [min_val, max_val],
        [min_val - band, max_val - band],
        [min_val + band, max_val + band],
        color='orange', alpha=0.2, label=f'±{band} bpm'
    )
    
    # 设置显示名称
    display_name = method_name_map.get(method_name, method_name)
    ax.set_title(display_name, fontsize=title_fontsize)
    ax.set_xlabel('Ground Truth HR', fontsize=label_fontsize)
    ax.set_ylabel('Estimated HR', fontsize=label_fontsize)
    
    # 放大刻度
    ax.tick_params(axis='both', which='major', labelsize=tick_fontsize)
    
    ax.legend(fontsize=legend_fontsize)

# 调整布局
plt.tight_layout()
# plt.savefig('/home/lizhijun/my_project/毕设论文画图/img/九种方法散点图.svg')
plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

method_name_map = {
    'CHROME_DEHAAN': 'CHROM',
    'ica': 'ICA',
    'opt': 'RQO',
    'Green':'GREEN',
}

# 展开数据
all_data = []
for _, row in df.iterrows():
    for es, gt in zip(row['hr_rppg_all'], row['hr_ppg_all']):
        all_data.append([row['method'], es, gt])

df_expanded = pd.DataFrame(all_data, columns=['method', 'hr_es', 'hr_gt'])

methods = ['Green', 'ica', 'PCA', 'CHROME_DEHAAN', 'PBV', 'POS', 'LGI', 'OMIT', 'opt']

# ------- 全局范围 -------
vmin = min(df_expanded['hr_gt'].min(), df_expanded['hr_es'].min())
vmax = max(df_expanded['hr_gt'].max(), df_expanded['hr_es'].max())

# ------- 统一刻度 -------
ticks = np.linspace(vmin, vmax, 6)

band = 5

fig, axes = plt.subplots(3, 3, figsize=(14, 14))
axes = axes.flatten()

for i, method_name in enumerate(methods):
    ax = axes[i]
    df_m = df_expanded[df_expanded['method'] == method_name]

    x = df_m['hr_gt']
    y = df_m['hr_es']

    ax.scatter(x, y, alpha=0.5)

    ax.plot([vmin, vmax], [vmin, vmax], 'r--', label='y=x')

    ax.fill_between(
        [vmin, vmax],
        [vmin - band, vmax - band],
        [vmin + band, vmax + band],
        color='orange', alpha=0.2, label=f'±{band} bpm'
    )

    display_name = method_name_map.get(method_name, method_name)
    ax.set_title(display_name, fontsize=18)

    # —— 关键：范围 & 刻度完全一致 ——
    ax.set_xlim(vmin, vmax)
    ax.set_ylim(vmin, vmax)
    ax.set_xticks(ticks)
    ax.set_yticks(ticks)
    ax.set_aspect('equal', adjustable='box')

    ax.set_xlabel('Ground Truth HR')
    ax.set_ylabel('Estimated HR')
    ax.legend()

plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# 读取数据
df = pd.read_csv("/home/lizhijun/rppg-bio/myfolder/results_hol_PURE_20s.csv")

# 创建子图
fig, axes = plt.subplots(4, 1, figsize=(10, 15))

# 绘制 RMSE 箱型图
df.boxplot(column='RMSE', by='method', ax=axes[0])
axes[0].set_title('RMSE Comparison')
axes[0].set_ylabel('RMSE')
axes[0].set_xlabel('')
axes[0].grid(True)

# 绘制 MAE 箱型图
df.boxplot(column='MAE', by='method', ax=axes[1])
axes[1].set_title('MAE Comparison')
axes[1].set_ylabel('MAE')
axes[1].set_xlabel('')
axes[1].grid(True)

# 绘制 SNR 箱型图
df.boxplot(column='SNR', by='method', ax=axes[2])
axes[2].set_title('SNR Comparison')
axes[2].set_ylabel('SNR')
axes[2].set_xlabel('')
axes[2].grid(True)

df.boxplot(column='CORR', by='method', ax=axes[3])
axes[3].set_title('coComparison')
axes[3].set_ylabel('SNR')
axes[3].set_xlabel('')
axes[3].grid(True)
# 调整布局
plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# 读取数据
df = pd.read_csv("/home/lizhijun/rppg-bio/myfolder/results_hol_PURE_20s.csv")

# 定义要展示的评价标准
metrics = ['RMSE', 'MAE', 'SNR', 'CORR', 'ACC']  # 替换为你的实际评价标准

# 创建子图
num_metrics = len(metrics)
fig, axes = plt.subplots(num_metrics, 1, figsize=(10, 5 * num_metrics))

# 如果只有一个评价标准，axes 不是数组，需要转换为数组
if num_metrics == 1:
    axes = [axes]

# 遍历每个评价标准，绘制箱型图
for i, metric in enumerate(metrics):
    sns.boxplot(data=df, x='method', y=metric, ax=axes[i])
    axes[i].set_title(f'{metric} Comparison')
    axes[i].set_ylabel(metric)
    axes[i].set_xlabel('')
    axes[i].grid(True)

# 调整布局
plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# 读取数据
df = pd.read_csv("/home/lizhijun/rppg-bio/myfolder/results_hol_PURE_20s.csv")

# 定义要展示的评价标准
metrics = ['RMSE', 'MAE', 'SNR', 'CORR', 'ACC']  # 替换为你的实际评价标准

# 创建子图
num_metrics = len(metrics)
fig, axes = plt.subplots(1, num_metrics, figsize=(5 * num_metrics, 6))  # 横向排列，调整 figsize

# 如果只有一个评价标准，axes 不是数组，需要转换为数组
if num_metrics == 1:
    axes = [axes]

# 遍历每个评价标准，绘制横向箱型图
for i, metric in enumerate(metrics):
    sns.boxplot(data=df, y='method', x=metric, ax=axes[i], orient='h')  # 使用 orient='h' 绘制横向箱型图
    axes[i].set_title(f'{metric} Comparison')
    axes[i].set_xlabel(metric)
    axes[i].set_ylabel('')
    axes[i].grid(True)

# 调整布局
plt.tight_layout()
plt.show()