## Data loading

First we need to load the MNIST dataset from disk. We will do 10-class classification for digit 0, 1, .., 9 from the MNIST dataset here.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
import edf
import mnist_loader
import copy

train_images, train_labels = mnist_loader.load_mnist(section = 'training', path = 'MNIST')
test_images, test_labels = mnist_loader.load_mnist(section = 'testing', path = 'MNIST')

# quickly check the shape of data
print('Train data shape: ', train_images.shape)
print('Train labels shape: ', train_labels.shape)
print('Test data shape: ', test_images.shape)
print('Test labels shape: ', test_labels.shape)


Train data shape:  (60000, 28, 28)
Train labels shape:  (60000,)
Test data shape:  (10000, 28, 28)
Test labels shape:  (10000,)


## A multi-layer perceptron (MLP) network with dropout

We produce the multi-layer perceptron (MLP) class. In this assignment, we implement dropout by adding `dropout` layers between hidden layers. The framework of network is defined below so your task is to implement functional part of `dropout` computational nodes. To help your understanding, I recommend you to draw the computational graph of a 2-layer MLP network including loss nodes.

In [2]:
# Params
class MLPParams:

    nInputs = None
    nHiddens = None
    nOutputs = None
    nLayers = None
    ActivationClass = None
    enableReg = None
    RegClass = None
    alpha = None
    dropOutProb = None
    
# MLP
class MLP:

    def __init__(self, params: MLPParams ):
        self.params = params

    def construct(self):

        params = self.params

        nInputs = int(params.nInputs)
        nHiddens = int(params.nHiddens)
        nOutputs = int(params.nOutputs)
        nLayers = int(params.nLayers)
        ActivationClass = params.ActivationClass
        RegClass = params.RegClass
        enableReg = params.enableReg
        alpha = params.alpha
        dropOutProb = params.dropOutProb

        if RegClass is None:
            enableReg = False
        
        # clean global parameters
        edf.clear_compgraph()

        # input
        x_node = edf.Input()
        y_node = edf.Input()
        loss_nodes = []
        weight_params = []
        
        # 1st layer
        param_first = edf.AffineParams(nInputs, nHiddens)
        weight_params.append( param_first )
        h = DropOut ( ActivationClass (edf.Affine( param_first, x_node )), dropOutProb)

        # reg
        if enableReg:
            reg_loss_node = self.RegClass(param_first.A, alpha)
            loss_nodes.append(reg_loss_node)
            reg_loss_node = self.RegClass(param_first.b, alpha)
            loss_nodes.append(reg_loss_node)

        for i in range(nLayers - 1):

            # hidden layer
            param = edf.AffineParams(nHiddens, nHiddens)
            weight_params.append(param)
            h = DropOut ( ActivationClass ( edf.Affine(param, h) ), dropOutProb)

            if enableReg:
                reg_loss_node = self.RegClass(param.A, alpha)
                loss_nodes.append(reg_loss_node)
                reg_loss_node = self.RegClass(param.b, alpha)
                loss_nodes.append(reg_loss_node)

        # the last layer
        param_last = edf.AffineParams(nHiddens, nOutputs)
        weight_params.append( param_last )
        #prob_node = DropOut ( edf.Softmax (ActivationClass ( edf.Affine(param, h) )), dropOutProb)
        prob_node = edf.Softmax(edf.Affine(param_last, h))

        if enableReg:
            reg_loss_node = self.RegClass(param_last.A, alpha)
            loss_nodes.append(reg_loss_node)
            reg_loss_node = self.RegClass(param_last.b, alpha)
            loss_nodes.append(reg_loss_node)
        
        # loss
        crossloss_node = edf.CrossEntropyLoss(prob_node, y_node)
        sum_node = AverageMiniBatch(crossloss_node)
        loss_nodes.append(sum_node)

        # probability
        self.x_node = x_node
        self.y_node = y_node
        self.loss_node = Add(loss_nodes)
        self.prob_node = prob_node
        self.output_node = prob_node
        self.weight_params = weight_params

    def forward(self, X,y = None):
        
        if y is None:
            y = np.ones_like ( X[:,0],dtype =np.int32)

        # parse input
        self.x_node.value = X
        self.y_node.value = y

        # run
        edf.Forward()

        # get output
        output = np.round(self.output_node.value)

        return output

    def load (self):
        print("TBA")

    def record (self):
        self.rep = copy.deepcopy(self)


