In [1]:
import numpy as np
import time

from dataset import get_2D_normalised



In [2]:
class EarlyStopping:
    """
    Signals when a monitored loss has stopped improving.

    Call the instance once per epoch with the latest loss. After
    `patience` consecutive calls without an improvement larger than
    `delta` over the best loss seen so far, `early_stop` becomes True.
    """

    def __init__(self, patience=10, delta=0.0):
        # Configuration
        self.patience = patience
        self.delta = delta
        # Running state
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        # The very first observation only establishes the baseline.
        if self.best_loss is None:
            self.best_loss = val_loss
            return

        # "Stalled" means the loss did not drop below best_loss - delta.
        stalled = val_loss >= self.best_loss - self.delta
        if not stalled:
            self.best_loss = val_loss
            self.counter = 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

In [3]:
class ReduceLR:
    """
    Reduce-on-plateau learning-rate schedule.

    Call the instance once per epoch with the latest loss. After
    `patience` consecutive calls without an improvement larger than
    `delta` over the best loss seen so far, `reduce_lr` becomes True.
    The training loop is then expected to call `reset()`, which divides
    the learning rate by 10 and starts a fresh patience window.
    """

    def __init__(self, lr=0.01, patience=10, delta=0.0):
        self.patience = patience
        self.delta = delta
        self.counter = 0  # consecutive non-improving calls in the current window
        self.lr = lr
        self.best_loss = None
        self.reduce_lr = False

    def __call__(self, val_loss):
        """
        Record one loss observation and update the plateau state
        """
        if self.best_loss is None:
            # First observation only establishes the baseline.
            self.best_loss = val_loss
        elif val_loss >= self.best_loss - self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.reduce_lr = True
        else:
            self.best_loss = val_loss
            self.counter = 0

    def get_lr(self):
        """
        Return the current learning rate
        """
        return self.lr

    def reset(self):
        """
        Apply one reduction step (lr / 10) and start a new patience window
        """
        self.lr /= 10
        self.reduce_lr = False
        # Without clearing the counter it stays >= patience, so the very next
        # non-improving epoch would trigger another reduction and the rate
        # would collapse by a factor of 10 per epoch.
        self.counter = 0

In [4]:
class DenseLayer:
    """
    Fully connected layer description plus its forward/backward maths.

    The layer itself only stores its width (`neurons`); weights and biases
    are passed in by the caller on every forward/backward call.
    """

    def __init__(self, neurons):
        self.neurons = neurons

    def relu(self, inputs):
        """
        ReLU Activation Function
        """
        return np.maximum(0, inputs)

    def softmax(self, inputs):
        """
        Batch-standardised Softmax Activation Function

        The 2-D `inputs` are first z-scored with the mean and standard
        deviation of the whole batch, then a row-wise softmax is applied.
        Returns a float32 array of the same shape whose rows sum to 1.
        """
        # Work in float32: float16 overflows at ~65504, so accumulating the
        # batch statistics in float16 turns them into inf/NaN for realistic
        # batch sizes.
        scores = np.asarray(inputs, dtype=np.float32)

        in_mean = np.float32(scores.mean(dtype=np.float64))
        in_std = np.float32(scores.std(dtype=np.float64))
        if in_std == 0:
            # Constant logits: avoid 0/0 and fall back to a uniform output.
            in_std = np.float32(1.0)

        in_norm = (scores - in_mean) / in_std

        # Shifting each row by its maximum leaves the softmax unchanged but
        # keeps exp() from overflowing.
        in_norm = in_norm - np.max(in_norm, axis=1, keepdims=True)

        exp_scores = np.exp(in_norm)
        probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
        return probs

    def relu_derivative(self, dA, Z):
        """
        ReLU Derivative Function

        Returns a float16 copy of `dA` with entries zeroed wherever the
        pre-activation `Z` was not positive.
        """
        dZ = np.array(dA, copy=True, dtype=np.float16)
        dZ[Z <= 0] = 0
        return dZ

    def forward(self, inputs, weights, bias, activation):
        """
        Single Layer Forward Propagation

        `weights` has shape (output_dim, input_dim). Returns the pair
        (activation output, pre-activation Z). Raises ValueError for an
        activation other than 'relu' or 'softmax'.
        """
        Z_curr = np.dot(inputs, weights.T) + bias
        if activation == 'relu':
            A_curr = self.relu(inputs=Z_curr)
        elif activation == 'softmax':
            A_curr = self.softmax(inputs=Z_curr)
        else:
            # Previously this fell through and failed with an unrelated
            # UnboundLocalError on the return statement.
            raise ValueError(f"Unsupported activation: {activation!r}")

        return A_curr, Z_curr

    def backward(self, dA_curr, W_curr, Z_curr, A_prev, activation):
        """
        Single Layer Backward Propagation

        For 'softmax' the incoming `dA_curr` is used directly as the
        gradient with respect to the pre-activation; for any other value
        the ReLU derivative is applied first. Returns (dA, dW, db) where
        dW has shape (input_dim, output_dim), i.e. the transpose of W.
        """
        if activation == 'softmax':
            dW = np.dot(A_prev.T, dA_curr)
            db = np.sum(dA_curr, axis=0, keepdims=True)
            dA = np.dot(dA_curr, W_curr)
        else:
            dZ = self.relu_derivative(dA_curr, Z_curr)
            dW = np.dot(A_prev.T, dZ)
            db = np.sum(dZ, axis=0, keepdims=True)
            dA = np.dot(dZ, W_curr)

        return dA, dW, db

