In [22]:
from abc import ABC, abstractmethod

import numpy as np
from numpy.random import Generator as RandomGenerator
from sklearn.datasets import load_breast_cancer
from sklearn.metrics import accuracy_score, log_loss
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [23]:
X, y = load_breast_cancer(return_X_y=True)
y = y.astype(np.float64).reshape(-1, 1)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

X_train = X_train.T
X_test = X_test.T
y_train = y_train.T
y_test = y_test.T

In [24]:
class WeightInitializer:
    """Инициализатор весов нейронной сети по имени стратегии и распределения.

    Предоставляет единый интерфейс для генерации матриц весов на основе строкового имени
    инициализатора в формате <strategy>_<distribution>. Поддерживает стратегии random,
    xavier и kaiming, а также распределения normal и uniform.

    Attributes:
        _rng (RandomGenerator): Генератор случайных чисел.
    """

    def __init__(self, rng: RandomGenerator | None = None):
        """Инициализирует объект инициализации весов.

        Args:
            rng (RandomGenerator | None): Генератор случайных чисел. Если не задан,
                используется генератор по умолчанию np.random.default_rng().
        """
        self._rng: RandomGenerator = rng or np.random.default_rng()

    def _normal(self, var: float, shape: tuple[int, int]) -> np.ndarray:
        """Генерирует матрицу весов из нормального распределения с заданным масштабом.

        Args:
            var (float): Параметр масштаба распределения, используемый при генерации.
            shape (tuple[int, int]): Форма генерируемой матрицы весов.

        Returns:
            np.ndarray: Матрица весов указанной формы.
        """
        return self._rng.normal(0, var, shape)

    def _uniform(self, var: float, shape: tuple[int, int]) -> np.ndarray:
        """Генерирует матрицу весов из равномерного распределения с заданным масштабом.

        Масштаб преобразуется к границам равномерного распределения так, чтобы
        соответствовать выбранной стратегии инициализации.

        Args:
            var (float): Базовый параметр масштаба.
            shape (tuple[int, int]): Форма генерируемой матрицы весов.

        Returns:
            np.ndarray: Матрица весов указанной формы.
        """
        bound = np.sqrt(3 * var)
        return self._rng.uniform(-bound, bound, shape)

    def _parse_name(self, name: str) -> tuple[str, str]:
        """Разбирает имя инициализатора на стратегию и распределение.

        Ожидаемый формат имени: <strategy>_<distribution>, например "xavier_uniform".

        Args:
            name (str): Имя инициализатора в строковом формате.

        Raises:
            ValueError: Если строка не соответствует формату <strategy>_<distribution>.

        Returns:
            tuple[str, str]: Кортеж из стратегии и распределения.
        """
        parts = name.split("_")

        if len(parts) != 2:
            raise ValueError(
                f"Invalid format '{name}'. Expected format: <strategy>_<distribution>"
            )

        return tuple(parts)

    def __call__(self, name: str, shape: tuple[int, int]) -> np.ndarray:
        """Создаёт матрицу весов согласно выбранной стратегии и распределению.

        Поддерживаемые стратегии:
            - random: Масштаб 1 / n_in.
            - xavier: Масштаб 2 / (n_in + n_out).
            - kaiming: Масштаб 2 / n_in.

        Поддерживаемые распределения:
            - normal: Нормальное распределение.
            - uniform: Равномерное распределение.

        Args:
            name (str): Имя инициализатора в формате <strategy>_<distribution>.
            shape (tuple[int, int]): Форма матрицы весов (n_in, n_out).

        Raises:
            ValueError: Если указана неподдерживаемая стратегия инициализации.
            ValueError: Если указано неподдерживаемое распределение.

        Returns:
            np.ndarray: Матрица весов указанной формы.
        """
        fan_in, fan_out = shape
        strategy, dist = self._parse_name(name)

        match strategy:
            case "random":
                var = 1 / fan_in
            case "xavier":
                var = 2 / (fan_in + fan_out)
            case "kaiming":
                var = 2 / fan_in
            case _:
                raise ValueError(f"Unknown strategy: {strategy}")

        match dist:
            case "normal":
                return self._normal(var, shape)
            case "uniform":
                return self._uniform(var, shape)
            case _:
                raise ValueError(f"Unknown distribution: {dist}")

