# DDM² Quantitative Evaluation

量化评估各方法的去噪效果，计算以下指标：
- **MAE**: Mean Absolute Error (越低越好)
- **SSIM**: Structural Similarity Index (越高越好)
- **LPIPS**: Learned Perceptual Image Patch Similarity (越低越好)

对比方法：
- Noisy Input (原始噪声)
- Teacher N2N (Noise2Noise)
- DDM² First Step
- DDM² Final

In [None]:
#==============================================================================
# 配置
#==============================================================================

# 数据路径
EXCEL_PATH = "/host/d/file/fixedCT_static_simulation_train_test_gaussian_local.xlsx"
DATA_ROOT = "/host/d/file/simulation/"

# 预测结果路径
DDM2_PRED_ROOT = "/host/d/file/pre/ddm2/pred_images"      # DDM² 推理结果
N2N_PRED_ROOT = "/host/d/file/pre/noise2noise/pred_images"  # Teacher N2N 结果
N2N_EPOCH = 78

# 评估哪些 batch
EVAL_BATCHES = [5]  # 验证集

# HU 窗口范围 (用于计算 MAE 和 SSIM 的 mask)
VMIN = 0
VMAX = 100

# LPIPS 的 HU 范围
LPIPS_VMIN = -1000.0
LPIPS_VMAX = 2000.0

# Slice 范围
SLICE_START = 30
SLICE_END = 80

# 输出文件
OUTPUT_EXCEL = "/host/d/file/ddm2_quantitative_results.xlsx"

GPU_ID = "0"

In [None]:
# 初始化
import os
import numpy as np
import pandas as pd
import nibabel as nib
import torch
import lpips
from skimage.metrics import structural_similarity
from tqdm.auto import tqdm

os.environ['CUDA_VISIBLE_DEVICES'] = GPU_ID
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"Device: {device}")

In [None]:
# 指标计算函数

def calc_mae_with_ref_window(img, ref, vmin, vmax):
    maes = []
    for slice_num in range(0, img.shape[-1]):
        slice_img = img[:,:,slice_num]
        slice_ref = ref[:,:,slice_num]
        mask = np.where((slice_ref >= vmin) & (slice_ref <= vmax), 1, 0)
        mae = np.sum(np.abs(slice_img - slice_ref) * mask) / np.sum(mask)
        maes.append(mae)

    return np.mean(maes), np.std(maes)


def calc_ssim_with_ref_window(img, ref, vmin, vmax):
    ssims = []
    for slice_num in range(0, img.shape[-1]):
        slice_img = img[:,:,slice_num]
        slice_ref = ref[:,:,slice_num]
        mask = np.where((slice_ref >= vmin) & (slice_ref <= vmax), 1, 0)
        _, ssim_map = structural_similarity(slice_img, slice_ref, data_range=vmax - vmin, full=True)
        ssim = np.sum(ssim_map * mask) / np.sum(mask)
        ssims.append(ssim)

    return np.mean(ssims), np.std(ssims)


def calc_lpips(imgs1, imgs2, vmin, vmax, loss_fn):
    lpipss = []
    for slice_num in range(0, imgs1.shape[-1]):
        slice1 = imgs1[:,:,slice_num]
        slice2 = imgs2[:,:,slice_num]

        slice1 = np.clip(slice1, vmin, vmax).astype(np.float32)
        slice2 = np.clip(slice2, vmin, vmax).astype(np.float32)

        slice1 = (slice1 - vmin) / (vmax - vmin) * 2 - 1
        slice2 = (slice2 - vmin) / (vmax - vmin) * 2 - 1

        slice1 = np.stack([slice1, slice1, slice1], axis=-1)
        slice2 = np.stack([slice2, slice2, slice2], axis=-1)

        slice1 = np.transpose(slice1, (2, 0, 1))[np.newaxis, ...]
        slice2 = np.transpose(slice2, (2, 0, 1))[np.newaxis, ...]

        slice1 = torch.from_numpy(slice1).to(device)
        slice2 = torch.from_numpy(slice2).to(device)

        with torch.no_grad():
            lpips_val = loss_fn(slice1, slice2)
        lpipss.append(lpips_val.item())

    return np.mean(lpipss), np.std(lpipss)

