In [None]:
import tensorflow as tf
import numpy as np
import os
import struct

# 数据准备

In [None]:
def load_image(path):
    """Read an IDX3 (MNIST-style) image file into a 2-D uint8 array.

    The 16-byte big-endian header holds the magic number, the image count and
    the per-image row/column counts; the remaining bytes are the pixels.

    Args:
        path: Path of the uncompressed IDX3 image file.

    Returns:
        A ``np.uint8`` array with one flattened image per row, i.e. of shape
        ``(-1, rows * cols)`` where ``rows`` and ``cols`` come from the header
        (28 * 28 = 784 for MNIST).

    Raises:
        ValueError: If the magic number is not 2051 (unsigned-byte data with
            three dimensions), e.g. when a label file or a still-compressed
            file is passed by mistake.
        struct.error: If the file is shorter than the 16-byte header.
    """
    with open(path, 'rb') as fd:
        magic, num, rows, cols = struct.unpack('>IIII', fd.read(16))
        if magic != 2051:
            raise ValueError(
                'not an IDX3 image file (magic {} != 2051): {}'.format(magic, path))
        # Use the image size declared in the header instead of assuming 28x28.
        res = np.fromfile(fd, dtype=np.uint8).reshape(-1, rows * cols)
    return res


def load_label(path):
    """Read an IDX1 (MNIST-style) label file into a 1-D uint8 array.

    The 8-byte big-endian header holds the magic number and the label count;
    every following byte is one label.

    Args:
        path: Path of the uncompressed IDX1 label file.

    Returns:
        A 1-D ``np.uint8`` array containing every byte after the header.

    Raises:
        ValueError: If the magic number is not 2049 (unsigned-byte data with
            one dimension), e.g. when the image file is passed by mistake.
        struct.error: If the file is shorter than the 8-byte header.
    """
    with open(path, 'rb') as fd:
        magic, n = struct.unpack('>II', fd.read(8))
        if magic != 2049:
            # Without this check an image file would be returned as "labels".
            raise ValueError(
                'not an IDX1 label file (magic {} != 2049): {}'.format(magic, path))
        res = np.fromfile(fd, dtype=np.uint8)
    return res

In [None]:
from sklearn.preprocessing import StandardScaler


