# 背后的数学

## 符号统一

- $x_{t}$ 表示在时间步 $t$ 时的图像，其中 $x_{0}$ 表示原始图像，而最终遵循各向同性高斯分布的最终图像被称为 $x_{T}$
- $q(x_{t}|x_{t-1})$ 对应于前向过程，其输出图像是在输入图像的基础上叠加了一些小噪声。
- $p(x_{t-1}|x_{t})$ 对应于反向扩散过程，它以 $x_{t}$ 为输入，并使用神经网络生成样本 $x_{t-1}$

## 前向过程

$$
q(x_{t}|x_{t-1}) = \mathcal{N}(x_{t},\sqrt{1-\beta_{t}}\,x_{t-1},\beta_{t}I)
$$

其中：

- $\mathcal{N}$ 是正态分布
- $x_{t}$ 是输出
- $\sqrt{1-\beta_{t}}x_{t-1}$ 是平均值，也可以记为 $\mu_{t}$
- $\beta_{t}I$ 是方差，也可以记为 $\sigma_{t}^{2}$
- $\beta$ 是调度器，$\forall \beta \,,\, \beta \in (0,1) $

现在我们已经知道了前向过程中一步的公式，理论上只要将这个步骤重复1000次就可以得到结果，但是有一种更简单的方法，只用了一步就解决了。

首先我们新定义两个新变量

$$
\alpha_{t} = 1-\beta_{t}
\\[10pt]
\overline\alpha_{t} = \prod_{s=1}^{t}\alpha_{s}
$$

然后我们可以利用重参数化技巧，也就是 $\mathcal{N}(\mu,\sigma^{2}) = \mu + \sigma\cdot\epsilon$ 来重写 $q(x_{t}|x_{t-1})$，其中 $\epsilon \sim \mathcal{N}(0,1)$

$$
q(x_{t}|x_{t-1}) = \mathcal{N}(x_{t},\sqrt{1-\beta_{t}}\,x_{t-1},\beta_{t}I)
\\[10pt]
= \sqrt{1-\beta_{t}}\,x_{t-1}\,+\,\sqrt{\beta_{t}}\,\epsilon
\\[10pt]
= \sqrt{\alpha_{t}}x_{t-1} \,+\, \sqrt{1-\alpha_{t}}\,\epsilon
$$

再将 $q(x_{t-1}|x_{t-2})$ 以类似的形式写出来，在上式中将 $x_{t-1}$ 替换为 $x_{t-2}$ 的表达式，可得：

$$
q(x_{t}|x_{t-1}) = \sqrt{\alpha_{t}}x_{t-1} \,+\, \sqrt{1-\alpha_{t}}\,\epsilon
\\[10pt]
= \sqrt{\alpha_{t}\alpha_{t-1}}x_{t-2} \,+\, \sqrt{1-\alpha_{t}\alpha_{t-1}}\,\epsilon
\\[10pt]
= \sqrt{\alpha_{t}\alpha_{t-1}\alpha_{t-2}}x_{t-3} \,+\, \sqrt{1-\alpha_{t}\alpha_{t-1}\alpha_{t-2}}\,\epsilon
\\[10pt]
···
$$

如此反复迭代，最终可以得到 $x_{t}$ 和 $x_{0}$ 之间的关系：

$$
q(x_{t}|x_{0}) = \sqrt{\overline\alpha_{t}}x_{0} \,+\, \sqrt{1-\overline\alpha_{t}}\,\epsilon
\,\sim\, \mathcal{N}(x_{t},\sqrt{\overline\alpha_{t}}x_{0},(1-\overline\alpha_{t})I)
$$

## 反向扩散过程

$$
p(x_{t-1}|x_{t}) = \mathcal{N}(x_{t-1},\mu_{\theta}(x_{t},t),\Sigma_{\theta}(x_{t},t))
$$

其中：

