In [1]:
import numpy as np
import matplotlib.pyplot as plt
import tqdm
import time

In [2]:
class Dataloader:
    """Iterable that serves a dataset as ``(X, Y)`` mini-batches of ``np.matrix``.

    Parameters
    ----------
    _X, _Y : sequences of equal length
        Samples and their targets; element ``i`` of ``_X`` belongs to element
        ``i`` of ``_Y``.
    _batch_size : int or None
        Number of samples per batch. ``None`` yields the whole dataset as a
        single batch.
    shuffle : bool
        When true, the stored data is re-shuffled at the start of every
        iteration.
    """

    def __init__(self, _X, _Y, _batch_size = None, shuffle = True):
        assert len(_X) == len(_Y)
        self.X = _X
        self.Y = _Y
        self.batch_size = _batch_size
        # Kept under a private name: assigning the flag to ``self.shuffle``
        # would hide the ``shuffle()`` method on the instance and make
        # ``self.shuffle()`` fail with "'bool' object is not callable".
        self._shuffle_on_iter = bool(shuffle)

    def shuffle(self):
        """Reorder ``X`` and ``Y`` with one shared random permutation.

        The stored data is replaced by new arrays, so ``get_data()`` returns
        the shuffled order afterwards; sample/target pairing is preserved.
        """
        permutation = np.random.permutation(len(self.X))
        self.X = np.asarray(self.X)[permutation]
        self.Y = np.asarray(self.Y)[permutation]

    def get_data(self):
        """Return the full ``(X, Y)`` pair in its current order."""
        return self.X, self.Y

    def __iter__(self):
        """Yield ``(X_batch, Y_batch)`` tuples, each converted to ``np.matrix``."""
        if self._shuffle_on_iter:
            self.shuffle()

        if self.batch_size is None:
            yield (np.matrix(self.X), np.matrix(self.Y))
            return

        for start in range(0, len(self.X), self.batch_size):
            stop = start + self.batch_size
            # The last batch may be shorter than batch_size.
            yield (np.matrix(self.X[start:stop]),
                   np.matrix(self.Y[start:stop]))

In [3]:
class LeakyReLU:
    """Leaky ReLU activation: ``x`` for ``x > 0`` and ``negative_slope * x`` otherwise.

    Parameters
    ----------
    negative_slope : float
        Multiplier applied to non-positive inputs (default ``0.01``).
    """

    def __init__(self, negative_slope = 0.01):
        # Honour the caller's value instead of a hard-coded 0.01.
        self.negative_slope = negative_slope

    def __leaky_relu(self, matrix):
        values = np.matrix(matrix, dtype=float)
        # An explicit sign test stays correct for any slope, whereas
        # max(x, slope * x) only works while slope <= 1.
        return np.matrix(np.where(values > 0, values, values * self.negative_slope))

    def derivative(self, matrix):
        """Return the element-wise derivative as a float ``np.matrix``.

        Entries are ``1`` where the input is positive and ``negative_slope``
        elsewhere; an input of exactly 0 takes the negative-side slope so the
        gradient never vanishes completely.
        """
        values = np.matrix(matrix, dtype=float)
        return np.matrix(np.where(values > 0, 1.0, self.negative_slope))

    def __call__(self, matrix):
        """Apply the activation element-wise and return a float ``np.matrix``."""
        return self.__leaky_relu(matrix)

    def __repr__(self):
        return 'LeakyReLU'

    def __str__(self):
        return 'LeakyReLU'
    
class Identical:
    """Identity activation: the output is the input and the derivative is all ones."""

    def __init__(self):
        pass

    def __identical(self, matrix):
        # Pass-through: the very same object is handed back, no copy is made.
        return matrix

    def derivative(self, matrix):
        """Return a matrix of ones with the same shape as ``matrix``."""
        ones = np.full(matrix.shape, 1.0)
        return np.matrix(ones)

    def __call__(self, matrix):
        """Return ``matrix`` unchanged."""
        return self.__identical(matrix)

    def __repr__(self):
        return 'Identical'

    def __str__(self):
        return self.__repr__()
    