class MnistData:
    """In-memory MNIST split that can be shuffled, standardized and batched.

    Images and labels are read once at construction time through
    ``load_image`` and ``load_label``; ``next_batch`` then yields consecutive
    slices of the two arrays.

    Attributes:
        n_features: Number of columns of the image array (pixels per image).
    """

    def __init__(self, data_path, label_path, batch_size=32, normalize=False, shuffle=False):
        """Load one split and optionally shuffle / standardize it.

        Args:
            data_path: Path of the IDX image file.
            label_path: Path of the IDX label file.
            batch_size: Number of samples per mini-batch; must be >= 1.
            normalize: If True, standardize every feature with a
                ``StandardScaler`` fitted on this split's own samples.
            shuffle: If True, shuffle the samples once after loading and again
                after every completed pass of ``next_batch``.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        # Checked before any I/O: a non-positive batch size could never advance
        # through the data.
        if batch_size < 1:
            raise ValueError(
                'batch_size must be a positive integer, got {!r}'.format(batch_size))

        self._data = None
        self._target = None
        self._n_samples = 0
        self.n_features = 0

        self._idx = 0    # start offset of the mini-batch currently being served
        self._batch_size = batch_size
        self._shuffle = shuffle    # remembered so next_batch can honour it

        self._load(data_path, label_path)

        if shuffle:
            self._shuffle_data()
        if normalize:
            self._normalize_data()

        print(self._data.shape, self._target.shape)

    def _load(self, data_path, label_path):
        """Read images and labels from disk and record the array dimensions."""
        self._data = load_image(data_path)
        self._target = load_label(label_path)

        self._n_samples, self.n_features = self._data.shape[0], self._data.shape[1]

    def _shuffle_data(self):
        """Apply one random permutation to images and labels alike."""
        idxs = np.random.permutation(self._n_samples)
        # The same index array is used for both so rows stay paired with labels.
        self._data = self._data[idxs]
        self._target = self._target[idxs]

    def _normalize_data(self):
        """Standardize the features with a scaler fitted on this split."""
        scaler = StandardScaler()
        self._data = scaler.fit_transform(self._data)

    def next_batch(self):
        """Yield ``(data, target)`` mini-batches covering the split once.

        Every call starts at the first sample, even if a previous generator
        was abandoned half-way. The last batch may hold fewer than
        ``batch_size`` samples. After the final batch has been consumed the
        data is reshuffled, but only if the instance was created with
        ``shuffle=True``.

        Yields:
            Tuples ``(data, target)`` of aligned slices of at most
            ``batch_size`` samples.
        """
        for start in range(0, self._n_samples, self._batch_size):
            self._idx = start
            stop = start + self._batch_size
            yield self._data[start:stop], self._target[start:stop]

        self._idx = 0
        if self._shuffle:
            self._shuffle_data()

In [None]:
# Directory holding the four uncompressed MNIST IDX files, relative to the notebook.
MNIST_DIR = '../dataset/MNIST/'

# Resolve the image/label files of the training and test splits under MNIST_DIR.
train_data_path, train_label_path, test_data_path, test_label_path = (
    os.path.join(MNIST_DIR, fname)
    for fname in (
        'train-images.idx3-ubyte',
        'train-labels.idx1-ubyte',
        't10k-images.idx3-ubyte',
        't10k-labels.idx1-ubyte',
    )
)

# Mini-batch size shared by both splits.
batch_size = 50

# Both splits are standardized; only the training split is shuffled on load.
# NOTE(review): each MnistData fits its own scaler, so the test split is
# standardized with test-set statistics rather than the training ones.
train_data = MnistData(
    data_path=train_data_path,
    label_path=train_label_path,
    batch_size=batch_size,
    normalize=True,
    shuffle=True,
)
test_data = MnistData(
    data_path=test_data_path,
    label_path=test_label_path,
    batch_size=batch_size,
    normalize=True,
    shuffle=False,
)

# 网络结构设计
本文实现的是LeNet-5的一个变体（原始LeNet-5的C1层有6个卷积核，C5层使用5*5卷积），其网络结构如下表所示：

|kernel|n_kernel|padding|stride|
|-|:-:|:-:|:-:|
|Conv 5*5|5|2|1|
|MaxPool 2*2|-|0|2|
|Conv 5*5|16|0|1|
|MaxPool 2*2|-|0|2|
|Conv 1*1|120|0|1|
|FC 84|-|-|-|
|Output 10|-|-|-|

In [None]:
# Width of the input layer: one unit per feature of the training data.
unit_I = train_data.n_features
# Width of the output layer: one unit per class.
unit_O = 10

# Convolutional layers C1, C3 and C5: filter counts and kernel sizes, in order.
n_filters = [5, 16, 120]
conv_sizes = [(5, 5), (5, 5), (1, 1)]

# Pooling layers S2 and S4: window size and the stride the window moves by.
pool_size = (2, 2)
strides = (2, 2)

# Number of units in the fully connected layer.
FC_size = 84

# 搭建网络

In [None]:
# Build the TF1 computation graph: inputs, the convolutional network, the
# evaluation ops and the optimizer step.

# Inputs are fed by the caller at run time, hence placeholders.
X = tf.placeholder(tf.float32, [None, unit_I])  # batch size left open; only the feature count is fixed
Y = tf.placeholder(tf.int64, [None])    # one integer class label per sample; int64 so it can be compared with tf.argmax's output below
# Reshape the flat feature rows into 28x28 single-channel images for the conv layers.
X_img = tf.reshape(X, [-1, 28, 28, 1])
# Boolean flag that defaults to False when not fed. No layer in this cell reads it.
training = tf.placeholder_with_default(False, shape=[], name='training')

# Network graph: conv (C) and max-pool (S) layers followed by two dense layers.
with tf.name_scope('LeNet-5'):
    # C1 is the only conv layer that requests 'same' padding explicitly;
    # C3 and C5 rely on tf.layers.conv2d's default padding.
    C1 = tf.layers.conv2d(X_img, filters=n_filters[0],
                          kernel_size=conv_sizes[0], padding='same',
                          activation=tf.nn.tanh, name='C1')
    S2 = tf.layers.max_pooling2d(C1, pool_size=pool_size,
                                 strides=strides, name='S2')
    C3 = tf.layers.conv2d(S2, filters=n_filters[1],
                          kernel_size=conv_sizes[1],
                          activation=tf.nn.tanh, name='C3')
    S4 = tf.layers.max_pooling2d(C3, pool_size=pool_size,
                                 strides=strides, name='S4')
    C5 = tf.layers.conv2d(S4, filters=n_filters[2],
                          kernel_size=conv_sizes[2],
                          activation=tf.nn.tanh, name='C5')
    # Flatten the C5 feature maps before the fully connected layer.
    FC6 = tf.layers.dense(tf.layers.flatten(
        C5), FC_size, activation=tf.nn.tanh)
    logits=tf.layers.dense(FC6, unit_O, activation=None)    # last layer emits raw logits, no activation

# Evaluation graph: loss, predicted class and batch accuracy.
with tf.name_scope('Eval'):
    # Softmax cross-entropy between the integer class labels in Y and the
    # logits; the labels are sparse class indices, not one-hot vectors.
    loss = tf.losses.sparse_softmax_cross_entropy(labels=Y, logits=logits)
    # Predicted class = index of the largest logit along axis 1.
    predict = tf.argmax(logits, 1)
    # Fraction of samples in the fed batch whose prediction equals the label.
    accuracy = tf.reduce_mean(tf.cast(tf.equal(predict, Y), tf.float32))

# Optimization graph: one Adam step on the loss.
with tf.name_scope('train_op'):
    lr = 1e-3
    train_op = tf.train.AdamOptimizer(lr).minimize(loss)

# Created after the optimizer so that its variables are initialized as well.
init = tf.global_variables_initializer()
config = tf.ConfigProto()
config.gpu_options.allow_growth = True    # allocate GPU memory on demand

# 训练网络

In [None]:
# Train the network, printing training metrics periodically and evaluating on
# the whole test split at a coarser interval.
with tf.Session(config=config) as sess:
    sess.run(init)
    epochs = 20
    log_every = 1000     # batches between two training-progress prints
    eval_every = 5000    # batches between two evaluations on the test split

    batch_cnt = 0    # number of training batches processed so far, across epochs
    for epoch in range(epochs):
        for batch_data, batch_labels in train_data.next_batch():
            batch_cnt += 1
            loss_val, acc_val, _ = sess.run(
                [loss, accuracy, train_op],
                feed_dict={
                    X: batch_data,
                    Y: batch_labels,
                    training: True})

            # batch_cnt is already 1-based here, so test it directly; adding
            # one more would fire at batch 999, 1999, ... instead.
            if batch_cnt % log_every == 0:
                print('epoch: {}, batch_loss: {}, batch_acc: {}'.format(
                    epoch, loss_val, acc_val))

            if batch_cnt % eval_every == 0:
                # Weight each batch accuracy by its size so that a shorter
                # final batch does not skew the overall test accuracy.
                n_correct = 0.0
                n_seen = 0
                for test_batch_data, test_batch_labels in test_data.next_batch():
                    test_acc_val = sess.run(
                        accuracy,
                        feed_dict={
                            X: test_batch_data,
                            Y: test_batch_labels
                        })
                    n_correct += float(test_acc_val) * len(test_batch_labels)
                    n_seen += len(test_batch_labels)
                test_acc = n_correct / n_seen if n_seen else float('nan')
                print('epoch: {}, test_acc: {}'.format(epoch, test_acc))