In [17]:
from numpy.typing import NDArray, DTypeLike
import numpy as np
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod

# TODO
# Add Batch Normalization With Scale and Shift
# Add Skip Connection

class DataLoader:
    def __init__(self,
                 X: NDArray,
                 y: NDArray,
                 batch_size: int = 32,
                 shuffle: bool = True):
        assert X.shape[0] == y.shape[0], "The number of examples in x and y must match"

        y = y[:, np.newaxis] if y.ndim == 1 else y
        self.X, self.y = X, y
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.num_samples = X.shape[0]
        self.indices = np.arange(self.num_samples)

    def __iter__(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

        for start in range(0, self.num_samples, self.batch_size):
            end = start + self.batch_size
            batch_idx = self.indices[start:end]

            yield self.X[batch_idx], self.y[batch_idx]

    def __len__(self):
        return (self.num_samples + self.batch_size - 1) // self.batch_size

class Module(ABC):
    def __init__(self):
        self._params: list[NDArray] = []
        self._grads: list[NDArray] = []

    @abstractmethod
    def forward(self, x: NDArray) -> NDArray:
        pass

    @abstractmethod
    def backward(self, grad: NDArray) -> NDArray:
        pass

    def parameters(self) -> list[tuple[NDArray, NDArray]]:
        return list(zip(self._params, self._grads))

    def zero_grad(self) -> None:
        for grad in self._grads:
            grad.fill(0)

# Convolutional layer (single-channel inputs assumed)
class Conv2D(Module):
    def __init__(self, in_channels: int, out_channels: int,
                 kernel_size: int, stride: int = 1, padding: int = 0) -> None:
        super().__init__()
        self.stride = stride
        self.padding = padding
        self.kernel_size = kernel_size

        fan_in = in_channels * kernel_size * kernel_size
        w = np.random.randn(out_channels, in_channels, kernel_size, kernel_size) * np.sqrt(2.0 / fan_in)
        b = np.zeros(out_channels)

        self._params = [w, b]
        self._grads = [np.zeros_like(w), np.zeros_like(b)]
        self._cache: tuple[NDArray, NDArray] = (np.zeros_like(w), np.zeros_like(w))

    def _out_size(self, size: int) -> int:
        return (size + 2 * self.padding - self.kernel_size) // self.stride + 1

    def forward(self, x: NDArray) -> NDArray:
        # x.shape: (batch_size, in_channels, height, width)
        # out.shape: (batch_size, out_channels, height_out, width_out)
        # in_channels: 1 - Black/White, 3 - RGB
        # out_channels: Number of filters
        w, b = self._params
        stride, padding, kernel_size = self.stride, self.padding, self.kernel_size
        out_channels = w.shape[0]
        batch_size, in_channels, height, width = x.shape
        assert in_channels == w.shape[1]

        # pad
        paddings = ((0, 0), (0, 0), (padding, padding), (padding, padding))
        x_pad = np.pad(x, paddings, mode='constant', constant_values=0)

        height_out = self._out_size(height)
        width_out = self._out_size(width)

        out = np.zeros((batch_size, out_channels, height_out, width_out))
        self._cache = (x, x_pad)

        for n in range(batch_size):
            for out_c in range(out_channels):
                kernel = w[out_c]
                bias = b[out_c]
                for i in range(height_out):
                    for j in range(width_out):
                        #h0 - height_stride, w0 - width_stride
                        h0, w0 = stride*i, stride*j
                        region = x_pad[n, :, h0:h0+kernel_size, w0:w0+kernel_size]
                        out[n, out_c, i, j] = np.sum(region * kernel) + bias

        return out

    def backward(self, grad: NDArray) -> NDArray:
        # x.shape: (batch_size, in_channels, height, width)
        # grad.shape: (batch_size, out_channels, height_out, width_out)
        x, x_pad = self._cache
        w, b = self._params
        dw, db = self._grads

        stride, padding, kernel_size = self.stride, self.padding, self.kernel_size
        batch_size, in_channels, height, width = x.shape
        _, out_channels, height_out, width_out = grad.shape

        dw.fill(0)
        db.fill(0)

        dx_pad = np.zeros_like(x_pad)

        for n in range(batch_size):
            for out_c in range(out_channels):
                kernel = w[out_c]
                for i in range(height_out):
                    for j in range(width_out):
                        #h0 - height_stride, w0 - width_stride
                        h0, w0 = stride*i, stride*j

                        region = x_pad[n, :, h0:h0+kernel_size, w0:w0+kernel_size]

                        grad_out = grad[n, out_c, i, j]

                        dw[out_c] += grad_out * region
                        db[out_c] += grad_out

                        dx_pad[n, :, h0:h0+kernel_size, w0:w0+kernel_size] += grad_out * kernel
        # unpad
        if padding > 0:
            return dx_pad[:, :, padding:-padding, padding:-padding]
        return dx_pad

class Flatten(Module):
    def __init__(self) -> None:
        super().__init__()
        self._cache: tuple[int, ...] = ()

    def forward(self, x: NDArray) -> NDArray:
        # x shape: (batch_size, ...)
        self._cache = x.shape
        batch_size = x.shape[0]
        return x.reshape(batch_size, -1)

    def backward(self, grad: NDArray) -> NDArray:
        # grad shape: (batch_size, features)
        return grad.reshape(self._cache)


class MaxPool2D(Module):
    def __init__(self, kernel_size: int, stride: int = 1) -> None:
        super().__init__()
        self.kernel_size = kernel_size
        self.stride = stride
        self._cache: tuple[NDArray, NDArray] = (np.zeros_like(1), np.zeros_like(1))  # (x, mask)

    def _out_size(self, size: int) -> int:
        return (size - self.kernel_size) // self.stride + 1

    def forward(self, x: NDArray) -> np.ndarray:
        # x shape: (batch_size, channels, height, width)
        stride, kernel_size = self.stride, self.kernel_size
        batch_size, channels, height, width = x.shape
        height_out = self._out_size(height)
        width_out = self._out_size(width)

        out = np.zeros((batch_size, channels, height_out, width_out))
        mask = np.zeros_like(x, dtype=bool)

        for n in range(batch_size):
            for c in range(channels):
                for i in range(height_out):
                    for j in range(width_out):
                        #h0 - height_stride, w0 - width_stride
                        h0, w0 = i * stride, j * stride
                        window = x[n, c, h0:h0 + kernel_size, w0:w0 + kernel_size]
                        max_val = np.max(window)
                        out[n, c, i, j] = max_val
                        # create mask
                        mask[n, c, h0:h0 + kernel_size, w0:w0 + kernel_size] = (window == max_val)

        self._cache = (x.shape, mask)
        return out

    def backward(self, grad: np.ndarray) -> np.ndarray:
        # grad shape: (batch_size, channels, h_out, w_out)
        x_shape, mask = self._cache
        stride, kernel_size = self.stride, self.kernel_size
        batch_size, channels, height, width = x_shape
        _, _, height_out, width_out = grad.shape

        dx = np.zeros(x_shape)
        for n in range(batch_size):
            for c in range(channels):
                for i in range(height_out):
                    for j in range(width_out):
                        #h0 - height_stride, w0 - width_stride
                        h0, w0 = i * stride, j * stride

                        # distribute gradient to max positions
                        dx[n, c, h0:h0 + kernel_size, w0:w0 + kernel_size] += \
                            mask[n, c, h0:h0 + kernel_size, w0:w0 + kernel_size] * grad[n, c, i, j]
        return dx

class Sequential(Module):
    def __init__(self, *layers: Module):
        super().__init__()
        self.layers = list(layers)

    def forward(self, x: NDArray) -> NDArray:
        out = x
        for layer in self.layers:
            out = layer.forward(out)
        return out

    def backward(self, grad: NDArray) -> NDArray:
        grad_out = grad
        for layer in reversed(self.layers):
            grad_out = layer.backward(grad_out)
        return grad_out

    def parameters(self):
        params = []
        for layer in self.layers:
            params.extend(layer.parameters())
        return params

    def zero_grad(self):
        for layer in self.layers:
            layer.zero_grad()

class DropOut(Module):
    def __init__(self, p: float = 0.5):
        super().__init__()
        assert 0 <= p < 1, "Dropout probability must be in [0, 1)"
        self.p = p
        self.mask: NDArray = np.array([])

    def forward(self, x: NDArray) -> NDArray:
        #if self.training and self.p > 0:
        self.mask = (np.random.rand(*x.shape) > self.p) / (1 - self.p)
        return x * self.mask
        #return x

    def backward(self, grad: NDArray) -> NDArray:
        #if self.training and self.p > 0:
        return grad * self.mask
        #return grad

# Linear layer: y = Wx + b
class Linear(Module):
    def __init__(self, in_features: int, out_features: int) -> None:
        super().__init__()

        weight: NDArray = np.random.randn(out_features, in_features) * np.sqrt(2 / in_features)
        bias: NDArray = np.zeros((out_features, 1))

        self._params = [weight, bias]
        self._grads = [np.zeros_like(weight), np.zeros_like(bias)]

        self._input: NDArray = np.zeros((in_features, 1))

    def forward(self, x: NDArray) -> NDArray:
        self._input = x
        W, b = self._params
        return W @ x + b

    # grad it's dLoss/dy
    def backward(self, grad: NDArray) -> NDArray:
        W, b = self._params
        dW, db = self._grads
        x = self._input

        dW[...] = grad @ x.T
        db[...] = np.sum(grad, axis=1, keepdims=True)

        return W.T @ grad

class Normalization(ABC):
    @abstractmethod
    def penalty(self, weights: NDArray) -> float:
        pass

    @abstractmethod
    def grad(self, weights: NDArray) -> NDArray:
        pass

class L1(Normalization):
    def penalty(self, weights: NDArray) -> float:
        return np.sum(np.abs(weights))

    def grad(self, weights: NDArray) -> NDArray:
        return np.sign(weights)

class L2(Normalization):
    def penalty(self, weights: NDArray) -> float:
        return 0.5 * np.sum(weights ** 2)

    def grad(self, weights: NDArray) -> NDArray:
        return weights

class ElasticNet(Normalization):
    def __init__(self, ratio: float = 0.5, lambda_: float = 1e-4) -> None:
        self.ratio = ratio
        self.lambda_ = lambda_

    def penalty(self, weights: NDArray) -> float:
        return self.ratio * L1().penalty(weights) + (1 - self.ratio) * L2().penalty(weights)

    def grad(self, weights: NDArray) -> NDArray:
        return self.ratio * L1().grad(weights) + (1 - self.ratio) * L2().grad(weights)

# Activation: ReLU
class ReLU(Module):
    def __init__(self):
        super().__init__()
        self._cache: NDArray = np.zeros_like(1)

    def forward(self, x: NDArray) -> NDArray:
        self._cache = x
        return np.maximum(0, x)

    def backward(self, grad: NDArray) -> NDArray:
        return grad * (self._cache > 0)

# Activation: Sigmoid
class Sigmoid(Module):
    def __init__(self):
        super().__init__()
        self._cache: NDArray = np.zeros_like(1)

    def forward(self, x: NDArray) -> NDArray:
        self._cache = 1 / (1 + np.exp(-x))
        return self._cache

    def backward(self, grad: NDArray) -> NDArray:
        return self._cache * (1 - self._cache)

class CrossEntropy:
    def __init__(self):
        self.probs: NDArray = np.array([])

    def forward(self, logits: NDArray, labels: NDArray) -> float:
        max_logit: DTypeLike = np.max(logits, axis=0, keepdims=True) # Avoid overflow
        exps = np.exp(logits - max_logit)
        self.probs = exps / np.sum(exps, axis=0, keepdims=True) # Softmax(logits)

        m = labels.shape[1]
        loss = -np.sum(labels * np.log(self.probs + 1e-15)) / m

        return loss

    def backward(self, labels: NDArray) -> NDArray:
        m = labels.shape[1]
        return (self.probs - labels) / m

class Optimizer(ABC):
    def __init__(self, params: list[tuple[NDArray, NDArray]], lr: float = 1e-2) -> None:
        self.params = params
        self.lr = lr

    @abstractmethod
    def step(self) -> None:
        pass

class SGD(Optimizer):
    def step(self) -> None:
        for (param, grad) in self.params:
            param -= self.lr * grad

class Adagrad(Optimizer):
    def __init__(self, params: list[tuple[NDArray, NDArray]],
                 lr: float = 1e-2, eps: float = 1e-15) -> None:
        super().__init__(params, lr)

        self.eps = eps
        self.cache: list[NDArray] = [np.zeros_like(param) for param, _ in params]

    def step(self) -> None:
        for idx, (p, grad) in enumerate(self.params):
            self.cache[idx] += grad ** 2
            lr = self.lr / (np.sqrt(self.cache[idx]) + self.eps)
            p -= lr * grad

class RMSProp(Optimizer):
    def __init__(self, params: list[tuple[NDArray, NDArray]],
                 lr: float = 1e-2, beta: float = 0.9, eps: float = 1e-15) -> None:
        super().__init__(params, lr)

        self.beta, self.eps = beta, eps
        self.cache: list[NDArray] = [np.zeros_like(param) for param, _ in params]

    def step(self) -> None:
        for idx, (p, grad) in enumerate(self.params):
            self.cache[idx] = self.beta * self.cache[idx] + (1 - self.beta) * grad ** 2
            lr = self.lr / (np.sqrt(self.cache[idx]) + self.eps)
            p -= lr * grad

class Adam(Optimizer):
    def __init__(self, params: list[tuple[NDArray, NDArray]], lr: float = 1e-3,
                 beta1: float = 0.9, beta2: float = 0.999, eps: float = 1e-8):
        super().__init__(params, lr)

        self.beta1, self.beta2, self.eps = beta1, beta2, eps
        self.m = [np.zeros_like(p) for p, _ in params]
        self.v = [np.zeros_like(p) for p, _ in params]
        self.t = 0

    def step(self) -> None:
        self.t += 1
        for idx, (p, grad) in enumerate(self.params):
            self.m[idx] = self.beta1 * self.m[idx] + (1 - self.beta1) * grad
            self.v[idx] = self.beta2 * self.v[idx] + (1 - self.beta2) * (grad ** 2)

            m_hat = self.m[idx] / (1 - self.beta1 ** self.t)
            v_hat = self.v[idx] / (1 - self.beta2 ** self.t)

            p -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)