In [32]:
import numpy as np
from typing import Union, Tuple, Self, Iterable
import inspect
import matplotlib.pyplot as plt

In [33]:
# regular constants
# Fixed seed so that everything drawn from RNG is reproducible after a kernel
# restart; set SEED = None to seed from OS entropy instead.
SEED = 0
RNG = np.random.default_rng(SEED)
# dtype used for every array created by the code in this notebook
DTYPE = 'float64' 

# testing constants
# NOTE(review): EPS/ATOL/RTOL look like a finite-difference step and comparison
# tolerances for gradient checks; they are not used in the visible cells - confirm.
if DTYPE=='float64':
    EPS, ATOL, RTOL = 1e-6, 1e-5, 1e-3
else:
    # looser tolerances for any other (presumably lower-precision) dtype
    EPS, ATOL, RTOL = 1e-4, 1e-4, 1e-2
# NOTE(review): K is not used in the visible cells; presumably a repetition count for tests - confirm.
K = 20

In [None]:
'''TODO:
- finish off the MNIST example, try to add some reusable metrics tracking/visualisation,
  possibly work on the trainer class
- need to think about how to handle infs etc., also the stability of exps and logs
- move the constants/RNGs into a separate file
- loss functions: check correctness
- add requires-grad functionality (enable grad, context manager etc.)
- add more layers: convolutions, softmax, dropout, batch norm (possibly)
- add logging: WandB and also terminal logging
- add autograd visualisation
'''

