In [6]:
import tensorflow as tf
import numpy as np
import scipy.io as sio
import math
import os

In [2]:
def GCN(data_dir, ratio):
    '''Load an SVHN-style ``.mat`` file, convert it to grayscale, apply
    per-image global contrast normalisation (GCN) and split the records.

    Args:
        data_dir: path of the ``.mat`` file. The images are read from key
            'X' and transposed from (height, width, 3, n_images); the labels
            are read from key 'y' and must be a 2-D column (n_images, 1) so
            they can be stacked in front of the pixels.
        ratio: fraction of the shuffled records that goes into the FIRST
            returned array (0.1 -> 10% in `train`, the remaining 90% in `val`).
    Returns:
        (train, val): two float arrays whose rows are laid out as
        [label, pixel_0, ..., pixel_{H*W-1}].
    Note:
        The shuffle uses NumPy's global random state; call np.random.seed
        beforehand for a reproducible split.
    '''
    mat = sio.loadmat(data_dir)
    images = mat['X']
    labels = mat['y']
    # Label 10 is remapped to 0 so that all labels fall in 0..9
    # (presumably SVHN's encoding of the digit '0' -- confirm with dataset docs).
    labels[labels == 10] = 0

    # (H, W, C, N) -> (N, H, W, C)
    images = images.transpose((3, 0, 1, 2))
    # Weighted RGB -> luminance. The float weights already promote the result
    # to float64, so no separate (and previously discarded) astype is needed.
    gray = np.dot(images, np.array([[0.2989], [0.5870], [0.1140]]))

    # Per-image statistics; keepdims lets them broadcast against `gray`.
    mean = np.mean(gray, axis=(1, 2), dtype=float, keepdims=True)
    std = np.std(gray, axis=(1, 2), dtype=float, ddof=1, keepdims=True)
    # Guard against division by ~0 for (almost) constant images.
    std[std < 1e-4] = 1

    # Normalise every image at once, in place, instead of a Python-level loop.
    gray -= mean
    gray /= std

    n_images = gray.shape[0]
    flat = gray.reshape((n_images, -1))
    data = np.hstack((labels, flat))
    np.random.shuffle(data)  # shuffles rows, keeping label and pixels together

    # int() keeps the slice index an integer even where math.floor returns a float.
    cut = int(math.floor(n_images * ratio))
    train, val = data[:cut, :], data[cut:, :]

    print("\n------- GCN done -------")
    return train, val

In [3]:
def read_SVHN(data_dir, ratio, batch_size):
    '''Build batched image/label tensors for both splits produced by GCN.

    Uses the TF1 queue-based input ops (tf.train.input_producer and
    tf.train.batch) on each of the two arrays returned by GCN.

    Args:
        data_dir: path handed to GCN.
        ratio: split ratio handed to GCN.
        batch_size: number of records per batch.
    Returns:
        (images_list, label_batch_list): two lists with one entry per split,
        in the order GCN returns them. Each images entry is the batched
        float32 image tensor; each label entry is the one-hot label tensor
        reshaped to [batch_size, 10].
    '''
    splits = GCN(data_dir, ratio)

    side = 32                 # images are reshaped to side x side
    n_pixels = side * side    # 1024 pixel values follow the label in a row
    n_classes = 10

    batched_images = []
    batched_labels = []
    with tf.name_scope('input'):
        for records in splits:
            # One row = [label, pixel_0, ..., pixel_1023].
            row = tf.train.input_producer(records).dequeue()

            label = tf.slice(row, [0], [1])
            label = tf.cast(label, tf.int32)

            pixels = tf.slice(row, [1], [n_pixels])
            pixels = tf.reshape(pixels, [1, side, side])
            # Move the singleton channel axis to the end: (32, 32, 1).
            image = tf.transpose(pixels, (1, 2, 0))
            image = tf.cast(image, tf.float32)

            images, label_batch = tf.train.batch(
                [image, label],
                batch_size=batch_size,
                num_threads=16,
                capacity=2000)

            # one_hot on a [batch, 1] tensor adds an axis; the reshape drops it.
            one_hot = tf.one_hot(label_batch, depth=n_classes)
            batched_labels.append(tf.reshape(one_hot, [batch_size, n_classes]))
            batched_images.append(images)

    return batched_images, batched_labels



