## Auto Encoders

In [9]:
import os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import Sequential, layers
import matplotlib.pyplot as plt
from PIL import Image

In [10]:
# Seed TensorFlow and NumPy with the same fixed value and verify the TensorFlow version.
tf.random.set_seed(22)
np.random.seed(22)
# NOTE(review): this is assigned after `import tensorflow` has already run in the
# import cell; presumably the log level has to be set before that import to
# silence TensorFlow's start-up messages -- confirm and move it if so.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# Fail fast if the installed TensorFlow is not a 2.x release.
assert tf.__version__.startswith('2.')

In [11]:
# Helper that tiles many small images into one image file.
def save_images(imgs, name):
    """Paste up to 100 grayscale tiles onto a 280x280 canvas and save it.

    Tiles are placed on a 10x10 grid in 28-pixel steps. The outer offset is
    used as the horizontal coordinate of the paste position, so the grid is
    filled column by column: the first 10 images form the left-most column.

    Args:
        imgs: Collection of 2-D arrays, iterated along its first axis. Each
            item is expected to be a 28x28 uint8 array (this is what the
            training cell passes). Only the first 100 items are used; if
            fewer are given, the remaining grid slots stay black instead of
            raising IndexError.
        name: Destination handed to ``Image.save``; it must include the
            directory and the file name (the extension selects the format).
    """
    tile = 28
    side = 280
    # Image.new(mode, size, color=0): single-channel canvas, black by default.
    new_im = Image.new('L', (side, side))

    # Same visiting order as a nested loop with the horizontal offset outermost.
    slots = [(x, y) for x in range(0, side, tile) for y in range(0, side, tile)]

    # zip stops at the shorter of the two inputs, so a short batch no longer
    # aborts half-way through before anything has been written to disk.
    for (x, y), arr in zip(slots, imgs):
        im = Image.fromarray(arr, mode='L')    # array -> PIL image
        new_im.paste(im, (x, y))               # upper-left corner of the tile
    new_im.save(name)

In [12]:
# Hyperparameters (tuned by hand). Note: these are module-level globals read by later cells.
# Dimensionality of the latent code that the autoencoder compresses each image to.
h_dim = 20
# Batch size used for the training dataset.
batchsz = 512
# Learning rate passed to the optimizer.
lr = 1e-3

In [13]:
# Load Fashion-MNIST. The labels are unpacked but never used for training;
# the autoencoder only needs the images.
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()

# Cast the pixels to float32 and scale them from [0, 255] to [0, 1].
x_train = x_train.astype(np.float32) / 255.
x_test = x_test.astype(np.float32) / 255.

# Training pipeline: shuffle with a buffer of five batches, then batch.
train_db = (
    tf.data.Dataset.from_tensor_slices(x_train)
    .shuffle(batchsz * 5)
    .batch(batchsz)
)

# Test pipeline: batches of 50 (not batchsz), so that 50 originals plus their
# 50 reconstructions fill the 100 tiles written by save_images.
test_db = tf.data.Dataset.from_tensor_slices(x_test).batch(50)

print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)

(60000, 28, 28) (60000,)
(10000, 28, 28) (10000,)


In [14]:
# Fully-connected autoencoder used to compress and reconstruct the image data.
class AutoEncoders(keras.Model):
    """Dense autoencoder with a bottleneck of width ``h_dim``.

    Intended for flattened 28*28 images. The bottleneck width is read from
    the module-level ``h_dim`` instead of being a constructor argument. The
    final decoder layer has no activation, so the model returns raw logits.
    """

    def __init__(self):
        super().__init__()

        relu = tf.nn.relu

        # Encoder: [b, 28*28] => [b, 256] => [b, 128] => [b, h_dim]
        encoder_layers = [
            layers.Dense(256, activation=relu),
            layers.Dense(128, activation=relu),
            layers.Dense(h_dim),
        ]
        self.encoder = Sequential(encoder_layers)

        # Decoder: [b, h_dim] => [b, 128] => [b, 256] => [b, 28*28]
        decoder_layers = [
            layers.Dense(128, activation=relu),
            layers.Dense(256, activation=relu),
            layers.Dense(28*28),
        ]
        self.decoder = Sequential(decoder_layers)

    def call(self, inputs, training=None):
        """Forward pass: encode ``inputs`` and decode the latent code.

        Args:
            inputs: Batch of flattened images, [b, 28*28].
            training: Accepted for the Keras call signature; not used here.

        Returns:
            Reconstruction logits, [b, 28*28].
        """
        return self.decoder(self.encoder(inputs))
    
