In [240]:
import numpy as np
import random
from abc import ABC, abstractmethod

In [301]:
class Node:
    """Scalar value in a dynamically built computation graph (reverse-mode autodiff).

    Every arithmetic operation or activation returns a new ``Node`` that stores
    its operands in ``childern`` and installs a ``_backward`` closure.  That
    closure adds the local gradient contribution to each operand's ``grad``
    using the result node's own ``grad``.  Call :meth:`backward` on the final
    node to run the closures over the whole graph in the correct order.

    Attributes:
        data: Numeric value held by the node.
        grad: Accumulated derivative of the differentiated output w.r.t. this node.
        childern: List of operand nodes; empty for leaf nodes.  (The spelling is
            part of the constructor's keyword interface and is kept as is.)
        _op: Short tag naming the operation that produced the node.
        label: Optional human-readable name.
    """

    def __init__(self, data, childern = None, grad = None, _op='', label=''):
        """Create a node.

        Args:
            data: Numeric value of the node.
            childern: Iterable of operand nodes, or None for a leaf.
            grad: Initial gradient; None means 0.
            _op: Tag of the producing operation.
            label: Optional name stored on the node.
        """
        self.data = data
        self.grad = 0 if grad is None else grad
        # Leaves have nothing to propagate; operations overwrite this closure.
        self._backward = lambda: None
        # Always a list so graph traversal never has to special-case leaves.
        self.childern = list(childern) if childern is not None else []
        self._op = _op
        self.label = label

    @staticmethod
    def _wrap(value):
        """Return ``value`` unchanged if it is a Node, else wrap it in a leaf Node."""
        return value if isinstance(value, Node) else Node(value)

    def __add__(self, node):
        """Return ``self + node``; d(out)/d(self) = d(out)/d(node) = 1."""
        node = Node._wrap(node)
        out = Node(self.data + node.data, childern=[self, node], _op='+')

        def _backward():
            self.grad += out.grad
            node.grad += out.grad
        out._backward = _backward

        return out

    def __sub__(self, node):
        """Return ``self - node``; d(out)/d(self) = 1 and d(out)/d(node) = -1."""
        node = Node._wrap(node)
        out = Node(self.data - node.data, childern=[self, node], _op='-')

        def _backward():
            self.grad += out.grad
            node.grad -= out.grad
        out._backward = _backward

        return out

    def __mul__(self, node):
        """Return ``self * node``; each operand's gradient is the other's value."""
        node = Node._wrap(node)
        out = Node(self.data * node.data, childern=[self, node], _op='*')

        def _backward():
            self.grad += node.data * out.grad
            node.grad += self.data * out.grad
        out._backward = _backward

        return out

    def __truediv__(self, node):
        """Return ``self / node``.

        Raises:
            ZeroDivisionError: if the divisor's value is 0.
        """
        node = Node._wrap(node)
        if node.data == 0:
            raise ZeroDivisionError("division by a Node whose data is 0")
        out = Node(self.data / node.data, childern=[self, node], _op='/')

        def _backward():
            self.grad += (1 / node.data) * out.grad
            # d(a/b)/db = -a / b**2
            node.grad += -(self.data / node.data ** 2) * out.grad
        out._backward = _backward

        return out

    # Python 3 dispatches `/` to __truediv__; keep the old name for explicit callers.
    __div__ = __truediv__

    def __pow__(self, other):
        """Return ``self ** other`` for a constant int/float exponent."""
        assert isinstance(other, (int, float)), "only supporting int/float powers"
        out = Node(self.data ** other, childern=[self], _op='**')

        def _backward():
            # d(x**n)/dx = n * x**(n - 1)
            self.grad += other * self.data ** (other - 1) * out.grad
        out._backward = _backward

        return out

    def __radd__(self, other):
        """Support ``other + self`` when ``other`` is not a Node."""
        return self + other

    def __rmul__(self, other):
        """Support ``other * self`` when ``other`` is not a Node."""
        return self * other

    def __rsub__(self, other):
        """Support ``other - self`` when ``other`` is not a Node."""
        return (-self) + other

    def __rtruediv__(self, other):
        """Support ``other / self`` when ``other`` is not a Node."""
        return Node._wrap(other) / self

    def __neg__(self):
        """Return ``-self``, built as a multiplication by -1."""
        return self * -1

    def tanh(self):
        """Return ``tanh(self)``; the derivative is ``1 - tanh(x)**2``."""
        out = Node(np.tanh(self.data), childern=[self], _op='tanh')

        def _backward():
            # Reuse the forward value instead of recomputing tanh.
            self.grad += (1 - out.data ** 2) * out.grad
        out._backward = _backward

        return out

    def relu(self):
        """Return ``max(self, 0)``; gradient passes only where the output is positive."""
        rectified = self.data if self.data > 0.0 else 0.0
        out = Node(rectified, childern=[self], _op='relu')

        def _backward():
            self.grad += (out.data > 0) * out.grad
        out._backward = _backward

        return out

    def sigmoid(self):
        """Return the logistic sigmoid of self; the derivative is ``s * (1 - s)``."""
        s = 1 / (1 + np.exp(-self.data))
        out = Node(s, childern=[self], _op='sigmoid')

        def _backward():
            self.grad += s * (1 - s) * out.grad
        out._backward = _backward
        return out

    def backward(self):
        """Backpropagate from this node through the whole graph.

        Seeds this node's ``grad`` with 1 and runs every reachable node's
        ``_backward`` closure in reverse topological order, so a node's
        gradient is complete before it is pushed to its operands.  Gradients
        are accumulated, so reset ``grad`` on reused nodes before calling again.
        """
        topo = []
        visited = set()
        # Iterative post-order DFS: (node, True) marks "all operands handled".
        stack = [(self, False)]
        while stack:
            node, expanded = stack.pop()
            if expanded:
                topo.append(node)
                continue
            if node in visited:
                continue
            visited.add(node)
            stack.append((node, True))
            for child in node.childern:
                if child not in visited:
                    stack.append((child, False))

        self.grad = 1
        for node in reversed(topo):
            node._backward()

    def __repr__(self):
        return f"Value(data={self.data:.4f}, grad={self.grad:.4f}), op={self._op}"

