### MNIST 手写字体识别

In [3]:
#导入数据
import tensorflow as tf

#原网站提供了6W张训练图片和1W张测试图片，导入的该工具会从训练图片分出5000张作为验证集
from tensorflow.examples.tutorials.mnist import input_data

#读取路径为当前路径下的data文件夹下的MNIST文件夹内，如果该文件夹没有，则自动下载数据至该文件夹
mnist = input_data.read_data_sets("./data/MNIST/", one_hot=True)

print("Training data size: ", mnist.train.num_examples) 
print ("Validating data size: ", mnist.validation.num_examples) 
print ("Testing data size: ", mnist.test.num_examples) 

Extracting ./data/MNIST/train-images-idx3-ubyte.gz
Extracting ./data/MNIST/train-labels-idx1-ubyte.gz
Extracting ./data/MNIST/t10k-images-idx3-ubyte.gz
Extracting ./data/MNIST/t10k-labels-idx1-ubyte.gz
Training data size:  55000
Validating data size:  5000
Testing data size:  10000


In [4]:
#为了方便使用SGD，mnist.train.next_batch函数可以从所有训练数据中取一个小批量投入训练

batch_size=100

#从训练集选取batch_size个训练数据
xs,ys=mnist.train.next_batch(batch_size)

#将图片展开成一个长度为28×28=784的一维数组，一张图片可作为一个特征向量。所以batch为100的矩阵维度为100×784
print('X shapr:',xs.shape)
print('Y shape:',ys.shape)

X shapr: (100, 784)
Y shape: (100, 10)


In [3]:
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("./data/MNIST/", one_hot=True)


#输入结点数为像素点数，输出结点数为类别数
INPUT_NODE=784
OUTPUT_NODE=10

#一个隐藏层
LAYER1_NODE=500

#一个批量中的样本量，数据量越小训练过程越接近SGD，数据量越大训练过程越接近梯度下降
BATCH_SIZE=100

#学习率和学习衰减率
LEARNING_RATE_BASE=0.8
LEARNING_RATE_DECAY=0.99

#正则化系数、迭代次数和滑动平均衰减率
REGULARIZATION_RATE=0.0001
TRAINING_STEPS=3000
MOVING_AVERAGE_DECAY=0.99

#定义推断函数，给定所有参数下计算神经网络的前向传播结果。参数avg_class可确定推断中使不使用滑动平均模型
def inference(input_tensor,avg_class,weights1,biases1,weights2,biases2):
    
    #没有提供滑动平均类时，直接使用参数当前的取值
    if avg_class == None:
        
        #计算隐藏层前向传播结果，使用ReLU激活函数
        layer1=tf.nn.relu(tf.matmul(input_tensor,weights1)+biases1)
        
        #计算输出层的前向传播结果
        return tf.matmul(layer1,weights2)+biases2
    else:
        
        #首先使用avg_class.averaage函数计算变量的滑动均值，然后计算相应的前向传播结果
        layer1=tf.nn.relu(tf.matmul(input_tensor,avg_class.average(weights1))+avg_class.average(biases1))
        return tf.matmul(layer1,avg_class.average(weights2))+avg_class.average(biases2)
    
#模型训练函数

