In [None]:
# TODO: SGD / Adam optimizer
# optimizer class?
# inference mode?


In [158]:
import numpy as np
from typing import List
from collections.abc import Callable
import math

In [276]:
class Function:
    def __init__(self, func: Callable, deriv: Callable):
        self.func = func
        self.deriv = deriv

# sigmoid
def sigmoid(x: np.array):
    return 1.0 / (1.0 + np.exp(-x))

def dsigmoid(x: np.array):
    return sigmoid(x) * (1.0 - sigmoid(x))

# softmax
def softmax(x: np.array):
    x = x - np.max(x)
    exp = np.exp(x)
    return exp / np.sum(exp)

def SMCE(y: np.array, y_pred: np.array):
    '''softmax cross entropy loss'''
    sm = softmax(y_pred)
    return -np.sum(y * np.log(sm))

def dSMCE(y: np.array, y_pred: np.array):
    '''derivative of softmax cross entropy loss'''
    return softmax(y_pred) - y

# mse
def MSE(y: np.array, y_pred: np.array):
    return math.sqrt(np.sum((y_pred - y) ** 2))

def dMSE(y: np.array, y_pred: np.array):
    return 2 * (y_pred - y)

In [277]:
class Layer:
    def __init__(self, n: int, m: int, activation: Function):
        self.W = np.random.rand(n, m)
        self.b = np.zeros((m))
        self.activation = activation

        self.x = None
        self.z = None
        self.s = None

    def _forward(self, x: np.array) -> np.array:
        # f(xW + b)
        self.x = x
        self.z = x @ self.W + self.b
        self.s = self.activation.deriv(self.z)
        return self.activation.func(self.z)
    
    def _backward(self, upstream: np.array) -> np.array:
        # dL/dz = dL/df hadamard df/dz = upstream H df/dz; (1xm) H (1xm)
        delta = upstream * self.s
        # dL/dx = dL/dz @ dz/dx
        dx = delta @ self.W.T
        # dL/db = dL/dz @ dz/db = dL/dz @ I_m
        db = np.mean(delta, axis=0)
        # dL/dW = dL/dz outer prod dz/dW (do row by row if batch)
        #dW = np.einsum('ij,ik->jk', self.x, delta) / np.shape(self.x)
        #dW = np.outer(self.x.T, delta)
        dW = self.x.T @ delta
        if self.x.ndim == 2:
            dW /= self.x.shape[0]
        return dx, dW, db