In [302]:
class NNModule(ABC):
    """Abstract base for trainable components that expose a list of parameters."""

    def zero_grad(self):
        """Reset the ``grad`` of every parameter returned by ``get_parameters`` to 0."""
        for p in self.get_parameters():
            p.grad = 0

    @abstractmethod
    def get_parameters(self):
        """Return the list of trainable parameter objects owned by this module."""
        raise NotImplementedError

In [None]:
class Neuron(NNModule):
    """Single unit computing ``fn(<w, x> + b)`` with ``Node`` weights and bias.

    Attributes:
        weights: One ``Node`` per input, initialised uniformly in [-1, 1].
        bias: ``Node`` initialised uniformly in [-1, 1].
        nonlin: When True (default) the output goes through ReLU; when False
            the raw affine value is returned.
        out: Result of the most recent call (set by ``__call__``).
    """

    def __init__(self, input_dim, count = 0, nonlin = True):
        """Create a neuron.

        Args:
            input_dim: Number of inputs (and therefore weights).
            count: Index of the neuron, used only to build parameter labels.
            nonlin: Apply ReLU to the output when True.
        """
        self.weights = [Node(random.uniform(-1, 1), label=f'w_{count}_{i}') for i in range(input_dim)]
        self.bias = Node(random.uniform(-1, 1), label=f'b_{count}')
        self.nonlin = nonlin

    def __call__(self, x):
        """Return ``fn(<w, x> + b)`` for the input sequence ``x``.

        Raises:
            ValueError: if ``len(x)`` differs from the number of weights.
        """
        if len(x) != len(self.weights):
            raise ValueError(f"expected {len(self.weights)} inputs, got {len(x)}")

        # Start the sum from the bias so the whole result is a single Node expression.
        pre_activation = sum((wi * xi for wi, xi in zip(self.weights, x)), self.bias)
        self.out = pre_activation.relu() if self.nonlin else pre_activation
        return self.out

    def _backward(self, grad):
        """Manual per-neuron backpropagation; not implemented.

        Each operation on ``Node`` records its own gradient closure, so the
        parameter gradients are produced by backpropagating through the Node
        graph rather than by this method.
        """
        raise NotImplementedError(
            "Neuron._backward is not implemented; gradients are accumulated on the Node graph"
        )

    def get_parameters(self):
        """Return the weights followed by the bias."""
        return self.weights + [self.bias]

    def __repr__(self):
        return f"Neuron(inputs={len(self.weights)}, nonlin={self.nonlin})"

In [None]:
class Layer(NNModule):
    """A group of ``output_dim`` neurons that all read the same ``input_dim`` inputs."""

    def __init__(self, input_dim, output_dim):
        """Build ``output_dim`` neurons, each taking ``input_dim`` inputs."""
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.neurons = [Neuron(input_dim, count=idx) for idx in range(output_dim)]

    def __call__(self, x) -> list:
        """Evaluate every neuron on ``x`` and return the outputs as a list.

        Raises:
            ValueError: if ``len(x)`` does not equal ``input_dim``.
        """
        if len(x) != self.input_dim:
            raise ValueError
        activations = []
        for unit in self.neurons:
            activations.append(unit(x))
        self.out = activations
        return self.out

    def get_parameters(self):
        """Return the parameters of all neurons as one flat list, in neuron order."""
        return [param for unit in self.neurons for param in unit.get_parameters()]

    def _backward(self, grad):
        """Call ``_backward(grad)`` on each neuron and concatenate what they return."""
        collected = []
        for unit in self.neurons:
            collected.extend(unit._backward(grad))
        return collected

    def __repr__(self):
        neuron_count = len(self.neurons)
        return f"Layer(neurons={neuron_count})"