def train(mnist):
    x=tf.placeholder(tf.float32,[None,INPUT_NODE],name='x-input')
    y=tf.placeholder(tf.float32,[None,OUTPUT_NODE],name='y-input')
    
    #生成隐藏层参数
    weights1=tf.Variable(tf.truncated_normal([INPUT_NODE,LAYER1_NODE],stddev=0.1))
    biases1=tf.Variable(tf.constant(0.1,shape=[LAYER1_NODE]))
    
    #生成输出层参数
    weights2=tf.Variable(tf.truncated_normal([LAYER1_NODE,OUTPUT_NODE],stddev=0.1))
    biases2=tf.Variable(tf.constant(0.1,shape=[OUTPUT_NODE]))
    
    #计算当前参数下前向传播的结果，这里设为‘None’不会计算滑动平均值
    y_hat=inference(x,None,weights1,biases1,weights2,biases2)

    #定义储存迭代数的变量，这个变量不需要计算滑动平均值，所以这里指定的这个变量为不饿训练变量（trainable=False）
    global_step=tf.Variable(0,trainable=False)
    
    #给定滑动平均衰减率和迭代数，初始化滑动平均类。
    variable_averages=tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY,global_step)
    
    #在所有代表神经网络参数的变量上使用滑动平均，其他超参数不需要。tf.trainable_variables返回的就是图上的集合GraphKeys.TRAINABLE_VARIABLES中的元素。
    variables_averages_op=variable_averages.apply(tf.trainable_variables())
    
    #计算使用滑动平均后的前向传播结果，滑动平均不会改变变量本身，而是使用影子变量记录滑动平均值，需要使用滑动平均再明确调用average函数
    average_y_hat=inference(x,variable_averages,weights1,biases1,weights2,biases2)
    
    #~使用tf.argmax函数得到正确答案对应的类别编号
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y_hat, labels=tf.argmax(y, 1))
    
    #计算当前批量中所有样本的交叉熵均值
    cross_entropy_mean=tf.reduce_mean(cross_entropy)
    
    #计算L2正则化损失函数
    regularizer=tf.contrib.layers.l2_regularizer(REGULARIZATION_RATE)
    
    #计算模型的正则化损失，只计算神经网络权重的正则化损失，不使用偏置项
    regularization=regularizer(weights1)+regularizer(weights2)
    
    #总损失函数
    loss=cross_entropy_mean+regularization
    
    #设置指数衰减学习率.基础学习率、当前迭代次数、一个epoch所需要的迭代次数、学习衰减率
    learning_rate = tf.train.exponential_decay(
        LEARNING_RATE_BASE,
        global_step,
        mnist.train.num_examples / BATCH_SIZE,
        LEARNING_RATE_DECAY,
        staircase=True)
    
    #使用梯度下降优化算法优化损失函数，损失函数包括交叉熵损失和L2正则化损失
    train_step=tf.train.GradientDescentOptimizer(learning_rate).minimize(loss,global_step=global_step)
    
    #在训练神经网络模型时，每过一遍数据既需要通过反向传播来更新参数，也要更新每个参数的滑动平均值。为了一次完成多个操作
    #train_op=tf.group(train_step,variables_averages_op)
    with tf.control_dependencies([train_step,variables_averages_op]):
        train_op=tf.no_op(name='train')
    
    
    #检验使用滑动平均模型的前向传播结果是否正确。tf.argmax(average_y_hat,1)计算每一个样本的预测。average_y_hat是一个batch_size×10的二维数组，
    #每一行表示一个样本的前向传播结果。tf.argmax的第二个参数“1”表示选取最大值的操作只在第一个维度中进行，即每一行选取最大值的下标。
    #于是得到的结果为长度为batch的一维数组，数组的值表示每个样本对应识别的类别。tf.equal判断两个张量的每一维度是否相等，相等返回Ture。
    correct_prediction=tf.equal(tf.argmax(average_y_hat,1),tf.argmax(y,1))
    
    #该运算首先将布尔型数值转为实数型，再计算均值。该均值为模型在这一组数据上的正确率。
    accuracy=tf.reduce_mean(tf.cast(correct_prediction,tf.float32))

    #初始化会话并开始训练过程
    with tf.Session() as sess:
        
        tf.global_variables_initializer().run()
        
        #准备验证数据，可通过验证数据简要判断停止条件和训练效果。
        validate_feed = {x: mnist.validation.images, y: mnist.validation.labels}
        
        #准备测试数据，作为模型最终性能的判别标准
        test_feed={x:mnist.test.images,y:mnist.test.labels}
    
        # 迭代地训练神经网络。
        for i in range(TRAINING_STEPS):
            
            #每1000次输出1次验证集上的测试结果
            if i % 500 == 0:
                
                #计算滑动平均模型在验证集上的结果，数据集小可一次处理所有验证数据。如果验证集大，需要分为更小的batch。
                validate_acc = sess.run(accuracy, feed_dict=validate_feed)
                print("After %d training step(s), validation accuracy using average model is %g " % (i, validate_acc))
            
            #生成一次迭代需要用到的批量数据，并运行训练过程
            xs,ys=mnist.train.next_batch(BATCH_SIZE)
            sess.run(train_op,feed_dict={x:xs,y:ys})
        
        #训练结束后再测试集上计算最终模型准确度
        test_acc=sess.run(accuracy,feed_dict=test_feed)
        print(("After %d training step(s), test accuracy using average model is %g" %(TRAINING_STEPS, test_acc)))

    
    

