# 🔥 Diffusion Models

In this notebook, we'll walk through the steps required to train your own diffusion model on the Oxford flowers dataset

The code is adapted from the excellent ['Denoising Diffusion Implicit Models' tutorial](https://keras.io/examples/generative/ddim/) created by András Béres available on the Keras website.

In [1]:
%load_ext autoreload
%autoreload 2
import numpy as np
import matplotlib.pyplot as plt

plt.style.use("seaborn-v0_8-colorblind")

import math

import tensorflow as tf
from tensorflow.keras import (
    layers,
    models,
    optimizers,
    utils,
    callbacks,
    metrics,
    losses,
    activations,
)

from notebooks.utils import display, sample_batch

import time
startTime=time.time()

KeyboardInterrupt: 

## 0. Parameters <a name="parameters"></a>

In [None]:
#调整参数
BATCH_SIZE = 64 #查看显存占用比例“nvidia-smi”。
DATASET_REPETITIONS = 5 #数据集的重复次数。
LOAD_MODEL = False

NOISE_EMBEDDING_SIZE = 32 #正弦嵌入层将一个标量数值 (噪声方差) 转换到一个向量，32即向量维数
PLOT_DIFFUSION_STEPS = 20 #生成图像时，去除噪音的步骤数目

EMA = 0.999 #指数移动平均参数
LEARNING_RATE = 1e-3
WEIGHT_DECAY = 1e-4

EPOCHS = 150 # 4m/轮/(48, 192)

#固定参数
train_dir=".\\png 192x48_GRAY\\" #文件名字符数目不能过多，否则出错，故在旋转、缩放、平移变换时减少文件名字符。
IMAGE_SIZE =  (48, 192) #再小清晰度就不满足了
CHANNELS = 1

## 1. Prepare the Data

In [None]:
# Load the data
train_data = utils.image_dataset_from_directory(
    train_dir,  
    labels=None,
    color_mode="grayscale", #默认RGB三通道
    image_size=IMAGE_SIZE,
    batch_size=None,
    shuffle=True,
    seed=42,
    interpolation="bilinear",
)

In [None]:
# Preprocess the data
def preprocess(img):
    img = tf.cast(img, "float32") / 255.0 #tf.cast()函数：数据类型转换.由0~255放缩到0~1
    return img


train = train_data.map(lambda x: preprocess(x))
train = train.repeat(DATASET_REPETITIONS) #repeat(count) 用于将数据重复count次，相当于我们训练时的epoch
train = train.batch(BATCH_SIZE, drop_remainder=True) #batch(batch_size)用于将数据划分为多个batch。     drop_remainder：True表示剩下的数据不足batch_size个 仍掉。默认值为False

In [None]:
# Show some items of clothing from the training set
train_sample = sample_batch(train)
print(train_sample.shape) 
np.savetxt('train_sample[0].txt', train_sample[0][:,:,0])  #像素值0~1
display(train_sample, n=4)

### 1.1 Diffusion schedules <a name="diffusion_schedules"></a>

In [None]:
#这个方案最复杂
def linear_diffusion_schedule(diffusion_times): #线性扩散计划。0~1均匀分布。
    min_rate = 0.0001 #一开始的beta值，即一开始添加1%的噪音
    max_rate = 0.02 #最后一步的beta值，可见添加噪音的比例越来越大
    betas = min_rate + diffusion_times * (max_rate - min_rate) #min_rate~max_rate均匀分布。
    alphas = 1 - betas #0.9999~0.98002均匀分布
    alpha_bars = tf.math.cumprod(alphas) #cumprod()用于计算输入张量的累积乘积
    signal_rates = tf.sqrt(alpha_bars) #返回张量每个元素的平方根。
    noise_rates = tf.sqrt(1 - alpha_bars) #含有噪音的比例，由0向1演变.
    return noise_rates, signal_rates #“线性扩散代码解释.png”



In [None]:
#这个方案最简单，diffusion_times约等于noise_rates
def cosine_diffusion_schedule(diffusion_times): 
    signal_rates = tf.cos(diffusion_times * math.pi / 2)
    noise_rates = tf.sin(diffusion_times * math.pi / 2)
    return noise_rates, signal_rates

In [None]:
#最终采用的方案，diffusion_times相当于noise_rates[但是有点偏差]
# 我们将使用的offset cosine扩散计划，其调整计划来确保加噪steps在加噪的开始阶段不至于过小。
def offset_cosine_diffusion_schedule(diffusion_times):
    min_signal_rate = 0.02
    max_signal_rate = 0.95
    start_angle = tf.acos(max_signal_rate) #18.2度
    end_angle = tf.acos(min_signal_rate) #88.9度

    diffusion_angles = start_angle + diffusion_times * (end_angle - start_angle)

    signal_rates = tf.cos(diffusion_angles) #在0.95~0.02之间
    noise_rates = tf.sin(diffusion_angles) #0.312~0.9998。diffusion_times（0~1）相当于noise_rates[但是有点偏差]

    return noise_rates, signal_rates

diffusion_times=tf.convert_to_tensor([(1-x*1 / 20) for x in range(20)])
print(diffusion_times)
print(offset_cosine_diffusion_schedule(diffusion_times))
print(offset_cosine_diffusion_schedule(1))

In [None]:
T = 1000
diffusion_times = tf.convert_to_tensor([x / T for x in range(T)]) #0~1均匀分布。

linear_noise_rates, linear_signal_rates = linear_diffusion_schedule(    diffusion_times)
cosine_noise_rates, cosine_signal_rates = cosine_diffusion_schedule(    diffusion_times)
(    offset_cosine_noise_rates,    offset_cosine_signal_rates,) = offset_cosine_diffusion_schedule(diffusion_times)

In [None]:
plt.plot(
    diffusion_times, linear_signal_rates**2, linewidth=1.5, label="linear"
)
plt.plot(
    diffusion_times, cosine_signal_rates**2, linewidth=1.5, label="cosine"
)
plt.plot(
    diffusion_times,    offset_cosine_signal_rates**2,    linewidth=1.5,    label="offset_cosine",
)

plt.xlabel("t/T", fontsize=12)
plt.ylabel(r"$\bar{\alpha_t}$ (signal)", fontsize=12)
plt.legend()
plt.show()

In [None]:
plt.plot(
    diffusion_times, linear_noise_rates**2, linewidth=1.5, label="linear"
)
plt.plot(
    diffusion_times, cosine_noise_rates**2, linewidth=1.5, label="cosine"
)
plt.plot(
    diffusion_times,
    offset_cosine_noise_rates**2,
    linewidth=1.5,
    label="offset_cosine",
)

plt.xlabel("t/T", fontsize=12)
plt.ylabel(r"$1-\bar{\alpha_t}$ (noise)", fontsize=12)
plt.legend()
plt.show()

## 2. Build the model <a name="build"></a>

In [None]:
def sinusoidal_embedding(x):  #正弦嵌入层，将一个标量数值 (噪声方差) 转换到一个向量（对应图像通道）
    frequencies = tf.exp(   #最终得到指数值
        tf.linspace(          #均匀分布，返回为频率的最大放缩因子
            tf.math.log(1.0), #从0到ln(1000)
            tf.math.log(1000.0),
            NOISE_EMBEDDING_SIZE // 2,  #生成的个数
        )
    )
    angular_speeds = 2.0 * math.pi * frequencies
    embeddings = tf.concat(
        [tf.sin(angular_speeds * x), tf.cos(angular_speeds * x)], axis=3
    )
    return embeddings #这个函数输入是方差标量，输出结果shape是(1x1x32)，值为-1~1（sin、cos函数值域）

In [None]:
embedding_list = []
for y in np.arange(0, 1, 0.01): #y为噪音方差
    embedding_list.append(sinusoidal_embedding(np.array([[[[y]]]]))[0][0][0])
embedding_array = np.array(np.transpose(embedding_list))  #np.transpose（）数组转置
fig, ax = plt.subplots() #fig代表绘图窗口(Figure)；ax代表这个绘图窗口上的坐标系(axis)，一般会继续对ax进行操作。
ax.set_xticks(
    np.arange(0, 100, 10), labels=np.round(np.arange(0.0, 1.0, 0.1), 1)
)
ax.set_ylabel("embedding dimension", fontsize=8) #这里有32个值
ax.set_xlabel("noise variance", fontsize=8)
plt.pcolor(embedding_array, cmap="coolwarm") #plt.pcolor()是matplotlib库中的一种2D图像绘制函数，它可以根据数据值的大小自动填充不同颜色，从而为我们展示出数据的分布情况。
plt.colorbar(orientation="horizontal", label="embedding value") #添加颜色条
#ax.imshow(embedding_array, interpolation="nearest", origin="lower")
plt.show()

In [None]:
def ResidualBlock(width): #width通道数
    def apply(x): #这里两个def定义函数，这是“函数嵌套.png”
        input_width = x.shape[3] #图像x的通道数
        if input_width == width: #检查输入的通道数是否匹配，如不是则在跳跃连接上引入一个额外的Conv2D层
            residual = x
        else:
            residual = layers.Conv2D(width, kernel_size=1)(x)
        x = layers.BatchNormalization(center=False, scale=False)(x) #keras批标准化
        x = layers.Conv2D(width, kernel_size=3, padding="same", activation=activations.swish)(x)
        x = layers.Conv2D(width, kernel_size=3, padding="same")(x)
        x = layers.Add()([x, residual]) #layers.add()相应元素数值相加，H,W,C不改变；tf.keras.layers.concatenate（）拼接，H 、 W 不改变 ， 但是通道数增加
        return x

    return apply


def DownBlock(width, block_depth): #width通道数
    def apply(x): 
        x, skips = x 
        for _ in range(block_depth):
            x = ResidualBlock(width)(x) #这里两个输入见“函数嵌套.png”
            skips.append(x)
        x = layers.AveragePooling2D(pool_size=2)(x) #平均池化层，pool_size 池化窗口大小，strides: 池化步长，默认值等于 pool_size
        return x

    return apply 


def UpBlock(width, block_depth):
    def apply(x):
        x, skips = x 
        x = layers.UpSampling2D(size=2, interpolation="bilinear")(x)
        for _ in range(block_depth):
            x = layers.Concatenate()([x, skips.pop()]) #列表pop()方法：删除并返回列表中的最后一个元素。
            x = ResidualBlock(width)(x)
        return x 

    return apply

In [None]:
# Build the U-Net

noisy_images = layers.Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS)) #输入带有噪音的图像
x = layers.Conv2D(32, kernel_size=1)(noisy_images)