- $\mathcal{N}$ 是正态分布
- $x_{t-1}$ 是输出
- $\Sigma_{\theta}$ 是网络本应该学习的方差参数，但是此处我们将其设定为一个固定的值，因而不对其进行学习
- $\mu_{\theta}$ 是网络需要学习的均值参数

## 损失函数

理论上的损失函数应该是一个负对数似然函数

$$
-\log{p_{\theta}(x_{0})}
$$

但是 $x_{0}$ 的概率很难计算，因为它取决于 $x_{0}$ 之前所有的时间步的输入，即 $x_{1:T}$

因此我们决定计算该函数的变分下界，即

$$
-\log{p_{\theta}(x_{0})} \le -\log{p_{\theta}(x_{0})} \,+\, D_{KL}(q(x_{1:T}|x_{0})||p_{\theta}(x_{1:T}|x_{0}))
$$

KL散度是衡量两个分布相似程度的指标，并且始终为非负数。

对于两个单一变量的高斯分步 $p\sim\mathcal{N}(\mu_{1},\sigma_{1}^{2})$ 和 $q\sim\mathcal{N}(\mu_{2},\sigma_{2}^{2})$ 而言，他们的KL散度为：

$$
D_{KL}(p||q) = \log(\frac{\sigma_{2}}{\sigma_{1}})\,+\,\frac{\sigma_{1}^{2}+(\mu_{1}-\mu_{2})^{2}}{2\sigma_{2}^{2}}\,-\,\frac{1}{2}
$$

在模型优化中，KL散度常作为损失函数，引导模型 $q$ 逼近真实分布 $p$ 。例如，变分自编码器（VAE）中用它约束潜在变量的分布。

下面开始推导为什么加上KL散度之后能够使得计算更加方便

- 我们可以将KL散度公式变为以下形式：

$$
D_{KL}(q(x_{1:T}|x_{0})||p_{\theta}(x_{1:T}|x_{0})) = \log\left( \frac{q(x_{1:T}|x_{0})}{p_{\theta}(x_{1:T}|x_{0})} \right)
$$

- 对数中的分母可以利用贝叶斯公式变为以下形式：

$$
p_{\theta}(x_{1:T}|x_{0}) = \frac{p_{\theta}(x_{0}|x_{1:T})p_{\theta}(x_{1:T})}{p_{\theta}(x_{0})}
$$

- 而这个贝叶斯公式中的分子又可以由全概率公式合并为：

$$
p_{\theta}(x_{0}|x_{1:T})p_{\theta}(x_{1:T}) = p_{\theta}(x_{0},x_{1:T}) = p_{\theta}(x_{0:T})
$$

- 所以原来的对数形式就可以写成：

$$
\log\left( \frac{q(x_{1:T}|x_{0})}{p_{\theta}(x_{1:T}|x_{0})} \right) = \log\left( \frac{q(x_{1:T}|x_{0})}{\frac{p_{\theta}(x_{0:T})}{p_{\theta}(x_{0})}} \right) = \log\left( \frac{q(x_{1:T}|x_{0})}{p_{\theta}(x_{0:T})} \right) + \log\left( p_{\theta}(x_{0}) \right)
$$

- 因此它和原本的 $-\log\left( p_{\theta}(x_{0}) \right)$相互抵消，最终损失函数变为

$$
\log\left( \frac{q(x_{1:T}|x_{0})}{p_{\theta}(x_{0:T})} \right)
$$

还没有结束！这个函数实际上看着还是十分复杂，我们对它进行进一步处理：

- 我们对对数中的分子分母都根据其定义进行展开，可以得到：

$$
\log\left( \frac{q(x_{1:T}|x_{0})}{p_{\theta}(x_{0:T})} \right)
= \log\left( \frac{\textstyle \prod_{t=1}^{T}q(x_{t}|x_{t-1})}{p(x_{T})\textstyle \prod_{t=1}^{T}p_{\theta}(x_{t-1}|x_{t})} \right)
= \log\left( \frac{\textstyle \prod_{t=1}^{T}q(x_{t}|x_{t-1})}{\textstyle \prod_{t=1}^{T}p_{\theta}(x_{t-1}|x_{t})} \right)-\log \left( p(x_{T}) \right)
$$

