# Create a neural-network (NN) library that uses as few external libraries as possible

# Common Functions

In [None]:
# def bin_classification(x):
#     return 1 if x >= 0.5 else 0
# bin_class_vectorised = np.vectorize(bin_classification)


# def regression_predict(w, x):
#     return w * x
# regression_predict_vectorized = np.vectorize(regression_predict)    

# To do
- Work out how to do back-propagation for more than one hidden layer
- Add a bias term

In [12]:
import numpy as np

class Data_Helper():
    """Namespace for small input-validation helpers."""

    @staticmethod
    def is_list_of_lists(container):
        """Check that `container` is a list whose items are all lists.

        Args:
            container: the object to validate.

        Returns:
            True when `container` is a list and every item in it is a list
            (an empty list passes, since it has no offending item).

        Raises:
            TypeError: if `container` is not a list, or if any item is not a
                list (the message names the index of the first bad item).
        """
        if not isinstance(container, list):
            raise TypeError('Container must be of data type: List.')

        for idx, item in enumerate(container):
            if not isinstance(item, list):
                msg = 'Items in container must be of data type "list". '
                msg += f'Error item: index: {idx}.'
                raise TypeError(msg)
        return True

class Sigmoid():
    """Logistic activation: 1 / (1 + exp(-x)), applied element-wise."""
    activation_type = 'Sigmoid'

    def execute(self, input_matrix):
        """Return the sigmoid of `input_matrix` (scalar or numpy array)."""
        denominator = 1 + np.exp(-input_matrix)
        return 1 / denominator

class Sigmoid_Stable():
    """Numerically stable logistic activation, applied element-wise."""
    activation_type = 'Sigmoid_Stable'

    def execute(self, input_matrix):
        """Return the sigmoid of `input_matrix` without overflowing.

        Args:
            input_matrix: scalar or array-like of real numbers.

        Returns:
            numpy array (0-d for a scalar input) with values in [0, 1].
        """
        x = np.asarray(input_matrix)

        # np.where evaluates BOTH of its value arguments for every element,
        # so calling np.exp(x) and np.exp(-x) directly would still overflow
        # on the branch that is thrown away. exp(-|x|) is always <= 1 and
        # equals exp(x) for x < 0 and exp(-x) for x >= 0, which is exactly
        # the term each branch needs.
        decay = np.exp(-np.abs(x))
        return np.where(x < 0, decay / (1 + decay), 1 / (1 + decay))
    

class Linear():
    """Identity activation: the output equals the input."""
    activation_type = 'Linear'

    def execute(self, input_matrix):
        """Return `input_matrix` multiplied by 1 (a new object for arrays)."""
        passthrough = input_matrix * 1
        return passthrough
    

