In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from functools import partial

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

class GuidedDiffusionSampler:
    def __init__(self,
                 diffusion_process,
                 model,
                 classifier,
                 classifier_scale=1.0,
                 ddim=False):
        """
        Args:
            diffusion_process: 预定义的扩散过程（如GaussianDiffusion）
            model: 你的ConditionalDiffusionUNet实例
            classifier: 训练好的噪声鲁棒分类器
            classifier_scale: 分类器引导强度系数
            ddim: 是否使用DDIM加速采样
        """
        self.diffusion = diffusion_process
        self.model = model
        self.classifier = classifier
        self.scale = classifier_scale
        self.ddim = ddim

    def cond_fn(self, x, t, y=None):
        """分类器梯度计算函数"""
        assert y is not None
        with torch.enable_grad():
            x_in = x.detach().requires_grad_(True)
            logits = self.classifier(x_in, t)
            log_probs = F.log_softmax(logits, dim=-1)
            selected = log_probs[range(len(logits)), y.view(-1)]
            return torch.autograd.grad(selected.sum(), x_in)[0] * self.scale

    def model_fn(self, x, t, y=None):
        """包装UNet前向传播"""
        return self.model(x, t, y=y)

    def p_sample(self, x, t, y):
        """单步采样（带分类器引导）"""
        # 原始扩散参数
        out = self.diffusion.p_mean_variance(self.model, x, t, y=y)

        # 应用分类器梯度
        if self.scale > 0:
            grad = self.cond_fn(x, t, y)
            out["mean"] = out["mean"] + out["variance"] * grad

        # 重参数化采样
        noise = torch.randn_like(x)
        nonzero_mask = (t != 0).float().view(-1, *([1]*(len(x.shape)-1)))
        sample = out["mean"] + nonzero_mask * torch.exp(0.5 * out["log_variance"]) * noise
        return sample

    def p_sample_loop(self, shape, y):
        """完整采样循环"""
        device = next(self.model.parameters()).device
        x = torch.randn(shape, device=device)

        for t in reversed(range(0, self.diffusion.num_timesteps)):
            timesteps = torch.full((shape[0],), t, device=device, dtype=torch.long)
            x = self.p_sample(x, timesteps, y=y)

        return x

    def generate(self, num_samples, num_classes, image_size):
        """生成入口函数"""
        self.model.eval()
        self.classifier.eval()

        all_images = []
        with torch.no_grad():
            for _ in range((num_samples-1)//self.batch_size + 1):
                # 生成目标类别标签
                y = torch.randint(0, num_classes, (self.batch_size,), device=device)

                # 执行采样
                samples = self.p_sample_loop(
                    shape=(self.batch_size, 1, image_size[0], image_size[1]),
                    y=y
                )

                # 后处理
                samples = (samples.clamp(-1, 1) + 1) / 2  # 缩放到[0,1]
                all_images.append(samples.cpu())

        return torch.cat(all_images, dim=0)[:num_samples]

In [16]:
import torch
import torch.nn as nn

def test_guided_sampler():
    print("=== 开始GuidedDiffusionSampler测试 ===")

    # 配置参数
    device = "cuda" if torch.cuda.is_available() else "cpu"
    batch_size = 2
    img_size = (32, 32)  # 假设图像尺寸为32x32

    # 创建模拟组件
    class MockModel(nn.Module):
        def __init__(self):
            super().__init__()
            # 添加可训练参数
            self.conv = nn.Conv2d(1, 8, kernel_size=3)  # 输入通道数根据实际情况调整

    def forward(self, x, t, y=None):
        return self.conv(x)  # 实际计算保留梯度


    class MockClassifier(nn.Module):
        def __init__(self):
            super().__init__()
            # 添加可训练参数
            self.conv = nn.Conv2d(1, 8, kernel_size=3)  # 假设输入是单通道图像
            self.fc = nn.Linear(8*30*30, 10)  # 根据实际尺寸调整

        def forward(self, x, t):
            # 模拟真实计算流程
            x = self.conv(x)
            x = x.view(x.size(0), -1)
            return self.fc(x)  # ✅ 输出与输入有计算关系，保留梯度

    class MockDiffusion:
        num_timesteps = 10  # 减少步数加速测试
        def p_mean_variance(self, model, x, t, y=None):
            return {
                "mean": x * 0.8,
                "variance": torch.ones_like(x) * 0.1,
                "log_variance": torch.log(torch.ones_like(x) * 0.1)
            }

    # 初始化组件
    model = MockModel().to(device)
    classifier = MockClassifier().to(device)
    diffusion = MockDiffusion()

    sampler = GuidedDiffusionSampler(
        diffusion_process=diffusion,
        model=model,
        classifier=classifier,
        classifier_scale=5.0
    )
    sampler.batch_size = batch_size  # 添加batch_size属性

    # -------------------------- 测试1: 梯度计算 --------------------------
    print("\n--- 测试梯度计算 ---")
    x = torch.randn(batch_size, 1, *img_size, device=device)
    t = torch.tensor([5, 5], device=device)
    y = torch.randint(0, 10, (batch_size,), device=device)

    grad = sampler.cond_fn(x, t, y)
    assert grad.shape == x.shape, f"梯度形状错误: {grad.shape} vs {x.shape}"
    print("✅ 梯度形状验证通过")

    # -------------------------- 测试2: 单步采样 --------------------------
    print("\n--- 测试单步采样 ---")
    sample = sampler.p_sample(x, t, y)
    assert sample.shape == x.shape, f"采样形状错误: {sample.shape} vs {x.shape}"
    print("✅ 单步采样形状验证通过")

    # 验证引导影响
    original_mean = diffusion.p_mean_variance(None, x, t)["mean"]
    assert not torch.allclose(sample, original_mean, atol=1e-3), "分类器引导未生效"
    print("✅ 分类器引导有效性验证通过")

    # -------------------------- 测试3: 完整采样循环 --------------------------
    print("\n--- 测试完整采样 ---")
    shape = (batch_size, 1, *img_size)
    y = torch.randint(0, 10, (batch_size,), device=device)
    samples = sampler.p_sample_loop(shape, y)

    assert samples.shape == shape, f"最终采样形状错误: {samples.shape} vs {shape}"
    print("✅ 完整采样形状验证通过")
    assert samples.min() > -3 and samples.max() < 3, "采样值范围异常"
    print("✅ 采样值范围验证通过")

    # -------------------------- 测试4: 生成函数 --------------------------
    print("\n--- 测试生成函数 ---")
    num_samples = 3
    generated = sampler.generate(
        num_samples=num_samples,
        num_classes=10,
        image_size=img_size
    )

    assert generated.shape[0] == num_samples, f"生成数量错误: {generated.shape[0]} vs {num_samples}"
    print("✅ 生成数量验证通过")
    assert (generated >= 0).all() and (generated <= 1).all(), "数值未正确归一化到[0,1]"
    print("✅ 归一化验证通过")

    print("\n=== 所有测试通过 ===")

if __name__ == "__main__":
    test_guided_sampler()


=== 开始GuidedDiffusionSampler测试 ===

--- 测试梯度计算 ---
✅ 梯度形状验证通过

--- 测试单步采样 ---
✅ 单步采样形状验证通过
✅ 分类器引导有效性验证通过

--- 测试完整采样 ---
✅ 完整采样形状验证通过
✅ 采样值范围验证通过

--- 测试生成函数 ---
✅ 生成数量验证通过
✅ 归一化验证通过

=== 所有测试通过 ===
