# Library

In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from typing import Callable
from dataclasses import dataclass
from pprint import pformat
from loguru import logger
from pathlib import Path
import pandas as pd

## Part 1

In [2]:
# Read the serialized weights from disk with numpy's .npy loader.
# NOTE: allow_pickle=True lets numpy unpickle Python objects stored in the
# file, so only point this at weight files you trust.
npy_data: np.ndarray = np.load("./weights.npy", allow_pickle=True)

In [None]:
def softmax(x):
    """Return the softmax of ``x`` taken along axis 0.

    The maximum along axis 0 is subtracted before exponentiating so that
    large inputs do not overflow; this does not change the result.
    """
    shifted = x - np.max(x, axis=0, keepdims=True)
    exponentials = np.exp(shifted)
    normaliser = np.sum(exponentials, axis=0, keepdims=True)
    return exponentials / normaliser


def softmax_derivative(softmax_output):
    """Return the Jacobian ``diag(s) - s s^T`` of a softmax output vector.

    ``softmax_output`` is flattened into a single column ``s`` first, so the
    result is always a square matrix with one row/column per element.
    In practice the softmax derivative is usually folded into the
    cross-entropy gradient rather than used on its own.
    """
    column = softmax_output.reshape((-1, 1))
    outer_product = column @ column.T
    return np.diagflat(column) - outer_product


In [None]:
def _sigmoid(x):
    """Logistic function ``1 / (1 + e^-x)``, applied element-wise."""
    return 1 / (1 + np.exp(-x))


def _sigmoid_derivative(x):
    """Derivative of the logistic function at pre-activation ``x``.

    Uses the identity ``s'(x) = s(x) * (1 - s(x))``.
    """
    s = _sigmoid(x)
    return s * (1 - s)


def _rms_derivative(y, y_hat):
    """Gradient of ``sqrt(mean((y - y_hat) ** 2))`` with respect to ``y_hat``.

    d/dy_hat sqrt(mean(e^2)) = (y_hat - y) / (N * rms), where N is the number
    of elements.  The RMS is floored at 1e-15 so a perfect fit yields a zero
    gradient instead of 0/0.
    """
    err = np.asarray(y_hat) - np.asarray(y)
    rms = np.sqrt(np.mean(err ** 2))
    return err / (err.size * np.maximum(rms, 1e-15))


class Tools:
    """Lookup tables for activation and loss functions.

    Every entry maps a name to a ``(function, derivative)`` pair.
    Activation derivatives take the pre-activation value ``x``; loss
    functions and their derivatives take ``(y, y_hat)``.
    """

    # name -> (activation, derivative of the activation)
    activate_funcs = {
        "sigmoid": (_sigmoid, _sigmoid_derivative),
        "relu": (lambda x: np.maximum(0, x), lambda x: np.where(x > 0, 1, 0)),
        "leaky_relu": (
            lambda x: np.where(x > 0, x, x * 0.01),
            lambda x: np.where(x > 0, 1, 0.01),
        ),
        "tanh": (lambda x: np.tanh(x), lambda x: 1 - np.tanh(x) ** 2),
        "linear": (lambda x: x, lambda x: 1),
        "softmax": (
            # Clipped away from 0 and 1 so a later log() stays finite.
            lambda x: np.clip(softmax(x), 1e-15, 1 - 1e-15),
            # Resolved at call time, like the forward entry above.
            # NOTE: unlike the other derivatives this one is not element-wise;
            # softmax_derivative builds a full Jacobian from a flattened vector.
            lambda s: softmax_derivative(s),
        ),
    }

    # name -> (loss, derivative of the loss)
    loss_funcs = {
        "sse": (
            lambda y, y_hat: np.sum((y - y_hat) ** 2),
            lambda y, y_hat: -2 * (y - y_hat),
        ),
        "rms": (
            lambda y, y_hat: np.sqrt(np.mean((y - y_hat) ** 2)),
            _rms_derivative,
        ),
        "crossentropy": (
            # 1e-15 keeps log() finite when y_hat contains zeros.
            lambda y, y_hat: -np.sum(y * np.log(y_hat + 1e-15)),
            # y_hat - y is the combined softmax + cross-entropy gradient
            # with respect to the pre-softmax values.
            lambda y, y_hat: y_hat - y,
        ),
    }

    @staticmethod
    def activate_func(name: str) -> tuple[Callable, Callable]:
        """Return ``(activation, derivative)`` for ``name``.

        Raises:
            KeyError: if ``name`` is not a key of ``activate_funcs``.
        """
        return Tools.activate_funcs[name]

    @staticmethod
    def loss_func(name: str) -> tuple[Callable, Callable]:
        """Return ``(loss, derivative)`` for ``name``.

        Raises:
            KeyError: if ``name`` is not a key of ``loss_funcs``.
        """
        return Tools.loss_funcs[name]

    @staticmethod
    def rms(y_true: np.ndarray, y_hat: np.ndarray) -> float:
        """Return the root-mean-square error between ``y_true`` and ``y_hat``.

        Each error is squared before averaging, so errors of opposite sign
        cannot cancel each other out.
        """
        return np.sqrt(np.mean((y_true - y_hat) ** 2))