class Model():
    """
        Ordered stack of layer objects plus the model-level results.

        Model error depends on ALL layers, not just the output layer,
        hence errors are calculated at this level, not at the output layer
        level.

        Intended to hold the history of each epoch's weights, preds and cost,
        i.e. for each epoch:
            the layer classes run the forward pass and predict
            preds are stored in this `class Model`
            cost is calculated and stored here
            weights are stored here too.

        Not implemented yet (the methods below are placeholders):
        def .save_model_architecture(), saves to a pickle:
            . the model (i.e. all layers and their parameters)
        def .save_trained_model()
            . saves the weights and bias
        def .load_model_architecture()
        def .load_trained_model()
            loads weights and bias and assigns them to the right layer

    """

    def __init__(self, layers):
        """
        layers: list of layer objects, in forward order. Each layer must
                provide get_layer_details(), get_layer_matrix() and, unless
                it is the input layer, forward(); the output layer must also
                provide predict().
        """
        self._layers = layers  # a list of layer objects
        self._preds = None
        self._probs = None
        self._errors = None
        self._len_targets = None
        self._cost = None
        self._error_derivative = None
        self._weight_delta = None

    @property
    def cost(self):
        """Most recently computed cost (alias of `_cost` for older callers)."""
        return self._cost

    @cost.setter
    def cost(self, value):
        self._cost = value

    def print_model_architecture(self):
        """Print the details of every layer; always returns True."""
        for idx, layer in enumerate(self._layers):
            print(f"\nLayer #{idx}:")
            layer.print_layer_details()

        return True

    @staticmethod
    def matrix_mul_dims_ok(layer_prev, weights):
        """
        Check that `layer_prev` (m x n) can be matrix-multiplied by
        `weights` (n x p). Both arguments must be 2-D arrays.

        Prints both shapes, and returns False (with a hint) when the inner
        dimensions differ, True otherwise.
        """
        m, n1 = layer_prev.shape
        print(f'\n\nLprev.shape: \tm = {m} \tby n = {n1}')

        n2, p = weights.shape
        print(f'x.shape: \tn = {n2} \tby p = {p}')

        if n1 != n2:
            print('n1 != n2, matrix multiplication will fail.\n')
            print('either A or B should be reshaped if possible.')
            return False
        return True

    def _forward_pass(self, threshold=0.5, verbose=False):
        """
        Run one forward pass through the layers, in order.

        Layers whose layer_type is 'input layer' are skipped; every other
        layer is fed the matrix of the layer before it. The pass stops at the
        layer named 'output', where predictions and the raw output matrix are
        stored on the model.

        Returns the predictions, or None if no layer is named 'output'.
        """
        for idx, layer in enumerate(self._layers):
            layer_details = layer.get_layer_details()
            if verbose:
                print(f"\nLayer #{idx}:")

            if layer_details['layer_type'].lower() == 'input layer':
                # the input layer only holds the data, nothing to compute
                continue

            layer.forward(self._layers[idx - 1].get_layer_matrix())
            if verbose:
                print(f"after forward pass:\n{layer.get_layer_matrix()}")

            if layer_details['name'].lower() == 'output':
                if layer_details['layer_type'].lower() == 'output_regression':
                    self._preds = layer.predict()
                else:
                    self._preds = layer.predict(threshold=threshold)

                self._probs = layer.get_layer_matrix()
                return self._preds

        return None

    def train(self, X, y, epochs=1, learning_rate=0.001, threshold=0.5):
        """
        X: train data
        y: targets
        epochs: Number of times we loop through a complete dataset
                consisting of N rows.
        learning_rate: step size for the weight update.
        threshold: classification cut-off handed to the output layer.

        Planned steps for each epoch:
            forward once
            get preds
            calc error
            calc cost
            back prop (update weights)

        NOTE: only the forward pass / prediction step is wired up so far.
        X, y and learning_rate are accepted but not used yet: the data is
        read from the input layer, and update_weights() is still a stub.
        """
        for epoch in range(epochs):
            print(f"epoch #: {epoch}")
            self._forward_pass(threshold=threshold)
            # TODO: calc error and cost against y, then back prop.

    def predict(self, threshold=0.5):
        """
        Forward pass ONCE and return the prediction of the output layer.

        threshold: classification cut-off; not passed to a layer whose
                   layer_type is 'output_regression'.

        Prints each layer index and its matrix after the forward pass.
        Returns None if no layer is named 'output'.
        """
        return self._forward_pass(threshold=threshold, verbose=True)

    def get_proba(self):
        """Return the raw output-layer matrix stored by the last forward pass."""
        return self._probs

    def get_model_error(self, targets):
        """
        Model's difference between predictions and targets.

        targets: list of lists (validated by Data_Helper, which raises
                 TypeError otherwise).

        Stores and returns `preds - targets` (a matrix); also records the
        number of targets for get_model_cost().
        """
        if Data_Helper.is_list_of_lists(targets):
            print('list of lists targets ok')
            self._len_targets = len(targets)
            # errors = preds - targets
            # preds = weights * input
            # so, errors = (weights * input) - targets
            # targets and inputs are given, we can only adjust the weights
            # to reduce the error
            self._errors = self._preds - targets   # a matrix
            return self._errors

    def get_model_cost(self, cost_fn='Mean Squared Error'):
        """
        Reduce the stored errors to a single scalar cost.

        cost_fn: 'Mean Squared Error' or 'Mean Absolute Error'.
                 (Classification costs such as "Binary Cross Entropy" or
                 "Categorical Cross Entropy" are not implemented.)

        Requires get_model_error() to have been called first.
        Raises ValueError for an unsupported cost_fn.
        """
        if cost_fn == 'Mean Squared Error':
            self._cost = np.sum(np.square(self._errors)) / self._len_targets
        elif cost_fn == 'Mean Absolute Error':
            self._cost = np.sum(np.abs(self._errors)) / self._len_targets
        else:
            raise ValueError(f'Unsupported cost_fn: {cost_fn!r}')
        return self._cost

    def get_weight_delta(self):
        """
        Store the error derivative (2 * errors) and return the weight delta.

        NOTE: the weight delta itself is not computed yet, so this returns
        whatever `_weight_delta` currently holds (None after __init__).
        """
        self._error_derivative = 2 * self._errors

        # self._weight_delta = self._error_derivative * Inputs
        # rc: inputs is the input layer or the preceding layer ?

        return self._weight_delta

    def update_weights(self):
        """Placeholder: weight update is not implemented yet."""
        # weights -= learning_rate * np.sum(self._weight_delta) / len(self._weight_delta)
        pass

    def save_model_architecture(self):
        """Placeholder: not implemented yet."""
        pass

    def save_trained_model(self):
        """Placeholder: not implemented yet."""
        pass

    def load_model_architecture(self):
        """Placeholder: not implemented yet."""
        pass

    def load_trained_model(self):
        """Placeholder: not implemented yet."""
        pass
    
    
        
