In [85]:
from typing import Callable, Tuple

from keras.datasets import mnist

from matplotlib import pyplot as plt
import numpy as np

import sklearn.preprocessing
import sklearn.metrics

from tqdm import tqdm

# Наши разработки из 3-ей работы))
from nn import Nums
from nn import NN, Layer
from nn.optimizers import AdamOptimizer

type Nums = np.ndarray[np.number]
type NumsToNums = Callable[[Nums], Nums]
type NumsNumsToNums = Callable[[Nums, Nums], Nums]

np.set_printoptions(formatter={"float": lambda x: "{0:0.3f}".format(x)})

In [123]:
import scipy
import scipy.signal

Загружаем датасет, one-hot-encoding для меток классов

In [86]:
(train_x, train_y), (test_x, test_y) = mnist.load_data()

train_x = train_x / 255
test_x = test_x / 255

train_x.resize(60000, 784)
test_x.resize(10000, 784)

encoder = sklearn.preprocessing.OneHotEncoder()
train_y = encoder.fit_transform(train_y[..., None]).toarray()
test_y = encoder.fit_transform(test_y[..., None]).toarray()

train_x.shape, train_y.shape

((60000, 784), (60000, 10))

In [87]:
weights = np.random.rand(3, 3)
weights

array([[0.121, 0.415, 0.496],
       [0.631, 0.331, 0.630],
       [0.628, 0.713, 0.791]])

In [242]:
def conv2d(x: Nums, core: Nums, strides: Tuple[int, int]=(1, 1), padding: Tuple[int, int]=(0, 0)) -> Nums:
    """2д свёртка

    Args:
        x (Nums): Входной массив, 1d
        x_shape (Tuple[int, int]): Размерность входного массива 2d (Как интерпретировать, для mnist - (28, 28))
        core (Nums): Ядро свёртки
        strides (Tuple[int, int], optional): Шаги свёртки по x и y. Defaults to (1, 1).
        padding (Tuple[int, int], optional): Отступы по x и y. Defaults to (1, 1).

    Returns:
        Nums: Результат свёртки
    """
    if padding != (0, 0):
        new_shape = (x.shape[0] + padding[0] * 2, x.shape[1] + padding[1] * 2)
        new_x = np.zeros(new_shape)  # todo: make more efficient
        new_x[padding[0]:-padding[0], padding[1]:-padding[1]] = x
        x = new_x
    
    output_shape = (np.subtract(x.shape, core.shape) + strides) / strides
    output_shape = tuple(np.floor(output_shape, casting='unsafe', dtype=int))
    views_shape = output_shape + core.shape
    
    row_strides, col_strides = x.strides
    
    views = np.lib.stride_tricks.as_strided(
        x, shape=views_shape, strides=(row_strides * strides[0], col_strides * strides[1], row_strides, col_strides)
        )
    return np.einsum('ijkl, kl -> ij', views, core)

In [222]:
for i in range(len(train_x)):
    y = conv2d(train_x[i].reshape((28, 28)), w, strides=(1, 1), padding=(1, 1))

In [None]:
a = np.array([
    [0, 1, 2, 1, 0],
    [4, 1, 0, 1, 0],
    [2, 0, 1, 1, 1],
    [1, 2, 3, 1, 0],
    [0, 4, 3, 2, 0]])

w = np.array([
    [
        [0, 1, 0],
        [1, 0, 1],
        [2, 1, 0]
    ],
    [
        [0, 1, 0],
        [1, 0, 1],
        [2, 1, 0]
    ]
])

r = conv2d(a, w, strides=(2, 2))
r

ValueError: operands could not be broadcast together with shapes (2,) (3,) 

