# Auto-Encoder

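Machine learning has two basic paradigms: supervised learning and unsupervised learning.

The main difference between them is whether the model needs **manually annotated** **labels** during training.

Self-supervised learning: the algorithm uses the data **$x$ itself** as the supervision signal. A pretext task mines supervision from large amounts of unlabeled data, and the network is trained on this constructed signal so that it learns representations that are useful for downstream tasks.


[Self-supervised learning](https://blog.csdn.net/sdu_hao/article/details/104515917)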

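## How Auto-Encoders Work

In supervised learning, a neural network can be viewed as performing feature dimensionality reduction: $\text{high-dimensional input features } x \rightarrow \text{low-dimensional variable } o$.

Self-supervised learning uses the data $x$ itself as the supervision signal to guide training; that is, we want the network to learn the mapping $f_{\theta}: x \rightarrow \bar x$. The network is split into two parts:

- $g_{\theta_1}: x \rightarrow z$, the encoder network: encodes the input $x$ into a low-dimensional latent variable $z$.
- $h_{\theta_2}: z \rightarrow \bar x$, the decoder network: decodes the encoded input $z$ into the high-dimensional $\bar x$.

The whole model $f_{\theta}$ is called an auto-encoder.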
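
The composition $f_{\theta} = h_{\theta_2} \circ g_{\theta_1}$ can be sketched in a few lines of NumPy. This is only an illustration under assumed sizes (784 inputs, 20 latent units) with random, untrained weights; the names `encode`, `decode`, and `autoencode` are hypothetical and are not part of the notebook's Keras model.

```python
import numpy as np

rng = np.random.default_rng(0)
input_dim, latent_dim = 784, 20  # illustrative sizes, assumed for this sketch

# Random (untrained) weights for a single-layer encoder and decoder
W_enc = rng.normal(0.0, 0.01, size=(input_dim, latent_dim))
W_dec = rng.normal(0.0, 0.01, size=(latent_dim, input_dim))

def encode(x):
    """g: x -> z, maps the input to a low-dimensional latent code."""
    return np.tanh(x @ W_enc)

def decode(z):
    """h: z -> x_bar, maps the latent code back to input space."""
    return z @ W_dec

def autoencode(x):
    """f = h(g(x)), the full auto-encoder."""
    return decode(encode(x))

x = rng.random((4, input_dim))  # a batch of 4 fake flattened images
z = encode(x)
x_bar = autoencode(x)
print(z.shape, x_bar.shape)
```

The latent code has far fewer dimensions than the input, while the reconstruction has the same shape as the input.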

![Auto-encoder model](自编码器模型.png)

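We want the decoder output to reconstruct the original input perfectly, or at least approximately, i.e. $\bar x \approx x$. The optimization objective can be written as:
$$
\min_{\theta} L = \mathrm{dist}(x, \bar x) \\
\bar x = h_{\theta_2}(g_{\theta_1}(x))
$$


Here $\mathrm{dist}$ is a distance metric, commonly the Euclidean distance.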
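
As a small worked example of the Euclidean choice, the sketch below computes the squared Euclidean distance per sample and averages it over the batch. The function name `reconstruction_loss` and the toy arrays are assumptions made for illustration.

```python
import numpy as np

def reconstruction_loss(x, x_bar):
    """Mean over the batch of the squared Euclidean distance ||x - x_bar||^2."""
    x = np.asarray(x, dtype=np.float64)
    x_bar = np.asarray(x_bar, dtype=np.float64)
    # Sum squared differences over the feature axis, then average over samples
    return float(np.mean(np.sum((x - x_bar) ** 2, axis=-1)))

x = np.array([[0.0, 1.0], [1.0, 0.0]])
x_bar = np.array([[0.0, 0.5], [1.0, 0.0]])  # only one entry differs, by 0.5
loss = reconstruction_loss(x, x_bar)
print(loss)
```

A perfect reconstruction gives a loss of zero, and the loss grows as $\bar x$ moves away from $x$.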

In [None]:
import os.path
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
a = np.diag([1, 2, 3, 4, 5, 6])
np.linalg.det(a)

In [None]:
import time
from PIL import Image

In [None]:
from tensorflow.keras import layers, Model, Sequential, optimizers, losses

In [None]:
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in gpus:
        print(gpu)
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
    print(e)

In [None]:
np.linalg.det(np.eye(2))

In [None]:
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()

In [None]:
X_train = X_train.astype(np.float32) / 255.
X_test = X_test.astype(np.float32) / 255.

In [None]:
X_train.shape

In [None]:
X_test.shape

In [None]:
# Only the raw image data is needed; the labels y are not used
train_db = tf.data.Dataset.from_tensor_slices((X_train))
test_db = tf.data.Dataset.from_tensor_slices((X_test))

In [None]:
for x in train_db.take(1):
    print(x.shape)
    plt.imshow(x)

In [None]:
BATCH_SIZE = 512
train_db = train_db.shuffle(10000).batch(BATCH_SIZE)
test_db = test_db.batch(BATCH_SIZE)

In [None]:
class AutoEncoder(Model):
    def __init__(self, hid_dim):
        super().__init__()
        # 784 -> 20
        self.encoder = Sequential([
            layers.Dense(256, activation='relu'),
            layers.Dense(128, activation='relu'),
            layers.Dense(hid_dim)
        ])
        self.decoder = Sequential([
            layers.Dense(128, activation='relu'),
            layers.Dense(256, activation='relu'),
            layers.Dense(784)
        ])
    def call(self, inputs, training=None):
        hidden = self.encoder(inputs)
        x = self.decoder(hidden)
        return x

In [None]:
model = AutoEncoder(20)

In [None]:
model.build(input_shape=(None, 784))

In [None]:
model.summary()

In [None]:
tf.nn.sigmoid_cross_entropy_with_logits?

In [None]:
a = tf.random.normal([4, 10])
b = tf.random.normal([4, 10])
c = tf.nn.sigmoid_cross_entropy_with_logits(labels=a, logits=b)
c

In [None]:
tf.reduce_sum(c, axis=1)

In [None]:
tf.keras.losses.categorical_crossentropy(a, b, from_logits=True)

In [None]:
opt = optimizers.Adam(learning_rate=0.001)

For logits `x` and labels `z`, the element-wise sigmoid cross-entropy is: `z * -log(sigmoid(x)) + (1 - z) * -log(1 - sigmoid(x))`
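
The expression above can be checked numerically. The sketch below evaluates it directly in NumPy and compares it with the algebraically equivalent form `max(x, 0) - x * z + log(1 + exp(-|x|))`, which avoids overflow for large logits. The function names and sample values are assumptions for illustration, not TensorFlow API calls.

```python
import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def naive_bce_with_logits(z, x):
    """Direct evaluation of z * -log(sigmoid(x)) + (1 - z) * -log(1 - sigmoid(x))."""
    return z * -np.log(sigmoid(x)) + (1 - z) * -np.log(1 - sigmoid(x))

def stable_bce_with_logits(z, x):
    """Equivalent form that stays finite for large |x|."""
    return np.maximum(x, 0) - x * z + np.log1p(np.exp(-np.abs(x)))

logits = np.array([-3.0, -0.5, 0.0, 2.0])
labels = np.array([0.0, 1.0, 0.3, 1.0])  # labels may be soft, e.g. pixel intensities

naive = naive_bce_with_logits(labels, logits)
stable = stable_bce_with_logits(labels, logits)
print(naive)
print(stable)
```

Because the labels only need to lie in $[0, 1]$, normalized pixel intensities can serve directly as targets.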

In [None]:
@tf.function
def train_step(inp):
    with tf.GradientTape() as tape:
        x_rec_logits = model(inp)
        # Alternatively, an MSE loss can be used directly
        loss = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=inp, logits=x_rec_logits)
        loss = tf.reduce_mean(loss)
    grads = tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(zip(grads, model.trainable_variables))
    return loss

In [None]:
Image.fromarray?

In [None]:
def save_image(images, name):
    # 'L': 8-bit pixels, black and white
    new_im = Image.new('L', (280, 280))
    index = 0
    # 10 rows x 10 columns = 100 images (50 + 50)
    for i in range(0, 280, 28):
        for j in range(0, 280, 28):
            im = images[index]
            im = Image.fromarray(im, mode='L')
            # Paste the tile at its position; tiles fill the grid column by column
            new_im.paste(im, (i, j))  # (x, y): each outer iteration fills one column
            index += 1
    new_im.save(name)

In [None]:
save_path = './check_point/ae.ckpt'
# A TF-format checkpoint writes its index file as '<prefix>.index'
if os.path.exists(save_path + '.index'):
    model.load_weights(save_path)

In [None]:
EPOCHS = 100
os.makedirs('ae_images', exist_ok=True)  # save_image does not create the directory
start = time.time()
for epoch in range(EPOCHS):
    for step, x in enumerate(train_db):
        x = tf.reshape(x, [-1, 784])
        loss = train_step(x)
        if step % 100 == 0:
            print(f"Epoch {epoch}, Batch {step}, Loss {float(loss)}")

    # Reconstruct one test batch after every epoch
    x = next(iter(test_db))
    logits = model(tf.reshape(x, [-1, 784]))
    x_hat = tf.sigmoid(logits)  # sigmoid maps the logits to pixel values in [0, 1]
    x_hat = tf.reshape(x_hat, [-1, 28, 28])  # restore the original image shape

    # Original images + reconstructions, for comparison
    x_concat = tf.concat([x[:50], x_hat[:50]], axis=0)
    x_concat = x_concat.numpy() * 255  # rescale pixel values to 0-255
    x_concat = x_concat.astype(np.uint8)

    save_image(x_concat, f"ae_images/rec_epoch_{epoch}.png")
model.save_weights(save_path)
print('Time taken for {} epochs {} sec\n'.format(EPOCHS, time.time() - start))

In [None]:
img = Image.open('ae_images/rec_epoch_99.png')

In [None]:
img

In [None]:
Image.new?

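## Auto-Encoder Variants
### Denoising Auto-Encoder
To keep the network from simply memorizing low-level features of the input, random noise is added to the input data:
$$
\tilde x = x + \epsilon, \quad \epsilon \sim \mathcal N(0, var)
$$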
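
A minimal sketch of the corruption step, assuming pixel values in $[0, 1]$: Gaussian noise is added to the input, and the network would then be trained to reconstruct the clean $x$ from $\tilde x$. The helper name `add_gaussian_noise`, the noise level `std=0.1`, and the clipping back to $[0, 1]$ are assumptions for illustration.

```python
import numpy as np

def add_gaussian_noise(x, std=0.1, seed=0):
    """Return x + eps with eps ~ N(0, std^2), clipped to the valid pixel range."""
    rng = np.random.default_rng(seed)
    noisy = x + rng.normal(0.0, std, size=x.shape)
    # Clipping is an extra assumption here: it keeps values valid as BCE targets
    return np.clip(noisy, 0.0, 1.0)

x = np.full((4, 784), 0.5)      # a fake batch of flattened images
x_noisy = add_gaussian_noise(x)  # corrupted network input
# Training would minimize dist(x, model(x_noisy)): noisy input, clean target.
print(x_noisy.min(), x_noisy.max())
```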

### Dropout Auto-Encoder
Dropout layers are inserted between network layers to randomly drop connections, which helps prevent overfitting.
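
What a Dropout layer does to a hidden activation can be sketched with "inverted dropout" in NumPy: each unit is zeroed with probability `rate` during training and the survivors are rescaled so the expected activation is unchanged. The function below is a hypothetical illustration, not the Keras implementation.

```python
import numpy as np

def dropout(h, rate, rng, training=True):
    """Inverted dropout: zero units with probability `rate`, rescale the rest."""
    if not training or rate == 0.0:
        return h  # dropout is a no-op at inference time
    keep_mask = rng.random(h.shape) >= rate
    return h * keep_mask / (1.0 - rate)

rng = np.random.default_rng(0)
h = np.ones((1000, 8))                     # fake hidden activations
h_train = dropout(h, rate=0.5, rng=rng)     # roughly half the units are zeroed
h_eval = dropout(h, rate=0.5, rng=rng, training=False)
print(h_train.mean(), h_eval.mean())
```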

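### Adversarial Auto-Encoder
An adversarial auto-encoder uses an additional discriminator network to judge whether the reduced latent variable $z$ was sampled from the prior distribution $P(z)$, which makes it convenient to use $P(z)$ to reconstruct the input.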

### Variational Auto-Encoder

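Video tutorial: https://www.bilibili.com/video/BV15E411w7Pz

The basic auto-encoder is a discriminative model, not a generative model.

Given a distribution $P(z)$ over the latent variable, a variational auto-encoder (VAE) learns the conditional distribution $P(x|z)$ and generates different samples by sampling from the joint distribution $P(x, z) = P(x|z)P(z)$.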

![VAE](VAE.png)

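Compared with the auto-encoder, the VAE places an explicit constraint on the distribution of the latent variable $z$: it should match a preset prior $P(z)$. The loss function therefore contains, in addition to the original reconstruction-error term, a constraint term on the distribution of $z$.

Objective to maximize:  
$$L(\phi, \theta) = -D_{KL}(q_{\phi}(z|x)||p(z)) + E_{z \sim q}[\log p_{\theta}(x|z)]$$

The encoder network parameterizes $q_{\phi}(z|x)$, and the decoder network parameterizes $p_{\theta}(x|z)$.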

In particular, when $q_{\phi}(z|x) = \mathcal N(\mu_1, \sigma_1^2)$ and $p(z) = \mathcal N(\mu_2, \sigma_2^2)$ are both **normal distributions**, the divergence in the first term simplifies to:
$$
D_{KL}(q_{\phi}(z|x)||p(z)) = \log\frac {\sigma_2}{\sigma_1} + \frac {\sigma_1^2 + (\mu_1-\mu_2)^2}{2\sigma_2^2} - \frac 1 2
$$

More specifically, when $p(z) = \mathcal N(0, 1)$, i.e. $\mu_2=0, \sigma_2=1$:
$$
D_{KL}(q_{\phi}(z|x)||p(z)) = -\log\sigma_1 + \frac {\sigma_1^2 + \mu_1^2}{2} - \frac 1 2
$$
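
The closed form for a standard normal prior can be sanity-checked against a direct numerical evaluation of $\int q(z)\,\log\frac{q(z)}{p(z)}\,dz$. The sketch below uses a plain Riemann sum on a fine grid; the function names and the test values $\mu_1 = 0.7$, $\sigma_1 = 0.5$ are arbitrary assumptions.

```python
import numpy as np

def kl_to_standard_normal(mu, sigma):
    """Closed form of KL(N(mu, sigma^2) || N(0, 1))."""
    return -np.log(sigma) + (sigma ** 2 + mu ** 2) / 2.0 - 0.5

def kl_numeric(mu, sigma, num_points=200001):
    """Riemann-sum approximation of the KL integral on a wide grid."""
    z = np.linspace(mu - 12 * sigma, mu + 12 * sigma, num_points)
    dz = z[1] - z[0]
    log_q = -0.5 * np.log(2 * np.pi * sigma ** 2) - (z - mu) ** 2 / (2 * sigma ** 2)
    log_p = -0.5 * np.log(2 * np.pi) - z ** 2 / 2
    return float(np.sum(np.exp(log_q) * (log_q - log_p)) * dz)

closed = kl_to_standard_normal(0.7, 0.5)
numeric = kl_numeric(0.7, 0.5)
print(closed, numeric)
```

The divergence is zero exactly when $q$ equals the prior, i.e. $\mu_1 = 0$ and $\sigma_1 = 1$.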


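Multivariate case ($D$ dimensions):
$$D_{KL}(q_{\phi}(z|x)||p(z)) = \frac 1 2 \left(-\log|\Sigma_1| + tr(\Sigma_1) + \mu_1^T\mu_1 - D\right)$$
When $\Sigma_1$ is a diagonal matrix (diagonal covariance structure), this equals the sum of the one-dimensional result above over all $D$ dimensions.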
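
The diagonal case can be verified numerically: the general formula evaluated on a diagonal $\Sigma_1$ should match the per-dimension sum written in terms of the log-variance, which is the form a VAE encoder typically outputs. The function names and test values below are assumptions for illustration.

```python
import numpy as np

def kl_multivariate(mu, cov):
    """0.5 * (-log|cov| + tr(cov) + mu^T mu - D) for a prior N(0, I)."""
    d = mu.shape[0]
    _, logdet = np.linalg.slogdet(cov)  # log-determinant, numerically safer than det
    return 0.5 * (-logdet + np.trace(cov) + mu @ mu - d)

def kl_diagonal(mu, log_var):
    """Sum over dimensions of the 1-D result, written with log_var = log(sigma^2)."""
    return 0.5 * np.sum(np.exp(log_var) + mu ** 2 - 1.0 - log_var)

mu = np.array([0.5, -1.0, 2.0])
log_var = np.array([0.0, -1.0, 0.5])
cov = np.diag(np.exp(log_var))  # diagonal covariance matrix

kl_full = kl_multivariate(mu, cov)
kl_diag = kl_diagonal(mu, log_var)
print(kl_full, kl_diag)
```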

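For ease of computation, the second term $E_{z \sim q}[\log p_{\theta}(x|z)]$ can likewise be implemented with the reconstruction-error function used in the auto-encoder.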

The VAE optimization objective then becomes:
$$
\min D_{KL}(q_{\phi}(z|x)||p(z)) \\
\max E_{z \sim q}[\log p_{\theta}(x|z)]
$$

Detailed derivation: [KL divergence derivation](https://hsinjhao.github.io/2019/05/22/KL-DivergenceIntroduction/)

[Kullback-Leibler divergence (KL divergence) between two multivariate Gaussian distributions](https://hsinjhao.github.io/2019/05/22/KL-DivergenceIntroduction/)

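#### Reparameterization Trick


![](重参数.png)

Sampling $z \sim \mathcal N(\mu, \sigma^2)$ directly is not differentiable with respect to $\mu$ and $\sigma$. The reparameterization trick instead samples $\epsilon \sim \mathcal N(0, I)$ and computes $z = \mu + \sigma \odot \epsilon$, so that gradients can flow back to the encoder outputs.

Related paper: https://arxiv.org/pdf/1312.6114v10.pdf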
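
A minimal NumPy sketch of the sampling step, assuming the encoder outputs a mean and a log-variance per latent dimension: the noise is drawn from a standard normal and then shifted and scaled, so the randomness is independent of the parameters. The function name and the example values are assumptions for illustration.

```python
import numpy as np

def reparameterize(mu, log_var, rng):
    """z = mu + sigma * eps with eps ~ N(0, I) and sigma = exp(0.5 * log_var)."""
    eps = rng.standard_normal(size=mu.shape)
    std = np.exp(0.5 * log_var)
    return mu + std * eps

rng = np.random.default_rng(0)
n = 200_000
# Two latent dimensions with standard deviations 0.5 and 2.0
mu = np.tile([1.0, -2.0], (n, 1))
log_var = np.tile(np.log([0.25, 4.0]), (n, 1))

z = reparameterize(mu, log_var, rng)
print(z.mean(axis=0), z.std(axis=0))
```

The empirical mean and standard deviation of the samples should land close to the requested $\mu$ and $\sigma$.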

In [None]:
class VAE(Model):
    def __init__(self, h_dim):
        super().__init__()
        # Encoder layers: 784 -> 128 -> (mu, log_var), each of size h_dim
        self.fc1 = layers.Dense(128)
        self.fc2 = layers.Dense(h_dim)
        self.fc3 = layers.Dense(h_dim)

        # Decoder layers: h_dim -> 128 -> 784
        self.fc4 = layers.Dense(128)
        self.fc5 = layers.Dense(784)

    def encoder(self, x):
        # Encoder
        h = tf.nn.relu(self.fc1(x))
        # Mean
        mu = self.fc2(h)
        # Log-variance, log(sigma ** 2)
        log_var = self.fc3(h)

        return mu, log_var

    def decoder(self, z):
        # Decoder: returns logits, not pixel values
        out = tf.nn.relu(self.fc4(z))
        out = self.fc5(out)
        return out

    def reparameterize(self, mu, log_var):
        # Sample eps from the standard normal distribution
        eps = tf.random.normal(log_var.shape)
        # exp(0.5 * log_var) is the standard deviation sigma
        std = tf.exp(log_var * 0.5)
        z = mu + std * eps
        return z

    def call(self, inputs, training=None):
        mu, log_var = self.encoder(inputs)

        z = self.reparameterize(mu, log_var)

        out = self.decoder(z)

        return out, mu, log_var

In [None]:
h_dim = 10
vae_model = VAE(h_dim)

In [None]:
vae_model.build(input_shape=(4, 784))  # tf.random.normal(log_var.shape) needs a fully defined shape
vae_model.summary()

In [None]:
opt = optimizers.Adam(learning_rate=0.001)

In [None]:
@tf.function
def train_vae_step(model, inp):
    with tf.GradientTape() as tape:
        # mu and sigma are different for every sample
        x_rec_logits, mu, log_var = model(inp)
        # Reconstruction loss, [b, 784]
        rec_loss = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=inp, logits=x_rec_logits)
        rec_loss = tf.reduce_sum(rec_loss) / inp.shape[0]
        # Add the constraint on the latent variable z, (b, h_dim)
        # log_var = log(sigma ** 2) = 2 * log(sigma)
        kl = 0.5 * (tf.exp(log_var) + mu ** 2 - 1 - log_var)
        kl = tf.reduce_sum(kl) / inp.shape[0]
        loss = rec_loss + kl * 1.0
    grads = tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(zip(grads, model.trainable_variables))
    return rec_loss, kl

In [None]:
EPOCHS = 100
os.makedirs('vae_images', exist_ok=True)  # save_image does not create the directory
for epoch in range(EPOCHS):
    for step, x in enumerate(train_db):
        x = tf.reshape(x, [-1, 784])
        rec_loss, kl = train_vae_step(vae_model, x)

        if step % 100 == 0:
            print(f"Epoch {epoch}, Batch {step}, rec loss {float(rec_loss)}, kl {float(kl)}")
    # Every 10 epochs: generate new images and reconstruct test images
    if (epoch + 1) % 10 == 0:
        # Generation: decode latent codes sampled from the prior N(0, I)
        z = tf.random.normal([100, h_dim])
        logits = vae_model.decoder(z)
        x = tf.sigmoid(logits)  # pixel values in [0, 1]
        img = tf.reshape(x, [-1, 28, 28]).numpy() * 255  # rescale to 0-255
        img = img.astype(np.uint8)
        save_image(img, f'vae_images/gen_image_{epoch}.png')

        # Reconstruction: pass the first 100 test images through the VAE
        inp = next(iter(test_db))[:100]
        inp = tf.reshape(inp, [-1, 784])
        out, _, _ = vae_model(inp)
        out = tf.sigmoid(out)  # pixel values in [0, 1]
        img = tf.reshape(out, [-1, 28, 28]).numpy() * 255  # rescale to 0-255
        img = img.astype(np.uint8)
        save_image(img, f'vae_images/test_image_{epoch}.png')