class Layer():
    """Base class holding the bookkeeping shared by every layer type.

    Subclasses override `layer_type` and assign `self._layer_matrix`
    (the array that represents the layer) in their own `__init__`.
    """
    layer_type = 'Base Layer'

    def __init__(self, layer_name, nodes_prev_layer, nodes_in_layer):
        """
        layer_name: label of the layer.
        nodes_prev_layer: number of nodes in the preceding layer.
        nodes_in_layer: number of nodes in this layer.
        """
        self._layer_name = layer_name
        self._nodes_prev_layer = nodes_prev_layer
        self._nodes = nodes_in_layer
        # Set here so the accessors below work even before a subclass has
        # assigned the real matrix.
        self._layer_matrix = None

    def get_layer_details(self):
        """Return a dict describing the layer.

        'layer_shape' is None while no layer matrix has been assigned.
        """
        layer_details = {}
        layer_details['name'] = self._layer_name
        layer_details['layer_type'] = self.layer_type  # uses child class's class variable
        layer_details['num_nodes_prev'] = self._nodes_prev_layer
        layer_details['num_nodes'] = self._nodes
        layer_details['layer_shape'] = getattr(self._layer_matrix, 'shape', None)
        layer_details['layer_matrix'] = self._layer_matrix
        return layer_details

    def print_layer_details(self):
        """Print name, type, node counts and shape of the layer."""
        layer_details = self.get_layer_details()
        print('*'*50)
        print(f"name: \t\t{layer_details['name']}")
        print(f"layer_type: \t{layer_details['layer_type']}")
        print(f"num_nodes_prev: {layer_details['num_nodes_prev']}")
        print(f"num_nodes: \t{layer_details['num_nodes']}")
        print(f"layer_shape: \t{layer_details['layer_shape']}")

    # def forward(self, X=None):
    #     # stub fn, implementation is to be overriden in derived classes if required.
    #     pass

    def get_layer_matrix(self):
        """Return the layer's matrix (None until one has been assigned)."""
        return self._layer_matrix

    
class Input_Layer(Layer):
    """First layer of a model: wraps the raw data, performs no computation."""
    layer_type = 'Input Layer'

    def __init__(self, layer_name, nodes_prev_layer, nodes_in_layer, data):
        """
        data: list of lists; Data_Helper raises TypeError for anything else.
        """
        super().__init__(layer_name, nodes_prev_layer, nodes_in_layer)

        data_is_valid = Data_Helper.is_list_of_lists(data)
        if data_is_valid:
            self._data = data

        # representation of this layer: one column per node
        as_array = np.array(data)
        self._layer_matrix = as_array.reshape(-1, self._nodes)

    def data_wrangling_fns(self):
        """Placeholder - otherwise there is no reason to have a child class just for input."""
        pass
    
        
