## Neural Net

I'm coding a tiny neural net that I want to learn to approximate XOR. I'm going to do this without relying on a dedicated package/library for the nets' data and computation.

First, let's create a representation of the number of units at each layer.

In [None]:
# Configurable variables
number_of_hidden_layers = 1
units_per_hidden_layer = 2

Let's print out the number of units including the inputs and outputs.

In [None]:
number_of_inputs = 2 # one input for each signal, parallel input
number_of_outputs = 1 # simulate binary output of an XOR gate
units_count = [] # array where index 0 is the # of input units and the final index is the number of output units

units_count.append(number_of_inputs)
for i in range(number_of_hidden_layers):
    units_count.append(units_per_hidden_layer)
units_count.append(number_of_outputs)

print(units_count)

Now, we'll connect the layers by initializing weights for edges.

In [None]:
import random

W = [] # Array of all weights in a simple, fully-connected feed-forward network

for layer_idx in range(len(units_count)-1):
    weights_from_layer_to_next = []
    prev_layer_units = units_count[layer_idx]
    next_layer_units = units_count[layer_idx+1]
    for next_node_idx in range(next_layer_units):
        weights_to_node = []
        for prev_node_idx in range(prev_layer_units + 1): # add 1 for bias weight
            initial_weight = random.random()
            weights_to_node.append(initial_weight)
        weights_from_layer_to_next.append(weights_to_node)
    W.append(weights_from_layer_to_next)

print(W)

One more thing that I realized was convenient before writing the forward pass: we should create an Edge class. We will be multiplying the weights that we have already by the activation values at that point. Creating an edge class that has this activation value and the weight is helpful as it keeps our code organized, and makes us more ready for backprop, where we'll need the edges cumulative backprop. gradient.

In [None]:
class Edge:

    def __init__(self, weight: float, input_value: int = 0):
        self.weight = weight # this edge's weight
        self.input_value = input_value # this edge's value before it is multiplied by the weight

    def __repr__(self):
        return str(self)

    def __str__(self):
        return str(self.weight)

    def output_value(self) -> float :
        return self.weight * self.input_value

Let's now copy data from our list of weights into an array of edges.

In [None]:
import numpy as np

edges = [] # Array of all edges in a fully-connected feed-forward network

for layer in W:
    layer_edges = []
    for node in layer:
        node_edges = []
        for edge_weight in node:
            edge = Edge(edge_weight)
            node_edges.append(edge)
        layer_edges.append(node_edges)
    edges.append(layer_edges)

We're almost ready for our foward pass calculation, but first we need to decide on a non-linearity for our network. We'll use the ReLu function.

In [None]:
def relu(x: float) -> float:
    return x if x > 0 else 0

Now we'll write our function for forward passes.

In [None]:
from typing import List, Callable, Union

def forward_pass(input_values: tuple[int, int], tensor: List[List[List[Edge]]], activation_fn: Callable, output_fn: Union[Callable, None] = None, verbose=False) -> List[float]:
    
    # give edges connecting inputs to the first hidden layer their initial input value
    for edges_to_hidden_unit in edges[0]:
        for input_value, edge in zip(input_values + (None,), edges_to_hidden_unit):
            is_bias_edge = input_value == None
            if not is_bias_edge:
                edge.input_value = input_value
            else:
                edge.input_value = 1

    # iterate from layer to layer with forward pass now
    for pre_activiation_edges, post_activation_edges in zip(tensor[:-1], tensor[1:]):
        for unit_idx, hidden_unit_pre_edges in enumerate(pre_activiation_edges):
            hidden_unit_input = 0
            for pre_edge in hidden_unit_pre_edges:
                hidden_unit_input += pre_edge.output_value()
                if verbose:
                    print('Pre-edge output value: ' + str(pre_edge.output_value()))
            hidden_unit_output = activation_fn(hidden_unit_input)
            # Hidden units are in terms of the hidden unit that the edge gives the output
            # Below, to get a list of of edges for the same input hidden unit
            # we iterate over the post edge hidden units and always use the same index
            for hidden_unit_post_edges in post_activation_edges:
                    hidden_unit_post_edges[unit_idx].input_value = hidden_unit_output
            if verbose:
                print('Hidden unit output: ' + str(hidden_unit_output))
        for hidden_unit_post_edges in post_activation_edges: # loop over sets of nodes going to units of next layer
            bias_idx = len(pre_activiation_edges)
            hidden_unit_post_edges[bias_idx].input_value = 1 # set bias edge input to 1

    # calculate the output
    outputs = []
    for edges_to_output_node in tensor[-1]:
        output_node_value = 0
        for edge in edges_to_output_node:
            output_node_value += edge.output_value()
        if output_fn is not None:
            output_node_value = output_fn(output_node_value)
        outputs.append(output_node_value)

    return outputs

In [None]:
def sigmoid(x: float) -> float:
    return 1 / (1 + np.exp(-x))

