# Import

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os
from typing import Tuple
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as datasets

# Forward Pass

## Linear Layer Forward

In [2]:
def linear_layer_forward(x: np.ndarray, w: np.ndarray, b: np.ndarray) -> Tuple[np.ndarray, tuple]:

    '''
    Computes forward pass for simple linear (fully connected) layer

    Input:

    x: Numpy array containing input data, N x D. Inputs with more than two
       dimensions (e.g. N x H x W) are flattened per sample to N x D
    w: Numpy array of weights, D x M
    b: Numpy array of bias, M

    Output:

    output: Numpy array after matrix multiplication plus bias, N x M
    cache: Tuple (x, w, b) of the inputs, with x kept in the shape it was
           passed in, to be used in backward pass

    '''

    # Flatten trailing dimensions so every sample becomes one row vector;
    # inputs that are already 1-D or 2-D are used unchanged.
    x_flat = x.reshape(x.shape[0], -1) if x.ndim > 2 else x

    output = (x_flat @ w) + b

    cache = (x, w, b)
    return output, cache

## Sigmoid Function Forward

In [3]:
def sigmoid_activation_forward(x: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:

    '''

    Apply sigmoid function element-wise on given input

    Input:

    x: Numpy array, NxD

    Output:

    output: Numpy array after sigmoid activation , NxD
    cache: The input x, to be used in backward pass

    '''

    # sigmoid(x) = 1 / (1 + exp(-x)) = exp(-log(1 + exp(-x))).
    # np.logaddexp(0, -x) evaluates log(1 + exp(-x)) without overflowing,
    # so large negative inputs do not trigger an overflow in exp.
    output = np.exp(-np.logaddexp(0, -x))

    cache = x

    return output, cache

## Softmax Layer Forward

In [4]:
def softmax_activation_forward(x: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:

    '''
    Apply softmax function on given input, independently for every row

    Input:

    x: Numpy array, NxD

    Output:

    output: Numpy array after softmax activation , NxD (each row sums to 1)
    cache: The max-shifted input (x minus its row-wise max), NxD, to be used
           in backward pass

    '''

    # Subtract the per-row max for numerical stability; softmax is invariant
    # to adding a constant to every entry of a row.
    normalize_input = x - np.max(x, axis=-1, keepdims=True)

    exp_input = np.exp(normalize_input)

    # Normalise over the last axis only, so samples in a batch do not
    # share one normalisation constant.
    output = exp_input / np.sum(exp_input, axis=-1, keepdims=True)

    cache = normalize_input

    return output, cache

# Loss

In [5]:
def MSE_loss(y_pred: np.ndarray, y: np.ndarray) -> Tuple[float, np.ndarray]:

    '''
    Computes Mean squared error loss and its gradient

    Input:

    y_pred: Numpy array from network, NxD
    y: Numpy array containing ground truth labels, NxD

    Output:

    loss: scalar, squared error averaged over all entries of y
    gradient: gradient of loss w.r.t y_pred used for backward pass,
              same shape as y_pred

    '''

    diff = y_pred - y

    # Number of averaged entries (N * D for NxD targets); using size instead
    # of shape[1] also makes 1-D targets work.
    n_elements = y.size

    loss = np.sum(diff ** 2) / n_elements

    # d/dy_pred of mean((y_pred - y)^2) = 2 * (y_pred - y) / (N * D)
    gradient = (2 * diff) / n_elements

    return loss, gradient

# Backward Pass

## Linear Backward Layer

In [6]:
def linear_layer_backward(upstream: np.ndarray, cache: tuple) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:

    '''
    Computes backward pass for simple linear layer

    Input:

    upstream: Numpy array, gradient of the loss w.r.t the layer output, N x M
    cache: Tuple (x, w, b) as returned by the forward pass

    Output:

    dx: Gradient w.r.t x, same shape as x
    dw: Gradient w.r.t w, D x M
    db: Gradient w.r.t b, same shape as b

    '''

    x, w, b = cache

    # Work on the per-sample flattened view (N x D) of the cached input.
    x_flat = x.reshape(x.shape[0], -1)

    dx = (upstream @ w.T).reshape(x.shape)

    # output = x @ w + b  =>  dL/dw = x^T @ upstream (D x M). Reshaping
    # upstream^T @ x (M x D) to D x M would scramble entries instead of
    # transposing them.
    dw = x_flat.T @ upstream

    # The bias is broadcast over the batch, so its gradient sums over samples.
    db = np.sum(upstream, axis=0).reshape(b.shape)

    return dx, dw, db

## Sigmoid Backward Layer

In [7]:
def sigmoid_layer_backward(upstream: np.ndarray, cache: np.ndarray) -> np.ndarray:

    '''
    Computes backward pass for the sigmoid activation

    Input:

    upstream: Numpy array, gradient of the loss w.r.t the sigmoid output, NxD
    cache: Numpy array, the input x of the sigmoid forward pass, NxD

    Output:

    dx: Numpy array, gradient of the loss w.r.t x, NxD

    '''

    x = cache

    # Recompute the sigmoid (not the softmax) of the cached input, using the
    # overflow-safe form exp(-log(1 + exp(-x))).
    sigmoid = np.exp(-np.logaddexp(0, -x))

    # d sigmoid(x) / dx = sigmoid(x) * (1 - sigmoid(x))
    sigmoid_derivative = sigmoid * (1 - sigmoid)

    dx = upstream * sigmoid_derivative

    return dx

## Softmax Backward Layer

In [8]:
def softmax_layer_backward(grad: np.ndarray, cache: np.ndarray) -> np.ndarray:

    '''
    Computes backward pass for the row-wise softmax activation

    Input:

    grad: Numpy array, gradient of the loss w.r.t the softmax output, NxD
    cache: Numpy array of softmax inputs (logits), NxD. Softmax is invariant
           to a per-row shift, so max-shifted logits work equally well

    Output:

    dx: Numpy array, gradient of the loss w.r.t the softmax input, NxD

    '''

    x = cache

    # The Jacobian is expressed in terms of the softmax OUTPUT, so recompute
    # it from the cached logits (row-wise, with the usual max shift).
    shifted = x - np.max(x, axis=-1, keepdims=True)
    exp_shifted = np.exp(shifted)
    softmax = exp_shifted / np.sum(exp_shifted, axis=-1, keepdims=True)

    # Per row, J = diag(s) - s s^T and dx = J @ grad = s * (grad - <s, grad>).
    # This closed form avoids building the N x D x D Jacobian and handles
    # N == 1 and N > 1 identically.
    weighted_grad = np.sum(grad * softmax, axis=-1, keepdims=True)
    dx = softmax * (grad - weighted_grad)

    return dx


# DataLoader

In [9]:
# MNIST train / test splits, stored under ./data (downloaded when missing)
# and converted to tensors on access.
train_dataset = datasets.MNIST(
    root='./data',
    train=True,
    transform=transforms.ToTensor(),
    download=True,
)
test_dataset = datasets.MNIST(
    root='./data',
    train=False,
    transform=transforms.ToTensor(),
    download=True,
)


In [10]:
# Report how many samples each split contains
images_train, images_test = len(train_dataset), len(test_dataset)

print("Images Train %d" % images_train)
print("Images Test %d" % images_test)

Images Train 60000
Images Test 10000


In [11]:
# Mini-batch settings used when building the data loaders
SHUFFLE = True   # passed as the loaders' `shuffle` argument
BATCH_SIZE = 20  # number of samples per mini-batch


In [12]:
# Wrap both splits in mini-batch iterators sharing the same batch settings
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=SHUFFLE
)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=SHUFFLE
)