In [5]:
class Network:
    """
    Feed-forward classifier built from dense layers.

    Hidden layers use ReLU and the last layer uses softmax. Training is
    full-batch gradient descent on the cross-entropy loss. Labels may be
    given either one-hot encoded, shape (n, classes), or as integer class
    indices, shape (n,) or (n, 1).
    """

    def __init__(self):
        self.network = []  # layers, in forward order
        self.architecture = []  # per layer: input_dim, output_dim, activation
        self.params = []  # per layer: W, b
        self.memory = []  # per layer: inputs, Z from the latest forward pass
        self.gradients = []  # dW, db from the latest backward pass, last layer first
        self.layers_size = len(self.network)

    def add(self, layer):
        """
        Add layers to the network
        """
        self.network.append(layer)
        self.layers_size += 1

    @staticmethod
    def _class_indices(actual):
        """
        Return labels as a 1-D array of integer class indices
        """
        labels = np.asarray(actual)
        if labels.ndim == 2 and labels.shape[1] > 1:
            # One-hot rows: the class is the position of the largest entry.
            return np.argmax(labels, axis=1)
        return labels.reshape(-1).astype(np.intp)

    def _compile(self, data):
        """
        Initialize model architecture
        """
        # Rebuild from scratch so repeated calls (e.g. a manual call followed
        # by train) do not append duplicate entries.
        self.architecture = []

        for idx in range(self.layers_size):

            input_dim = data.shape[1] if idx == 0 else self.network[idx - 1].neurons
            output_dim = self.network[idx].neurons
            activation = 'relu' if idx != self.layers_size - 1 else 'softmax'

            self.architecture.append({'input_dim': input_dim,
                                      'output_dim': output_dim,
                                      'activation': activation})

        return self

    def _init_weights(self, data):
        """
        Initialize the model parameters
        """
        self._compile(data)

        np.random.seed(99)

        # Start from a clean slate; otherwise a second call would stack a new
        # set of parameters behind the old one.
        self.params = []

        for i in range(self.layers_size):
            w = np.float16(np.random.uniform(low=-1, high=1,
                                             size=(self.architecture[i]['output_dim'],
                                                   self.architecture[i]['input_dim'])))
            b = np.float16(np.zeros((1, self.architecture[i]['output_dim'])))

            self.params.append({'W': w, 'b': b})

        return self

    def _forwardprop(self, data):
        """
        Performs one full forward pass through network
        """
        A_curr = data

        # Keep only the current pass. Backprop looks entries up by layer
        # index, so stale entries from earlier passes would be used instead
        # of the fresh ones (and the list would grow every epoch).
        self.memory = []

        for i in range(self.layers_size):
            A_prev = A_curr
            A_curr, Z_curr = self.network[i].forward(inputs=A_prev,
                                                     weights=self.params[i]['W'],
                                                     bias=self.params[i]['b'],
                                                     activation=self.architecture[i]['activation'])

            self.memory.append({'inputs': A_prev, 'Z': Z_curr})

        return A_curr

    def _backprop(self, predicted, actual):
        """
        Performs one full backward pass through network
        """
        num_samples = len(actual)
        class_idx = self._class_indices(actual)

        # compute the gradient on predictions: (probabilities - one_hot) / n.
        # Work on a copy (at least float32) so the caller's predictions are
        # not modified and the division by n does not underflow in float16.
        predicted = np.asarray(predicted)
        dscores = np.array(predicted,
                           dtype=np.result_type(predicted.dtype, np.float32))
        dscores[np.arange(num_samples), class_idx] -= 1
        dscores /= num_samples

        dA_prev = dscores

        # Keep only the gradients of this pass (stored last layer first).
        self.gradients = []

        for idx, layer in reversed(list(enumerate(self.network))):
            dA_curr = dA_prev

            A_prev = self.memory[idx]['inputs']
            Z_curr = self.memory[idx]['Z']
            W_curr = self.params[idx]['W']

            activation = self.architecture[idx]['activation']

            dA_prev, dW_curr, db_curr = layer.backward(
                dA_curr, W_curr, Z_curr, A_prev, activation)

            self.gradients.append({'dW': dW_curr, 'db': db_curr})

    def _update(self, lr_reduce):
        """
        Update the model parameters --> lr * gradient
        """
        lr = lr_reduce.get_lr()

        # Gradients are stored last layer first; flip them to layer order.
        reversed_gradients = list(reversed(self.gradients))
        for idx in range(self.layers_size):
            # dW is (input_dim, output_dim) while W is (output_dim, input_dim).
            self.params[idx]['W'] -= lr * reversed_gradients[idx]['dW'].T
            self.params[idx]['b'] -= lr * reversed_gradients[idx]['db']

    def _get_accuracy(self, predicted, actual):
        """
        Calculate accuracy after each iteration
        """
        return np.mean(np.argmax(predicted, axis=1) == self._class_indices(actual))

    def _calculate_loss(self, predicted, actual):
        """
        Calculate cross-entropy loss after each iteration
        """
        samples = len(actual)
        class_idx = self._class_indices(actual)

        probs = np.asarray(predicted, dtype=np.float64)
        correct_probs = probs[np.arange(samples), class_idx]

        # Floor the probabilities so an underflowed 0 gives a large finite
        # loss instead of inf.
        correct_logprobs = -np.log(np.maximum(correct_probs, 1e-12))
        data_loss = np.sum(correct_logprobs) / samples

        return data_loss

    def train(self, X_train, y_train, epochs):
        """
        Train the model with full-batch gradient descent

        Weights are (re)initialised on every call. Per-epoch loss and
        accuracy on the training data are stored in `self.loss` and
        `self.accuracy`. The learning rate is reduced on plateaus and
        training stops early when the loss stops improving.
        """

        earlyStop = EarlyStopping()
        lr_reduce = ReduceLR(lr=0.01, patience=5, delta=1e-4)

        self.loss = []
        self.accuracy = []

        self._init_weights(X_train)

        for i in range(epochs):
            start_time = time.perf_counter()
            yhat = self._forwardprop(X_train)

            self.accuracy.append(self._get_accuracy(
                predicted=yhat, actual=y_train))
            self.loss.append(self._calculate_loss(
                predicted=yhat, actual=y_train))

            self._backprop(predicted=yhat, actual=y_train)

            earlyStop(self.loss[-1])
            lr_reduce(self.loss[-1])

            self._update(lr_reduce)

            end_time = time.perf_counter()
            print(f"EPOCH: {i}, ACCURACY: {self.accuracy[-1]}, LOSS: {self.loss[-1]}, TIME: {end_time-start_time} sec")

            if lr_reduce.reduce_lr and not earlyStop.early_stop:
                lr_reduce.reset()

            if earlyStop.early_stop:
                print(f"Stopping early at epoch: {i}")
                break