In [None]:
class Layer(ABC):
    """Базовый абстрактный класс слоя нейронной сети.

    Определяет общий интерфейс для всех слоёв модели, включая методы
    прямого и обратного распространения, шаг обновления параметров,
    а также переключение между режимами обучения и инференса.


    Attributes:
        weight_init (str): Имя стратегии инициализации весов
            в формате <strategy>_<distribution>.
        is_train (bool): Флаг режима работы слоя.
            True — режим обучения, False — режим инференса.
    """

    def __init__(self, weight_init: str = "random_uniform"):
        """Инициализирует базовый слой.

        Args:
            weight_init (str): Имя стратегии инициализации весов,
                используемое при создании параметров слоя.
        """
        self.weight_init = weight_init
        self.is_train = True

    @abstractmethod
    def forward(self, x: np.ndarray) -> np.ndarray:
        """Выполняет прямой проход слоя.

        Args:
            x (np.ndarray): Входные данные слоя.

        Returns:
            np.ndarray: Результат преобразования входных данных.
        """
        pass

    @abstractmethod
    def backward(self, grad: np.ndarray):
        """Выполняет обратное распространение градиента через слой.

        Args:
            grad (np.ndarray): Градиент функции потерь по выходу слоя.

        Returns:
            np.ndarray: Градиент функции потерь по входу слоя.
        """
        pass

    def step(self, _lr: float) -> None:
        """Выполняет шаг обновления параметров слоя.

        Базовая реализация предназначена для слоёв без обучаемых параметров.

        Args:
            _lr (float): Скорость обучения.
        """
        return

    def train(self) -> None:
        """Переводит слой в режим обучения."""
        self.is_train = True

    def eval(self) -> None:
        """Переводит слой в режим инференса."""
        self.is_train = False


class Linear(Layer):
    """Полносвязный линейный слой нейронной сети.

    Выполняет аффинное преобразование входных данных по формуле:
        z = W @ x + b

    Сохраняет входные данные для последующего вычисления градиентов
    на этапе обратного распространения ошибки.

    Attributes:
        in_dim (int): Число входных признаков.
        out_dim (int): Число выходных признаков.
        w (np.ndarray): Матрица весов формы (out_dim, in_dim).
        b (np.ndarray): Вектор смещений формы (out_dim, 1).
        dw (np.ndarray | None): Градиент функции потерь по матрице весов.
        db (np.ndarray | None): Градиент функции потерь по вектору смещений.
    """

    def __init__(
        self, in_dim: int, out_dim: int, weight_init: str = "previous"
    ) -> None:
        """Инициализирует линейный слой с заданной размерностью.

        Args:
            in_dim (int): Число входных признаков.
            out_dim (int): Число выходных признаков.
            weight_init (str): Имя стратегии инициализации весов.
        """
        super().__init__(weight_init)

        self.in_dim = in_dim
        self.out_dim = out_dim

        self.w: np.ndarray
        self.b = np.zeros((out_dim, 1))

        self.x = None
        self.z = None

        self.dw = None
        self.db = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Выполняет прямой проход линейного слоя.

        Args:
            x (np.ndarray): Входной тензор формы (in_dim, batch_size).

        Returns:
            np.ndarray: Выходной тензор формы (out_dim, batch_size).
        """
        self.x = x
        self.z = self.w @ x + self.b
        return self.z

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Вычисляет градиенты по параметрам и по входу слоя.

        Args:
            grad (np.ndarray): Градиент функции потерь по выходу слоя.

        Returns:
            np.ndarray: Градиент функции потерь по входу слоя.
        """
        if self.x is None:
            raise RuntimeError(
                "Cannot call backward() before forward(). "
                "Linear layer input is not cached."
            )

        self.dw = grad @ self.x.T
        self.db = np.sum(grad, axis=1, keepdims=True)

        grad_x = self.w.T @ grad
        return grad_x

    def step(self, lr: float) -> None:
        """Обновляет параметры слоя с использованием вычисленных градиентов.

        Args:
            lr (float): Скорость обучения.
        """
        if self.db is None or self.dw is None:
            raise RuntimeError(
                "Cannot update parameters before gradients are computed. "
                "Call backward() before step()."
            )

        self.w -= lr * self.dw
        self.b -= lr * self.db