In [None]:
# 构建患者列表

df = pd.read_excel(EXCEL_PATH)
df = df[df['batch'].isin(EVAL_BATCHES)].reset_index(drop=True)

# 按 Patient_ID 和 Patient_subID 分组，只取 random_num=0
df_r0 = df[df['random_num'] == 0].reset_index(drop=True)

patients = []
for _, row in df_r0.iterrows():
    patients.append({
        'patient_id': row['Patient_ID'],
        'patient_subid': row['Patient_subID'],
        'gt_file': row['x0_file'],
        'noise_file': row['noise_file']
    })

print(f"评估患者数: {len(patients)}")

In [None]:
# 辅助函数

def get_paths(patient_id, patient_subid):
    pid_str = f"{int(patient_id):08d}"
    psid_str = f"{int(patient_subid):010d}"
    
    paths = {
        'n2n': os.path.join(N2N_PRED_ROOT, pid_str, psid_str, 'random_0', f'epoch{N2N_EPOCH}', 'pred_img.nii.gz'),
        'ddm2_noisy': os.path.join(DDM2_PRED_ROOT, pid_str, psid_str, 'noisy_input.nii.gz'),
        'ddm2_n2n': os.path.join(DDM2_PRED_ROOT, pid_str, psid_str, 'n2n_teacher.nii.gz'),
        'ddm2_first': os.path.join(DDM2_PRED_ROOT, pid_str, psid_str, 'ddm2_first_step.nii.gz'),
        'ddm2_final': os.path.join(DDM2_PRED_ROOT, pid_str, psid_str, 'ddm2_final.nii.gz'),
    }
    
    return paths


def load_volume(path, slice_start=None, slice_end=None):
    if not os.path.exists(path):
        return None
    
    npy_path = path.replace('.nii.gz', '.npy')
    if os.path.exists(npy_path):
        data = np.load(npy_path)
    else:
        data = nib.load(path).get_fdata()
    
    if slice_start is not None and slice_end is not None:
        if data.shape[-1] > slice_end - slice_start:
            data = data[:, :, slice_start:slice_end]
    
    return data.astype(np.float32)

In [None]:
# 初始化 LPIPS 模型
print("Loading LPIPS model...")
lpips_fn = lpips.LPIPS().to(device)
print("LPIPS model loaded!")

In [None]:
# 主评估循环

results = []