In [None]:
class MLP(NNModule):
    """Multi-layer perceptron: a chain of ``Layer`` objects applied in order.

    Build it from dimensions, ``MLP(input_dim, output_dim, hidden_dims)``, or
    from ready-made layers, ``MLP(layers=[...])``.  ``MLP()`` gives an empty
    model that can be filled with :meth:`add_layer`.

    Attributes:
        layers: List of layers, applied first to last.
        input_dim: Input size of the first layer (0 when there are no layers).
        output_dim: Output size of the last layer (0 when there are no layers).
        n_layers: Number of layers.
        out: Result of the most recent call (set by ``__call__``).

    NOTE(review): ``Neuron.__call__`` in this notebook applies ReLU by default,
    so the final layer's outputs are never negative -- confirm this is
    intended for targets such as -1.
    """

    def __init__(self, input_dim = None, output_dim = None, hidden_dims = None, layers = None):
        """Create the network.

        Args:
            input_dim: Number of input features.
            output_dim: Number of outputs.
            hidden_dims: Sizes of the hidden layers; may be empty or None for a
                single input->output layer.
            layers: Pre-built layers to use instead of the dimension arguments.

        Raises:
            ValueError: if both ``layers`` and dimension arguments are given,
                if only one of ``input_dim``/``output_dim`` is given, or if
                consecutive entries of ``layers`` have mismatched dimensions.
        """
        dims_given = not (input_dim is None and output_dim is None and hidden_dims is None)

        if layers is not None:
            if dims_given:
                raise ValueError("pass either `layers` or the dimension arguments, not both")
            self.layers = list(layers)
            for prev, nxt in zip(self.layers, self.layers[1:]):
                if prev.output_dim != nxt.input_dim:
                    raise ValueError(
                        f"layer output_dim {prev.output_dim} does not match next input_dim {nxt.input_dim}"
                    )
            self.input_dim = self.layers[0].input_dim if self.layers else 0
            self.output_dim = self.layers[-1].output_dim if self.layers else 0
        elif not dims_given:
            # Empty model, to be populated through add_layer().
            self.layers = []
            self.input_dim = 0
            self.output_dim = 0
        else:
            if input_dim is None or output_dim is None:
                raise ValueError("input_dim and output_dim are both required")
            self.input_dim = input_dim
            self.output_dim = output_dim
            # Consecutive pairs of this list are the (input, output) sizes of each layer.
            dims = [input_dim, *(hidden_dims or []), output_dim]
            self.layers = [Layer(d_in, d_out) for d_in, d_out in zip(dims, dims[1:])]

        self.n_layers = len(self.layers)

    def add_layer(self, layer):
        """Append ``layer`` to the end of the network.

        Raises:
            ValueError: if ``layer.input_dim`` does not match the current
                last layer's ``output_dim``.
        """
        if self.layers:
            if self.layers[-1].output_dim != layer.input_dim:
                raise ValueError(
                    f"layer input_dim {layer.input_dim} does not match current output_dim "
                    f"{self.layers[-1].output_dim}"
                )
        else:
            # First layer defines the model's input size.
            self.input_dim = layer.input_dim

        self.layers.append(layer)
        self.output_dim = layer.output_dim
        self.n_layers = len(self.layers)

    def __call__(self, x) -> list:
        """Feed ``x`` through every layer and return the last layer's output.

        Raises:
            ValueError: if ``len(x)`` does not equal ``input_dim``.
        """
        if len(x) != self.input_dim:
            raise ValueError(f"expected {self.input_dim} inputs, got {len(x)}")
        for layer in self.layers:
            x = layer(x)
        self.out = x
        return self.out

    def get_parameters(self):
        """Return the parameters of all layers as one flat list, in layer order."""
        total_parameters = []
        for layer in self.layers:
            total_parameters += layer.get_parameters()
        return total_parameters

    def _backward(self, train_loss):
        """Pass ``train_loss`` through each layer's ``_backward``, last layer first.

        Each layer receives whatever the layer after it returned.
        """
        grad = train_loss
        for layer in reversed(self.layers):
            grad = layer._backward(grad)

    def __repr__(self):
        s = f"MLP(layers=[\n"
        for i, layer in enumerate(self.layers):
            s += f"  Layer {i}: {len(layer.neurons)} neurons (input {layer.input_dim})\n"
        s += "])"
        return s