Here, we provide a `ReLU` activation node and computational nodes for our MLP network. 

In [3]:
# Add
class Add(edf.CompNode):

    def __init__(self, x_list):
        edf.CompNodes.append(self)
        self.x_list = x_list

    def forward(self):

        value = np.zeros_like(self.x_list[0].value, self.x_list[0].value.dtype)

        for x in self.x_list:
            value += x.value

        self.value = value

    def backward(self):

        for x in self.x_list:
            x.addgrad(np.ones_like(x.value, x.value.dtype)* self.grad[0]) 

# Average loss over a minibach
class AverageMiniBatch(edf.CompNode):

    def __init__(self, x):
        edf.CompNodes.append(self)
        self.x = x

    def forward(self):
        value = np.average(self.x.value)
        self.value = np.resize(value, (1, 1))

    def backward(self):
        self.x.addgrad(np.ones_like(self.x.value, self.x.value.dtype)/self.x.value.size* self.grad[0]) 

# ReLU
class ReLU(edf.CompNode):
    def __init__(self, x):
        edf.CompNodes.append(self)
        self.x = x

    def forward(self):
        self.value = np.maximum(0, self.x.value)
        # pass

    def backward(self):
        # pass
        v = np.ones_like(self.x.value)
        v[self.x.value <= 0.0] = 0.0
        self.x.addgrad(self.grad * v)

## Dropout

Implement backward and forward parts of a `Dropout` node. In the forward function, we first generate the random mask according to the dropout probability. Then, we filter out the input signal, using the mask. In the backward function, we do backpropagation only on the valid part. Note that dropout should be performed during training. To this end, we use `edf.enable_dropout` to specify whether dropout should be performed or not. If `edf.enable_dropout` is false, this node give input as output without any operation.

In [None]:
# Dropout
class DropOut ( edf.CompNode ):

    def __init__(self, x, dropout_prob = 0.5):
        edf.CompNodes.append(self)
        self.x = x
        self.dropout_prob = dropout_prob

    def forward(self):

        mask = np.ones_like(self.x)
        value = self.x.value
        if edf.enable_dropout:

            #########################################################################
            # TO-DO: implement the forward part of a dropout computation node.
            # when dropout is activated,
            # you first generate the random mask which has the same size with input x.
            # then you filter out signal x according to the generated mask.
            #########################################################################


        self.mask = np.array( mask , dtype = edf.DT)
        self.value = value
        
        
        
        
    def backward(self):

        #########################################################################
        # TO-DO: implement the backward part of a dropout computation node.
        # You compute the gradient while considering the mask we used for forward.
        # then you filter out signal x according to the generated mask.
        #########################################################################
        
        

We provide functions for training. By passing `optimizer` and `transformer` objects into `train_and_test` function, we can train the network with various optimization methods and data augmentation. About `transformer`, we will talk later. Here we provide the basic optimizer, which update the parameters based on pure gradients and the learning rate. Note that the learning rate is multiplied with the gradient and the multiplication is added to the parameter in `p.UpdateParameters()`. Please see the `edf.py` for details.

In [None]:
# basic gradient descent method
class GDOptimizer:

    def __init__(self):
        print("")

    def initialize (self):
        print("")
           
    def updateParameters (self):

        for p in edf.Parameters:
            p.UpdateParameters()


In [None]:
def run_epoch(batch_size, data, labels, x_node, y_node, prob_node, loss_node = None, optimizer=GDOptimizer(), transformer = None):

    num_samples = len(data)
    total_err = 0.0
    num_batches = num_samples//batch_size


    for i in range(num_batches):
        start, end = i*batch_size, (i+1)*batch_size

        # load supervised training data
        if transformer is None:
            x_node.value = data[start:end].reshape(end - start, -1)
        else:
            x_node.value = transformer.transform ( data[start:end] ).reshape(end - start, -1)
  
        y_node.value = labels[start:end]

        #  forward
        edf.Forward()

        # compute error
        total_err += np.sum(np.not_equal(np.argmax(prob_node.value, axis=1), y_node.value))


        # step
        if loss_node:

            # backpropagation
            edf.Backward(loss_node)
            optimizer.updateParameters()

        if i>0 and i%400 == 0:
            print ("\t Batch {}/{}".format(i, num_batches))

    return 100*total_err/num_samples

