# ⚡️ Energy-Based Models

In this notebook, we'll walk through the steps required to train your own Energy Based Model to predict the distribution of a demo dataset

The code is adapted from the excellent ['Deep Energy-Based Generative Models' tutorial](https://uvadlc-notebooks.readthedocs.io/en/latest/tutorial_notebooks/tutorial8/Deep_Energy_Models.html) created by Phillip Lippe.

In [1]:
%load_ext autoreload
%autoreload 2
import time
startTime=time.time()
         
import numpy as np

import tensorflow as tf
from tensorflow.keras import (
    datasets,
    layers,
    models,
    optimizers,
    activations,
    metrics,
    callbacks,
    utils,
)

from notebooks.utils import display, sample_batch
import random


KeyboardInterrupt



## 0. Parameters <a name="parameters"></a>

In [None]:
#调整参数
STEP_SIZE = 10 #def generate_samples()梯度下降算法更新图像的学习率（步长）。原值10
STEPS = 360 #训练时def generate_samples()梯度下降算法更新图像的循环次数【最终生成结果时是1000】.原值60。影响训练速度的主要参数。
ALPHA = 0.2 #正则损失系数。原值0.1.
NOISE = 0.005 #图像上增加噪声（设定正态分布标准差）。原值0.005
LEARNING_RATE = 0.0001 #原值0.0001

GRADIENT_CLIP = 0.03 #def generate_samples()梯度下降算法更新图像的梯度剪裁

EPOCHS = 50 #原值60。
LOAD_MODEL = False

BATCH_SIZE = 128 #查看显存占用比例“nvidia-smi”。
BUFFER_SIZE = 8192 #class Buffer()以前迭代的样本缓冲区，保留最大样本数目.原值8192


#固定参数
train_dir=".\\png 192x48_GRAY\\" #文件名字符数目不能过多，否则出错，故在旋转、缩放、平移变换时减少文件名字符。
IMAGE_SIZE =  (48, 192) #再小清晰度就不满足了
CHANNELS = 1


In [None]:
# Load the data
train_data = utils.image_dataset_from_directory(
    train_dir, 
    labels=None,
    color_mode="grayscale", 
    image_size=IMAGE_SIZE, 
    batch_size=BATCH_SIZE,
    shuffle=True,
    seed=41,
    interpolation="bilinear", #双线性插值
)

In [None]:
train_sample = sample_batch(train_data)
print(train_sample.shape) 
np.savetxt('train_sample[0]-1.txt', train_sample[0][:,:,0])  #像素值0~255
display(train_sample, n=4)

In [None]:
# Preprocess the data
def preprocess(imgs):
    imgs = (tf.cast(imgs, "float32") - 127.5) / 127.5 ##tf.cast()函数：数据类型转换.放缩到 [-1, 1]
    return imgs

x_train = train_data.map(lambda x: preprocess(x))

In [None]:
# Show some items of clothing from the training set
train_sample = sample_batch(x_train)
print(train_sample.shape) 
np.savetxt('train_sample[0]-2.txt', train_sample[0][:,:,0])  #
display(train_sample, n=4)

## 2. Build the EBM network <a name="train"></a>

In [None]:
#参考我过去的“VAE、WGAN论文”对神经元数目修改、增加BatchNormalization，层数不作调整，达到VAE解码器参数量。
ebm_input = layers.Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS))
x = layers.Conv2D(32, kernel_size=5, strides=2, padding="same")(ebm_input)
x = layers.BatchNormalization()(x) #自己增加
x = layers.Activation(activations.swish)(x)
#x = layers.Dropout(0.3)(x) #自己增加，必须self.model(inp_imgs, training=True)
x = layers.Conv2D(64, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.Activation(activations.swish)(x)
#x = layers.Dropout(0.3)(x)
x = layers.Conv2D(128, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.Activation(activations.swish)(x)
#x = layers.Dropout(0.3)(x)
x = layers.Conv2D(128, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.Activation(activations.swish)(x)
#x = layers.Dropout(0.3)(x)
x = layers.Flatten()(x)
x = layers.Dense(128, activation=activations.swish)(x) #取消该层效果差
ebm_output = layers.Dense(1)(x)
model = models.Model(ebm_input, ebm_output)
model.summary()

In [None]:
if LOAD_MODEL:
    model.load_weights("./models/model.h5")

## 2. Set up a Langevin sampler function <a name="sampler"></a>

In [None]:
# Function to generate samples using Langevin Dynamics
def generate_samples(model, inp_imgs, steps, step_size, noise, return_img_per_step=False):
    imgs_per_step = []
    for _ in range(steps):
        inp_imgs += tf.random.normal(inp_imgs.shape, mean=0, stddev=noise) #图像上增加噪声。
        inp_imgs = tf.clip_by_value(inp_imgs, -1.0, 1.0) #剪裁
        with tf.GradientTape() as tape:
            tape.watch(inp_imgs)
            out_score = model(inp_imgs)
        grads = tape.gradient(out_score, inp_imgs)
        grads = tf.clip_by_value(grads, -GRADIENT_CLIP, GRADIENT_CLIP) #剪裁
        inp_imgs += step_size * grads #梯度“上升”算法更新图像
        inp_imgs = tf.clip_by_value(inp_imgs, -1.0, 1.0)
        if return_img_per_step:
            imgs_per_step.append(inp_imgs) 
    if return_img_per_step:
        return tf.stack(imgs_per_step, axis=0) #tf.stack()是拼接函数
    else:
        return inp_imgs

## 3. Set up a buffer to store examples <a name="buffer"></a>

In [None]:
class Buffer: #P197我们还维护了以前迭代的样本缓冲区，这样我们就可以将其作为下一批的起点，而不是纯随机噪声。
    def __init__(self, model):
        super().__init__()
        self.model = model
        self.examples = [ #注意，最终是一个列表，长度=BATCH_SIZE
            tf.random.uniform(shape=(1, IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS)) * 2 - 1 #tf.random.uniform用于从均匀分布中输出随机值，默认0~1之间。
            for _ in range(BATCH_SIZE) #见“python列表推导式、列表解析式.pdf”
        ]

    def sample_new_exmps(self, steps, step_size, noise):
        #5%的观察是从零开始(也即随机噪声)
        #np.random.binomial(128, 0.5)结果61，np.random.binomial(128, 1)结果128，np.random.binomial(128, 0)结果0，np.random.binomial(128, 0.05)结果6
        n_new = np.random.binomial(BATCH_SIZE, 0.05) #二项分布。numpy.random.binomial(n,p,size=None)  一次试验的样本数n。事件发生的概率p，范围[0,1]。返回每次试验事件发生的次数，次数大于等于0且小于等于参数n。
        rand_imgs = (tf.random.uniform((n_new, IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS)) * 2 - 1) #rand_imgs= tf.Tensor(..., shape=(n_new, 32, 32, 1), dtype=float32)
        #其余的直接从已有的缓存中随机抽取
        old_imgs = tf.concat(
            random.choices(self.examples, k=BATCH_SIZE - n_new), axis=0 #random.choices()从一个列表中随机选择多个元素，每个元素可能出现多次。参数是k，表示要随机抽取的元素数量。如果不指定k值，则默认为1。
        ) #old_imgs= tf.Tensor(..., shape=(k, 32, 32, 1), dtype=float32)
        # 新老样本连接起来并通过Langevin取样器
        inp_imgs = tf.concat([rand_imgs, old_imgs], axis=0) #inp_imgs= tf.Tensor(..., shape=(BATCH_SIZE, 32, 32, 1), dtype=float32)
        inp_imgs = generate_samples(self.model, inp_imgs, steps=steps, step_size=step_size, noise=noise) #inp_imgs= tf.Tensor(..., shape=(BATCH_SIZE, 32, 32, 1), dtype=float32)
        # 结果样本加入buffer，修剪到最长为8192个观察
        self.examples = tf.split(inp_imgs, BATCH_SIZE, axis=0) + self.examples #tf.split（）分割张量返回列表。这里两个列表相加=拼接（无任何数学运算，与张量加法运算不同的）。最终是一个列表，长度=2*BATCH_SIZE。注意x+y不等于y+x，这里老的放到后面的
        self.examples = self.examples[:BUFFER_SIZE] #[:BUFFER_SIZE]列表切片，从0~8192
        return inp_imgs #返回值是<tf.Tensor: shape=(BATCH_SIZE, 32, 32, 1), dtype=float32, numpy=.....>

In [None]:
class EBM(models.Model):
    def __init__(self):
        super(EBM, self).__init__()
        self.model = model
        self.buffer = Buffer(self.model)
        self.alpha = ALPHA
        self.loss_metric = metrics.Mean(name="loss")
        self.reg_loss_metric = metrics.Mean(name="reg")
        self.cdiv_loss_metric = metrics.Mean(name="cdiv")
        self.real_out_metric = metrics.Mean(name="real")
        self.fake_out_metric = metrics.Mean(name="fake")

    @property
    def metrics(self):
        return [
            self.loss_metric,
            self.reg_loss_metric,
            self.cdiv_loss_metric,
            self.real_out_metric,
            self.fake_out_metric,
        ]

    def train_step(self, real_imgs):
        real_imgs += tf.random.normal(shape=tf.shape(real_imgs), mean=0, stddev=NOISE) #tf.random.normal正态分布。真实图像添加噪声
        real_imgs = tf.clip_by_value(real_imgs, -1.0, 1.0) #剪裁
        fake_imgs = self.buffer.sample_new_exmps(steps=STEPS, step_size=STEP_SIZE, noise=NOISE) #从buffer中采样假图
        inp_imgs = tf.concat([real_imgs, fake_imgs], axis=0)
        with tf.GradientTape() as training_tape:
            #real_out, fake_out = tf.split(self.model(inp_imgs, training=True), 2, axis=0) #真实图像和虚假图像通过模型生成分数.这里添加“training=True”调用dropout
            real_out, fake_out = tf.split(self.model(inp_imgs), 2, axis=0)
            cdiv_loss = tf.reduce_mean(fake_out, axis=0) - tf.reduce_mean(real_out, axis=0) #损失=真实图像和虚假图像的分数之差。
            reg_loss = self.alpha * tf.reduce_mean(real_out**2 + fake_out**2, axis=0) #加上正则损失。b = tf.constant([3.0, 4.0]) ;print(b**2)#=print(tf.square(b))#tf.Tensor([ 9. 16.], shape=(2,), dtype=float32)
            loss = cdiv_loss + reg_loss
        grads = training_tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        self.loss_metric.update_state(loss)
        self.reg_loss_metric.update_state(reg_loss)
        self.cdiv_loss_metric.update_state(cdiv_loss)
        self.real_out_metric.update_state(tf.reduce_mean(real_out, axis=0))
        self.fake_out_metric.update_state(tf.reduce_mean(fake_out, axis=0))
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, real_imgs): #评估循环：一个简单的for 循环，重复调用test_step() 函数。test_step() 函数只是train_step() 逻辑的子集。它省略了处理更新模型权重的代码，即所有涉及GradientTape 和优化器的代码。见《python深度学习第二版》P170
        batch_size = real_imgs.shape[0]
        fake_imgs = (
            tf.random.uniform((batch_size, IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS))
            * 2
            - 1
        )
        inp_imgs = tf.concat([real_imgs, fake_imgs], axis=0)
        real_out, fake_out = tf.split(self.model(inp_imgs), 2, axis=0)
        cdiv = tf.reduce_mean(fake_out, axis=0) - tf.reduce_mean(
            real_out, axis=0
        )
        self.cdiv_loss_metric.update_state(cdiv)
        self.real_out_metric.update_state(tf.reduce_mean(real_out, axis=0))
        self.fake_out_metric.update_state(tf.reduce_mean(fake_out, axis=0))
        return {m.name: m.result() for m in self.metrics[2:]}

In [None]:
ebm = EBM()

## 3. Train the EBM network <a name="train"></a>

In [None]:
# Compile and train the model
ebm.compile(
    optimizer=optimizers.Adam(learning_rate=LEARNING_RATE,
                              #clipnorm=1 #自增clipnorm=1。所有参数梯度将被裁剪，让其 l2 范数最大为 1：g * 1 / max(1, l2_norm)
                             ), run_eagerly=True 
)

In [None]:
tensorboard_callback = callbacks.TensorBoard(log_dir="./logs")


class ImageGenerator(callbacks.Callback):
    def __init__(self, num_img):
        self.num_img = num_img

    def on_epoch_end(self, epoch, logs=None):
        start_imgs = (
            np.random.uniform(
                size=(self.num_img, IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS)
            )
            * 2
            - 1
        )
        generated_images = generate_samples(
            ebm.model,
            start_imgs,
            steps=int(1000*10/STEP_SIZE), #原值1000
            step_size=STEP_SIZE,
            noise=NOISE,
            return_img_per_step=False,
        )
        generated_images = generated_images.numpy()
        display(
            generated_images,
            n=self.num_img,  #自增
            size=(80, 2),   #自增，这种比例清晰度高   
            save_to="./output/generated_img_%03d.png" % (epoch),
        )

        example_images = tf.concat(
            random.choices(ebm.buffer.examples, k=10), axis=0
        )
        example_images = example_images.numpy()
        display(
            example_images, 
            n=self.num_img,  #自增
            size=(80, 2),   #自增，这种比例清晰度高   
            save_to="./output/example_img_%03d.png" % (epoch)
        )


image_generator_callback = ImageGenerator(num_img=10)

In [None]:
class SaveModel(callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        model.save_weights("./models/model_%03d.h5" % (epoch)) #每轮均保存


save_model_callback = SaveModel()

In [None]:
history=ebm.fit(
    x_train,
    shuffle=True,
    epochs=EPOCHS, 
    #validation_data=x_test,
    callbacks=[
        save_model_callback,
        tensorboard_callback,
        image_generator_callback,
    ],
)

with open('history.history.txt', 'w') as file_object:file_object.write(str(history.history)+'\n')  #损失写入文件

In [None]:
import matplotlib.pyplot as plt
history_dict = history.history
loss = history_dict['loss']
reg = history_dict['reg']
cdiv = history_dict['cdiv']
real = history_dict['real']
fake = history_dict['fake']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'b', label='loss')
plt.plot(epochs, reg, 'g', label='reg')
plt.plot(epochs, cdiv, 'r', label='cdiv')
plt.plot(epochs, real, 'c', label='real')
plt.plot(epochs, fake, 'y', label='fake')
plt.title('Training loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

## 4. Generate images <a name="generate"></a>

In [None]:
start_imgs = (
    np.random.uniform(size=(10, IMAGE_SIZE[0], IMAGE_SIZE[1], CHANNELS)) * 2 - 1
)

In [None]:
display(start_imgs)

In [None]:
gen_img = generate_samples(
    ebm.model,
    start_imgs,
    steps=int(1000*10/STEP_SIZE), #原值1000
    step_size=STEP_SIZE,
    noise=NOISE,
    return_img_per_step=True,
)

In [None]:
display(gen_img[-1].numpy())

In [None]:
imgs = []
for i in [0, 1, 3, 5, 10, 30, 50, 100, 300, 999]:
    imgs.append(gen_img[i].numpy()[6])

display(np.array(imgs))

In [None]:
endTime=time.time()
print('How many minutes:',(endTime-startTime)/60)