# Instantiate the AutoEncoders model.
autoEncoder = AutoEncoders()
# build() needs input_shape to fix the input dimensionality; the first entry is
# None, i.e. the batch dimension is left unspecified.
autoEncoder.build(input_shape=(None, 28*28))
# Print the network summary.
autoEncoder.summary()

Model: "auto_encoders_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
sequential_2 (Sequential)    multiple                  236436    
_________________________________________________________________
sequential_3 (Sequential)    multiple                  237200    
Total params: 473,636
Trainable params: 473,636
Non-trainable params: 0
_________________________________________________________________


In [15]:
# Optimizer. Use the `learning_rate` keyword: `lr` is a deprecated alias that
# newer Keras releases warn about or reject.
optimizer = tf.optimizers.Adam(learning_rate=lr)

# Directory that receives one comparison image per epoch. Create it up front so
# a missing folder does not abort the run only after a full epoch of training.
out_dir = '/home/kukafee/workspace/picture'
os.makedirs(out_dir, exist_ok=True)

# Training
epochs = 100
for epoch in range(epochs):
    for step, x in enumerate(train_db):
        # [b, 28, 28] => [b, 28*28]
        x = tf.reshape(x, [-1, 784])
        with tf.GradientTape() as tape:
            x_rec_logits = autoEncoder(x)
            # The model outputs logits, hence from_logits=True; the loss is then
            # averaged to a scalar.
            rec_loss = tf.losses.binary_crossentropy(x, x_rec_logits, from_logits=True)
            rec_loss = tf.reduce_mean(rec_loss)

        grads = tape.gradient(rec_loss, autoEncoder.trainable_variables)
        optimizer.apply_gradients(zip(grads, autoEncoder.trainable_variables))

        if step % 100 == 0:
            print(epoch, step, float(rec_loss))

    # Evaluation: take one batch from test_db. test_db is not shuffled, so this
    # is the same first batch every epoch, which makes the saved images comparable.
    x = next(iter(test_db))
    logits = autoEncoder(tf.reshape(x, [-1, 28*28]))
    # Squash the logits into (0, 1) so they can be rendered as pixel intensities.
    x_hat = tf.sigmoid(logits)
    # Restore the image layout: [b, 28*28] => [b, 28, 28]
    x_hat = tf.reshape(x_hat, [-1, 28, 28])

    # Stack originals followed by reconstructions for a side-by-side comparison:
    # [b, 28, 28] => [2*b, 28, 28]
    x_concat = tf.concat([x, x_hat], axis=0)
    # Rescale from (0, 1) to (0, 255) and convert to uint8 for saving as an image.
    x_concat = x_concat.numpy() * 255.
    x_concat = x_concat.astype(np.uint8)
    save_images(x_concat, os.path.join(out_dir, 'rec_epoch_%d.png' % epoch))

0 0 0.6926943063735962
0 100 0.32506993412971497
1 0 0.311957448720932
1 100 0.30228668451309204
2 0 0.2933153808116913
2 100 0.2949793040752411
3 0 0.28729984164237976
3 100 0.2936669588088989
4 0 0.28346970677375793
4 100 0.28766804933547974
5 0 0.28027844429016113
5 100 0.28534072637557983
6 0 0.27829572558403015
6 100 0.2834666073322296
7 0 0.2777024507522583
7 100 0.28189289569854736
8 0 0.2750740945339203
8 100 0.28097739815711975
9 0 0.2755848169326782
9 100 0.27975577116012573
10 0 0.27388712763786316
10 100 0.2793492078781128
11 0 0.27210408449172974
11 100 0.27822038531303406
12 0 0.2713480293750763
12 100 0.277967244386673
13 0 0.27047988772392273
13 100 0.2771189212799072
14 0 0.26970478892326355
14 100 0.2759568989276886
15 0 0.2693599760532379
15 100 0.27548521757125854
16 0 0.26865154504776
16 100 0.27515745162963867
17 0 0.26814794540405273
17 100 0.2743704319000244
18 0 0.2678173780441284
18 100 0.27387046813964844
19 0 0.2672489285469055
19 100 0.2740730047225952
20 0