noise_variances = layers.Input(shape=(1, 1, 1)) #输入噪音方差。
noise_embedding = layers.Lambda(sinusoidal_embedding)(noise_variances) 
noise_embedding = layers.UpSampling2D(size=IMAGE_SIZE, interpolation="nearest")(noise_embedding) 

x = layers.Concatenate()([x, noise_embedding]) 

skips = [] #保存DownBlock函数的中间数据，给UpBlock函数调用

x = DownBlock(32, block_depth=2)([x, skips]) #该函数共产生2个中间结果，放到skip列表中，然后下采样作为返回值
x = DownBlock(64, block_depth=2)([x, skips])
x = DownBlock(96, block_depth=2)([x, skips])

x = ResidualBlock(128)(x)
x = ResidualBlock(128)(x)

x = UpBlock(96, block_depth=2)([x, skips])
x = UpBlock(64, block_depth=2)([x, skips])
x = UpBlock(32, block_depth=2)([x, skips])

x = layers.Conv2D(CHANNELS, kernel_size=1, kernel_initializer="zeros")(x) #输出是预测的噪音

unet = models.Model([noisy_images, noise_variances], x, name="unet") #输入带有噪音的图像、噪音方差，输出是预测的噪音。仅用于def denoise()调用。
unet.summary()