class Tensor():
    def __init__(self, data, requires_grad=False, children=(), op=''):
        self.data: np.ndarray = np.array(data, dtype=DTYPE)
        self.grad = np.zeros_like(data, dtype=DTYPE)
        self.requires_grad = requires_grad
        self._prev = set(children)
        self._backward = lambda : None
        self._op = op

    @property
    def shape(self) -> Tuple[int]:
        return self.data.shape
    
    @property
    def size(self) -> int: 
        return self.data.size
    
    def zero_grad(self) -> None:
        self.grad = np.zeros_like(self.data, dtype=DTYPE)

    def item(self) -> np.ndarray:
        return self.data
    
    def _unbroadcast(self, grad: np.ndarray) -> Self:
        dims_to_remove = tuple(i for i in range(len(grad.shape) - len(self.shape))) 
        # remove prepended padding dimensions
        grad = np.sum(grad, axis=dims_to_remove, keepdims=False) 
        dims_to_reduce = tuple(i for i, (d1,d2) in enumerate(zip(grad.shape, self.shape)) if d1!=d2)
        # reduce broadcasted dimensions
        return np.sum(grad, axis=dims_to_reduce, keepdims=True)

    # need to build topo graph and then go through it and call backwards on each of the tensors
    def backward(self) -> None:
        self.grad = np.ones_like(self.data)
        topo = []
        visited = set()

        # do DFS on un-visited nodes, add node to topo-when all its children have been visited
        def build_topo(node):
            if node not in visited:
                visited.add(node)
                for child in node._prev:
                    build_topo(child)
                topo.append(node)
        build_topo(self)

        for node in reversed(topo):
            node._backward()
            
    def __add__(self, rhs) -> Self:
        rhs = rhs if isinstance(rhs, Tensor) else Tensor(rhs)
        out = Tensor(self.data + rhs.data, self.requires_grad or rhs.requires_grad, (self, rhs), '+')

        def _backward():
            if self.requires_grad:
                self.grad += self._unbroadcast(out.grad)
            if rhs.requires_grad:
                rhs.grad += rhs._unbroadcast(out.grad)
        out._backward = _backward
        return out
    
    def __neg__(self) -> Self:
        out = Tensor(-self.data, self.requires_grad, (self,), 'neg')

        def _backward():
            if self.requires_grad:
                self.grad += -out.grad
        out._backward = _backward
        return out
    
    def __sub__(self, rhs) -> Self:
        return self + (-rhs)

    def __mul__(self, rhs) -> Self:
        rhs = rhs if isinstance(rhs, Tensor) else Tensor(rhs)
        out = Tensor(self.data*rhs.data, self.requires_grad or rhs.requires_grad, (self, rhs), f'*')

        def _backward():
            if self.requires_grad:
                self.grad += self._unbroadcast(out.grad * rhs.data)
            if rhs.requires_grad:
                rhs.grad += rhs._unbroadcast(out.grad * self.data)
        out._backward = _backward
        return out
        
    def __truediv__(self, rhs) -> Self:
        return self * (rhs**-1)
    
    # TODO add check for rhs, if epxponent if negative the gradient is undefined
    def __pow__(self, rhs) -> Self: 
        rhs = rhs if isinstance(rhs, Tensor) else Tensor(rhs)
        lhs_is_neg = self.data < 0
        rhs_is_frac = ~np.isclose(rhs.data % 1, 0)
        if np.any(lhs_is_neg & rhs_is_frac):
            raise ValueError('cannot raise negative value to a decimal power')
        
        out = Tensor(self.data**rhs.data, self.requires_grad or rhs.requires_grad, (self,), f'**')

        def _backward():
            if self.requires_grad:
                self.grad += self._unbroadcast(out.grad * ((rhs.data)*(self.data**(rhs.data-1))))
            if rhs.requires_grad:
                rhs.grad += rhs._unbroadcast(out.grad * (self.data ** rhs.data) * np.log(self.data))
        out._backward = _backward
        return out
    
    '''data shape: (da, ..., d2, d1, n, k) rhs shape: (ob, ..., o2, o1, k, m)
       inputs are broadcast so that they have the same shape by expanding along
       dimensions if possible, out shape: (tc, ..., t2, t1, n, m), where ti = max(di, oi)
       if di or oi does not exist it is treated as 1, and c = max d, a
       if self is 1d shape is prepended with a 1, for rhs it would be appended'''
    def __matmul__(self, rhs) -> Self:
        rhs = rhs if isinstance(rhs, Tensor) else Tensor(rhs)
        out = Tensor(self.data @ rhs.data, self.requires_grad or rhs.requires_grad, (self, rhs), '@')

        def _backward():
            A, B, = self.data, rhs.data
            g = out.grad
            # broadcast 1d arrays to be 2d 
            A2 = A.reshape(1, -1) if len(A.shape) == 1 else A
            B2 = B.reshape(-1, 1) if len(B.shape) == 1 else B
            # extend g to have reduced dims
            g = np.expand_dims(g, -1) if len(B.shape) == 1 else g
            g = np.expand_dims(g, -2) if len(A.shape) == 1 else g
            # transpose last 2 dimensions, as matmul treats tensors as batched matricies
            if self.requires_grad:
                self.grad += self._unbroadcast(g @ B2.swapaxes(-2, -1))
            if rhs.requires_grad:
                rhs.grad += rhs._unbroadcast(A2.swapaxes(-2, -1) @ g)
        out._backward = _backward
        return out

    def relu(self) -> Self:
        out = Tensor((self.data > 0) * self.data, self.requires_grad, (self,), 'Relu')

        def _backward():
            if self.requires_grad:
                self.grad += (self.data > 0) * out.grad
        out._backward = _backward
        return out
    
    # need to check inp is non-negative
    def log(self) -> Self:
        if np.any(self.data < 0):
            raise ValueError('cannot log negative values')
        out = Tensor(np.log(self.data), self.requires_grad, (self,), 'log')

        def _backward():
            if self.requires_grad:
                self.grad += (1 / self.data) * out.grad
        out._backward = _backward
        return out
    
    def exp(self) -> Self:
        out = Tensor(np.exp(self.data), self.requires_grad, (self,), 'exp')

        def _backward():
            if self.requires_grad:
                self.grad += np.exp(self.data) * out.grad
        out._backward = _backward
        return out
    
    def sum(self, axis=None, keepdims=False) -> Self:
        out = Tensor(np.sum(self.data, axis=axis, keepdims=keepdims), self.requires_grad, (self,), 'sum')

        def _backward():
            if self.requires_grad:
                g = np.expand_dims(out.grad, axis) if (axis is not None and not keepdims) else out.grad
                self.grad += g
        out._backward = _backward
        return out

    def mean(self, axis=None) -> Self:
        out = Tensor(np.mean(self.data, axis=axis), self.requires_grad, (self,), 'mean')

        def _backward():
            if self.requires_grad:
                N =  self.size // out.size 
                g = np.expand_dims(out.grad, axis) if axis is not None else out.grad
                self.grad += g / N
        out._backward = _backward
        return out
    
    def __radd__(self, lhs) -> Self:
        return self + lhs
    
    def __rsub__(self, lhs) -> Self:
        return self + lhs
    
    def __rmul__(self, lhs) -> Self:
        return self * lhs
    
    def __rtruediv__(self, lhs) -> Self:
        try:
            lhs = Tensor(lhs)
        except TypeError:
            return NotImplementedError
        return lhs / self
    
    def __rpow__(self, lhs) -> Self:
        try:
            lhs = Tensor(lhs)
        except TypeError:
            return NotImplementedError
        return lhs ** self
    
    def __rmatmul__(self, lhs) -> Self:
        try:
            lhs = Tensor(lhs)
        except TypeError:
            return NotImplementedError
        return lhs @ self
    
    @classmethod
    def random(cls, shape: tuple, bounds = (0,1), requires_grad=False) -> Self:
        lower, upper = bounds
        data = RNG.random(shape, dtype=DTYPE)*(upper-lower) + lower
        return cls(data, requires_grad=requires_grad)
    
    def __repr__(self) -> str:
        return f'tensor shape: {self.shape}, op:{self._op}'        