- 利用对数性质，将 $\prod$ 移到对数外变成 $\sum$

$$
原式 = \sum_{t=1}^{T}\log \left( \frac{q(x_{t}|x_{t-1})}{p_{\theta}(x_{t-1}|x_{t})} \right) - \log \left( p(x_{T}) \right)
= \sum_{t=2}^{T}\log \left( \frac{q(x_{t}|x_{t-1})}{p_{\theta}(x_{t-1}|x_{t})} \right)+ \log\left( \frac{q(x_{1}|x_{0})}{p_{\theta}(x_{0}|x_{1})} \right) - \log \left( p(x_{T}) \right)
$$

- 利用贝叶斯公式，将 $\sum$ 中所有的分子都该写成如下形式：

$$
q(x_{t}|x_{t-1}) = \frac{q(x_{t-1}|x_{t})q(x_{t})}{q(x_{t-1})} = \frac{q(x_{t-1}|x_{t},x_{0})q(x_{t}|x_{0})}{q(x_{t-1}|x_{0})}
$$

- 那么再代回原式后得到：

$$
原式 = \sum_{t=2}^{T}\log \left( \frac{q(x_{t-1}|x_{t},x_{0})q(x_{t}|x_{0})}{p_{\theta}(x_{t-1}|x_{t})q(x_{t-1}|x_{0})} \right)+ \log\left( \frac{q(x_{1}|x_{0})}{p_{\theta}(x_{0}|x_{1})} \right) - \log \left( p(x_{T}) \right)
$$

- 我们将 $\sum$ 中的log函数拆开得到

$$
原式 = \sum_{t=2}^{T}\log \left( \frac{q(x_{t-1}|x_{t},x_{0})}{p_{\theta}(x_{t-1}|x_{t})} \right) + \sum_{t=2}^{T}\log \left( \frac{q(x_{t}|x_{0})}{q(x_{t-1}|x_{0})} \right) + \log\left( \frac{q(x_{1}|x_{0})}{p_{\theta}(x_{0}|x_{1})} \right) - \log \left( p(x_{T}) \right)
$$

- 第二个 $\sum$ 中的式子展开后，中间项会全部抵消，最后留下：

$$
原式 = \sum_{t=2}^{T}\log \left( \frac{q(x_{t-1}|x_{t},x_{0})}{p_{\theta}(x_{t-1}|x_{t})} \right) + \log\left( \frac{q(x_{T}|x_{0})}{q(x_{1}|x_{0})} \right) + \log\left( \frac{q(x_{1}|x_{0})}{p_{\theta}(x_{0}|x_{1})} \right) - \log \left( p(x_{T}) \right)
$$

- 二、三两项消去并把第四项代入分母，原本第三项分母提出可得到

$$
原式 = \sum_{t=2}^{T}\log \left( \frac{q(x_{t-1}|x_{t},x_{0})}{p_{\theta}(x_{t-1}|x_{t})} \right) + \log\left( \frac{q(x_{T}|x_{0})}{p(x_{T })} \right) - \log(p_{\theta}(x_{0}|x_{1}))
$$

- 我们能够将第一项和第二项都看作KL散度，即

$$
原式 = \sum_{t=2}^{T}D_{KL}(q(x_{t-1}|x_{t},x_{0})||p_{\theta}(x_{t-1}|x_{t})) + D_{KL}(q(x_{T}|x_{0})||p(x_{T})) - \log(p_{\theta}(x_{0}|x_{1}))
$$

- 而由于 $q$ 只是一个前向传播过程且 $p(x_{T})$ 只是一个服从高斯正太分布的随机分布，几乎没有可以学习的参数，所以第二项是一个很小的量，完全可以被忽略。
- 我们知道 $q$ 和 $p$ 都服从正态分布，且正态分布中的方差被固定为 $\beta I$ ，因此我们可以得到以下等式；