In [None]:
@dataclass
class Layer:
    """Declarative configuration for one fully connected layer."""

    # Number of inputs feeding the layer (columns of the weight matrix
    # created by ModelLayer.build_layer).
    input_node: int
    # Activation name; resolved through Tools.activate_func in
    # ModelLayer.build_layer.
    activate: str
    # Number of units in the layer (rows of the weight matrix).
    output_node: int
    # Dropout rate; ModelLayer.build_layer requires 0 <= dropout_rate < 1.
    dropout_rate: float = 0.0


@dataclass
class BaseModelLayer:
    """Container for one layer's weight matrix ``w`` and bias ``b``."""

    w: np.ndarray
    b: np.ndarray

    @classmethod
    def zero_like(cls, layer_basic):
        """Build an instance whose ``w`` and ``b`` are zeros shaped like ``layer_basic``'s."""
        return cls(
            np.zeros_like(layer_basic.w),
            np.zeros_like(layer_basic.b),
        )

    @property
    def shape(self) -> dict:
        """Shapes of both parameter arrays, keyed ``"w"`` and ``"b"``."""
        return dict(w=self.w.shape, b=self.b.shape)

In [None]:

@dataclass
class ModelLayer(BaseModelLayer):
    """A trainable dense layer: parameters plus activation and dropout settings."""

    # Name of the activation, kept for display purposes.
    activate_str: str
    # Activation function applied to z = w @ x + b.
    activate: Callable[[np.ndarray], np.ndarray]
    # Derivative of the activation.
    activate_derivative: Callable[[np.ndarray], np.ndarray]
    # Probability of dropping a unit of this layer's output during training.
    dropout_rate: float

    @classmethod
    def build_layer(cls, layer_config: Layer):
        """Create a randomly initialised layer from ``layer_config``.

        Weights are standard-normal draws scaled by ``sqrt(1 / input_node)``;
        biases are standard-normal draws scaled by 0.01.

        Raises:
            ValueError: if ``dropout_rate`` is outside ``[0, 1)``.
            KeyError: if the activation name is unknown to ``Tools``.
        """
        if not (0 <= layer_config.dropout_rate < 1):
            raise ValueError("dropout_rate must between 0 and 1")

        # Draw w before b so the random stream is consumed in a fixed order.
        w = np.random.randn(layer_config.output_node, layer_config.input_node)
        b = np.random.randn(layer_config.output_node, 1)

        w = w * np.sqrt(1.0 / layer_config.input_node)
        b = b * 0.01

        activate, activate_derivative = Tools.activate_func(layer_config.activate)
        return cls(
            w=w,
            b=b,
            dropout_rate=layer_config.dropout_rate,
            activate_str=layer_config.activate,
            activate=activate,
            activate_derivative=activate_derivative,
        )

    def forward_drop_out(self, a: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """Apply inverted dropout to the activations ``a``.

        Returns:
            ``(a_dropped, mask)`` where ``mask`` is a 0/1 array of ``a``'s
            shape and surviving entries are scaled by ``1 / (1 - dropout_rate)``.
        """
        keep_prob = 1 - self.dropout_rate
        mask = np.random.binomial(1, keep_prob, size=a.shape)
        a_drop_out = (a * mask) / keep_prob
        return a_drop_out, mask

    def update_delta(
        self,
        delta: np.ndarray,
        layer_next,
        z_next: np.ndarray,
        mask: np.ndarray | None = None,
    ) -> np.ndarray:
        """Propagate ``delta`` back through this layer's weights.

        Args:
            delta: error signal at this layer.
            layer_next: the layer that receives the propagated delta; its
                ``activate_derivative`` is evaluated at ``z_next``.
            z_next: pre-activation values of ``layer_next``.
            mask: optional dropout mask that was applied to this layer's
                output in the forward pass.

        Returns:
            ``(w.T @ delta) * layer_next.activate_derivative(z_next)``.
        """
        if mask is not None:
            # Build a new array instead of ``delta *= mask`` so the caller's
            # array is not modified, and rescale exactly like
            # forward_drop_out does (inverted dropout).
            delta = delta * mask / (1 - self.dropout_rate)

        return (self.w.T @ delta) * layer_next.activate_derivative(z_next)

    def update(self, grad_item: BaseModelLayer, lr: float) -> None:
        """Take one gradient-descent step: ``w -= lr * grad.w``, ``b -= lr * grad.b``.

        The parameters are modified in place.  Shape mismatches between the
        parameters and the gradients trip the assertions below.
        """
        # Message text: "weight shape mismatch".
        assert (
            self.w.shape == grad_item.w.shape
        ), f"权重形状不匹配: {self.w.shape} vs {grad_item.w.shape}"
        # The bias must be a column vector.
        assert self.b.shape[1] == 1

        # Message text: "bias shape mismatch".
        assert (
            self.b.shape == grad_item.b.shape
        ), f"偏置形状不匹配: {self.b.shape} vs {grad_item.b.shape} {grad_item.w.shape}"

        self.w -= lr * grad_item.w
        self.b -= lr * grad_item.b

    def __call__(self, x: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """Run the layer on ``x`` and return ``(z, a)``: pre-activation and activation."""
        z = self.w @ x + self.b
        a = self.activate(z)
        return z, a

    def __str__(self) -> str:
        return f"w: {self.w.shape}, b: {self.b.shape}, activate: {self.activate_str}"

    def __repr__(self) -> str:
        return self.__str__()

In [None]:
class DNN:
    """Fully connected feed-forward network trained by mini-batch gradient descent.

    Layers are kept in a dict keyed ``1..N``; key ``0`` of the activation
    cache holds the network input.  Inside the network every array is laid
    out as ``(nodes, batch)``, i.e. one sample per column.
    """

    def __init__(self, layers_config: list[Layer], loss_func: str = "sse"):
        """Build the layers described by ``layers_config``.

        Args:
            layers_config: one ``Layer`` per dense layer, input side first.
            loss_func: key into ``Tools.loss_funcs``.

        Raises:
            KeyError: if ``loss_func`` is not a known loss name.
        """
        self._input_dim = layers_config[0].input_node
        self._output_dim = layers_config[-1].output_node

        self._layers: dict[int, ModelLayer] = {
            i: ModelLayer.build_layer(layer)
            for i, layer in enumerate(layers_config, start=1)
        }
        self._layers_len = len(self._layers)

        # Remembered so backward() can tell whether the loss derivative is
        # already expressed with respect to the pre-activation values.
        self._loss_name = loss_func
        self._loss_func, self._loss_derivative = Tools.loss_func(loss_func)

        self._model_struct = {
            "input_dim": self._input_dim,
            "output_dim": self._output_dim,
            "hidden_layers": self._layers,
            "loss_func": loss_func,
        }

    @property
    def model_struct(self):
        """Summary dict: input/output dimensions, the layers and the loss name."""
        return self._model_struct

    @property
    def w(self) -> dict[int, np.ndarray]:
        """Weight matrices of all layers, keyed by layer index."""
        # The previous implementation returned ``self._w``, which is never
        # assigned, so reading the property always raised AttributeError.
        return {i: layer.w for i, layer in self._layers.items()}

    @property
    def b(self) -> dict[int, np.ndarray]:
        """Bias vectors of all layers, keyed by layer index."""
        return {i: layer.b for i, layer in self._layers.items()}

    def forward(
        self, x: np.ndarray, for_backward: bool = False
    ) -> (
        tuple[
            np.ndarray,
            dict[int, np.ndarray],
            dict[int, np.ndarray],
            dict[int, np.ndarray],
        ]
        | np.ndarray
    ):
        """Run the network on ``x``.

        Args:
            x: input of shape ``(input_dim, batch)``; a 1-D array is treated
                as a single column.
            for_backward: when true, dropout is applied to layers with a
                positive ``dropout_rate`` and the intermediate caches are
                returned as well.

        Returns:
            The output activations, or when ``for_backward`` is true the
            tuple ``(output, a_out, z_out, mask_out)`` whose dicts are keyed
            by layer index (``a_out[0]`` is the input).
        """
        if x.ndim == 1:
            x = x[:, np.newaxis]

        a_out, z_out, mask_out = {0: x}, {}, {}

        for i, layer in self._layers.items():
            z_out[i], a_out[i] = layer(a_out[i - 1])

            # Dropout is a training-only operation.
            if layer.dropout_rate > 0 and for_backward:
                a_out[i], mask_out[i] = layer.forward_drop_out(a_out[i])

        last_item = a_out[self._layers_len]

        if for_backward:
            return last_item, a_out, z_out, mask_out

        return last_item

    def init_delta(self, a_out: np.ndarray, y: np.ndarray) -> np.ndarray:
        """Return the loss derivative at the output, averaged over the batch.

        ``y`` must be 2-D with one sample per column; its second dimension is
        used as the batch size.
        """
        return self._loss_derivative(y, a_out) / y.shape[1]

    def backward(
        self,
        end_y_hat: np.ndarray,
        a_out: dict[int, np.ndarray],
        z_out: dict[int, np.ndarray],
        mask_out: dict[int, np.ndarray],
        y: np.ndarray,
    ) -> dict[int, BaseModelLayer]:
        # https://medium.com/@erikhallstrm/backpropagation-from-the-beginning-77356edf427d
        """Back-propagate the loss and return the gradient of every layer.

        Args:
            end_y_hat: network output returned by ``forward``.
            a_out, z_out, mask_out: caches returned by
                ``forward(..., for_backward=True)``.
            y: targets with one sample per column; transpose row-oriented
                data before passing it in.

        Returns:
            Dict keyed by layer index whose values hold the gradients with
            respect to that layer's ``w`` and ``b``.
        """
        grad = {i: BaseModelLayer.zero_like(layer) for i, layer in self._layers.items()}

        delta = self.init_delta(end_y_hat, y)

        # Chain rule through the output activation.  It is skipped for
        # "crossentropy", whose derivative (y_hat - y) is already taken with
        # respect to the pre-softmax values, and for a softmax output, whose
        # derivative is a Jacobian rather than an element-wise factor.
        out_index = self._layers_len
        out_layer = self._layers[out_index]
        if self._loss_name != "crossentropy" and out_layer.activate_str != "softmax":
            delta = delta * out_layer.activate_derivative(z_out[out_index])

        for layer_index in reversed(self._layers):
            layer = self._layers[layer_index]
            prev_index = layer_index - 1

            # Units dropped in the forward pass contribute no gradient, and
            # the survivors were scaled by 1 / (1 - p).  Apply this before the
            # layer's own gradients are computed so they see the mask too.
            mask = mask_out.get(layer_index)
            if mask is not None:
                delta = delta * mask / (1 - layer.dropout_rate)

            grad[layer_index].w = delta @ a_out[prev_index].T
            grad[layer_index].b = np.sum(delta, axis=1, keepdims=True)

            if layer_index > 1:
                delta = layer.update_delta(
                    delta=delta,
                    # The mask is already folded into delta above.
                    mask=None,
                    layer_next=self._layers[prev_index],
                    z_next=z_out[prev_index],
                )

        return grad

    def _update(self, grad: dict[int, BaseModelLayer], learning_rate: float) -> None:
        """Apply one gradient-descent step to every layer."""
        for i, grad_item in grad.items():
            self._layers[i].update(grad_item, lr=learning_rate)

    def __call__(self, x) -> np.ndarray:
        """Predict for ``x`` given with one sample per row (it is transposed first)."""
        return self.forward(x.T, for_backward=False)

    def __str__(self) -> str:
        return pformat(self._model_struct)

    def _l2_regularization(self, lambda_reg: float = 0.01):
        """Return ``lambda_reg / 2`` times the sum of squared weights of all layers."""
        weight_item = [np.sum(np.square(layer.w)) for layer in self._layers.values()]
        return lambda_reg * 0.5 * np.sum(weight_item)

    def train(
        self,
        x: np.ndarray,
        y: np.ndarray,
        epochs: int,
        batch_size: int,
        learning_rate: float,
        l2: float = 0,
        save_folder: str | None = None,
    ) -> None:
        """Train with shuffled mini-batches.

        Args:
            x: inputs, one sample per row.
            y: targets, one sample per row; a 1-D array is treated as a
                single output per sample.
            epochs: number of passes over the data.
            batch_size: samples per update; must be positive.
            learning_rate: step size of each update.
            l2: L2 penalty coefficient; 0 disables regularisation.
            save_folder: when given, ``loss.csv`` and ``loss.png`` are
                written there (the folder is created if needed).

        Raises:
            ValueError: if ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            # A negative step would silently produce no batches at all.
            raise ValueError("batch_size must be a positive integer")

        loss_log = []

        for epoch in range(epochs):

            indices = np.random.permutation(x.shape[0])
            X_shuffled = x[indices]
            y_shuffled = y[indices]

            for start in range(0, X_shuffled.shape[0], batch_size):

                X_batch = X_shuffled[start : start + batch_size]
                Y_batch = y_shuffled[start : start + batch_size]

                # The network works on (nodes, batch) arrays.
                x_batch = X_batch.T
                # atleast_2d turns 1-D targets into a (1, batch) row so that
                # init_delta can read the batch size from shape[1].
                y_batch = np.atleast_2d(Y_batch.T)

                y_hat, a_out, z_out, mask_out = self.forward(x_batch, for_backward=True)

                loss = self._loss_func(y_batch, y_hat)
                grad = self.backward(y_hat, a_out, z_out, mask_out, y_batch)

                if l2:
                    loss += self._l2_regularization(l2)

                    # Gradient of the L2 penalty; a separate loop variable
                    # keeps the batch index untouched.
                    for layer_index in grad:
                        grad[layer_index].w += l2 * self._layers[layer_index].w

                self._update(grad, learning_rate)

                loss_log.append({"epoch": epoch, "loss": loss})
                logger.info(f"epoch {epoch}, loss {loss}")

        if save_folder is not None:
            path = Path(save_folder)
            path.mkdir(parents=True, exist_ok=True)

            # Explicit columns keep the frame well-formed when nothing was logged.
            df_log = pd.DataFrame(loss_log, columns=["epoch", "loss"])
            df_log.to_csv(path.joinpath("loss.csv"), index=False)

            fig = plt.figure(figsize=(8, 6))
            plt.plot(df_log.index, df_log["loss"], marker="o", label="Loss")

            # Labeling the plot
            plt.xlabel("Index")
            plt.ylabel("Loss")
            plt.title("Loss vs. Index")
            plt.legend()
            plt.savefig(path.joinpath("loss.png"))
            # Release the figure so repeated training runs do not pile up
            # open figures.
            plt.close(fig)

            logger.info(f"save to {path}")

    @classmethod
    def lazy_build(
        cls,
        input_dim: int,
        output_dim: int,
        calculate_layers: int,
        activate_func: str,
        up_dim: int,
        loss_func: str = "sse",
    ):
        """Build a network whose layers all share one width and activation.

        The result has an ``input_dim -> up_dim`` layer,
        ``calculate_layers - 2`` layers of ``up_dim -> up_dim`` and an
        ``up_dim -> output_dim`` layer, all using ``activate_func``.  The
        first and last layers are always created, so the network has at
        least two layers even when ``calculate_layers`` is below 2.
        """
        first = Layer(input_node=input_dim, output_node=up_dim, activate=activate_func)
        middle = [
            Layer(input_node=up_dim, output_node=up_dim, activate=activate_func)
            for _ in range(calculate_layers - 2)
        ]
        last = Layer(input_node=up_dim, output_node=output_dim, activate=activate_func)

        return cls([first, *middle, last], loss_func)

In [3]:
class Fnn:
    """Holds the parameters read from a pickled ``.npy`` weight file."""

    def __init__(self, weight_path:str):
        """Load ``weight_path`` and copy its stored mapping into ``self.model``.

        NOTE: ``allow_pickle=True`` makes numpy unpickle the file contents,
        so only trusted weight files should be loaded.
        """
        self.weights = np.load(weight_path, allow_pickle=True)

        # The file stores a single object; .item() unwraps it.
        stored_params = self.weights.item()
        self.model = dict(stored_params)
        
    
        

In [4]:
# Wrap the stored weights file in an Fnn instance.
model = Fnn(weight_path="./weights.npy")

In [5]:
# Display the parameter dictionary that Fnn copied out of the weights file.
model.model

{'w1': array([[-0.8091491 , -0.97136003,  0.87402759, ..., -0.07917783,
         -0.48049972,  0.14332123],
        [ 0.13680957, -0.42658406, -0.1065486 , ..., -0.35674324,
          0.73503393,  0.97974145],
        [-0.98785399,  0.13864091, -0.12334028, ...,  0.97998961,
          0.81049359, -0.96672978],
        ...,
        [ 0.07590487, -0.72909673, -0.49320916, ..., -0.97016631,
          0.37905837, -0.39720247],
        [ 0.80353272,  0.49302694,  0.78355299, ...,  0.49187097,
          0.22976861, -0.14471415],
        [-0.65034227, -0.44644287,  0.96614131, ..., -0.41951292,
         -0.39306069, -0.21858973]], shape=(784, 2048)),
 'b1': array([[ 0.92383328],
        [-0.60657822],
        [-0.03953955],
        ...,
        [-0.7922291 ],
        [-0.06904414],
        [-0.77732582]], shape=(2048, 1)),
 'w2': array([[ 0.83440492,  0.73153573,  0.71223082, ..., -0.73398912,
          0.57465462,  0.54825252],
        [ 0.79550778,  0.74281046, -0.50577499, ..., -0.13199838

## Part 2