In [None]:
import torch.nn as nn
import torch
import math

#### 模型定义：

In [None]:
"""
DDPM 模型需要的输入包括 噪声图像xt 和 时间步t ，输出为预测的噪声ϵθ(xt,t)。

首先，我们定义一个时间嵌入层，它负责将时间信息注入到特征中，将 时间步t 映射为高维向量。
参考 Transformer 中的位置编码方法，使用正余弦函数将时间步映射到高维空间。公式为：
                        PE(t, 2i) = sin(t / 10000^(2i/d))
                        PE(t, 2i+1) = cos(t / 10000^(2i/d))
其中，d为嵌入维度，t为维度索引。                    
"""
class SinusoidalPositionEmbeddings(nn.Module):
    def __init__(self, dim):
        super().__init__()
        
        self.dim = dim
    
    def forward(self, time):
        device = time.device

        # 将维度分为两半，分别用于sin和cos
        half_dim = self.dim // 2

        # 计算不同频率的指数衰减
        embeddings = math.log(10000) / (half_dim - 1)

        # 生成频率序列
        embeddings = torch.exp(
            torch.arange(half_dim, device=device) * -embeddings
        )

        # 将时间步与频率序列相乘
        embeddings = time[:, None] * embeddings[None, :]

        # 拼接sin和cos得到最终的嵌入向量
        embeddings = torch.cat(
            (embeddings.sin(), embeddings.cos()),
            dim = -1
        )

        return embeddings

In [None]:
"""
接着，定义一个U-Net的基本模块Block，包含时间嵌入、上、下采样功能。

第一次卷积扩展通道数，然后加入时间嵌入，接着进行第二次卷积，融合特征信息，最后进行上、下采样。

注：这里使用简化版U-Net，未使用原文中带有注意力机制的模型。
"""
class Block(nn.Module):
    def __init__(self, in_channels, out_channels, time_emb_dim, up=False):
        super().__init__()
        
        self.time_mlp = nn.Linear(time_emb_dim, out_channels)

        if up:
            self.conv1 = nn.Conv2d(2 * in_channels, out_channels, kernel_size=3, padding=1)     # 由于 U-Net 的残差连接,上采样时会 concat 之前的特征，输入通道数需要翻倍
            self.transform = nn.ConvTranspose2d(out_channels, out_channels, kernel_size=4, stride=2, padding=1)
        else:
            self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
            self.transform = nn.Conv2d(out_channels, out_channels, kernel_size=4, stride=2, padding=1)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bnorm1 = nn.BatchNorm2d(out_channels)
        self.bnorm2 = nn.BatchNorm2d(out_channels)

        self.relu = nn.ReLU()

    def forward(self, x, t):
        # 第一次卷积
        h = self.bnorm1(self.relu(self.conv1(x)))

        # 时间嵌入
        time_emb = self.relu(self.time_mlp(t))

        # 将时间信息注入特征图
        h = h + time_emb[..., None, None]

        # 第二次卷积
        h = self.bnorm2(self.relu(self.conv2(h)))

        # 上采样或下采样
        return self.transform(h)

In [None]:
"""
最后，将多个Block组合起来，形成一个U-Net模型。
每一层都会加入时间步信息，最终输出与输入图像尺寸相同的预测噪声。
"""
class SimpleUnet(nn.Module):
    def __init__(self):
        super().__init__()

        image_channels = 3
        down_channels = (64, 128, 256, 512, 1024)
        up_channels = (1024, 512, 256, 128, 64)
        out_dim = 3
        time_emb_dim = 32

        # 时间嵌入层
        self.time_embed = nn.Sequential(
            SinusoidalPositionEmbeddings(time_emb_dim),
            nn.Linear(time_emb_dim, time_emb_dim)
        )

        # 输入层、下采样层、上采样层和输出层
        self.input = nn.Conv2d(image_channels, down_channels[0], kernel_size=3, padding=1)
        self.downs = nn.ModuleList([Block(down_channels[i], down_channels[i + 1], time_emb_dim) for i in range(len(down_channels) - 1)])
        self.ups = nn.ModuleList([Block(up_channels[i], up_channels[i + 1], time_emb_dim, up=True) for i in range(len(up_channels) - 1)])
        self.output = nn.Conv2d(up_channels[-1], out_dim, kernel_size=3, padding=1)

    def forward(self, x, time_step):
        # 时间步嵌入
        t = self.time_embed(time_step)

        # 初步卷积
        x = self.input(x)

        # UNet前向传播：先下采样收集特征，再上采样恢复分辨率
        residual_stack = []
        for down in self.downs:
            x = down(x, t)
            residual_stack.append(x)
        for up in self.ups:
            residual_x = residual_stack.pop()
            x = torch.cat((x, residual_x), dim=1)
            x = up(x, t)
        
        return self.output(x)

#### 训练：