class Sigmoid:
    """Logistic sigmoid activation ``1 / (1 + exp(-x))`` applied element-wise."""

    def __init__(self): pass

    def __sigmoid(self, matrix):
        values = np.matrix(matrix, dtype=float)
        # sigmoid(x) = exp(-log(1 + exp(-x))). logaddexp evaluates the inner
        # log without overflowing, so very large |x| gives 0 or 1, not inf/nan.
        # The exponent sign matters: 1 / (1 + exp(+x)) would be sigmoid(-x).
        return np.matrix(np.exp(-np.logaddexp(0, -values)))

    def derivative(self, matrix):
        """Return ``sigmoid(x) * (1 - sigmoid(x))`` element-wise."""
        sigmoid = self.__sigmoid(matrix)
        return np.multiply(sigmoid, (1 - sigmoid))

    def __call__(self, matrix):
        """Apply the sigmoid element-wise and return an ``np.matrix``."""
        return self.__sigmoid(matrix)

    def __repr__(self):
        return 'Sigmoid'

    def __str__(self):
        return 'Sigmoid'

class Softmax:
    """Column-wise normalised exponential used as an output activation.

    The exponent is negated, i.e. each column ``x`` is mapped to
    ``exp(-x) / sum(exp(-x))`` (the softmax of ``-x``). That convention is kept
    on purpose: ``CrossEntropySoftmax.derivative`` in this file returns
    ``expected - predicted``, which appears to be the gradient matching this
    sign. NOTE(review): confirm before changing either side.
    """

    def __init__(self): pass

    def val(self, matrix):
        """Return the normalised ``exp(-x)`` of every column of ``matrix``."""
        # Shift by the column minimum so every exponent -(x - min) is <= 0.
        # Shifting by the maximum and then negating made the exponents
        # non-negative, which overflowed to inf / inf = nan for wide ranges.
        # The shift cancels in the ratio, so the result is mathematically the same.
        shifted = matrix - np.min(matrix, axis = 0)
        exponentials = np.exp(-1 * shifted)
        return exponentials / np.sum(exponentials, axis = 0)

    def derivative(self, matrix):
        """Return a matrix of ones shaped like ``matrix``.

        Presumably the true softmax gradient is folded into the paired loss
        derivative, so this factor is neutral. TODO confirm against the loss
        actually used with this activation.
        """
        return np.matrix(np.ones(matrix.shape))

    def __call__(self, matrix):
        return self.val(matrix)

    def __repr__(self):
        return 'Softmax'

    def __str__(self):
        return 'Softmax'


In [4]:
class MSE:
    """Mean squared error loss, reported under the name ``'L2'``."""

    def __init__(self):
        pass

    def __mse(self, predicted_val, expected_val):
        # Squared residuals averaged along the last axis.
        assert np.shape(predicted_val) == np.shape(expected_val)
        residual = np.matrix(predicted_val) - np.matrix(expected_val)
        return np.square(residual).mean(axis=-1)

    def derivative(self, predicted_val, expected_val):
        """Return ``2 * (predicted - expected)`` as an ``np.matrix``."""
        assert np.shape(predicted_val) == np.shape(expected_val)
        residual = np.matrix(predicted_val) - np.matrix(expected_val)
        return residual * 2

    def __call__(self, expected_val, predicted_val):
        """Return the per-row mean of the squared differences.

        Both arguments must have the same shape; the result is the same
        whichever of the two is passed first.
        """
        return self.__mse(predicted_val, expected_val)

    def __repr__(self):
        return 'L2'

    def __str__(self):
        return self.__repr__()
    