In [233]:
def pooling(x: Nums, window: Tuple[int, int], strides: Tuple[int, int], func=np.max) -> Nums:
    output_shape = (np.subtract(x.shape, window) + strides) / strides
    output_shape = tuple(np.floor(output_shape, casting='unsafe', dtype=int))
    
    views_shape = output_shape + window
    
    row_strides, col_strides = x.strides
    
    views = np.lib.stride_tricks.as_strided(
        x, shape=views_shape, strides=(row_strides * strides[0], col_strides * strides[1], row_strides, col_strides)
    )
    return views
    

In [235]:
views = pooling(np.array(
    [
        [0, 1, 2, 1],
        [4, 1, 0, 1],
        [2, 0, 1, 1],
        [1, 2, 3, 1]
    ]), window=(2, 2), strides=(2, 2))
np.max(views, axis=(-2, -1))

array([[4, 2],
       [2, 3]])

In [None]:
from typing import Tuple, Callable

from .base import Nums, NumsToNums
from .base import np
from .optimizers import OptimizerBase


class ConvLayer:
    """
    Реализует свёрточный слой

    input_size: int - количество входных нейронов
    output_size: int - количество выходных нейронов
    weights: np.ndarray[np.number] - веса слоя
    bias: np.ndarray[np.number] - смещения

    self.activation_function: NumsToNums - функция активации слоя
    self.activation_function_derivation: NumsToNums - производная функции активации слоя
    """

    def __init__(
        self,
        window_size: Tuple[int, int],
        strides: Tuple[int, int],
        padding: Tuple[int, int],
        pooling_function: NumsToNums,
        activation_function: Tuple[
            NumsToNums,
            NumsToNums,
        ],
        weights_initialize_function: Callable[[Tuple[int, int]], Nums] = None,
    ) -> None:
        if weights_initialize_function is None:
            weights_initialize_function = np.random.random

        self.weights = weights_initialize_function(window_size)
        self.bias = weights_initialize_function(window_size)
        self.activation_function = activation_function[0]
        self.activation_function_derivation = activation_function[1]
        self.strides = strides
        self.padding = padding
        self.pooling_function = pooling_function

        # Значения x и t, фиксируемые при forward, нужны для вычисления ошибки
        self._xt = ()
        """
        Контейнеры для хранения вычисленных ошибок для весов и смещений
        Необходимы для того, чтобы можно было обучать батчами
        
        В батче, для каждого примера необходимо вызвать forward и backward.
        В конце батча, для модификации весов, необходимо вызвать update
        _errors_log будет очищен
        """
        self._errors_log = ([], [])

    def forward(self, x: Nums) -> Nums:
        t = conv2d(x, self.weights, self.strides, self.padding)
        h = self.activation_function(t)
        p = pooling(h, (2, 2), (1, 1), self.pooling_function)
        
        self._xt = (x, t)
        return h

    def backward(self, error: Nums) -> Nums:
        """
        Обратное распространение ошибки
        Запоминает производные ошибки, но не изменяет параметры модели
        Для изменения нужно вызвать update
        """
        x, t = self._xt
        self._xt = ()

        de_dT = error * self.activation_function_derivation(t)
        de_dW = x.T @ de_dT
        de_dB = de_dT
        de_dX = de_dT @ self.weights.T
        self._errors_log[0].append(de_dW)
        self._errors_log[1].append(de_dB)
        return de_dX

    def update(self, learning_rate: float):
        """
        Модифицирует веса модели
        learning_rate: float, [0, 1] - на сколько сильно модель будет реагировать на ошибку
        """
        de_dWs, de_dBs = self._errors_log
        de_dW = np.sum(de_dWs, axis=0)
        de_dB = np.sum(de_dBs, axis=0)

        self.optimizer.step(self.weights, self.bias, learning_rate, de_dW, de_dB)

        self._errors_log = ([], [])

    def set_optimizer(self, optimizer: OptimizerBase):
        optimizer.init_params(self.weights, self.bias)
        self.optimizer = optimizer

    def __repr__(self) -> str:
        return f"<LinearLayer, {self.input_size}x{self.output_size}>"
