In [20]:
import numpy as np
import tensorflow as tf
import os

### Implementing a CNN with the low-level TensorFlow API

In [2]:
from tensorflow.examples.tutorials import mnist

In [3]:
# Load MNIST via the TensorFlow tutorial helper (files are read from ./MNIST_data/).
# one_hot=True yields 10-column label rows and reshape=False keeps each image as
# 28x28x1 (see the shapes printed further down); validation_size=10000 sets the
# size of the validation split.
data = mnist.input_data.read_data_sets(train_dir='MNIST_data/', 
                                       one_hot=True, reshape=False, validation_size=10000)

Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz


In [4]:
# Number of examples in the train, test and validation splits, in that order.
data.train.num_examples, data.test.num_examples, data.validation.num_examples

(50000, 10000, 10000)

In [5]:
# Array shapes of the training images and of their one-hot labels.
data.train.images.shape, data.train.labels.shape

((50000, 28, 28, 1), (50000, 10))

In [32]:
def conv_layer(input_tensor, kernel_shape, n_out_channels, scope_name,
               padding_mode='SAME', strides=(1, 1, 1, 1)):
    """Add a 2-D convolution followed by a bias add and a ReLU.

    Every variable and op is created inside ``tf.variable_scope(scope_name)``.

    Args:
        input_tensor: Tensor whose static shape has exactly four dimensions;
            the fourth one sizes the kernel's input-channel axis.
        kernel_shape: ``(height, width)`` of the convolution kernel.
        n_out_channels: Number of output channels (also the bias length).
        scope_name: Name of the variable scope that holds 'kernel' and 'bias'.
        padding_mode: Forwarded to ``tf.nn.conv2d`` as ``padding``.
        strides: Forwarded to ``tf.nn.conv2d`` as ``strides``.

    Returns:
        The result of ``tf.nn.relu`` (op named 'activation' within the scope).
    """
    with tf.variable_scope(scope_name):
        # Unpacking four values keeps the implicit rank-4 check of the input;
        # only the channel count is actually needed.
        _, _, _, n_in_channels = input_tensor.get_shape().as_list()
        kernel_h, kernel_w = kernel_shape

        kernel = tf.get_variable(
            'kernel', shape=(kernel_h, kernel_w, n_in_channels, n_out_channels))
        # The bias shape is written as a real 1-tuple; `(n)` is just an int.
        bias = tf.get_variable('bias', shape=(n_out_channels,),
                               initializer=tf.zeros_initializer)

        output = tf.nn.conv2d(input_tensor, kernel, padding=padding_mode, strides=strides)
        output = tf.nn.bias_add(output, bias, name='net_input')
        output = tf.nn.relu(output, name='activation')
        return output
    
def fc_layer(input_tensor, n_output, scope_name, activation_func=None):
    """Add a fully connected layer (``x @ W + b``) with an optional activation.

    Every variable and op is created inside ``tf.variable_scope(scope_name)``.
    If the input has more than one non-leading dimension it is first reshaped
    to ``(-1, product_of_those_dimensions)``.

    Args:
        input_tensor: Tensor with a static shape; the first dimension is
            skipped when computing the number of input features.
        n_output: Number of output units.
        scope_name: Name of the variable scope that holds 'weight' and 'bias'.
        activation_func: Optional callable invoked as
            ``activation_func(net_input, name='activation')``.

    Returns:
        The bias-added product (op named 'net_input') when ``activation_func``
        is None, otherwise whatever ``activation_func`` returns.
    """
    with tf.variable_scope(scope_name):
        feature_dims = input_tensor.get_shape().as_list()[1:]
        if len(feature_dims) > 1:
            # Flatten to a vector per example. np.prod yields a NumPy scalar,
            # so convert it to a plain int before using it as a dimension.
            n_input = int(np.prod(feature_dims))
            input_tensor = tf.reshape(input_tensor, shape=(-1, n_input))
        else:
            n_input = feature_dims[0]

        # Create W and b; the bias shape is a real 1-tuple, not a bare int.
        weight = tf.get_variable('weight', shape=(n_input, n_output))
        bias = tf.get_variable('bias', shape=(n_output,),
                               initializer=tf.zeros_initializer)

        output = tf.nn.bias_add(tf.matmul(input_tensor, weight), bias, name='net_input')
        if activation_func is None:
            return output
        return activation_func(output, name='activation')
    
    
