In [1]:
import math
import random
from abc import ABC, abstractmethod
import numpy as np # linear algebra
import struct
from array import array
from os.path  import join
from sklearn.model_selection import train_test_split

In [92]:
class Operation(ABC):
    def __init__(self, label) -> None:
        self.label = label
    
    @abstractmethod
    def __call__(self, data1, data2):
        raise NotImplementedError
    
    @abstractmethod
    def _backward(self, grad_out):
        raise NotImplementedError

class OperationFactory:
    def __call__(self, op, node1, node2 = None):
        # self.op = op
        
        if node2 is not None:
            if not isinstance(node2, Value):
                node2 = Value(node2)
            
            def _backward():
                grad1_update, grad2_update = op._backward(out.grad)
                node1.grad += grad1_update
                node2.grad += grad2_update
            
            out = Value(op(node1.data, node2.data), _childern = (node1, node2), _op = op.label)
            out._backward = _backward
        else:
            def _backward():
                grad1_update, _ = op._backward(out.grad)
                node1.grad += grad1_update
            
            out = Value(op(node1.data, None), _childern = (node1, ), _op = op.label)
            out._backward = _backward

        return out

class Addition(Operation):
    def __init__(self, label = "add") -> None:
        super().__init__(label)
    
    def __call__(self, data1, data2):
        return data1 + data2
    
    def _backward(self, grad_out):
        return (grad_out, grad_out)

class Multiplication(Operation):
    def __init__(self, label = "mul") -> None:
        super().__init__(label)
    
    def __call__(self, data1, data2):
        self.data1, self.data2 = data1, data2
        return data1 * data2

    def _backward(self, grad_out):
        return (grad_out * self.data2, grad_out * self.data1)

class Subtraction(Operation):
    def __init__(self, label = "sub") -> None:
        super().__init__(label)
    
    def __call__(self, data1, data2):
        return data1 - data2

    def _backward(self, grad_out):
        return (grad_out, grad_out)

class Division(Operation):
    def __init__(self, label = "div") -> None:
        super().__init__(label)
    
    def __call__(self, data1, data2):
        if data2 == 0:
            raise ZeroDivisionError

        self.data1, self.data2 = data1, data2
        return data1 / data2
    
    def _backward(self, grad_out):
        return (grad_out / self.data2, - self.data1 * grad_out / (self.data2 ** 2))

class Power(Operation):
    def __init__(self, label = "pow") -> None:
        super().__init__(label)
    
    def __call__(self, data1, data2):
        if not isinstance(data2, (int, float)):
            raise TypeError
        
        self.data1, self.data2 = data1, data2
        return data1 ** data2
    
    def _backward(self, grad_out):
        return (grad_out * self.data2 * self.data1 ** (self.data2 - 1), 0.0)

class Exp(Operation):
    def __init__(self, label = "exp") -> None:
        super().__init__(label)
    
    def __call__(self, data1, data2):
        self.out = math.exp(data1)
        return math.exp(data1)
    
    def _backward(self, grad_out):
        return (self.out * grad_out, None)

class Tanh(Operation):
    def __init__(self, label = "tanh") -> None:
        super().__init__(label)
    
    def __call__(self, data1, data2):
        self.out = math.tanh(data1)
        return self.out
    
    def _backward(self, grad_out):
        return ((1 - self.out ** 2) * grad_out, None)

class ReLU(Operation):
    def __init__(self, label = "relu") -> None:
        super().__init__(label)
    
    def __call__(self, data1, data2):
        self.data = data1
        return max(0.0, data1)
    
    def _backward(self, grad_out):
        return (grad_out if self.data > 0.0 else 0.0, None)

class LossFunction(ABC):
    def __init__(self, label) -> None:
        self.label = label

    @abstractmethod
    def __call__(self, data1, data2):
        raise NotImplementedError

class MSE(LossFunction):
    def __init__(self, label = "mse") -> None:
        super().__init__(label)

    def __call__(self, data1, data2):
        return (data1 - data2) ** 2
    
class MAE(LossFunction):
    def __init__(self, label = "mae") -> None:
        super().__init__(label)
    
    def __call__(self, data1, data2):
        return data1 - data2 if data1 > data2 else data2 - data1

