In [1]:
import numpy as np
from tensorflow.keras.datasets import mnist
import random

In [2]:
# Open file to store model parameters in
file = open(".\stored parameters", "w")

In [3]:
(trainImgs, trainLabels), (testImgs, testLabels) = mnist.load_data()

In [4]:
trainImgs = trainImgs/255
testImgs = testImgs/255

In [5]:
# Flatten each 28x28 image into one row
flatTrainImgs = trainImgs.reshape(60000, 28**2)
flatTestImgs = testImgs.reshape(10000, 28**2)

In [18]:
class Model:
    def __init__(self, nodeCounts, activationFuncs):
        # Input and output node count
        self.inCount = nodeCounts[0]
        self.outCount = nodeCounts[-1]

        # Number of layers
        self.layers = len(nodeCounts)

        # The activation function for each layer
        self.activationFuncs = activationFuncs
        
        # Initialize weights and biases
        self.params = [np.random.randn(nodeCounts[i]+1, nodeCounts[i+1]) for i in range(len(nodeCounts)-1)]
    
    # Run the model on a set of inputs
    def run(self, inArray):
        '''
        Returns all node activation values from a single input vector or 
        a set of input vectors

        --------
        Parameters
        inArray: ndarray
            Input row vector(s) to be entered into the model.\n
            Can be a 1d array for one input vector, or
            a 2d array for multiple input vectors

        --------
        Returns
        nodeValues: list of ndarrays
            Resulting node values for the input vector(s).\n
            Each element corresponds to a layer in the model, with leftmost being the input layer
        '''
        array1 = inArray

        nodeValues = [array1]
        for i in range(self.layers- 1):
            currentParams = self.params[i]

            array2 = np.matmul(array1, currentParams[:-1])
            array2 += currentParams[-1]
            
            array1 = self.activationFunc(array2, self.activationFuncs[i])
            
            nodeValues.append(array1)

        return nodeValues # Return list of node values at each layer
    

    # Calculate the gradient for a set of inputs using backpropagation
    def gradient(self, inArray, labels):
        '''
        Returns the gradient of the cost function
        '''
        nodeValues = self.run(inArray)

        # Correct output values according to labels
        labelArray = np.zeros((inArray.shape[0], self.outCount))
        labelArray[np.arange(inArray.shape[0]), labels] = 1

        # Derivatives of cost function w.r.t. weights and biases
        grad = []
        # Derivatives of cost function w.r.t. node values
        nodeGrad = [2*(nodeValues[-1] - labelArray)]
        
        for i in range(-1,-self.layers,-1):
            weights = self.params[i][:-1]

            activationDerivs = self.activationDeriv(nodeValues[i], self.activationFuncs[i])

            nodeDerivs = np.einsum("ij,kj->ki", weights, activationDerivs * nodeGrad[-1])
            nodeGrad.append(nodeDerivs)
            
            # Derivatives of node values w.r.t. weights and biases
            x = np.column_stack((nodeValues[i-1], np.ones(nodeValues[i-1].shape[0])))
            paramDerivs = np.einsum("ij,ik->ijk", x, activationDerivs)

            grad.append(np.einsum("ijk,ik->ijk", paramDerivs, nodeGrad[-2]))
        
        grad.reverse()
        grad = [np.sum(layer, axis=0) for layer in grad] # Total sum of gradients from all inputs

        return grad, np.sum((nodeValues[-1] - labelArray)**2)
    

    # Change the weights and biases based on the gradient and given step size
    def adjustParams(self, gradient, learnRate):
        for i in range(self.layers-1):
            self.params[i] -= gradient[i] * learnRate
    
    # Adjust model parameters based on samples of training data
    def train(self, trainData, labelData, batchSize, epochs, learnRate=0.001):
        trainDataSize = trainData.shape[0]
        batchGrad = [0] * (self.layers-1)

        for epoch in range(epochs):
            epochCost = 0

            for i in range(0, trainDataSize, batchSize):
                batchGrad, cost = self.gradient(trainData[i:i+batchSize], labelData[i:i+batchSize])
                
                epochCost += cost

                # True # of samples taken (differs from batchSize if at end of epoch)
                sampleCount = min(trainDataSize - i, batchSize)
                # Average the batch gradient
                batchGrad = [x/sampleCount for x in batchGrad]

                self.adjustParams(batchGrad, learnRate)
                
                print("Epoch {} {:%} complete. Avg cost: {}".format(
                    epoch+1, (i+1)/trainDataSize, epochCost/(i+1)
                ), end="\r")
            
            print("Epoch {} completed. Avg cost: {}".format(epoch+1, epochCost/trainDataSize))
    

    # The activation function
    def activationFunc(self, rawValues, function):
        if function.lower() == "sigmoid": return self.sigmoid(rawValues)
        elif function.lower() == "relu": return self.relu(rawValues)
        elif function.lower() == "leaky relu": return self.leakyRelu(rawValues)
        elif function.lower() == "identity": return rawValues

        raise ValueError('''Specified activation function is not supported
        Supported functions: identity, sigmoid, relu, leaky relu''')
    
    # Derivative of the activation function
    def activationDeriv(self, nodeValues, function):
        if function.lower() == "sigmoid": return self.sigmoidDeriv(nodeValues)
        elif function.lower() == "relu": return self.reluDeriv(nodeValues)
        elif function.lower() == "leaky relu": return self.leakyReluDeriv(nodeValues)
        elif function.lower() == "identity": return 1

        raise ValueError('''Specified activation function is not supported
        Supported functions: identity, sigmoid, relu, leaky relu''')


    def sigmoid(self, rawValues):
        return np.reciprocal(1 + np.exp(-rawValues))
    
    def sigmoidDeriv(self, nodeValues):
        return nodeValues - np.power(nodeValues, 2)

    def relu(self, rawValues):
        rawValues[rawValues < 0] = 0
        return rawValues
    
    def reluDeriv(self, nodeValues):
        nodeValues[nodeValues >= 0] = 1
        nodeValues[nodeValues < 0] = 0
        return nodeValues

    def leakyRelu(self, rawValues):
        rawValues[rawValues < 0] *= 0.01
        return rawValues
    
    def leakyReluDeriv(self, nodeValues):
        nodeValues[nodeValues >= 0] = 1
        nodeValues[nodeValues < 0] = 0.01
        return nodeValues


    def getParams(self):
        return self.params

    def setParams(self, newParams):
        assert len(newParams) == len(self.params)
        for i, k in zip(self.params, newParams):
            assert i.shape == k.shape
        
        self.params = newParams

In [19]:
model = Model([784, 64, 32, 10], ["leaky relu", "leaky relu", "sigmoid"])

In [None]:
i = random.choice(range(60000))

out = model.run(flatTrainImgs[0:5])[-1]

# print("Predicted", np.where(np.isclose(out, np.max(out)))[0][0])
# print("Actual:", trainLabels[i])
out

In [13]:
flatTrainImgs.shape

(60000, 784)

In [20]:
model.train(flatTrainImgs, trainLabels, 500, 50, 1e-4)

Epoch 1 completed. Avg cost: 5.54528015953234451728868535
Epoch 2 22.501667% complete. Avg cost: 5.6242390875476225

KeyboardInterrupt: 

In [45]:
flatTestImgs[random.sample(range(len(testLabels)), 5)]

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]])