In [2]:
import numpy as np
import sklearn.preprocessing as prep #sklearn.preprocessing是一个对数据进行预处理的常用模块
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

def xavier_init(fan_in, fan_out, constant=1):
    """Return a Xavier/Glorot-style uniform initializer tensor.

    Values are drawn uniformly from ``[-bound, bound)`` where
    ``bound = constant * sqrt(6 / (fan_in + fan_out))``.

    Args:
        fan_in: Number of input units (rows of the resulting tensor).
        fan_out: Number of output units (columns of the resulting tensor).
        constant: Multiplier applied to the bound.

    Returns:
        A float32 ``tf.random_uniform`` tensor of shape ``(fan_in, fan_out)``.
    """
    # The interval is symmetric, so compute the bound once and negate it.
    bound = constant * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform(
        (fan_in, fan_out),
        minval=-bound,
        maxval=bound,
        dtype=tf.float32,
    )

class AdditiveGaussianNoiseAutoencoder(object):
    """Denoising autoencoder with one hidden layer and additive Gaussian noise.

    The graph corrupts the input with ``scale * N(0, 1)`` noise, encodes it as
    ``transfer(noisy_x @ w1 + b1)`` and decodes it linearly as
    ``hidden @ w2 + b2``. The cost is half the summed squared error between the
    reconstruction and the clean input. The constructor creates its own
    ``tf.Session`` and initializes all global variables in it.
    """

    def __init__(self, n_input, n_hidden, transfer_function=tf.nn.softplus,
                 optimizer=None, scale=0.1):
        """Build the graph, create a session and initialize the variables.

        Args:
            n_input: Number of input features.
            n_hidden: Number of hidden-layer units.
            transfer_function: Activation applied to the hidden layer.
            optimizer: Object exposing ``minimize(cost)``. When ``None`` a new
                ``tf.train.AdamOptimizer()`` is created for this instance
                (previously one optimizer object was created at definition
                time and shared by every instance relying on the default).
            scale: Noise coefficient fed to the ``self.scale`` placeholder by
                the methods of this class.
        """
        if optimizer is None:
            optimizer = tf.train.AdamOptimizer()

        self.n_input = n_input  # number of input features
        self.n_hidden = n_hidden  # number of hidden units
        self.transfer = transfer_function  # hidden-layer activation
        self.scale = tf.placeholder(tf.float32)  # noise coefficient, fed at run time
        self.training_scale = scale  # value fed into self.scale by this class
        network_weights = self._initialize_weights()
        self.weights = network_weights
        self.x = tf.placeholder(tf.float32, [None, self.n_input])

        # Bug fix: scale the noise with the ``self.scale`` placeholder rather
        # than the Python constant ``scale``; otherwise the placeholder that
        # every method feeds has no effect on the graph.
        # The noise has shape (n_input,), so one noise vector is broadcast
        # over all rows of the batch.
        noisy_x = self.x + self.scale * tf.random_normal((n_input,))

        # Hidden layer: the feature extractor.
        self.hidden = self.transfer(
            tf.add(tf.matmul(noisy_x, self.weights['w1']), self.weights['b1']))
        # Output layer: linear reconstruction of the input.
        self.reconstruction = tf.add(
            tf.matmul(self.hidden, self.weights['w2']), self.weights['b2'])
        # Squared-error cost against the clean (un-noised) input.
        self.cost = 0.5 * tf.reduce_sum(
            tf.pow(tf.subtract(self.reconstruction, self.x), 2.0))
        self.optimizer = optimizer.minimize(self.cost)

        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        self.sess.run(init)

    def _initialize_weights(self):
        """Create the weight and bias variables.

        Returns:
            dict with keys ``'w1'`` (Xavier-initialized, n_input x n_hidden),
            ``'b1'`` (zeros, n_hidden), ``'w2'`` (zeros, n_hidden x n_input)
            and ``'b2'`` (zeros, n_input).
        """
        all_weights = dict()
        all_weights['w1'] = tf.Variable(xavier_init(self.n_input, self.n_hidden))
        all_weights['b1'] = tf.Variable(tf.zeros([self.n_hidden], dtype=tf.float32))
        all_weights['w2'] = tf.Variable(
            tf.zeros([self.n_hidden, self.n_input], dtype=tf.float32))
        all_weights['b2'] = tf.Variable(tf.zeros([self.n_input], dtype=tf.float32))
        return all_weights

    def _feed(self, X):
        """Return the feed dict binding the input batch and the noise scale."""
        return {self.x: X, self.scale: self.training_scale}

    def partial_fit(self, X):
        """Run one optimizer step on batch ``X`` and return its cost."""
        cost, _ = self.sess.run((self.cost, self.optimizer), feed_dict=self._feed(X))
        return cost

    def calc_total_cost(self, X):
        """Return the cost on ``X`` without running the training op.

        The noise coefficient ``training_scale`` is still fed, so the input is
        corrupted here exactly as during training.
        """
        return self.sess.run(self.cost, feed_dict=self._feed(X))

    def transform(self, X):
        """Return the hidden-layer activations (extracted features) for ``X``."""
        return self.sess.run(self.hidden, feed_dict=self._feed(X))

    def generate(self, hidden=None):
        """Decode hidden activations back into input space.

        Args:
            hidden: Array fed in place of the hidden layer. When ``None``, a
                single standard-normal sample of shape ``(1, n_hidden)`` is
                drawn.

        Returns:
            The reconstruction computed from ``hidden``.
        """
        if hidden is None:
            # Bug fix: ``size`` must be a shape, not the bias tf.Variable.
            hidden = np.random.normal(size=(1, self.n_hidden))
        return self.sess.run(self.reconstruction, feed_dict={self.hidden: hidden})

    def reconstruct(self, X):
        """Run the full encode/decode pass on ``X`` and return the reconstruction."""
        return self.sess.run(self.reconstruction, feed_dict=self._feed(X))

    def getWeights(self):
        """Return the current value of the encoder weights ``w1``."""
        return self.sess.run(self.weights['w1'])

    def getBiases(self):
        """Return the current value of the encoder biases ``b1``."""
        return self.sess.run(self.weights['b1'])
    