In [35]:
class Parameter(Tensor):
    '''Tensor that is always created with requires_grad=True.'''
    def __init__(self, data):
        super().__init__(data, requires_grad=True)
    
    @classmethod
    def kaiming(cls, fan_in, shape):
        '''Parameter of the given shape drawn from a normal distribution
        with standard deviation sqrt(2 / fan_in), using the notebook-level RNG.'''
        scale = np.sqrt(2/fan_in)
        samples = RNG.standard_normal(shape, dtype=DTYPE)
        return cls(samples*scale)
    
    @classmethod
    def zeros(cls, shape):
        '''Parameter of the given shape filled with zeros.'''
        blank = np.zeros(shape, dtype=DTYPE)
        return cls(blank)
    
    def __repr__(self) -> str:
        return f'parameter shape: {self.shape}, size: {self.size}' 

In [None]:
from abc import ABC, abstractmethod

class Module(ABC):
    '''Base class for layers/models: subclasses implement forward, and the
    base class discovers sub-modules and parameters from instance attributes.'''
    
    def __call__(self, input: Tensor) -> Tensor:
        '''Calling a module runs its forward method.'''
        return self.forward(input)
    
    @property
    def modules(self) -> list[Self]:
        '''Modules held directly as attributes, as dict values, or as items of
        any other (non-string) iterable attribute, in attribute order.'''
        found: list[Self] = []
        for attr in vars(self).values():
            if isinstance(attr, Module):
                candidates = (attr,)
            elif isinstance(attr, dict):
                candidates = attr.values()
            elif isinstance(attr, Iterable) and not isinstance(attr, (str, bytes)):
                candidates = attr
            else:
                continue
            found.extend(item for item in candidates if isinstance(item, Module))
        return found
    
    @property
    def params(self) -> list[Parameter]:
        '''Parameters stored directly on this module, followed by the
        parameters of every module in self.modules (recursively).'''
        own = [attr for attr in vars(self).values() if isinstance(attr, Parameter)]
        nested = []
        for sub_module in self.modules:
            nested.extend(sub_module.params)
        return own + nested
    
    @abstractmethod
    def forward(self, input: Tensor) -> Tensor:
        '''Compute the module's output for input; must be overridden.'''
        pass
    
    def zero_grad(self) -> None:
        '''Reset the gradient of every parameter.'''
        for p in self.params:
            p.zero_grad()

    def train(self) -> None:
        '''Mark every parameter as requiring gradients.'''
        for p in self.params:
            p.requires_grad = True
    
    def eval(self) -> None:
        '''Mark every parameter as not requiring gradients.'''
        for p in self.params:
            p.requires_grad = False