Extracting ./data/MNIST/train-images-idx3-ubyte.gz
Extracting ./data/MNIST/train-labels-idx1-ubyte.gz
Extracting ./data/MNIST/t10k-images-idx3-ubyte.gz
Extracting ./data/MNIST/t10k-labels-idx1-ubyte.gz


In [4]:
train(mnist)

After 0 training step(s), validation accuracy using average model is 0.1354 
After 500 training step(s), validation accuracy using average model is 0.9682 
After 1000 training step(s), validation accuracy using average model is 0.975 
After 1500 training step(s), validation accuracy using average model is 0.9794 
After 2000 training step(s), validation accuracy using average model is 0.9816 
After 2500 training step(s), validation accuracy using average model is 0.983 
After 3000 training step(s), test accuracy using average model is 0.9815


### 重构MNIST识别
在了解了TensorFlow的变量管理和模型持久化操作后，我们可以重构上面简单的全连接网络。

切分功能模块，推断过程抽象为单独库函数。模型分为三个模块：mnist_inference.py定义了前向传播过程与神经网络中的参数。
mnist_train.py定义了神经网络的训练过程。mnist_eval.py定义了测试过程

In [1]:
#mnist_inference.py
# -*- coding: utf-8 -*-
import tensorflow as tf

#定义神经网络结构相关的参数
INPUT_NODE=784
OUTPUT_NODE=10
LAYER1_NODE=500

#通过tf.get_variable函数获取变量。在训练神经网络时会创建这些变量；在测试时会通过保存的模型加载这些变量的取值。
#而且更加方便的是，因为可以在变量加载时将滑动平均变量重命名，所以可以直接通过同样的名字在训练是使用变量自身，
#而在测试时使用变量的滑动平均值。在这个函数中也会将变量的正则化损失加入损失集合。
def get_weight_variable(shape, regularizer):
    weights=tf.get_variable('weights',shape,initializer=tf.truncated_normal_initializer(stddev=0.1))
    
    #当给出了正则化生成函数时，将当前变量的正则化损失加入名为losses的集合。
    if regularizer != None:
        tf.add_to_collection('losses',regularizer(weights))
    return weights

#定义神经网络的前向传播过程
def inference(input_tensor,regularizer):
    
    #声明第一层神经网络的的变量并完成前向传播过程
    with tf.variable_scope('layer1'):
        #这里tf.get_variable或tf.Variable没有本质区别，因为在训练或测试中没有在同一个程序中多次调用该函数。
        #如果在同一程序中多次调用，在第一次调用后需要将reuse参数设为True。
        weights=get_weight_variable([INPUT_NODE, LAYER1_NODE], regularizer)
        biases=tf.get_variable('biases', [LAYER1_NODE], initializer = tf.constant_initializer(0.0))
        layer1=tf.nn.relu(tf.matmul(input_tensor, weights)+biases)
    
    #类似的声明第二层神经网络的变量并完成前向传播过程
    with tf.variable_scope('layer2'):
        weights=get_weight_variable([LAYER1_NODE, OUTPUT_NODE], regularizer)
        biases=tf.get_variable('biases', [OUTPUT_NODE], initializer = tf.constant_initializer(0.0))
        layer2 = tf.matmul(layer1, weights) + biases
    
    return layer2
    

In [None]:
#mnist_train.py
# -*- coding: utf-8 -*-
import os
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import mnist_inference

# 当前py文件在同一个地址

# 配置神经网络的参数
BATCH_SIZE = 100
LEARNING_RATE_BASE = 0.8
LEARNING_RATE_DECAY = 0.99
REGULARIZATION_RATE = 0.0001
TRAINING_STEPS = 10000
MOVING_AVERAGE_DECAY = 0.99
MODEL_SAVE_PATH = "./model/fcn_mnist"
MODEL_NAME = "fcn_mnist.ckpt"