In [5]:
if __name__ == "__main__":
    # Quick sanity check of the preprocessing: load, normalise, split and
    # report the sizes of both parts.
    data_dir = './data/SVHN/train_32x32.mat'
    ratio = 0.1  # fraction of records that ends up in `train`
    train, val = GCN(data_dir, ratio)

    train_shape = train.shape
    val_shape = val.shape
    print('train has the shape: {0} '.format(train_shape))
    print('val has the shape: {0} '.format(val_shape))
    print('Total has {0} records'.format(train_shape[0] + val_shape[0]))


------- GCN done -------
train has the shape: (7325, 1025) 
val has the shape: (65932, 1025) 
Total has 73257 records


In [7]:
def conv(layer_name, x, out_channels, kernel_size=[5,5], stride=[1,1,1,1]):
    '''Convolution wrapper: conv2d -> bias add -> ReLU, with 'SAME' padding.

    Args:
        layer_name: variable scope in which 'weights' and 'biases' are
            created, e.g. 'conv1'.
        x: input tensor; its last dimension is used as the number of input
            channels (presumably [batch_size, height, width, channels]).
        out_channels: number of output channels (convolutional kernels).
        kernel_size: [height, width] of the convolutional kernel.
        stride: list of 4 ints handed to tf.nn.conv2d as strides.
    Returns:
        The ReLU-activated output tensor.
    '''
    n_in = x.get_shape()[-1]
    filter_shape = [kernel_size[0], kernel_size[1], n_in, out_channels]

    with tf.variable_scope(layer_name):
        # Xavier initializer (uniform by default) for the kernel, zeros for the bias.
        kernel = tf.get_variable(name='weights',
                                 shape=filter_shape,
                                 initializer=tf.contrib.layers.xavier_initializer())
        bias = tf.get_variable(name='biases',
                               shape=[out_channels],
                               initializer=tf.constant_initializer(0.0))

        feature_map = tf.nn.conv2d(x, kernel, stride, padding='SAME', name='conv')
        pre_activation = tf.nn.bias_add(feature_map, bias, name='bias_add')
        return tf.nn.relu(pre_activation, name='relu')

In [8]:
def pool(layer_name, x, kernel=[1,2,2,1], stride=[1,2,2,1]):
    '''Max-pooling wrapper with 'SAME' padding.

    Args:
        layer_name: name given to the pooling op, e.g. 'pool1'.
        x: input tensor.
        kernel: pooling window passed to tf.nn.max_pool; the default
            [1,2,2,1] is a 2x2 window.
        stride: strides passed to tf.nn.max_pool; default [1,2,2,1].
    Returns:
        The pooled tensor.
    '''
    return tf.nn.max_pool(x, kernel, strides=stride, padding='SAME', name=layer_name)

In [9]:
def FC_layer(layer_name, x, out_nodes):
    '''Wrapper for fully connected layers with RELU activation as default

    The input is flattened over every dimension except the first (batch)
    one, so the layer accepts both 4-D feature maps and the 2-D output of a
    previous fully connected layer.

    Args:
        layer_name: variable scope for 'weights' and 'biases', e.g. 'FC1', 'FC2'
        x: input tensor whose non-batch dimensions are statically known
        out_nodes: number of neurons for current FC layer
    Returns:
        2-D tensor [batch, out_nodes] after the ReLU.
    Raises:
        ValueError: if x has no non-batch dimension or one of them is unknown,
            because the weight matrix shape could not be determined.
    '''
    feature_dims = x.get_shape().as_list()[1:]
    if not feature_dims or any(dim is None for dim in feature_dims):
        raise ValueError(
            'FC_layer needs statically known non-batch dimensions, got shape %s'
            % x.get_shape())

    # Number of features per example = product of all non-batch dimensions.
    size = 1
    for dim in feature_dims:
        size *= dim

    with tf.variable_scope(layer_name):
        w = tf.get_variable('weights',
                            shape=[size, out_nodes],
                            initializer=tf.contrib.layers.xavier_initializer())
        b = tf.get_variable('biases',
                            shape=[out_nodes],
                            initializer=tf.constant_initializer(0.0))
        flat_x = tf.reshape(x, [-1, size]) # one row of features per example

        x = tf.nn.bias_add(tf.matmul(flat_x, w), b)
        x = tf.nn.relu(x)
        return x

