In [1]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

  from ._conv import register_converters as _register_converters


In [2]:
import os
import struct

def load_mnist(path='dataset', kind='train'):
    """
    A helper function to load the MNIST dataset.
    :param path:
    :param kind:
    :param rescale:
    :return: images, labels
    """
    path = os.path.join(os.getcwd(), path)
    labels_path = os.path.join(path, '%s-labels.idx1-ubyte' % kind)
    images_path = os.path.join(path, '%s-images.idx3-ubyte' % kind)
    with open(labels_path, 'rb') as lbpath:
        magic, n_labels = struct.unpack('>II', lbpath.read(8))
        labels = np.fromfile(lbpath, dtype=np.uint8)
    with open(images_path, 'rb') as imgpath:
        magic, n_images, rows, columns = struct.unpack('>IIII', imgpath.read(16))
        images = np.fromfile(imgpath, dtype=np.uint8).reshape(n_images, rows * columns)
        images = ((images - 255.0) - 0.5) * 2
    print('Load %s data, images rows: %d, label rows: %d' % (kind, n_images, n_labels))
    return images, labels


X_data, y_data = load_mnist(kind='train')
X_test, y_test = load_mnist(kind='t10k')


Load train data, images rows: 60000, label rows: 60000
Load t10k data, images rows: 10000, label rows: 10000


In [3]:
X_validation, y_validation = X_data[50000:, :], y_data[50000:]
X_train, y_train = X_data[:50000, :], y_data[:50000]
mean_vals = np.mean(X_train, axis=0)
std_vals = np.std(X_train)
X_train_centered = (X_train - mean_vals) / std_vals
X_validation_centered = (X_validation - mean_vals) / std_vals
X_test_centered = (X_test - mean_vals) / std_vals

del X_data, y_data, X_train, X_validation, X_test

In [4]:
def batch_generator(X, y, batch_size=64, shuffle=False, random_seed=None):
    """
    A helper function to generate batches of instances.
    :param X: 
    :param y: 
    :param batch_size: 
    :param shuffle: 
    :param random_seed: 
    :return: 
    """
    if shuffle:
        idx = np.arange(y.shape[0])
        random = np.random.RandomState(random_seed)
        random.shuffle(X)
        X = X[idx]
        y = y[idx]
    for i in range(0, X.shape[0], batch_size):
        yield (X[i:i + batch_size, :], y[i:i + batch_size])