Now let's try to make some predictions without having done any training

In [None]:
sample_inputs = [(0, 0), (0, 1), (1, 0), (1, 1)]
expected_outputs = [0, 1, 1, 0]

for sample_input, expected_output in zip(sample_inputs, expected_outputs):
    print(f'Output is {forward_pass(sample_input, edges, relu, sigmoid)[0]} for input {sample_input}')

Great; we can can output wrong results! Let's try writing some code for backpropagation so that we can train our network.

First we realize we need a cost function. Our current network has one output node which should output the binary value, 1 or 0. This begs the question: what should we do if the network produces a value outside of this range. We definitely would need to penalize it, however, we'll take a different approach.

Instead, we'll perform forward passes that before returning the final output, constrain it with a sigmoid function.

For our cost function, we'll use cross-entropy loss (NLL).

In [None]:
import numpy as np

def cross_entropy(y_hat: float, y: float) -> float: # binary cross-entropy loss (NLL)
    if y == 1:
        return -np.log(np.clip(y_hat, .000001, 1)) # clip to avoid taking log of 0
    else:
        return -np.log(np.clip(1-y_hat, .000001, 1))

We also need the derivative of our activation function and the derivative of the pre-output + cost function to calculate updates to weights.

The derivative of our activation function, ReLu, is perhaps more straightforward than the latter derivative.

In [None]:
def relu_derivative(x: float) -> float:
    return 1 if x >= 0 else 0

The pre-output + cost function derivative involves the composition of the cross-entropy function which takes the sigmoid output as input. The use of the natural log in the cross-entropy function serves to cancel out the sigmoid function's derivative, which tends to 0 as the input tends to $\pm \inf$. In practical terms this means that learning can be very conservative when initial guesses are bad, and the natural log counters this so that learning is aggressive when very wrong. In fact, the derivative of the composition is $ \sigma(a) - y$, where a is the original input into the sigmoid function. It's no accident that it has such an elegant and useful derivative, and happily makes for a nice one liner.

In [None]:
def nll_derivative(y_hat: float, y: int) -> float:
    return y_hat - y

Now we'll write the backpropagation code.

First, a quick class to help organization

In [None]:
class Sample:

    def __init__(self, inputs: tuple[int, int], output: int):
        self.inputs = inputs # two ints that are either 0 or 1
        self.output = output # The expected output of the XOR function


In [None]:
from tqdm import tqdm

