In [1]:
import os
import pickle
import struct
import time

import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import accuracy_score

In [2]:
class Layer:
    """One fully connected layer: a weight matrix, a bias column and an activation.

    ``weights`` has shape (layer_in_size, layer_out_size) and ``biases`` has
    shape (layer_out_size, 1), so inputs are expected as columns of length
    ``layer_in_size``.
    """

    def __init__(self, layer_in_size: int, layer_out_size: int, activation: str):
        """Draw the initial parameters and select the activation.

        Args:
            layer_in_size: number of inputs feeding the layer.
            layer_out_size: number of units in the layer.
            activation: 'tanh' or 'sigmoid'; any other value selects the
                identity (linear) activation.
        """
        # A private RandomState seeded with 42 produces exactly the values that
        # np.random.seed(42) followed by np.random.normal(...) would, but it
        # does not reset the caller's global NumPy RNG as a side effect.
        # Every layer draws from the same seeded stream, so the parameters are
        # a deterministic function of the layer's shape.
        rng = np.random.RandomState(42)
        self.weights = rng.normal(loc=0, scale=(1 / layer_in_size), size=(layer_in_size, layer_out_size))
        self.biases = rng.normal(loc=0, scale=(1 / layer_in_size), size=(layer_out_size, 1))

        if activation == 'tanh':
            self.__activation_function = np.tanh
            self.__derivative_function = lambda v: 1 - (np.tanh(v) ** 2)
        elif activation == 'sigmoid':
            self.__activation_function = self.__sigmoid
            self.__derivative_function = self.__sigmoid_derivative
        else:
            # Identity activation. The derivative is an array of ones shaped
            # like the input (rather than a bare scalar) so that derivatives()
            # returns the same kind of object for every activation.
            self.__activation_function = lambda v: v
            self.__derivative_function = lambda v: np.ones_like(v, dtype=float)

    def local_fields(self, data_in):
        """Return the pre-activation values ``weights.T @ data_in + biases``."""
        return self.weights.T.dot(data_in) + self.biases

    def activations(self, local_fields):
        """Apply the layer's activation element-wise to ``local_fields``."""
        return self.__activation_function(local_fields)

    def derivatives(self, local_fields):
        """Evaluate the activation's derivative element-wise at ``local_fields``."""
        return self.__derivative_function(local_fields)

    @staticmethod
    def __sigmoid(v):
        """Logistic function 1 / (1 + exp(-v))."""
        return 1 / (1 + np.exp(-v))

    @staticmethod
    def __sigmoid_derivative(v):
        """Derivative of the logistic function, s(v) * (1 - s(v))."""
        a = 1 / (1 + np.exp(-v))
        return a * (1 - a)