class CrossEntropy:
    """Cross-entropy loss ``-mean(expected * log(predicted))``.

    Both methods take ``(predicted_val, expected_val)`` of identical shape and
    leave their arguments untouched.
    """

    def __init__(self): pass

    def __safe_predictions(self, predicted_val):
        # Work on a float copy so the caller's matrix is never modified, and
        # replace non-positive entries (including exact zeros) so that log()
        # and the division in the derivative stay finite.
        safe = np.matrix(predicted_val, dtype=float)
        safe[safe <= 0] = 1e-4
        return safe

    def __crossentropy(self, predicted_val, expected_val):
        assert np.shape(predicted_val) == np.shape(expected_val)
        # The log is taken of the predictions and weighted by the targets;
        # doing it the other way round evaluates log(0) for one-hot targets.
        predicted_log = np.log(self.__safe_predictions(predicted_val))
        return np.mean(np.multiply(expected_val, predicted_log)) * -1

    def derivative(self, predicted_val, expected_val):
        """Return ``-expected / predicted`` element-wise.

        This is the gradient of ``-sum(expected * log(predicted))`` with
        respect to the predictions.
        """
        assert np.shape(predicted_val) == np.shape(expected_val)
        safe = self.__safe_predictions(predicted_val)
        return np.multiply(expected_val, 1 / safe) * -1

    def __call__(self, predicted_val, expected_val):
        """Return the scalar loss averaged over every element."""
        return self.__crossentropy(predicted_val, expected_val)

    def __repr__(self):
        return 'CrossEntropy'

    def __str__(self):
        return 'CrossEntropy'

class CrossEntropySoftmax:
    """Cross-entropy loss intended to be paired with a softmax output layer.

    ``val`` returns ``-mean(expected * log(predicted))``; ``derivative`` returns
    the combined loss-and-activation gradient with respect to the layer's
    linear output.
    """

    def __init__(self): pass

    def val(self, predicted_val, expected_val):
        """Return the scalar loss without modifying either argument."""
        assert np.shape(predicted_val)==np.shape(expected_val)

        # Clamp on a float copy: the caller's predictions stay intact, and
        # exact zeros are replaced too, otherwise 0 * log(0) yields nan.
        safe_predictions = np.matrix(predicted_val, dtype=float)
        safe_predictions[safe_predictions <= 0] = 1e-4
        predicted_log = np.log(safe_predictions)
        return np.mean(np.multiply(expected_val, predicted_log)) * -1

    def derivative(self, predicted_val, expected_val):
        """Return ``expected_val - predicted_val``.

        NOTE(review): this sign appears to match the ``Softmax`` class in this
        file, which exponentiates the negated input; the usual
        ``predicted - expected`` applies to a standard softmax. Confirm before
        changing either side.
        """
        assert np.shape(predicted_val) == np.shape(expected_val)
        cross_entropy_derivative = expected_val - predicted_val
        return cross_entropy_derivative

    def __call__(self, predicted_val, expected_val):
        return self.val(predicted_val, expected_val)

    def __repr__(self):
        return 'CrossEntropyForSoftmax'

    def __str__(self):
        return 'CrossEntropyForSoftmax'

In [5]:
def random_weight(dim1, dim2, limit = 'MEDIUM'):
    """Return a ``dim1 x dim2`` matrix of small uniform random weights.

    Values are drawn from ``[0, 1)`` and divided by a scale chosen by ``limit``:
    ``'LOW'`` -> 150, ``'MEDIUM'`` -> 100, ``'HIGH'`` -> 10. Any other value
    falls back to the ``'MEDIUM'`` scale, which is the divisor this function
    always used before ``limit`` was honoured.
    """
    limits = {'LOW': 150, 'MEDIUM': 100, 'HIGH': 10}
    divisor = limits.get(limit, limits['MEDIUM'])
    return np.matrix(np.random.rand(dim1, dim2)) / divisor

def zero_weight(dim1, dim2, limit = None):
    """Return a ``dim1 x dim2`` matrix filled with zeros.

    ``limit`` is accepted and ignored so this initializer can be called with
    the same three arguments as ``random_weight``.
    """
    return np.matrix(np.zeros((dim1, dim2)))