In [13]:
def array_to_one_hot(arr, C):
    '''
    Convert an array of class indices into one-hot rows

    Input:

    arr: Array of class indices, length N (each entry is cast with int())
    C: Number of classes (width of the one-hot rows)

    Output:

    one_hot: Numpy array of zeros with a single 1 per row, N x C

    '''
    one_hot = np.zeros((arr.shape[0], C))

    # Switch on the column selected by each label, row by row
    for row, label in enumerate(arr):
        one_hot[row, int(label)] = 1

    return one_hot

# Model

In [14]:
class TwoLayerModel(object):
    '''
    Two-layer fully connected network: linear -> sigmoid -> linear -> softmax,
    trained with a mean squared error loss plus an L2 penalty on the weights.

    Parameters live in the dict self.params under the keys
    'w1', 'b1', 'w2' and 'b2'.
    '''

    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int, reg: float = 0.2) -> None:
        '''
        Input:

        input_dim: Size D of each flattened input sample
        hidden_dim: Number of hidden units
        output_dim: Number of outputs / classes
        reg: L2 regularization strength applied to w1 and w2
        '''

        self.params = {}

        # Initialize weights uniformly in [-0.5, 0.5) and biases to zero
        self.params['w1'] = np.random.uniform(-0.5, 0.5, (input_dim, hidden_dim))
        self.params['b1'] = np.zeros((hidden_dim,))
        self.params['w2'] = np.random.uniform(-0.5, 0.5, (hidden_dim, output_dim))
        self.params['b2'] = np.zeros((output_dim,))
        self.reg = reg

    def loss(self, X, y):
        '''
        Run a forward and backward pass for one batch

        Input:

        X: Numpy array of inputs, N x input_dim
        y: Numpy array of targets with the same shape as the network
           output, N x output_dim

        Output:

        loss: scalar, MSE loss plus 0.5 * reg * (sum(w1^2) + sum(w2^2))
        grads: dict with keys 'w1', 'w2', 'b1', 'b2' holding the gradient of
               the loss w.r.t the matching entry of self.params
        '''

        w1, b1 = self.params['w1'], self.params['b1']
        w2, b2 = self.params['w2'], self.params['b2']

        # Forward Pass
        x, cache_linear_1 = linear_layer_forward(X, w1, b1)
        x, cache_sigmoid = sigmoid_activation_forward(x)
        x, cache_linear_2 = linear_layer_forward(x, w2, b2)
        y_pred, cache_softmax = softmax_activation_forward(x)

        # Data loss plus L2 penalty on the weights
        loss, upstreamgrad_loss = MSE_loss(y_pred, y)
        loss += self.reg * 0.5 * (np.sum(w1 ** 2) + np.sum(w2 ** 2))

        # Backward pass
        dsoftmax_activation = softmax_layer_backward(upstreamgrad_loss, cache_softmax)
        dx2, dw2, db2 = linear_layer_backward(dsoftmax_activation, cache_linear_2)
        dsigmoid_activation = sigmoid_layer_backward(dx2, cache_sigmoid)
        dx1, dw1, db1 = linear_layer_backward(dsigmoid_activation, cache_linear_1)

        # The penalty 0.5 * reg * sum(w^2) contributes reg * w to each weight
        # gradient; without it the returned gradients would not match the
        # returned loss.
        grads = {
            'w1': dw1 + self.reg * w1,
            'w2': dw2 + self.reg * w2,
            'b1': db1,
            'b2': db2,
        }

        return loss, grads