In [3]:
class NeuralNetwork:
    """Feed-forward network of ``Layer`` objects trained one sample at a time.

    All data is stored column-wise: inputs are (n_features, n_samples) and
    targets are (n_outputs, n_samples). The loss whose gradient is used in
    ``train`` is the squared error between the network output and the target.
    """

    def __init__(self,
                 data_x: np.ndarray,
                 data_y: np.ndarray,
                 test_x: np.ndarray,
                 test_y: np.ndarray,
                 hidden_layers: tuple,
                 learning_rate: float,
                 lr_decay_factor: float = 0.9):
        """Store the datasets and build the layer stack.

        Args:
            data_x: training inputs, one column per sample.
            data_y: training targets, one column per sample.
            test_x: held-out inputs, same layout as ``data_x``.
            test_y: held-out targets, same layout as ``data_y``.
            hidden_layers: sequence of dicts with keys 'num_nodes' and
                'activation', one per hidden layer. May be empty, in which
                case the network consists of the output layer only.
            learning_rate: initial step size for the parameter updates.
            lr_decay_factor: factor the learning rate is multiplied by after
                an epoch in which test accuracy did not improve.
        """
        self.data = data_x
        self.labels = data_y

        self.test_x = test_x
        self.test_y = test_y

        self.learning_rate = learning_rate
        self.decay_factor = lr_decay_factor

        self.n_features = self.data.shape[0]
        self.n_outputs = self.labels.shape[0]

        self.n_samples = self.data.shape[1]

        # Layers of neural network: chain each layer's input width to the
        # previous layer's output width, starting from the feature count.
        self.nn_layers = list()
        in_size = self.n_features
        for spec in hidden_layers:
            self.nn_layers.append(Layer(in_size, spec['num_nodes'], spec['activation']))
            in_size = spec['num_nodes']
        # The output layer always uses tanh.
        self.nn_layers.append(Layer(in_size, self.n_outputs, 'tanh'))

    def calc_stats(self, data, labels):
        """Return ``(mse, accuracy)`` of the network on ``data`` / ``labels``.

        ``mse`` is the squared error summed over all outputs and samples,
        divided by the number of samples (``labels.shape[1]``). ``accuracy``
        compares the arg-max row of the predictions with that of the labels.
        """
        predictions = self.predict(data)
        mse = np.sum(((predictions - labels) ** 2) / labels.shape[1])
        acc = accuracy_score(np.argmax(labels, axis=0), np.argmax(predictions, axis=0))
        return mse, acc

    def predict(self, data):
        """Return the output-layer activations for ``data``."""
        _, activations = self.forward(data)
        return activations[-1]

    def forward(self, x_i):
        """Propagate ``x_i`` through every layer.

        Returns:
            (local_fields, activations): two lists with one entry per layer,
            holding each layer's pre-activation and post-activation values.
        """
        local_fields = list()
        activations = list()
        current_input = x_i
        for layer in self.nn_layers:
            z = layer.local_fields(current_input)
            local_fields.append(z)
            a = layer.activations(z)
            activations.append(a)
            current_input = a
        return local_fields, activations

    def backward(self, initial_delta, local_fields):
        """Back-propagate ``initial_delta`` and return one delta per layer.

        Args:
            initial_delta: gradient of the loss with respect to the network
                output.
            local_fields: per-layer pre-activations as returned by ``forward``.

        Returns:
            List of deltas ordered like ``self.nn_layers`` (first layer first).
        """
        last = len(self.nn_layers) - 1
        current_delta = initial_delta
        reversed_deltas = list()
        for i in range(last, -1, -1):
            slope = self.nn_layers[i].derivatives(local_fields[i])
            if i == last:
                delta = current_delta * slope
            else:
                # Pull the next layer's delta back through its weights.
                delta = self.nn_layers[i + 1].weights.dot(current_delta) * slope
            reversed_deltas.append(delta)
            current_delta = delta
        # Deltas were collected output-first; callers expect input-first.
        reversed_deltas.reverse()
        return reversed_deltas

    def update_layer_params(self, x_i, layer_delta, activations):
        """Take one gradient step on every layer's weights and biases.

        Each layer's weight gradient is its input (``x_i`` for the first
        layer, the previous layer's activation otherwise) times its delta
        transposed; the bias gradient is the delta itself.
        """
        current_input = x_i
        for layer, activation, delta in zip(self.nn_layers, activations, layer_delta):
            layer.weights = layer.weights - self.learning_rate * current_input.dot(delta.T)
            layer.biases = layer.biases - self.learning_rate * delta
            current_input = activation

    def train(self, target_test_accuracy: float = 0.955, max_epochs: int = None):
        """Train sample-by-sample until the test accuracy target is reached.

        After every epoch the train/test statistics are printed and recorded.
        If test accuracy did not improve over the previous epoch, the learning
        rate is multiplied by ``self.decay_factor``.

        Args:
            target_test_accuracy: training stops once test accuracy is at
                least this value.
            max_epochs: optional upper bound on the number of epochs; ``None``
                (the default) trains until the accuracy target is met.

        Returns:
            (train_stats, test_stats): two arrays with one row per recorded
            point, ``[epoch, mse, accuracy]``. Row 0 is the untrained network
            (epoch 0).
        """
        train_epoch_stats = list()
        test_epoch_stats = list()
        train_mse, train_acc = self.calc_stats(self.data, self.labels)
        train_epoch_stats.append([0, train_mse, train_acc])
        test_mse, test_acc = self.calc_stats(self.test_x, self.test_y)
        test_epoch_stats.append([0, test_mse, test_acc])
        epoch_cnt = 1
        while (test_epoch_stats[-1][2] < target_test_accuracy
               and (max_epochs is None or epoch_cnt <= max_epochs)):
            start_time = time.time()
            for i in range(self.n_samples):
                x_i = self.data[:, i].reshape((self.n_features, 1))
                d_i = self.labels[:, i].reshape((self.n_outputs, 1))
                local_fields, activations = self.forward(x_i)
                y_i = activations[-1]
                # Gradient of the squared error, scaled by 1 / n_samples.
                initial_delta = 2 * (y_i - d_i) / self.n_samples
                delta_list = self.backward(initial_delta, local_fields)
                self.update_layer_params(x_i, delta_list, activations)
            train_mse, train_acc = self.calc_stats(self.data, self.labels)
            test_mse, test_acc = self.calc_stats(self.test_x, self.test_y)
            print(
                '[Epoch: {}] => Train MSE: {:.4f}, Train Accuracy: {:.4f}, Test Accuracy: {:.4f}, Epoch Duration: {:.4f} S'.format(
                    epoch_cnt, train_mse, train_acc, test_acc, time.time() - start_time))

            if test_acc <= test_epoch_stats[-1][2]:
                self.learning_rate = self.learning_rate * self.decay_factor

            # Record the real epoch number so the stats can be plotted
            # against it.
            train_epoch_stats.append([epoch_cnt, train_mse, train_acc])
            test_epoch_stats.append([epoch_cnt, test_mse, test_acc])
            epoch_cnt += 1
        return np.array(train_epoch_stats), np.array(test_epoch_stats)

    def save_params(self, save_path):
        """Pickle every layer's weights and biases to ``save_path``.

        The file holds a list with one ``{'weight': ..., 'bias': ...}`` dict
        per layer, in layer order.
        """
        nn_params = [{'weight': layer.weights, 'bias': layer.biases} for layer in self.nn_layers]
        with open(save_path, 'wb') as model_params:
            pickle.dump(nn_params, model_params)

    def load_params(self, params_path):
        """Restore layer weights and biases written by ``save_params``.

        SECURITY: unpickling executes arbitrary code embedded in the file;
        only load parameter files from a trusted source.

        Raises:
            AssertionError: if the file's layer count differs from this
                network's.
        """
        # Context manager so the file handle is closed even if unpickling fails.
        with open(params_path, 'rb') as model_params:
            nn_params = pickle.load(model_params)
        assert len(self.nn_layers) == len(nn_params), 'saved parameters do not match the number of layers'
        for layer, params in zip(self.nn_layers, nn_params):
            layer.weights = params['weight']
            layer.biases = params['bias']