class Fully_Connected_Layer(Layer):
    """Dense layer: activation(X dot W), with W of shape (prev nodes, nodes)."""
    layer_type = 'Fully Connected Layer'

    def __init__(self, layer_name, nodes_prev_layer, nodes_in_layer, act_fn):
        """
        act_fn: activation object exposing `execute(matrix)` and
                `activation_type`.
        """
        super().__init__(layer_name, nodes_prev_layer, nodes_in_layer)

        weights_shape = (self._nodes_prev_layer, self._nodes)
        # Weights are all ones while testing.
        # Todo: switch to random weights (np.random.rand) when testing is over
        self._weights_matrix = np.ones(weights_shape)

        # representation of this layer: a single row of zeros until forward() runs
        self._layer_matrix = np.zeros((1, self._nodes))

        self.act_fn = act_fn

    def get_layer_details(self):
        """Base-layer details plus weights shape, weights and activation name."""
        details = super().get_layer_details()
        details.update(
            weights_shape=self._weights_matrix.shape,
            weights_matrix=self._weights_matrix,
            activation=self.act_fn.activation_type,
        )
        return details

    def print_layer_details(self):
        """Print the base-layer details followed by weights shape and activation."""
        super().print_layer_details()
        details = self.get_layer_details()
        print(f"weights_shape: \t{details['weights_shape']}")
        print(f"activation: \t{details['activation']}")

    def get_weights_matrix(self):
        """Return the weights matrix of this layer."""
        return self._weights_matrix

    def forward(self, X):
        """Compute this layer's matrix from the previous layer's matrix `X`."""
        # a 1-D input is treated as a single row
        rows = X.reshape(1, -1) if X.ndim == 1 else X

        # weighted sum first, then the activation on top of it
        self._layer_matrix = np.dot(rows, self._weights_matrix)
        self._layer_matrix = self.act_fn.execute(self._layer_matrix)

        # keep the stored matrix 2-D
        if self._layer_matrix.ndim == 1:
            self._layer_matrix = self._layer_matrix.reshape(1, -1)

    def test(self):
        """Call the activation function once with the scalar 888."""
        self.act_fn.execute(888)
       
        
class Output_Binary_Classification(Fully_Connected_Layer):
    """Output layer for binary classification: a single node, thresholded to 0/1."""
    layer_type = 'Output_Binary_Classification'

    def __init__(self, layer_name, nodes_prev_layer, nodes_in_layer, act_fn):
        """`nodes_in_layer` is ignored: binary classification always uses 1 node."""
        super().__init__(layer_name, nodes_prev_layer, 1, act_fn)

        self._predicted_class = None

    def get_probability_matrix(self):
        """Return the layer matrix produced by the last forward pass."""
        return self._layer_matrix

    def predict(self, threshold=0.5):
        """Return an int array: 1 where the layer matrix is strictly above `threshold`, else 0."""
        # A vectorised comparison followed by .astype(int) can only yield
        # 0 and 1; np.where(...) would be the more flexible alternative.
        is_positive = self._layer_matrix > threshold
        self._predicted_class = is_positive.astype(int)
        return self._predicted_class

# Not implemented yet; the activation function should be softmax.
# class Output_MultiClass_Classification(Fully_Connected_Layer): # predict 1 class out of multiple classes 
# class Output_MultiLabel_Classification(Fully_Connected_Layer): # predict p-values of 1 or more classes
                                                                 #   an input can belong to >1 class

class Output_Regression(Fully_Connected_Layer):
    """Output layer for regression: a single node whose matrix is the prediction."""
    layer_type = 'Output_Regression'

    def __init__(self, layer_name, nodes_prev_layer, nodes_in_layer, act_fn):
        """`nodes_in_layer` is ignored: regression always uses 1 node."""
        super().__init__(layer_name, nodes_prev_layer, 1, act_fn)

    def predict(self):
        """Return the layer matrix from the last forward pass, unchanged."""
        prediction = self._layer_matrix
        return prediction
    


### Test the NN Code

In [None]:
# # Test the NN code    
# data = [
#     [1,1,1],
#     [2,2,2],
#     [3,3,3],
#     [4,4,4]]
                 
# output_type = "Binary Classification"
# #output_type = 'Regression'