class Value:
    def __init__(self, data, label="", _childern = (), _op = '') -> None:
        """
            Value object to store numerical values
            :param data      - numerical value
            :param label     - label for human readability
            :param _childern - all of the childern of the current value node
            :param _op       - operation leading to the current value
        """

        self.data = data
        self.label = label
        self._prev = _childern  # used for backprop (childern is previous)
        self._op = _op
        self.op_fact = OperationFactory()
        self.grad = 0.0  # records the partial derivative of output wrt this node
        self._backward = lambda : None  # used for backpropagation
    
    def __repr__(self) -> str:
        return f"{self.data}"

    def operate(self, other, op):
        out = self.op_fact(op, node1=self, node2=other)
        return out

    def __add__(self, other):
        add = Addition()
        # out = self.op_fact(add, self, other)
        # return out
        return self.operate(other, add)

    def __radd__(self, other):
        return self + other

    def __mul__(self, other):
        mul = Multiplication()
        return self.operate(other, mul)

    def __rmul__(self, other):  # other * self
        return self * other    
    
    def __sub__(self, other):
        sub = Subtraction()
        return self.operate(other, sub)

    def __rsub__(self, other):
        return self - other
    
    def __truediv__(self, other):
        division = Division()
        return self.operate(other, division)

    def __pow__(self, other): # self ** other
        pow = Power()
        return self.operate(other, pow)

    def __rpow__(self, other):  # a ^ x
        other = Value(other)
        return other ** self
    
    def exp(self):
        exp = Exp()
        return self.operate(None, exp)

    def __le__(self, other):
        return self.data <= other.data

    def __gt__(self, other):
        return self.data > other.data

    def __lt__(self, other):
        return self.data < other.data
    
    def __ge__(self, other):
        return self.data >= other.data
    
    # def __eq__(self, other):
    #     return self.data == other.data

    def __ne__(self, other):
        return self.data != other.data

    def act(self, func):
        out = self.op_fact(func, self, None)
        return out

    def tanh(self):
        tanh = Tanh()
        return self.act(tanh)

    def relu(self):
        relu = ReLU()
        return self.act(relu)

    def backward(self):
        def build_topo(node):
            if node not in visited:
                visited.add(node)
                topo.append(node)  # WHERE IT MIGHT GO WRONG
                for child in node._prev:
                    build_topo(child)
        
        topo = []
        visited = set()
        self.grad = 1.0
        
        build_topo(self)
        for node in topo:
            node._backward()

In [93]:
class Neuron:
    def __init__(self, n_inputs) -> None:
        self.w = [Value(random.uniform(-1, 1)) for _ in range(n_inputs)]
        self.b = Value(random.uniform(-1, 1))
    
    def __call__(self, x):
        act = sum((w.data*i for w,i in zip(self.w, x)), self.b)
        out = act.tanh()
        return out

    def parameters(self):
        return self.w + [self.b]

class Layer:
    def __init__(self, n_in, n_out) -> None:
        self.neurons = [Neuron(n_in) for _ in range(n_out)]
    
    def __call__(self, x):
        out = [n(x) for n in self.neurons]
        return out
    
    def parameters(self):
        return [param for n in self.neurons for param in n.parameters()]

class MLP:
    def __init__(self, n_input, n_outs) -> None:
        ins = [n_input] + n_outs
        self.layers = [Layer(ins[i], ins[i+1]) for i in range(len(n_outs))]
    
    def __call__(self, x):
        temp = x
        for layer in self.layers:
            temp = layer(temp)
        return temp
    
    def parameters(self):
        return [param for layer in self.layers for param in layer.parameters()]
    
    def zero_grad(self):
        for param in self.parameters():
            param.grad = 0

# TODO: Documentation


In [94]:
def loss_fn(t, p):
    return (t - p) ** 2  # mse

X = [
    [2.0, 3.0, -1.0],
    [3.0, -1.0, 0.5],
    [0.5, 1.0, 1.0],
    [1.0, 1.0, -1.0]
]
y = [1.0, -1.0, -1.0, 1.0]

n_input = 3
n_outs = [4, 1]
mlp = MLP(n_input, n_outs)

lr = 0.01
n_epochs = 20

In [95]:
for i in range(n_epochs):
    pred = [mlp(x)[0] for x in X]
    loss = sum(loss_fn(t, p) for t, p in zip(y, pred))

    for param in mlp.parameters():
        param.grad = 0  # so that the grad of loss calculated is only for this new iteration
    loss.backward()

    print(f"{i} : {loss.data}")

    for param in mlp.parameters():
        param.data += -lr * param.grad

0 : 7.6271149299329695
1 : 7.623004070113655
2 : 7.618829522546875
3 : 7.61459026903665
4 : 7.610285272300583
5 : 7.605913474707976
6 : 7.60147379688267
7 : 7.59696513616047
8 : 7.59238636489067
9 : 7.587736328570908
10 : 7.583013843804305
11 : 7.578217696067626
12 : 7.573346637279032
13 : 7.568399383153826
14 : 7.563374610336575
15 : 7.558270953297884
16 : 7.5530870009842355
17 : 7.547821293209292
18 : 7.542472316775301
19 : 7.537038501313398
