<a href="https://colab.research.google.com/github/LolitaSian/DiffusionModel/blob/main/jupyter/ddpm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1>
	The Annotated Diffusion Model
</h1>



在这篇博客文章中，我们将逐步讲解([Ho et al., 2020](https://arxiv.org/abs/2006.11239))的原始DDPM论文，并基于[Phil Wang的TensorFlow版本]((https://github.com/lucidrains/denoising-diffusion-pytorch))实现Pytorch版本。请注意，扩散用于生成建模的思想实际上已经在([Sohl-Dickstein et al., 2015](https://arxiv.org/abs/1503.03585))中介绍过，但是，直到斯坦福大学的([Song et al., 2019](https://arxiv.org/abs/1907.05600))和谷歌大脑的([Ho et al., 2020](https://arxiv.org/abs/2006.11239))分别改进了这种方法才得以流行起来。

[扩散模型有几个视角](https://twitter.com/sedielem/status/1530894256168222722?s=20&t=mfv4afx1GcNQU5fZklpACw)。在这里我们采用离散时间（潜变量模型）的视角.

In [None]:
!pip install -q -U einops datasets matplotlib tqdm

import math
from inspect import isfunction
from functools import partial

%matplotlib inline
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from einops import rearrange

import torch
from torch import nn, einsum
import torch.nn.functional as F


## 什么是扩散模型

Diffusion model 和 Normalizing Flows, GANs or VAEs 一样，都是将噪声从一些简单的分布转换为一个数据样本，也是神经网络学习从纯噪声开始逐渐去噪数据的过程。
包含两个步骤：
- 一个我们选择的固定的（或者说预定义好的）前向扩散过程 $q$ ，就是逐渐给图片添加高斯噪声，直到最后获得纯噪声。

- 一个需要学习的反向的去噪过程 $p_\theta$，训练一个神经网做图像去噪，从纯噪声开始，直到获得最终图像。


<p align="center">
    <img src="https://drive.google.com/uc?id=1t5dUyJwgy2ZpDAqHXw7GhUAp2FE5BWHA" width="600" />
</p>

前向和反向过程都要经过时间步$t$，总步长是$T$（DDPM中$T=1000$)。

从$t=0$开始，从数据集分布中采样一个真实图片$\mathbf x_0$。前向过程就是在每一个时间步$t$中都从一个高斯分布中采样一个噪声，将其添加到上一时间步的图像上。给出一个足够大的$T$，和每一时间步中添加噪声的表格，最终在$T$时间步你会获得一个[isotropic Gaussian distribution](https://math.stackexchange.com/questions/1991961/gaussian-distribution-is-isotropic)。



## In more mathematical form


我们令$q(\mathbf x_0)$是真实分布，也就是真实的图像的分布。

我们可以从中采样一个图片，也就是$\mathbf x_0 \sim q(\mathbf x_0)$ 。

我们设定前向扩散过程$q(\mathbf x_t|\mathbf x_{t-1})$是给每个时间步$t$添加高斯噪声，这个高斯噪声不是随机选择的，是根据我们预选设定好的方差表（$0 < \beta_1 < \beta_2 < ... < \beta_T < 1$）的高斯分布中获取的。

然后我们就可以得到前向过程的公式为：
$$
q(\mathbf {x}_t | \mathbf {x}_{t-1}) = \mathcal{N}(\mathbf {x}_t; \sqrt{1 - \beta_t} \mathbf {x}_{t-1}, \beta_t \mathbf{I}). 
$$

回想一下哦。一个高斯分布（也叫正态分布）是由两个参数决定的，均值$\mu$和方差$\sigma^2 \geq 0$。

然后我们就可以认为每个时间步$t$的图像是从一均值为${\mu}_t = \sqrt{1 - \beta_t} \mathbf {x}_{t-1}$、方差为$\sigma^2_t = \beta_t$的条件高斯分布中画出来的。借助参数重整化（reparameterization trick）可以写成

$$
\mathbf {x}_t = \sqrt{1 - \beta_t}\mathbf {x}_{t-1} +  \sqrt{\beta_t} \mathbf{\epsilon}
$$

其中$\mathbf{\epsilon} \sim \mathcal{N}(\mathbf{0}, \mathbf{I})$，是从标准高斯分布中采样的噪声。 

$\beta_t$在不同的时间步$t$中不是固定的，因此我们给$\beta$加了下标。对于$\beta_t$的选择我们可以设置为线性的、二次的、余弦的等(有点像学习率计划)。

比如在DDPM中$\beta_1 = 10^{-4}$, $\beta_T = 0.02$，在中间是做了一个线性插值。而在Improved DDPM中是使用余弦函数。

从$\mathbf x_0$开始，我们通过$\mathbf{x}_1,  ..., \mathbf{x}_t, ..., \mathbf{x}_T$,最终获得$\mathbf{x}_T$ ，如果我们的高斯噪声表设置的合理，那最后我们获得的应该是一个纯高斯噪声。

现在，如果我们能知道条件分布$p(\mathbf {x}_{t-1} | \mathbf {x}_t)$，那我们就可以将这个过程倒过来：采样一个随机高斯噪声$\mathbf x_t$，我们可以对其逐步去噪，最终得到一个真实分布的图片$\mathbf x_0$。

但是我们实际上没办法知道$p(\mathbf {x}_{t-1} | \mathbf {x}_t)$。因为它需要知道所有可能图像的分布来计算这个条件概率。因此，我们需要借助神经网络来近似(学习)这个条件概率分布。 也就是$p_\theta (\mathbf {x}_{t-1} | \mathbf {x}_t)$，其中, $\theta$是神经网络的参数，需要使用梯度下降更新。


所以现在我们需要一个神经网络来表示逆向过程的(条件)概率分布。如果我们假设这个反向过程也是高斯分布，那么回想一下，任何高斯分布都是由两个参数定义的:

* 一个均值$\mu_\theta$;
* 一个方差$\Sigma_\theta$。

所以我们可以把这个过程参数化为

$$
p_\theta (\mathbf{x}_{t-1} | \mathbf{x}_t) = \mathcal{N}(\mathbf{x}_{t-1}; \mu_\theta(\mathbf{x}_{t},t), \Sigma_\theta (\mathbf{x}_{t},t))
$$

其中均值和方差也取决于噪声水平$t$。




从上边我们可以知道，逆向过程我们需要一个神经网络来学习（表示）高斯分布的均值和方差。

DDPM中作者固定方差，只让神经网络学习条件概率分布的均值。


> First, we set $\Sigma_\theta ( \mathbf{x}_t, t) = \sigma^2_t \mathbf{I}$ to untrained time dependent constants. Experimentally, both $\sigma^2_t = \beta_t$ and $\sigma^2_t  = \tilde{\beta}_t$ (see paper) had similar results. 

[Improved diffusion models](https://openreview.net/pdf?id=-NEXDKk8gZ)这篇文章中进行了改进，神经网络既需要学习均值也要学习方差。



## 通过重新参数化均值 定义目标函数

为了推导出一个目标函数来学习逆向过程的均值，作者观察到$q$和$p_\theta$可以看做是一个VAE模型 [(Kingma et al., 2013)](https://arxiv.org/abs/1312.6114). 

因此，变分下界（ELBO）可以用来最小化关于ground truth $\mathbf x_0$的负对数似然。

这个过程的ELBO是每个时间步$t$的损失总和：$L=L_0+L_1+…+L_𝑇$。

通过构建正向$q$过程和反向过程，损失的每一项，除了$L_0$，都是两个高斯分布之间的KL散度，并且可以写为关于均值的$L_2$损失!


因为高斯分布的特性，我们不需要在正向$q$过程中逐步添加$t$步长的噪声，我们可以直接获得$x_t$的结果：

$$
q(\mathbf {x}_t | \mathbf {x}_0) = \cal{N}(\mathbf {x}_t; \sqrt{\bar{\alpha}_t} \mathbf {x}_0, (1- \bar{\alpha}_t) \mathbf{I})
$$

其中$\alpha_t := 1 - \beta_t$ and $\bar{\alpha}_t := \Pi_{s=1}^{t} \alpha_s$。

这是一个很优秀的特性。这意味着我们可以对高斯噪声进行采样并适当缩放直接将其添加到$\mathbf x_0$中就可以直接得到$\mathbf x_t$。

$\bar{\alpha}_t$是方差表$\beta_t$的函数，因此也是已知的，我们可以对其预先计算。这样可以让我们在训练期间优化损失函数$L$的随机项（换句话说，在训练期间随机采样$t$就可以优化$L_t$）。



这个属性的另一个优美之处是通过重新参数化平均值，使神经网络学习（预测）添加的噪声。

通过神经网络$\epsilon_\theta(\mathbf x_t，t)$预测噪声，可以构成损失函数中时间步$t$的KL项。

这意味着我们的神经网络变成了噪声预测器，而不是直接去预测均值了。

均值的计算方法如下:

$$ \mathbf{\mu}_\theta(\mathbf{x}_t, t) = \frac{1}{\sqrt{\alpha_t}} \left(  \mathbf{x}_t - \frac{\beta_t}{\sqrt{1- \bar{\alpha}_t}} \mathbf{\epsilon}_\theta(\mathbf{x}_t, t) \right)$$

最后的目标函数$L_t$ 长这样，给定随机的时间步 $t$ ，${\epsilon} \sim \mathcal{N}({0}, {I})$ : 

$$ \| \mathbf{\epsilon} - \mathbf{\epsilon}_\theta(\mathbf{x}_t, t) \|^2 = \| \mathbf{\epsilon} - \mathbf{\epsilon}_\theta( \sqrt{\bar{\alpha}_t} \mathbf{x}_0 + \sqrt{(1- \bar{\alpha}_t)  } \mathbf{\epsilon}, t) \|^2.$$



$\mathbf x_0$是初始图像，我们看到噪声$t$样本由固定的前向过程给出。$\epsilon$是在时间步长$t$采样的纯噪声，$\epsilon_\theta(\mathbf x_t，t)$是我们的神经网络。神经网络的优化使用一个简单的均方误差(MSE)计算真实噪声和预测高斯噪声之间的差异。

训练算法如下：

<p align="center">
    <img src="https://drive.google.com/uc?id=1LJsdkZ3i1J32lmi9ONMqKFg5LMtpSfT4" width="400" />
</p>
 

1. 从未知的真实数据分布$q(\mathbf x_0)$中随机采样$\mathbf x_0$，
2. 我们在1和$T$之间均匀采不同时间步的噪声，
3. 我们从高斯分布采样一些噪声，并在$𝑡$时间步上使用前边定义的优良属性来破坏输入分布，
4. 神经网络根据损坏的图像$\mathbf x_t$进行训练，目的是预测施加在图片上的噪声，也就是基于已知方差表$\beta_t$作用在$\mathbf x_0$上的噪声


所有这些都是在批量数据上完成的，使用随机梯度下降优化神经网络。




## 神经网络

神经网络需要在特定的时间步$t$中输入一个带有噪声的图像，并返回预测的噪声。请注意，预测的噪声是一个张量，其大小与输入图像相同。因此，在技术实现上，网络的输入和输出是相同形状的张量。我们可以使用什么类型的神经网络来实现这个任务？


在这里通常使用的是类似于[自编码器](https://en.wikipedia.org/wiki/Autoencoder)的网络，自编码器在编码器和解码器之间有一个所谓的“瓶颈”（bottleneck）层。编码器首先将图像编码为较小的隐藏表示，称为“瓶颈”，然后解码器将该隐藏表示解码回实际图像。这使网络的瓶颈层中可以保留最重要的信息。


在体系结构方面，DDPM作者选择了U-Net，由([Ronneberger et al., 2015](https://arxiv.org/abs/1505.04597))提出，当时在医学图像分割方面实现了SOTA。与任何自编码器一样，该网络包括中间的瓶颈，以确保网络学习到最重要的信息。此外，它引入了编码器和解码器之间的残差连接，极大地改善了梯度流动（受[He et al., 2015](https://arxiv.org/abs/1512.03385))ResNet的启发）。

<p align="center">
    <img src="https://drive.google.com/uc?id=1_Hej_VTgdUWGsxxIuyZACCGjpbCGIUi6" width="400" />
</p>

可以看出，U-Net模型首先对输入进行下采样（即在空间分辨率上使输入更小），然后执行上采样。

下面，我们将逐步实现这个网络。



### 网络辅助模块

首先，我们定义一些辅助函数和类，在实现神经网络时会用到它们。重要的是，我们定义了一个“残差”模块，它将输入简单地加到特定函数的输出中（为特定函数添加了一个残差连接）。

我们还为上采样和下采样操作定义了别名。

In [None]:
def exists(x):
    return x is not None

def default(val, d):
    if exists(val):
        return val
    return d() if isfunction(d) else d

class Residual(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, *args, **kwargs):
        return self.fn(x, *args, **kwargs) + x

def Upsample(dim):
    return nn.ConvTranspose2d(dim, dim, 4, 2, 1)

def Downsample(dim):
    return nn.Conv2d(dim, dim, 4, 2, 1)

### 位置嵌入

神经网络的参数在不同时间步之间是共享的，因此作者受Transformer([Vaswani et al., 2017](https://arxiv.org/abs/1706.03762))启发，使用正弦位置嵌入来对$t$进行编码。这使得神经网络能够“知道”它正在处理的batch中每个图像的时间步长（噪声水平）。

`SinusoidalPositionEmbeddings`模块以形状为`(batch_size, 1)`的张量作为输入，即当前batch中每个图片的时间步（噪声水平），并将其转换为形状为(batch_size，dim)的张量，其中dim是位置嵌入的维度。然后将其添加到每个残差块中。


In [None]:
class SinusoidalPositionEmbeddings(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, time):
        device = time.device
        half_dim = self.dim // 2
        embeddings = math.log(10000) / (half_dim - 1)
        embeddings = torch.exp(torch.arange(half_dim, device=device) * -embeddings)
        embeddings = time[:, None] * embeddings[None, :]
        embeddings = torch.cat((embeddings.sin(), embeddings.cos()), dim=-1)
        return embeddings

###  ResNet/ConvNeXT块

接下来，我们定义U-Net模型的核心构建块。

DDPM作者使用了Wide ResNet块([Zagoruyko et al., 2016](https://arxiv.org/abs/1605.07146))，但Phil Wang代码中还实现了ConvNeXT块([Liu et al., 2022](https://arxiv.org/abs/2201.03545))。在U-Net架构中，可以任选其一。


In [None]:
class Block(nn.Module):
    def __init__(self, dim, dim_out, groups = 8):
        super().__init__()
        self.proj = nn.Conv2d(dim, dim_out, 3, padding = 1)
        self.norm = nn.GroupNorm(groups, dim_out)
        self.act = nn.SiLU()

    def forward(self, x, scale_shift = None):
        x = self.proj(x)
        x = self.norm(x)

        if exists(scale_shift):
            scale, shift = scale_shift
            x = x * (scale + 1) + shift

        x = self.act(x)
        return x

class ResnetBlock(nn.Module):
    """https://arxiv.org/abs/1512.03385"""
    
    def __init__(self, dim, dim_out, *, time_emb_dim=None, groups=8):
        super().__init__()
        self.mlp = (
            nn.Sequential(nn.SiLU(), nn.Linear(time_emb_dim, dim_out))
            if exists(time_emb_dim)
            else None
        )

        self.block1 = Block(dim, dim_out, groups=groups)
        self.block2 = Block(dim_out, dim_out, groups=groups)
        self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity()

    def forward(self, x, time_emb=None):
        h = self.block1(x)

        if exists(self.mlp) and exists(time_emb):
            time_emb = self.mlp(time_emb)
            h = rearrange(time_emb, "b c -> b c 1 1") + h

        h = self.block2(h)
        return h + self.res_conv(x)
    
class ConvNextBlock(nn.Module):
    """https://arxiv.org/abs/2201.03545"""

    def __init__(self, dim, dim_out, *, time_emb_dim=None, mult=2, norm=True):
        super().__init__()
        self.mlp = (
            nn.Sequential(nn.GELU(), nn.Linear(time_emb_dim, dim))
            if exists(time_emb_dim)
            else None
        )

        self.ds_conv = nn.Conv2d(dim, dim, 7, padding=3, groups=dim)

        self.net = nn.Sequential(
            nn.GroupNorm(1, dim) if norm else nn.Identity(),
            nn.Conv2d(dim, dim_out * mult, 3, padding=1),
            nn.GELU(),
            nn.GroupNorm(1, dim_out * mult),
            nn.Conv2d(dim_out * mult, dim_out, 3, padding=1),
        )

        self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity()

    def forward(self, x, time_emb=None):
        h = self.ds_conv(x)

        if exists(self.mlp) and exists(time_emb):
            assert exists(time_emb), "time embedding must be passed in"
            condition = self.mlp(time_emb)
            h = h + rearrange(condition, "b c -> b c 1 1")

        h = self.net(h)
        return h + self.res_conv(x)

### Attention 模块

接下来，我们定义注意力模块，该模块由DDPM作者添加在卷积块之间。注意力是Transformer([Vaswani et al., 2017](https://arxiv.org/abs/1706.03762))的构建块，在AI领域，从NLP和CV到蛋白质折叠，都取得了巨大的成功。Phil Wang实现了2种注意力变体：一种是常规的多头自注意力（和Transformer中的一样），另一种是[线性注意力变体](https://github.com/lucidrains/linear-attention-transformer)（[Shen et al., 2018](https://arxiv.org/abs/1812.01243)），其时间和内存需求随序列长度线性缩放，节省计算资源。


In [None]:
class Attention(nn.Module):
    def __init__(self, dim, heads=4, dim_head=32):
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        hidden_dim = dim_head * heads
        self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False)
        self.to_out = nn.Conv2d(hidden_dim, dim, 1)

    def forward(self, x):
        b, c, h, w = x.shape
        qkv = self.to_qkv(x).chunk(3, dim=1)
        q, k, v = map(
            lambda t: rearrange(t, "b (h c) x y -> b h c (x y)", h=self.heads), qkv
        )
        q = q * self.scale

        sim = einsum("b h d i, b h d j -> b h i j", q, k)
        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)

        out = einsum("b h i j, b h d j -> b h i d", attn, v)
        out = rearrange(out, "b h (x y) d -> b (h d) x y", x=h, y=w)
        return self.to_out(out)

class LinearAttention(nn.Module):
    def __init__(self, dim, heads=4, dim_head=32):
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        hidden_dim = dim_head * heads
        self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False)

        self.to_out = nn.Sequential(nn.Conv2d(hidden_dim, dim, 1), 
                                    nn.GroupNorm(1, dim))

    def forward(self, x):
        b, c, h, w = x.shape
        qkv = self.to_qkv(x).chunk(3, dim=1)
        q, k, v = map(
            lambda t: rearrange(t, "b (h c) x y -> b h c (x y)", h=self.heads), qkv
        )

        q = q.softmax(dim=-2)
        k = k.softmax(dim=-1)

        q = q * self.scale
        context = torch.einsum("b h d n, b h e n -> b h d e", k, v)

        out = torch.einsum("b h d e, b h d n -> b h e n", context, q)
        out = rearrange(out, "b h c (x y) -> b (h c) x y", h=self.heads, x=h, y=w)
        return self.to_out(out)

### Group normalization

DDPM 的作者在 U-Net 的卷积/注意力层之间交替使用了群组归一化([Wu et al., 2018](https://arxiv.org/abs/1803.08494))。下面，我们定义一个`PreNorm`类，它将用于在注意力层之前实现群组归一化。

请注意，在 Transformer 中，是否在注意力之前或之后使用归一化存在争论，具体可以看：[Transformers without Tears](https://tnq177.github.io/data/transformers_without_tears.pdf)。


In [None]:
class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        self.fn = fn
        self.norm = nn.GroupNorm(1, dim)

    def forward(self, x):
        x = self.norm(x)
        return self.fn(x)

### Conditional U-Net

有了所有构建块（位置嵌入、ResNet/ConvNeXT块、注意力和组归一化）的定义，现在定义整个神经网络。

回想一下，网络 $\mathbf{\epsilon}_\theta(\mathbf{x}_t, t)$ 的任务是接收一批有噪声的图像和噪声水平，并输出加入噪声后的图像。更正式地说：

网络接收一个形状为`(batch_size, num_channels, height, width)`的有噪声图像批次和一个形状为`(batch_size, 1)`的噪声水平批次作为输入，并返回一个形状为`(batch_size, num_channels, height, width)`的张量。

网络的构建如下：

- 在一个batch的带噪图像上卷积，并为噪声水平（时间步$t$）计算位置嵌入。
- 进行一系列的下采样。
  每个下采样阶段包含2个ResNet/ConvNeXT块 + group归一化 + 注意力 + 残差连接 + 下采样。
- 在网络的中心，再次应用ResNet/ConvNeXT块，交替使用注意力。

- 进行一系列的上采样。
  每个上采样阶段包含2个ResNet/ConvNeXT块 + group归一化 + 注意力 + 残差连接 + 上采样操作。

- 应用一个ResNet/ConvNeXT块，然后是一个卷积层。


[神经网络堆叠层就像积木块一样](http://karpathy.github.io/2019/04/25/recipe/)。




In [None]:
class Unet(nn.Module):
    def __init__(
        self,
        dim,
        init_dim=None,
        out_dim=None,
        dim_mults=(1, 2, 4, 8),
        channels=3,
        with_time_emb=True,
        resnet_block_groups=8,
        use_convnext=True,
        convnext_mult=2,
    ):
        super().__init__()

        # determine dimensions
        self.channels = channels

        init_dim = default(init_dim, dim // 3 * 2)
        self.init_conv = nn.Conv2d(channels, init_dim, 7, padding=3)

        dims = [init_dim, *map(lambda m: dim * m, dim_mults)]
        in_out = list(zip(dims[:-1], dims[1:]))
        
        if use_convnext:
            block_klass = partial(ConvNextBlock, mult=convnext_mult)
        else:
            block_klass = partial(ResnetBlock, groups=resnet_block_groups)

        # time embeddings
        if with_time_emb:
            time_dim = dim * 4
            self.time_mlp = nn.Sequential(
                SinusoidalPositionEmbeddings(dim),
                nn.Linear(dim, time_dim),
                nn.GELU(),
                nn.Linear(time_dim, time_dim),
            )
        else:
            time_dim = None
            self.time_mlp = None

        # layers
        self.downs = nn.ModuleList([])
        self.ups = nn.ModuleList([])
        num_resolutions = len(in_out)

        for ind, (dim_in, dim_out) in enumerate(in_out):
            is_last = ind >= (num_resolutions - 1)

            self.downs.append(
                nn.ModuleList(
                    [
                        block_klass(dim_in, dim_out, time_emb_dim=time_dim),
                        block_klass(dim_out, dim_out, time_emb_dim=time_dim),
                        Residual(PreNorm(dim_out, LinearAttention(dim_out))),
                        Downsample(dim_out) if not is_last else nn.Identity(),
                    ]
                )
            )

        mid_dim = dims[-1]
        self.mid_block1 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim)
        self.mid_attn = Residual(PreNorm(mid_dim, Attention(mid_dim)))
        self.mid_block2 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim)

        for ind, (dim_in, dim_out) in enumerate(reversed(in_out[1:])):
            is_last = ind >= (num_resolutions - 1)

            self.ups.append(
                nn.ModuleList(
                    [
                        block_klass(dim_out * 2, dim_in, time_emb_dim=time_dim),
                        block_klass(dim_in, dim_in, time_emb_dim=time_dim),
                        Residual(PreNorm(dim_in, LinearAttention(dim_in))),
                        Upsample(dim_in) if not is_last else nn.Identity(),
                    ]
                )
            )

        out_dim = default(out_dim, channels)
        self.final_conv = nn.Sequential(
            block_klass(dim, dim), nn.Conv2d(dim, out_dim, 1)
        )

    def forward(self, x, time):
        x = self.init_conv(x)

        t = self.time_mlp(time) if exists(self.time_mlp) else None

        h = []

        # downsample
        for block1, block2, attn, downsample in self.downs:
            x = block1(x, t)
            x = block2(x, t)
            x = attn(x)
            h.append(x)
            x = downsample(x)

        # bottleneck
        x = self.mid_block1(x, t)
        x = self.mid_attn(x)
        x = self.mid_block2(x, t)

        # upsample
        for block1, block2, attn, upsample in self.ups:
            x = torch.cat((x, h.pop()), dim=1)
            x = block1(x, t)
            x = block2(x, t)
            x = attn(x)
            x = upsample(x)

        return self.final_conv(x)

## 前向扩散

前向过程就是在$T$时间步按照定义好的**方差表**给图像逐渐添加噪声的过程。

DDPM是使用线性插值做的方差表:

> We set the forward process variances to constants
increasing linearly from $\beta_1 = 10^{−4}$
to $\beta_T = 0.02$.

但是([Nichol et al., 2021](https://arxiv.org/abs/2102.09672)) 表示将噪声表预设为cosine schedule可以获得更好的效果。

下面，我们定义了 $T$ 个时间步骤的各种方差表，以及我们需要的相应变量，如累积方差。

In [None]:
def cosine_beta_schedule(timesteps, s=0.008):
    """
    cosine schedule as proposed in https://arxiv.org/abs/2102.09672
    """
    steps = timesteps + 1
    x = torch.linspace(0, timesteps, steps)
    alphas_cumprod = torch.cos(((x / timesteps) + s) / (1 + s) * torch.pi * 0.5) ** 2
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    return torch.clip(betas, 0.0001, 0.9999)

def linear_beta_schedule(timesteps):
    beta_start = 0.0001
    beta_end = 0.02
    return torch.linspace(beta_start, beta_end, timesteps)

def quadratic_beta_schedule(timesteps):
    beta_start = 0.0001
    beta_end = 0.02
    return torch.linspace(beta_start**0.5, beta_end**0.5, timesteps) ** 2

def sigmoid_beta_schedule(timesteps):
    beta_start = 0.0001
    beta_end = 0.02
    betas = torch.linspace(-6, 6, timesteps)
    return torch.sigmoid(betas) * (beta_end - beta_start) + beta_start


我们先使用线性方差表来进行$T=200$时间步长的训练。

首先定义我们将需要的各种变量，比如方差变量$\beta_t$ 和 累乘方差 $\bar{\alpha}_t$。

下面列出的每个变量都是一维张量，存储了从 $t$ 到 $T$ 的值。此外，我们还定义了一个 `extract` 函数，它将允许我们获取一个batch中索引的适当 $t$ 索引。

In [None]:
timesteps = 200

# define beta schedule
betas = linear_beta_schedule(timesteps=timesteps)

# define alphas 
alphas = 1. - betas
alphas_cumprod = torch.cumprod(alphas, axis=0)
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
sqrt_recip_alphas = torch.sqrt(1.0 / alphas)

# calculations for diffusion q(x_t | x_{t-1}) and others
sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)

# calculations for posterior q(x_{t-1} | x_t, x_0)
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)

def extract(a, t, x_shape):
    batch_size = t.shape[0]
    out = a.gather(-1, t.cpu())
    return out.reshape(batch_size, *((1,) * (len(x_shape) - 1))).to(t.device)

我们用这个猫的图展示一下前向过程中是如何加噪的：

In [None]:
from PIL import Image
import requests

url = 'http://images.cocodataset.org/val2017/000000039769.jpg'
image = Image.open(requests.get(url, stream=True).raw)
image

<img src="https://drive.google.com/uc?id=17FXnvCTl96lDhqZ_io54guXO8hM-rsQ2" width="400" />


噪声是加到 PyTorch 张量上，而不是 Pillow 图像上。

我们首先定义图像变换，以便我们可以从 PIL 图像转换为 PyTorch 张量（我们可以在其上添加噪声），反之亦然。

这些变换非常简单：我们首先通过除以 255 来归一化图像（使它们在 `[0,1]` 范围内），然后确保它们在 `[-1,1]` 范围内。方法来自 DDPM 论文：

> We assume that image data consists of integers in $\{0, 1, ... , 255\}$ scaled linearly to $[−1, 1]$. This
ensures that the neural network reverse process operates on consistently scaled inputs starting from
the standard normal prior $p(\mathbf{x}_T )$. 


In [None]:
from torchvision.transforms import Compose, ToTensor, Lambda, ToPILImage, CenterCrop, Resize

image_size = 128
transform = Compose([
    Resize(image_size),
    CenterCrop(image_size),
    ToTensor(), # turn into Numpy array of shape HWC, divide by 255
    Lambda(lambda t: (t * 2) - 1),
    
])

x_start = transform(image).unsqueeze(0)
x_start.shape

我们也定义反向过程，把图像从张量转换回PIL图像：

In [None]:
import numpy as np

reverse_transform = Compose([
     Lambda(lambda t: (t + 1) / 2),
     Lambda(lambda t: t.permute(1, 2, 0)), # CHW to HWC
     Lambda(lambda t: t * 255.),
     Lambda(lambda t: t.numpy().astype(np.uint8)),
     ToPILImage(),
])

让我们验证一下：





In [None]:
reverse_transform(x_start.squeeze())

<img src="https://drive.google.com/uc?id=1WT22KYvqJbHFdYYfkV7ohKNO4alnvesB" width="100" />

现在我们可以和DDPM论文写的一样定义网络前向过程了：


In [None]:
# forward diffusion
def q_sample(x_start, t, noise=None):
    if noise is None:
        noise = torch.randn_like(x_start)

    sqrt_alphas_cumprod_t = extract(sqrt_alphas_cumprod, t, x_start.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(
        sqrt_one_minus_alphas_cumprod, t, x_start.shape
    )

    return sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise

测试一下：

In [None]:
def get_noisy_image(x_start, t):
  # add noise
  x_noisy = q_sample(x_start, t=t)

  # turn back into PIL image
  noisy_image = reverse_transform(x_noisy.squeeze())

  return noisy_image

In [None]:
# take time step
t = torch.tensor([40])

get_noisy_image(x_start, t)

<img src="https://drive.google.com/uc?id=1Ra33wxuw3QxPlUG0iqZGtxgKBNdjNsqz" width="100" />

让我们可视化一下不同时间步骤的图像：

In [None]:
import matplotlib.pyplot as plt

# use seed for reproducability
torch.manual_seed(0)

# source: https://pytorch.org/vision/stable/auto_examples/plot_transforms.html#sphx-glr-auto-examples-plot-transforms-py
def plot(imgs, with_orig=False, row_title=None, **imshow_kwargs):
    if not isinstance(imgs[0], list):
        # Make a 2d grid even if there's just 1 row
        imgs = [imgs]

    num_rows = len(imgs)
    num_cols = len(imgs[0]) + with_orig
    fig, axs = plt.subplots(figsize=(200,200), nrows=num_rows, ncols=num_cols, squeeze=False)
    for row_idx, row in enumerate(imgs):
        row = [image] + row if with_orig else row
        for col_idx, img in enumerate(row):
            ax = axs[row_idx, col_idx]
            ax.imshow(np.asarray(img), **imshow_kwargs)
            ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])

    if with_orig:
        axs[0, 0].set(title='Original image')
        axs[0, 0].title.set_size(8)
    if row_title is not None:
        for row_idx in range(num_rows):
            axs[row_idx, 0].set(ylabel=row_title[row_idx])

    plt.tight_layout()

In [None]:
plot([get_noisy_image(x_start, torch.tensor([t])) for t in [0, 50, 100, 150, 199]])

这意味着我们现在可以根据模型定义损失函数，如下所示：

In [None]:
def p_losses(denoise_model, x_start, t, noise=None, loss_type="l1"):
    if noise is None:
        noise = torch.randn_like(x_start)

    x_noisy = q_sample(x_start=x_start, t=t, noise=noise)
    predicted_noise = denoise_model(x_noisy, t)

    if loss_type == 'l1':
        loss = F.l1_loss(noise, predicted_noise)
    elif loss_type == 'l2':
        loss = F.mse_loss(noise, predicted_noise)
    elif loss_type == "huber":
        loss = F.smooth_l1_loss(noise, predicted_noise)
    else:
        raise NotImplementedError()

    return loss

`denoise_model`是我们上面定义的 U-Net。我们将使用真实噪声和预测噪声之间的 Huber 损失。

## 定义 Dataset + DataLoader

这里我们定义了一个普通的PyTorch数据集。可以使用如Fashion-MNIST、CIFAR-10或ImageNet等，将图线性缩放到`[−1,1]`。

每个图像都被缩放到相同的大小。有趣的是，图像还会被随机水平翻转。来自论文的描述：

> We used random horizontal flips during training for CIFAR10; we tried training both with and without flips, and found flips to improve sample quality slightly.

在这里，我们使用🤗 [Datasets library](https://huggingface.co/docs/datasets/index) 的Fashion MNIST数据集，该数据集由已经具有相同分辨率的图像组成，即28x28。




In [None]:
from datasets import load_dataset

# load dataset from the hub
dataset = load_dataset("fashion_mnist")
image_size = 28
channels = 1
batch_size = 128



接下来，我们定义一个函数，用于修改数据集。

我们使用[`with_transform`]([functionality](https://huggingface.co/docs/datasets/v2.2.1/en/package_reference/main_classes#datasets.Dataset.with_transform))功能来实现。该函数只是应用一些基本的图像预处理：随机水平翻转，重新缩放，最后使它们的值在 [-1, 1] 范围内。

In [None]:
from torchvision import transforms
from torch.utils.data import DataLoader

# define image transformations (e.g. using torchvision)
transform = Compose([
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Lambda(lambda t: (t * 2) - 1)
])

# define function
def transforms(examples):
   examples["pixel_values"] = [transform(image.convert("L")) for image in examples["image"]]
   del examples["image"]

   return examples

transformed_dataset = dataset.with_transform(transforms).remove_columns("label")

# create dataloader
dataloader = DataLoader(transformed_dataset["train"], batch_size=batch_size, shuffle=True)

In [None]:
batch = next(iter(dataloader))
print(batch.keys())

## 采样

以下是模型训练时的采样代码，我们可以用于获取训练进度。按照论文中的算法2实现：

<img src="https://drive.google.com/uc?id=1ij80f8TNBDzpKtqHjk_sh8o5aby3lmD7" width="500" />


从扩散模型中生成新图像的过程是通过扩散过程的反向过程来实现的：我们从时间步$T$开始，从高斯分布中采样纯噪声，然后逐步去噪（使用它所学习的条件概率），直到时间步 $t=0$。如上所示，我们可以通过插入使用我们的噪声预测器的均值的重参数化来得到稍微去噪之后的图像，方差是预定义好的。

理想情况下，我们得到的图像看起来像来自真实数据分布的图像。



代码实现如下：

In [None]:
@torch.no_grad()
def p_sample(model, x, t, t_index):
    betas_t = extract(betas, t, x.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(
        sqrt_one_minus_alphas_cumprod, t, x.shape
    )
    sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape)
    
    # Equation 11 in the paper
    # Use our model (noise predictor) to predict the mean
    model_mean = sqrt_recip_alphas_t * (
        x - betas_t * model(x, t) / sqrt_one_minus_alphas_cumprod_t
    )

    if t_index == 0:
        return model_mean
    else:
        posterior_variance_t = extract(posterior_variance, t, x.shape)
        noise = torch.randn_like(x)
        # Algorithm 2 line 4:
        return model_mean + torch.sqrt(posterior_variance_t) * noise 

# Algorithm 2 but save all images:
@torch.no_grad()
def p_sample_loop(model, shape):
    device = next(model.parameters()).device

    b = shape[0]
    # start from pure noise (for each example in the batch)
    img = torch.randn(shape, device=device)
    imgs = []
    
    for i in tqdm(reversed(range(0, timesteps)), desc='sampling loop time step', total=timesteps):
        img = p_sample(model, img, torch.full((b,), i, device=device, dtype=torch.long), i)
        imgs.append(img.cpu().numpy())
    return imgs

@torch.no_grad()
def sample(model, image_size, batch_size=16, channels=3):
    return p_sample_loop(model, shape=(batch_size, channels, image_size, image_size))


注意，上面的代码是原始实现的简化版本。我们发现我们的简化版本（与论文中的算法2一致）与[原始更复杂的实现](https://github.com/hojonathanho/diffusion/blob/master/diffusion_tf/diffusion_utils.py)同样有效。


## 训练

接下来，我们按照通常的 PyTorch 训练方式来训练模型。我们还定义了一些逻辑，使用上面定义的`sample`方法，以便定期保存生成的图像。


In [None]:
from pathlib import Path

def num_to_groups(num, divisor):
    groups = num // divisor
    remainder = num % divisor
    arr = [divisor] * groups
    if remainder > 0:
        arr.append(remainder)
    return arr

results_folder = Path("./results")
results_folder.mkdir(exist_ok = True)
save_and_sample_every = 1000


定义好模型，将其丢到GPU上，使用Adam进行优化。

In [None]:
from torch.optim import Adam

device = "cuda" if torch.cuda.is_available() else "cpu"

model = Unet(
    dim=image_size,
    channels=channels,
    dim_mults=(1, 2, 4,)
)
model.to(device)

optimizer = Adam(model.parameters(), lr=1e-3)

开始训练

In [32]:
from torchvision.utils import save_image

epochs = 5

for epoch in range(epochs):
    for step, batch in enumerate(dataloader):
      optimizer.zero_grad()

      batch_size = batch["pixel_values"].shape[0]
      batch = batch["pixel_values"].to(device)

      # Algorithm 1 line 3: sample t uniformally for every example in the batch
      t = torch.randint(0, timesteps, (batch_size,), device=device).long()

      loss = p_losses(model, batch, t, loss_type="huber")

      if step % 100 == 0:
        print("Loss:", loss.item())

      loss.backward()
      optimizer.step()

      # save generated images
      if step != 0 and step % save_and_sample_every == 0:
        milestone = step // save_and_sample_every
        batches = num_to_groups(4, batch_size)
        all_images_list = list(map(lambda n: sample(model, batch_size=n, channels=channels), batches))
        all_images = torch.cat(all_images_list, dim=0)
        all_images = (all_images + 1) * 0.5
        save_image(all_images, str(results_folder / f'sample-{milestone}.png'), nrow = 6)

Loss: 0.0475182868540287


KeyboardInterrupt: ignored

<div class="output stream stdout">

    Output:
    ----------------------------------------------------------------------------------------------------
    Loss: 0.46477368474006653
    Loss: 0.12143351882696152
    Loss: 0.08106148988008499
    Loss: 0.0801810547709465
    Loss: 0.06122320517897606
    Loss: 0.06310459971427917
    Loss: 0.05681884288787842
    Loss: 0.05729678273200989
    Loss: 0.05497899278998375
    Loss: 0.04439849033951759
    Loss: 0.05415581166744232
    Loss: 0.06020551547408104
    Loss: 0.046830907464027405
    Loss: 0.051029372960329056
    Loss: 0.0478244312107563
    Loss: 0.046767622232437134
    Loss: 0.04305662214756012
    Loss: 0.05216279625892639
    Loss: 0.04748568311333656
    Loss: 0.05107741802930832
    Loss: 0.04588869959115982
    Loss: 0.043014321476221085
    Loss: 0.046371955424547195
    Loss: 0.04952816292643547
    Loss: 0.04472338408231735

</div>

## 采样/推理

训练完了我们就可以使用下边这个函数进行推理采样了：

In [None]:
# sample 64 images
samples = sample(model, image_size=image_size, batch_size=64, channels=channels)

In [None]:
# show a random one
random_index = 5
plt.imshow(samples[-1][random_index].reshape(image_size, image_size, channels), cmap="gray")

<img src="https://drive.google.com/uc?id=1ytnzS7IW7ortC6ub85q7nud1IvXe2QTE" width="300" />

看起来这个模型能够生成一个不错的T恤衫！请记住，我们训练用数据集分辨率比较低（28x28），所以这个结果还OK。

我们还可以创建一个去噪过程的 GIF：

In [None]:
import matplotlib.animation as animation

random_index = 53

fig = plt.figure()
ims = []
for i in range(timesteps):
    im = plt.imshow(samples[i][random_index].reshape(image_size, image_size, channels), cmap="gray", animated=True)
    ims.append([im])

animate = animation.ArtistAnimation(fig, ims, interval=50, blit=True, repeat_delay=1000)
animate.save('diffusion.gif')
plt.show()

<img src="https://drive.google.com/uc?id=1eyonQWhfmbQsTq8ndsNjw5QSRQ9em9Au" width="500" />



# 后续阅读📕

- Improved Denoising Diffusion Probabilistic Models ([Nichol et al., 2021](https://arxiv.org/abs/2102.09672)): 学习条件分布的方差（除均值外）有助于提高性能。

- Cascaded Diffusion Models for High Fidelity Image Generation ([Ho et al., 2021](https://arxiv.org/abs/2106.15282)): 引入级联扩散，包括多个扩散模型的pipeline，用于生成逐步提高分辨率的图像，实现高保真度图像合成。

- Diffusion Models Beat GANs on Image Synthesis ([Dhariwal et al., 2021](https://arxiv.org/abs/2105.05233)): 通过改进U-Net架构，并引入分类器指导，证明了扩散模型可以实现比当前最先进的生成模型更好的图像样本质量。

- Classifier-Free Diffusion Guidance ([Ho et al., 2021](https://openreview.net/pdf?id=qw8AKxfYbI)): 通过联合训练一个条件和一个无条件扩散模型的单个神经网络，展示了指导扩散模型不需要分类器。

- Hierarchical Text-Conditional Image Generation with CLIP Latents (DALL-E 2) ([Ramesh et al., 2022](https://cdn.openai.com/papers/dall-e-2.pdf)): 使用先验将文本标题转化为CLIP图像嵌入，然后扩散模型将其解码成图像。

- Photorealistic Text-to-Image Diffusion Models with Deep Language Understanding (ImageGen) ([Saharia et al., 2022](https://arxiv.org/abs/2205.11487)): 将大型预训练语言模型（例如T5）与级联扩散相结合，可以很好地实现文本到图像的合成。


目前，扩散模型的主要（或许唯一的）缺点似乎是需要多次前向传递才能生成一张图像（而对于GAN等生成模型则不需要）。然而，[Zhang et al., 2022](https://arxiv.org/abs/2204.13902)可以在不到10个去噪步骤内实现高保真度的生成。