In [10]:
def final_layer(layer_name, x, out_nodes):
    '''Linear output layer: x * weights + biases, with no activation.

    Args:
        layer_name: variable scope for 'weights' and 'biases'.
        x: input tensor; the size of its last dimension is used as the
            number of input features.
        out_nodes: number of output units.
    Returns:
        The un-activated output of shape [-1, out_nodes].
    '''
    n_features = x.get_shape()[-1].value

    with tf.variable_scope(layer_name):
        weights = tf.get_variable('weights',
                                  shape=[n_features, out_nodes],
                                  initializer=tf.contrib.layers.xavier_initializer())
        biases = tf.get_variable('biases',
                                 shape=[out_nodes],
                                 initializer=tf.constant_initializer(0.0))
        # Collapse everything but the last axis into rows.
        rows = tf.reshape(x, [-1, n_features])
        return tf.nn.bias_add(tf.matmul(rows, weights), biases)

In [11]:
def drop_out(layer_name, x, keep_prob = 0.5):
    '''Apply tf.nn.dropout to x inside the variable scope `layer_name`.

    Args:
        layer_name: name of the enclosing variable scope.
        x: input tensor.
        keep_prob: value passed to tf.nn.dropout as the keep probability.
    Returns:
        The tensor produced by tf.nn.dropout.
    '''
    with tf.variable_scope(layer_name):
        return tf.nn.dropout(x, keep_prob)

In [12]:
def loss(logits, labels):
    '''Mean softmax cross-entropy between logits and labels.

    Args:
        logits: un-normalised class scores, [batch_size, n_classes].
        labels: per-example class distribution (e.g. one-hot) with the same
            shape as logits.
    Returns:
        Scalar tensor holding the batch mean of the cross-entropy; it is also
        written as a scalar summary.
    '''
    with tf.variable_scope('loss') as scope:
        logits = tf.convert_to_tensor(logits)
        # Labels are a probability distribution, so they must share the
        # floating dtype of the logits. Casting them to int64 (as before)
        # would truncate any non-integer label, e.g. smoothed labels, to 0.
        labels = tf.cast(labels, logits.dtype)
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits\
                        (logits=logits, labels=labels, name='loss')
        mean_loss = tf.reduce_mean(cross_entropy, name='loss')
        tf.summary.scalar(scope.name+'/loss', mean_loss)
    return mean_loss

In [13]:
def accuracy(logits, labels):
    """Percentage of examples whose arg-max logit matches the arg-max label.
    Args:
      logits: Logits tensor, float - [batch_size, NUM_CLASSES].
      labels: Labels tensor compared via argmax over axis 1 (e.g. one-hot),
        so it is expected to have the same layout as logits.
    Returns:
      Scalar float32 tensor in the range 0..100; it is also written as a
      scalar summary.
    """
    with tf.name_scope('accuracy') as scope:
        predicted = tf.argmax(logits, 1)
        expected = tf.argmax(labels, 1)
        hits = tf.cast(tf.equal(predicted, expected), tf.float32)
        # Mean of the 0/1 hits, scaled to a percentage.
        percent = tf.reduce_mean(hits)*100.0
        tf.summary.scalar(scope+'/accuracy', percent)
    return percent

In [14]:
def optimize(loss, learning_rate, global_step):
    '''Build the training op that minimises `loss` with the Adam optimizer.

    Args:
        loss: scalar tensor to minimise.
        learning_rate: learning rate handed to tf.train.AdamOptimizer.
        global_step: passed to minimize() as its global_step argument.
    Returns:
        The op returned by AdamOptimizer.minimize.
    '''
    with tf.name_scope('optimizer'):
        adam = tf.train.AdamOptimizer(learning_rate=learning_rate)
        return adam.minimize(loss, global_step=global_step)