In [7]:
class ConvNN(object):
    def __init__(self, batch_size=100, epochs=20, learning_rate=1e-4, dropout_rate=0.5,
                 shuffle=True, random_seed=None):
        self.batch_size = batch_size
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.dropout_rate = dropout_rate
        self.shuffle = shuffle
        np.random.seed(random_seed)
        g = tf.Graph()
        with g.as_default():
            tf.set_random_seed(random_seed)
            self.build()
            self.init_op = tf.global_variables_initializer()
            self.saver = tf.train.Saver()
        self.sess = tf.Session(graph=g)

    def build(self):
        tf_x = tf.placeholder(tf.float32, shape=[None, 784], name='tf_x')
        tf_y = tf.placeholder(tf.int32, shape=[None], name='tf_y')
        is_train = tf.placeholder(tf.bool, shape=(), name='is_train')
        # reshape tf_x to 2d image
        tf_x_2dimage = tf.reshape(tf_x, shape=[-1, 28, 28, 1])
        # one hot coding on tf_y
        tf_y_onehot = tf.one_hot(indices=tf_y, depth=10, dtype=tf.float32)

        # 1st layer: conv2d layer
        h1 = tf.layers.conv2d(tf_x_2dimage, kernel_size=[5, 5], filters=32,
                              padding='valid', activation=tf.nn.relu)
        # max pooling
        h1_pool = tf.layers.max_pooling2d(h1, pool_size=[2, 2], strides=2)

        # 2nd layer: con2d layer
        h2 = tf.layers.conv2d(h1_pool, kernel_size=[5, 5], filters=64,
                              padding='valid', activation=tf.nn.relu)
        # max pooling
        h2_pool = tf.layers.max_pooling2d(h2, pool_size=[2, 2], strides=2)

        # 3rd layer: dense
        input_shape = h2_pool.get_shape().as_list()
        n_input_units = np.prod(input_shape[1:])
        h2_pool_flat = tf.reshape(h2_pool, shape=[-1, n_input_units])
        h3 = tf.layers.dense(h2_pool_flat, units=1024, activation=tf.nn.relu)
        # dropout
        h3_drop = tf.layers.dropout(h3, training=is_train, rate=self.dropout_rate)

        # 4th layer: dense
        logits = tf.layers.dense(h3_drop, units=10, activation=None)

        # predictions
        prediction_labels = tf.cast(tf.argmax(logits, axis=1), tf.int32, name='prediction_labels')

        # loss function
        cross_entropy_loss = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=tf_y_onehot),
            name='cross_entropy_loss')

        # optimizer
        optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate)
        optimizer = optimizer.minimize(cross_entropy_loss, name='train_op')

        # accuracy
        correct = tf.equal(prediction_labels, tf_y)
        accuracy = tf.reduce_mean(tf.cast(correct, tf.float32), name='accuracy')

    def train(self, training_set, validation_set=None):
        self.sess.run(self.init_op)
        x_data = np.array(training_set[0])
        y_data = np.array(training_set[1])
        for epoch in range(1, self.epochs + 1):
            batch = batch_generator(x_data, y_data, shuffle=self.shuffle)
            avg_loss = 0.0
            for _, (x, y) in enumerate(batch):
                feed_dict = {'tf_x:0': x, 'tf_y:0': y, 'is_train:0': True}
                loss, _ = self.sess.run(['cross_entropy_loss:0', 'train_op'], feed_dict=feed_dict)
                avg_loss += loss
            avg_loss = avg_loss / self.batch_size
            print("Epoch: %d, average loss: %7.3f" % (epoch, avg_loss), end=', ')
            if validation_set is not None:
                x_validation = np.array(validation_set[0])
                y_validation = np.array(validation_set[1])
                feed_dict = {'tf_x:0': x_validation, 'tf_y:0': y_validation, 'is_train:0': False}
                accuracy = self.sess.run('accuracy:0', feed_dict=feed_dict)
                print('Validation accuracy: %.2f%%' % (100 * accuracy))

    def predict(self, x):
        x = np.array(x)
        feed_dict = {'tf_x:0': x, 'is_train:0': False}
        predict_labels = self.sess.run('prediction_labels:0', feed_dict=feed_dict)
        return predict_labels

    def save(self, epoch, path='./model/'):
        if not os.path.isdir(path):
            os.makedirs(path)
        self.saver.save(self.sess, os.path.join(path, 'model.ckpt'), global_step=epoch)
        print('Model saved in %s' % path)


In [8]:
cnn = ConvNN(random_seed=123)
cnn.train(training_set=(X_train_centered, y_train), 
          validation_set=(X_validation_centered, y_validation))
cnn.save(epoch=20)


Epoch: 1, average loss:  18.032, Validation accuracy: 11.12%
Epoch: 2, average loss:  18.000, Validation accuracy: 10.64%
Epoch: 3, average loss:  17.997, Validation accuracy: 10.64%
Epoch: 4, average loss:  17.998, Validation accuracy: 10.64%
Epoch: 5, average loss:  17.997, Validation accuracy: 10.64%
Epoch: 6, average loss:  17.995, Validation accuracy: 10.64%
Epoch: 7, average loss:  17.996, Validation accuracy: 10.64%
Epoch: 8, average loss:  17.995, Validation accuracy: 10.64%


KeyboardInterrupt: 

In [None]:
predict_labels = cnn.predict(X_test_centered)
accuracy = np.sum(predict_labels == y_test) / len(y_test)
print('Test accuracy: %.2f%%' % (100 * accuracy))