def build_cnn(learning_rate=None):
    """Build the CNN, its loss, its training op and an accuracy metric.

    Layers are added to the current default graph in this order:
    conv1 (5x5, 32 channels) -> 2x2 max-pool -> conv2 (5x5, 64 channels) ->
    2x2 max-pool -> fc1 (1024 units, ReLU) -> dropout -> fc2 (10 units).

    Nothing is returned; the rest of the notebook reaches the graph through
    these node names: 'tf_X', 'tf_Y', 'keep_prob', 'probabilities', 'labels',
    'cross_entropy_loss', 'train_op' and 'accuracy'.

    Args:
        learning_rate: Step size handed to ``tf.train.AdamOptimizer``. When
            omitted, the module-level ``learning_rate`` variable is used if it
            exists (this is how the function originally obtained the value);
            if it does not exist, 1e-4 is used.
    """
    if learning_rate is None:
        # Backward compatibility: existing callers define a global
        # `learning_rate` and call build_cnn() without arguments.
        learning_rate = globals().get('learning_rate', 1e-4)

    # Placeholders for the input images and the one-hot targets.
    tf_X = tf.placeholder(dtype=tf.float32, shape=(None, 28, 28, 1), name='tf_X')
    tf_Y = tf.placeholder(dtype=tf.float32, shape=(None, 10), name='tf_Y')

    # conv_1
    h1 = conv_layer(tf_X, kernel_shape=(5, 5), n_out_channels=32, scope_name='conv1',
                    padding_mode='SAME', strides=(1, 1, 1, 1))
    # max_pool_1
    h1_maxpool = tf.nn.max_pool(h1, ksize=(1, 2, 2, 1),
                                strides=(1, 2, 2, 1), padding='VALID')
    # conv_2
    h2 = conv_layer(h1_maxpool, kernel_shape=(5, 5), n_out_channels=64, scope_name='conv2',
                    padding_mode='SAME', strides=(1, 1, 1, 1))
    # max_pool_2
    h2_maxpool = tf.nn.max_pool(h2, ksize=(1, 2, 2, 1),
                                strides=(1, 2, 2, 1), padding='VALID')
    # fc_1
    h3 = fc_layer(h2_maxpool, n_output=1024,
                  scope_name='fc1', activation_func=tf.nn.relu)
    # dropout; the placeholder carries the keep probability fed at run time
    dropout = tf.placeholder(dtype=tf.float32, shape=(), name='keep_prob')
    h3_dropout = tf.nn.dropout(h3, keep_prob=dropout, name='dropout_layer')
    # fc_2 produces the raw class scores (no activation)
    logits = fc_layer(h3_dropout, n_output=10,
                      scope_name='fc2', activation_func=None)

    # predictions: {probabilities, labels}
    predictions = {
        'probabilities': tf.nn.softmax(logits, name='probabilities'),
        'labels': tf.argmax(logits, axis=1, name='labels')
    }
    # loss function: mean softmax cross-entropy over the batch
    loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=tf_Y, logits=logits),
                          name='cross_entropy_loss')
    # optimizer; the op is only ever fetched by its name, 'train_op'
    optimizer = tf.train.AdamOptimizer(learning_rate).minimize(loss, name='train_op')

    # metric: accuracy (fraction of rows whose predicted label matches the target)
    correct_pred = tf.equal(tf.argmax(tf_Y, axis=1), predictions['labels'])
    accuracy = tf.reduce_mean(tf.cast(correct_pred, dtype=tf.float32),
                              name='accuracy')
    