def train_and_test(num_epochs, batch_size, x_node, y_node, prob_node, loss_node, optimizer=GDOptimizer(), transformer=None, enable_dropout = False):

    train_err_log = []
    test_err_log = []
    
    # initialize optimizer
    optimizer.initialize()
    
    for epoch in range(num_epochs):

        print("Epoch: {}/{} (learning rate: {})".format(epoch+1, num_epochs, edf.learning_rate))

        edf.enable_dropout = enable_dropout
        train_err = run_epoch(batch_size, train_images, train_labels, x_node, y_node, prob_node, loss_node, optimizer, transformer)
        train_err_log.append(train_err)
        print ("\t Training Error {:.2f} %".format(train_err))

        edf.enable_dropout = False
        test_err = run_epoch(len(test_images), test_images, test_labels, x_node, y_node, prob_node, None, optimizer, None)
        test_err_log.append(test_err)
        print ("\t Test Error {:.2f} %".format(test_err))
        
        # print out the error late per label.
        for i in range(10):
            err_n_per_i = np.sum(np.not_equal(np.argmax(prob_node.value[y_node.value==i], axis=1), y_node.value[y_node.value==i]))
            n_per_i = np.sum(y_node.value==i)
            print( i," : ", err_n_per_i,'/', n_per_i,'  ', '%.2f' % (err_n_per_i / n_per_i))
        

    return train_err_log, test_err_log

def plot(train_err_log, test_err_log):

    plt.xlabel("epochs")
    plt.ylabel("error (%)")
    plt.plot(np.arange(len(test_err_log)), test_err_log, color='red')
    plt.plot(np.arange(len(train_err_log)), train_err_log, color='blue')
    plt.legend(['test error', 'train error'], loc='upper right')
    plt.show()
    plt.clf()

Construct a MLP network and train the network without data augmentation. You can activate norm regularization by adding regularization class in this jupyter notebook and changing parameter settings. After constructing the MLP network, train the network and show the error graph.

In [None]:
param = MLPParams()
param.nInputs = 784 # 784-dimension
param.nOutputs = 10 # Output dimension
param.nLayers = 3
param.nHiddens = 64 # Number of neurons in the hidden layer
param.ActivationClass = ReLU
param.RegClass = None
param.alpha = 0.001
param.dropOutProb = 0.5
mlp_ReLU = MLP(param)

Train the network with dropout.

In [None]:
mlp_ReLU.construct()


#Train
num_epochs = 20
batch_size = 64
edf.learning_rate = 0.1

###############################################################
# TO-DO: call the train_and_test function activating dropout mode.
# Hint : train_err_log, test_err_log = train_and_test(num_epochs, batch_size, mlp_ReLU.x_node, mlp_ReLU.y_node, mlp_ReLU.prob_node, mlp_ReLU.loss_node, ? )
###############################################################


plot(train_err_log, test_err_log)

Train the network without dropout.

In [None]:
mlp_ReLU.construct()

#Train
num_epochs = 20
batch_size = 64
edf.learning_rate = 0.1

###############################################################
# TO-DO: call the train_and_test function activating dropout mode.
# Hint : train_err_log, test_err_log = train_and_test(num_epochs, batch_size, mlp_ReLU.x_node, mlp_ReLU.y_node, mlp_ReLU.prob_node, mlp_ReLU.loss_node, ? )
###############################################################


plot(train_err_log, test_err_log)

## Data augmentation: vertical and horizontal flip

In this assignment, instead of increasing the variety of valid data pools, we pollute the training data set by flipping images. Let us check how the accuracy changes. To augment the dataset, we now implement the augmentation  class. We transform the input data before the network is trained. Your task is to fill in `RandomFlipAug` class. You can refer to other augmentation class to understand how the augmentation class works.