$$
p(x_{t-1}|x_{t}) \sim \mathcal{N}(x_{t-1},\mu_{\theta}(x_{t},t),\beta I)
\\
q(x_{t-1}|x_{t},x_{0}) \sim \mathcal{N}(x_{t-1},\tilde\mu_{t}(x_{t},x_{0}),\tilde\beta_{t} I)
$$

其中
$$
\tilde\mu_{t}(x_{t},x_{0}) = \frac{\sqrt{\alpha_{t}}\,(1-\overline \alpha_{t-1})}{1-\overline \alpha_{t}}x_{t} + \frac{\sqrt{\overline\alpha_{t-1}}\beta_{t}}{1-\overline\alpha_{t}}x_{0}
$$

- 由前文中得到的 $x_{t} = \sqrt{\overline\alpha_{t}}x_{0} \,+\, \sqrt{1-\overline\alpha_{t}}\,\epsilon$ 可以反推得到 $x_{0} = \frac{1}{\sqrt{\overline\alpha_{t}}}(x_{t}-\sqrt{1-\overline\alpha_{t}}\epsilon)$ ，将其带入原式，最终可得：

$$
\tilde\mu_{t} = \frac{1}{\sqrt{\alpha_{t}}}(x_{t}-\frac{\beta_{t}}{\sqrt{1-\overline\alpha_{t}}}\epsilon)
$$

- 作者决定采用两个均值之间的均方误差作为损失函数，也就是

$$
L_{t} = \frac{1}{2\sigma_{t}^{2}}\left| \tilde\mu_{t}(x_{t},x_{0}) - \mu_{\theta}(x_{t},t) \right| ^{2}
$$

- 而 $\tilde\mu_{t}$ 和 $\mu_{\theta}$ 两个式子除了一个 $\epsilon$ 和 $\epsilon_{\theta}$ 的区别之外完全相同，并且作者通过实验发现省略前面的系数实际上训练效果更好，所以损失函数实际上可以化简为预测 $\epsilon$ ，也就是噪声，即

$$
L_{t} = \left| \epsilon - \epsilon_{\theta}(x_{t},t ) \right| ^{2}
$$

- 最后，再把 $x_{t}$ 用 $x_{0}$ 表示，我们能够得到：

$$
L_{t} = \left| \epsilon - \epsilon_{\theta}(\sqrt{\overline\alpha_{t}}x_{0} + \sqrt{1-\overline\alpha_{t}}\epsilon\,,t ) \right| ^{2}
$$



# 代码实现

简而言之，我们从数据中获取图像并逐步添加噪点。然后，我们训练一个模型来预测每个步骤的噪声，并使用该模型生成图像

In [None]:
import torch
from torch import nn
from torch.nn import functional as F
import numpy as np
import matplotlib.pyplot as plt
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader, Subset
from tqdm.notebook import tqdm
import copy
%matplotlib inline

device = torch.device('cuda' if torch.cuda.is_available() 
                      else 'mps' if torch.mps.is_available()
                      else 'cpu')

In [None]:
def show_images(images, title=""):
    """把给定的图片以方形网格的形式显示出来"""
    images = [np.clip(im.permute(1,2,0).numpy(), 0, 1) for im in images]
    
    # 定义行数和列数
    fig = plt.figure(figsize=(8, 8))
    rows = int(len(images)**0.5)
    cols = round(len(images)/rows)
    
    # 在网格中显示图片
    idx = 0
    for r in range(rows):
        for c in range(cols):
            fig.add_subplot(rows,cols,idx + 1)
            
            if idx < len(images):
                plt.imshow(images[idx])
                plt.axis('off')
                idx += 1
    plt.suptitle(title)
    plt.show()

## UNet构建
下面是对代码各个部分的详细说明：

