<a href="https://colab.research.google.com/github/alex-movila/ML-Colab-Tutorials/blob/master/NeuralNetOptimized.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import cupy as np

In [None]:
import os

def load(dataset):
    """Open the ``.npz`` archive that belongs to *dataset*.

    The archive path is ``f"{dataset}.npz"``, so *dataset* may be a bare name
    (resolved relative to the working directory) or a path without extension.

    Args:
        dataset: Name or path of the dataset, without the ``.npz`` suffix.

    Returns:
        Whatever ``np.load`` returns for the archive.

    Raises:
        FileNotFoundError: If the archive file does not exist.
    """
    archive_path = f"{dataset}.npz"
    if os.path.exists(archive_path):
        return np.load(archive_path)
    raise FileNotFoundError(f"Dataset \"{dataset}\" does not exist")

def split_batches(batch_size, *datasets):
    """Split every dataset along its first axis into mini-batches.

    The number of chunks is ``ceil(len(ds) / batch_size)``. The chunks come
    from ``np.array_split``, which produces near-equal pieces, so individual
    batches may hold fewer than *batch_size* rows.

    Args:
        batch_size: Upper bound on the number of rows per batch.
        *datasets: Arrays to split; each is split independently.

    Returns:
        A list with one entry per dataset, each entry being the list of
        batches for that dataset.
    """
    return [
        np.array_split(data, int(np.ceil(data.shape[0] / batch_size)))
        for data in datasets
    ]


In [None]:
class Dense():
    """Fully connected layer that processes a batch of column vectors.

    ``feedforward`` multiplies the ``(shape[1], shape[0])`` weight matrix with
    a rank-3 input batch, so inputs are laid out as ``(batch, shape[0], 1)``
    and outputs as ``(batch, shape[1], 1)``.

    Attributes are plain arrays so that the owning network can read the
    gradients (``bias_sensitivity`` / ``weight_sensitivity``) and update
    ``weights`` / ``biases`` in place.
    """

    def __init__(self, shape, activation=None):
        """Create the layer.

        Args:
            shape: ``(inputs, outputs)`` pair.
            activation: ``"relu"``, ``"sigmoid"`` or ``"softmax"``. Any other
                value (including ``None``) leaves the identity activation
                defined on the class in place.
        """
        self.shape = shape
        self.biases = np.zeros((shape[1], 1))
        # Standard-normal weights scaled by sqrt(2 / inputs).
        self.weights = np.random.randn(shape[1], shape[0]) / np.sqrt(shape[0] / 2)
        # Values cached by feedforward/backprop for the current batch.
        self.weighted_sums = []
        self.inputs = []
        self.bias_sensitivity = []
        self.weight_sensitivity = []

        if activation == "relu":
            self.activation = self.relu
            self.d_activation = self.d_relu
        elif activation == "sigmoid":
            self.activation = self.sigmoid
            self.d_activation = self.d_sigmoid
        elif activation == "softmax":
            self.activation = self.softmax
            self.d_activation = self.d_softmax

    def activation(self, inp):
        """Identity activation used when no named activation is selected."""
        return inp

    def d_activation(self, inp):
        """Derivative of the identity activation: one for every element."""
        return np.ones_like(inp)

    def feedforward(self, inputs):
        """Compute the layer output for a batch and cache what backprop needs.

        Args:
            inputs: Batch of column vectors, ``(batch, shape[0], 1)``.

        Returns:
            Activated outputs, ``(batch, shape[1], 1)``.
        """
        self.inputs = inputs
        # The leading new axis lets the same weights/biases broadcast over the batch.
        self.weighted_sums = np.matmul(self.weights[np.newaxis, ...], inputs) + self.biases[np.newaxis, ...]
        return self.activation(self.weighted_sums)

    def backprop(self, activation_sensitivity):
        """Compute per-sample gradients for the batch seen by ``feedforward``.

        Stores the per-sample bias and weight gradients on the layer and
        returns the gradient with respect to this layer's inputs.

        Args:
            activation_sensitivity: Gradient of the cost with respect to this
                layer's activations, ``(batch, shape[1], 1)``.

        Returns:
            Gradient with respect to the layer inputs, ``(batch, shape[0], 1)``.
        """
        if self.activation == self.softmax:
            # Softmax outputs depend on every pre-activation, so the chain rule
            # needs the full per-sample Jacobian rather than an element-wise product.
            self.bias_sensitivity = np.matmul(self.d_activation(self.weighted_sums), activation_sensitivity)
        else:
            self.bias_sensitivity = activation_sensitivity * self.d_activation(self.weighted_sums)

        # Outer product per sample: swap the last two input axes to get row vectors.
        self.weight_sensitivity = np.matmul(self.bias_sensitivity, self.inputs.transpose([0, 2, 1]))
        # Sensitivity of the previous layer's activations, consumed by that layer.
        return np.matmul(self.weights.transpose()[np.newaxis, ...], self.bias_sensitivity)

    def relu(self, inp):
        """Rectified linear unit: keep positive entries, zero the rest."""
        return inp * (inp > 0)

    def d_relu(self, inp):
        """Derivative of ReLU: 1.0 where the input is positive, else 0.0."""
        return 1. * (inp > 0)

    def sigmoid(self, z):
        """Logistic sigmoid ``1 / (1 + exp(-z))``."""
        return 1.0 / (1.0 + np.exp(-z))

    def d_sigmoid(self, z):
        """Derivative of the sigmoid, ``s * (1 - s)`` with ``s = sigmoid(z)``."""
        s = self.sigmoid(z)  # evaluate once instead of twice
        return s * (1 - s)

    def softmax(self, x):
        """Softmax over axis 1, shifted by the per-sample maximum for stability."""
        e_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return e_x / e_x.sum(axis=1, keepdims=True)

    def d_softmax(self, x):
        """Per-sample Jacobian of the softmax.

        Args:
            x: Pre-activations, ``(batch, classes, 1)``.

        Returns:
            Array of shape ``(batch, classes, classes)`` whose entry
            ``[n, i, j]`` is ``s_i * (delta_ij - s_j)`` for sample ``n``.
        """
        sm = self.softmax(x)
        # Drop only the trailing column axis. A bare squeeze() would also
        # remove the batch axis when the batch holds a single sample.
        sm = sm.reshape(sm.shape[:2])  # (n, classes)
        num_classes = sm.shape[1]

        sm_col = sm[:, :, np.newaxis]  # (n, classes, 1)
        sm_row = sm[:, np.newaxis, :]  # (n, 1, classes)

        # diag(s) - s s^T for every sample, via broadcasting.
        return sm_row * np.identity(num_classes) - sm_col * sm_row