class Sigmoid(Layer):
    """Сигмоидная функция активации.

    Преобразует входные значения в диапазон (0, 1) по формуле:
        σ(z) = 1 / (1 + exp(-z))
    """

    def __init__(self, weight_init: str = "kaiming_uniform"):
        """Инициализирует слой сигмоидной активации.

        Args:
            weight_init (str): Имя стратегии инициализации,
                передаётся для согласованности интерфейса.
        """
        super().__init__(weight_init)

        self.y = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Выполняет прямой проход сигмоидной активации.

        Args:
            x (np.ndarray): Входной тензор предактивации.

        Returns:
            np.ndarray: Выходной тензор после применения сигмоиды.
        """
        self.y = 1 / (1 + np.exp(-x))
        return self.y

    def backward(self, grad: np.ndarray):
        """Вычисляет градиент сигмоиды по входу.

        Args:
            grad (np.ndarray): Градиент функции потерь по выходу слоя.

        Returns:
            np.ndarray: Градиент функции потерь по входу слоя.
        """
        if self.y is None:
            raise RuntimeError(
                "Cannot call backward() before forward(). Sigmoid output is not cached."
            )

        return grad * self.y * (1 - self.y)


class Dropout(Layer):
    """Слой Dropout для регуляризации нейронной сети.

    В режиме обучения случайно зануляет элементы входа с вероятностью p
    и масштабирует оставшиеся элементы для сохранения математического ожидания.
    """

    def __init__(self, p: float, weight_init: str = "previous"):
        """Инициализирует слой Dropout.

        Args:
            p (float): Вероятность зануления элемента.
            weight_init (str): Имя стратегии инициализации,
                передаётся для согласованности интерфейса.
        """
        super().__init__(weight_init)

        self.p = p
        self.mask = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Применяет Dropout к входным данным в режиме обучения.

        Args:
            x (np.ndarray): Входные данные слоя.

        Returns:
            np.ndarray: Выходные данные после применения маски Dropout.
        """
        if not self.is_train or self.p == 0:
            return x

        self.mask = (np.random.rand(*x.shape) > self.p).astype(x.dtype)
        self.mask /= 1.0 - self.p

        return x * self.mask

    def backward(self, grad: np.ndarray):
        """Применяет маску Dropout к градиенту на обратном проходе.

        Args:
            grad (np.ndarray): Градиент функции потерь по выходу слоя.

        Returns:
            np.ndarray: Градиент функции потерь по входу слоя.
        """
        if self.mask is None:
            raise RuntimeError(
                "Cannot call backward() before forward() in training mode. "
                "Dropout mask is not initialized."
            )

        return grad * self.mask


class ReLU(Layer):
    """Функция активации ReLU (Rectified Linear Unit).

    Выполняет преобразование:
        f(x) = max(0, x)
    """

    def __init__(self, weight_init: str = "xavier_uniform"):
        """Инициализирует слой ReLU.

        Args:
            weight_init (str): Имя стратегии инициализации,
                передаётся для согласованности интерфейса.
        """
        super().__init__(weight_init)

        self.mask = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Применяет функцию ReLU к входным данным.

        Args:
            x (np.ndarray): Входной тензор.

        Returns:
            np.ndarray: Выходной тензор после применения ReLU.
        """
        self.mask = x > 0
        return x * self.mask

    def backward(self, grad: np.ndarray):
        """Вычисляет градиент ReLU по входу.

        Args:
            grad (np.ndarray): Градиент функции потерь по выходу слоя.

        Returns:
            np.ndarray: Градиент функции потерь по входу слоя.
        """
        if self.mask is None:
            raise RuntimeError(
                "Cannot call backward() before forward(). "
                "ReLU activation mask is not initialized."
            )

        return grad * self.mask

In [26]:
class NeuralNetwork:
    """Последовательная нейронная сеть, объединяющая набор слоёв.

    Управляет инициализацией весов линейных слоёв, выполнением прямого прохода,
    обратного распространения градиента и обновлением параметров. Также поддерживает
    переключение всех слоёв между режимами обучения и инференса.

    Инициализация весов выполняется с учётом стратегии, указанной в слое. Если для слоя
    задано значение "previous", используется стратегия инициализации, унаследованная
    от последнего встреченного слоя, который явно указал стратегию.

    Attributes:
        layers (list[Layer]): Список слоёв в порядке применения.
        _rng (np.random.Generator): Генератор случайных чисел для воспроизводимости.
        w_init (WeightInitializer): Инициализатор весов.
    """

    def __init__(self, layers: list[Layer], random_state: int | None = None) -> None:
        """Инициализирует нейронную сеть и выполняет инициализацию весов.

        Args:
            layers (list[Layer]): Слои сети в порядке выполнения.
            random_state (int | None): Начальное состояние генератора случайных чисел.
                Если не задано, используется недетерминированная инициализация.
        """
        super().__init__()

        self.layers = layers
        self._rng = np.random.default_rng(random_state)

        self.w_init = WeightInitializer(self._rng)

        self._init_weights()

    def _init_weights(self) -> None:
        """Инициализирует веса линейных слоёв в соответствии с выбранными стратегиями.

        Проходит по слоям в обратном порядке, чтобы определить стратегию инициализации
        для линейных слоёв на основе параметра weight_init. Если слой имеет значение
        "previous", то используется ранее определённая стратегия инициализации.
        """
        prev_init = "random_uniform"
        for layer in self.layers[::-1]:
            if isinstance(layer, Linear):
                init_name = prev_init
                if layer.weight_init != "previous":
                    init_name = layer.weight_init
                layer.w = self.w_init(init_name, (layer.out_dim, layer.in_dim))

            layer_init = layer.weight_init
            if layer_init != "previous":
                prev_init = layer_init

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Выполняет прямой проход через все слои сети.

        Args:
            x (np.ndarray): Входные данные сети.

        Returns:
            np.ndarray: Выход сети после последовательного применения всех слоёв.
        """
        out = x
        for layer in self.layers:
            out = layer.forward(out)
        return out

    def backward(self, grad_out: np.ndarray) -> np.ndarray:
        """Выполняет обратное распространение градиента через все слои сети.

        Args:
            grad_out (np.ndarray): Градиент функции потерь по выходу сети.

        Returns:
            np.ndarray: Градиент функции потерь по входу сети.
        """
        grad = grad_out
        for layer in self.layers[::-1]:
            grad = layer.backward(grad)
        return grad

    def step(self, lr: float) -> None:
        """Обновляет параметры всех слоёв сети с заданной скоростью обучения.

        Args:
            lr (float): Скорость обучения.
        """
        for layer in self.layers:
            layer.step(lr)

    def train(self) -> None:
        """Переводит все слои сети в режим обучения."""
        for layer in self.layers:
            layer.train()

    def eval(self) -> None:
        """Переводит все слои сети в режим инференса."""
        for layer in self.layers:
            layer.eval()