这个代码实现了 UNet 网络的核心组件，用于图像分割等计算机视觉任务。首先，代码定义了一个名为 `double_conv` 的类，该类执行连续两次卷积操作。在每次卷积之后，都使用了批归一化 (`BatchNorm2d`) 和 ReLU 激活函数，这样可以提高网络的训练稳定性及非线性感知能力。

接着，`down_layer` 类实现了下采样操作。它首先使用最大池化 (`nn.MaxPool2d`) 将输入特征图的分辨率减半，然后调用 `double_conv` 对池化后的特征图进行卷积处理，以便提取更深层次的特征信息。这种结构在典型的编码器-解码器架构中十分常见，用来逐步压缩空间信息并增加通道特征。

在上采样路径中，`up` 类负责将低分辨率的特征图通过转置卷积 (`nn.ConvTranspose2d`) 的方式进行上采样。由于上采样后可能与跳跃连接（来自编码器同层特征）的尺寸不匹配，因此采用 `F.pad` 对特征图进行适当填充，使得后续使用 `torch.cat` 在通道维度上进行拼接成为可能。这样可以充分利用编码器过程中的细粒度信息，有助于提高解码器的重构效果。

基于 `up` 类，`up_layer` 类首先利用上采样操作获得尺寸匹配的特征图，然后跟跳跃连接得到的特征图进行拼接，最后通过 `double_conv` 进一步融合信息。这种设计允许网络在解码阶段逐步恢复空间信息，并利用编码器中的高分辨率细节。

UNet 类将上述各个模块组合成一个整体架构。编码器部分依次由几层 `double_conv` 和 `down_layer` 构成，每经过一层，下采样使特征图尺寸减小而通道数增多。解码器部分则利用 `up_layer` 逐步上采样，同时通过跳跃连接将前面的特征图与当前上采样特征图进行融合，并最终由最后的卷积层 (`nn.Conv2d`) 输出与输入相同尺寸的结果。

此外，UNet 还引入了时间嵌入（time embedding）的概念。通过一个嵌入层 (`nn.Embedding`) 生成正弦余弦嵌入（利用 `sinusoidal_embedding` 函数），然后将这些嵌入值经过一系列全连接层（由 `_make_te` 生成的模块）调整维度后，分别加到各个卷积模块的输入上。这种设计常见于时间条件生成模型中，用以引入额外时间信息从而影响输出。

最后，代码中还展示了一些辅助函数，如 `requires_grad_` 用于控制模块参数是否参与梯度计算，以及一个简单的 `super` 类的重载声明，展示了构造函数可接受不同参数形式。这些细节共同构成了一个灵活且功能丰富的网络架构。

时间嵌入（time embedding）主要用于将时间步信息引入网络中，从而让模型在不同的时间步（例如扩散过程中的不同阶段）下能够学习和调整其特征提取和生成过程。在这段代码中，时间嵌入具体发挥如下作用：

- **条件控制：** 通过将时间步 t 转换为一个嵌入向量，网络能够“知道”当前正处于扩散过程中的哪个阶段，进而根据不同的 t 调整各层输出。这对于像 DDPM 这样的扩散模型尤为重要，因为模型需要针对不同的噪声水平做出不同的处理。

- **信息融合：** 时间嵌入经过一系列全连接层（通过 _make_te 创建）后，其维度与对应卷积层的输入或输出匹配，并通过 reshape 后加到特征图上。这样，不仅在整个网络的编码和解码过程中将时间信息传递下去，而且使网络在每个阶段都能灵活利用时间信息来调节特征变化。

- **对抗噪声：** 在训练过程中，不同时间步对应不同的噪声程度，利用时间嵌入可以使网络在处理不同噪声条件时更加稳定，从而提高生成图像的质量。 

总结来说，时间嵌入使模型具备条件生成能力，能够根据扩散过程中当前的步数或噪声水平来调整内部特征，促进在整个降噪或生成过程中达到更好的表现。