class Sequential(Module):
    '''Module that applies a sequence of callables one after another.'''
    def __init__(self, layers):
        # kept by reference; Module.modules picks up the Module instances inside it
        self.layers = layers
    
    def forward(self, input: Tensor) -> Tensor:
        '''Feed input through each layer in order and return the final output.'''
        activation = input
        for stage in self.layers:
            activation = stage(activation)
        return activation
    
class Affine(Module):
    '''Layer computing x @ A + b with a Kaiming-initialised A and a zero b.'''
    def __init__(self, in_dim, out_dim):
        weight_shape = (in_dim, out_dim)
        self.A = Parameter.kaiming(in_dim, weight_shape)
        self.b = Parameter.zeros(out_dim)

    def forward(self, x: Tensor):
        # x: (B, in), A: (in, out), b: (out,) - b is broadcast over the batch
        projected = x @ self.A
        return projected + self.b

class Relu():
    '''Stateless layer that applies Tensor.relu to its input.'''
    def __call__(self, x: Tensor):
        rectified = x.relu()
        return rectified
    
class SoftMax():
    '''Stateless layer that normalises the last axis with the softmax function.'''
    def __call__(self, x: Tensor):
        '''Return exp(x) / sum(exp(x)) along the last axis.

        The row maximum is subtracted before exponentiating so that large
        logits do not overflow to inf (which would turn the output into nan).
        Softmax is invariant to such a shift, and the maximum is treated as a
        constant, so neither the values nor the gradients change.
        '''
        row_max = np.max(x.data, axis=-1, keepdims=True)
        exps = (x - row_max).exp()
        norm_c = exps.sum(axis=-1, keepdims=True)
        return exps / norm_c

In [37]:
class SoftMaxCrossEntropy():
    '''Cross entropy computed directly from logits (softmax folded into the loss).'''
    def __call__(self, z: Tensor, y) -> Tensor:
        '''logits z, shape (B, C), true labels y, shape (B, C)

        Returns the batch mean of -sum(z * y) + log(sum(exp(z))) over the last axis.
        The log-sum-exp is evaluated as max + log(sum(exp(z - max))) so that large
        logits do not overflow; the maximum is a constant, and the shift cancels
        analytically, so values and gradients are unchanged.
        '''
        row_max = np.max(z.data, axis=-1, keepdims=True)
        log_norm = (z - row_max).exp().sum(axis=-1).log() + np.squeeze(row_max, axis=-1)
        loss = (-(z * y).sum(axis=-1) + log_norm).mean()
        return loss

class CrossEntropy():
    '''Cross entropy between predicted probabilities and target distributions.'''
    def __call__(self, q: Tensor, y) -> Tensor:
        '''pred q, shape (B, C), true labels y, shape (B, C)

        Returns the batch mean of -sum(y * log(q)) over the last axis.
        y may be a Tensor or a plain array: the Tensor is kept on the left of
        the product so that Tensor.__mul__ handles the conversion (with a numpy
        array on the left, numpy would broadcast over the Tensor object instead).
        '''
        loss = -(q.log() * y).sum(axis=-1).mean()
        return loss
    
class MeanSquaredError():
    '''Squared error summed over the last axis and averaged over the rest.'''
    def __call__(self, q: Tensor, y) -> Tensor:
        '''pred q, shape (B, C), true lables y, shape (B, C)'''
        residual = q - y
        per_sample = (residual ** 2).sum(axis=-1)
        return per_sample.mean()
    
