In [1]:
#!pip install numpy

Imports

In [2]:
import numpy as np
from abc import ABC, abstractmethod
import math

Base Neural Network Class

In [52]:
# Currently only implements a linear network structure
# input_shape does not include batch size
# input_shape needs to be refactored.  Currently takes an int
class NeuralNetwork():
    def __init__(self, input_shape):
        self.input_shape = input_shape
        self.layers = []

    def add_layer(self, layer):
        input_shape = self.layers[-1].output_shape if len(self.layers) > 0 else self.input_shape 
        layer.build(input_shape=input_shape)
        self.layers.append(layer)

    def set_loss_function(self, loss_func, loss_deriv):
        self.loss_func = loss_func
        self.loss_deriv = loss_deriv

    # Lacks error checking on input dimensions
    def train(self, X, Y, lr, epochs, batch_size=None):
        num_samples = X.shape[0]
        if batch_size is None or batch_size > num_samples:
            batch_size = num_samples
        num_batches = math.ceil(num_samples / batch_size)

        # NOTE: this method is quick and dirty and will only work for a linear network
        for epoch in range(epochs):
            error = 0

            # Should probably shuffle batches(or samples?)...
            for i in range(0, num_batches, 1):
                X_batch = X[batch_size*i : batch_size*(i+1)]
                Y_batch = Y[batch_size*i : batch_size*(i+1)]
                
                output = X_batch
                for layer in self.layers:
                    output = layer.forward_prop(output)

                Y_batch = Y_batch.reshape(output.shape)

                # Compute reported error(loss)
                # In Keras, reported losses are the average of per sample losses in each batch
                # Assumption: error function returns avg error of samples within batch
                # Multiply by number of samples in batch, then later divide by total number of samples
                # This accounts for variable batch size
                error += self.loss_func(output, Y_batch) * X_batch.shape[0]

                error_gradient = self.loss_deriv(output, Y_batch)
                for layer in reversed(self.layers):
                    error_gradient = layer.backprop(error_gradient)

                # Update using the computed weight gradients
                for layer in self.layers:
                    layer.update(lr)

            # Divide total error by number of samples for per-sample mean error
            error /= len(X)
            
            print("Epoch {:d}: {:f}".format(epoch, error))


    def predict(self, X):
        output = X
        for layer in self.layers:
            output = layer.forward_prop(output)
        return output

Base Layer Class

In [54]:
class Layer(ABC):
    def __init__(self):
        self.input_shape = None
        self.output_shape = None

    # Used to set expected input, output dimensions once adjacent layers are known,
    # as well as construct weight matrices
    @abstractmethod
    def build(self, input_shape=None, output_shape=None):
        raise NotImplementedError

    @abstractmethod
    def forward_prop(self, input):
        raise NotImplementedError

    @abstractmethod
    def backprop(self, error, lr):
        raise NotImplementedError

    def update(self, lr):
        return



Fully Connected(Dense) Layer

In [50]:

# aka Dense Layer
class FullyConnectedLayer(Layer):
    def __init__(self, neurons, input_shape=None, weight_range=(-0.5,0.5)):
        super().__init__()
        self.neurons = neurons
        self.input_shape = input_shape
        self.output_shape = neurons
        self.weight_range = weight_range
        self.weights = None
        self.bias = None
        self.grad_weights = None
        self.bias_weights = None
        self.num_samples_used = 0

    def build(self, input_shape=None, output_shape=None):
        # Output shape is equal to neurons for standard dense layer
        self.input_shape = input_shape or self.input_shape
        self.init_weights()

    # Weight initialization
    #   See https://www.analyticsvidhya.com/blog/2021/05/how-to-initialize-weights-in-neural-networks/
    #   -Small magnitude is recommended
    #   -Heuristics are good
    #   -Just randomizing with a range of 1
    def init_weights(self):
        min_w = self.weight_range[0]
        max_w = self.weight_range[1]
        self.weights = np.random.uniform(min_w, max_w, (self.input_shape, self.neurons))
        self.bias = np.zeros((1, self.neurons))
        # Matrices for summed weight gradients during backprop
        # Used to store gradients for post-backprop GD update
        self.grad_weights = np.zeros(self.weights.shape)
        self.grad_bias = np.zeros(self.bias.shape)
        # Stores number of samples adding to current gradient sum matrices
        self.num_samples_used = 0  

    def forward_prop(self, inputs):
        # Track amount of samples included in this batch so far
        # Required for averaging sum of sample gradients in update
        self.num_samples_used += inputs.shape[0]

        # Y = XW + B, where
        #   X is vector of inputs
        #   W is matrix of weights
        #   B is column of biases
        outputs = np.matmul(inputs, self.weights) + self.bias
        self.inputs = inputs
        return outputs

    # dE_dY is of shape (1, neurons)
    def backprop(self, dE_dY):
        # Compute gradient
        # Recall Y = XW + B, where
        #   X is vector of inputs
        #   W is matrix of weights
        #   B is column of biases
        # So, for a given weight wij(neuron i, weight from input j)
        #   yi = xj*wij + bi
        #   dE/dwij = dE/dyi * dyi/dwij             = dE/dyi * xj
        #   dE/dbi  = dE/dyi * dyi/dbi = dE/dyi * 1 = dE/dyi
        # (inputs, outputs) = (inputs, 1) . (1, outputs)
        transpose = self.inputs.swapaxes(-1,-2)
        dE_dW = np.matmul(transpose, dE_dY)
        dE_dB = dE_dY  

        # For each layer, have matrix of weight/bias derivatives matching weight dimensions
        # Add onto it for each sample, then divide by batch size for avg deriv
        self.grad_weights += np.sum(dE_dW, axis=0)
        self.grad_bias += np.sum(dE_dB, axis=0)

        # Pass along error gradient(dE_dX)
        # Y(output) of previous layer is this layer's X(input)
        #   dE/dxj = dE/dyi * dyi/dxj               = dE/dyi * wij
        # (1, inputs) = (1, outputs) . (outputs, inputs)
        dE_dX = np.dot(dE_dY, self.weights.T)
        return dE_dX

    def update(self, lr):
        # Average summed weight gradients by dividing by number of samples in batch
        self.grad_weights /= self.num_samples_used
        self.grad_bias /= self.num_samples_used

        # Update via gradient descent
        self.weights  = self.weights - (lr * self.grad_weights)
        self.bias = self.bias - (lr * self.grad_bias)

        # Reset gradient sums, batch size count for next batch
        self.grad_weights = np.zeros(self.weights.shape)
        self.grad_bias = np.zeros(self.bias.shape)
        self.num_samples_used = 0