In [6]:
# Load the train and test splits from the project-local `dataset` module.
# NOTE(review): presumably each split is an (inputs, labels) pair with the
# inputs flattened to 2-D and normalised -- confirm against
# dataset.get_2D_normalised.
(train_data, train_labels), (test_data, test_labels) = get_2D_normalised()

In [7]:
# One-hot encode the labels: row k of the identity matrix is the encoding of class k
num_classes = 10
train_labels_enc, test_labels_enc = (
    np.eye(num_classes)[labels.ravel()]
    for labels in (train_labels, test_labels)
)

In [8]:
# Create train and validation sets: the last `val_split` fraction of the
# training data is held out for validation.
val_split = 0.2
num_examples = train_data.shape[0]
val_size = int(val_split * num_examples)
train_size = num_examples - val_size

x_train = train_data[:train_size].astype('float16')
y_train = train_labels_enc[:train_size].astype('int')
x_val = train_data[train_size:].astype('float16')
# The validation labels must be sliced from the same (training) label array as
# x_val; slicing the test labels here paired x_val with unrelated labels.
y_val = train_labels_enc[train_size:].astype('int')

# Inputs and labels of each split must line up row for row.
assert x_train.shape[0] == y_train.shape[0]
assert x_val.shape[0] == y_val.shape[0]

In [12]:
# Assemble the network: six hidden layers of halving width, followed by an
# output layer with one unit per label column.
model = Network()
for hidden_width in (2048, 1024, 512, 256, 128, 64):
    model.add(DenseLayer(hidden_width))
model.add(DenseLayer(y_train.shape[1]))

In [13]:
# Derive the per-layer dimensions from the training inputs and show them.
# _compile returns the model itself, so the call can be chained.
print(model._compile(x_train).architecture)

[{'input_dim': 3072, 'output_dim': 2048, 'activation': 'relu'}, {'input_dim': 2048, 'output_dim': 1024, 'activation': 'relu'}, {'input_dim': 1024, 'output_dim': 512, 'activation': 'relu'}, {'input_dim': 512, 'output_dim': 256, 'activation': 'relu'}, {'input_dim': 256, 'output_dim': 128, 'activation': 'relu'}, {'input_dim': 128, 'output_dim': 64, 'activation': 'relu'}, {'input_dim': 64, 'output_dim': 10, 'activation': 'softmax'}]


In [10]:
# Run training for at most 100 epochs (early stopping may end it sooner).
model.train(X_train=x_train, y_train=y_train, epochs=100)