In [None]:
def  sinusoidal_embedding(n,d):
    """生成正弦余弦嵌入"""
    embedding = torch.tensor([[i / 10000**(2*j/d) for j in range(d)] for i in range(n)])
    sin_mask = torch.arange(0,n,2)
    embedding[sin_mask] = torch.sin(embedding[sin_mask])
    embedding[1-sin_mask] = torch.cos(embedding[1-sin_mask])
    return embedding

In [None]:
class double_conv(nn.Module):
    def __init__(self,in_ch,out_ch):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch,out_ch,3,padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch,out_ch,3,padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True)
        )
    
    def forward(self,x):
        x = self.conv(x)
        return x

class down_layer(nn.Module):
    def __init__(self,in_ch,out_ch):
        super().__init__()
        self.pool = nn.MaxPool2d(2,stride=2,padding=0)
        self.conv = double_conv(in_ch,out_ch)
    
    def forward(self,x):
        x = self.pool(x)
        x = self.conv(x)
        return x

class up(nn.Module):
    def __init__(self,in_ch,out_ch):
        super().__init__()
        self.up_scale = nn.ConvTranspose2d(in_ch,out_ch,2,stride=2)
    
    def forward(self,x1,x2):
        x2 = self.up_scale(x2)
        diffY = x1.size()[2] - x2.size()[2]
        diffX = x1.size()[3] - x2.size()[3]
        
        x2 = F.pad(x2, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])
        x = torch.cat([x2,x1],dim=1)
        return x

class up_layer(nn.Module):
    def __init__(self,in_ch,out_ch):
        super().__init__()
        self.up = up(in_ch,out_ch)
        self.conv = double_conv(in_ch,out_ch)
    
    def forward(self,x1,x2):
        a = self.up(x1,x2)
        x = self.conv(a)
        return x
        
class UNet(nn.Module):
    def __init__(self,in_channels=1,n_steps=1000,time_emb_dim=100):
        super().__init__()
        self.conv1 = double_conv(in_channels,64)
        self.down1 = down_layer(64,128)
        self.down2 = down_layer(128,256)
        self.down3 = down_layer(256,512)
        self.down4 = down_layer(512,1024)
        self.up1 = up_layer(1024,512)
        self.up2 = up_layer(512,256)
        self.up3 = up_layer(256,128)
        self.up4 = up_layer(128,64)
        self.last_conv = nn.Conv2d(64,in_channels,1)
        
        # TIme embedding
        self.time_embed = nn.Embedding(n_steps,time_emb_dim)
        self.time_embed.weight.data = sinusoidal_embedding(n_steps,time_emb_dim)
        self.time_embed.requires_grad_(False)
        self.te1 = self._make_te(time_emb_dim,in_channels)
        self.te2 = self._make_te(time_emb_dim,64)
        self.te3 = self._make_te(time_emb_dim,128)
        self.te4 = self._make_te(time_emb_dim,256)
        self.te5 = self._make_te(time_emb_dim,512)
        self.te1_up = self._make_te(time_emb_dim,1024)
        self.te2_up = self._make_te(time_emb_dim,512)
        self.te3_up = self._make_te(time_emb_dim,256)
        self.te4_up = self._make_te(time_emb_dim,128)
        
    def _make_te(self,dim_in,dim_out):
        return nn.Sequential(
            nn.Linear(dim_in,dim_out),
            nn.SiLU(),
            nn.Linear(dim_out,dim_out),
        )
    
    def forward(self,x,t):
        bs = x.shape[0]
        t = self.time_embed(t)
        x1 = self.conv1(x+self.te1(t).reshape(bs,-1,1,1))
        x2 = self.down1(x1+self.te2(t).reshape(bs,-1,1,1))
        x3 = self.down2(x2+self.te3(t).reshape(bs,-1,1,1))
        x4 = self.down3(x3+self.te4(t).reshape(bs,-1,1,1))
        x5 = self.down4(x4+self.te5(t).reshape(bs,-1,1,1))
        x1_up = self.up1(x4,x5+self.te1_up(t).reshape(bs,-1,1,1))
        x2_up = self.up2(x3,x1_up+self.te2_up(t).reshape(bs,-1,1,1))
        x3_up = self.up3(x2,x2_up+self.te3_up(t).reshape(bs,-1,1,1))
        x4_up = self.up4(x1,x3_up+self.te4_up(t).reshape(bs,-1,1,1))
        output = self.last_conv(x4_up)
        return output

