Upload the `utils.py` file [(click here)](https://drive.google.com/file/d/1kRmGjHIpIJ0Y_KDBVJSooJnPPNhUP4A-/view?usp=drive_link) to your runtime environment to use all the helper functions

In [1]:
from google.colab import files

uploaded = files.upload()

Saving utils.py to utils.py


In [2]:
from __future__ import print_function
import numpy as np
from utils import *

# To allow evaluators to see what you got
np.random.seed(677)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Homework 1 - Fully Connected Deep Networks

In this exercise you will implement the components required to train a fully connected deep network from scratch using only `numpy`. To aid in this effort, we have provided you with some starter code. Your task is to fill in the missing parts of the implementation so that you can end up with a fully trainable deep network.

You may complete this assignment on either Google Colab or your own machine. If you are using your own machine you will require a functioning `numpy` installation, and the `pickle` module.

## Part 1 - Implementation [60 points]

In the first part of this notebook, you will complete the implementation of a `Sequential` model class that help you build deep networks and also handle the computation of derivatives through backpropagation. This part of the assignment will not deal with a specific machine learning problem.

In [3]:
# superclass of modules
class Module:
    """
    Module is a super class. It could be a single layer, or a multilayer perceptron.
    """

    def __init__(self):
        self.train = True
        return

    def forward(self, _input):
        """
        h = f(z); z is the input, and h is the output.

        Inputs:
        _input: z

        Returns:
        output h
        """
        pass

    def backward(self, _input, d_output):
        """
        Compute:
        gradient w.r.t. _input
        gradient w.r.t. trainable parameters

        Inputs:
        _input: z
        _gradOutput: dL/dh

        Returns:
        gradInput: dL/dz
        """
        pass

    def parameters(self):
        """
        Return the value of trainable parameters and its corresponding gradient (Used for grandient descent)

        Returns:
        params, gradParams
        """
        pass

#### Forward and backward passes in the `Sequential` module [10 points]

Complete the implementation of a forward and backward pass in a `Sequential` module. For now we can ignore which layers actually make up the `Sequential` module. Just treat them as abstract functions.

In [5]:
class Sequential(Module):
    """
    Sequential provides a way to plug layers together in a feed-forward manner.
    """
    def __init__(self):
        Module.__init__(self)
        self.layers = []

    def add(self, layer):
        self.layers.append(layer) # Adding another layer at the end

    def size(self):
        return len(self.layers)

    def forward(self, _input):
        """
        Feed forward through all the layers, and return the output of the last layer
        """
        # self._inputs saves the input of each layer
        # self._inputs[i] is the input of i-th layer
        self._inputs = [_input]

        # YOUR CODE HERE
        for layer in self.layers:
            output = layer.forward(self._inputs[-1])  #  Output from the current layer
            self._inputs.append(output)  # Append the output to the list of inputs for the next layers

        # The last element of self._inputs is the output of last layer
        self._output = self._inputs[-1]
        return self._output

    def backward(self, _input, d_output):
        """
        Backpropogate through all the layers using chain rule.
        """

        self.d_inputs = [None] * (self.size() + 1)
        self.d_inputs[self.size()] = d_output

        # YOUR CODE HERE
        for i in range(self.size() - 1, -1, -1):
            d_input = self.layers[i].backward(self._inputs[i], self.d_inputs[i+1])
            self.d_inputs[i] = d_input
        self.d_input = self.d_inputs[0]
        return self.d_input

    def parameters(self):
        """
        Return trainable parameters and its corresponding gradient in a nested list
        """
        params = []
        d_params = []
        for m in self.layers:
            _p, _d = m.parameters()
            if _p is not None:
                params.append(_p)
                d_params.append(_d)
        return params, d_params

#### Forward and backward passes for a `FullyConnected` layer [20 points]

Complete the implementation of a forward and backward pass in an affine layer. When creating the layer, also initialize the weights using a modified version of the Xavier initialization.

In [6]:
class FullyConnected(Module):
    """
    Fully connected linear layer
    y = Wx + b
    """
    def __init__(self, inputSize, outputSize):
        Module.__init__(self)

        """
        Initalization: Use the Xavier initialization scheme to
        set the standard deviation of the weights. We will use
        the uniform distribution instead of the normal distribution
        in order to avoid large weight values. Sometimes practitioners
        also use the truncated normal distribution for this purpose.
        """

        stdv = np.sqrt(2.0 / (inputSize + outputSize)) # YOUR CODE HERE

        self.weight = np.random.uniform(-stdv, stdv, (outputSize, inputSize))
        self.d_weight = np.zeros((outputSize, inputSize))
        self.bias = np.random.uniform(-stdv, stdv, outputSize)
        self.d_bias = np.zeros(outputSize)

    def forward(self, _input):
        """

        _input:
        N x inputSize matrix

        """
        self._output = np.dot(_input, self.weight.T) + self.bias # YOUR CODE HERE
        return self._output

    def backward(self, _input, d_out):
        """
        _input:
        N x inputSize matrix
        d_out:
        N x outputSize matrix
        """
        self.d_weight = np.dot(d_out.T, _input) # YOUR CODE HERE
        self.d_bias = np.sum(d_out, axis=0) # YOUR CODE HERE

        self.d_input = np.dot(d_out, self.weight) # YOUR CODE HERE
        return self.d_input

    def parameters(self):
        """
        Return weight and bias and their g
        """
        return [self.weight, self.bias], [self.d_weight, self.d_bias]

#### Forward and backward passes for a `ReLU` layer [10 points]

Complete the implementation of a forward and backward pass in a `ReLU` layer. This layer does not have any parameters. If you are so inclined, you may use this template to also implement `Sigmoid` and `Tanh` nonlinearity layers.

In [7]:
class ReLU(Module):
    """
    ReLU activation, no trainable parameters
    y = max(x, 0)
    """
    def __init__(self):
        Module.__init__(self)
        return

    def forward(self, _input):
        """
        _input:
        N x d matrix
        """
        self._output = np.maximum(0, _input) # YOUR CODE HERE
        return self._output

    def backward(self, _input, d_out):
        """
        _input:
        N x d matrix

        d_out:
        N x d matrix
        """
        self.d_input = d_out * (_input > 0) # YOUR CODE HERE
        return self.d_input

    def parameters(self):
        """
        No trainable parameters, return None
        """
        return None, None

#### `SoftMax` loss with gradient [20 points]

Complete the implementation of a forward and backward pass for the `SoftMaxLoss`. Remember to take numerical stability issues seriously when computing the Softmax function. Large and small logit values can create issues.

In [8]:
class SoftMaxLoss:
    def __init__(self):
        return

    def forward(self, _input, _label):
        """
        Softmax and cross entropy loss layer. Should return a scalar, since it's a
        loss.

        _input: N x C
        _labels: N x C, one-hot

        Returns: loss (scalar)
        """
        #YOUR CODE HERE
        shift_input = _input - np.max(_input, axis=1, keepdims=True)

        # Softmax function
        exps = np.exp(shift_input)
        softmax_output = exps / np.sum(exps, axis=1, keepdims=True)

        # Cross-entropy loss
        self._output = -np.sum(_label * np.log(softmax_output + 1e-12)) / _input.shape[0]

        self.softmax_output = softmax_output

        return self._output

    def backward(self, _input, _label):
        self.d_input = (self.softmax_output - _label) / _input.shape[0] # YOUR CODE HERE
        return self.d_input

### Testing against Numerical Gradients

Now we can test our implementation. To do this we use the `numeric_gradient` function in `utils.py`. The numeric gradient is estimated as follows (where $e_i$ are basis vectors):

$$ [\nabla f(x)]_i = \frac{f(x+\epsilon e_i) - f(x- \epsilon e_i)}{2 \epsilon} $$

As long as the gradient we compute through backprop is close to the numerical gradient, we can be confident that our implementation will work.

In [9]:
# Test softmaxloss, the relative error should be small enough
def test_sm():
    crit = SoftMaxLoss()
    gt = np.zeros((3, 10))
    gt[np.arange(3), np.array([1,2,3])] = 1
    x = np.random.random((3,10))
    def test_f(x):
        return crit.forward(x, gt)

    crit.forward(x, gt)

    d_input = crit.backward(x, gt)
    d_input_numeric = numeric_gradient(test_f, x, 1, 1e-6)
    # print(d_input)
    # print(d_input_numeric)
    print(relative_error(d_input, d_input_numeric, 1e-8))

test_sm()

5.966419986814647e-09


In [10]:
# Test modules, all the relative errors should be small enough
def test_module(model):

    # model.evaluate()

    crit = TestCriterion()
    gt = np.random.random((3,10))
    x = np.random.random((3,10))
    def test_f(x):
        return crit.forward(model.forward(x), gt)

    d_input = model.backward(x, crit.backward(model.forward(x), gt))
    d_input_numeric = numeric_gradient(test_f, x, 1, 1e-6)
    print(relative_error(d_input, d_input_numeric, 1e-8))

# Test fully connected
model = FullyConnected(10, 10)
test_module(model)

# Test ReLU
model = ReLU()
test_module(model)

# Test Sequential
model = Sequential()
model.add(FullyConnected(10, 10))
model.add(ReLU())
test_module(model)

7.312466155746781e-08
5.962448038856628e-10
3.2784330387251717e-09


Finally we perform a sanity check for the `sgd` optimizer provided in `utils.py`

In [11]:
# Test gradient descent, the loss should be lower and lower
trainX = np.random.random((10,5))

model = Sequential()
model.add(FullyConnected(5, 3))
model.add(ReLU())
model.add(FullyConnected(3, 1))

crit = TestCriterion()

it = 0
state = None
while True:
    output = model.forward(trainX)
    loss = crit.forward(output, None)
    if it % 100 == 0:
        print(loss)
    doutput = crit.backward(output, None)
    model.backward(trainX, doutput)
    params, gradParams = model.parameters()
    sgd(params, gradParams, 0.01, 0.8)
    if it > 1000:
        break
    it += 1

0.6274186902506809
0.01806809817803746
0.006363642344207082
0.006242982311460435
0.006124830766812822
0.006009135559503139
0.005895845622969916
0.005784910952311122
0.005676282582212528
0.00556991256533503
0.005465753951151185


## Part 2 - Test on Real Data [20 points]

In this section we will test the machinery we have built on a "real world" image categorization problem. The data we will be using is a subset of the Fashion MNIST dataset. The `pickle` file containing this data set can be downloaded [here](https://www.dropbox.com/scl/fi/mxxg0k6x8q17feef2bvwg/fashion_mnist_small.pkl?rlkey=kb2j5zfqs6qen19jxs2f5g69z&dl=0). Put the data file in a convenient location, and make sure you include the path in the cell below.

The dataset contains a training set of $8000$ examples, validation set of $2000$ examples, and test set of $1000$ examples. The data consists of small $28 \times 28$ images that have been centered, scaled, and flattened into $784$ dimensional vectors with zero mean and unit variance.

In [12]:
from google.colab import files

uploaded = files.upload()

Saving fashion_mnist_small.pkl to fashion_mnist_small.pkl


In [13]:
filename = 'fashion_mnist_small.pkl'
X_train, y_train, X_val, y_val, X_test, y_test = load_fmnist_data(filename)

Write a function to build your Fully connected deep networks so that you can experiment with different hidden layer sizes and number of layers quickly.

In [14]:
def build_model(input_size, hidden_sizes, output_size, activation_func='ReLU'):
    """
    Build the model:
    input_size: the dimension of input data
    hidden_sizes: a list with the sizes of each hidden layer
    output_size: the size of the output layer
    activation_func: the activation function to use ('ReLU', 'Logistic', 'Tanh', etc.)
    """
    model = Sequential()

    # Add the first hidden layer connected to the input
    model.add(FullyConnected(input_size, hidden_sizes[0]))

    # Add the activation function after the first hidden layer
    if activation_func == 'ReLU':
        model.add(ReLU())
    elif activation_func == 'Logistic':
        model.add(Logistic())
    elif activation_func == 'Tanh':
        model.add(Tanh())


    # Add any additional hidden layers
    for i in range(1, len(hidden_sizes)):
        model.add(FullyConnected(hidden_sizes[i-1], hidden_sizes[i]))

        # Add the activation function after each hidden layer
        if activation_func == 'ReLU':
            model.add(ReLU())
        elif activation_func == 'Logistic':
            model.add(Logistic())
        elif activation_func == 'Tanh':
            model.add(Tanh())


    # Output layer
    model.add(FullyConnected(hidden_sizes[-1], output_size))

    return model

Train your fully connected deep network models using the training data. You can use the validation data for hyperparameter tuning. One suggested hyperparameter is `weight decay` ($\lambda$) but you may also experiment with different choices for the model architecture. We will use the `train` function from `utils.py` to conduct the training. You may want to read through it and familiarize yourself with the code to fix bugs that may arise.

In [15]:
def one_hot_encode(labels, num_classes):
    """Convert a 1D array of class labels to a 2D one-hot encoded matrix."""
    return np.eye(num_classes)[labels]

# Load the Fashion MNIST dataset
filename = 'fashion_mnist_small.pkl'
X_train, y_train, X_val, y_val, X_test, y_test = load_fmnist_data(filename)

# Ensure y_train and y_val are 2-dimensional (one-hot encoded)
NCLASSES = 10  # Assuming y_train contains labels 0 through NCLASSES-1
y_train_one_hot = one_hot_encode(y_train, NCLASSES)
y_val_one_hot = one_hot_encode(y_val, NCLASSES)

# Training options setup
trainopt = {
    'lr': .001,  # Initial learning rate
    'maxiter': 20000,  # Max number of iterations (updates) of SGD
    'display_iter': 500,  # Display batch loss every display_iter updates
    'batch_size': 100,
    'lr_decay': .5,  # When dropping lr, multiply it by this number (e.g., .5 means halve it)
    'lr_decay_interval': .25  # The interval to apply lr decay (as a fraction of maxiter)
}

NFEATURES = X_train.shape[1]
NCLASSES = 10


# we will maintain a record of models trained for different values of lambda
# these will be indexed directly by lambda value itself
trained_models = dict()

# set the (initial?) set of lambda values to explore
lambdas = np.array([0, 0.001, 0.01, 0.1])  # CHANGE THESE VALUES IF YOU NEED TO GET BETTER PERFORMANCE

hidden_sizes = [100, 50]# YOUR MODEL HERE

# Training loop over different values of lambda
for lambda_ in lambdas:
    trainopt['weight_decay'] = lambda_  # Set current lambda as the weight decay
    model = build_model(NFEATURES, hidden_sizes, NCLASSES)  # Build the model
    criterion = SoftMaxLoss()  # Loss function

    # -- model trained on large train set
    trained_model, train_error, val_error = train(model, criterion, X_train, y_train_one_hot, X_val, y_val_one_hot, trainopt)

    # Store the trained model and its performance metrics
    trained_models[lambda_] = {
        'model': trained_model,
        'train_err': train_error,
        'val_err': val_error
    }

    print(f'Lambda={lambda_:.4f}, Train Error={train_error:.2f}%, Validation Error={val_error:.2f}%')

       0 batch loss: 2.306 train error: 89.050 val error: 87.900
     500 batch loss: 2.084 train error: 56.725 val error: 56.900
    1000 batch loss: 1.819 train error: 42.437 val error: 43.950
    1500 batch loss: 1.475 train error: 41.138 val error: 42.850
    2000 batch loss: 1.300 train error: 38.887 val error: 40.100
    2500 batch loss: 1.259 train error: 35.638 val error: 36.100
    3000 batch loss: 1.128 train error: 32.875 val error: 33.000
    3500 batch loss: 0.885 train error: 30.500 val error: 31.800
    4000 batch loss: 0.896 train error: 29.038 val error: 30.400
    4500 batch loss: 0.945 train error: 27.975 val error: 29.750
    5000 batch loss: 0.901 train error: 26.962 val error: 28.300
    5500 batch loss: 0.711 train error: 26.363 val error: 27.800
    6000 batch loss: 0.748 train error: 25.900 val error: 27.500
    6500 batch loss: 0.846 train error: 25.575 val error: 27.300
    7000 batch loss: 0.838 train error: 25.138 val error: 26.850
    7500 batch loss: 0.65

Choose the model and hyperparameters that have the best validation error

In [16]:
best_trained_lambda = 0.
best_trained_model = None
best_trained_val_err = 100.
for lambda_, results in trained_models.items():
    print('lambda= %.4f, val error: %.2f' %(lambda_, results['val_err']))
    if results['val_err'] < best_trained_val_err:
        best_trained_val_err = results['val_err']
        best_trained_model = results['model']
        best_trained_lambda = lambda_

print("Best train model val err:", best_trained_val_err)
print("Best train model lambda:", best_trained_lambda)

lambda= 0.0000, val error: 21.92
lambda= 0.0010, val error: 21.75
lambda= 0.0100, val error: 21.15
lambda= 0.1000, val error: 21.99
Best train model val err: 21.150000000000002
Best train model lambda: 0.01


Evaluate your model on the heldout test data

In [17]:
test_err = 100 * error_rate(X_test, y_test, best_trained_model)
print("Best model test error:", test_err)

Best model test error: 88.8