In [4]:
def read_idx(filename):
    """Read an IDX-format file (the container used by MNIST) into an array.

    The header is two zero bytes, a one-byte element-type code and a one-byte
    dimension count, followed by one big-endian uint32 per dimension. The rest
    of the file is the element data, stored big-endian.

    Args:
        filename: path of the (uncompressed) IDX file.

    Returns:
        Array with the shape given in the header and the element type given
        by the header's type code.

    Raises:
        ValueError: if the file does not start with two zero bytes (for
            example because it is still gzip-compressed) or the type code is
            not one defined by the IDX format.
    """
    # IDX element-type codes -> NumPy dtypes (multi-byte types are big-endian).
    idx_dtypes = {
        0x08: np.dtype(np.uint8),
        0x09: np.dtype(np.int8),
        0x0B: np.dtype('>i2'),
        0x0C: np.dtype('>i4'),
        0x0D: np.dtype('>f4'),
        0x0E: np.dtype('>f8'),
    }
    with open(filename, 'rb') as f:
        zero, data_type, dims = struct.unpack('>HBB', f.read(4))
        if zero != 0:
            raise ValueError('{!r} is not an IDX file: header does not start with two zero bytes'.format(filename))
        if data_type not in idx_dtypes:
            raise ValueError('{!r}: unsupported IDX data type code 0x{:02X}'.format(filename, data_type))
        shape = struct.unpack('>{}I'.format(dims), f.read(4 * dims))
        return np.frombuffer(f.read(), dtype=idx_dtypes[data_type]).reshape(shape)

In [5]:
# Image files hold a stack of 2-D images. Flatten each image into a vector and
# transpose so that every column of the result is one sample.
train_images = read_idx('data/train-images-idx3-ubyte')
train_images = train_images.reshape(train_images.shape[0], -1).T

# Label files hold one class index per sample. Indexing the rows of a 10x10
# identity matrix one-hot encodes them; the transpose again puts one sample
# per column.
train_labels = np.eye(10)[read_idx('data/train-labels-idx1-ubyte')].T

test_images = read_idx('data/t10k-images-idx3-ubyte')
test_images = test_images.reshape(test_images.shape[0], -1).T

test_labels = np.eye(10)[read_idx('data/t10k-labels-idx1-ubyte')].T

In [None]:
# Where the trained weights and biases are written once training finishes.
MODEL_PARAMS_PATH = './model_params/256_16_tanh_params.pkl'

# Two tanh hidden layers: 256 units, then 16 units.
hidden_layer_params = (
    {'num_nodes': 256,
     'activation': 'tanh'},
    {'num_nodes': 16,
     'activation': 'tanh'}
)

# Create the output directory before training starts: each epoch takes
# minutes, so a missing directory must not surface only at save time, after
# the whole run has completed.
os.makedirs(os.path.dirname(MODEL_PARAMS_PATH), exist_ok=True)

# The learning rate is large because train() divides every per-sample
# gradient by the number of training samples.
nn_regressor = NeuralNetwork(train_images,
                             train_labels,
                             test_images,
                             test_labels,
                             hidden_layers=hidden_layer_params,
                             learning_rate=12,
                             lr_decay_factor=0.7)
train_epoch_stats, test_epoch_stats = nn_regressor.train()
nn_regressor.save_params(MODEL_PARAMS_PATH)

[Epoch: 1] => Train MSE: 0.2208, Train Accuracy: 0.9053, Test Accuracy: 0.9105, Epoch Duration: 163.2248 S
[Epoch: 2] => Train MSE: 0.1906, Train Accuracy: 0.9203, Test Accuracy: 0.9233, Epoch Duration: 164.1236 S


In [None]:
# Persist the per-epoch (train, test) statistics returned by train().
stats_path = './model_stats/256_16_tanh_stats.pkl'
# Make sure the target directory exists so the write cannot fail on a fresh
# checkout and discard the statistics of a long training run.
os.makedirs(os.path.dirname(stats_path), exist_ok=True)
with open(stats_path, 'wb') as stats_file:
    pickle.dump((train_epoch_stats, test_epoch_stats), stats_file)