In [None]:
%load_ext autoreload
%autoreload 2

# Add the parent of the current working directory to the import path:
# os.path.abspath('') evaluates to the current working directory, and
# os.path.dirname() of that is its parent (presumably where the 'network'
# package lives -- confirm against the repository layout).
# Otherwise, simply place the 'network' folder in the same directory.
import sys
import os
parent = os.path.dirname(os.path.abspath(''))
sys.path.append(parent)


import network
from network.layers.layer import Layer

import gc
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score

In [None]:
class Parameter:
    """
    A trainable array bundled with the optimizer state needed to update it.

    Attributes:
    p: the parameter values; updated in place by optimize() and decay(),
    shape: shape of p, captured at construction,
    m1: first-moment accumulator (used by 'adam' and 'momentum'),
    m2: second-moment accumulator (used by 'adam')
    """

    def __init__(self, p):
        """
        Arguments:
        (ndarray) p: initial parameter values; stored by reference (not copied),
                     so later in-place updates are visible through the caller's array
        """
        self.p = p
        self.shape = p.shape

        # optimizer state starts at zero and has the same shape as the parameter
        self.m1 = np.zeros(self.shape)
        self.m2 = np.zeros(self.shape)

    def optimize(self, dp, param):
        """
        Apply one optimizer step to self.p in place, then apply weight decay.

        Arguments:
        (ndarray) dp: gradient with respect to self.p,
        (dict) param: hyper-parameters, read with the following defaults:
            'optimizer' ('sgd'): 'sgd', 'momentum' or 'adam' (case-insensitive),
            'lr' (1e-3): learning rate,
            'decay' (0): rate handed to self.decay() after the step,
            'momentum' (0.9): averaging factor for the 'momentum' rule,
            'beta' ((0.9, 0.999)): averaging factors for the two 'adam' moments,
            'eps' (1e-16): added to the denominator of the 'adam' step,
            't' (1): step counter used by the 'adam' bias correction

        Raises:
        ValueError: if 'optimizer' is not one of the supported rules
        """
        optimizer = param.get('optimizer', 'sgd').lower()
        lr = param.get('lr', 1e-3)
        decay_rate = param.get('decay', 0)

        momentum = param.get('momentum', 0.9)
        beta1, beta2 = param.get('beta', (0.9, 0.999))

        eps = param.get('eps', 1e-16)
        t = param.get('t', 1)

        if optimizer == 'adam':
            # exponential moving averages of the gradient and its square
            self.m1 = beta1 * self.m1 + (1 - beta1) * dp
            self.m2 = beta2 * self.m2 + (1 - beta2) * np.square(dp)
            # bias correction: the averages start at zero, so early steps are rescaled
            u1 = self.m1 / (1 - beta1 ** t)
            u2 = self.m2 / (1 - beta2 ** t)
            step = lr * u1 / (np.sqrt(u2) + eps)

        elif optimizer == 'momentum':
            self.m1 = momentum * self.m1 + (1 - momentum) * dp
            step = lr * self.m1

        elif optimizer == 'sgd':
            step = lr * dp

        else:
            # previously an unknown name fell through and subtracted the raw,
            # unscaled gradient from the parameter
            raise ValueError(f"Unknown optimizer: {param.get('optimizer')!r}")

        self.p -= step
        self.decay(decay_rate)

    def decay(self, rate):
        """
        Shrink the parameter in place by the factor (1 - rate).

        Arguments:
        (float) rate: decay rate; 0 leaves the values unchanged
        """
        self.p *= (1 - rate)

    def __repr__(self):
        return f"Parameter(shape={self.shape})"