In [None]:
# Data augmentation

# Gaussian noise
class RandomNoiseAug:

    amplitude_max = 10

    def __init__(self, amplitude_max):
        self.amplitude_max = amplitude_max

    def transform (self, x):

        image_n = x.shape[0]
        x_new = copy.deepcopy(x)

        #
        noise_max = np.random.rand ( image_n, 1, 1 ) * self.amplitude_max
        gaussian_noise = np.random.randn ( x.shape[0], x.shape[1],x.shape[2] )
        gaussian_noise = noise_max * gaussian_noise

        x_new = gaussian_noise + x_new
        x_new [x_new<0] = 0
        x_new [x_new>1] = 1

        return x_new

# Translation
class RandomTranslationAug:

    width = None

    def __init__(self, width):
        self.width = width

    def transform ( self, x ):

        image_n = x.shape[0]
        x_new = copy.deepcopy(x)

        trans_cols = np.array(np.round(np.random.rand ( image_n ) * self.width - 0.4999), dtype = np.int32 )
        trans_rows = np.array(np.round(np.random.rand ( image_n ) * self.width - 0.4999), dtype = np.int32 )

        trans_cols -= int(self.width/2)
        trans_rows -= int(self.width/2)


        # translate for each imgae
        for i in range(0,image_n):

            # wrapping
            x_temp = np.tile(x[i,:,:], [3, 3])

            trans_col = int( trans_cols[i])
            trans_row = int(trans_rows[i])

            x_new[i,:,:]=x_temp[ x.shape[1]+trans_row:x.shape[1]*2+trans_row, x.shape[2] + trans_col : x.shape[2] * 2 + trans_col ]

        return x_new

# Flip
class RandomFlipAug:

    flip_prob_vertical = 0.5
    flip_prob_horizontal = 0.5

    def __init__(self, prob_vertical, prob_horizontal ):
        self.flip_prob_vertical = prob_vertical
        self.flip_prob_horizontal = prob_horizontal

    def transform(self, x):

        image_n = x.shape[0]
        x_new = copy.deepcopy(x)
       
        ##############################################################
        # TO-DO: first generate random samples to decide flipping or not.
        # If the random samples are below than flip_prob_vertical we do vertical flip. 
        # If the random samples are below than flip_prob_horizontal we do horizontal flip.
        # You should generate two random sample per each image.   
        ##############################################################

        return x_new

Check the performance after we flip the training dataset horizontally. How the performance is changed depending on the label?

In [None]:
param = MLPParams()
param.nInputs = 784 # 784-dimension
param.nOutputs = 10 # Output dimension
param.nLayers = 2
param.nHiddens = 64 # Number of neurons in the hidden layer
param.ActivationClass = ReLU
param.RegClass = None
param.alpha = 0.001
param.dropOutProb = 0.5

mlp_ReLU = MLP(param)

mlp_ReLU.construct()

#Train
num_epochs = 10
batch_size = 64
edf.learning_rate = 0.1

# Horizontal flip
#########################################
# TO-DO : generate the RandomFlipAug instance which always flip images horizontally.
# Hint : AugObject =  
#########################################


train_err_log, test_err_log = train_and_test(num_epochs, batch_size, mlp_ReLU.x_node, mlp_ReLU.y_node, mlp_ReLU.prob_node, mlp_ReLU.loss_node, transformer = AugObject, enable_dropout=False)

plot(train_err_log, test_err_log)

Check the performance after we flip the training dataset vertically. How the performance is changed depending on the label?

In [None]:
mlp_ReLU.construct()

#Train
num_epochs = 10
batch_size = 64
edf.learning_rate = 0.1

# Vertical flip
#########################################
# TO-DO : generate the RandomFlipAug instance which always flip images vertically.
# Hint : AugObject =  
#########################################


train_err_log, test_err_log = train_and_test(num_epochs, batch_size, mlp_ReLU.x_node, mlp_ReLU.y_node, mlp_ReLU.prob_node, mlp_ReLU.loss_node, transformer = AugObject, enable_dropout = False)

plot(train_err_log, test_err_log)