In [None]:
"""
首先需要定义一个噪声调度器，用于控制加噪过程，生成不同时间步的噪声图像。
"""
class Noisescheduler(nn.Module):
    """
    在前向过程中，需要定义变量。
    这里使用 register_buffer 来定义变量，这样这些变量就会自动与模型参数一起保存和加载。
    """
    def __init__(self, beta_start=0.0001, beta_end=0.02, num_steps=1000):
        super().__init__()
        self.beta_start = beta_start
        self.beta_end = beta_end
        self.num_steps = num_steps

        # β_t: 线性噪声调度
        self.register_buffer('betas', torch.linspace(beta_start, beta_end, num_steps))
        # α_t = 1 - β_t 
        self.register_buffer('alphas', 1.0 - self.betas)
        # α_bar_t = ∏(1-β_i) from i=1 to t
        self.register_buffer('alpha_bar', torch.cumprod(self.alphas, dim=0))
        # α_bar_(t-1)
        self.register_buffer('alpha_bar_prev', torch.cat([torch.tensor([1.0]), self.alpha_bar[:-1]], dim=0))
        # sqrt(α_bar_t)
        self.register_buffer('sqrt_alpha_bar', torch.sqrt(self.alpha_bar))
        # 1/sqrt(α_t)
        self.register_buffer('sqrt_recip_alphas', torch.sqrt(1.0 / self.alphas))
        # sqrt(1-α_bar_t)
        self.register_buffer('sqrt_one_minus_alpha_bar', torch.sqrt(1.0 - self.alpha_bar))

        # 1/sqrt(α_bar_t)
        self.register_buffer('sqrt_recip_alphas_bar', torch.sqrt(1.0 / self.alpha_bar))
        # sqrt(1/α_bar_t - 1)
        self.register_buffer('sqrt_recipm1_alphas_bar', torch.sqrt(1.0 / self.alpha_bar - 1))
        # 后验分布方差 σ_t^2
        self.register_buffer('posterior_var', self.betas * (1.0 - self.alpha_bar_prev) / (1.0 - self.alpha_bar))
        # 后验分布均值系数1: β_t * sqrt(α_bar_(t-1))/(1-α_bar_t)
        self.register_buffer('posterior_mean_coef1', self.betas * torch.sqrt(self.alpha_bar_prev) / (1.0 - self.alpha_bar))
        # 后验分布均值系数2: (1-α_bar_(t-1)) * sqrt(α_t)/(1-α_bar_t)
        self.register_buffer('posterior_mean_coef2', (1.0 - self.alpha_bar_prev) * torch.sqrt(self.alphas) / (1.0 - self.alpha_bar))
    

    """
    由于是对一个batch的图像进行训练，而且需要将这些变量与图像张量进行运算，
    而目前定义的张量都是一维张量，所以需要对公式中的变量的维度进行调整，以适应不同张量的维度。

    因此定义get方法，用于获取指定时间步的变量值并调整形状，
    其中 var 为变量张量，t 为时间步，x_shape 为目标形状。
    """
    def get(self, var, t, x_shape):
        # 从变量张量中收集指定时间步的值
        out = var[t]

        # 调整形状为[batch_size, 1, 1, 1]，以便进行广播
        return out.view(
            [t.shape[0]] + [1] * (len(x_shape) - 1)
        )

    # 然后就可以实现加噪过程
    def add_noise(self, x, t):
        # 获取时间步t对应的sqrt(α_bar_t)
        sqrt_alpha_bar = self.get(self.sqrt_alpha_bar, t, x.shape)

        # 获取时间步t对应的sqrt(1-α_bar_t)
        sqrt_one_minus_alpha_bar = self.get(self.sqrt_one_minus_alpha_bar, t, x.shape)

        # 从标准正态分布采样噪声 ε ~ N(0,I)
        noise = torch.randn_like(x)

        # 实现前向扩散过程：x_t = sqrt(α_bar_t) * x_0 + sqrt(1-α_bar_t) * ε
        return sqrt_alpha_bar * x + sqrt_one_minus_alpha_bar * noise, noise


"""
完整的训练流程：
1、随机采样时间步 t  
2、对图像添加噪声，获得带噪声的图像和噪声  
3、使用模型预测噪声  
4、计算预测噪声和真实噪声之间的MSE损失  
5、反向传播和优化
"""

#### 采样：

In [None]:
"""
采样过程的思路为，从标准正态分布中采样初始噪声，然后逐步去噪，
从 t=T 到 t=0，最后将最终结果裁剪到 [-1, 1] 范围。

在去噪过程中，需要获取采样需要的系数，在之前的NoiseScheduler类中定义了这些系数
"""