In [None]:
class Linear_layer(Layer):
    """
    Fully connected layer: output = X . W, where W is held in a Parameter.
    When bias is enabled, W carries one extra row and a column of ones is
    appended to the input so that row acts as the bias.
    """

    def __init__(self, input_nodes, output_nodes, bias=0):
        """
        Arguments:
        (int) input_nodes = number of input nodes,
        (int) output_nodes = number of output nodes,
        (bool or number) bias: a falsy value (0, False, None) disables the bias;
                               a truthy value enables it and is also used as the
                               initial value of every bias weight,
        """

        self.type = 'linear'

        # number of inputs & outputs
        self.input_nodes = input_nodes
        self.output_nodes = output_nodes
        self.bias = bias
        self.reset()

    def reset(self):
        """The actual init function, separate from __init__ to allow NeuralNetworks to be re-initialized"""

        # random normal weights scaled by sqrt(2 / input_nodes)
        weights = np.random.randn(self.input_nodes, self.output_nodes) / np.sqrt(self.input_nodes/2)
        if self.bias:
            # extra last row holds the bias weights, initialised to the bias value
            bias_row = np.ones((1, self.output_nodes)) * self.bias
            weights = np.concatenate((weights, bias_row), axis=0)

        # Parameter owns the values (.p) and the optimizer state for them
        self.weights = Parameter(weights)

    def forward(self, X, param):
        """
        Forward inputs
        Arguments:
        (2d array) X: input, in the form of 2d numpy array #of instances * #of attributes,
        (dict) param: not used by this layer
        Returns:
        (2d array) #of instances * #of output nodes
        """
        if self.bias:
            # concat ones to x as additional column
            X = np.concatenate((X, np.ones((X.shape[0], 1))), axis=1)

        # calculate output from the raw weight array held by the Parameter
        output = np.dot(X, self.weights.p)

        # record inputs & outputs for weight update later
        self.input = X
        self.output = output

        return output

    def backward(self, dout, param):
        """
        Error backpropagation function,
        Updates the weights through self.weights.optimize,
        Returns this layer's error for the preceding layer to propagate

        Arguments:
        (2d array) dout: error from the superior layer,
        (dict) param: hyper-parameters, forwarded unchanged to Parameter.optimize
                      (which also applies the weight decay)
        """

        weights = self.weights.p

        # calculate error to pass; must use the weights from before the update
        if self.bias:
            dx = np.dot(dout, weights.T[:, :-1])  # bias is not passed
        else:
            dx = np.dot(dout, weights.T)

        # update self: gradient of the weights, then one in-place optimizer step
        dw = np.dot(self.input.T, dout)
        self.weights.optimize(dw, param)

        return dx

In [None]:
# Read both MNIST splits; the CSVs carry no header row.
train = pd.read_csv("tests/mnist_train.csv", header=None)
test = pd.read_csv("tests/mnist_test.csv", header=None)

# Features: every column after the first, as float32, rescaled by
# x / 255 * 0.99 + 0.01 (maps 0 -> 0.01 and 255 -> 1.0).
X, X_t = (
    frame.iloc[:, 1:].to_numpy(np.float32) / 255.0 * 0.99 + 0.01
    for frame in (train, test)
)

# Targets: the first column holds the integer label. Indexing the rows of a
# 10x10 identity matrix with those labels yields the one-hot encoding.
y, y_t = (
    np.eye(10)[frame.iloc[:, 0].to_numpy(int)]
    for frame in (train, test)
)

# Integer class labels of the test split, recovered from the one-hot rows.
y_true = y_t.argmax(axis=1)

# The raw frames are no longer needed.
del train, test

In [None]:
# define network: 784 -> 200 -> 10, with both linear layers created with bias=None
nn = network.NeuralNetwork(
    [
        network.Linear_layer(784, 200, bias=None),
        network.Activation_layer('ReLU'),
        network.Linear_layer(200, 10, bias=None),
        network.Activation_layer('fast_softmax'),
    ]
)

# hyper-parameter dict passed to nn.train; how each key is interpreted is
# decided inside the 'network' package (not visible here)
param = {
    "lr": 1e-3,
    'batch': 16,
    "mode": "train",
    "eps": 1e-9,
    "beta": (0.9, 0.999),
    "epoch": 0,
    'optimizer': 'Adam',
    't': 1,
    'clip': 1.0,
    'decay': 0.0,
}

In [None]:
# fit on the training split
nn.train(X, y, param, loss_func='fast_cross_entropy')

# predict on the test split and report the fraction of matching labels
yhat = nn(X_t, mode='classification')
accuracy = (yhat == y_true).sum() / len(y_true)
print(f"Accuracy = {accuracy}")