In [None]:
import json
import time

class Network:
    """Sequential stack of layers trained with mini-batch gradient descent.

    Layers are expected to expose ``shape``, ``weights``, ``biases``,
    ``feedforward``, ``backprop`` and the per-sample gradients
    ``bias_sensitivity`` / ``weight_sensitivity``.
    """

    # Lower bound applied to predictions inside the cross-entropy functions so
    # that a prediction of exactly 0 cannot produce inf/NaN.
    _EPS = 1e-12

    def __init__(self, cost="crossentropy"):
        """Create an empty network.

        Args:
            cost: ``"MSE"`` or ``"crossentropy"``. Any other value leaves the
                pass-through ``cost`` / ``d_cost`` defined on the class.
        """
        self.layers = []
        # Flat list of layer widths: [inputs, hidden..., outputs].
        self.size = []
        if cost == "MSE":
            self.cost = self.MSE
            self.d_cost = self.d_MSE
        elif cost == "crossentropy":
            self.cost = self.xentropy
            self.d_cost = self.d_xentropy

    def cost(self, output, expected):
        """Fallback cost used when no named cost is selected: returns *output*."""
        return output

    def d_cost(self, output, expected):
        """Fallback cost derivative used when no named cost is selected."""
        return output

    def add_layers(self, *layers):
        """Append layers in order and keep ``self.size`` in sync.

        The previous layer's output width is dropped before the new layer's
        ``(inputs, outputs)`` pair is appended, so shared widths appear once.
        """
        for l in layers:
            self.layers.append(l)
            if len(self.size) != 0:
                self.size.pop()
            self.size.append(l.shape[0])
            self.size.append(l.shape[1])

    def feedforward(self, input):
        """Run *input* through every layer in order and return the final output."""
        for l in self.layers:
            input = l.feedforward(input)
        return input

    def backprop(self, output, expected):
        """Propagate the cost gradient from the last layer back to the first.

        Each layer stores its own gradients; nothing is returned.
        """
        # How sensitive the cost is to the output activations.
        activation_sensitivity = self.d_cost(output, expected)
        for layer in reversed(self.layers):
            activation_sensitivity = layer.backprop(activation_sensitivity)

    def sgd(self, training_data, learn_rate, lmbda, epochs=1, validation_data=None):
        """Train the network, printing progress for every batch and epoch.

        Args:
            training_data: ``(inputs, targets)`` where both are sequences of
                batches of equal length.
            learn_rate: Step size of each parameter update.
            lmbda: Weight-decay strength, see ``update_parameters``.
            epochs: Number of passes over the training batches.
            validation_data: Optional ``(inputs, targets)`` in the same batched
                layout; when given, it is evaluated after each epoch.
        """
        tr_x, tr_y = training_data
        has_validation = validation_data is not None
        train_data_size = sum(len(batch) for batch in tr_x)
        # Only touch validation_data when it was supplied; the default is None.
        val_data_size = sum(len(batch) for batch in validation_data[0]) if has_validation else 0
        previous_cost = None

        for i in range(epochs):
            start = time.time()
            training_correct = 0
            training_total = 0
            training_cost = 0
            for batch_num, batch_x in enumerate(tr_x):
                batch_y = tr_y[batch_num]
                batch_size = batch_x.shape[0]

                # Forward pass, gradient computation, then one parameter step.
                output = self.feedforward(batch_x)
                self.backprop(output, batch_y)
                self.update_parameters(learn_rate, lmbda, batch_size, train_data_size)

                # Running statistics use the pre-update output of this batch.
                training_correct += np.sum(np.argmax(output, axis=1) == np.argmax(batch_y, axis=1))
                training_total += batch_size
                training_cost += np.sum(self.cost(output, batch_y))
                print(f"\rEpoch {i + 1}/{epochs}: ({batch_num + 1}/{len(tr_x)}) | Accuracy: {training_correct}/{training_total} | Cost: {training_cost / training_total}", end="")

            elapsed = (time.time() - start) * 1000
            print(f"\rEpoch {i + 1} Complete, Time: {round(elapsed)}ms ({round(elapsed / len(tr_x))}ms/batch), Test Accuracy: {training_correct} / {training_total}, Average Test Cost: {training_cost / training_total}", end="")
            if has_validation:
                correct, total_cost = self.evaluate(validation_data)
                avg_cost = total_cost / val_data_size
                # Signed change versus the previous epoch; omitted on the first one.
                if previous_cost is not None:
                    change = '({0:+})'.format(np.round(avg_cost - previous_cost, 5))
                else:
                    change = ''
                print(f", Validation Accuracy: {correct} / {val_data_size}, Validation Average Cost = {np.round(avg_cost, 5)} {change}", end="")
                previous_cost = avg_cost
            print()

    def update_parameters(self, learn_rate, lmbda, batch_size, total_size):
        """Apply one gradient step to every layer.

        Per-sample gradients are summed over the batch and divided by
        *batch_size*. Weights additionally shrink by
        ``learn_rate * (lmbda / total_size) * weights``; biases do not.
        """
        for l in self.layers:
            total_bias_sens = np.sum(l.bias_sensitivity, axis=0)
            total_weight_sens = np.sum(l.weight_sensitivity, axis=0)
            l.biases -= learn_rate * (total_bias_sens / batch_size)
            l.weights -= learn_rate * (total_weight_sens / batch_size) + learn_rate * (lmbda / total_size) * l.weights

    def evaluate(self, validation_data):
        """Score the network on batched data without updating parameters.

        Args:
            validation_data: ``(inputs, targets)`` sequences of batches.

        Returns:
            ``(total_correct, total_cost)`` summed over all batches.
        """
        val_x, val_y = validation_data
        total_cost = 0
        total_correct = 0
        for batch_x, batch_y in zip(val_x, val_y):
            out = self.feedforward(batch_x)
            total_cost += np.sum(self.cost(out, batch_y))
            total_correct += np.sum(np.argmax(out, axis=1) == np.argmax(batch_y, axis=1))

        return total_correct, total_cost

    def MSE(self, output, expected):
        """Element-wise squared error."""
        return np.square(output - expected)

    def d_MSE(self, output, expected):
        """Derivative of the squared error with respect to *output*."""
        return 2 * (output - expected)

    def xentropy(self, output, expected):
        """Element-wise cross-entropy ``-expected * log(output)``.

        Predictions are floored at ``_EPS`` so a zero probability yields a
        large finite value (or 0 where *expected* is 0) instead of inf/NaN.
        """
        return -expected * np.log(np.maximum(output, self._EPS))

    def d_xentropy(self, output, expected):
        """Derivative of the cross-entropy with respect to *output*.

        Uses the same ``_EPS`` floor as ``xentropy`` to avoid division by zero.
        """
        return -expected / np.maximum(output, self._EPS)

    def load_model(self, filename):
        """Load weights and biases from a JSON file written by ``export_model``.

        Raises:
            TypeError: If the stored ``"sizes"`` differ from ``self.size``.
        """
        # Context manager closes the file even if parsing fails.
        with open(filename, "r", encoding="utf-8") as f:
            data = json.load(f)
        if self.size != data["sizes"]:
            raise TypeError(f"Data does not match network size of {self.size}")

        for i, weights in enumerate(data["weights"]):
            self.layers[i].weights = np.array(weights)

        for i, biases in enumerate(data["biases"]):
            self.layers[i].biases = np.array(biases)

    def export_model(self, filename):
        """Write layer sizes, weights and biases to *filename* as JSON."""
        data = {
            "sizes": self.size,
            "weights": [l.weights.tolist() for l in self.layers],
            "biases": [l.biases.tolist() for l in self.layers]
        }

        # Context manager closes the file even if serialisation fails.
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(data, f)