def backprop(tensor: List[List[List[Edge]]], samples: List[Sample], activation_grad_fn: Callable, output_cost_grad_fn: Callable, number_of_batches=1, iterations=10, alpha=1e-3) -> List[List[List[Edge]]]:
    reversed_edges = tensor[::-1]
    # split data into batches
    batches = []
    slice_size = int(len(samples) / number_of_batches) # naively leave extra samples out if it doesn't divide evenly
    for batch_idx in range(number_of_batches):
        start_idx = slice_size * batch_idx
        end_idx = (slice_size * (batch_idx + 1)) - 1
        batches.append(samples[start_idx:end_idx+1])

    for _ in range(iterations):
        #for batch in tqdm(batches): # as of 2022: .ipynb + vscode + tqdm = silent crashes and heachaches
        for batch in batches:
            # weight_gradient_update_batch = np.zeros(np.shape(reversed_edges)) # array to store cumulative gradients for each edge per batch and then divide by batch size
            # TODO: give the copy values of all 0's in a helper function
            weight_gradient_update_batch = create_gradient_tensor(reversed_edges) # list to store cumulative gradients for each edge per batch to later compute update
            cost_of_batch = 0
            for sample in batch:

                # run the forward pass
                y_hat = forward_pass(sample.inputs, tensor, relu, output_fn=sigmoid)[0] # TODO: tweak this to accomodate multi-dimensional output case
                y = sample.output
                cost = cross_entropy(y_hat, y)
                cost_of_batch += cost

                # calculate the gradient of the output input into the cost function and weights into final node for the batch
                # we seperate this code from other layer connections because of cost and output functions which differ from just the activation function
                backprop_chain_value = create_gradient_tensor(reversed_edges) # only stores the current example data; for backprop
                output_cost_gradient = output_cost_grad_fn(y_hat, y) # TODO: tweak this to accomodate multi-dimensional output case
                for output_node_idx, output_node_edges in enumerate(reversed_edges[0]):
                    for edge_idx, edge in enumerate(output_node_edges):
                        weight_gradient_one_sample = output_cost_gradient * edge.input_value
                        weight_gradient_update_batch[0][output_node_idx][edge_idx] += weight_gradient_one_sample
                        # we need to multiply the weight b/c when we go to the prev layer we are no longer
                        # differentiating w.r.t. to this layer's weight; it becomes a multiplier
                        backprop_chain_value[0][output_node_idx][edge_idx] = output_cost_gradient * edge.weight

                # iterate over layer connections
                if len(reversed_edges) <= 1:
                    raise ValueError('Tensor is not of appriate size') # there should be at least one hidden layer
                for slice_layer_idx, layer_edges in enumerate(reversed_edges[1:]): # start from penultimate layer as we calculated output/cost gradient above
                    layer_idx = slice_layer_idx + 1 # layer_idx corresponds to the same layer in the unsliced, reversed tensor
                    for node_idx, node_edges in enumerate(layer_edges):
                        post_activation_grad = 0 # the gradients of all post-activation edges summed for the node that we are looking at
                        for post_activation_layer in backprop_chain_value[layer_idx-1]: # TODO change name to post_activation_unit
                            for post_activation_edge_gradient in post_activation_layer[:-1]: # do not include the next layer's bias; which is unconnected
                                post_activation_grad += post_activation_edge_gradient
                        # TODO: Use a data structure and save hidden unit input in the forward pass
                        hidden_unit_input = 0 # recalculate hidden unit input using values stored in 
                        for pre_act_edge_idx, pre_activation_edge in enumerate(node_edges):
                            hidden_unit_input += pre_activation_edge.output_value()
                        activation_function_grad = activation_grad_fn(hidden_unit_input)
                        for pre_act_edge_idx, pre_activation_edge in enumerate(node_edges):
                            weight_gradient_one_sample = pre_activation_edge.input_value * activation_function_grad * post_activation_grad # chain rule
                            weight_gradient_update_batch[layer_idx][node_idx][pre_act_edge_idx] += weight_gradient_one_sample
                            # we need to multiply the weight b/c when we go to the prev layer we are no longer
                            # differentiating w.r.t. to this layer's weight; it becomes a multiplier
                            backprop_chain_value[layer_idx][node_idx][pre_act_edge_idx] = activation_function_grad * post_activation_grad * pre_activation_edge.weight


            for layer_idx, layer in enumerate(reversed_edges):
                for node__idx, node in enumerate(layer):
                    for edge_idx, edge in enumerate(node):
                        edge.weight = -weight_gradient_update_batch[layer_idx][node__idx][edge_idx]/len(batch) * alpha + edge.weight # update weight

            avg_cost_per_example = cost_of_batch / len(batch)
            # print(f'The average cost per example of the batch is: {avg_cost_per_example}')

    tensor_str_array = []
    for layer in tensor:
        layer_list = []
        for node in layer:
            node_list = []
            for edge in node:
                node_list.append(str(edge))
            layer_list.append(node_list)
        tensor_str_array.append(layer_list)
    print(tensor)

    return tensor

# Helper function that creates a tensor as the same structure as the input tensor, but with float's representing gradient update for that edge
def create_gradient_tensor(tensor: List[List[List[Edge]]]) -> List[List[List[float]]]:
    gradient_tensor = []
    for layer in tensor:
        layer_list = []
        for node in layer:
            node_list = []
            for _ in node:
                node_list.append(0) # initialize to 0, should be modified before use in backprop
            layer_list.append(node_list)
        gradient_tensor.append(layer_list)
    return  gradient_tensor
        

And now we'll create our samples for XOR; there's not too many possibilities

In [None]:
both_low = Sample((0, 0), 0)
left_high = Sample((1, 0), 1)
right_high = Sample((0, 1), 1)
both_high = Sample((1, 1), 0)
samples_list = [both_low, left_high, right_high, both_high]

shortened_samples_list_high = [left_high, right_high] # network should learn to output 1 for one hot samples; useful for debug as I suspect local min. prob is less problematic
shortened_samples_list_low = [both_low, both_high]

In [None]:
backprop(edges, samples_list, activation_grad_fn=relu_derivative, output_cost_grad_fn=nll_derivative, iterations=10000, alpha=1e-2, number_of_batches=1)

In [None]:
sample_inputs = [(0, 0), (0, 1), (1, 0), (1, 1)]
expected_outputs = [0, 1, 1, 0]

for sample_input, expected_output in zip(sample_inputs, expected_outputs):
    print(f'Output is {forward_pass(sample_input, edges, relu, sigmoid)[0]} for input {sample_input}')

It's likely that on your first try you will fall into a local minimum. As XOR has extremely low diminsionality and my optimization is decidedly unsophisticated; that is a reality. I understand why neural nets would have had their detractors before it became more known that significantly incorrect local minimums are more easily avoidable in higher dimensions.

## Reflection

There were some interesting bugs that I had to work through once I had a the code running. I think these bugs test your intuition of the fundamental concepts that underly neural nets. I had to think about "Should a node's input determine magnitude of its update (no, although I wonder if this is stictly bad; are per-sample specialized nodes bad is an interesting question; I'm sure there's lit. on this)", "For backprop, are weights in the layer to the right considered? (yes)", "For backprop, are later layers' inputs considered? (no)" Make sure that the forward pass is solid, and then look at backprop.