#                                     TF实现AutoEncoder

In [1]:
import numpy as np

In [2]:
import sklearn.preprocessing as prep

In [4]:
import tensorflow as tf

In [5]:
from tensorflow.examples.tutorials.mnist import input_data

 xavirer初始化器初始化权重，满足均匀分布

In [6]:
def xavirer_init(fan_in, fan_out, constant=1):
    low = -constant * np.sqrt(6.0 / (fan_in + fan_out))
    high = constant * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform((fan_in,fan_out),minval=low, maxval=high, dtype=tf.float32)

创建加高斯噪声的自编码器，隐含层激活函数采用softplus，softplus是平滑版的ReLU，log（1-exp（x）），优化器采用Adam，scale为高斯噪声系数。自动编码器可以用于特征降维，类似主成分分析PCA，但是其相比PCA其性能更强，这是由于神经网络模型可以提取更有效的新特征。

对于高斯分布的数据，采用均方误差就好，而对于伯努利分布可以采用交叉熵，这个是可以根据似然函数推导出来的。

In [16]:
class AdditiveGaussianNoiseAutoencoder(object):
    def __init__(self,n_input,n_hidden,transfer_function=tf.nn.softplus,optimizer=tf.train.AdamOptimizer(),scale=0.1):
        self.n_input = n_input
        self.n_hidden = n_hidden
        self.transfer = transfer_function
        self.scale = tf.placeholder(tf.float32)
        self.train_scale = scale
        network_weights = self._initialize_weights()
        self.weights = network_weights
        self.x = tf.placeholder(tf.float32,[None,self.n_input])
        self.hidden = self.transfer(tf.add(tf.matmul(self.x + scale * tf.random_normal((n_input,)),self.weights['w1']),self.weights['b1']))
        #隐藏层 h=softplus((x+ sca* 高斯噪声）* w1 + b1）
        self.reconstructurction = tf.add(tf.matmul(self.hidden,self.weights['w2']),self.weights['b2'])
        #输出层 复原数据  rec = h * w2 + b2
        self.cost = 0.5*tf.reduce_sum(tf.pow(tf.subtract(self.reconstructurction,self.x),2.0))#平方误差作为cost
        self.optimizer =optimizer.minimize(self.cost)#优化器，最小化代价函数
        
        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        self.sess.run(init)
        
    def _initialize_weights(self):#初始化权重
        all_weights = dict()
        all_weights['w1'] = tf.Variable(xavirer_init(self.n_input,self.n_hidden))
        all_weights['b1'] = tf.Variable(tf.zeros([self.n_hidden],dtype = tf.float32))
        all_weights['w2'] = tf.Variable(tf.zeros([self.n_hidden,self.n_input],dtype = tf.float32))
        all_weights['b2'] = tf.Variable(tf.zeros([self.n_input],dtype = tf.float32))
        return all_weights
    def partial_fit(self,X):#执行两个计算图节点（cost ，opt）用一个batch数据进行训练，并返回cost
        cost,opt = self.sess.run((self.cost,self.optimizer),feed_dict={self.x:X,self.scale:self.train_scale})
        return cost
    def calc_total_cost(self,X):#执行计算一个图节点cost，这个函数会在训练完成后，对测试集进行性能测评时用到
        return self.sess.run(self.cost,feed_dict={self.x:X,self.scale:self.train_scale})
    def transform(self,X):#返回自编码器隐藏层的输出结果
        return self.sess.run(self.hidden, feed_dict={self.x:X,self.scale:self.train_scale})
    def generate(self,hidden = None):#将隐藏层的结果作为输入，提取高阶特征，复原原始数据
        if hidden is None:
            hidden = np.random.normal(size = self.weights['b1'])
        return self.sess.run(self.reconstructurction,feed_dict={self.hidden:hidden})
    def reconstruct(self,X):#整体运行一边。包括transform和generate，输入原始数据，输出复原数据
        return self.sess.run(self.reconstructurction, feed_dict={self.x:X,self.scale:self.train_scale})
    def getWeights(self):#获取隐藏层的权重
        return self.sess.run(self.weights['w1'])
    def getBiases(self):#获取隐藏层的偏执系数
        return self.sess.run(self.weights['b1'])

In [17]:
mnist = input_data.read_data_sets('MNIST_data',one_hot=True)

Extracting MNIST_data\train-images-idx3-ubyte.gz
Extracting MNIST_data\train-labels-idx1-ubyte.gz
Extracting MNIST_data\t10k-images-idx3-ubyte.gz
Extracting MNIST_data\t10k-labels-idx1-ubyte.gz


In [18]:
def standard_scale(X_train,X_test):#对测试数据进行标准化处理，实现均值为0，标准差为1，为了保证一致性，所以公用scale，
    preprocessing = prep.StandardScaler().fit(X_train)
    x_train = preprocessing.transform(X_train)
    x_test = preprocessing.transform(X_test)
    return x_train,x_test

In [19]:
def get_random_block_from_data(data,batch_size):#随机获取block数据的函数，从0到len（data）-batch_size之间随机取一个数，然后以这个数为起始位置
    #往后取一个batch_size的数据
    start_index = np.random.randint(0, len(data)-batch_size)
    return data[start_index:(start_index + batch_size)]

In [20]:
X_train,X_test = standard_scale(mnist.train.images, mnist.test.images)

In [21]:
n_sample = int(mnist.train.num_examples)
training_epochs = 20

In [22]:
batch_size = 128
display_step = 1

mnist数据集图片的大小是28*28=784，输入层为784个节点，其中隐藏层设置了200节点，

In [24]:
autoencoder = AdditiveGaussianNoiseAutoencoder(n_input=784, n_hidden=200,
                                              transfer_function= tf.nn.softplus,
                                              optimizer= tf.train.AdamOptimizer(learning_rate = 0.001),
                                              scale=0.01)

In [25]:
for epoch in range(training_epochs):
    avg_cost = 0.
    total_batch = int(n_sample / batch_size)
    for i in range(total_batch):
        batch_xs = get_random_block_from_data(X_train,batch_size)
        
        cost = autoencoder.partial_fit(batch_xs)
        avg_cost += cost/n_sample *batch_size
        
    if epoch % display_step == 0:
        print("Epoch:",'%04d' % (epoch + 1),"cost = ","{:.9f}".format(avg_cost))
    

Epoch: 0001 cost =  19326.359272727
Epoch: 0002 cost =  12476.875864773
Epoch: 0003 cost =  10801.410061932
Epoch: 0004 cost =  9687.687532386
Epoch: 0005 cost =  10999.193707955
Epoch: 0006 cost =  9594.365609091
Epoch: 0007 cost =  9490.001972727
Epoch: 0008 cost =  8545.497261932
Epoch: 0009 cost =  8455.178255682
Epoch: 0010 cost =  8236.112156250
Epoch: 0011 cost =  8106.726167045
Epoch: 0012 cost =  8560.045123295
Epoch: 0013 cost =  8813.962448295
Epoch: 0014 cost =  7748.899409659
Epoch: 0015 cost =  8028.529601705
Epoch: 0016 cost =  8306.481492614
Epoch: 0017 cost =  7993.290469318
Epoch: 0018 cost =  7433.967768750
Epoch: 0019 cost =  8167.394723864
Epoch: 0020 cost =  7893.332692045


In [27]:
print("Total cost:" + str(autoencoder.calc_total_cost(X_test)))

Total cost:650474.3