class SGD():
    '''Plain gradient descent: data <- data - lr * grad.'''
    def __init__(self, params: list[Parameter], lr: float=0.005):
        self.lr = lr
        self.params = params
    
    def step(self) -> None:
        '''Update, in place, every parameter that currently requires gradients.'''
        for p in self.params:
            if p.requires_grad:
                p.data -= self.lr * p.grad

class Adam():
    '''Adam optimiser with bias-corrected first and second moment estimates.'''
    def __init__(self, params: list[Parameter], lr: float=0.005, 
                 betas: Tuple[float, float]=(0.9, 0.999), eps: float=1e-8):
        self.lr = lr
        self.params = params
        self.b1 , self.b2 = betas
        self.eps = eps
        # number of step() calls so far; drives the bias correction
        self.time_step = 0
        # running first (m) and second (v) moments, one array per parameter
        self.m = [np.zeros_like(p.data, dtype=DTYPE) for p in params]
        self.v = [np.zeros_like(p.data, dtype=DTYPE) for p in params]
    
    def step(self) -> None:
        '''Advance the step counter and update, in place, every parameter that
        currently requires gradients.'''
        self.time_step += 1
        for idx, param in enumerate(self.params):
            if param.requires_grad:
                grad = param.grad
                first = self.b1*self.m[idx] + (1-self.b1)*grad
                second = self.b2*self.v[idx] + (1-self.b2)*(grad**2)
                self.m[idx], self.v[idx] = first, second
                # bias correction for the zero-initialised moments
                m_hat = first/(1-self.b1**self.time_step)
                v_hat = second/(1-self.b2**self.time_step)

                param.data += -self.lr * m_hat / (v_hat ** 0.5 + self.eps)

In [38]:
class Trainer():
    '''Container for everything a training run needs.

    Only the constructor is implemented; the remaining methods are
    placeholders that currently do nothing and return None.
    '''
    def __init__(self, model, optimiser, loss, train_loader, test_loader, logger, wandb_run = None):
        self.model = model
        self.optimiser = optimiser
        self.loss = loss
        self.train_loader = train_loader
        self.test_loader = test_loader
        # epoch counter, starting at 1
        self.epoch = 1
        self.logger = logger
        self.wandb_run = wandb_run

    # The stubs take self so that they can be called on an instance
    # (without it, trainer.fit() raises TypeError).
    def train_epoch(self):
        '''Placeholder: not implemented yet.'''
        pass

    def validate(self):
        '''Placeholder: not implemented yet.'''
        pass
    
    def fit(self):
        '''Placeholder: not implemented yet.'''
        pass
    
    def log_metrics(self):
        '''Placeholder: not implemented yet.'''
        pass

In [45]:
from math import ceil

class DataLoader():
    '''Iterates over paired inputs and targets in batches along axis 0.'''
    def __init__(self, input_data, true_data, batch_size, shuffle=False, rng: np.random.Generator=RNG):
        assert input_data.shape[0] == true_data.shape[0], 'must have the same number of inputs and true outputs'
        self.X, self.y = input_data, true_data
        self.N = batch_size
        self.shuffle, self.rng = shuffle, rng

    def __iter__(self):
        '''Return an iterator of (inputs, targets) batches; when shuffle is set,
        a fresh permutation is drawn from rng on every call.'''
        inputs, targets = self.X, self.y
        n_samples = inputs.shape[0]
        if self.shuffle:
            order = self.rng.permutation(n_samples)
            inputs, targets = inputs[order], targets[order]
        # cut points N, 2N, ...; the last batch holds whatever remains
        boundaries = np.arange(self.N, n_samples, self.N)
        input_batches = np.split(inputs, boundaries, axis=0)
        target_batches = np.split(targets, boundaries, axis=0)
        return zip(input_batches, target_batches)

    def __len__(self):
        '''Number of batches: samples / batch size, rounded up.'''
        n_samples = self.X.shape[0]
        return ceil(n_samples/self.N)
    

In [46]:
import os

# One-off download of the MNIST archive (uncomment these lines and the
# urlretrieve line below to fetch it):
# import urllib.request
# os.makedirs('datasets', exist_ok=True)