def train(mnist):
    x = tf.placeholder(tf.float32, [None, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS], name='x-input')
    y = tf.placeholder(tf.float32, [None, OUTPUT_NODE], name='y-output')

    regularizer = tf.contrib.layers.l2_regularizer(REGULARIZATION_RATE)
    # 调用推断过程
    y_hat = inference(x, regularizer)
    global_step = tf.Variable(0, trainable=False)

    # 定义损失函数、学习率、滑动平均操作及训练过程
    variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
    variables_average_op = variable_averages.apply(tf.trainable_variables())
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y_hat, labels=tf.argmax(y, 1))
    cross_entropy_mean = tf.reduce_mean(cross_entropy)
    loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
    learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step, mnist.train.num_examples / BATCH_SIZE,
                                              LEARNING_RATE_DECAY)
    train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step)

    with tf.control_dependencies([train_step, variables_average_op]):
        train_op = tf.no_op(name='train')

    # 初始化TF持久化类
    saver = tf.train.Saver()
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        # 在训练过程中不再测试模型在验证数据上的表现，验证和测试的过程会有独立的过程完成
        for i in range(TRAINING_STEPS):
            xs, ys = mnist.train.next_batch(BATCH_SIZE)
            reshaped_xs=np.reshape(xs,(BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS))
            _, loss_value, step = sess.run([train_op, loss, global_step], feed_dict={x: xs, y: ys})

            # 每1000次迭代保存一次模型
            if i % 1000 == 0:
                # 输出模型在当前训练批量下的损失函数大小
                print('After %d training steps, loss on training batch is %g.' % (step, loss_value))

                # 保存当前模型，并使用global_step 参数特定地命名
                saver.save(sess, os.path.join(MODEL_SAVE_PATH, MODEL_NAME), global_step=global_step)


def main(argv=None):
    mnist = input_data.read_data_sets('./data/MNIST/', one_hot=True)
    train(mnist)


if __name__ == '__main__':
    tf.app.run()


Extracting ./data/MNIST/train-images-idx3-ubyte.gz
Extracting ./data/MNIST/train-labels-idx1-ubyte.gz
Extracting ./data/MNIST/t10k-images-idx3-ubyte.gz
Extracting ./data/MNIST/t10k-labels-idx1-ubyte.gz
After 1 training steps, loss on training batch is 2.81242.


In [None]:
#mnist_eval.py
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import mnist_inference
import mnist_train

#每10秒加载一次最新的模型，并在测试数据上测试最新模型的准确率
EVAL_INTERVAL_SECS=10

def evaluate(mnist):
    with tf.Graph().as_default() as g:
        #定义输入输出格式
        x=tf.placeholder(tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input')
        y=tf.placeholder(tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-output')
        validate_feed={x:mnist.validation.images, y:mnist.validation.labes}
        
        #直接通过调用封装好的函数来计算前向传播结果。因为测试时不关注正则化损失值，所以用于计算正则化损失的函数可以设置为None
        y_hat=mnist_inference.inference(x, None)
        
        #使用前向传播结果计算准确度，如需对未知样本进行分类，使用tf.argmax(y_hat, 1)就可以得到输入样本的预测类别
        corret_prediction=tf.equal(tf.argmax(y_hat, 1), tf.argmax(y, 1))
        accuracy=tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        
        #通过变量重命名的方式加载模型，因此前向传播过程不需调用滑动平均操作取平均值。完全共用mnist_inference.py定义的传播过程
        variable_averages=tf.train.ExponentialMovingAverage(mnist_train.MOVING_AVERAGE_DECAY)
        variables_to_restore=variable_averages.variables_to_restore()
        
        #variables_to_restore函数生成tf.train.Saver类所需要的变量重命名字典
        saver=tf.train.Saver(variables_to_restore)
        
        #每隔EVAL_INTERVAL_SECS秒调用一次计算准确度的过程以检测训练过程中的正确率变化
        while True:
            with tf.Session() as sess:
                #tf.train.get_checkpoint_state函数会通过checkpoint文件自动找到目录中最新模型的文件名
                ckpt=tf.train.get_checkpoint_state(mnist_train.MODEL_SAVE_PATH)
                if ckpt and ckpt.model_checkpoint_path:
                    #加载模型
                    saver.restore(sess, ckpt.model_checkpoint_path)
                    #通过文件名得到模型保存时迭代的次数
                    global_step=ckpt.model_checkpoint_path.split('/')[-1].split('-')[-1]
                    accuracy_score=sess.run(accuracy, feed_dict=validate_feed)
                    print("After %s training step(s), validation accuracy = %g" % (global_step, accuracy_score))
                else:
                    print('No checkpoint file found')
                    return
            time.sleep(EVAL_INTERVAL_SECS)
            
def main(argv=None):
    mnist=input_data.read_data_sets('./data/MNIST/', one_hot=True)
    evaluate(mnist)
    
if __name__=='__main__':
    tf.app.run()
                    
        