In [None]:
# Notebook banner: prints the tutorial title and attribution when this cell runs.
print("""Tutorial on how to create a simple residual network with Tensorflow.
forked from Parag K. Mital, Jan. 2016""")

In [None]:
import tensorflow as tf
import tensorflow.examples.tutorials.mnist.input_data as input_data
from collections import namedtuple
import math

In [None]:
def conv2d(x, n_filters, f_h=5, f_w=5,
           stride_h=2, stride_w=2,
           stddev=0.02,
           activation=None,
           bias=True,
           padding='SAME',
           name="Conv2D"):
    """Apply a 2D convolution inside its own variable scope.

    The kernel variable ``w`` (and the bias variable ``b`` when requested)
    are created with ``tf.get_variable`` and a truncated-normal initializer.

    Parameters
    ----------
    x : Tensor
        Input tensor to convolve. Its last dimension is taken as the number
        of input channels.
    n_filters : int
        Number of output filters.
    f_h : int, optional
        Filter height.
    f_w : int, optional
        Filter width.
    stride_h : int, optional
        Stride along rows.
    stride_w : int, optional
        Stride along columns.
    stddev : float, optional
        Standard deviation of the truncated-normal initializer used for
        both the kernel and the bias.
    activation : callable, optional
        Nonlinearity applied to the result. Skipped when falsy.
    bias : bool, optional
        Whether to create a bias variable and add it to the convolution.
    padding : str, optional
        'SAME' or 'VALID'.
    name : str, optional
        Variable scope under which the variables are created.

    Returns
    -------
    Tensor
        The convolved (and optionally biased / activated) input.
    """
    in_channels = x.get_shape()[-1]
    kernel_shape = [f_h, f_w, in_channels, n_filters]
    strides = [1, stride_h, stride_w, 1]

    with tf.variable_scope(name):
        kernel = tf.get_variable(
            'w', kernel_shape,
            initializer=tf.truncated_normal_initializer(stddev=stddev))
        out = tf.nn.conv2d(x, kernel, strides=strides, padding=padding)

        if bias:
            offset = tf.get_variable(
                'b', [n_filters],
                initializer=tf.truncated_normal_initializer(stddev=stddev))
            out = tf.nn.bias_add(out, offset)

        # The activation is applied while still inside the variable scope.
        return activation(out) if activation else out

In [None]:
def linear(x, n_units, scope=None, stddev=0.02, activation=lambda x: x):
    """Project ``x`` onto ``n_units`` outputs with a single weight matrix.

    Only a matrix multiplication is performed; no bias term is added.

    Parameters
    ----------
    x : Tensor
        Input tensor. Its last dimension determines the number of rows of
        the weight matrix.
    n_units : int
        Number of output units.
    scope : str, optional
        Variable scope to use; falls back to "Linear" when not given.
    stddev : float, optional
        Standard deviation of the random-normal weight initializer.
    activation : callable, optional
        Nonlinearity applied to the matrix product (identity by default).

    Returns
    -------
    Tensor
        ``activation(x @ Matrix)``.
    """
    n_inputs = x.get_shape().as_list()[-1]
    scope_name = scope if scope else "Linear"

    with tf.variable_scope(scope_name):
        weights = tf.get_variable(
            "Matrix",
            shape=[n_inputs, n_units],
            dtype=tf.float32,
            initializer=tf.random_normal_initializer(stddev=stddev))
        projected = tf.matmul(x, weights)
        return activation(projected)