In [None]:
# Mount Google Drive at /content/drive so files stored there can be read
# through the local filesystem (uses the Colab-provided google.colab module).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import locale
# Replace the stdlib function so every caller is told the preferred encoding is UTF-8.
# NOTE(review): presumably a workaround so the `!cp` shell command in the next
# cell runs in this environment — confirm it is still needed.
locale.getpreferredencoding = lambda: "UTF-8"


In [None]:
# Copy the dataset archive from the mounted Drive into the working directory,
# where load("mnist") looks for "mnist.npz".
!cp /content/drive/MyDrive/mnist.npz mnist.npz

In [None]:
# Read the four arrays from the archive; the two input arrays are divided by
# 255 (presumably 8-bit pixel values scaled to [0, 1] — confirm against the file).
data = load("mnist")
train_data = data["train_data"] / 255.0
train_ans = data["train_ans"]
test_data = data["test_data"] / 255.0
test_ans = data["test_ans"]

# Inputs and answers are split with the same batch size so they stay aligned:
# up to 64 rows per training batch, up to 100 per test batch.
train_data, train_ans = split_batches(64, train_data, train_ans)
test_data, test_ans = split_batches(100, test_data, test_ans)
print("dataset loaded")

dataset loaded


In [None]:
# Classifier with layer widths 784 -> 80 -> 80 -> 10 and a softmax output,
# trained against the cross-entropy cost.
net = Network(cost="crossentropy")
net.add_layers(
    Dense(shape=(784, 80), activation="relu"),
    Dense(shape=(80, 80), activation="relu"),
    Dense(shape=(80, 10), activation="softmax"),
)