def sample(model, scheduler, num_samples, size, device='cpu'):
    model.eval()

    with torch.no_grad():
        # 从标准正态分布采样初始噪声 x_T ~ N(0, I)
        x_t = torch.randn(num_samples, *size).to(device)

        # 逐步去噪，从 t=T 到 t=0
        for t in reversed(range(scheduler.num_steps)):
            # 构造时间步batch
            t_batch = torch.tensor([t] * num_samples).to(device)

            # 获取采样需要的系数
            sqrt_recip_alpha_bar = scheduler.get(scheduler.sqrt_recip_alphas_bar, t_batch, x_t.shape)
            sqrt_recipm1_alpha_bar = scheduler.get(scheduler.sqrt_recipm1_alphas_bar, t_batch, x_t.shape)
            posterior_mean_coef1 = scheduler.get(scheduler.posterior_mean_coef1, t_batch, x_t.shape)
            posterior_mean_coef2 = scheduler.get(scheduler.posterior_mean_coef2, t_batch, x_t.shape)

            # 预测噪声
            predicted_noise = model(x_t, t_batch)

            # 计算x_0的预测值：x_0 = 1/sqrt(α_bar_t) * x_t - sqrt(1/α_bar_t-1) * ε_θ(x_t,t)
            _x_0 = sqrt_recip_alpha_bar * x_t - sqrt_recipm1_alpha_bar * predicted_noise
            
            # 计算后验分布均值 μ_θ(x_t,t)
            model_mean = posterior_mean_coef1 * _x_0 + posterior_mean_coef2 * x_t
            
            # 计算后验分布方差的对数值 log(σ_t^2)
            model_log_var = scheduler.get(
                torch.log(
                    torch.cat([scheduler.posterior_var[1:2], scheduler.betas[1:]]),
                    t_batch, x_t.shape
                )
            )

            if t > 0:
                # t > 0时从后验分布采样：x_t-1 = μ_θ(x_t,t) + σ_t * z, z~N(0,I)
                noise = torch.rand_like(x_t).to(device)
                x_t = model_mean + torch.exp(0.5 * model_log_var) * noise
            else:
                # t = 0时直接使用均值作为生成结果
                x_t = model_mean
            
        # 将最终结果裁剪到[-1, 1]的范围
        x_0 = torch.clamp(x_t, -1.0, 1.0)
    
    return x_0

#### 评估：

In [None]:
"""
使用预训练好的模型，获得真实图像和生成图像的特征
"""
class InceptionStatistics:
    pass

# %%
"""
Inception Score（IS）
IS 分数通过预训练的网络评估生成图像的质量和多样性。

IS 分数越高说明：
1、每张生成图像的类别预测越清晰（质量好）
2、不同图像的类别分布越分散（多样性好）

具体步骤：
1、将所有图像分为 batch
2、对每组计算：
    · 计算边缘分布 p(y)，即对当前 batch 的p(y|x) 取平均
    · 计算 KL 散度
    · 取指数
3、返回所有组得分的均值和标准差
"""

def calculate_inception_score(probs, splits=10):
    # 存储每个splits的 IS 分数
    scores = []
    # 计算每个split的大小
    split_size = probs[0] // splits

    # 对每个split进行计算
    for i in range(splits):
        # 获取当前split的概率分布
        part = probs[i * split_size: (i + 1) * split_size]
        
        # 计算KL散度：KL(p(y|x) || p(y))
        kl = part * (
            np.log(part) - 
            np.log(
                np.expand_dims(
                    np.mean(part, axis=0), 0
                )
            )
        )

        # 对每个样本的KL散度求平均
        kl = np.mean(np.sum(kl, axis=1))

        # 计算 exp(KL) 并添加到scores列表
        scores.append(np.exp(kl))
    
    # 返回所有split的IS分数的均值和标准差
    return np.mean(scores), np.std(scores)

# %%
"""
Fréchet Inception Distance (FID)
FID 分数通过比较真实图像和生成图像在网络特征空间的分布来评估生成质量。

FID 分数越低说明图像的特征分布越接近真实图像分布，生成质量越好。

具体步骤为：
1、分别对真实图像和生成图像：
    · 通过模型提取特征
    · 计算特征和均值向量和协方差矩阵
2、计算均值向量之间的欧氏距离
3、计算协方差矩阵的平方根项
4、计算最终的 FID 分数
"""

def calculate_fid(real_features, fake_features):
    # 计算真实图像和生成图像特征的均值向量和协方差矩阵
    mu1, sigma1 = real_features.mean(axis=0), np.cov(real_features, rowvar=False)
    mu2, sigma2 = fake_features.mean(axis=0), np.cov(fake_features, rowvar=False)

    # 计算均值向量之间的欧几里得距离的平方
    ssdiff = np.sum((mu1 - mu2) ** 2)

    # 计算协方差矩阵的平方根项：(Σ_r Σ_f)^(1/2)
    covmean = linalg.sqrtm(sigma1.dot(sigma2))
    # 如果结果包含复数,取其实部
    if np.iscomplexobj(covmean):
        covmean = covmean.real
    
    # 计算最终的 FID 分数
    fid = ssdiff + np.trace(sigma1 + sigma2 - 2 * covmean)

    return fid