In [None]:
bs = 3
x = torch.randn(bs,1,256,256)
n_steps = 1000
timesteps = torch.randint(0,n_steps,(bs,)).long()
unet = UNet()
y = unet(x,timesteps)
print(y.shape)

In [None]:
class DDPM(nn.Module):
    def __init__(self,network,num_timesteps,beta_start=1e-4,beta_end=2e-2,device=device) -> None:
        super().__init__()
        self.betas = torch.linspace(beta_start,beta_end,num_timesteps,dtype=torch.float32).to(device)
        self.alphas = 1 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas,dim=0)
        self.network = network
        self.device = device
        self.sqrt_alphas_cumprod = self.alphas_cumprod**0.5
        self.sqrt_one_minus_alphas_cumprod = (1-self.alphas_cumprod)**0.5
        
    def add_noise(self,x_start,x_noise,timesteps):
        s1 = self.sqrt_alphas_cumprod[timesteps]
        s2 = self.sqrt_one_minus_alphas_cumprod[timesteps]
        s1 = s1.reshape(-1,1,1,1)
        s2 = s2.reshape(-1,1,1,1)
        return s1 * x_start + s2 * x_noise
    
    def reverse(self,x,t):
        return self.network(x,t)
    
    def step(self,model_output,timestep,sample):
        t = timestep
        coef_epsilon = (1-self.alphas)/self.sqrt_one_minus_alphas_cumprod
        coef_eps_t = coef_epsilon[t].reshape(-1,1,1,1)
        coef_first = 1/self.alphas ** 0.5
        coef_first_t = coef_first[t].reshape(-1,1,1,1)
        pred_prev_sample = coef_first_t*(sample-coef_eps_t*model_output)
        
        variance = 0
        if t > 0:
            noise = torch.randn_like(model_output).to(self.device)
            variance = ((self.betas[t]**0.5)*noise)
        
        pred_prev_sample = pred_prev_sample + variance
        return pred_prev_sample

In [None]:
def training_loop(model, dataloader, optimizer, num_epochs, num_timesteps, device=device):
    """Training loop for DDPM"""

    global_step = 0
    losses = []
    
    for epoch in range(num_epochs):
        model.train()
        progress_bar = tqdm(total=len(dataloader))
        progress_bar.set_description(f"Epoch {epoch}")
        for step, batch in enumerate(dataloader):
            batch = batch[0].to(device)
            noise = torch.randn(batch.shape).to(device)
            timesteps = torch.randint(0, num_timesteps, (batch.shape[0],)).long().to(device)

            noisy = model.add_noise(batch, noise, timesteps)
            noise_pred = model.reverse(noisy, timesteps)
            loss = F.mse_loss(noise_pred, noise)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            progress_bar.update(1)
            logs = {"loss": loss.detach().item(), "step": global_step}
            losses.append(loss.detach().item())
            progress_bar.set_postfix(**logs)
            global_step += 1
        
        progress_bar.close()