Activation Layer

In [6]:
class ActivationLayer(Layer):
    def __init__(self,activation_func, derivative_func):
        super().__init__()
        self.activation = activation_func
        self.derivative = derivative_func

    def build(self, input_shape=None, output_shape=None):
        if input_shape is not None:
            self.input_shape = input_shape
        self.output_shape = self.input_shape

    def forward_prop(self, inputs):
        self.inputs = inputs
        outputs = self.activation(inputs)
        return outputs

    # dE_dY = (1, outputs)
    # derivative(inputs) is another vector of (1, inputs)
    # |inputs| = |outputs| since just applying function to each  
    def backprop(self, dE_dY, lr):
        dY_dX = self.derivative(self.inputs)
        dE_dX = dE_dY * dY_dX
        return dE_dX

    def update(self, lr):
        return

        

Activations

In [53]:
def relu(x):
    return np.maximum(0,x)

def relu_d(x):
    return np.where(x <= 0, 0, 1)

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_d(x):
    return sigmoid(x) * (1 - sigmoid(x))
    # Equivalent to?
    #return np.exp(-x) / np.power(1 + np.exp(-x), 2)

def tanh(x):
    return np.tanh(x)

def tanh_d(x):
    return 1 - np.power(np.tanh(x),2)

# Temperature controls "confidence"
# AKA low temperature(<1) means high values will be counted more strongly, small values even smaller
# High temperature(>1) means everything is more similar
def softmax(X, temp=1):
    E = np.exp(X)
    sum = np.sum(np.exp(X))
    return E / sum

def softmax_d(X):
    return softmax(X)
    #raise NotImplementedError

Loss Functions

In [8]:
# TODO
class LossFunction(ABC):
    def __init__(self):
        self.func = None
        self.deriv = None  

# Sum of squared errors for all samples divided by num samples
# Will this work for batch?
# https://stackoverflow.com/questions/55936214/correct-way-to-calculate-mse-for-autoencoders-with-batch-training
def mse(y_pred, y_true):
    #return np.mean(np.power(y_true - y_pred, 2))
    return np.mean(np.mean(np.power(y_true - y_pred, 2), axis=1))

def mse_d(y_pred, y_true):
    return -2 * (y_true - y_pred) / y_true.size;


def cross_entropy(y_pred, y_true):
    raise NotImplementedError

def cross_entropy_d(y_pred, y_true):
    raise NotImplementedError


Basic Test

In [17]:
np.set_printoptions(suppress=True)

x_train = np.array([[[0,0]], [[0,1]], [[1,0]], [[1,1]]])
y_train = np.array([[[0]], [[1]], [[1]], [[0]]])

#x_train = np.array([[0,0], [0,1], [1,0], [1,1]])
#y_train = np.array([[0], [1], [1], [0]])

#print(x_train.shape)
#print(y_train.shape)

