# TTIC 31020 Introduction to Statistical Machine Learning: Neural Networks
---

In this notebook you will perform classification on the Fashion MNIST dataset with neural networks. Your task is (mostly) to implement the forward and backward methods for different layers (forward methods compute a layer's output given its input, while backward methods compute gradients for its parameters and its input given the gradient of its output).

After filling in the missing code, try to achieve the best performance by tuning the hyperparameters. Neural networks are typically more sensitive to hyperparameters than the other methods you've seen in past homeworks, so careful hyperparameter tuning is crucial for getting good results.

In [None]:
%load_ext autoreload

%autoreload 2

import numpy as np
from utils import *
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

In [None]:
# superclass of neural network "modules" (layers)
class Module:
    """
    Base class for network building blocks.

    A Module can be a single layer or a container of layers (for example a
    multilayer perceptron). Subclasses override forward, backward and, when
    they have trainable parameters, parameters.
    """

    def __init__(self):
        # Modules start in training mode; evaluate() switches it off.
        self.train = True

    def forward(self, _input):
        """
        z = f(a); a is the input, and z is the output.

        Inputs:
        _input: a

        Returns:
        output z (this base implementation does nothing and returns None)
        """
        pass

    def backward(self, _input, _gradOutput):
        """
        Compute:
        gradient w.r.t. _input
        gradient w.r.t. trainable parameters

        Inputs (in lecture notation):
        _input: a
        _gradOutput: dL/dz

        Returns:
        gradInput: dL/da (this base implementation does nothing and returns None)

        Note: the concrete layers in this file override backward with the
        signature backward(_gradOutput) and reuse the input cached by forward.
        """
        pass

    def parameters(self):
        """
        Return the trainable parameters and their corresponding gradients
        (used for gradient descent).

        Returns:
        params, gradParams -- (None, None) for a module without trainable
        parameters, so callers can always unpack the result into two values.
        """
        return None, None

    def training(self):
        """
        Turn the module into training mode. (Only useful for Dropout layer)
        Ignore it if you are not using Dropout.
        """
        self.train = True

    def evaluate(self):
        """
        Turn the module into evaluate mode. (Only useful for Dropout layer)
        Ignore it if you are not using Dropout.
        """
        self.train = False

In [None]:
# a class representing a sequence of modules (a layered network)
class Sequential(Module):
    """
    Container that chains modules so that each one feeds the next
    (a plain feed-forward stack).
    """
    def __init__(self):
        Module.__init__(self)
        # Modules in the order they are applied during the forward pass.
        self.layers = []

    def add(self, layer):
        """Append `layer` as the new last stage of the chain."""
        self.layers.append(layer)

    def size(self):
        """Return how many modules the chain currently holds."""
        return len(self.layers)

    def forward(self, _input):
        """
        Push `_input` through every module in order and return the output
        of the last one (also stored in self._output).
        """
        activation = _input
        for layer in self.layers:
            # Each module consumes the output of the one before it.
            activation = layer.forward(activation)
        self._output = activation
        return self._output

    def backward(self, _gradOutput):
        """
        Apply the chain rule from the last module back to the first and
        return the gradient with respect to the chain's input.
        """
        grad = _gradOutput
        for layer in reversed(self.layers):
            # Each module receives the gradient produced by the one after it.
            grad = layer.backward(grad)
        return grad

    def parameters(self):
        """
        Collect the trainable parameters of all modules, and their
        gradients, into two flat lists with matching order.
        """
        params, gradParams = [], []
        for layer in self.layers:
            values, grads = layer.parameters()
            if values is None:
                # Parameter-free module, nothing to collect.
                continue
            for value, grad in zip(values, grads):
                params.append(value)
                gradParams.append(grad)
        return params, gradParams

    def training(self):
        """
        Put the container and every module in it into training mode.
        """
        Module.training(self)
        for layer in self.layers:
            layer.training()

    def evaluate(self):
        """
        Put the container and every module in it into evaluate mode.
        """
        Module.evaluate(self)
        for layer in self.layers:
            layer.evaluate()
        

In [None]:
class FullyConnected(Module):
    """
    Fully connected (affine) layer: output = input . W + b.

    Parameters are a weight matrix of shape (inputSize, outputSize) and a
    bias vector of length outputSize. Inputs are handled row-wise, i.e. an
    N x inputSize batch produces an N x outputSize output.
    """
    def __init__(self, inputSize, outputSize):
        Module.__init__(self)

        # He-style initialization: zero-mean Gaussian weights scaled by
        # sqrt(2 / fan_in), a common choice in front of ReLU units; biases
        # start at zero. max(..., 1) avoids dividing by zero for inputSize 0.
        scale = np.sqrt(2.0 / max(inputSize, 1))
        self.weight = np.random.randn(inputSize, outputSize) * scale
        self.bias = np.zeros(outputSize)

        # Gradient buffers are allocated once and always updated in place, so
        # references handed out by parameters() stay valid across backward calls.
        self.gradWeight = np.zeros((inputSize, outputSize))
        self.gradBias = np.zeros(outputSize)

    def forward(self, _input):
        """
        output = input . W + b

        The input is cached because backward needs it for the weight gradient.
        """
        self._input = _input

        self._output = np.dot(_input, self.weight) + self.bias
        return self._output

    def backward(self, _gradOutput):
        """
        gradWeight = input^T . gradOutput
        gradBias   = sum of gradOutput over the batch (rows)
        gradInput  = gradOutput . W^T

        Uses the input cached by the most recent forward call.
        """
        self.gradWeight.fill(0)
        self.gradBias.fill(0)

        self.gradWeight += np.dot(self._input.T, _gradOutput)
        self.gradBias += _gradOutput.sum(axis=0)
        self._gradInput = np.dot(_gradOutput, self.weight.T)

        return self._gradInput

    def parameters(self):
        """
        Return [weight, bias] and their gradients [gradWeight, gradBias].
        """
        return [self.weight, self.bias], [self.gradWeight, self.gradBias]

In [None]:
class ReLU(Module):
    """
    ReLU activation, not trainable.
    """
    def __init__(self):
        Module.__init__(self)

    def forward(self, _input):
        """
        output = max(0, input), applied element-wise.

        The input is cached because backward needs its sign pattern.
        """
        self._input = _input
        self._output = np.maximum(_input, 0)
        return self._output

    def backward(self, _gradOutput):
        """
        gradInput = gradOutput * mask
        mask = _input > 0

        The gradient passes through only where the cached input was positive.
        """
        self._gradInput = _gradOutput * (self._input > 0)
        return self._gradInput

    def parameters(self):
        """
        No trainable parameters, return None
        """
        return None, None

In [None]:
class Dropout(Module):
    """
    A dropout layer (inverted dropout).

    In training mode each input element is zeroed with probability p and the
    survivors are scaled by 1 / (1 - p). In evaluate mode, or when p is 0,
    the layer passes its input through unchanged.
    """
    def __init__(self, p = 0.5):
        Module.__init__(self)
        self.p = p  # drop rate; p == 0 turns the layer into an identity

    def forward(self, _input):
        """
        Apply a freshly sampled dropout mask in training mode; otherwise
        return the input unchanged. The mask is kept for backward.
        """
        self._output = _input
        if self.p > 0 and self.train:
            keep_prob = 1 - self.p
            # Sample a keep/drop mask from a Bernoulli distribution.
            self.mask = np.random.binomial(1, keep_prob, _input.shape).astype('float64')
            # Scale the survivors; with p == 1 everything is dropped and
            # there is nothing to rescale (and no division by zero).
            if keep_prob > 0:
                self.mask /= keep_prob
            # Build a new array instead of multiplying in place, so the
            # caller's input array is never modified.
            self._output = _input * self.mask
        return self._output

    def backward(self, _gradOutput):
        """
        Route the gradient through the mask sampled by the last training-mode
        forward call; otherwise pass it through unchanged.
        """
        self._gradInput = _gradOutput
        if self.train and self.p > 0:
            self._gradInput = self.mask * self._gradInput
        return self._gradInput

    def parameters(self):
        """
        No trainable parameters.
        """
        return None, None

In [None]:
class SoftMaxLoss(object):
    """
    Softmax followed by cross-entropy, averaged over the batch.
    """
    def __init__(self):
        return

    def forward(self, _input, _label):
        """
        Softmax and cross entropy loss layer. Should return a scalar, since it's a
        loss. (It's almost identical to what we had in Pset 2)

        Inputs:
        _input: N x C
        _label: N x C, one-hot

        Returns: loss (scalar)
        """
        # Subtract the row-wise maximum before exponentiating so exp cannot overflow.
        self._input = _input - _input.max(1)[:, np.newaxis]
        self._logprob = self._input - np.log(np.exp(self._input).sum(1)[:, np.newaxis])

        self._output = np.mean(np.sum(-self._logprob * _label, 1))
        return self._output

    def backward(self, _label):
        """
        Gradient of the loss with respect to the scores given to the most
        recent forward call (relies on the log-probabilities cached there).

        Inputs:
        _label: N x C, the same labels that were passed to forward

        Returns: N x C array, (softmax * row_sum(label) - label) / N.
        For one-hot labels this is the familiar (softmax - label) / N.
        """
        batch_size = _label.shape[0]
        softmax = np.exp(self._logprob)
        # The row sum is 1 for one-hot labels; keeping it makes the gradient
        # exact for soft labels as well.
        label_mass = _label.sum(1)[:, np.newaxis]
        self._gradInput = (softmax * label_mass - _label) / batch_size
        return self._gradInput

In [None]:
# Test softmaxloss, the relative error should be small enough
def test_sm():
    """
    Compare SoftMaxLoss.backward against a numerical gradient.

    Prints the loss on a random 3 x 10 score matrix, then the value returned
    by relative_error for the analytic and numeric gradients (expected to be
    small).
    """
    criterion = SoftMaxLoss()
    # One-hot targets: rows 0, 1, 2 are assigned classes 1, 2, 3.
    targets = np.zeros((3, 10))
    targets[np.arange(3), np.array([1,2,3])] = 1
    scores = np.random.random((3,10))

    def loss_at(point):
        return criterion.forward(point, targets)

    print(criterion.forward(scores, targets))
    # backward must directly follow the forward pass on `scores`, since it
    # reads the state cached by that call.
    analytic = criterion.backward(targets)
    # numeric_gradient / relative_error come from the utils star-import;
    # argument meanings presumably (f, x, scale, step) -- TODO confirm.
    numeric = numeric_gradient(loss_at, scores, 1, 1e-6)
    print(relative_error(analytic, numeric, 1e-8))

test_sm()

In [None]:
# Test modules, all the relative errors should be small enough (on the order of 1e-6 or smaller)
def test_module(model):
    """
    Gradient-check `model` on random 3 x 10 data.

    The model is switched to evaluate mode, run forward and backward through
    a TestCriterion, and the value returned by relative_error for the
    analytic and numeric input gradients is printed.
    """
    model.evaluate()

    criterion = TestCriterion()
    target = np.random.random((3,10))
    sample = np.random.random((3,10))

    def loss_at(point):
        return criterion.forward(model.forward(point), target)

    # Forward pass on `sample` first, so the state cached by the model and
    # the criterion belongs to the point at which backward is evaluated.
    loss_at(sample)
    analytic = model.backward(criterion.backward(target))
    numeric = numeric_gradient(loss_at, sample, 1, 1e-6)
    print(relative_error(analytic, numeric, 1e-8))

# Gradient-check each layer type in turn: FullyConnected, ReLU, Dropout.
# Models are built lazily, one per iteration, right before their check runs.
for _banner, _make_model in (
    ('testing FullyConnected', lambda: FullyConnected(10, 10)),
    ('testing ReLU', ReLU),
    ('testing Dropout', Dropout),
):
    model = _make_model()
    print(_banner)
    test_module(model)


In [None]:
model = Sequential()
# 2-layer neural network with hidden size 10, built with Sequential.add:
# 10 inputs -> FullyConnected -> ReLU -> FullyConnected -> 10 outputs.
# The input/output width of 10 matches the 3 x 10 arrays used by test_module.
model.add(FullyConnected(10, 10))
model.add(ReLU())
model.add(FullyConnected(10, 10))


In [None]:
# Test your neural network: gradient-check the whole 2-layer model.
# test_module prints the relative error between the analytic and the
# numerical input gradient, which should be small.
print('testing 2-layer model')
test_module(model)

In [None]:
def sgd(x, dx, lr, weight_decay = 0):
    """
    Perform one gradient-descent step on every parameter, in place.

    Inputs:
    x: list of parameter arrays (modified in place)
    dx: list of gradient arrays, aligned with x
    lr: learning rate
    weight_decay: L2 penalty coefficient; adds weight_decay * x to each gradient

    Returns: None
    """
    for _x, _dx in zip(x, dx):
        # `-=` updates the array object itself. Rebinding `_x` to a new array
        # would leave the parameters held by the layers unchanged.
        _x -= lr * (_dx + weight_decay * _x)

In [None]:
# Test gradient descent: the printed loss should keep decreasing.
trainX = np.random.random((10,10))

crit = TestCriterion()

# The lists reference the arrays owned by the layers, so the in-place
# updates made by sgd are seen by the model on the next forward pass.
params, gradParams = model.parameters()

state = None
# 1002 updates in total (iterations 0..1001); the loss is printed every 100.
for it in range(1002):
    output = model.forward(trainX)
    loss = crit.forward(output, None)
    if it % 100 == 0:
        print(loss)
    doutput = crit.backward(None)
    model.backward(doutput)
    sgd(params, gradParams, 0.01)

Now we start to work on Fashion MNIST.

In [None]:
import FMNIST_utils

# Only the large training set is used this time.
print("Load large trainset.")
Xlarge, Ylarge = FMNIST_utils.load_data("Tr")
print(Xlarge.shape)
print(Ylarge.shape)
# Rescale into [0, 1] unless the data already is
# (presumably raw 0-255 pixel values -- confirm against FMNIST_utils).
if Xlarge.max() > 1:
    Xlarge = Xlarge / 255

print("Load valset.")
Xval, Yval = FMNIST_utils.load_data("Vl")
print(Xval.shape)
print(Yval.shape)
# Same rescaling for the validation set.
if Xval.max() > 1:
    Xval = Xval / 255

In [None]:
def predict(X, model):
    """
    Compute the model's soft predictions (class scores) for X.

    Input:
    X : N x d array (no unit terms)
    model : a multi-layer perceptron (anything exposing forward(X))
    Output:
    yhat : N x C array
        yhat[n][:] contains the score over C classes for X[n][:]

    Note: the model's current mode (training / evaluate) is left untouched.
    """
    yhat = model.forward(X)
    return yhat

def error_rate(X, Y, model):
    """
    Compute error rate (between 0 and 1) for the model.

    Inputs:
    X : N x d array of inputs
    Y : N x C array of labels; the true class of row n is Y[n].argmax()
    model : module exposing forward, evaluate and training

    Returns: fraction of rows whose highest-scoring class differs from the label.

    The model is evaluated in evaluate mode and then put back into the mode it
    was in before the call, so a model already in evaluate mode stays there.
    """
    # Models without a `train` flag are treated as being in training mode,
    # which reproduces the previous unconditional switch back to training.
    was_training = getattr(model, 'train', True)
    model.evaluate()
    try:
        predicted = model.forward(X).argmax(-1)
        res = 1 - (predicted == Y.argmax(-1)).mean()
    finally:
        # Restore the mode even if the forward pass raises.
        if was_training:
            model.training()
    return res

from copy import deepcopy

def runTrainVal(X,Y,model,Xval,Yval,trainopt):
    """
    Run the train + evaluation on a given train/val partition.

    Inputs:
    X, Y : training inputs (N x d) and labels (N x C)
    model : the model to train (updated in place)
    Xval, Yval : validation inputs and labels
    trainopt : dict of (hyper)parameters of the training procedure. Keys read:
        'eta' (initial learning rate), 'etadrop', 'eta_frac', 'maxiter',
        'batch_size', 'display_iter' and, optionally, 'lambda' (weight
        decay, 0 when absent).

    During training, choose the model with the lowest validation error. (early stopping)
    Assumes (global) variable crit containing the loss (training "criterion" to be minimized)

    Returns: (saved_model, minValError, trainError)
        saved_model : deep copy of the model at the evaluation with the lowest
            validation error (None if no evaluation ran)
        minValError : that validation error, in percent
        trainError : training error of the most recent evaluation, in percent
            (np.inf if no evaluation ran)
    """

    params, gradParams = model.parameters()

    eta = trainopt['eta']
    batch_size = trainopt['batch_size']
    # Number of updates between learning-rate drops. Clamped to at least 1 so
    # the modulo below is defined even when eta_frac * maxiter is below 1.
    drop_every = max(1, int(trainopt['eta_frac'] * trainopt['maxiter']))

    N = X.shape[0] # number of data points in X

    # Save the model with lowest validation error
    minValError = np.inf
    saved_model = None # Save the best model according to validation error
    trainError = np.inf # defined up front so the return works even when maxiter == 0

    shuffled_idx = np.random.permutation(N)
    start_idx = 0
    for iteration in range(trainopt['maxiter']):
        # Drop eta after every drop_every updates. Iteration 0 is skipped so
        # the first updates really use the initial learning rate.
        if iteration > 0 and iteration % drop_every == 0:
            eta *= trainopt['etadrop']
        # form the next mini-batch (the last batch of a pass may be shorter)
        stop_idx = min(start_idx + batch_size, N)
        s_idx = shuffled_idx[int(start_idx):int(stop_idx)]

        bX = X[s_idx,:]
        bY = Y[s_idx,:]

        score = model.forward(bX)
        loss = crit.forward(score, bY)
        # note: this computes loss on the *batch* only, not on the entire training set!

        dscore = crit.backward(bY)
        model.backward(dscore)

        sgd(params, gradParams, eta, weight_decay = trainopt.get('lambda', 0))

        # wrap around to the start of the (same) permutation after a full pass
        start_idx = stop_idx % N

        if (iteration % trainopt['display_iter']) == 0:
            #compute train and val error; multiply by 100 for readability (make it percentage points)
            trainError = 100 * error_rate(X, Y, model)
            valError = 100 * error_rate(Xval, Yval, model)
            print('{:8} batch loss: {:.3f} train error: {:.3f} val error: {:.3f}'.format(iteration, loss, trainError, valError))

            # early stopping: save the best model snapshot so far (i.e., model with lowest val error)
            if valError < minValError:
                saved_model = deepcopy(model)
                minValError = valError

    return saved_model, minValError, trainError

In [None]:
def build_model(input_size, hidden_size, output_size, activation_func = 'ReLU', dropout = 0):
    """
    Build a feed-forward model: zero or more hidden blocks followed by a
    fully connected output layer.

    input_size: the dimension of input data
    hidden_size: the dimension of the hidden vector, either an int or a list
        with one entry per hidden layer. 0, an empty list, or a list whose
        first entry is 0 means no hidden layer (only the output layer).
    output_size: the output size of final layer.
    activation_func: name of the activation placed after each hidden layer.
        Only 'ReLU' is registered here; add entries to the `activations`
        table below to enable others (e.g. a Tanh you define yourself).
    dropout: the dropout rate: if dropout == 0, this is equivalent to no dropout

    Returns: a Sequential model.
    Raises: ValueError if hidden layers are requested with an activation name
        that is not registered.
    """
    # Activation layers available by name.
    activations = {'ReLU': ReLU}

    model = Sequential()

    if isinstance(hidden_size, int):
        hidden_size = [hidden_size] # ensure it's a list

    prev_size = input_size

    # add hidden layer(s) as requested; an empty list is treated like [0]
    has_hidden = len(hidden_size) > 0 and hidden_size[0] != 0
    if has_hidden:
        if activation_func not in activations:
            raise ValueError('unsupported activation_func: %r' % (activation_func,))
        make_activation = activations[activation_func]

        for width in hidden_size:
            # fully connected layer from the previous width to this one
            model.add(FullyConnected(prev_size, width))
            prev_size = width

            # non-linearity
            model.add(make_activation())

            if dropout > 0:
                model.add(Dropout(dropout))

    # output layer (a fully connected layer producing the class scores)
    model.add(FullyConnected(prev_size, output_size))

    return model

In [None]:
trainopt = {
    'eta': 1e-3,   # initial learning rate
    'maxiter': 10000,   # max number of iterations (updates) of SGD
    'display_iter': 5000,  # evaluate and display errors every display_iter updates
    'batch_size': 128,
    'etadrop': .5, # when dropping eta, multiply it by this number (e.g., .5 means halve it)
    'eta_frac': .25,  # drop eta after every eta_frac*maxiter
    'update': 'sgd'
}
NFEATURES = Xlarge.shape[1]

# we will maintain a record of models trained for different values of lambda
# these will be indexed directly by lambda value itself
trained_models = dict()

# choose the set of hyperparameters to explore

lambda_=0.0
# Hidden dim for each hidden layer. A single hidden layer of 128 units is only
# a starting point -- tune it. (An empty list would make build_model's
# hidden_size[0] check fail.)
hidden_size_ = [128]
trainopt['lambda'] = lambda_
model = build_model(NFEATURES, hidden_size_, 10, dropout = 0.1)
crit = SoftMaxLoss()
# -- model trained on large train set
trained_model,valErr,trainErr = runTrainVal(Xlarge, Ylarge, model, Xval, Yval, trainopt)
trained_models[lambda_] = {'model': trained_model, "val_err": valErr, "train_err": trainErr }
# Report the configuration and its errors on a single line.
print('train set model [ h = ',end='')
for h in hidden_size_:
    print('%d '%h,end='')
print(' ], lambda= %.4f ] --> train error: %.2f, val error: %.2f' % (lambda_, trainErr, valErr))


In [None]:
# Generate a Kaggle submission file from trained_model (point it at your best
# model, chosen based on your experiments, before running this cell).
kaggleX = FMNIST_utils.load_data('kaggle')
if kaggleX.max() > 1: kaggleX = kaggleX/255
# The snapshot returned by runTrainVal is deep-copied right after error_rate
# has switched the model back to training mode, so its Dropout layers would
# still drop units at random here. Switch to evaluate mode before predicting.
trained_model.evaluate()
kaggleYhat = predict(kaggleX, trained_model).argmax(-1)
save_submission('submission-fmnist.csv', kaggleYhat)