# url = "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz"
# os.path.join picks the right separator on every OS; the previous literal
# r"datasets\\mnist.npz" only resolved on Windows.
local_path = os.path.join("datasets", "mnist.npz")

# urllib.request.urlretrieve(url, local_path)   # <- writes the archive to local_path
data = np.load(local_path)

# NOTE(review): X_train is not defined at this point (it is local to train_nn),
# so the preview below needs e.g. data["x_train"] before it can be re-enabled.
# im = X_train[0:3]
# print(type(im))
# plt.imshow(im, cmap='grey')
# plt.show()

In [None]:
def one_hot_encode(array, num_c):
    '''Turn integer class labels into one-hot rows.

    Args:
        array: array-like of integer labels; it is flattened before encoding.
        num_c: number of classes, i.e. the width of each row.

    Returns:
        Float array of shape (number of labels, num_c) with a single 1 per row.

    Raises:
        IndexError: if a label is not an integer or lies outside [0, num_c).
            Negative labels are rejected explicitly because plain indexing
            would silently wrap them around to the last classes.
    '''
    labels = np.asarray(array).ravel()
    one_hot = np.zeros(shape=(labels.size, num_c))
    if labels.size == 0:
        return one_hot
    if labels.dtype.kind not in 'iu':
        raise IndexError('labels must be integers')
    if labels.min() < 0 or labels.max() >= num_c:
        raise IndexError(f'labels must lie in [0, {num_c})')
    # one vectorised assignment: row i gets a 1 in column labels[i]
    one_hot[np.arange(labels.size), labels] = 1
    return one_hot

def train_test_step(train, nn, loader, loss_fn, optimiser):
    '''Run one pass over loader and return (average loss, average accuracy).

    Args:
        train: if true, put nn in train mode and take an optimiser step after
            every batch; otherwise put nn in eval mode and only evaluate.
        nn: the model; called on a Tensor built from each input batch.
        loader: iterable of (inputs, one-hot targets) array batches.
        loss_fn: callable(prediction, target Tensor) returning a scalar Tensor.
        optimiser: object with a step() method; only used when train is true.

    Both metrics are weighted by the number of samples in each batch, so a
    smaller final batch does not count as much as a full one. This assumes
    loss_fn returns a per-sample mean, as the losses in this notebook do.
    Returns (0.0, 0.0) if the loader yields no samples.
    '''
    if train:
        nn.train()
    else:
        nn.eval()
    
    total_loss = 0.0
    total_correct = 0
    total_samples = 0

    for X, y in loader:
        nn.zero_grad()
        out = nn(Tensor(X))
        loss = loss_fn(out, Tensor(y))
        if train:
            loss.backward()
            optimiser.step()
        
        # predicted / true class = position of the largest entry in each row
        preds = np.argmax(out.item(), axis=-1)
        labels = np.argmax(y, axis=-1)
        batch_size = preds.size

        total_correct += int(np.sum(preds == labels))
        total_loss += float(loss.item()) * batch_size
        total_samples += batch_size
    
    if total_samples == 0:
        return 0.0, 0.0
    return total_loss / total_samples, total_correct / total_samples

def train_nn(epochs):
    '''Train a three-layer network on the arrays in the global `data` for the
    given number of epochs, printing train and test metrics after each one.'''
    def as_rows(images):
        # flatten every image to 784 values and scale by 1/255
        return images.reshape((-1,784)) / 255

    X_train, y_train = as_rows(data["x_train"]), one_hot_encode(data["y_train"], 10)
    X_test, y_test = as_rows(data["x_test"]), one_hot_encode(data["y_test"], 10)

    # only the training data is reshuffled on every pass
    train_loader = DataLoader(X_train, y_train, 256, shuffle=True)
    test_loader = DataLoader(X_test, y_test, 256, shuffle=False)

    layers = [Affine(784, 100), Relu(), Affine(100, 200), Relu(), Affine(200, 10), SoftMax()]
    nn = Sequential(layers)
    loss_fn = CrossEntropy()
    # optimiser = Adam(nn.params)
    optimiser = SGD(nn.params, lr=0.01)

    epoch = 0
    while epoch < epochs:
        print(f'epoch: {epoch}')
        avg_loss, avg_acc = train_test_step(True, nn, train_loader, loss_fn, optimiser)
        print(f'train: cross entropy: {avg_loss:.4f}, accuracy: {avg_acc:.4f}')

        avg_loss, avg_acc = train_test_step(False, nn, test_loader, loss_fn, optimiser)
        print(f'test: cross entropy: {avg_loss:.4f}, accuracy: {avg_acc:.4f}')
        epoch += 1