In [None]:
def residual_network(x, n_outputs, activation=tf.nn.relu):
    """Builds a residual network.

    The network is: a strided 7x7 convolution, a strided max pool, a 1x1
    convolution up to the first block's width, then four groups of three
    bottleneck residual units (1x1 -> 3x3 -> 1x1 with an identity skip),
    with a bias-free 1x1 convolution between groups to widen the channels.
    A global average pool and a softmax-activated linear layer follow.

    Parameters
    ----------
    x : Tensor
        Input to the network. A 2D tensor (n, d) is reshaped to
        (n, sqrt(d), sqrt(d), 1); any other rank is used as-is (the shape
        comments below assume (n, H, W, C)).
    n_outputs : int
        Number of outputs of final softmax
    activation : arguments, optional
        Nonlinearity function to apply after each convolution

    Returns
    -------
    net : Tensor
        Softmax output of the final linear layer, (n, n_outputs).

    Raises
    ------
    ValueError
        If a 2D Tensor is input and its second dimension is not a perfect
        square, since it then cannot be reshaped to a square 4D Tensor.
    """

    LayerBlock = namedtuple('LayerBlock', ['num_repeats', 'num_filters', 'bottleneck_size'])
    blocks = [LayerBlock(3, 128, 32),
              LayerBlock(3, 256, 64),
              LayerBlock(3, 512, 128),
              LayerBlock(3, 1024, 256)]

    input_shape = x.get_shape().as_list()
    if len(input_shape) == 2:
        ndim = int(math.sqrt(input_shape[1]))
        if ndim * ndim != input_shape[1]:
            raise ValueError('input_shape should be square')
        x = tf.reshape(x, [-1, ndim, ndim, 1])  # {(n, H, W, 1) | H == W} else (n, H, W, C)

    # First convolution expands to 64 channels and downsamples
    net = conv2d(x, 64, f_h=7, f_w=7, name='conv1', activation=activation)  # (n, H/2, W/2, 64)

    # Max pool and downsampling
    net = tf.nn.max_pool(net, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='SAME')  # (n, H/4, W/4, 64)

    # Setup first chain of resnets
    net = conv2d(net, blocks[0].num_filters, f_h=1, f_w=1, stride_h=1, stride_w=1,
                 padding='VALID', name='conv2')  # (n, H/4, W/4, 128)

    # Loop through all res blocks
    for block_i, block in enumerate(blocks):
        for repeat_i in range(block.num_repeats):

            name = 'block_%d/repeat_%d' % (block_i, repeat_i)
            # net: (n, H/4, W/4, 128) (n, H/4, W/4, 256) (n, H/4, W/4, 512) (n, H/4, W/4, 1024)

            # 1x1 convolution squeezes the channels down to the bottleneck width
            conv = conv2d(net, block.bottleneck_size, f_h=1, f_w=1, stride_h=1, stride_w=1,
                          activation=activation, padding='VALID', name=name + '/conv_in')
            # (n, H/4, W/4, 32) (n, H/4, W/4, 64) (n, H/4, W/4, 128) (n, H/4, W/4, 256)

            # 3x3 convolution at the bottleneck width; 'SAME' keeps H and W
            conv = conv2d(conv, block.bottleneck_size, f_h=3, f_w=3, stride_h=1, stride_w=1,
                          activation=activation, padding='SAME', name=name + '/conv_bottleneck')
            # (n, H/4, W/4, 32) (n, H/4, W/4, 64) (n, H/4, W/4, 128) (n, H/4, W/4, 256)

            # 1x1 convolution expands back so the result can be added to `net`
            conv = conv2d(conv, block.num_filters, f_h=1, f_w=1, stride_h=1, stride_w=1,
                          activation=activation, padding='VALID', name=name + '/conv_out')
            # (n, H/4, W/4, 128) (n, H/4, W/4, 256) (n, H/4, W/4, 512) (n, H/4, W/4, 1024)

            # Residual (skip) connection
            net = conv + net

        # Every block except the last is followed by a 1x1 convolution that
        # widens the channels to the next block's size. An explicit bounds
        # check is used so that an IndexError raised while building the
        # convolution is never silently swallowed.
        if block_i + 1 < len(blocks):
            next_block = blocks[block_i + 1]
            net = conv2d(net, next_block.num_filters, f_h=1, f_w=1, stride_h=1, stride_w=1,
                         bias=False, padding='SAME', name='block_%d/conv_upscale' % block_i)
            # (n, H/4, W/4, 256) (n, H/4, W/4, 512) (n, H/4, W/4, 1024)

    # Global average pool: the window covers the whole spatial extent
    net_shape = net.get_shape().as_list()  # (n, H/4, W/4, 1024)
    net = tf.nn.avg_pool(net, ksize=[1, net_shape[1], net_shape[2], 1],
                         strides=[1, 1, 1, 1], padding='VALID')

    net_shape = net.get_shape().as_list()  # (n, 1, 1, 1024)

    # Flatten to one feature vector per example
    net = tf.reshape(net, [-1, net_shape[1] * net_shape[2] * net_shape[3]])  # (n, 1024)

    net = linear(net, n_outputs, activation=tf.nn.softmax)  # (n, n_outputs)

    return net

In [None]:
def test_mnist():
    """Test the resnet on MNIST.

    Builds the residual network on flattened 784-pixel inputs, trains it
    with Adam for 5 epochs in minibatches of 50, and prints the mean
    training and validation accuracy after every epoch. The running mean
    training accuracy is also printed after each training batch.

    Reads the dataset through ``input_data.read_data_sets('MNIST_data/')``.
    """

    mnist = input_data.read_data_sets('MNIST_data/', one_hot=True)
    x = tf.placeholder(tf.float32, [None, 784])
    y = tf.placeholder(tf.float32, [None, 10])
    y_pred = residual_network(x, 10)

    # Define loss/eval/training functions.
    # y_pred is a softmax output and can underflow to exactly 0, which would
    # make log() return -inf and turn the loss into NaN; clip it first.
    cross_entropy = -tf.reduce_sum(y * tf.log(tf.clip_by_value(y_pred, 1e-10, 1.0)))
    optimizer = tf.train.AdamOptimizer().minimize(cross_entropy)

    # Monitor accuracy
    correct_prediction = tf.equal(tf.argmax(y_pred, 1), tf.argmax(y, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))

    # We'll train in minibatches and report accuracy:
    batch_size = 50
    n_epochs = 5
    n_train_batches = mnist.train.num_examples // batch_size
    n_valid_batches = mnist.validation.num_examples // batch_size

    # The session is used as a context manager so that its resources are
    # released even if training raises.
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        for epoch_i in range(n_epochs):
            # Training
            train_accuracy = 0
            for batch_i in range(n_train_batches):
                batch_xs, batch_ys = mnist.train.next_batch(batch_size)
                train_accuracy += sess.run([optimizer, accuracy],
                                           feed_dict={x: batch_xs, y: batch_ys})[1]
                # Report the running mean accuracy rather than the raw sum
                print(train_accuracy / (batch_i + 1))
            train_accuracy /= n_train_batches

            # Validation
            valid_accuracy = 0
            for batch_i in range(n_valid_batches):
                batch_xs, batch_ys = mnist.validation.next_batch(batch_size)
                valid_accuracy += sess.run(accuracy, feed_dict={x: batch_xs, y: batch_ys})
            valid_accuracy /= n_valid_batches
            print('epoch:', epoch_i, ', train:', train_accuracy, ', valid:', valid_accuracy)

In [None]:
# Entry point: build the graph and run the MNIST training/validation loop.
test_mnist()