In [1]:
import h5py
import tensorflow as tf 
import numpy as np

### Prepare MNIST data

In [2]:
def readImages_hdf5(filename):
    '''Loads the image and label datasets from an hdf5 file.
       Parameter
       ---------
       filename : path of the hdf5 file WITHOUT the '.h5' extension (it is appended here)

       Returns
       -------
       hdf5_images : uint8 np array read from the "/images" dataset
       hdf5_labels : uint8 np array read from the "/meta" dataset
    '''
    # The file is only read, so open it read-only ("r+" would require write permission
    # and could fail on read-only media). The context manager guarantees the handle is
    # released; np.array() copies the data into memory before the file is closed.
    with h5py.File(filename + '.h5', "r") as file:
        hdf5_images = np.array(file["/images"]).astype("uint8") #read the images as np array
        hdf5_labels = np.array(file["/meta"]).astype("uint8") #read the labels as np array

    return hdf5_images, hdf5_labels

In [3]:
def one_hot_encoder(label_arr, num_classes=10):
    '''Converts an np array of integer class labels into an np array of one hot labels.
       Parameter
       ---------
       label_arr   : np array of integer labels, one label per entry along the first axis
                     (e.g. shape (N,) or (N, 1))
       num_classes : width of every one hot row; defaults to 10, the number of MNIST classes

       Returns
       -------
       float np array of shape (N, num_classes) holding 1.0 at each sample's label index
       and 0.0 everywhere else
    '''
    total_labels = label_arr.shape[0] #get the total number of labels
    one_hot_label = np.zeros([total_labels, num_classes])

    # Flatten to exactly one integer index per sample; reshape raises if an entry
    # holds more than one value, mirroring the scalar conversion of a per-item loop.
    class_index = np.asarray(label_arr).reshape(total_labels).astype(np.intp)

    # Set the (row, label) cell of every sample in a single vectorized assignment.
    one_hot_label[np.arange(total_labels), class_index] = 1.0

    return one_hot_label #returns the np one-hot label

In [4]:
filename = './60000' #hdf5 path without the '.h5' suffix; the reader appends it

#Load the raw arrays, then bring both into the layout the network consumes:
#  images -> rank 4 tensor (num_images, 28, 28, 1) for CNN filtering
#  labels -> one hot rows
images, labels = readImages_hdf5(filename)
images = np.reshape(images, (images.shape[0], 28, 28, 1))
labels = one_hot_encoder(labels)

### Graph

In [5]:
#Start from an empty default graph so that re-running this cell does not stack a second
#copy of the model's ops and variables on top of the previous one. This must happen
#before the session below is created.
tf.reset_default_graph()
#Interactive session used by the rest of the notebook to run ops.
sess = tf.InteractiveSession()