def save(saver, sess, epoch, path='./model/'):
    """Write a checkpoint of ``sess`` into ``path``.

    Args:
        saver: Object exposing ``save(sess, save_path=..., global_step=...)``.
        sess: Session whose variables are saved.
        epoch: Passed to the saver as ``global_step``.
        path: Target directory; created (with parents) if it is missing.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(path, exist_ok=True)
    print('Saving model in', path)
    saver.save(sess, save_path=os.path.join(path, 'cnn-model.ckpt'),
               global_step=epoch)

def load(saver, sess, path, epoch):
    """Restore the checkpoint written for ``epoch`` from directory ``path``.

    Args:
        saver: Object exposing ``restore(sess, save_path)``.
        sess: Session that receives the restored variables.
        path: Directory containing the checkpoint files.
        epoch: Step suffix of the checkpoint, i.e. the file prefix is
            ``cnn-model.ckpt-<epoch>``.
    """
    print("Loading model from", path)
    # The keyword accepted by Saver.restore is `save_path`; `path=` raises
    # TypeError.
    saver.restore(sess, save_path=os.path.join(path,
                                               'cnn-model.ckpt-{}'.format(epoch)))

def train(sess, training_set, validation_set=None,
          initialize=True, epochs=20, shuffle=True,
          dropout=.5, random_seed=None, batch_size=64):
    """Run mini-batch training and print the loss after every epoch.

    Graph nodes are addressed by name ('tf_X:0', 'tf_Y:0', 'keep_prob:0',
    'cross_entropy_loss:0', 'train_op'), so the graph bound to ``sess`` must
    contain them.

    Args:
        sess: Session used to run the graph.
        training_set: Object exposing ``num_examples`` and
            ``next_batch(batch_size=...)`` returning ``(images, labels)``.
        validation_set: Optional object exposing ``images`` and ``labels``;
            when given, its loss is evaluated in one pass after each epoch
            with the keep probability set to 1.0.
        initialize: Run ``tf.global_variables_initializer()`` first.
        epochs: Number of passes over the training set.
        shuffle: When False, ``shuffle=False`` is forwarded to ``next_batch``.
            When True (default) the call is made exactly as before, without
            the keyword.
        dropout: Value fed to 'keep_prob:0' during training.
        random_seed: If not None, seeds NumPy's global RNG before batching.
            NOTE(review): presumably the dataset's ``next_batch`` shuffling
            draws from that RNG; confirm for the dataset class in use.
        batch_size: Number of examples per training step.

    Returns:
        List with the mean training loss of each epoch.
    """
    if random_seed is not None:
        np.random.seed(random_seed)

    # Ceiling division: an exact multiple of batch_size no longer triggers an
    # extra batch. At least one batch is always run, as before.
    n_batch = max(1, -(-training_set.num_examples // batch_size))

    if initialize:
        sess.run(tf.global_variables_initializer())

    # Only pass `shuffle` when it departs from the default, so the default
    # path keeps calling next_batch(batch_size=...) unchanged.
    batch_kwargs = {} if shuffle else {'shuffle': False}

    training_loss = []
    for i in range(epochs):
        total_loss = 0.0
        for _step in range(n_batch):
            x_batch, y_batch = training_set.next_batch(batch_size=batch_size,
                                                       **batch_kwargs)
            feed = {'tf_X:0': x_batch,
                    'tf_Y:0': y_batch,
                    'keep_prob:0': dropout}
            loss, _ = sess.run(['cross_entropy_loss:0', 'train_op'], feed_dict=feed)
            total_loss += loss
        training_loss.append(total_loss / n_batch)
        print("Epoch {}: Training Loss {:.4f}".format(i+1, training_loss[-1]), end=' ')

        if validation_set is not None:
            feed = {'tf_X:0': validation_set.images,
                    'tf_Y:0': validation_set.labels,
                    'keep_prob:0': 1.0}
            loss = sess.run('cross_entropy_loss:0', feed_dict=feed)
            print("Validation Loss {:.4f}".format(loss))
        else:
            print()

    return training_loss
        
def predict(sess, X_test, return_proba=False):
    """Run the network in inference mode (keep probability 1.0).

    Args:
        sess: Session bound to a graph containing 'tf_X:0', 'keep_prob:0',
            'probabilities:0' and 'labels:0'.
        X_test: Either an object with an ``images`` attribute (its ``labels``,
            if any, are not needed) or the image array itself.
        return_proba: Return class probabilities instead of predicted labels.

    Returns:
        The value of 'probabilities:0' when ``return_proba`` is true, otherwise
        the value of 'labels:0'.
    """
    images = getattr(X_test, 'images', X_test)
    # Neither fetched tensor depends on the targets, so 'tf_Y:0' is not fed.
    feed = {'tf_X:0': images,
            'keep_prob:0': 1.0}
    # The ':0' suffix selects the tensor. A bare name selects the operation,
    # for which sess.run returns None.
    fetch = 'probabilities:0' if return_proba else 'labels:0'
    return sess.run(fetch, feed_dict=feed)



In [33]:
# Hyper-parameters. build_cnn() is called below without arguments and picks up
# `learning_rate` from this module-level variable. `batch_size` is defined here
# but is not passed to train() in the next cell.
learning_rate = 1e-4
random_seed = 123
batch_size = 64

# Build the model in its own graph. The graph-level seed is set before any
# layer is created, and the Saver is constructed after build_cnn() has added
# the model to the graph.
g = tf.Graph()
with g.as_default():
    tf.set_random_seed(random_seed)
    build_cnn()
    
    #file_writer = tf.summary.FileWriter(logdir='./logs/cnn', graph=g)
    
    saver = tf.train.Saver()

In [34]:
# Train for 20 epochs on the training split, reporting the validation loss
# after each epoch, then write a checkpoint tagged with step 20 (kept in sync
# with `epochs` by hand).
with tf.Session(graph=g) as sess:
    train(sess, data.train, validation_set=data.validation, 
          initialize=True, epochs=20)
    
    save(saver, sess, epoch=20)

Epoch 1: Training Loss 0.3705 Validation Loss 0.1171
Epoch 2: Training Loss 0.1000 Validation Loss 0.0726
Epoch 3: Training Loss 0.0696 Validation Loss 0.0571
Epoch 4: Training Loss 0.0551 Validation Loss 0.0516
Epoch 5: Training Loss 0.0457 Validation Loss 0.0440
Epoch 6: Training Loss 0.0375 Validation Loss 0.0396
Epoch 7: Training Loss 0.0343 Validation Loss 0.0443
Epoch 8: Training Loss 0.0290 Validation Loss 0.0379
Epoch 9: Training Loss 0.0255 Validation Loss 0.0394
Epoch 10: Training Loss 0.0239 Validation Loss 0.0353
Epoch 11: Training Loss 0.0199 Validation Loss 0.0331
Epoch 12: Training Loss 0.0179 Validation Loss 0.0346
Epoch 13: Training Loss 0.0164 Validation Loss 0.0392
Epoch 14: Training Loss 0.0142 Validation Loss 0.0347
Epoch 15: Training Loss 0.0133 Validation Loss 0.0330
Epoch 16: Training Loss 0.0113 Validation Loss 0.0321
Epoch 17: Training Loss 0.0102 Validation Loss 0.0364
Epoch 18: Training Loss 0.0108 Validation Loss 0.0365
Epoch 19: Training Loss 0.0088 Valida