In [27]:
n_features = 30

nn = NeuralNetwork(
    [
        Linear(n_features, 64),
        Dropout(0.3),
        ReLU(),
        Linear(64, 32),
        Dropout(0.2),
        ReLU(),
        Linear(32, 1),
        Sigmoid(),
    ],
    random_state=42,
)

In [28]:
learning_rate = 0.05
epochs = 200
batch_size = 32
N = X_train.shape[1]


for epoch in range(epochs + 1):
    nn.train()

    idx = np.random.permutation(N)

    for start in range(0, N, batch_size):
        batch_idx = idx[start : start + batch_size]
        x = X_train[:, batch_idx]
        yb = y_train[:, batch_idx]
        B = x.shape[1]

        y_hat = nn.forward(x)
        grad = (y_hat - yb) / B
        nn.backward(grad)
        nn.step(learning_rate)

    if epoch % 10 == 0:
        nn.eval()

        y_train_pred = nn.forward(X_train).ravel()
        y_test_pred = nn.forward(X_test).ravel()

        y_train_true = y_train.ravel()
        y_test_true = y_test.ravel()

        print(
            f"Epoch {epoch:>3}",
            "| train log_loss:",
            round(log_loss(y_train_true, y_train_pred), 4),
            "| train acc:",
            round(accuracy_score(y_train_true, y_train_pred > 0.5), 4),
            "| test log_loss:",
            round(log_loss(y_test_true, y_test_pred), 4),
            "| test acc:",
            round(accuracy_score(y_test_true, y_test_pred > 0.5), 4),
        )

Epoch   0 | train log_loss: 0.1852 | train acc: 0.9275 | test log_loss: 0.2058 | test acc: 0.9123
Epoch  10 | train log_loss: 0.0784 | train acc: 0.9758 | test log_loss: 0.1033 | test acc: 0.9561
Epoch  20 | train log_loss: 0.0615 | train acc: 0.9846 | test log_loss: 0.0998 | test acc: 0.9561
Epoch  30 | train log_loss: 0.0525 | train acc: 0.9868 | test log_loss: 0.0969 | test acc: 0.9561
Epoch  40 | train log_loss: 0.051 | train acc: 0.989 | test log_loss: 0.0921 | test acc: 0.9561
Epoch  50 | train log_loss: 0.0485 | train acc: 0.989 | test log_loss: 0.0941 | test acc: 0.9561
Epoch  60 | train log_loss: 0.0458 | train acc: 0.989 | test log_loss: 0.0922 | test acc: 0.9561
Epoch  70 | train log_loss: 0.0439 | train acc: 0.9912 | test log_loss: 0.0947 | test acc: 0.9561
Epoch  80 | train log_loss: 0.0427 | train acc: 0.9912 | test log_loss: 0.097 | test acc: 0.9561
Epoch  90 | train log_loss: 0.0417 | train acc: 0.9912 | test log_loss: 0.0966 | test acc: 0.9561
Epoch 100 | train log_los