class Model():
    '''Small CNN classifier for 28x28x1 images with hand-written batch normalization.

    Layers built by __init__:
    conv(64, 3x3, stride 1) -> BN -> ReLU -> conv(64, 3x3, stride 2) -> BN -> ReLU
    -> flatten -> dense(100) -> BN -> ReLU -> dense(10)

    Attributes created by __init__
    ------------------------------
    x         : float32 placeholder for the input images, shape [None, 28, 28, 1]
    y         : float32 placeholder for the one hot labels, shape [None, 10]
    logits    : unnormalized class scores, shape [None, 10]
    y_pred    : softmax of the logits
    loss      : mean softmax cross entropy between y and logits
    optimizer : Adam op (learning rate 1e-3) that minimizes loss
    accuracy  : fraction of samples whose argmax(logits) equals argmax(y)
    '''

    def normalize(self, inputs, batch_mean, batch_var, scale, beta, epsilon=1e-9):
        '''
        From the paper, to normalize the input, we subtract the mean of the whole batch from the input
        and divide the result by the square root of the variance. Epsilon is added to avoid division
        by zero. The normalized input is then transformed linearly using two learnable parameters,
        scale and beta.

        Returns scale * (inputs - batch_mean) / sqrt(batch_var + epsilon) + beta.
        '''

        input_hat = (inputs - batch_mean) / tf.sqrt(batch_var + epsilon)

        normalized = scale*input_hat + beta

        return normalized

    def batch_norm_wrapper(self, inputs, is_training, decay=0.999, is_conv=False, epsilon=1e-9):
        '''
        Applies batch normalization to `inputs` and returns the normalized tensor.

        First we need to initialize the parameters. Scale and beta are used for the linear
        transformation of the normalized input.
        In order for the model to work during testing time, we need to measure the population mean and
        variance. Think about it, if you provide only 1 sample during testing, there would be no mean nor
        variance. Therefore, we calculate the moving average of the mean and variance of each batch to
        estimate the population variance and mean. Here we use an exponential moving average for easier
        implementation.
        IMPORTANT: Note that this implementation only works for a relatively large dataset. The larger the
        dataset, the closer the value of the decay to 1 should be.

        Parameters
        ----------
        inputs      : tensor to normalize; statistics are kept per entry of its last axis
        is_training : Python bool evaluated while the graph is being built. True normalizes with
                      the batch statistics and updates the moving averages; False normalizes
                      with the stored moving averages.
        decay       : weight of the previous value in the exponential moving average
        is_conv     : True for 4D conv feature maps (moments over axes 0, 1, 2),
                      False for 2D dense activations (moments over axis 0)
        epsilon     : small constant added to the variance before the square root
        '''

        num_features = inputs.get_shape()[-1]

        scale = tf.Variable(tf.ones([num_features]))
        beta = tf.Variable(tf.zeros([num_features]))

        #For population mean and variance.
        #NOTE: tf.Variable creates a brand new, independent variable every time it is executed
        #(TensorFlow makes the graph names unique automatically), so each call of this function
        #gets its own scale / beta / pop_mean / pop_var regardless of the input shape.
        pop_mean = tf.Variable(tf.zeros([num_features]), trainable=False)
        pop_var = tf.Variable(tf.ones([num_features]), trainable=False)

        if not is_training:
            #During testing, we use the estimated population mean and variance instead.
            return self.normalize(inputs, pop_mean, pop_var, scale, beta, epsilon)

        #During training, we want to calculate the exponential moving average of the batch mean and batch
        #variance to estimate the population mean and variance.
        #For conv networks, we have to calculate the mean and variance over three axes since it is a 4D input.
        moment_axes = [0,1,2] if is_conv else [0]
        batch_mean, batch_var = tf.nn.moments(inputs, moment_axes)

        train_mean = tf.assign(pop_mean, pop_mean*decay + batch_mean * (1 - decay))
        train_var = tf.assign(pop_var, pop_var*decay + batch_var * (1 - decay))

        #We need to use this so that the train_mean and train_var ops will be run first before it returns
        #the normalized value back. Note that pop_mean and pop_var are not trainable, so they are not
        #part of the optimizer's concern.
        with tf.control_dependencies([train_mean, train_var]):
            return self.normalize(inputs, batch_mean, batch_var, scale, beta, epsilon)


    def __init__(self, is_training):
        '''
        Builds the whole graph (placeholders, layers, loss, optimizer and metrics) in the
        current default graph.

        Parameter
        ---------
        is_training : Python bool forwarded to every batch normalization layer; it selects
                      batch statistics (True) or the stored moving averages (False)
        '''

        self.x = tf.placeholder(tf.float32, [None, 28,28,1])
        self.y = tf.placeholder(tf.float32, [None, 10])

        conv1 = tf.contrib.layers.conv2d(self.x, num_outputs=64, kernel_size=3, stride=1,padding='SAME', activation_fn=None)

        #batch normalization before the activation function but after the linear transformation.
        conv1_actv = tf.nn.relu(self.batch_norm_wrapper(conv1, is_training=is_training, is_conv=True))

        conv2 = tf.contrib.layers.conv2d(conv1_actv, num_outputs=64, kernel_size=3, stride=2,padding='SAME', activation_fn=None)

        #batch normalization before the activation function but after the linear transformation.
        conv2_actv = tf.nn.relu(self.batch_norm_wrapper(conv2, is_training=is_training, is_conv=True))

        #The stride 2 'SAME' convolution halves 28x28 to 14x14, with 64 output channels.
        output_size = 14*14*64
        output_layer = tf.reshape(conv2_actv, (-1, output_size))


        W2 = tf.Variable(tf.truncated_normal([output_size, 100], stddev=0.1))
        B2 = tf.Variable(tf.ones([100]))

        fc2 =  tf.add(tf.matmul(output_layer, W2), B2)

        #batch normalization before the activation function but after the linear transformation.
        fc2_actv = tf.nn.relu(self.batch_norm_wrapper(fc2, is_training=is_training))

        W3 = tf.Variable(tf.truncated_normal([100, 10], stddev=0.1))
        B3 = tf.Variable(tf.ones([10]))

        self.logits = tf.add(tf.matmul(fc2_actv, W3), B3)

        #Class probabilities, kept on the instance so they can be fetched for inference.
        self.y_pred = tf.nn.softmax(self.logits)

        #The loss takes the raw logits; the softmax is applied inside the cross entropy op.
        self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=self.y, logits=self.logits))
        self.optimizer = tf.train.AdamOptimizer(learning_rate=1e-3).minimize(self.loss)
        correct_prediction = tf.equal(tf.argmax(self.logits, 1), tf.argmax(self.y, 1))
        self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))

### Building the graph

In [None]:
#Build the training-mode graph. Passing True is important: it makes every batch norm layer
#use the batch statistics and update its moving averages.
model_cnn = Model(is_training=True)

#Give every variable created above its initial value.
init_op = tf.global_variables_initializer()
sess.run(init_op)

#Saver covering all global variables, including the non-trainable moving averages.
saver = tf.train.Saver(var_list=tf.global_variables())