# Training

In [15]:
class Solver(object):
    '''
    Minimal trainer: runs plain gradient-descent updates on a model over the
    batches of a dataloader.

    The model must expose a dict `params` and a method `loss(X, y)` that
    returns (loss, grads), with grads keyed like `params`.
    '''

    def __init__(self, model, train_dataloader, test_dataloader, num_iterations, lr, num_classes: int = 10) -> None:
        '''
        Input:

        model: Object with `params` and `loss(X, y)` as described above
        train_dataloader: Iterable yielding (imgs, labels) tensor batches used for training
        test_dataloader: Iterable of (imgs, labels) batches; stored, not used by train()
        num_iterations: Number of full passes over train_dataloader
        lr: Learning rate of the gradient-descent update
        num_classes: Width of the one-hot label encoding
        '''

        self.model = model
        self.train_dataloader = train_dataloader
        self.test_dataloader = test_dataloader
        self.num_iterations = num_iterations
        self.learning_rate = lr
        self.num_classes = num_classes

        self._reset()

    def _reset(self):
        """
        Set up some book-keeping variables for optimization. Don't call this
        manually.
        """
        # Set up some variables for book-keeping
        self.epoch = 0
        self.best_val_acc = 0
        self.best_params = {}
        self.loss_history = []
        self.train_acc_history = []
        self.val_acc_history = []

    @staticmethod
    def array_to_one_hot(arr, C):
        '''
        Convert an array of N class indices into an N x C one-hot array.

        Declared static because it takes no `self`; it can be called on the
        class or on an instance.
        '''
        one_hot = np.zeros((arr.shape[0], C))

        for i in range(arr.shape[0]):

            idx = int(arr[i])
            one_hot[i, idx] = 1

        return one_hot

    def _step(self, X_batch, y_batch):
        '''
        Compute loss and gradients for one batch, record the loss and update
        every model parameter.
        '''

        loss, grads = self.model.loss(X_batch, y_batch)
        self.loss_history.append(loss)

        for p, w in self.model.params.items():

            dw = grads[p]
            next_w = self.update_rule(w, dw, self.learning_rate)
            self.model.params[p] = next_w

    def update_rule(self, w, dw, lr):
        '''
        Vanilla gradient descent step. Updates w in place and returns it.
        '''

        w -= lr * dw

        return w

    def train(self):
        '''
        Train the model for num_iterations passes over the training
        dataloader, printing the loss after every batch.
        '''

        for t in range(self.num_iterations):

            # Iterate the dataloader handed to the constructor rather than a
            # notebook-level global.
            for imgs, labels in self.train_dataloader:

                # Flatten each image to one row. np.longdouble is the portable
                # name for extended precision; np.float128 is not defined on
                # every platform.
                X_batch = imgs.reshape(imgs.shape[0], -1).cpu().numpy().astype(np.longdouble)
                y_batch = self.array_to_one_hot(labels.cpu().numpy(), self.num_classes)

                self._step(X_batch, y_batch)

                print(
                    "(Iteration %d / %d) loss: %f"
                    % (t + 1, self.num_iterations, self.loss_history[-1])
                )


In [None]:
# Train a 784 -> 64 -> 10 network for one pass over the training loader
solver = Solver(
    TwoLayerModel(784, 64, 10),
    train_dataloader=train_loader,
    test_dataloader=test_loader,
    num_iterations=1,
    lr=1e-3,
)

solver.train()

# Testing/Validation

# Plots