In [None]:
class DiffusionModel(models.Model):
    def __init__(self):
        super().__init__()

        self.normalizer = layers.Normalization() #标准化预处理层。“layers.Normalization()的含义.png”
        self.network = unet
        self.ema_network = models.clone_model(self.network) #克隆模型（但是不会克隆权重）
        self.diffusion_schedule = offset_cosine_diffusion_schedule #是否需要加括号？答：不需要，这里相当于属性值，然后调用时加括号。

    def compile(self, **kwargs):
        super().compile(**kwargs)
        self.noise_loss_tracker = metrics.Mean(name="n_loss")

    @property
    def metrics(self):
        return [self.noise_loss_tracker]

    def denormalize(self, images): 
        images = self.normalizer.mean + images * self.normalizer.variance**0.5 #variance 方差值
        return tf.clip_by_value(images, 0.0, 1.0)

    def denoise(self, noisy_images, noise_rates, signal_rates, training):
        if training:
            network = self.network
        else:
            network = self.ema_network
        pred_noises = network([noisy_images, noise_rates**2], training=training) 
        #去噪神经网络理解：如同一个神探，看到尸体腐烂的照片，根据死亡时间（用于推算腐烂比例或者程度），就能还原出尸体初始状态。这个神探是如何有这项本领的？答：他是通过研究许多尸体腐烂过程，不断修正自己的认知获得的。
        pred_images = (noisy_images - noise_rates * pred_noises) / signal_rates

        return pred_noises, pred_images

    def reverse_diffusion(self, initial_noise, diffusion_steps):
        num_images = initial_noise.shape[0] #获取生成图像的数目
        step_size = 1.0 / diffusion_steps
        current_images = initial_noise
        for step in range(diffusion_steps): #step=[0,1,2~,18,19]
            diffusion_times = tf.ones((num_images, 1, 1, 1)) - step * step_size #diffusion_times腐蚀程度（1~0.05，数值越大腐蚀程度越高）
            noise_rates, signal_rates = self.diffusion_schedule(diffusion_times) 
            pred_noises, pred_images = self.denoise(  #注意，每步pred_noises均不同的
                current_images, noise_rates, signal_rates, training=False
            )
            next_diffusion_times = diffusion_times - step_size
            next_noise_rates, next_signal_rates = self.diffusion_schedule(next_diffusion_times)
            current_images = (
                next_signal_rates * pred_images + next_noise_rates * pred_noises #《生成式深度学习 英文第二版》P226中间公式。
            )
        return pred_images #为什么不直接采用X0？答：神经网络性能达不到。

    def generate(self, num_images, diffusion_steps, initial_noise=None):
        if initial_noise is None:
            initial_noise = tf.random.normal(
                shape=(num_images, IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS)
            )
        generated_images = self.reverse_diffusion(
            initial_noise, diffusion_steps
        )
        generated_images = self.denormalize(generated_images)
        return generated_images

    def train_step(self, images):
        images = self.normalizer(images, training=True) #“layers.Normalization()的含义.png”。这里将输入图像批次标准化
        noises = tf.random.normal(shape=(BATCH_SIZE, IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS))  #标准正态分布中采样噪声

        diffusion_times = tf.random.uniform(shape=(BATCH_SIZE, 1, 1, 1), minval=0.0, maxval=1.0) #均匀分布中采样。下限 minval包含在范围内，而上限maxval被排除在外。
        noise_rates, signal_rates = self.diffusion_schedule(diffusion_times) 

        noisy_images = signal_rates * images + noise_rates * noises #将输入图像加上噪音

        with tf.GradientTape() as tape:
            # train the network to separate noisy images to their components
            pred_noises, pred_images = self.denoise(noisy_images, noise_rates, signal_rates, training=True) 

            noise_loss = self.loss(noises, pred_noises) 

        gradients = tape.gradient(noise_loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(
            zip(gradients, self.network.trainable_weights)
        )

        self.noise_loss_tracker.update_state(noise_loss)

        for weight, ema_weight in zip(
            self.network.weights, self.ema_network.weights
        ):
            ema_weight.assign(EMA * ema_weight + (1 - EMA) * weight) #“.assign()”是tf变量赋值的方法

        return {m.name: m.result() for m in self.metrics}

    def test_step(self, images):
        images = self.normalizer(images, training=False)
        noises = tf.random.normal(shape=(BATCH_SIZE, IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS))
        diffusion_times = tf.random.uniform(
            shape=(BATCH_SIZE, 1, 1, 1), minval=0.0, maxval=1.0
        )
        noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
        noisy_images = signal_rates * images + noise_rates * noises
        pred_noises, pred_images = self.denoise(
            noisy_images, noise_rates, signal_rates, training=False
        )
        noise_loss = self.loss(noises, pred_noises)
        self.noise_loss_tracker.update_state(noise_loss)

        return {m.name: m.result() for m in self.metrics}
'''
    @tf.function
    def call(self, inputs):
        pass
'''

In [None]:
ddm = DiffusionModel()

ddm.normalizer.adapt(train) #“layers.Normalization()的含义.png”。



In [None]:
if LOAD_MODEL:
    ddm.built = True
    ddm.load_weights("./checkpoint/checkpoint.ckpt")

## 3.Train the model <a name="train"></a>

In [None]:
ddm.compile(
    optimizer=optimizers.experimental.AdamW(  #“AdamW简单易用效果好.mhtml”
        learning_rate=LEARNING_RATE, weight_decay=WEIGHT_DECAY
    ),
    loss=losses.mean_absolute_error, #keras官方函数，计算标签和预测之间的绝对差值的平均值。mse 损失函数，即均方误差（MSE，mean squared error），预测值与目标值之差的平方。平均绝对误差（MAE，mean absolute error），它是预测值与目标值之差的绝对值。
)

In [None]:
# run training and plot generated images periodically
model_checkpoint_callback = callbacks.ModelCheckpoint(
    filepath="./checkpoint/checkpoint{epoch:02d}.ckpt", 
    save_weights_only=True,
    save_freq="epoch",
    verbose=0,
)

tensorboard_callback = callbacks.TensorBoard(log_dir="./logs")


class ImageGenerator(callbacks.Callback):
    def __init__(self, num_img):
        self.num_img = num_img

    def on_epoch_end(self, epoch, logs=None):
        generated_images = self.model.generate(
            num_images=self.num_img,
            diffusion_steps=PLOT_DIFFUSION_STEPS,
        ).numpy()
        display(
            generated_images,
            n=self.num_img,  #自增
            size=(80, 2),   #自增，这种比例清晰度高   
            save_to="./output/generated_img_%03d.png" % (epoch),
        )


image_generator_callback = ImageGenerator(num_img=10)

history=ddm.fit(
    train,
    epochs=EPOCHS,
    callbacks=[
        model_checkpoint_callback,
        tensorboard_callback,
        image_generator_callback,
    ],
)

In [None]:
with open('history.history.txt', 'w') as file_object:file_object.write(str(history.history)+'\n')  #损失写入文件

import matplotlib.pyplot as plt
history_dict = history.history
loss = history_dict['n_loss']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'b', label='loss')
plt.title('Training loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

## 4. Inference <a name="inference"></a>

In [None]:
# Generate some novel images of flowers
generated_images = ddm.generate(num_images=10, diffusion_steps=20).numpy()
display(generated_images)

In [None]:
# View improvement over greater number of diffusion steps
for diffusion_steps in list(np.arange(1, 6, 1)) + [20] + [100]:
    tf.random.set_seed(42)
    generated_images = ddm.generate(
        num_images=10,
        diffusion_steps=diffusion_steps,
    ).numpy()
    display(generated_images)

In [None]:
# Interpolation between two points in the latent space
tf.random.set_seed(100)


def spherical_interpolation(a, b, t):
    return np.sin(t * math.pi / 2) * a + np.cos(t * math.pi / 2) * b


for i in range(5):
    a = tf.random.normal(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS))
    b = tf.random.normal(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS))
    initial_noise = np.array(
        [spherical_interpolation(a, b, t) for t in np.arange(0, 1.1, 0.1)]
    )
    generated_images = ddm.generate(
        num_images=2, diffusion_steps=20, initial_noise=initial_noise
    ).numpy()
    display(generated_images, n=11)

In [None]:
endTime=time.time()
print('How many minutes:',(endTime-startTime)/60) 