In [1]:
import os
import sys
from pathlib import Path

import torch

# 确保能 import 到你的 LAR_IQA 工程
sys.path.append("..")  # 根据你的 ipynb 所在路径调整

from bak.LAR_IQA.scripts.utils import load_model


def export_lar_iqa_onnx(
    checkpoint_path: str = "../bak/LAR_IQA/checkpoint_epoch_3.pt",
    out_dir: str = "./out",
    onnx_name: str = "lar_iqa.onnx",
    use_cuda: bool = True,
):
    # 1. 选择设备
    device = "cuda" if (use_cuda and torch.cuda.is_available()) else "cpu"
    print(f"[INFO] Using device: {device}")

    # 2. 加载模型
    ckpt_path = Path(checkpoint_path).resolve()
    if not ckpt_path.exists():
        raise FileNotFoundError(f"Checkpoint not found: {ckpt_path}")

    print(f"[INFO] Loading checkpoint from: {ckpt_path}")
    model = load_model(str(ckpt_path), False, device)
    model.eval()

    # 3. 构造 dummy 输入（与 preprocess_image 输出形状一致）
    #
    # preprocess_image 中：
    #   image_authentic: Resize 到 (384, 384)
    #   image_synthetic: CenterCrop 到 (1280, 1280)
    #
    # 所以 dummy 输入分别是 [1, 3, 384, 384] 和 [1, 3, 1280, 1280]
    image_authentic = torch.randn(1, 3, 384, 384, device=device)
    image_synthetic = torch.randn(1, 3, 1280, 1280, device=device)

    # 4. 确保导出目录存在
    out_path = Path(out_dir)
    out_path.mkdir(parents=True, exist_ok=True)
    onnx_path = out_path / onnx_name

    print(f"[INFO] Exporting ONNX to: {onnx_path}")

    # 5. 导出 ONNX
    torch.onnx.export(
        model,
        (image_authentic, image_synthetic),  # 模型的两个输入
        onnx_path.as_posix(),
        export_params=True,  # 保存权重到 ONNX
        opset_version=18,  # 常用的较新 opset 版本（你也可以改成 16/18）
        do_constant_folding=True,  # 常量折叠优化
        input_names=["image_authentic", "image_synthetic"],
        output_names=["score"],
        dynamic_axes={  # 只把 batch 维做成动态，空间尺寸固定
            "image_authentic": {0: "batch_size"},
            "image_synthetic": {0: "batch_size"},
            "score": {0: "batch_size"},
        },
    )

    print("[INFO] ONNX export finished.")
    print(f"[INFO] ONNX model saved at: {onnx_path}")
    return onnx_path


# 在 ipynb 中直接跑这一段即可导出
if __name__ == "__main__":
    export_lar_iqa_onnx()


