In [1]:
import random
import imageio
import numpy as np
from argparse import ArgumentParser # 参数转换？

from tqdm.auto import tqdm
import matplotlib.pyplot as plt

import einops # gif visualization
import torch
import torch.nn as nn
from torch.optim import Adam #优化器
from torch.utils.data import DataLoader # 数据迭代器

from torchvision.transforms import Compose,ToTensor,Lambda
from torchvision.datasets.mnist import MNIST, FashionMNIST #导入MNIST, FashionMNIST 两个数据集

In [None]:
fashion = True # 代表选用 FashionMNIST 数据集 
train_flag = True # 代表训练模式，否则进行推理和生成

# 定义DDPM类
class MyDDPM():
    pass

# 定义Unet网络
class MyUNet():
    pass

# 展示图片
def show_images():
    pass

# 生成图片（采样过程）
def generate_new_images():
    pass

# 加噪的步长
n_steps = 100

# 实例化DDPM类
ddpm = MyDDPM()
# ddpm = MyDDPM(MyUNet(), n_steps = n_steps, device = device)

loader = None
n_epochs = None
optim = None
device = None

# Training

In [None]:
# 选用FashionMnist 模型存储路径则为 “ddpm_fashion.pt”
store_path = "ddpm_fashion.pt" if fashion else "ddpm_mnist.pt" # 定义模型的存储路径

def training_loop(ddpm, loader, n_epochs, optim, device, display = False, store_path = "ddpm_model.pt"):
    # 定义均方误差损失
    mse = nn.MSELoss()
    # 初始化loss为+∞ 不断更新得到最小的loss 最优的model
    best_loss = float("inf") 
    # t = 0, 1, 2, 3,...,1000 这些t是DDPM()的属性
    n_steps = ddpm.n_steps
    
    # 遍历每一个epoch
    for epoch in tqdm(range(n_epochs), decs = f"Training progress", colour="#00ff00"):
        epoch_loss = 0.0
        # leave = False 进度条跑完一次后不会保留在终端上面，而是会开始下一次的进度展示 
        
        # 遍历一个epoch下的多个batch_size(由data_loader迭代返回每个batch_size的数据)
        # 一个batch当中有128张图片
        for step,batch in enumerate(tqdm(loader, leave = False, desc = f"Epoch {epoch + 1}/{n_epochs}", colour = "#005500" )):
            x0 = batch[0].to(device)   # x0 是一个batch(?)，x[0]当中包含128张【原图】
            n = len(x0) # batchsize的大小
            
            t = None
            eta = None
            # DDPM 前向过程: 加噪声的过程
            # 前向过程的eta是GT
            # 前向过程输入原图、扩散步长、所家的
            # 前向过程得到噪声图像
            noisy_imgs = ddpm(x0,t,eta) # x0 [128, 1, 28, 28] x0是一批原图
            
            # DDPM 逆向过程: 对原图去噪的过程
            # 逆向过程，用模型来预测噪声，和原来我们自己加的噪声进行对比
            # eta_theta 就是 ddpm 模型反向传播预测得到的噪声 predicted noise
            # 反向过程输入噪声图、前向扩散的步长
            # 反向过程预测得到前向过程添加了多少噪声
            eta_theta = ddpm.backward(noisy_imgs, t)
            
            # 对【GT也就是我们实际加的噪声】和【predicted noise也就是DDPM预测得到的噪声】做损失
            loss = mse(eta, eta_theta)
            
            # 清零梯度
            optim.zero_grad()             
            
            # loss反向传播
            loss.backward()
            
            # 更新loss (随机梯度下降法？求导更新权重)
            optim.step()
            
            # 平均损失
            epoch_loss += loss.item() * len(x0) / len(loader.dataset)
            
        # 跑完一个epoch之后，用我们得到的模型去生成一张图像并展示
        if display:
            # 每轮输出：在第i轮生成的图像为{}
            show_images(generate_new_images(ddpm,device = device), f"Images generated at epoch{epoch + 1}")
        
        # 日志记录本轮的损失
        log_string = f"Loss at epoch {epoch + 1}: {epoch_loss:.3f}"
        
        # 保存效果更好的模型
        if best_loss > epoch_loss:
            best_loss = epoch_loss
            torch.save(ddpm.state_dict(),store_path)
            log_string += " --> Best model ever(stored)"
        

if train_flag: # 训练模式下执行训练
    training_loop(ddpm, loader, n_epochs, optim, device)

# Testing and Generation

In [None]:
# 网络结构
best_model = MyDDPM(MyUNet(), n_steps = n_steps, device = device) 
# 加载权重参数/模型到 device
best_model.load_state_dict(torch.load(store_path), map_location = device) 
# 开启评估模式/推理模式
best_model.eval() 

In [None]:
# 采样器
# 输入：预测杂声的模型，加噪步长，图片生成过程
generated = generate_new_images(
    best_model,
    n_samples = 100,
    device = device,
    gif_name = "fashion.gif" if fashion else "mnist.gif"
)

show_images(generated, "Final result")