In [77]:
# Train for 20 epochs with the configuration hard-coded in train_nn.
train_nn(20)

epoch: 0
train: cross entropy: 1.4266, accuracy: 0.6266
test: cross entropy: 0.7772, accuracy: 0.8210
epoch: 1
train: cross entropy: 0.6299, accuracy: 0.8404
test: cross entropy: 0.4975, accuracy: 0.8666
epoch: 2
train: cross entropy: 0.4698, accuracy: 0.8722
test: cross entropy: 0.4079, accuracy: 0.8861
epoch: 3
train: cross entropy: 0.4040, accuracy: 0.8876
test: cross entropy: 0.3623, accuracy: 0.8978
epoch: 4
train: cross entropy: 0.3657, accuracy: 0.8975
test: cross entropy: 0.3313, accuracy: 0.9065
epoch: 5
train: cross entropy: 0.3394, accuracy: 0.9041
test: cross entropy: 0.3124, accuracy: 0.9142
epoch: 6
train: cross entropy: 0.3202, accuracy: 0.9085
test: cross entropy: 0.2968, accuracy: 0.9165
epoch: 7
train: cross entropy: 0.3050, accuracy: 0.9132
test: cross entropy: 0.2829, accuracy: 0.9203
epoch: 8
train: cross entropy: 0.2923, accuracy: 0.9166
test: cross entropy: 0.2715, accuracy: 0.9241
epoch: 9
train: cross entropy: 0.2805, accuracy: 0.9205
test: cross entropy: 0.261

In [69]:
# NOTE(review): this cell's execution count (69) is lower than the previous cell's (77),
# so the output recorded below presumably comes from an earlier version of train_nn
# (e.g. a different optimiser), and the run ended with a KeyboardInterrupt -
# re-run after Restart & Run All to confirm.
train_nn(40)

epoch: 0
train: cross entropy: 0.2533, accuracy: 0.9237
test: cross entropy: 0.1310, accuracy: 0.9604
epoch: 1
train: cross entropy: 0.1024, accuracy: 0.9687
test: cross entropy: 0.1005, accuracy: 0.9680
epoch: 2
train: cross entropy: 0.0763, accuracy: 0.9761
test: cross entropy: 0.1204, accuracy: 0.9631
epoch: 3
train: cross entropy: 0.0582, accuracy: 0.9821
test: cross entropy: 0.0810, accuracy: 0.9749
epoch: 4
train: cross entropy: 0.0475, accuracy: 0.9851
test: cross entropy: 0.1204, accuracy: 0.9672
epoch: 5
train: cross entropy: 0.0424, accuracy: 0.9862
test: cross entropy: 0.0875, accuracy: 0.9744
epoch: 6


KeyboardInterrupt: 

In [78]:
# Build a fresh, untrained copy of the architecture used in train_nn and list its
# parameters: one weight matrix and one bias vector per Affine layer.
nn = Sequential([Affine(784, 100), Relu(), Affine(100, 200), Relu(), Affine(200, 10), SoftMax()])
nn.params

[parameter shape: (784, 100), size: 78400,
 parameter shape: (100,), size: 100,
 parameter shape: (100, 200), size: 20000,
 parameter shape: (200,), size: 200,
 parameter shape: (200, 10), size: 2000,
 parameter shape: (10,), size: 10]