In [6]:
class Layer:
    """One fully connected layer: ``activation(w @ input + b)``.

    ``w`` has shape ``(neurons, prev_layer_neurons)`` and ``b`` has shape
    ``(neurons, 1)``. Inputs are 2-D with one column per sample.

    Parameters
    ----------
    neurons : int
        Number of units in this layer.
    prev_layer_neurons : int
        Number of rows of the input this layer receives.
    activation : callable with a ``derivative`` method
        Applied to the linear output; ``derivative`` is evaluated on the same
        linear output during the forward pass.
    weights_init : str
        Key of ``weight_initializer_dict`` (``'zero'`` or ``'random'``).
    w_limit : str
        Passed as third argument to the chosen initializer.
    """

    weight_initializer_dict = {'zero': zero_weight, 'random': random_weight}

    def __init__(self, neurons, prev_layer_neurons, activation, weights_init = 'random', w_limit = 'MEDIUM'):

        assert type(weights_init) == str, 'Weight initialization must be given by name (str)!'
        assert weights_init in self.weight_initializer_dict, 'Undefined weight initialization function!'

        self.neurons = neurons
        self.activation_function = activation
        weight_initializer = self.weight_initializer_dict[weights_init]
        self.w = weight_initializer(self.neurons, prev_layer_neurons, w_limit)
        self.b = weight_initializer(self.neurons, 1, w_limit)

        # Values cached by forwardprop() for the following backprop() call.
        self.input = None
        self.linear_output = None
        self.activated_output = None
        self.dout_dl = None

        # Previous update steps, blended into the next ones by the momentum term.
        self.last_weight_updating_value = 0
        self.last_bias_updating_value = 0

    def forwardprop(self, _input):
        """Compute and return the activated output for ``_input``.

        ``_input`` must be 2-D with ``prev_layer_neurons`` rows. The input, the
        linear output, the activated output and the activation derivative are
        stored on the layer for use by ``backprop``.
        """
        assert np.ndim(_input) == 2
        # The weight matrix is stored as ``self.w``.
        assert self.w.shape[1] == np.shape(_input)[0]

        self.input = np.matrix(_input).astype(float)
        self.linear_output = np.matmul(self.w, self.input) + self.b
        self.activated_output = self.activation_function(self.linear_output)
        self.dout_dl = self.activation_function.derivative(self.linear_output)
        return self.activated_output

    def backprop(self, backprop_tensor, lr, momentum):
        """Update ``w`` and ``b`` in place and return the gradient for the previous layer.

        Parameters
        ----------
        backprop_tensor : 2-D array-like
            Gradient of the loss with respect to this layer's activated output;
            must have the same shape as the output of the last forward pass.
        lr : float
            Step size applied to the update values.
        momentum : float
            Weight of the previous update step; the fresh batch-averaged
            gradient is weighted by ``1 - momentum``.
        """
        assert np.ndim(backprop_tensor) == 2
        assert backprop_tensor.shape[0] == self.dout_dl.shape[0]
        assert backprop_tensor.shape[1] == self.dout_dl.shape[1]

        # Gradient with respect to the linear output.
        delta = np.multiply(np.matrix(backprop_tensor).astype(float), self.dout_dl)
        batch_size = delta.shape[1]

        bias_updating_value = (1 - momentum) * np.sum(delta, axis = 1) / batch_size \
                            + momentum * self.last_bias_updating_value
        weight_updating_value = (1 - momentum) * np.matmul(delta, self.input.T) / batch_size \
                            + momentum * self.last_weight_updating_value

        # Must be computed with the weights used in the forward pass, i.e.
        # before they are updated below.
        upstream_gradient = np.matmul(self.w.T, delta)

        self.w -= lr * weight_updating_value
        self.b -= lr * bias_updating_value

        # Stored under the same attribute names that are read above, so the
        # momentum term is actually applied on the next call.
        self.last_weight_updating_value = weight_updating_value
        self.last_bias_updating_value = bias_updating_value

        return upstream_gradient

    def get_number_of_neurons(self):
        """Return the number of units in this layer."""
        return self.neurons