[INFO] Using device: cuda
[INFO] Loading checkpoint from: F:\ML\PythonAIProject\SMARKMediaTools_web\electron-media-toolbox\bak\LAR_IQA\checkpoint_epoch_3.pt
[INFO] Exporting ONNX to: out\lar_iqa.onnx


  torch.onnx.export(


[torch.onnx] Obtain model graph for `MobileNetMerged([...]` with `torch.export.export(..., strict=False)`...
[torch.onnx] Obtain model graph for `MobileNetMerged([...]` with `torch.export.export(..., strict=False)`... ✅
[torch.onnx] Run decomposition...
[torch.onnx] Run decomposition... ✅
[torch.onnx] Translate the graph into ONNX...
[torch.onnx] Translate the graph into ONNX... ✅
Applied 184 of general pattern rewrite rules.
[INFO] ONNX export finished.
[INFO] ONNX model saved at: out\lar_iqa.onnx


-------------------------------

In [1]:
import os
import sys
from pathlib import Path
from typing import Literal, Optional

import torch
import onnxruntime as ort  # 新增：用于离线图优化

# 确保能 import 到你的 LAR_IQA 工程
sys.path.append("..")  # 根据你的 ipynb 所在路径调整

from bak.LAR_IQA.scripts.utils import load_model


def _pick_ort_providers(prefer: Literal["directml", "cpu", "auto"] = "auto"):
    """
    根据当前环境选择合适的 onnxruntime providers：
    - 优先 DirectML（如果安装了 onnxruntime-directml 并可用）
    - 否则退回 CPUExecutionProvider
    """
    available = ort.get_available_providers()
    providers: list[str]

    if prefer in ("directml", "auto") and "DmlExecutionProvider" in available:
        providers = ["DmlExecutionProvider", "CPUExecutionProvider"]
        print(f"[ORT] Use providers: {providers}")
    else:
        providers = ["CPUExecutionProvider"]
        print(f"[ORT] Use providers: {providers}")

    return providers


def _pick_optimization_level(
    level: Literal["disable", "basic", "extended", "all"] = "extended",
) -> ort.GraphOptimizationLevel:
    """
    将字符串映射到 onnxruntime.GraphOptimizationLevel。
    默认使用 extended：basic + extended 的图融合，兼容 CPU / DirectML。
    """
    if level == "disable":
        return ort.GraphOptimizationLevel.ORT_DISABLE_ALL
    if level == "basic":
        return ort.GraphOptimizationLevel.ORT_ENABLE_BASIC
    if level == "all":
        # all = basic + extended + layout（NCHWc，CPU 专用）
        return ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    # 默认 extended
    return ort.GraphOptimizationLevel.ORT_ENABLE_EXTENDED


def export_lar_iqa_onnx(
    checkpoint_path: str = "../bak/LAR_IQA/checkpoint_epoch_3.pt",
    out_dir: str = "./out",
    onnx_name: str = "lar_iqa.onnx",
    use_cuda: bool = True,
    # 新增：是否做 ORT 离线图优化
    optimize_with_ort: bool = True,
    # 新增：图优化等级（推荐 "extended"）
    optimization_level: Literal["disable", "basic", "extended", "all"] = "extended",
    # 新增：优化时偏好的 provider（"directml" / "cpu" / "auto"）
    optimize_provider: Literal["directml", "cpu", "auto"] = "auto",
    # 新增：优化后模型文件名（None 则使用 `<stem>_opt.onnx`）
    optimized_name: Optional[str] = None,
):
    # ---------------------------------------------------------------------------
    # 1. 选择 PyTorch 导出时使用的设备（只影响导出阶段的 dummy forward）
    # ---------------------------------------------------------------------------
    device = "cuda" if (use_cuda and torch.cuda.is_available()) else "cpu"
    print(f"[INFO] Using device for export: {device}")

    # ---------------------------------------------------------------------------
    # 2. 加载 PyTorch 模型
    # ---------------------------------------------------------------------------
    ckpt_path = Path(checkpoint_path).resolve()
    if not ckpt_path.exists():
        raise FileNotFoundError(f"Checkpoint not found: {ckpt_path}")

    print(f"[INFO] Loading checkpoint from: {ckpt_path}")
    model = load_model(str(ckpt_path), False, device)
    model.eval()

    # ---------------------------------------------------------------------------
    # 3. 构造 dummy 输入（与 preprocess_image 输出形状一致）
    #    image_authentic: [1, 3, 384, 384]
    #    image_synthetic: [1, 3, 1280, 1280]
    # ---------------------------------------------------------------------------
    image_authentic = torch.randn(1, 3, 384, 384, device=device)
    image_synthetic = torch.randn(1, 3, 1280, 1280, device=device)

    # ---------------------------------------------------------------------------
    # 4. 确保导出目录存在
    # ---------------------------------------------------------------------------
    out_path = Path(out_dir)
    out_path.mkdir(parents=True, exist_ok=True)
    onnx_path = out_path / onnx_name

    print(f"[INFO] Exporting ONNX to: {onnx_path}")

    # ---------------------------------------------------------------------------
    # 5. 使用 torch.onnx.export 导出原始 ONNX
    # ---------------------------------------------------------------------------
    torch.onnx.export(
        model,
        (image_authentic, image_synthetic),  # 模型的两个输入
        onnx_path.as_posix(),
        export_params=True,  # 保存权重到 ONNX
        opset_version=18,  # 你之前使用的 opset
        do_constant_folding=True,  # 常量折叠优化
        input_names=["image_authentic", "image_synthetic"],
        output_names=["score"],
        dynamic_axes={  # 只把 batch 维做成动态，空间尺寸固定
            "image_authentic": {0: "batch_size"},
            "image_synthetic": {0: "batch_size"},
            "score": {0: "batch_size"},
        },
    )

    print("[INFO] ONNX export finished.")
    print(f"[INFO] Raw ONNX model saved at: {onnx_path}")

    # ---------------------------------------------------------------------------
    # 6. 可选：使用 ONNX Runtime 做一次“离线图优化”，生成 *_opt.onnx
    # ---------------------------------------------------------------------------
    if not optimize_with_ort:
        print("[INFO] Skip ORT graph optimization (optimize_with_ort=False).")
        return onnx_path

    # 6.1 选择优化后的输出路径
    if optimized_name is None:
        optimized_onnx_path = onnx_path.with_name(onnx_path.stem + "_opt.onnx")
    else:
        optimized_onnx_path = out_path / optimized_name

    # 6.2 配置 SessionOptions
    sess_options = ort.SessionOptions()
    sess_options.graph_optimization_level = _pick_optimization_level(optimization_level)
    # 设置离线优化输出路径：初始化 Session 时会把优化后的模型写到这里
    sess_options.optimized_model_filepath = optimized_onnx_path.as_posix()

    print(f"[INFO] Start ORT offline optimization -> {optimized_onnx_path} (level={optimization_level})")

    # 6.3 选择 provider（优先 DirectML，否则 CPU）
    providers = _pick_ort_providers(optimize_provider)

    # 6.4 创建 Session（只为触发优化与序列化，不必真正推理）
    _ = ort.InferenceSession(
        onnx_path.as_posix(),
        sess_options,
        providers=providers,
    )

    print("[INFO] ORT graph optimization finished.")
    print(f"[INFO] Optimized ONNX model saved at: {optimized_onnx_path}")

    return optimized_onnx_path


# 在 ipynb 中直接跑这一段即可导出
if __name__ == "__main__":
    export_lar_iqa_onnx()


[INFO] Using device for export: cuda
[INFO] Loading checkpoint from: F:\ML\PythonAIProject\SMARKMediaTools_web\electron-media-toolbox\bak\LAR_IQA\checkpoint_epoch_3.pt
[INFO] Exporting ONNX to: out\lar_iqa.onnx


  torch.onnx.export(


[torch.onnx] Obtain model graph for `MobileNetMerged([...]` with `torch.export.export(..., strict=False)`...
[torch.onnx] Obtain model graph for `MobileNetMerged([...]` with `torch.export.export(..., strict=False)`... ✅
[torch.onnx] Run decomposition...
[torch.onnx] Run decomposition... ✅
[torch.onnx] Translate the graph into ONNX...
[torch.onnx] Translate the graph into ONNX... ✅
Applied 184 of general pattern rewrite rules.
[INFO] ONNX export finished.
[INFO] Raw ONNX model saved at: out\lar_iqa.onnx
[INFO] Start ORT offline optimization -> out\lar_iqa_opt.onnx (level=extended)
[ORT] Use providers: ['DmlExecutionProvider', 'CPUExecutionProvider']
[INFO] ORT graph optimization finished.
[INFO] Optimized ONNX model saved at: out\lar_iqa_opt.onnx


In [2]:
# %% [markdown]
# 对比 LAR-IQA 的 PyTorch / ONNX / ONNX-OPT 的数值误差与推理速度

import os
import sys
import time
from pathlib import Path
from typing import List, Tuple

import numpy as np
import torch
import onnxruntime as ort

# 让 Python 找到你的 LAR_IQA 工程
sys.path.append("..")  # 根据 notebook 所在位置适当调整

# 根据你的工程结构选择其中一个导入方式：
# from packages.LAR_IQA.scripts.utils import load_model, preprocess_image
from bak.LAR_IQA.scripts.utils import load_model, preprocess_image


# ----------------- 路径配置：按需修改 -----------------

CHECKPOINT_PATH = "../bak/LAR_IQA/checkpoint_epoch_3.pt"
ONNX_RAW_PATH = "./out/lar_iqa.onnx"
ONNX_OPT_PATH = "./out/lar_iqa_opt.onnx"

# 测试图片目录：请改成你实际存放测试照片的文件夹
IMAGE_DIR = r"N:\待整理\2025.10.19 上海\个人导出\DxO\导出"  # TODO: 修改为你自己的图片目录

# ONNX provider 配置（按需切换）
# - 只用 CPU: ["CPUExecutionProvider"]
# - CUDA: ["CUDAExecutionProvider", "CPUExecutionProvider"]
# - DirectML: ["DmlExecutionProvider", "CPUExecutionProvider"]
# ORT_PROVIDERS = ["CUDAExecutionProvider", "CPUExecutionProvider"]  # 如果你想测 DirectML 就改成 ["DmlExecutionProvider", "CPUExecutionProvider"]
ORT_PROVIDERS = ["DmlExecutionProvider", "CPUExecutionProvider"]  # 如果你想测 DirectML 就改成 ["DmlExecutionProvider", "CPUExecutionProvider"]


# ----------------- 帮助函数 -----------------


def collect_image_paths(folder: str) -> List[Path]:
    exts = [".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp"]
    folder = Path(folder)
    if not folder.exists():
        raise FileNotFoundError(f"IMAGE_DIR 不存在: {folder.resolve()}")
    paths: List[Path] = []
    for ext in exts:
        paths.extend(folder.rglob(f"*{ext}"))
    paths = sorted(paths)
    if not paths:
        raise RuntimeError(f"在 {folder.resolve()} 下没有找到任何图片文件")
    return paths


def preprocess_all_images(image_paths: List[Path]):
    """
    使用原来的 preprocess_image（PIL+torchvision），
    在 CPU 上预处理一次，后面 Torch / ONNX 都重用这些张量。
    返回：List[(path, image_auth_cpu, image_syn_cpu)]
    """
    results = []
    for p in image_paths:
        # 始终在 CPU 上预处理，确保 .numpy() 可用
        img_auth_cpu, img_syn_cpu = preprocess_image(str(p), "RGB", "cpu")
        results.append((p, img_auth_cpu, img_syn_cpu))
    return results


def run_torch_inference(
    preprocessed,
    checkpoint_path: str,
    device: torch.device,
):
    print(f"\n[PyTorch] device = {device}")
    ckpt_path = Path(checkpoint_path).resolve()
    if not ckpt_path.exists():
        raise FileNotFoundError(f"Checkpoint not found: {ckpt_path}")

    model = load_model(str(ckpt_path), False, device)
    model.eval()

    scores = []

    # warmup
    with torch.no_grad():
        p0, auth0_cpu, syn0_cpu = preprocessed[0]
        auth0 = auth0_cpu.to(device)
        syn0 = syn0_cpu.to(device)
        _ = model(auth0, syn0)
        if device.type == "cuda":
            torch.cuda.synchronize()

    if device.type == "cuda":
        torch.cuda.synchronize()
    t0 = time.perf_counter()

    with torch.no_grad():
        for _, auth_cpu, syn_cpu in preprocessed:
            auth = auth_cpu.to(device, non_blocking=True)
            syn = syn_cpu.to(device, non_blocking=True)
            out = model(auth, syn)
            score = float(out.detach().cpu().item())
            scores.append(score)

    if device.type == "cuda":
        torch.cuda.synchronize()
    t1 = time.perf_counter()

    total_time = t1 - t0
    avg_time = total_time / len(preprocessed)
    print(f"[PyTorch] Total: {total_time:.4f}s, Per image: {avg_time * 1000:.3f} ms")

    return np.array(scores, dtype=np.float32), total_time, avg_time


def make_onnx_session(onnx_path: str, providers):
    onnx_path = Path(onnx_path).resolve()
    if not onnx_path.exists():
        raise FileNotFoundError(f"ONNX model not found: {onnx_path}")
    sess_options = ort.SessionOptions()
    # 这里只做推理，不再做离线图优化，因此不设 optimized_model_filepath
    sess = ort.InferenceSession(onnx_path.as_posix(), sess_options, providers=providers)
    print(f"[ONNX] Loaded: {onnx_path.name}, providers = {sess.get_providers()}")
    return sess


def run_onnx_inference(
    preprocessed,
    session: ort.InferenceSession,
    label: str = "ONNX",
):
    scores = []

    # warmup
    p0, auth0_cpu, syn0_cpu = preprocessed[0]
    warm_inputs = {
        "image_authentic": auth0_cpu.numpy(),
        "image_synthetic": syn0_cpu.numpy(),
    }
    _ = session.run(None, warm_inputs)

    t0 = time.perf_counter()
    for _, auth_cpu, syn_cpu in preprocessed:
        inputs = {
            "image_authentic": auth_cpu.numpy(),
            "image_synthetic": syn_cpu.numpy(),
        }
        outputs = session.run(None, inputs)
        # outputs[0] 预期形状为 [1, 1] 或 [1]
        score = float(np.array(outputs[0]).reshape(-1)[0])
        scores.append(score)
    t1 = time.perf_counter()

    total_time = t1 - t0
    avg_time = total_time / len(preprocessed)
    print(f"[{label}] Total: {total_time:.4f}s, Per image: {avg_time * 1000:.3f} ms")

    return np.array(scores, dtype=np.float32), total_time, avg_time


def summarize_error(
    ref_scores: np.ndarray,
    test_scores: np.ndarray,
    name: str,
):
    diff = test_scores - ref_scores
    abs_diff = np.abs(diff)
    mae = abs_diff.mean()
    max_abs = abs_diff.max()
    rmse = np.sqrt((diff**2).mean())
    print(f"\n[{name}] vs PyTorch 误差统计：\n  MAE  = {mae:.6f}\n  RMSE = {rmse:.6f}\n  Max |Δ| = {max_abs:.6f}")


# ----------------- 主流程 -----------------


def main():
    # 1) 收集测试图片
    image_paths = collect_image_paths(IMAGE_DIR)
    print(f"[INFO] 找到 {len(image_paths)} 张测试图片。示例：")
    for p in image_paths[:5]:
        print("   -", p)

    # 2) 统一预处理（CPU）
    print("\n[INFO] 预处理所有图片（preprocess_image, RGB）...")
    preprocessed = preprocess_all_images(image_paths)
    print("[INFO] 预处理完成。")

    # 3) PyTorch 推理
    torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    torch_scores, t_torch, avg_torch = run_torch_inference(
        preprocessed,
        CHECKPOINT_PATH,
        torch_device,
    )

    # 4) ONNX 原始模型推理
    sess_raw = make_onnx_session(ONNX_RAW_PATH, ORT_PROVIDERS)
    onnx_raw_scores, t_raw, avg_raw = run_onnx_inference(
        preprocessed,
        sess_raw,
        label="ONNX-raw",
    )

    # 5) ONNX 优化模型推理
    sess_opt = make_onnx_session(ONNX_OPT_PATH, ORT_PROVIDERS)
    onnx_opt_scores, t_opt, avg_opt = run_onnx_inference(
        preprocessed,
        sess_opt,
        label="ONNX-opt",
    )

    # 6) 数值误差统计（相对 PyTorch）
    summarize_error(torch_scores, onnx_raw_scores, name="ONNX-raw")
    summarize_error(torch_scores, onnx_opt_scores, name="ONNX-opt")

    # 7) 速度对比汇总
    n = len(preprocessed)
    print("\n========== 推理速度对比（{} 张图） ==========".format(n))
    print(f"PyTorch   : total {t_torch:.4f}s, per image {avg_torch * 1000:.3f} ms")
    print(f"ONNX-raw  : total {t_raw:.4f}s, per image {avg_raw * 1000:.3f} ms")
    print(f"ONNX-opt  : total {t_opt:.4f}s, per image {avg_opt * 1000:.3f} ms")


# 在 notebook 中直接调用 main() 即可
if __name__ == "__main__":
    main()


[INFO] 找到 57 张测试图片。示例：
   - N:\待整理\2025.10.19 上海\个人导出\DxO\导出\Z30_5655-NEF.jpg
   - N:\待整理\2025.10.19 上海\个人导出\DxO\导出\Z30_5656-NEF.jpg
   - N:\待整理\2025.10.19 上海\个人导出\DxO\导出\Z30_5718-NEF.jpg
   - N:\待整理\2025.10.19 上海\个人导出\DxO\导出\Z30_5719-NEF.jpg
   - N:\待整理\2025.10.19 上海\个人导出\DxO\导出\Z30_5720-NEF.jpg

[INFO] 预处理所有图片（preprocess_image, RGB）...
[INFO] 预处理完成。

[PyTorch] device = cuda
[PyTorch] Total: 1.8677s, Per image: 32.766 ms
[ONNX] Loaded: lar_iqa.onnx, providers = ['DmlExecutionProvider', 'CPUExecutionProvider']
[ONNX-raw] Total: 1.2670s, Per image: 22.229 ms
[ONNX] Loaded: lar_iqa_opt.onnx, providers = ['DmlExecutionProvider', 'CPUExecutionProvider']
[ONNX-opt] Total: 1.2622s, Per image: 22.143 ms

[ONNX-raw] vs PyTorch 误差统计：
  MAE  = 0.000316
  RMSE = 0.000377
  Max |Δ| = 0.000941

[ONNX-opt] vs PyTorch 误差统计：
  MAE  = 0.000316
  RMSE = 0.000377
  Max |Δ| = 0.000941

PyTorch   : total 1.8677s, per image 32.766 ms
ONNX-raw  : total 1.2670s, per image 22.229 ms
ONNX-opt  : total 1.2622s, 

--------------------

In [4]:
# %% [markdown]
# 对比 LAR-IQA 的 PyTorch / ONNX 推理结果与耗时：
# - 阶段 1：OpenCV 读图时间
# - 阶段 2：预处理时间（CPU 与 GPU 各自一份）
# - 阶段 3：模型推理时间（PyTorch / ONNX）

import os
import sys
import time
from pathlib import Path
from typing import List, Tuple

import cv2
import numpy as np
import torch
import torch.nn.functional as F
import onnxruntime as ort

# 让 Python 找到你的 LAR_IQA 工程
sys.path.append("..")  # 根据 notebook 所在位置适当调整

from bak.LAR_IQA.scripts.utils import load_model  # 只用 load_model，不再用原始 preprocess_image

# ----------------- 路径配置：按需修改 -----------------

CHECKPOINT_PATH = "../bak/LAR_IQA/checkpoint_epoch_3.pt"
ONNX_RAW_PATH = "./out/lar_iqa.onnx"

# 测试图片目录：请改成你实际存放测试照片的文件夹
IMAGE_DIR = r"N:\待整理\2025.10.19 上海\个人导出\DxO\导出"

# ONNX provider 配置（按需切换）
# - 只用 CPU: ["CPUExecutionProvider"]
# - CUDA: ["CUDAExecutionProvider", "CPUExecutionProvider"]
# - DirectML: ["DmlExecutionProvider", "CPUExecutionProvider"]
ORT_PROVIDERS = ["DmlExecutionProvider", "CPUExecutionProvider"]


# ----------------- 工具函数 -----------------


def collect_image_paths(folder: str) -> List[Path]:
    exts = [".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp"]
    folder = Path(folder)
    if not folder.exists():
        raise FileNotFoundError(f"IMAGE_DIR 不存在: {folder.resolve()}")
    paths: List[Path] = []
    for ext in exts:
        paths.extend(folder.rglob(f"*{ext}"))
    paths = sorted(paths)
    if not paths:
        raise RuntimeError(f"在 {folder.resolve()} 下没有找到任何图片文件")
    return paths


def cv_imread_unicode(path_str: str) -> np.ndarray:
    """
    使用 OpenCV 读取图片，兼容中文/特殊路径。
    """
    data = np.fromfile(path_str, dtype=np.uint8)
    img = cv2.imdecode(data, cv2.IMREAD_COLOR)
    if img is None:
        raise RuntimeError(f"cv_imdecode 读取失败: {path_str}")
    return img  # BGR, uint8


def read_all_images_cv2(image_paths: List[Path]):
    """
    阶段 1：统一读图（OpenCV），返回 [(Path, bgr_np)]，并统计总耗时。
    """
    images: List[Tuple[Path, np.ndarray]] = []
    t0 = time.perf_counter()
    for p in image_paths:
        img = cv_imread_unicode(str(p))
        images.append((p, img))
    t1 = time.perf_counter()
    total = t1 - t0
    avg = total / len(images)
    print(f"[Stage-1 Read] Total: {total:.4f}s, Per image: {avg * 1000:.3f} ms, Num: {len(images)}")
    return images, total, avg


def preprocess_single_from_bgr(
    bgr: np.ndarray,
    device: torch.device,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    从 OpenCV BGR 图像，按训练时的逻辑生成：
      - image_authentic: Resize 到 (384, 384)
      - image_synthetic: CenterCrop 到 (1280, 1280)
    并做 ToTensor + Normalize，返回 shape=[1,3,H,W] 的 tensor（在指定 device 上）。
    """
    # BGR -> RGB，转为 float32 [0,1]，NCHW
    # 注意 .copy() 避免 from_numpy 引用只读内存
    rgb = bgr[..., ::-1].copy()  # HWC, RGB
    img = torch.from_numpy(rgb).to(device=device, dtype=torch.float32) / 255.0
    img = img.permute(2, 0, 1).unsqueeze(0)  # [1,3,H,W]

    # authentic: Resize 到 384x384（双线性插值）
    image_auth = F.interpolate(img, size=(384, 384), mode="bilinear", align_corners=False)

    # synthetic: CenterCrop 到 1280x1280（不足时对边缘做对称 padding）
    _, _, h, w = img.shape
    crop_h = crop_w = 1280

    if h < crop_h or w < crop_w:
        # 对称 padding 到至少 1280，再居中裁剪
        pad_top = max((crop_h - h) // 2, 0)
        pad_bottom = max(crop_h - h - pad_top, 0)
        pad_left = max((crop_w - w) // 2, 0)
        pad_right = max(crop_w - w - pad_left, 0)
        img_padded = F.pad(img, (pad_left, pad_right, pad_top, pad_bottom))
    else:
        img_padded = img

    _, _, hp, wp = img_padded.shape
    top = (hp - crop_h) // 2
    left = (wp - crop_w) // 2
    image_syn = img_padded[:, :, top : top + crop_h, left : left + crop_w]

    # Normalize（ImageNet 均值/方差）
    mean = torch.tensor([0.485, 0.456, 0.406], device=device).view(1, 3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=device).view(1, 3, 1, 1)

    image_auth = (image_auth - mean) / std
    image_syn = (image_syn - mean) / std

    return image_auth, image_syn  # [1,3,H,W]


def preprocess_all_images(
    images_bgr: List[Tuple[Path, np.ndarray]],
    device: torch.device,
    label: str,
):
    """
    阶段 2：对读入的 BGR 图像做预处理，返回 [(Path, auth_tensor, syn_tensor)]。
    auth/syn 张量已经在指定 device 上。
    """
    results = []
    if device.type == "cuda":
        torch.cuda.empty_cache()

    t0 = time.perf_counter()
    for p, bgr in images_bgr:
        auth, syn = preprocess_single_from_bgr(bgr, device)
        results.append((p, auth, syn))
    if device.type == "cuda":
        torch.cuda.synchronize()
    t1 = time.perf_counter()

    total = t1 - t0
    avg = total / len(results)
    print(f"[Stage-2 Preprocess-{label}] Total: {total:.4f}s, Per image: {avg * 1000:.3f} ms, Num: {len(results)}")
    return results, total, avg


def run_torch_inference(
    preprocessed,
    checkpoint_path: str,
    device: torch.device,
):
    """
    阶段 3-A：PyTorch 推理。
    preprocessed: [(Path, auth_tensor, syn_tensor)]，auth/syn 已在 device 上。
    """
    print(f"\n[PyTorch] device = {device}")
    ckpt_path = Path(checkpoint_path).resolve()
    if not ckpt_path.exists():
        raise FileNotFoundError(f"Checkpoint not found: {ckpt_path}")

    model = load_model(str(ckpt_path), False, device)
    model.eval()

    scores = []

    # warmup
    with torch.no_grad():
        _, auth0, syn0 = preprocessed[0]
        _ = model(auth0, syn0)
        if device.type == "cuda":
            torch.cuda.synchronize()

    if device.type == "cuda":
        torch.cuda.synchronize()
    t0 = time.perf_counter()

    with torch.no_grad():
        for _, auth, syn in preprocessed:
            out = model(auth, syn)
            score = float(out.detach().cpu().item())
            scores.append(score)

    if device.type == "cuda":
        torch.cuda.synchronize()
    t1 = time.perf_counter()

    total_time = t1 - t0
    avg_time = total_time / len(preprocessed)
    print(f"[PyTorch] Total: {total_time:.4f}s, Per image: {avg_time * 1000:.3f} ms")

    return np.array(scores, dtype=np.float32), total_time, avg_time


def make_onnx_session(onnx_path: str, providers):
    onnx_path = Path(onnx_path).resolve()
    if not onnx_path.exists():
        raise FileNotFoundError(f"ONNX model not found: {onnx_path}")
    sess_options = ort.SessionOptions()
    sess = ort.InferenceSession(onnx_path.as_posix(), sess_options, providers=providers)
    print(f"[ONNX] Loaded: {onnx_path.name}, providers = {sess.get_providers()}")
    return sess


def run_onnx_inference(
    preprocessed_cpu,
    session: ort.InferenceSession,
    label: str = "ONNX",
):
    """
    阶段 3-B：ONNX 推理，使用 CPU 预处理的张量（会自动拷贝到 DML / CUDA）。
    preprocessed_cpu: [(Path, auth_cpu, syn_cpu)]，auth/syn 在 CPU 上。
    """
    scores = []

    # warmup
    _, auth0_cpu, syn0_cpu = preprocessed_cpu[0]
    warm_inputs = {
        "image_authentic": auth0_cpu.numpy(),
        "image_synthetic": syn0_cpu.numpy(),
    }
    _ = session.run(None, warm_inputs)

    t0 = time.perf_counter()
    for _, auth_cpu, syn_cpu in preprocessed_cpu:
        inputs = {
            "image_authentic": auth_cpu.numpy(),
            "image_synthetic": syn_cpu.numpy(),
        }
        outputs = session.run(None, inputs)
        score = float(np.array(outputs[0]).reshape(-1)[0])
        scores.append(score)
    t1 = time.perf_counter()

    total_time = t1 - t0
    avg_time = total_time / len(preprocessed_cpu)
    print(f"[{label}] Total: {total_time:.4f}s, Per image: {avg_time * 1000:.3f} ms")

    return np.array(scores, dtype=np.float32), total_time, avg_time


def summarize_error(
    ref_scores: np.ndarray,
    test_scores: np.ndarray,
    name: str,
):
    diff = test_scores - ref_scores
    abs_diff = np.abs(diff)
    mae = abs_diff.mean()
    max_abs = abs_diff.max()
    rmse = np.sqrt((diff**2).mean())
    print(f"\n[{name}] vs PyTorch 误差统计：\n  MAE  = {mae:.6f}\n  RMSE = {rmse:.6f}\n  Max |Δ| = {max_abs:.6f}")


# ----------------- 主流程 -----------------


def main():
    # 1) 收集测试图片
    image_paths = collect_image_paths(IMAGE_DIR)
    print(f"[INFO] 找到 {len(image_paths)} 张测试图片。前 5 张示例：")
    for p in image_paths[:5]:
        print("   -", p)

    # 2) 阶段 1：统一读图（OpenCV）
    images_bgr, t_read, avg_read = read_all_images_cv2(image_paths)

    # 3) 阶段 2：预处理（CPU & GPU）
    cpu_device = torch.device("cpu")
    pre_cpu, t_pre_cpu, avg_pre_cpu = preprocess_all_images(images_bgr, cpu_device, label="CPU")

    pre_gpu = None
    t_pre_gpu = 0.0
    avg_pre_gpu = 0.0
    use_cuda = torch.cuda.is_available()

    if use_cuda:
        gpu_device = torch.device("cuda")
        pre_gpu, t_pre_gpu, avg_pre_gpu = preprocess_all_images(images_bgr, gpu_device, label="GPU")
    else:
        print("[Stage-2 Preprocess-GPU] CUDA 不可用，跳过 GPU 预处理测试。")

    # 4) 阶段 3-A：PyTorch 推理（优先使用 GPU 预处理）
    if use_cuda and pre_gpu is not None:
        torch_device = torch.device("cuda")
        pre_for_torch = pre_gpu
    else:
        torch_device = torch.device("cpu")
        pre_for_torch = pre_cpu

    torch_scores, t_torch, avg_torch = run_torch_inference(
        pre_for_torch,
        CHECKPOINT_PATH,
        torch_device,
    )

    # 5) 阶段 3-B：ONNX 推理（使用 CPU 预处理）
    sess_raw = make_onnx_session(ONNX_RAW_PATH, ORT_PROVIDERS)
    onnx_raw_scores, t_raw, avg_raw = run_onnx_inference(
        pre_cpu,
        sess_raw,
        label="ONNX-raw",
    )

    # 6) 数值误差统计（ONNX vs PyTorch）
    summarize_error(torch_scores, onnx_raw_scores, name="ONNX-raw")

    # 7) 速度对比汇总（包含读图 & 预处理）
    n = len(pre_cpu)
    print("\n========== 整体耗时对比（{} 张图） ==========".format(n))
    print(f"[Stage-1 Read]         Total {t_read:.4f}s,   Per image {avg_read * 1000:.3f} ms")
    print(f"[Stage-2 Pre-CPU]      Total {t_pre_cpu:.4f}s, Per image {avg_pre_cpu * 1000:.3f} ms")
    if use_cuda:
        print(f"[Stage-2 Pre-GPU]      Total {t_pre_gpu:.4f}s, Per image {avg_pre_gpu * 1000:.3f} ms")
    print(f"[Stage-3 PyTorch]      Total {t_torch:.4f}s,  Per image {avg_torch * 1000:.3f} ms")
    print(f"[Stage-3 ONNX-raw]     Total {t_raw:.4f}s,    Per image {avg_raw * 1000:.3f} ms")


# 在 notebook 中直接调用 main() 即可
if __name__ == "__main__":
    main()


[INFO] 找到 57 张测试图片。前 5 张示例：
   - N:\待整理\2025.10.19 上海\个人导出\DxO\导出\Z30_5655-NEF.jpg
   - N:\待整理\2025.10.19 上海\个人导出\DxO\导出\Z30_5656-NEF.jpg
   - N:\待整理\2025.10.19 上海\个人导出\DxO\导出\Z30_5718-NEF.jpg
   - N:\待整理\2025.10.19 上海\个人导出\DxO\导出\Z30_5719-NEF.jpg
   - N:\待整理\2025.10.19 上海\个人导出\DxO\导出\Z30_5720-NEF.jpg
[Stage-1 Read] Total: 9.6026s, Per image: 168.467 ms, Num: 57
[Stage-2 Preprocess-CPU] Total: 11.7436s, Per image: 206.029 ms, Num: 57
[Stage-2 Preprocess-GPU] Total: 11.5621s, Per image: 202.844 ms, Num: 57

[PyTorch] device = cuda
[PyTorch] Total: 1.9552s, Per image: 34.302 ms
[ONNX] Loaded: lar_iqa.onnx, providers = ['DmlExecutionProvider', 'CPUExecutionProvider']
[ONNX-raw] Total: 25.0422s, Per image: 439.336 ms

[ONNX-raw] vs PyTorch 误差统计：
  MAE  = 0.000567
  RMSE = 0.000681
  Max |Δ| = 0.001306

[Stage-1 Read]         Total 9.6026s,   Per image 168.467 ms
[Stage-2 Pre-CPU]      Total 11.7436s, Per image 206.029 ms
[Stage-2 Pre-GPU]      Total 11.5621s, Per image 202.844 ms
[Stage-3 