<center><img src="images/DLI_Header.png" alt="标题" style="width: 400px;"/></center>

# 1. 从 U-Net 到 Diffusion

U-Nets 是一种卷积神经网络，最初是为医学成像而设计的。例如，我们可以向网络输入心脏图像，它可以返回另一张突出显示潜在癌变区域的图片。

我们可以使用这个过程来生成新图像吗？这里有一个想法：如果我们在图像中添加噪声，然后使用 U-Net 将图像与噪声分离，结果会怎样？然后我们可以向模型输入噪声并让它创建可识别的图像吗？让我们试一试吧！

#### 学习目标

本笔记本的目标是：
* 探索 FashionMNIST 数据集
* 构建 U-Net 架构
  * 构建 Down Block
  * 构建 Up Block
* 训练模型以消除图像中的噪音
* 尝试生成服装

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam

# Visualization tools
import graphviz
from torchview import draw_graph
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

在 PyTorch 中，我们可以通过将 [device](https://pytorch.org/docs/stable/tensor_attributes.html#torch.device) 设置为 `cuda` 来在操作中使用 GPU。函数 `torch.cuda.is_available()` 将确认 PyTorch 可以识别 GPU。

In [None]:
!nvidia-smi

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.cuda.is_available()

## 1.1 数据集

为了练习生成图像，我们将使用 [FashionMNIST](https://github.com/zalandoresearch/fashion-mnist) 数据集。FashionMNIST 旨在成为图像分类问题的“Hello World”数据集。黑白图像的小尺寸（28 x 28 像素）也使其成为图像生成的一个很好的起点。

FashionMNIST 包含在 [Torchvision](https://pytorch.org/vision/stable/index.html) 中，这是一个与 PyTorch 关联的计算机视觉库。下载数据集时，我们可以传递我们想要应用于图像的 [transformations](https://pytorch.org/vision/stable/transforms.html) 列表。现在，我们将使用 [ToTensor](https://pytorch.org/vision/stable/generated/torchvision.transforms.ToTensor.html#torchvision.transforms.ToTensor) 将图像转换为张量，以便我们可以使用神经网络处理图像。这将自动将像素值从 [0, 255] 缩放到 [0, 1]。它还会将尺寸从 [高度 x 宽度 x 通道] 重新排列为 [通道 x 高度 x 宽度]。

In [None]:
train_set = torchvision.datasets.FashionMNIST(
    "./data/", download=True, transform=transforms.Compose([transforms.ToTensor()])
)
NUM_CLASSES = 10

我们可以使用下面的代码对一些图像进行采样：

In [None]:
# Adjust for display; high w/h ratio recommended
plt.figure(figsize=(16, 1))

def show_images(dataset, num_samples=10):
    for i, img in enumerate(dataset):
        if i == num_samples:
            return
        plt.subplot(1, num_samples, i + 1)
        plt.imshow(torch.squeeze(img[0]))

show_images(train_set)

让我们为我们的数据集设置一些导入常量。使用 U-Nets，通常通过 [Max Pooling](https://paperswithcode.com/method/max-pooling) 不断将特征图的大小减半。然后，使用 [Transposed Convolution](https://pytorch.org/docs/stable/generated/torch.nn.ConvTranspose2d.html) 将特征图大小加倍。为了在 U-Net 上下移动时保持图像尺寸一致，如果图像大小可以被 `2` 整除多次，则会有所帮助。

In [None]:
IMG_SIZE = 16 # Due to stride and pooling, must be divisible by 2 multiple times
IMG_CH = 1 # Black and white image, no color channels
BATCH_SIZE = 128

现在我们已经定义了图像的目标大小，让我们创建一个函数来加载数据并将其转换为目标大小。我们将添加到图像中的随机噪声将从 [标准正态分布](https://mathworld.wolfram.com/NormalDistribution.html) 中采样，这意味着 68% 的噪声像素值将从 -1 到 1。我们将类似地将图像值缩放到从 -1 到 1。

这也是应用随机图像增强的好地方。现在，我们将从 [RandomHorizontalFlip](https://pytorch.org/vision/stable/generated/torchvision.transforms.RandomHorizontalFlip.html#torchvision.transforms.RandomHorizontalFlip) 开始。我们不会使用 [RandomVericalFlip](https://pytorch.org/vision/stable/generated/torchvision.transforms.RandomVerticalFlip.html#torchvision.transforms.RandomVerticalFlip)，因为我们最终会生成颠倒的图像。

In [None]:
def load_fashionMNIST(data_transform, train=True):
    return torchvision.datasets.FashionMNIST(
        "./data/",
        download=True,
        train=train,
        transform=data_transform,
    )


def load_transformed_fashionMNIST():
    data_transforms = [
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),  # Scales data into [0,1]
        transforms.RandomHorizontalFlip(),
        transforms.Lambda(lambda t: (t * 2) - 1)  # Scale between [-1, 1]
    ]

    data_transform = transforms.Compose(data_transforms)
    train_set = load_fashionMNIST(data_transform, train=True)
    test_set = load_fashionMNIST(data_transform, train=False)
    return torch.utils.data.ConcatDataset([train_set, test_set])

In [None]:
data = load_transformed_fashionMNIST()
dataloader = DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)

## 1.2 U-Net 架构

首先，让我们定义 U-Net 架构的不同组件。主要是 `DownBlock` 和 `UpBlock` 。

### 1.2.1 Down Block

`DownBlock` 是一种典型的卷积神经网络。如果您是 PyTorch 新手，并且具有 Keras/TensorFlow 背景，则以下内容更类似于 [函数式 API](https://keras.io/guides/functional_api/)，而不是 [顺序模型](https://keras.io/guides/sequation_model/)。我们稍后将使用 [残差](https://stats.stackexchange.com/questions/321054/what-are-residual-connections-in-rnns) 和跳过连接。顺序模型不具备支持这些类型连接的灵活性，但函数模型却具备。

在下面的 `__init__` 函数中，我们将各种神经网络操作分配给类变量：
* [Conv2d](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html) 将卷积应用于输入。`in_ch` 是我们正在卷积的通道数，`out_ch` 是输出通道数，与用于卷积的内核过滤器数相同。通常在 U-Net 架构中，我们在模型中向下移动的通道数越多。
* [ReLU](https://pytorch.org/docs/stable/generated/torch.nn.ReLU.html) 是卷积内核的激活函数。
* [BatchNorm2d](https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm2d.html) 将 [批量归一化](https://towardsdatascience.com/batch-normalization-in-3-levels-of-understanding-14c2da90a338) 应用于一层神经元。ReLu 没有可学习的参数，因此我们可以将同一函数应用于多个层，其效果与使用多个 ReLu 函数相同。批量归一化确实具有可学习的参数，重复使用此函数可能会产生意想不到的效果。
* [MaxPool2D](https://pytorch.org/docs/stable/generated/torch.nn.MaxPool2d.html) 是我们将用来在特征图沿网络向下移动时减小其大小的方法。可以通过卷积实现此效果，但最大池化通常用于 U-Nets。

在 `forward` 方法中，我们描述了如何将各种函数应用于输入。到目前为止，操作按以下顺序连续进行：
* `Conv2d`
* `BatchNorm2d`
* `ReLU`
* `Conv2d`
* `BatchNorm2d`
* `ReLU`
* `MaxPool2d`

In [None]:
class DownBlock(nn.Module):
    def __init__(self, in_ch, out_ch):
        kernel_size = 3
        stride = 1
        padding = 1

        super().__init__()
        layers = [
            nn.Conv2d(in_ch, out_ch, kernel_size, stride, padding),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(),
            nn.Conv2d(out_ch, out_ch, kernel_size, stride, padding),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(),
            nn.MaxPool2d(2)
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

### 1.2.2 Up Block

虽然 `DownBlock` 会减小特征图的大小，但 `UpBlock` 会将其加倍。这是通过 [ConvTranspose2d](https://pytorch.org/docs/stable/generated/torch.nn.ConvTranspose2d.html) 实现的。我们可以使用与 `DownBlock` 几乎相同的架构，但我们将用 convT 替换 conv2。转置的 `步幅` 为 2，将导致加倍，并带有适当数量的 `填充` 。

让我们使用下面的代码块进行一些练习。我们设置了一个示例，通过创建 `1` 的测试图像来测试此功能。

In [None]:
ch, h, w = 1, 3, 3
x = torch.ones(1, ch, h, w)
x

我们可以使用恒等 `内核` 来查看 `conv_transpose2d` 如何改变输入图像。恒等内核只有一个 `1` 值。当用于卷积时，输出将与输入相同。

尝试更改下面的 `stride` 、 `padding` 和 `output_padding` 。结果符合您的预期吗？

In [None]:
kernel = torch.tensor([[1.]])  # Identity kernel
kernel = kernel.view(1, 1, 1, 1).repeat(1, ch, 1, 1) # Make into a batch

output = F.conv_transpose2d(x, kernel, stride=1, padding=0, output_padding=0)[0]
output

内核大小也会影响输出特征图的大小。尝试更改下面的 `kernel_size` 。注意输出图像如何随着内核大小的增加而扩大？这与常规卷积相反，在常规卷积中，较大的内核大小会减小输出特征图的大小。

In [None]:
kernel_size = 3
kernel = torch.ones(1, 1, kernel_size, kernel_size)

output = F.conv_transpose2d(x, kernel, stride=1, padding=0, output_padding=0)[0]
output

另一个有趣的区别是：我们将输入通道乘以 2。这是为了适应跳过连接。我们将把 `UpBlock` 匹配的 `DownBlock` 的输出与 `UpBlock` 的输入连接起来。

<center><img src="images/FMUNet.png" width="600" /></center>

如果 x 是输入特征图的大小，则输出大小为：

`new_x = (x - 1) * stride + kernel_size - 2 * padding + out_padding`

如果 stride = 2 且 out_padding = 1，则为了将输入特征图的大小加倍：

`kernel_size = 2 * padding + 1`

操作与之前几乎相同，但有两点不同：
* `ConvTranspose2d` - 卷积转置而不是卷积
* `BatchNorm2d`
* `ReLU`
* `Conv2d`
* `BatchNorm2d`
* `ReLU`
* ~~`MaxPool2d`~~ - 扩大而不是缩小

In [None]:
class UpBlock(nn.Module):
    def __init__(self, in_ch, out_ch):
        # Convolution variables
        kernel_size = 3
        stride = 1
        padding = 1

        # Transpose variables
        strideT = 2
        out_paddingT = 1

        super().__init__()
        # 2 * in_chs for concatednated skip connection
        layers = [
            nn.ConvTranspose2d(2 * in_ch, out_ch, kernel_size, strideT, padding, out_paddingT),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(),
            nn.Conv2d(out_ch, out_ch, kernel_size, stride, padding),
            nn.BatchNorm2d(out_ch),
            nn.ReLU()
        ]
        self.model = nn.Sequential(*layers)
    
    def forward(self, x, skip):
        x = torch.cat((x, skip), 1)
        x = self.model(x)
        return x

### 1.2.3 完整的 U-Net

终于到了把它们拼凑起来的时候了！下面，我们有完整的 `UNet` 模型。

在 `__init__` 函数中，我们可以使用 `down_chs` 定义 U-Net 每一步的通道数。当前默认值为 `(16, 32, 64)`，这意味着数据在模型中移动时的当前维度为：

* input: 1 x 16 x 16
* down0: 16 x 16 x 16
  * down1: 32 x 8 x 8
    * down2: 64 x 4 x 4
      * dense_emb: 1024
    * up0: 64 x 4 x 4
  * up1: 64 x 8 x 8
* up2: 32 x 16 x 16
* out: 1 x 16 x 16

`forward` 类方法是我们最终添加跳过连接的地方。对于 U-Net 中的每一步，我们将跟踪每个 `DownBlock` 的输出。然后，当我们移动 `UpBlock` 时，我们将[连接](https://pytorch.org/docs/stable/generated/torch.cat.html)前一个 `UpBlock` 的输出与其对应的 `DownBlock`。

In [None]:
class UNet(nn.Module):
    def __init__(self):
        super().__init__()
        img_ch = IMG_CH
        down_chs = (16, 32, 64)
        up_chs = down_chs[::-1]  # Reverse of the down channels
        latent_image_size = IMG_SIZE // 4 # 2 ** (len(down_chs) - 1)

        # Inital convolution
        self.down0 = nn.Sequential(
            nn.Conv2d(img_ch, down_chs[0], 3, padding=1),
            nn.BatchNorm2d(down_chs[0]),
            nn.ReLU()
        )

        # Downsample
        self.down1 = DownBlock(down_chs[0], down_chs[1])
        self.down2 = DownBlock(down_chs[1], down_chs[2])
        self.to_vec = nn.Sequential(nn.Flatten(), nn.ReLU())
        
        # Embeddings
        self.dense_emb = nn.Sequential(
            nn.Linear(down_chs[2]*latent_image_size**2, down_chs[1]),
            nn.ReLU(),
            nn.Linear(down_chs[1], down_chs[1]),
            nn.ReLU(),
            nn.Linear(down_chs[1], down_chs[2]*latent_image_size**2),
            nn.ReLU()
        )
        
        # Upsample
        self.up0 = nn.Sequential(
            nn.Unflatten(1, (up_chs[0], latent_image_size, latent_image_size)),
            nn.Conv2d(up_chs[0], up_chs[0], 3, padding=1),
            nn.BatchNorm2d(up_chs[0]),
            nn.ReLU(),
        )
        self.up1 = UpBlock(up_chs[0], up_chs[1])
        self.up2 = UpBlock(up_chs[1], up_chs[2])

        # Match output channels
        self.out = nn.Sequential(
            nn.Conv2d(up_chs[-1], up_chs[-1], 3, 1, 1),
            nn.BatchNorm2d(up_chs[-1]),
            nn.ReLU(),
            nn.Conv2d(up_chs[-1], img_ch, 3, 1, 1),
        )

    def forward(self, x):
        down0 = self.down0(x)
        down1 = self.down1(down0)
        down2 = self.down2(down1)
        latent_vec = self.to_vec(down2)

        up0 = self.up0(latent_vec)
        up1 = self.up1(up0, down2)
        up2 = self.up2(up1, down1)
        return self.out(up2)

In [None]:
model = UNet()
print("Num params: ", sum(p.numel() for p in model.parameters()))

让我们使用 [torchview](https://github.com/mert-kurttutan/torchview) 验证模型架构。如果我们有三个 `down_chs`，则应该有两个 `DownBlock`，每个转换一个。同样，应该有两个 `UpBlock`。我们还应该检查是否有一个跳跃连接。U-Net 的“底部”不需要跳跃连接，因此每个 `UpBlock` 减一都有一个跳跃连接。

最后，输出尺寸是否与输入尺寸相同？

In [None]:
graphviz.set_jupyter_format('png')
model_graph = draw_graph(
    model,
    input_size=(BATCH_SIZE, IMG_CH, IMG_SIZE, IMG_SIZE),
    device='meta',
    expand_nested=True
)
model_graph.resize_graph(scale=1.5)
model_graph.visual_graph

在 [PyTorch 2.0](https://pytorch.org/get-started/pytorch-2.0/) 中，我们可以编译模型以加快训练速度。它会将操作列表发送到我们的 GPU，以便它可以像装配线一样将这些操作应用于我们的输入。有关更多信息，请阅读 [此处](https://pytorch.org/tutorials/intermediate/torch_compile_tutorial.html)。

In [None]:
model = torch.compile(UNet().to(device))

## 1.3 训练

让我们尝试在图像中添加噪音，看看我们的 U-Net 模型是否可以将其过滤掉。我们可以定义一个参数 `beta` 来表示我们的图像中噪音占原始图像的百分比。我们可以使用 `alpha` 来表示 `beta` 的[补充](https://brilliant.org/wiki/probability-by-complement/)。

In [None]:
def add_noise(imgs):
    dev = imgs.device
    percent = .5 # Try changing from 0 to 1
    beta = torch.tensor(percent, device=dev)
    alpha = torch.tensor(1 - percent, device=dev)
    noise = torch.randn_like(imgs)
    return alpha * imgs + beta * noise

接下来，我们将损失函数定义为原始图像和预测图像之间的[均方误差](https://developers.google.com/machine-learning/glossary#mean-squared-error-mse)。

In [None]:
def get_loss(model, imgs):
    imgs_noisy = add_noise(imgs)
    imgs_pred = model(imgs_noisy)
    return F.mse_loss(imgs, imgs_pred)

为了显示我们模型的输出，我们需要将其转换回 CPU 上的图像格式。

In [None]:
def show_tensor_image(image):
    reverse_transforms = transforms.Compose([
        transforms.Lambda(lambda t: (t + 1) / 2),
        transforms.Lambda(lambda t: torch.minimum(torch.tensor([1]), t)),
        transforms.Lambda(lambda t: torch.maximum(torch.tensor([0]), t)),
        transforms.ToPILImage(),
    ])
    plt.imshow(reverse_transforms(image[0].detach().cpu()))

为了在训练期间看到改进效果，我们可以使用 [子图](https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.subplot.html) 比较 `原始` 图像、 `添加噪声` 图像和 `预测原始` 图像。

[@torch.no_grad](https://pytorch.org/docs/stable/generated/torch.no_grad.html) 将跳过使用此函数在训练期间计算梯度。

In [None]:
@torch.no_grad()
def plot_sample(imgs):
    # Take first image of batch
    imgs = imgs[[0], :, :, :]
    imgs_noisy = add_noise(imgs[[0], :, :, :])
    imgs_pred = model(imgs_noisy)

    nrows = 1
    ncols = 3
    samples = {
        "Original" : imgs,
        "Noise Added" : imgs_noisy,
        "Predicted Original" : imgs_pred
    }
    for i, (title, img) in enumerate(samples.items()):
        ax = plt.subplot(nrows, ncols, i+1)
        ax.set_title(title)
        show_tensor_image(img)
    plt.show()

最后，到了关键时刻！是时候训练我们的模型并观察它的改进了。

In [None]:
optimizer = Adam(model.parameters(), lr=0.0001)
epochs = 2

model.train()
for epoch in range(epochs):
    for step, batch in enumerate(dataloader):
        optimizer.zero_grad()

        images = batch[0].to(device)
        loss = get_loss(model, images)
        loss.backward()
        optimizer.step()

        if epoch % 1 == 0 and step % 100 == 0:
            print(f"Epoch {epoch} | Step {step:03d} Loss: {loss.item()} ")
            plot_sample(images)

预测图像中有一些噪音，但它仍然可以很好地提取原始服装。

现在，当给出纯噪音时，模型会如何表现？它能创建可信的新图像吗？

In [None]:
model.eval()
for _ in range(10):
    noise = torch.randn((1, IMG_CH, IMG_SIZE, IMG_SIZE), device=device)
    result = model(noise)
    nrows = 1
    ncols = 2
    samples = {
        "Noise" : noise,
        "Generated Image" : result
    }
    for i, (title, img) in enumerate(samples.items()):
        ax = plt.subplot(nrows, ncols, i+1)
        ax.set_title(title)
        show_tensor_image(img)
    plt.show()

## 1.4 下一步

嗯，这些图像看起来更像是墨迹图像而不是衣服。在下一个笔记本中，我们将改进这项技术以创建更易于识别的图像。

在继续之前，请通过运行下面的代码单元重新启动 jupyter 内核。这将防止将来的笔记本出现内存问题。

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

<center><img src="images/DLI_Header.png" alt="标题" style="width: 400px;"/></center>