# Uncomment to start from previously exported parameters instead of random ones.
# net.load_model("network.json")

net.sgd(
    training_data=(train_data, train_ans),
    learn_rate=0.001,
    lmbda=0.01,
    epochs=100,
    validation_data=(test_data, test_ans),
)

# Persist the trained parameters as JSON.
net.export_model(filename="network.json")

Epoch 1 Complete, Time: 9129ms (10ms/batch), Test Accuracy: 23282 / 60000, Average Test Cost: 1.9815222629563225, Validation Accuracy: 6290 / 10000, Validation Average Cost = 1.61146 
Epoch 2 Complete, Time: 4899ms (5ms/batch), Test Accuracy: 42047 / 60000, Average Test Cost: 1.31432902504639, Validation Accuracy: 7659 / 10000, Validation Average Cost = 1.04069 (-0.57077)
Epoch 3 Complete, Time: 4141ms (4ms/batch), Test Accuracy: 47214 / 60000, Average Test Cost: 0.9014261853827125, Validation Accuracy: 8177 / 10000, Validation Average Cost = 0.75718 (-0.28351)
Epoch 4 Complete, Time: 4047ms (4ms/batch), Test Accuracy: 49680 / 60000, Average Test Cost: 0.7011210269133893, Validation Accuracy: 8454 / 10000, Validation Average Cost = 0.61683 (-0.14035)
Epoch 5 Complete, Time: 5085ms (5ms/batch), Test Accuracy: 51054 / 60000, Average Test Cost: 0.5931457802531758, Validation Accuracy: 8631 / 10000, Validation Average Cost = 0.53479 (-0.08205)
Epoch 6 Complete, Time: 4030ms (4ms/batch), Te