In [7]:
class FeedForwardNN:
    """Sequential stack of ``Layer`` objects trained by mini-batch gradient descent.

    Typical use: create the network, call ``add_layer`` once per layer, call
    ``set_training_param`` and then ``fit``. Internally every tensor holds one
    sample per column; loaders deliver one sample per row and are transposed
    before being fed to the network.

    Parameters
    ----------
    input_shape : int
        Number of input features, used as the input size of the first layer.
    """

    def __init__(self, input_shape):

        self.input_shape = input_shape
        self.output_shape = None      # set by add_layer()
        self.layers = []
        self.DEFAULT_LR = 1e-3
        self.DEFAULT_MOMENTUM = 0
        self.lr = None
        self.momentum = None
        self.loss = None
        self.lr_AUTO = False          # True when lr follows learning_rate()

    def learning_rate(self, epoch):
        """Return the decayed learning rate used for ``epoch`` when ``lr='AUTO'``."""
        return (390)/(epoch**(8/3)+103000)

    def add_layer(self, neurons, activation = LeakyReLU(), initial_weight = 'random', w_limit = "MEDIUM"):
        """Append a layer of ``neurons`` units fed by the previous layer (or the input)."""
        assert type(neurons) == int, "Invalid number of neurons for the layer!"
        assert neurons > 0, "Invalid number of neurons for the layer!"

        if len(self.layers): prev_neurons = self.layers[-1].get_number_of_neurons()
        else: prev_neurons = self.input_shape

        new_layer = Layer(neurons, prev_neurons, activation, initial_weight, w_limit)
        self.layers.append(new_layer)
        self.output_shape = self.layers[-1].get_number_of_neurons()

    def set_training_param(self, loss = MSE(), **param):
        """Choose the loss and the optimiser settings.

        Recognised keyword arguments are ``lr`` (a number, or the string
        ``'AUTO'`` to follow ``learning_rate``) and ``momentum``; missing ones
        take the class defaults.
        """
        assert self.layers, "Uncomplete model!"
        self.loss = loss
        self.lr = param.get('lr', self.DEFAULT_LR)
        self.momentum = param.get('momentum', self.DEFAULT_MOMENTUM)
        # Recomputed on every call so that switching back from 'AUTO' to a
        # fixed rate really disables the schedule.
        self.lr_AUTO = isinstance(self.lr, str) and self.lr == 'AUTO'

    def predict(self, X):
        """Run a forward pass on ``X`` (one sample per column) and return the output."""
        return self.forward(X)

    def accuracy(self, X_test, X_train, Y_test, Y_train):
        """Return ``(test_acc, train_acc)`` by comparing arg-max classes.

        Inputs hold one sample per row; targets are compared through their
        arg-max, so they are assumed to be one-hot rows (TODO confirm).
        """
        Y_pred_test = self.predict(X_test.T)
        test_acc = np.mean(np.equal(np.argmax(Y_pred_test, axis = 0), np.argmax(Y_test.T, axis = 0)))
        Y_pred_train = self.predict(X_train.T)
        train_acc = np.mean(np.equal(np.argmax(Y_pred_train, axis = 0), np.argmax(Y_train.T, axis = 0)))
        return test_acc, train_acc

    def get_network_info(self):
        """Print the architecture and training settings; plot the schedule when lr is 'AUTO'."""
        print(len(self.layers), 'layers:')
        for layer in self.layers:
            print(layer.get_number_of_neurons(), 'neurons.', 'activation function:', layer.activation_function)
        print('Momentum:\t' + str(self.momentum))
        print('Loss Function:\t', self.loss)
        if self.lr_AUTO:
            plt.plot(range(150), [self.learning_rate(i) for i in range(150)])
            plt.title('Learning rate decay')
            plt.show()
        else: print('Learning rate:\t' + str(self.lr))

    def epoch_log(self, i, train_loss, test_loss, train_acc, test_acc, acc):
        """Print the losses (and the accuracies when ``acc`` is true) of epoch ``i``."""
        print('-' * 15, 'EPOCH: ' + '#' + str(i), '-' * 15)
        print('learning rate:\t' + str(self.lr))
        print('trin Loss:\t' + str(round(train_loss, 4)) + '\ttest Loss:\t' + str(round(test_loss, 4)))
        if acc: print('trin accuracy:\t' + str(round(train_acc, 4)) + '\ttest accuracy:\t' + str(round(test_acc, 4)))
        print()

    def forward(self, input_tensor):
        """Propagate ``input_tensor`` through every layer and return the final output."""
        # ``type(x) != None`` is always true; the value itself must be tested.
        assert self.output_shape is not None, "Model is not compiled!"

        output_tensor = input_tensor
        for network_layer in self.layers:
            output_tensor = network_layer.forwardprop(output_tensor)
        return output_tensor

    def optimize(self, backprop_tensor):
        """Back-propagate ``backprop_tensor`` from the last layer to the first, updating each."""
        for network_layer in reversed(self.layers):
            backprop_tensor = network_layer.backprop(backprop_tensor, self.lr, self.momentum)

    def fit(self, EPOCHS, trainloader, testloader, log = False, acc = False):
        """Train for ``EPOCHS`` epochs and return the recorded history.

        Parameters
        ----------
        EPOCHS : int
            Number of passes over ``trainloader``.
        trainloader, testloader : iterables of ``(X, Y)`` batches
            Must also provide ``get_data()`` when ``acc`` is true.
        log : bool
            Print a summary after every epoch.
        acc : bool
            Also compute train/test accuracy after every epoch.

        Returns
        -------
        dict
            Lists under ``'train_loss'``, ``'test_loss'``, ``'train_acc'`` and
            ``'test_acc'`` (the accuracy lists stay empty when ``acc`` is false).
        """
        assert self.loss is not None, "Training parameters are not set!"

        train_accs, test_accs, train_losses, test_losses = [], [], [], []
        train_acc, test_acc = 0, 0
        started_at = time.time()
        for i in range(EPOCHS):
            if self.lr_AUTO: self.lr = round(self.learning_rate(i), 5)
            train_loss = self.epoch_train(trainloader)
            test_loss = self.epoch_test(testloader)
            train_losses.append(train_loss)
            test_losses.append(test_loss)
            if acc:
                X_train, Y_train = trainloader.get_data()
                X_test, Y_test = testloader.get_data()
                test_acc, train_acc = self.accuracy(X_test, X_train, Y_test, Y_train)
                train_accs.append(train_acc)
                test_accs.append(test_acc)
            if log: self.epoch_log(i, train_loss, test_loss, train_acc, test_acc, acc)
        elapsed = round(time.time() - started_at, 2)
        print('-'*31)
        # Report the measured duration; the original printed an empty string here.
        print('--- Tooks ' + str(elapsed) + '(s) to fit. ---')
        print('-'*31)
        result = {}
        result['train_loss'], result['test_loss'] = train_losses, test_losses
        result['train_acc'], result['test_acc'] = train_accs, test_accs
        return result

    def epoch_train(self, trainloader):
        """Run one optimisation pass over ``trainloader``; return the mean batch loss.

        The loss of each batch is evaluated on the output computed before that
        batch's weight update.
        """
        batch_losses = []
        for x_train, y_train in trainloader:
            batch_output = self.forward(x_train.T)
            backprop_tensor = self.loss.derivative(batch_output, y_train.T)
            self.optimize(backprop_tensor)
            batch_loss = np.mean(self.loss(batch_output, y_train.T))
            batch_losses.append(batch_loss)
        return np.mean(batch_losses)

    def epoch_test(self, testloader):
        """Evaluate the loss over ``testloader`` without updating weights; return the mean batch loss."""
        batch_losses = []
        for x_test, y_test in testloader:
            test_output = self.forward(x_test.T)
            batch_loss = np.mean(self.loss(test_output, y_test.T))
            batch_losses.append(batch_loss)
        return np.mean(batch_losses)