In [None]:
class Model:
    def __init__(self, layers: List[Layer] = None):
        self.layers = [] if layers is None else layers
        self.gradient = [None] * len(self.layers)
        self.logits = None
        self.dL = 1.0

    def forward(self, x: np.array) -> np.array:
        '''forward pass'''
        h = x
        for layer in self.layers:
            h = layer._forward(h)
        self.logits = h
        return self.logits

    def backward(self):
        '''calc gradient via backpropagation'''
        # dL/df (1xn gradient)
        upstream = self.dL
        for i, layer in enumerate(reversed(self.layers)):
            dx, dW, db = layer._backward(upstream)
            self.gradient[-i-1] = [dW, db]
            upstream = dx

    def step(self, lr: float):
        '''lr = learning rate'''
        for layer, grad in zip(self.layers, self.gradient):
            layer.W -= grad[0] * lr
            layer.b -= grad[1] * lr

    def train_step(self, x: np.array, y: np.array, loss: Function, lr: float = 0.001, verbose: bool = False):
        '''X is just a single sample'''
        y_pred = self.forward(x)
        train_loss = loss.func(y, y_pred)
        self.dL = loss.deriv(y, y_pred)
        self.backward()
        self.step(lr)

        if verbose:
            print(f'x: {x}, y: {y}')
            print(f'Predicted y: {y_pred}')
            # print(f'Gradient: {self.gradient}')
        return train_loss
    
    def train_batch(self, X: np.array, Y: np.array, loss: Function, lr: float = 0.001, verbose: bool = False):
        '''X: pxn, Y: pxm'''
    
        Y_pred = self.forward(X)
        # mean loss across batch
        train_loss = np.mean([loss.func(y, y_pred) for y, y_pred in zip(Y, Y_pred)])
        self.dL = np.array([loss.deriv(y, y_pred) for y, y_pred in zip(Y, Y_pred)])

        self.backward()
        self.step(lr)

        if verbose:
            print(f'x: {X}, y: {Y}')
            print(f'Predicted y: {Y_pred}')
        return train_loss
    
    def train(self, X: np.array, Y: np.array, loss: Function, epochs: int, lr: float = 0.001, verbose: bool = False):
        n = 5
        for i in range(epochs):
            train_loss = self.train_batch(X, Y, loss, lr=lr, verbose=verbose)
            if i % (epochs // n) == 0:
                print(f'Epoch {i}:')
                print(f'Loss: {train_loss}')
                print('\n')

In [435]:
# sa leads to nonsense
# Activation Functions:
identity = Function(lambda x: x, lambda x: np.ones_like(x))
relu = Function(lambda x: np.maximum(x, 0), lambda x: np.where(x > 0, 1, 0))
sigmoidf = Function(sigmoid, dsigmoid)

# Loss Functions:
cross_entropy = Function(SMCE, dSMCE)
MSE_loss = Function(MSE, dMSE)

In [414]:
# XOR Model:
layers = [Layer(2, 2, sigmoidf), Layer(2, 1, identity)]
XOR_model = Model(layers)

# data
X = np.array([[1, 0], [0, 1], [1, 1], [0, 0]])
Y = np.array([[1], [1], [0], [0]])

XOR_model.train(X, Y, MSE_loss, 100000, lr=0.01)

Epoch 0:
Loss: 0.4976219642263848
Epoch 20000:
Loss: 0.49586164059081506
Epoch 40000:
Loss: 0.24805294256299415
Epoch 60000:
Loss: 2.1407220027025442e-05
Epoch 80000:
Loss: 6.668153251787601e-10


In [470]:
# XOR Model:
layers = [Layer(2, 2, sigmoidf), Layer(2, 1, relu)]
XOR_model2 = Model(layers)

# data
X = np.array([[1, 0], [0, 1], [1, 1], [0, 0]])
Y = np.array([[1], [1], [0], [0]])

XOR_model2.train(X, Y, MSE_loss, 1000000, lr=0.001)

Epoch 0:
Loss: 0.515232797735823
Epoch 200000:
Loss: 0.28892203735680694
Epoch 400000:
Loss: 2.780747213371182e-05
Epoch 600000:
Loss: 7.904616960985322e-10
Epoch 800000:
Loss: 1.1994016890781722e-12


In [437]:
# Multiplication Model
layers = [Layer(2, 2, relu), Layer(2, 1, relu)]
mult_model = Model(layers)

# data
X = np.random.rand(3000,2)
Y = np.array([x[0] * x[1] for x in X])

mult_model.train(X, Y, MSE_loss, 5000, lr=0.001)

Epoch 0:
Loss: 0.672345147442822
Epoch 1000:
Loss: 0.047115901025098925
Epoch 2000:
Loss: 0.047437749842675146
Epoch 3000:
Loss: 0.0472471467132408
Epoch 4000:
Loss: 0.047088673370622426


In [438]:
def _multiply(x, y):
    return mult_model.forward(np.array([x, y]))[0]

In [465]:
a = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
b = np.ones((3, 3))

x = [1, 1, 1]
print(x)
print(np.stack(x))
print(np.einsum('ij,ik->jk', a, b))
print(sum(np.outer(A, B) for A, B in zip(a, b)))
print(a @ b)
print(np.where(a > 4, 1, 0))

[1, 1, 1]
[1 1 1]
[[12. 12. 12.]
 [15. 15. 15.]
 [18. 18. 18.]]
[[12. 12. 12.]
 [15. 15. 15.]
 [18. 18. 18.]]
[[ 6.  6.  6.]
 [15. 15. 15.]
 [24. 24. 24.]]
[[0 0 0]
 [0 1 1]
 [1 1 1]]


In [464]:
_multiply(0.5, 0.8)

np.float64(0.3966454920348912)