In [None]:
root_dir = './data/'
transforms01 = torchvision.transforms.Compose([
        torchvision.transforms.ToTensor(),
        #torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
dataset = torchvision.datasets.CIFAR10(root=root_dir, train=True, transform=transforms01, download=True)
dataloader = torch.utils.data.DataLoader(dataset=dataset, batch_size=512, shuffle=True,num_workers=10)

In [None]:
for b in dataloader:
    batch = b[0]
    break
bn = [b for b in batch[:100]]
show_images(bn,"origin")

In [None]:
learning_rate = 1e-4
num_epochs = 15
num_timesteps = 1000
network = UNet(in_channels=3)
network.to(device)
model = DDPM(network, num_timesteps, beta_start=0.0001, beta_end=0.02, device=device)
model.train()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
training_loop(model, dataloader, optimizer, num_epochs, num_timesteps, device=device) 

In [None]:
def generate_image(ddpm, sample_size, channel, size):
    """从高斯噪声生成图像"""

    frames = []
    frames_mid = []
    ddpm.eval()
    with torch.no_grad():
        timesteps = list(range(ddpm.num_timesteps))[::-1]
        sample = torch.randn(sample_size, channel, size, size).to(device)
        
        for i, t in enumerate(tqdm(timesteps)):
            time_tensor = (torch.ones(sample_size) * t).long().to(device)
            residual = ddpm.reverse(sample, time_tensor).to(device)
            sample = ddpm.step(residual, time_tensor[0], sample)

            if t==500:
                #sample_squeezed = torch.squeeze(sample)
                for i in range(sample_size):
                    frames_mid.append(sample[i].detach().cpu())

        #sample = torch.squeeze(sample)
        for i in range(sample_size):
            frames.append(sample[i].detach().cpu())
    return frames, frames_mid

In [None]:
generated, generated_mid = generate_image(model, 100, 3, 32)

In [None]:
show_images(generated_mid, "Mid result")
show_images(generated, "Final result")

In [None]:
def make_dataloader(dataset, class_name ='ship'):
    s_indices = []
    s_idx = dataset.class_to_idx[class_name]
    for i in range(len(dataset)):
        current_class = dataset[i][1]
        if current_class == s_idx:
            s_indices.append(i)
    s_dataset = Subset(dataset, s_indices)
    return torch.utils.data.DataLoader(dataset=s_dataset, batch_size=512, shuffle=True)

In [None]:
ship_dataloader = make_dataloader(dataset)
ship_network = copy.deepcopy(network)
ship_model = DDPM(ship_network, num_timesteps, beta_start=0.0001, beta_end=0.02, device=device)
num_epochs = 10
num_timesteps = model.num_timesteps
learning_rate = 3e-4
ship_model.train()
optimizer = torch.optim.Adam(ship_model.parameters(), lr=learning_rate)
training_loop(ship_model, ship_dataloader, optimizer, num_epochs, num_timesteps, device=device)
generated, generated_mid = generate_image(ship_model, 100, 3, 32)
show_images(generated, "Generated ships")

In [None]:
horse_dataloader = make_dataloader(dataset, 'horse')
horse_network = copy.deepcopy(network)
horse_model = DDPM(horse_network, num_timesteps, beta_start=0.0001, beta_end=0.02, device=device)
num_epochs = 10
num_timesteps = model.num_timesteps
learning_rate = 3e-4
horse_model.train()
optimizer = torch.optim.Adam(horse_model.parameters(), lr=learning_rate)
training_loop(horse_model, horse_dataloader, optimizer, num_epochs, num_timesteps, device=device)
generated, generated_mid = generate_image(horse_model, 100, 3, 32)
show_images(generated, "Generated horses")

In [None]:
truck_dataloader = make_dataloader(dataset, 'truck')
truck_network = copy.deepcopy(network)
truck_model = DDPM(truck_network, num_timesteps, beta_start=0.0001, beta_end=0.02, device=device)
num_epochs = 10
num_timesteps = model.num_timesteps
learning_rate = 3e-4
truck_model.train()
optimizer = torch.optim.Adam(truck_model.parameters(), lr=learning_rate)
training_loop(truck_model, truck_dataloader, optimizer, num_epochs, num_timesteps, device=device)
generated, generated_mid = generate_image(truck_model, 100, 3, 32)
show_images(generated, "Generated trucks")