In [306]:
def mse(pred, y):
    """Return the sum of squared differences between ``pred`` and ``y``.

    Elements are paired with ``zip``, so extra elements of the longer sequence
    are ignored.  Despite the name, the sum is not divided by the number of
    elements.

    Args:
        pred: Iterable of predictions supporting ``-`` and ``** 2``.
        y: Iterable of targets.

    Returns:
        The accumulated squared error; the int 0 when there are no pairs.
    """
    return sum((p - z) ** 2 for p, z in zip(pred, y))

In [None]:
class ModelTrainer:
    """Full-batch gradient-descent trainer for a model built from ``Node`` values.

    Attributes:
        model: Callable model exposing ``input_dim`` and ``get_parameters()``.
        epochs: Number of passes over the training data.
        lr: Learning rate used for the parameter update.
        loss: Callable ``loss(pred, target)`` returning a ``Node``.
    """

    def __init__(self, model, epochs, lr = 1e-2, loss = mse):
        self.epochs = epochs
        self.lr = lr
        self.model = model
        self.loss = loss

    @staticmethod
    def _backpropagate(root):
        """Run every ``_backward`` closure reachable from ``root`` in reverse topological order.

        A node's ``_backward`` attribute only pushes its own gradient one step
        to its direct operands, so the whole graph has to be walked here.
        ``root.grad`` is seeded with 1 before the walk.
        """
        order = []
        seen = set()
        # Iterative post-order DFS: (node, True) marks "all operands handled".
        stack = [(root, False)]
        while stack:
            node, expanded = stack.pop()
            if expanded:
                order.append(node)
                continue
            if node in seen:
                continue
            seen.add(node)
            stack.append((node, True))
            # Leaf nodes may carry `None` instead of a list of operands.
            for child in (node.childern or ()):
                if child not in seen:
                    stack.append((child, False))

        root.grad = 1
        for node in reversed(order):
            node._backward()

    def train(self, X, y):
        """Train the model and return the per-epoch summed training loss.

        Each epoch sums ``self.loss`` over all rows, backpropagates once and
        applies ``p.data -= lr * p.grad`` to every model parameter.

        Args:
            X: 2-D array with one sample per row.
            y: Array whose first dimension matches ``X``.

        Returns:
            List with the loss value (``train_loss.data``) of every epoch.

        Raises:
            ValueError: on a feature-count or sample-count mismatch, or when
                ``X`` has no rows.
        """
        if X.shape[1] != self.model.input_dim:
            raise ValueError(f"X has {X.shape[1]} features, model expects {self.model.input_dim}")
        if X.shape[0] != y.shape[0]:
            raise ValueError(f"X has {X.shape[0]} samples but y has {y.shape[0]}")
        if X.shape[0] == 0:
            raise ValueError("cannot train on an empty dataset")

        train_err_schedule = []
        for epoch in range(self.epochs):
            train_loss = 0
            for i in range(X.shape[0]):
                pred = self.model(X[i])
                train_loss += self.loss(pred, y[i])
            print(epoch, train_loss.data)
            train_err_schedule.append(train_loss.data)

            params = self.model.get_parameters()
            # Parameters persist across epochs and gradients accumulate, so clear them first.
            for p in params:
                p.grad = 0
            self._backpropagate(train_loss)

            # Update the stored value in place; rebinding `p` would leave the model untouched.
            for p in params:
                p.data -= self.lr * p.grad
        return train_err_schedule

    def test(self, X):
        """Return the model output for every row of ``X``.

        Raises:
            ValueError: if the number of columns differs from ``model.input_dim``.
        """
        if X.shape[1] != self.model.input_dim:
            raise ValueError(f"X has {X.shape[1]} features, model expects {self.model.input_dim}")
        return [self.model(x) for x in X]

In [308]:
# Toy dataset: four samples with two features each.
xs = np.array([[2.0, 3.0], [3.0, -1.0], [0.5, 1.0], [1.0, 1.0]])
# One target per sample, shaped as a column (4, 1).
ys = np.array([1.0, -1.0, -1.0, 1.0]).reshape(-1, 1)
# 2 inputs -> hidden layers of 4 and 4 units -> 1 output.
model = MLP(2, 1, [4, 4])

In [309]:
# Trainer for the model above: 20 epochs, default learning rate and loss.
model_trainer = ModelTrainer(model=model, epochs=20)

In [None]:
# Run training; `tr` holds the list returned by train() (one loss value per epoch).
tr = model_trainer.train(X=xs, y=ys)

1. Prior parameter:  Value(data=-0.8217, grad=0.0000), op=
2. Post parameter:  Value(data=-0.8982, grad=0.0000), op=-
3. Changed? Value(data=-0.8217, grad=0.0000), op=