In [3]:
import tensorflow as tf
import numpy as np
import pickle
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from datetime import datetime

In [4]:
### constants

# L2 regularisation strength; passed as `decay` to get_v() by conv() and fc().
WEIGHT_DECAY = 1e-4
# `momentum` argument given to tf.layers.batch_normalization in bn().
BN_MOMENTUM = 0.9997
# `epsilon` argument given to tf.layers.batch_normalization in bn().
EPSILON = 1e-4

### utils

def unpickle(file):
    """Return the object stored in the pickle file at path ``file``.

    The file is read in binary mode and decoded with ``encoding='bytes'``,
    so 8-bit strings pickled by Python 2 are returned as ``bytes``.
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')

def get_v(name, shape, initializer=tf.variance_scaling_initializer, decay=None, trainable=True, dtype='float'):
    """Create or fetch a variable through ``tf.get_variable``.

    Args:
        name: variable name, resolved inside the current variable scope.
        shape: shape of the variable.
        initializer: initializer forwarded to ``tf.get_variable``.
        decay: when truthy, an L2 regularizer with this scale is attached to
            the variable; otherwise no regularizer is used.
        trainable: forwarded to ``tf.get_variable``.
        dtype: forwarded to ``tf.get_variable``.

    Returns:
        Whatever ``tf.get_variable`` returns for these arguments.
    """
    l2 = tf.contrib.layers.l2_regularizer(decay) if decay else None
    return tf.get_variable(
        name,
        shape=shape,
        initializer=initializer,
        regularizer=l2,
        trainable=trainable,
        dtype=dtype,
    )

def conv(x, out, ksize=3, stride=1):
    """Apply a square 2-D convolution with 'SAME' padding and no bias.

    A ``'weights'`` kernel of shape ``[ksize, ksize, in_channels, out]`` is
    created in the current variable scope (with ``WEIGHT_DECAY`` L2
    regularisation), where ``in_channels`` is the last dimension of ``x``.

    Args:
        x: input tensor; its last dimension is taken as the channel count.
        out: number of output channels.
        ksize: kernel height and width.
        stride: stride used for both spatial dimensions.
    """
    in_channels = x.get_shape()[-1]
    kernel_shape = [ksize, ksize, in_channels, out]
    kernel = get_v('weights', kernel_shape, decay=WEIGHT_DECAY)
    strides = [1, stride, stride, 1]
    return tf.nn.conv2d(x, kernel, strides, padding='SAME')

def fc(x, units, scope):
    """Fully connected layer computing ``x @ weights + biases``.

    Variables ``'weights'`` (L2-regularised with ``WEIGHT_DECAY``) and
    ``'biases'`` (zero-initialised) are created under variable scope ``scope``.

    Args:
        x: input tensor; dimension 1 is used as the input feature count.
        units: number of output features.
        scope: name of the variable scope holding the layer's variables.
    """
    with tf.variable_scope(scope):
        in_features = x.get_shape()[1]
        w = get_v('weights', [in_features, units], decay=WEIGHT_DECAY)
        b = get_v('biases', [units], initializer=tf.zeros_initializer)
        return tf.nn.xw_plus_b(x, w, b)

def bn(x, training):
    """Batch-normalise ``x`` using the module-level momentum and epsilon.

    Args:
        x: tensor to normalise.
        training: forwarded as the ``training`` argument of
            ``tf.layers.batch_normalization``.

    NOTE: per the TensorFlow docs, the moving-average updates created by this
    layer are placed in ``tf.GraphKeys.UPDATE_OPS`` and have to be run
    alongside the training op by the caller.
    """
    bn_kwargs = dict(momentum=BN_MOMENTUM, epsilon=EPSILON, training=training)
    return tf.layers.batch_normalization(x, **bn_kwargs)

def activation(x, typ='relu'):
    """Apply the non-linearity named by ``typ`` to ``x``.

    Args:
        x: input tensor.
        typ: name of the activation; only ``'relu'`` is implemented.

    Returns:
        ``tf.nn.relu(x)`` for ``typ == 'relu'``.

    Raises:
        ValueError: if ``typ`` is not a supported activation name. Previously
            the function fell through and returned ``None``, which only
            surfaced later as an obscure error in the caller.
    """
    if typ == 'relu':
        return tf.nn.relu(x)
    raise ValueError('unsupported activation type: {!r}'.format(typ))

def block(x, training, bottleneck=False, downsample=False):
    """Build one residual block: ``relu(F(x) + shortcut(x))``.

    Args:
        x: input tensor; its last dimension is the channel count and must be
            statically known.
        training: forwarded to ``bn`` for every batch-norm layer.
        bottleneck: if true, use a 1x1 -> 3x3 -> 1x1 stack of convolutions;
            otherwise two 3x3 convolutions.
        downsample: if true, the first convolution uses stride 2 and the
            block outputs twice as many channels as it receives. The shortcut
            is then average-pooled with the same stride and zero-padded along
            the channel axis to match.

    Returns:
        The activated sum of the residual branch and the shortcut.
    """
    shortcut = x
    stride = 2 if downsample else 1
    # Use plain ints for channel arithmetic: ``Dimension / int`` (true
    # division) is not supported under Python 3, which broke the bottleneck
    # branch, and a float channel count is not a valid kernel dimension.
    in_channels = int(x.get_shape()[-1])
    out = in_channels * 2 if downsample else in_channels
    if bottleneck:
        # The last 1x1 conv expands by 4, so the block ends with 2*in_channels
        # when downsampling and in_channels otherwise.
        internal = in_channels // 2 if downsample else in_channels // 4
        with tf.variable_scope('bottle_1'):
            x = conv(x, internal, ksize=1, stride=stride)
            x = bn(x, training)
            x = activation(x)
        with tf.variable_scope('bottle_2'):
            x = conv(x, internal)
            x = bn(x, training)
            x = activation(x)
        with tf.variable_scope('bottle_3'):
            x = conv(x, internal * 4, ksize=1)
            x = bn(x, training)
    else:
        with tf.variable_scope('reg_1'):
            x = conv(x, out, stride=stride)
            x = bn(x, training)
            x = activation(x)
        with tf.variable_scope('reg_2'):
            x = conv(x, out)
            x = bn(x, training)
    with tf.variable_scope('identity'):
        if downsample:
            # Parameter-free shortcut: shrink spatially, then zero-pad the
            # channel axis so it can be added to the residual branch.
            shortcut = tf.nn.avg_pool(shortcut, ksize=[1, stride, stride, 1], strides=[1, stride, stride, 1], padding='SAME')
            extra = int(x.get_shape()[-1]) - in_channels
            pad_before = extra // 2
            # Put any odd remainder on the trailing side so shapes always match.
            pad_after = extra - pad_before
            padding = [[0, 0], [0, 0], [0, 0], [pad_before, pad_after]]
            shortcut = tf.pad(shortcut, padding)
        x = activation(x + shortcut)
    return x

def group(x, nblock, scope, bottleneck=False, training=True, downsample=True):
    """Stack ``nblock`` residual blocks under variable scope ``scope``.

    Args:
        x: input tensor.
        nblock: number of residual blocks to chain.
        scope: variable scope name for the group; each block lives in a
            ``block<i>`` sub-scope (1-based).
        bottleneck: forwarded to every ``block``.
        training: forwarded to every ``block``.
        downsample: if true, the FIRST block of the group downsamples; the
            remaining blocks never do. If false, no block downsamples.

    Returns:
        The output tensor of the last block.
    """
    with tf.variable_scope(scope):
        for i in range(nblock):
            # The previous implementation rewrote the ``downsample`` flag in
            # place and ended up downsampling in block 1 even when the caller
            # passed ``downsample=False``. Derive a per-block flag instead.
            block_downsample = bool(downsample) and i == 0
            with tf.variable_scope('block{}'.format(i + 1)):
                x = block(x, training, bottleneck=bottleneck, downsample=block_downsample)
    return x
    
def flatten(x):
    """Collapse the three trailing dimensions of ``x`` into a single one.

    ``x`` must have exactly three dimensions after the leading one; the
    result has shape ``[-1, d1 * d2 * d3]``.
    """
    height, width, channels = x.get_shape()[1:]
    features = height * width * channels
    return tf.reshape(x, [-1, features])

def prepare_cifar(directory):
    """Load the CIFAR-10 python batches stored in ``directory``.

    Reads ``data_batch_1`` .. ``data_batch_5`` and ``test_batch`` with
    ``unpickle`` and converts the flat image rows into channels-last arrays.

    Args:
        directory: path of the folder containing the batch files.

    Returns:
        ``((train_data, train_labels), (test_data, test_labels))`` where the
        data arrays are float64 with shape ``(N, 32, 32, 3)`` (pixel values
        are left unscaled) and the label arrays are 1-D.
    """
    # Local import: the notebook's import cell does not bring in ``os``.
    import os

    def _to_channels_last(flat):
        # Each row is reshaped to (3, 32, 32) and the channel axis is moved
        # to the end; float64 matches what the previous np.empty-based copy
        # produced.
        return flat.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1).astype(np.float64)

    # os.path.join instead of a hard-coded '\\' so this works on any OS.
    train_batches = [
        unpickle(os.path.join(directory, 'data_batch_{}'.format(num)))
        for num in range(1, 6)
    ]
    test = unpickle(os.path.join(directory, 'test_batch'))

    data = _to_channels_last(
        np.concatenate([np.asarray(batch[b'data']) for batch in train_batches])
    )
    # Concatenate rather than reshape(50000) so the sample count is not
    # hard-coded.
    train_labels = np.concatenate(
        [np.asarray(batch[b'labels']) for batch in train_batches]
    )
    test_data = _to_channels_last(np.asarray(test[b'data']))
    test_labels = np.array(test[b'labels'])
    return (data, train_labels), (test_data, test_labels)

In [56]:
### resnet32 construction
# Start from an empty default graph so this cell can be re-run in the same
# kernel without "variable already exists" errors.
tf.reset_default_graph()

decay = .01  # learning-rate decay coefficient, applied once per epoch
n = 5  # residual blocks per group
cifar_dir = r'C:\Users\matthew\Documents\ScienceFair2017-2018\PythonCode\NeuralNet\env\cifar-10_test'

X = tf.placeholder(tf.float32, shape=(None, 32, 32, 3), name='X')
# (None,) is a 1-D shape; a bare (None) is just None, i.e. "unknown shape".
y = tf.placeholder(tf.int64, shape=(None,), name='y')
training = tf.placeholder(tf.bool, name='training')
# The learning rate is fed every step: reassigning a Python float after the
# optimizer has been built would never reach the graph.
lr = tf.placeholder(tf.float32, shape=(), name='learning_rate')

with tf.name_scope('cnn'):
    with tf.variable_scope('conv'):
        out = conv(X, 16)
    # Pass the placeholder through so batch norm really switches between
    # training and inference behaviour (it was silently defaulting to True).
    out = group(out, n, 'group1', training=training, downsample=False)
    out = group(out, n, 'group2', training=training)
    out = group(out, n, 'group3', training=training)
    out = flatten(out)
    logits = fc(out, 10, 'fc')
with tf.name_scope('loss'):
    xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits)
    base_loss = tf.reduce_mean(xentropy)
    reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
    loss = tf.add_n([base_loss] + reg_losses, name='loss')
learning_rate = 0.1
with tf.name_scope('train'):
    optimizer = tf.train.MomentumOptimizer(learning_rate=lr, momentum=0.9, use_nesterov=True)
    # tf.layers.batch_normalization registers its moving-average updates in
    # UPDATE_OPS; they only run if the train op depends on them.
    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
    with tf.control_dependencies(update_ops):
        training_op = optimizer.minimize(loss)
with tf.name_scope('eval'):
    correct = tf.nn.in_top_k(logits, y, 1)
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
init = tf.global_variables_initializer()
saver = tf.train.Saver()

n_epochs = 10
batch_size = 100
now = datetime.utcnow().strftime('%Y%m%d%H%M%S')
root_logdir = 'tf_logs'
logdir = '{}/run-{}/'.format(root_logdir, now)
# The logged scalar is the total loss (cross-entropy plus regularisation).
xentropy_summary = tf.summary.scalar('xentropy', loss)
file_writer = tf.summary.FileWriter(logdir, tf.get_default_graph())

(train_data, train_labels), (test_data, test_labels) = prepare_cifar(cifar_dir)
n_batches = len(train_data) // batch_size

# NOTE(review): batches are taken in file order with no shuffling and pixel
# values are fed unscaled; with BN_MOMENTUM this close to 1 the inference-mode
# statistics also converge slowly. Left as-is, but worth revisiting.
try:
    with tf.Session() as sess:
        init.run()
        for epoch in range(n_epochs):
            for iteration in range(n_batches):
                index = iteration * batch_size
                X_batch = train_data[index:index + batch_size, :, :, :]
                y_batch = train_labels[index:index + batch_size]
                sess.run(training_op, feed_dict={X: X_batch, y: y_batch, training: True, lr: learning_rate})
                if iteration % 10 == 0:
                    summary_str = xentropy_summary.eval(feed_dict={X: X_batch, y: y_batch, training: True})
                    step = epoch * n_batches + iteration
                    file_writer.add_summary(summary_str, step)
                print(iteration, end='', flush=True)
            # Decay for the next epoch; the new value is picked up through
            # the ``lr`` placeholder on the next training step.
            learning_rate = learning_rate * 1 / (1 + decay * epoch)
            acc_train = accuracy.eval(feed_dict={X: X_batch, y: y_batch, training: False})
            acc_test = accuracy.eval(feed_dict={X: test_data, y: test_labels, training: False})
            print(epoch, 'Train accuracy:', acc_train, 'Test accuracy', acc_test)
        save_path = saver.save(sess, './resnet_test.ckpt')
finally:
    # Flush and release the event file even if training is interrupted.
    file_writer.close()

0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369

KeyboardInterrupt: 

In [55]:
# Discard everything in the current default graph so that graph-building
# code can be executed again from a clean state.
tf.reset_default_graph()

In [6]:
# Scratch load of the CIFAR batches from a second, machine-specific directory.
# NOTE(review): prepare_cifar returns ((train_data, train_labels), (test_data,
# test_labels)), so `test_labels` here receives the TRAINING labels and also
# rebinds the `test_labels` name used by the training cell — confirm intent.
(testing_data, test_labels), (testing_test_data, testing_test_labels) = prepare_cifar(r'C:\Users\matthew\Documents\ScienceFair2017-2018\PythonCode\NeuralNet\cifar-10-batches-py')

In [8]:
# Show the first entry of `test_labels`; after the unpacking in the previous
# cell this name holds the first tuple's labels (the training labels).
test_labels[0]

6