for p in tqdm(patients, desc="Evaluating"):
    patient_id = p['patient_id']
    patient_subid = p['patient_subid']
    print(patient_id, patient_subid)
    
    # 获取路径
    paths = get_paths(patient_id, patient_subid)
    
    # 加载 GT (Ground Truth)
    gt_path = p['gt_file'].replace('/host/d/file/simulation/', DATA_ROOT)
    gt = load_volume(gt_path, SLICE_START, SLICE_END)
    if gt is None:
        print(f"  Skip {patient_id}: GT not found")
        continue
    
    # 加载各方法结果
    noisy = load_volume(paths['ddm2_noisy'])
    n2n = load_volume(paths['ddm2_n2n']) or load_volume(paths['n2n'])
    ddm2_first = load_volume(paths['ddm2_first'])
    ddm2_final = load_volume(paths['ddm2_final'])
    
    # 确保 shape 一致
    target_slices = gt.shape[-1]
    
    def match_slices(img):
        if img is None:
            return None
        if img.shape[-1] != target_slices:
            return img[:, :, :target_slices]
        return img
    
    noisy = match_slices(noisy)
    n2n = match_slices(n2n)
    ddm2_first = match_slices(ddm2_first)
    ddm2_final = match_slices(ddm2_final)
    
    # 计算各方法的指标
    row = {
        'patient_id': patient_id,
        'patient_subid': patient_subid,
    }
    
    # Noisy
    if noisy is not None:
        mae_noisy, _ = calc_mae_with_ref_window(noisy, gt, VMIN, VMAX)
        ssim_noisy, _ = calc_ssim_with_ref_window(noisy, gt, VMIN, VMAX)
        lpips_noisy, _ = calc_lpips(noisy, gt, LPIPS_VMIN, LPIPS_VMAX, lpips_fn)
        row['mae_noisy'] = mae_noisy
        row['ssim_noisy'] = ssim_noisy
        row['lpips_noisy'] = lpips_noisy
        print(f'noisy: MAE={mae_noisy:.4f}, SSIM={ssim_noisy:.4f}, LPIPS={lpips_noisy:.4f}')
    
    # N2N
    if n2n is not None:
        mae_n2n, _ = calc_mae_with_ref_window(n2n, gt, VMIN, VMAX)
        ssim_n2n, _ = calc_ssim_with_ref_window(n2n, gt, VMIN, VMAX)
        lpips_n2n, _ = calc_lpips(n2n, gt, LPIPS_VMIN, LPIPS_VMAX, lpips_fn)
        row['mae_n2n'] = mae_n2n
        row['ssim_n2n'] = ssim_n2n
        row['lpips_n2n'] = lpips_n2n
        print(f'n2n: MAE={mae_n2n:.4f}, SSIM={ssim_n2n:.4f}, LPIPS={lpips_n2n:.4f}')
    
    # DDM2 First
    if ddm2_first is not None:
        mae_first, _ = calc_mae_with_ref_window(ddm2_first, gt, VMIN, VMAX)
        ssim_first, _ = calc_ssim_with_ref_window(ddm2_first, gt, VMIN, VMAX)
        lpips_first, _ = calc_lpips(ddm2_first, gt, LPIPS_VMIN, LPIPS_VMAX, lpips_fn)
        row['mae_ddm2_first'] = mae_first
        row['ssim_ddm2_first'] = ssim_first
        row['lpips_ddm2_first'] = lpips_first
        print(f'ddm2_first: MAE={mae_first:.4f}, SSIM={ssim_first:.4f}, LPIPS={lpips_first:.4f}')
    
    # DDM2 Final
    if ddm2_final is not None:
        mae_final, _ = calc_mae_with_ref_window(ddm2_final, gt, VMIN, VMAX)
        ssim_final, _ = calc_ssim_with_ref_window(ddm2_final, gt, VMIN, VMAX)
        lpips_final, _ = calc_lpips(ddm2_final, gt, LPIPS_VMIN, LPIPS_VMAX, lpips_fn)
        row['mae_ddm2_final'] = mae_final
        row['ssim_ddm2_final'] = ssim_final
        row['lpips_ddm2_final'] = lpips_final
        print(f'ddm2_final: MAE={mae_final:.4f}, SSIM={ssim_final:.4f}, LPIPS={lpips_final:.4f}')
    
    results.append(row)
    
    # 保存中间结果
    df_results = pd.DataFrame(results)
    df_results.to_excel(OUTPUT_EXCEL, index=False)

print(f"\n评估完成: {len(results)} 个患者")

In [None]:
# 汇总统计

df_results = pd.read_excel(OUTPUT_EXCEL)

print("\n" + "=" * 80)
print("量化结果汇总")
print("=" * 80)

metrics = ['mae', 'ssim', 'lpips']
methods = ['noisy', 'n2n', 'ddm2_first', 'ddm2_final']

for metric in metrics:
    print(f"\n{metric.upper()}:")
    print("-" * 60)
    print(f"{'Method':<15} {'Mean':>12} {'Std':>12}")
    print("-" * 60)
    
    for method in methods:
        col = f'{metric}_{method}'
        if col in df_results.columns:
            vals = df_results[col].dropna()
            if len(vals) > 0:
                print(f"{method:<15} {vals.mean():>12.4f} {vals.std():>12.4f}")

print("\n" + "=" * 80)