# Load the MNIST splits from the local 'MNIST_data' directory with one-hot labels.
mnist = input_data.read_data_sets(
    'MNIST_data',
    one_hot=True,
)

def standard_scale(X_train, X_test):
    """Standardize both data sets with a scaler fitted on the training set.

    A ``StandardScaler`` is fitted on ``X_train`` only and then applied to
    both ``X_train`` and ``X_test``, so the test data is transformed with the
    training statistics.

    Args:
        X_train: Training data used to fit the scaler.
        X_test: Test data transformed with the fitted scaler.

    Returns:
        Tuple ``(X_train_scaled, X_test_scaled)``.
    """
    scaler = prep.StandardScaler()
    scaler.fit(X_train)
    scaled_train = scaler.transform(X_train)
    scaled_test = scaler.transform(X_test)
    return scaled_train, scaled_test

def get_random_block_from_data(data, batch_size):
    """Return a random contiguous block of ``batch_size`` items from ``data``.

    A start index is drawn uniformly from every valid position and the slice
    ``data[start:start + batch_size]`` is returned. Each call draws
    independently, so blocks from different calls may overlap.

    Args:
        data: Sliceable sequence supporting ``len()``.
        batch_size: Number of consecutive items to return.

    Returns:
        The slice of ``data`` of length ``batch_size``.

    Raises:
        ValueError: If ``batch_size`` is larger than ``len(data)``.
    """
    n_items = len(data)
    if batch_size > n_items:
        raise ValueError(
            "batch_size (%d) exceeds the number of samples (%d)"
            % (batch_size, n_items))
    # randint's upper bound is exclusive, so add 1 to make the last valid
    # start (n_items - batch_size) reachable; this also makes the case
    # len(data) == batch_size work instead of raising.
    start_index = np.random.randint(0, n_items - batch_size + 1)
    return data[start_index:(start_index + batch_size)]

# Standardize both splits using a scaler fitted on the training images.
X_train, X_test = standard_scale(mnist.train.images, mnist.test.images)

# Training hyper-parameters.
n_samples = int(mnist.train.num_examples)
training_epochs = 20
batch_size = 128
display_step = 1

# Additive-Gaussian-noise autoencoder: 784 inputs, 200 hidden units.
autoencoder = AdditiveGaussianNoiseAutoencoder(
    n_input=784,
    n_hidden=200,
    transfer_function=tf.nn.softplus,
    optimizer=tf.train.AdamOptimizer(learning_rate=0.001),
    scale=0.01,
)

for epoch in range(training_epochs):
    avg_cost = 0.0
    total_batch = n_samples // batch_size
    for i in range(total_batch):
        batch_xs = get_random_block_from_data(X_train, batch_size)

        # One optimizer step; accumulate the cost weighted by batch_size / n_samples.
        cost = autoencoder.partial_fit(batch_xs)
        avg_cost += cost / n_samples * batch_size

    # Report only on every display_step-th epoch.
    if epoch % display_step != 0:
        continue
    print("Epoch:", '%04d' % (epoch + 1), "cost=", "{:.9f}".format(avg_cost))

# Summed reconstruction cost over the whole test set.
print("Total cost: " + str(autoencoder.calc_total_cost(X_test)))

Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
Epoch: 0001 cost= 19536.902802273
Epoch: 0002 cost= 13250.062869318
Epoch: 0003 cost= 10792.896489205
Epoch: 0004 cost= 10282.021052841
Epoch: 0005 cost= 8977.125828409
Epoch: 0006 cost= 9274.604605114
Epoch: 0007 cost= 9406.567067614
Epoch: 0008 cost= 8944.247376136
Epoch: 0009 cost= 8898.409404545
Epoch: 0010 cost= 8283.244813636
Epoch: 0011 cost= 8044.483117045
Epoch: 0012 cost= 8441.122923295
Epoch: 0013 cost= 8495.563090341
Epoch: 0014 cost= 8006.499413636
Epoch: 0015 cost= 8342.489192614
Epoch: 0016 cost= 8217.680517045
Epoch: 0017 cost= 7759.183575000
Epoch: 0018 cost= 8542.215706250
Epoch: 0019 cost= 7970.537857955
Epoch: 0020 cost= 7938.904743182
Total cost: 652433.0