model = NeuralNetwork(x_train.shape[-1]) #(x_train.shape[1:])
model.add_layer(FullyConnectedLayer(3))
model.add_layer(ActivationLayer(tanh, tanh_d))
model.add_layer(FullyConnectedLayer(1))
model.add_layer(ActivationLayer(tanh, tanh_d))
model.set_loss_function(mse, mse_d)

model.train(x_train, y_train, 0.1, 1000, batch_size=1)

pred = model.predict(x_train)
print(pred)

Epoch 0: 0.416257
Epoch 1: 0.312337
Epoch 2: 0.293873
Epoch 3: 0.288785
Epoch 4: 0.286772
Epoch 5: 0.285680
Epoch 6: 0.284918
Epoch 7: 0.284286
Epoch 8: 0.283708
Epoch 9: 0.283148
Epoch 10: 0.282590
Epoch 11: 0.282023
Epoch 12: 0.281442
Epoch 13: 0.280841
Epoch 14: 0.280219
Epoch 15: 0.279572
Epoch 16: 0.278899
Epoch 17: 0.278198
Epoch 18: 0.277466
Epoch 19: 0.276702
Epoch 20: 0.275904
Epoch 21: 0.275071
Epoch 22: 0.274202
Epoch 23: 0.273296
Epoch 24: 0.272350
Epoch 25: 0.271365
Epoch 26: 0.270339
Epoch 27: 0.269272
Epoch 28: 0.268163
Epoch 29: 0.267012
Epoch 30: 0.265819
Epoch 31: 0.264585
Epoch 32: 0.263310
Epoch 33: 0.261994
Epoch 34: 0.260640
Epoch 35: 0.259248
Epoch 36: 0.257820
Epoch 37: 0.256359
Epoch 38: 0.254867
Epoch 39: 0.253347
Epoch 40: 0.251802
Epoch 41: 0.250237
Epoch 42: 0.248654
Epoch 43: 0.247059
Epoch 44: 0.245456
Epoch 45: 0.243848
Epoch 46: 0.242242
Epoch 47: 0.240640
Epoch 48: 0.239049
Epoch 49: 0.237473
Epoch 50: 0.235915
Epoch 51: 0.234380
Epoch 52: 0.232872
Epo

Test

In [None]:
from keras.datasets import mnist
from keras.utils import np_utils

# X is of shape (samples, 28, 28) with each value being [0-255] (greyscale)
# Y is of shape (samples,) with values [0-9]
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Reshape to (samples, w*h), EXCEPT
# Actually (samples, 1, w*h) for matrix multiplication reasons in first layer backprop
# Should add a fix in NN class to handle this internally
# Maybe InputLayer class
x_train = x_train.reshape((x_train.shape[0], 1, np.prod(x_train.shape[1:]))) / 255
x_test = x_test.reshape((x_test.shape[0], 1, np.prod(x_test.shape[1:]))) / 255

# Normalization/scaling makes a big difference here(may depend on final layer)
#x_train /= 255
#x_test /= 255

y_train = np_utils.to_categorical(y_train)
y_test = np_utils.to_categorical(y_test)

act = sigmoid
act_d = sigmoid_d

model = NeuralNetwork(x_train.shape[-1])
model.add_layer(FullyConnectedLayer(100))
model.add_layer(ActivationLayer(act, act_d))
model.add_layer(FullyConnectedLayer(100))
model.add_layer(ActivationLayer(act, act_d))
model.add_layer(FullyConnectedLayer(10))
model.add_layer(ActivationLayer(act, act_d))
model.set_loss_function(mse, mse_d)

def calc_loss(model, X, Y, batch_size, loss_func):
    error = 0
    num_batches = math.ceil(X.shape[0] / batch_size)
    for i in range(0, num_batches, 1):
        X_batch = X[batch_size*i : batch_size*(i+1)]
        Y_batch = Y[batch_size*i : batch_size*(i+1)]
        
        output = X_batch
        for layer in model.layers:
            output = layer.forward_prop(output)

        Y_batch = Y_batch.reshape(output.shape)
        e = loss_func(output, Y_batch) 
        error += e * X_batch.shape[0]
    return error

# Error per sample will be same magnitude
# Mean of 1 sample will be same as mean of many samples
# So with batches, sum of errors will be a factor of batch_size smaller
# Multiply final error by batch_size?
#print(calc_loss(model, x_train, y_train, 1, mse))
#print(calc_loss(model, x_train, y_train, 100, mse))

np.random.seed(10)
#model.train(x_train[:1000], y_train[0:1000], 0.1, 30, batch_size=1)
model.train(x_train[:1000], y_train[0:1000], 0.1*32, 500, batch_size=32)

np.set_printoptions(precision=2)
n_y = 10
predicts = model.predict(x_test[:n_y])
for i in range(n_y):
    print("Predict={:d}  True={:d}".format(np.argmax(predicts[i]),np.argmax(y_test[i])))


TODO: Batch/minibatch updates, input shapes, softmax, crossentropy, refactor, check for others 