# if output_type == "Binary Classification":
#     layers = [
#         Input_Layer('Input', 0, 3, data),
#         Fully_Connected_Layer('H1', 3, 5, Sigmoid()),
#         Fully_Connected_Layer('H2', 5, 8, Sigmoid()),
#         Output_Binary_Classification('Output', 8, 1, Sigmoid())    
#     ]
# elif output_type == 'Regression':    
#     layers = [
#         Input_Layer('Input', 0, 3, data),
#         Fully_Connected_Layer('H1', 3, 5, Sigmoid()),
#         Fully_Connected_Layer('H2', 5, 8, Sigmoid()),
#         Output_Regression('Output', 8, 1, Linear())
#     ]

# model = Model(layers)
# #model.print_model_architecture()

# if output_type == 'Regression':
#     pred = model.predict()
#     print(f"\nModel predicts: {pred:0.2f}")
# else:
#     pred = model.predict(threshold=0.5)
#     print(f"\nModel predicts: 'Class {pred}`")

# Temp Testing

##### Regression

In [13]:
# Regression smoke test: one input node -> one linear hidden node -> one
# linear output node, run through a single forward pass (no training).
data = [[1],[2],[3],[4]]
targets = [[2],[4],[6],[8]]
#targets = [[1],[2],[3],[4]]
                 
# Input_Layer / layer arguments: (name, nodes in previous layer, nodes in
# this layer, data or activation). The model looks for the layer named
# 'Output' to produce predictions.
layers = [
        Input_Layer('Input', 0, 1, data),
        Fully_Connected_Layer('H1', 1, 1, Linear()),
        Output_Regression('Output', 1, 1, Linear())
]

model = Model(layers)
#model.train()

#model.print_model_architecture()
# predict() runs the forward pass and prints each layer's matrix.
pred = model.predict()
probs = model.get_proba()
print(f"\nModel proba: \n{probs}")
print("\nRegression:")
print(f'targets:\n{targets}')
print(f"Model predicts: \n{pred}")
# errors = predictions - targets (must be called before get_model_cost)
errors = model.get_model_error(targets)
print(f"Model Errors: \n{errors}")
cost = model.get_model_cost(cost_fn='Mean Squared Error')
print(f"Model Cost: \n{cost}")


Layer #0:

Layer #1:
after forward pass:
[[1.]
 [2.]
 [3.]
 [4.]]

Layer #2:
after forward pass:
[[1.]
 [2.]
 [3.]
 [4.]]

Model proba: 
[[1.]
 [2.]
 [3.]
 [4.]]

Regression:
targets:
[[2], [4], [6], [8]]
Model predicts: 
[[1.]
 [2.]
 [3.]
 [4.]]
list of lists targets ok
Model Errors: 
[[-1.]
 [-2.]
 [-3.]
 [-4.]]
Model Cost: 
7.5


##### Bin Classification

In [14]:
# Binary-classification smoke test: one input node -> one linear hidden
# node -> one sigmoid output node, run through a single forward pass.
data = [[1],[2],[3],[4]]
targets = [[1],[0],[0],[1]]
                 
layers = [
        Input_Layer('Input', 0, 1, data),
        Fully_Connected_Layer('H1', 1, 1, Linear()),
        Output_Binary_Classification('Output', 1, 1, Sigmoid())
]

model = Model(layers)
#model.print_model_architecture()
# Outputs strictly above the threshold are classified as 1, the rest as 0.
pred = model.predict(threshold=0.75)
probs = model.get_proba()
print(f"\nModel proba: \n{probs}")
print("\nBin Classification")
print(f'targets:\n{targets}')
print(f"Model predicts: \n{pred}")
# errors = predicted classes - targets (must be called before get_model_cost)
errors = model.get_model_error(targets)
print(f"Model errors: \n{errors}")
# default cost function is 'Mean Squared Error'
cost = model.get_model_cost()
print(f"Model Cost: \n{cost}")


Layer #0:

Layer #1:
after forward pass:
[[1.]
 [2.]
 [3.]
 [4.]]

Layer #2:
after forward pass:
[[0.73105858]
 [0.88079708]
 [0.95257413]
 [0.98201379]]

Model proba: 
[[0.73105858]
 [0.88079708]
 [0.95257413]
 [0.98201379]]

Bin Classification
targets:
[[1], [0], [0], [1]]
Model predicts: 
[[0]
 [1]
 [1]
 [1]]
list of lists targets ok
Model errors: 
[[-1]
 [ 1]
 [ 1]
 [ 0]]
Model Cost: 
0.75
