In [None]:
!pip install datasets

Collecting datasets
  Downloading datasets-2.15.0-py3-none-any.whl (521 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m521.2/521.2 kB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m
Collecting pyarrow-hotfix (from datasets)
  Downloading pyarrow_hotfix-0.6-py3-none-any.whl (7.9 kB)
Collecting dill<0.3.8,>=0.3.0 (from datasets)
  Downloading dill-0.3.7-py3-none-any.whl (115 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.3/115.3 kB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
Collecting multiprocess (from datasets)
  Downloading multiprocess-0.70.15-py310-none-any.whl (134 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m16.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pyarrow-hotfix, dill, multiprocess, datasets
Successfully installed datasets-2.15.0 dill-0.3.7 multiprocess-0.70.15 pyarrow-hotfix-0.6


In [None]:
#helper library to create base classes for inheritance(acts like a java interface)
from abc import ABC, abstractmethod
#used to give type hints(makes life 100x easier to know what you get out of a function with just a hover)
from typing import Dict,Optional, Tuple, List, Callable
import time

import numpy as np
#huggingface lib to get the mnist dset
from datasets import load_dataset

In [None]:
from PIL import Image as im

In [None]:
# Dataset configuration
# ---------------------
# How many samples go into the train / test / validation subsets that are
# carved out of MNIST further down.
train_size, test_size, valid_size = 10000, 1000, 256

# Number of target classes (the ten digits). The original note suggests adding
# 27 when letters are included as well; that requires loading a matching data
# set and retraining the model from scratch.
n_classes = 10

# Image size in pixels (the images are stored as 28 x 28 arrays below).
img_size = 28

In [None]:
# DATASET OBTAIN AND PREP
# Download (or read from the local cache) the MNIST 'train' and 'test' splits.
# NOTE: despite the names, x_dset holds the *train* split and y_dset the *test*
# split - each of them contains both the images and the labels.
x_dset, y_dset= load_dataset('mnist', split=['train', 'test'])

Downloading builder script:   0%|          | 0.00/3.98k [00:00<?, ?B/s]

Downloading metadata:   0%|          | 0.00/2.21k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/6.83k [00:00<?, ?B/s]

Downloading data files:   0%|          | 0/4 [00:00<?, ?it/s]

Downloading data:   0%|          | 0.00/9.91M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/28.9k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/1.65M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/4.54k [00:00<?, ?B/s]

Extracting data files:   0%|          | 0/4 [00:00<?, ?it/s]

Generating train split:   0%|          | 0/60000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/10000 [00:00<?, ? examples/s]

In [None]:
# Naming follows the usual f(x) = y convention: the *X variables hold the model
# inputs (images) and the *Y variables the expected outputs (labels).
# x_dset is the train split and y_dset the test split; labels are converted to
# NumPy arrays right away, images are kept as returned by the data set.
TrainX, TestX = x_dset["image"], y_dset["image"]
TrainY, TestY = (np.array(split["label"]) for split in (x_dset, y_dset))

In [None]:
# Quick look at how many test labels were loaded.
TestY.shape

(10000,)

In [None]:
# Convert the image collections into dense int32 arrays of shape (n, 28, 28).
#
# The arrays are built *from the existing TrainX / TestX collections* before the
# names are rebound. Rebinding a name to np.empty(...) first and then copying
# that buffer onto itself would throw the real pixel data away and leave only
# uninitialised memory behind.
# np.stack sizes the result from the data, so no sample count is hard-coded,
# and re-running the cell on already converted arrays gives the same result.
TrainX = np.stack([np.asarray(image, dtype=np.int32) for image in TrainX])
TestX = np.stack([np.asarray(image, dtype=np.int32) for image in TestX])


In [None]:
# Carve the working subsets out of the full arrays:
#   train      - the first `train_size` training samples
#   test       - the first `test_size` test samples
#   validation - the next `valid_size` test samples, taken right after the test
#                slice so that the test and validation subsets do not overlap
x_train, y_train = TrainX[:train_size], TrainY[:train_size]
x_test, y_test = TestX[:test_size], TestY[:test_size]
x_valid, y_valid = (
    source[test_size:test_size + valid_size] for source in (TestX, TestY)
)

In [None]:
# Quick look at the shape of the validation images.
x_valid.shape

(256, 28, 28)

In [None]:
# Helper turning integer class labels into one-hot rows, e.g. with 10 classes
# the label 5 becomes [0, 0, 0, 0, 0, 1, 0, 0, 0, 0].
def convert_categorical2one_hot(
    y: np.array, n_classes: Optional[int] = None
) -> np.array:
    """
    Encode integer class labels as a one-hot matrix.

    :param y - categorical (integer) array with (n, ) or (n, 1) shape
    :param n_classes - number of columns of the result; when None it is
        inferred as y.max() + 1, which yields fewer columns than expected
        whenever the highest class does not occur in y
    :return one hot array with (n, k) shape
    :raises ValueError - when y is empty and n_classes is not given, or when
        a label lies outside of the [0, k) range
    ----------------------------------------------------------------------------
    n - number of examples
    k - number of classes
    """
    # Flatten first: indexing with a (n, 1) array would broadcast against
    # np.arange(n) and mark every label column in every row.
    labels = np.asarray(y).ravel()

    if n_classes is None:
        if labels.size == 0:
            raise ValueError(
                "cannot infer the number of classes from an empty label array"
            )
        n_classes = int(labels.max()) + 1

    # Negative labels would silently wrap around to the last columns.
    if labels.size and (labels.min() < 0 or labels.max() >= n_classes):
        raise ValueError(
            f"labels must lie in [0, {n_classes}), got range "
            f"[{labels.min()}, {labels.max()}]"
        )

    one_hot_matrix = np.zeros((labels.size, n_classes))
    one_hot_matrix[np.arange(labels.size), labels] = 1
    return one_hot_matrix


In [None]:
# The layers below work on 4D input (n, h, w, channels): divide the pixel
# values by 255 and append a trailing channel axis to every image subset.
# NOTE: this cell is not idempotent - running it twice rescales the images
# again and adds yet another axis.
x_train, x_test, x_valid = (
    np.expand_dims(images / 255, axis=3)
    for images in (x_train, x_test, x_valid)
)

# Integer labels -> one-hot rows.
y_train, y_test, y_valid = (
    convert_categorical2one_hot(labels)
    for labels in (y_train, y_test, y_valid)
)

print("X_train shape:", x_train.shape)
print("y_train shape:", y_train.shape)
print("X_test shape:", x_test.shape)
print("y_test shape:", y_test.shape)
print("X_valid shape:", x_valid.shape)
print("y_valid shape:", y_valid.shape)

X_train shape: (10000, 28, 28, 1)
y_train shape: (10000, 10)
X_test shape: (1000, 28, 28, 1)
y_test shape: (1000, 10)
X_valid shape: (256, 28, 28, 1)
y_valid shape: (256, 10)


In [None]:
# ABSTRACT BASE CLASSES (EQUIVALENT OF A JAVA INTERFACE OR A C++ CLASS WITH PURE VIRTUAL FUNCTIONS)

class Layer(ABC):
    """
    Interface shared by all layers: every layer implements forward_pass and
    backward_pass; trainable layers additionally override weights, gradients
    and set_wights.
    """

    @property
    def weights(self) -> Optional[Tuple[np.array, np.array]]:
        """
        Returns the (weights, bias) tensors if layer is trainable.
        Returns None for non-trainable layers (the default).
        """
        return None

    @property
    def gradients(self) -> Optional[Tuple[np.array, np.array]]:
        """
        Returns the (weights gradient, bias gradient) tensors if layer is
        trainable.
        Returns None for non-trainable layers (the default).
        """
        return None

    @abstractmethod
    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        """
        Perform layer forward propagation logic.
        """

    @abstractmethod
    def backward_pass(self, da_curr: np.array) -> np.array:
        """
        Perform layer backward propagation logic.
        """

    def set_wights(self, w: np.array, b: np.array) -> None:
        """
        Replace the weights and bias tensors of the layer.
        Does nothing by default; trainable layers override it.
        """

class Optimizer(ABC):
    """
    Interface of the parameter update strategies applied to a list of layers.
    """

    @abstractmethod
    def update(self, layers: List[Layer]) -> None:
        """
        Updates value of weights and bias tensors in trainable layers.
        """

In [None]:
# Two convolution implementations: the first is the vanilla one (i.e. the math written directly in Python/NumPy), the second is a more optimized version (still not as fast as specialized libraries, but good for a day's work ;) )
class ConvLayer2D(Layer):
    """
    Reference 2D convolution layer. Forward and backward passes loop over
    every output position and rely on NumPy broadcasting inside the loop.
    """

    def __init__(
        self, w: np.array,
        b: np.array,
        padding: str = 'valid',
        stride: int = 1
    ):
        """
        :param w -  4D tensor with shape (h_f, w_f, c_f, n_f)
        :param b - 1D tensor with shape (n_f, )
        :param padding - flag describing type of activation padding valid/same
        :param stride - stride along width and height of input volume
        ------------------------------------------------------------------------
        h_f - height of filter volume
        w_f - width of filter volume
        c_f - number of channels of filter volume
        n_f - number of filters in filter volume
        """
        self._w, self._b = w, b
        self._padding = padding
        self._stride = stride
        # gradients stay None until the first backward pass
        self._dw, self._db = None, None
        # copy of the most recent forward input, required by backward_pass
        self._a_prev = None

    @classmethod
    def initialize(
        cls, filters: int,
        kernel_shape: Tuple[int, int, int],
        padding: str = 'valid',
        stride: int = 1
    ):
        """
        Create a layer whose weights and bias are drawn from a standard normal
        distribution and scaled by 0.1.

        :param filters - number of filters (n_f)
        :param kernel_shape - shape (h_f, w_f, c_f) of a single filter
        :param padding - flag describing type of activation padding valid/same
        :param stride - stride along width and height of input volume
        """
        w = np.random.randn(*kernel_shape, filters) * 0.1
        b = np.random.randn(filters) * 0.1
        return cls(w=w, b=b, padding=padding, stride=stride)

    @property
    def weights(self) -> Optional[Tuple[np.array, np.array]]:
        """
        Returns the (w, b) tensors of the layer.
        """
        return self._w, self._b

    @property
    def gradients(self) -> Optional[Tuple[np.array, np.array]]:
        """
        Returns the (dw, db) tensors computed by the last backward pass, or
        None when no backward pass has been performed yet.
        """
        if self._dw is None or self._db is None:
            return None
        return self._dw, self._db

    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        """
        :param a_prev - 4D tensor with shape (n, h_in, w_in, c)
        :param training - unused by this layer
        :output 4D tensor with shape (n, h_out, w_out, n_f)
        ------------------------------------------------------------------------
        n - number of examples in batch
        w_in - width of input volume
        h_in - height of input volume
        w_out - width of output volume
        h_out - height of output volume
        c - number of channels of the input volume
        n_f - number of filters in filter volume
        """
        self._a_prev = np.array(a_prev, copy=True)
        output_shape = self.calculate_output_dims(input_dims=a_prev.shape)
        _, h_out, w_out, _ = output_shape
        h_f, w_f, _, _ = self._w.shape
        pad = self.calculate_pad_dims()
        a_prev_pad = self.pad(array=a_prev, pad=pad)
        output = np.zeros(output_shape)
        # (1, h_f, w_f, c, n_f) - leading axis broadcasts over the batch
        filters = self._w[np.newaxis, :, :, :]

        for i in range(h_out):
            for j in range(w_out):
                h_start = i * self._stride
                h_end = h_start + h_f
                w_start = j * self._stride
                w_end = w_start + w_f

                # window (n, h_f, w_f, c, 1) * filters, reduced over the
                # filter height, width and channel axes -> (n, n_f)
                output[:, i, j, :] = np.sum(
                    a_prev_pad[:, h_start:h_end, w_start:w_end, :, np.newaxis] *
                    filters,
                    axis=(1, 2, 3)
                )

        return output + self._b

    def backward_pass(self, da_curr: np.array) -> np.array:
        """
        Computes the gradients of the weights and bias (averaged over the
        batch, available through the gradients property afterwards) and
        returns the gradient with respect to the layer input.

        :param da_curr - 4D tensor with shape (n, h_out, w_out, n_f)
        :output 4D tensor with shape (n, h_in, w_in, c)
        ------------------------------------------------------------------------
        n - number of examples in batch
        w_in - width of input volume
        h_in - height of input volume
        w_out - width of output volume
        h_out - height of output volume
        c - number of channels of the input volume
        n_f - number of filters in filter volume
        """
        _, h_out, w_out, _ = da_curr.shape
        n, h_in, w_in, _ = self._a_prev.shape
        h_f, w_f, _, _ = self._w.shape
        pad = self.calculate_pad_dims()
        a_prev_pad = self.pad(array=self._a_prev, pad=pad)
        output = np.zeros_like(a_prev_pad)

        self._db = da_curr.sum(axis=(0, 1, 2)) / n
        self._dw = np.zeros_like(self._w)
        # (1, h_f, w_f, c, n_f) - leading axis broadcasts over the batch
        filters = self._w[np.newaxis, :, :, :, :]

        for i in range(h_out):
            for j in range(w_out):
                h_start = i * self._stride
                h_end = h_start + h_f
                w_start = j * self._stride
                w_end = w_start + w_f
                # (n, 1, 1, 1, n_f) gradient of the current output position
                da_slice = da_curr[:, i:i+1, j:j+1, np.newaxis, :]

                # spread the gradient back onto the input window
                output[:, h_start:h_end, w_start:w_end, :] += np.sum(
                    filters * da_slice,
                    axis=4
                )
                # accumulate the filter gradient over the batch
                self._dw += np.sum(
                    a_prev_pad[:, h_start:h_end, w_start:w_end, :, np.newaxis] *
                    da_slice,
                    axis=0
                )

        self._dw /= n
        # strip the padding so that the result matches the input shape
        return output[:, pad[0]:pad[0]+h_in, pad[1]:pad[1]+w_in, :]

    def set_wights(self, w: np.array, b: np.array) -> None:
        """
        :param w -  4D tensor with shape (h_f, w_f, c_f, n_f)
        :param b - 1D tensor with shape (n_f, )
        ------------------------------------------------------------------------
        h_f - height of filter volume
        w_f - width of filter volume
        c_f - number of channels of filter volume
        n_f - number of filters in filter volume
        """
        self._w = w
        self._b = b

    def calculate_output_dims(
        self, input_dims: Tuple[int, int, int, int]
    ) -> Tuple[int, int, int, int]:
        """
        Output size along each spatial axis is
        (in + 2 * pad - filter) // stride + 1. For 'valid' padding, and for
        'same' padding with stride 1 and odd filter sizes (output equal to
        the input size), this matches the previous behaviour; it also
        accounts for the stride when 'same' padding is combined with a
        stride greater than 1.

        :param input_dims - 4 element tuple (n, h_in, w_in, c)
        :output 4 element tuple (n, h_out, w_out, n_f)
        :raises ValueError - when the padding flag is neither valid nor same
        ------------------------------------------------------------------------
        n - number of examples in batch
        w_in - width of input volume
        h_in - height of input volume
        w_out - width of output volume
        h_out - height of output volume
        c - number of channels of the input volume
        n_f - number of filters in filter volume
        """
        n, h_in, w_in, _ = input_dims
        h_f, w_f, _, n_f = self._w.shape
        h_pad, w_pad = self.calculate_pad_dims()
        h_out = (h_in + 2 * h_pad - h_f) // self._stride + 1
        w_out = (w_in + 2 * w_pad - w_f) // self._stride + 1
        return n, h_out, w_out, n_f

    def calculate_pad_dims(self) -> Tuple[int, int]:
        """
        :output - 2 element tuple (h_pad, w_pad)
        :raises ValueError - when the padding flag is neither valid nor same
        ------------------------------------------------------------------------
        h_pad - single side padding on height of the volume
        w_pad - single side padding on width of the volume
        """
        if self._padding == 'same':
            h_f, w_f, _, _ = self._w.shape
            return (h_f - 1) // 2, (w_f - 1) // 2
        if self._padding == 'valid':
            return 0, 0
        # fail loudly instead of returning None and breaking a later unpack
        raise ValueError(
            f"unsupported padding {self._padding!r}, expected 'valid' or 'same'"
        )

    @staticmethod
    def pad(array: np.array, pad: Tuple[int, int]) -> np.array:
        """
        Zero-pads the height and width axes symmetrically.

        :param array -  4D tensor with shape (n, h_in, w_in, c)
        :param pad - 2 element tuple (h_pad, w_pad)
        :output 4D tensor with shape (n, h_in + 2 * h_pad, w_in + 2 * w_pad, c)
        ------------------------------------------------------------------------
        n - number of examples in batch
        w_in - width of input volume
        h_in - height of input volume
        c - number of channels of the input volume
        h_pad - single side padding on height of the volume
        w_pad - single side padding on width of the volume
        """
        return np.pad(
            array=array,
            pad_width=((0, 0), (pad[0], pad[0]), (pad[1], pad[1]), (0, 0)),
            mode='constant'
        )




In [None]:
class FastConvLayer2D(ConvLayer2D):
    """
    Convolution layer that replaces the per-position loops of ConvLayer2D with
    matrix products on the output of the im2col / col2im helpers (defined
    elsewhere in the notebook).
    """

    def __init__(
        self, w: np.array,
        b: np.array,
        padding: str = 'valid',
        stride: int = 1
    ):
        """
        :param w -  4D tensor with shape (h_f, w_f, c_f, n_f)
        :param b - 1D tensor with shape (n_f, )
        :param padding - flag describing type of activation padding valid/same
        :param stride - stride along width and height of input volume
        ------------------------------------------------------------------------
        h_f - height of filter volume
        w_f - width of filter volume
        c_f - number of channels of filter volume
        n_f - number of filters in filter volume
        """
        super().__init__(w=w, b=b, padding=padding, stride=stride)
        # im2col output of the last forward pass, reused by backward_pass
        self._cols = None

    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        """
        :param a_prev - 4D tensor with shape (n, h_in, w_in, c)
        :param training - unused by this layer
        :output 4D tensor with shape (n, h_out, w_out, n_f)
        ------------------------------------------------------------------------
        n - number of examples in batch
        w_in - width of input volume
        h_in - height of input volume
        w_out - width of output volume
        h_out - height of output volume
        c - number of channels of the input volume
        n_f - number of filters in filter volume
        """
        self._a_prev = np.array(a_prev, copy=True)
        n, h_out, w_out, _ = self.calculate_output_dims(input_dims=a_prev.shape)
        h_f, w_f, _, n_f = self._w.shape
        pad = self.calculate_pad_dims()
        # filters rearranged to (n_f, c_f, h_f, w_f)
        w = np.transpose(self._w, (3, 2, 0, 1))

        # NOTE(review): only the height padding is forwarded to im2col, so
        # 'same' padding presumably assumes square filters - confirm against
        # the im2col definition.
        self._cols = im2col(
            array=np.moveaxis(a_prev, -1, 1),
            filter_dim=(h_f, w_f),
            pad=pad[0],
            stride=self._stride
        )

        # one matrix product covers every filter and every output position
        result = w.reshape((n_f, -1)).dot(self._cols)
        output = result.reshape(n_f, h_out, w_out, n)

        return output.transpose(3, 1, 2, 0) + self._b

    def backward_pass(self, da_curr: np.array) -> np.array:
        """
        Computes the gradients of the weights and bias (both averaged over the
        batch) and returns the gradient with respect to the layer input.

        :param da_curr - 4D tensor with shape (n, h_out, w_out, n_f)
        :output 4D tensor with shape (n, h_in, w_in, c)
        ------------------------------------------------------------------------
        n - number of examples in batch
        w_in - width of input volume
        h_in - height of input volume
        w_out - width of output volume
        h_out - height of output volume
        c - number of channels of the input volume
        n_f - number of filters in filter volume
        """
        n = self._a_prev.shape[0]
        h_f, w_f, _, n_f = self._w.shape
        pad = self.calculate_pad_dims()

        self._db = da_curr.sum(axis=(0, 1, 2)) / n
        da_curr_reshaped = da_curr.transpose(3, 1, 2, 0).reshape(n_f, -1)

        w = np.transpose(self._w, (3, 2, 0, 1))
        dw = da_curr_reshaped.dot(self._cols.T).reshape(w.shape)
        # The product above sums over the whole batch; divide by n so that dw
        # uses the same batch averaging as db here and as dw in ConvLayer2D.
        self._dw = np.transpose(dw, (2, 3, 1, 0)) / n

        output_cols = w.reshape(n_f, -1).T.dot(da_curr_reshaped)

        output = col2im(
            cols=output_cols,
            array_shape=np.moveaxis(self._a_prev, -1, 1).shape,
            filter_dim=(h_f, w_f),
            pad=pad[0],
            stride=self._stride
        )
        # back from channels-first to (n, h_in, w_in, c)
        return np.transpose(output, (0, 2, 3, 1))


In [None]:
class DenseLayer(Layer):
    """
    Fully connected layer computing a_prev . w^T + b.
    """

    def __init__(self, w: np.array, b: np.array):
        """
        :param w - 2D weights tensor with shape (units_curr, units_prev)
        :param b - 2D bias tensor with shape (1, units_curr)
        ------------------------------------------------------------------------
        units_prev - number of units in previous layer
        units_curr -  number of units in current layer
        """
        self._w = w
        self._b = b
        # gradients are filled in by backward_pass
        self._dw = None
        self._db = None
        # copy of the last forward input
        self._a_prev = None

    @classmethod
    def initialize(cls, units_prev: int, units_curr: int):
        """
        Create a layer with weights and bias drawn from a standard normal
        distribution and scaled by 0.1.

        :param units_prev - positive integer, number of units in previous layer
        :param units_curr - positive integer, number of units in current layer
        """
        random_w = np.random.randn(units_curr, units_prev) * 0.1
        random_b = np.random.randn(1, units_curr) * 0.1
        return cls(w=random_w, b=random_b)

    @property
    def weights(self) -> Optional[Tuple[np.array, np.array]]:
        """
        Returns the (w, b) tensors of the layer.
        """
        return self._w, self._b

    @property
    def gradients(self) -> Optional[Tuple[np.array, np.array]]:
        """
        Returns the (dw, db) tensors of the last backward pass, or None when
        no backward pass has been performed yet.
        """
        has_gradients = self._dw is not None and self._db is not None
        return (self._dw, self._db) if has_gradients else None

    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        """
        :param a_prev - 2D tensor with shape (n, units_prev)
        :param training - unused by this layer
        :output - 2D tensor with shape (n, units_curr)
        ------------------------------------------------------------------------
        n - number of examples in batch
        units_prev - number of units in previous layer
        units_curr -  number of units in current layer
        """
        self._a_prev = np.array(a_prev, copy=True)
        linear_part = np.dot(a_prev, self._w.T)
        return linear_part + self._b

    def backward_pass(self, da_curr: np.array) -> np.array:
        """
        Stores the batch-averaged gradients of w and b and returns the
        gradient with respect to the layer input.

        :param da_curr - 2D tensor with shape (n, units_curr)
        :output - 2D tensor with shape (n, units_prev)
        ------------------------------------------------------------------------
        n - number of examples in batch
        units_prev - number of units in previous layer
        units_curr -  number of units in current layer
        """
        batch_size = self._a_prev.shape[0]
        self._dw = np.dot(da_curr.T, self._a_prev) / batch_size
        self._db = np.sum(da_curr, axis=0, keepdims=True) / batch_size
        return np.dot(da_curr, self._w)

    def set_wights(self, w: np.array, b: np.array) -> None:
        """
        :param w - 2D weights tensor with shape (units_curr, units_prev)
        :param b - 2D bias tensor with shape (1, units_curr)
        ------------------------------------------------------------------------
        units_prev - number of units in previous layer
        units_curr -  number of units in current layer
        """
        self._w, self._b = w, b

In [None]:
class MaxPoolLayer(Layer):
    """
    Max pooling layer. The forward pass remembers, for every pooling window,
    which element was the maximum; the backward pass routes the gradient to
    exactly those elements.
    """

    def __init__(self, pool_size: Tuple[int, int], stride: int = 2):
        """
        :param pool_size - tuple holding shape of 2D pooling window
        :param stride - stride along width and height of input volume used to
        apply pooling operation
        """
        self._pool_size = pool_size
        self._stride = stride
        # copy of the last forward input
        self._a = None
        # (i, j) output position -> mask of the winning element per window
        self._cache = {}

    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        """
        :param a_prev - 4D tensor with shape(n, h_in, w_in, c)
        :param training - unused by this layer
        :output 4D tensor with shape(n, h_out, w_out, c)
        ------------------------------------------------------------------------
        n - number of examples in batch
        w_in - width of input volume
        h_in - height of input volume
        c - number of channels of the input/output volume
        w_out - width of output volume
        h_out - height of output volume
        """
        self._a = np.array(a_prev, copy=True)
        # forget masks that belong to a previous (possibly larger) input
        self._cache = {}
        n, h_in, w_in, c = a_prev.shape
        h_pool, w_pool = self._pool_size
        h_out = 1 + (h_in - h_pool) // self._stride
        w_out = 1 + (w_in - w_pool) // self._stride
        output = np.zeros((n, h_out, w_out, c))

        for i in range(h_out):
            for j in range(w_out):
                h_start = i * self._stride
                h_end = h_start + h_pool
                w_start = j * self._stride
                w_end = w_start + w_pool
                a_prev_slice = a_prev[:, h_start:h_end, w_start:w_end, :]
                self._save_mask(x=a_prev_slice, cords=(i, j))
                output[:, i, j, :] = np.max(a_prev_slice, axis=(1, 2))
        return output

    def backward_pass(self, da_curr: np.array) -> np.array:
        """
        :param da_curr - 4D tensor with shape(n, h_out, w_out, c)
        :output 4D tensor with shape(n, h_in, w_in, c)
        ------------------------------------------------------------------------
        n - number of examples in batch
        w_in - width of input volume
        h_in - height of input volume
        c - number of channels of the input/output volume
        w_out - width of output volume
        h_out - height of output volume
        """
        output = np.zeros_like(self._a)
        _, h_out, w_out, _ = da_curr.shape
        h_pool, w_pool = self._pool_size

        for i in range(h_out):
            for j in range(w_out):
                h_start = i * self._stride
                h_end = h_start + h_pool
                w_start = j * self._stride
                w_end = w_start + w_pool
                # only the winning element of each window receives gradient;
                # += because windows may overlap when stride < pool size
                output[:, h_start:h_end, w_start:w_end, :] += \
                    da_curr[:, i:i + 1, j:j + 1, :] * self._cache[(i, j)]
        return output

    def _save_mask(self, x: np.array, cords: Tuple[int, int]) -> None:
        """
        Stores a mask with the same shape as x that holds 1 at the first
        maximum of every (example, channel) window and 0 elsewhere.

        :param x - 4D window tensor with shape (n, h_pool, w_pool, c)
        :param cords - (i, j) output position the window belongs to
        """
        n, h, w, c = x.shape
        idx = np.argmax(x.reshape(n, h * w, c), axis=1)

        # Build the mask in its flattened form and reshape afterwards. Writing
        # through `mask.reshape(...)` of an array shaped like x is not safe:
        # for some memory layouts reshape returns a copy and the assignment
        # would be lost, leaving an all-zero mask.
        flat_mask = np.zeros((n, h * w, c), dtype=x.dtype)
        n_idx, c_idx = np.indices((n, c))
        flat_mask[n_idx, idx, c_idx] = 1
        self._cache[cords] = flat_mask.reshape(n, h, w, c)

In [None]:
class FlattenLayer(Layer):
    """
    Collapses everything but the batch axis into a single feature axis and
    restores the original shape on the way back.
    """

    def __init__(self):
        # shape of the last forward input, restored by backward_pass
        self._shape = ()

    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        """
        :param a_prev - ND tensor with shape (n, ..., channels)
        :param training - unused by this layer
        :output - 2D tensor with shape (n, f)
        ------------------------------------------------------------------------
        n - number of examples in batch
        f - product of all remaining dimensions of a_prev
        """
        self._shape = a_prev.shape
        batch_size = a_prev.shape[0]
        return np.reshape(a_prev, (batch_size, -1))

    def backward_pass(self, da_curr: np.array) -> np.array:
        """
        :param da_curr - 2D tensor with shape (n, f)
        :output - ND tensor with shape (n, ..., channels)
        ------------------------------------------------------------------------
        n - number of examples in batch
        f - product of all remaining dimensions of the forward input
        """
        original_shape = self._shape
        return da_curr.reshape(original_shape)

In [None]:
class DropoutLayer(Layer):
    """
    Dropout layer. While training, every unit is kept with probability
    keep_prob and the surviving activations are divided by keep_prob; outside
    of training the input is passed through untouched.
    """

    def __init__(self, keep_prob):
        """
        :param keep_prob - probability that given unit will not be dropped out,
        must lie in the (0, 1] range
        :raises ValueError - when keep_prob is outside of the (0, 1] range
        """
        if not 0 < keep_prob <= 1:
            raise ValueError(f"keep_prob must be in (0, 1], got {keep_prob!r}")
        self._keep_prob = keep_prob
        # boolean mask drawn by the last training forward pass
        self._mask = None

    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        """
        :param a_prev - ND tensor of activations
        :param training - when True a fresh mask is drawn and applied,
        otherwise a_prev is returned unchanged
        :output ND tensor with the same shape as a_prev
        """
        if training:
            self._mask = (np.random.rand(*a_prev.shape) < self._keep_prob)
            return self._apply_mask(a_prev, self._mask)
        return a_prev

    def backward_pass(self, da_curr: np.array) -> np.array:
        """
        Applies the mask of the last training forward pass to the gradient.

        :param da_curr - ND tensor with the shape of the forward input
        :output ND tensor with the same shape as da_curr
        """
        return self._apply_mask(da_curr, self._mask)

    def _apply_mask(self, array: np.array, mask: np.array) -> np.array:
        """
        Returns a new array with the dropped units zeroed and the kept ones
        divided by keep_prob.
        """
        # Build a new array instead of using *= and /=: the argument is the
        # output cached by the neighbouring layer (or the caller's data) and
        # must not be modified; in-place division also fails for integer input.
        return array * mask / self._keep_prob

In [None]:
# NOTE: a helper with this name is also defined earlier in the notebook; this
# later definition is the one in effect once this cell has run, so keep the
# two in sync (or drop one of them).
def convert_categorical2one_hot(
    y: np.array, n_classes: Optional[int] = None
) -> np.array:
    """
    Encode integer class labels as a one-hot matrix.

    :param y - categorical (integer) array with (n, ) or (n, 1) shape
    :param n_classes - number of columns of the result; when None it is
        inferred as y.max() + 1, which yields fewer columns than expected
        whenever the highest class does not occur in y
    :return one hot array with (n, k) shape
    :raises ValueError - when y is empty and n_classes is not given, or when
        a label lies outside of the [0, k) range
    ----------------------------------------------------------------------------
    n - number of examples
    k - number of classes
    """
    # Flatten first: indexing with a (n, 1) array would broadcast against
    # np.arange(n) and mark every label column in every row.
    labels = np.asarray(y).ravel()

    if n_classes is None:
        if labels.size == 0:
            raise ValueError(
                "cannot infer the number of classes from an empty label array"
            )
        n_classes = int(labels.max()) + 1

    # Negative labels would silently wrap around to the last columns.
    if labels.size and (labels.min() < 0 or labels.max() >= n_classes):
        raise ValueError(
            f"labels must lie in [0, {n_classes}), got range "
            f"[{labels.min()}, {labels.max()}]"
        )

    one_hot_matrix = np.zeros((labels.size, n_classes))
    one_hot_matrix[np.arange(labels.size), labels] = 1
    return one_hot_matrix


def convert_prob2categorical(probs: np.array) -> np.array:
    """
    Pick, for every example, the index of the class with the highest score.

    :param probs - softmax output array with (n, k) shape
    :return categorical array with (n, ) shape
    ----------------------------------------------------------------------------
    n - number of examples
    k - number of classes
    """
    winning_classes = np.argmax(probs, axis=1)
    return winning_classes


def convert_prob2one_hot(probs: np.array) -> np.array:
    """
    Turn every row of scores into a one-hot row marking its highest score.

    :param probs - softmax output array with (n, k) shape
    :return one hot array with (n, k) shape
    ----------------------------------------------------------------------------
    n - number of examples
    k - number of classes
    """
    winners = convert_prob2categorical(probs)
    encoded = np.zeros_like(probs)
    rows = np.arange(probs.shape[0])
    # one cell per row: (row, winning class)
    encoded[rows, winners] = 1
    return encoded


def generate_batches(x: np.array, y: np.array, batch_size: int):
    """
    Lazily yield consecutive (x_batch, y_batch) pairs; the last pair may hold
    fewer than batch_size elements. Batches are copies of the selected rows.

    :param x - features array with (n, ...) shape
    :param y - one hot ground truth array with (n, k) shape
    :batch_size - number of elements in single batch
    ----------------------------------------------------------------------------
    n - number of examples in data set
    k - number of classes
    """
    total = x.shape[0]
    for start in range(0, total, batch_size):
        stop = start + batch_size
        x_rows = range(start, min(stop, total))
        y_rows = range(start, min(stop, y.shape[0]))
        yield x.take(indices=x_rows, axis=0), y.take(indices=y_rows, axis=0)

In [None]:
def softmax_accuracy(y_hat: np.array, y: np.array) -> float:
    """
    Fraction of examples whose highest-scoring class matches the label.

    :param y_hat - 2D prediction (scores/probabilities) tensor with shape (n, k)
    :param y - 2D one-hot ground truth labels tensor with shape (n, k)
    ----------------------------------------------------------------------------
    n - number of examples in batch
    k - number of classes
    """
    predicted = convert_prob2one_hot(y_hat)
    # a row counts as a hit only when every column agrees with the label row
    row_is_correct = (predicted == y).all(axis=1)
    return row_is_correct.mean()


def softmax_cross_entropy(y_hat, y, eps=1e-20) -> float:
    """
    Cross entropy between predictions and one-hot labels, averaged over the
    batch.

    :param y_hat - 2D prediction (probabilities) tensor with shape (n, k)
    :param y - 2D one-hot ground truth labels tensor with shape (n, k)
    :param eps - lower clipping bound applied to y_hat before the logarithm
    ----------------------------------------------------------------------------
    n - number of examples in batch
    k - number of classes
    """
    n = y_hat.shape[0]
    # clip to [eps, 1] so that log never sees zero
    safe_probs = np.clip(y_hat, eps, 1.)
    log_likelihood = np.sum(y * np.log(safe_probs))
    return -log_likelihood / n

In [None]:
class ReluLayer(Layer):
    """
    Element-wise rectified linear unit: max(0, x).
    """

    def __init__(self):
        # output of the last forward pass, used to gate the gradient
        self._z = None

    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        """
        :param a_prev - ND tensor with shape (n, ..., channels)
        :param training - unused by this layer
        :output ND tensor with shape (n, ..., channels)
        ------------------------------------------------------------------------
        n - number of examples in batch
        """
        activated = np.maximum(0, a_prev)
        self._z = activated
        return activated

    def backward_pass(self, da_curr: np.array) -> np.array:
        """
        Passes the gradient through wherever the forward output was positive
        and blocks it everywhere else. da_curr itself is left untouched.

        :param da_curr - ND tensor with shape (n, ..., channels)
        :output ND tensor with shape (n, ..., channels)
        ------------------------------------------------------------------------
        n - number of examples in batch
        """
        inactive = self._z <= 0
        dz = np.array(da_curr, copy=True)
        dz[inactive] = 0
        return dz

In [None]:
class SoftmaxLayer(Layer):
    """
    Row-wise softmax. The backward pass hands the incoming gradient through
    unchanged.
    """

    def __init__(self):
        # output of the last forward pass
        self._z = None

    def forward_pass(self, a_prev: np.array, training: bool) -> np.array:
        """
        :param a_prev - 2D tensor with shape (n, k)
        :param training - unused by this layer
        :output 2D tensor with shape (n, k)
        ------------------------------------------------------------------------
        n - number of examples in batch
        k - number of classes
        """
        # subtracting the row maximum keeps exp() from overflowing
        row_max = a_prev.max(axis=1, keepdims=True)
        exponentials = np.exp(a_prev - row_max)
        row_totals = np.sum(exponentials, axis=1, keepdims=True)
        self._z = exponentials / row_totals
        return self._z

    def backward_pass(self, da_curr: np.array) -> np.array:
        """
        Returns da_curr as is, without applying a softmax derivative.

        :param da_curr - 2D tensor with shape (n, k)
        :output 2D tensor with shape (n, k)
        ------------------------------------------------------------------------
        n - number of examples in batch
        k - number of classes
        """
        return da_curr



In [None]:
class RMSProp(Optimizer):
    """
    RMSProp update rule: every gradient is divided by the square root of a
    running average of its squared values before the learning rate is applied.
    """

    def __init__(self, lr: float, beta: float = 0.9, eps: float = 1e-8):
        """
        :param lr - learning rate
        :param beta - discounting factor for the history/coming gradient
        :param eps - small value to avoid zero denominator
        """
        self._lr, self._beta, self._eps = lr, beta, eps
        # "dw<idx>" / "db<idx>" -> running average of the squared gradient
        self._cache = {}

    def update(self, layers: List[Layer]) -> None:
        """
        Applies one RMSProp step to every layer that exposes both weights and
        gradients; other layers are skipped.

        :param layers - list of network layers
        """
        if not self._cache:
            self._init_cache(layers)

        for idx, layer in enumerate(layers):
            weights, gradients = layer.weights, layer.gradients
            if weights is None or gradients is None:
                continue

            w, b = weights
            dw, db = gradients
            dw_key, db_key = RMSProp._get_cache_keys(idx)

            layer.set_wights(
                w=w - self._lr * self._normalized(dw_key, dw),
                b=b - self._lr * self._normalized(db_key, db)
            )

    def _normalized(self, key: str, grad: np.array) -> np.array:
        """
        Folds grad into the running average stored under key and returns grad
        divided by the square root of that average.
        """
        running_avg = self._beta * self._cache[key] + \
            (1 - self._beta) * np.square(grad)
        self._cache[key] = running_avg
        return grad / (np.sqrt(running_avg) + self._eps)

    def _init_cache(self, layers: List[Layer]) -> None:
        """
        Creates a zero-filled running average for every gradient tensor
        currently exposed by the layers.
        """
        for idx, layer in enumerate(layers):
            gradients = layer.gradients
            if gradients is None:
                continue

            dw_key, db_key = RMSProp._get_cache_keys(idx)
            for key, grad in zip((dw_key, db_key), gradients):
                self._cache[key] = np.zeros_like(grad)

    @staticmethod
    def _get_cache_keys(idx: int) -> Tuple[str, str]:
        """
        :param idx - index of layer
        """
        return f"dw{idx}", f"db{idx}"

class Adam(Optimizer):
    """
    Adam optimizer: keeps, per layer, a running average of the gradients
    (first moment, `_cache_v`) and of the squared gradients (second moment,
    `_cache_s`) and steps along the ratio of their bias-corrected values.
    """

    def __init__(
        self, lr: float,
        beta1: float = 0.9,
        beta2: float = 0.999,
        eps: float = 1e-8
    ):
        """
        :param lr - learning rate
        :param beta1 - decay rate of the running average of the gradients,
            expected in [0, 1)
        :param beta2 - decay rate of the running average of the squared
            gradients, expected in [0, 1)
        :param eps - small value to avoid zero denominator
        """
        self._cache_v = {}
        self._cache_s = {}
        self._lr = lr
        self._beta1 = beta1
        self._beta2 = beta2
        self._eps = eps
        # number of update() calls performed so far; drives the bias correction
        self._t = 0

    def update(self, layers: List[Layer]) -> None:
        """
        Performs one optimization step on every layer that exposes both
        weights and gradients; all other layers are left untouched.
        :param layers - layers of the model, in model order
        """
        if len(self._cache_s) == 0 or len(self._cache_v) == 0:
            self._init_cache(layers)

        self._t += 1
        # Both caches start at zero, so during the first steps the running
        # averages underestimate the true moments. Without the correction the
        # very first step is (1 - beta1) / sqrt(1 - beta2) times too large
        # (about 3.2x with the default betas).
        v_correction = 1 - self._beta1 ** self._t
        s_correction = 1 - self._beta2 ** self._t

        for idx, layer in enumerate(layers):
            weights, gradients = layer.weights, layer.gradients
            if weights is None or gradients is None:
                continue

            (w, b), (dw, db) = weights, gradients
            dw_key, db_key = Adam._get_cache_keys(idx)

            dw = self._direction(dw_key, dw, v_correction, s_correction)
            db = self._direction(db_key, db, v_correction, s_correction)

            layer.set_wights(
                w=w - self._lr * dw,
                b=b - self._lr * db
            )

    def _direction(
        self,
        key: str,
        grad: np.array,
        v_correction: float,
        s_correction: float
    ) -> np.array:
        """
        Updates both moment estimates stored under `key` with `grad` and
        returns the bias-corrected update direction for that parameter.
        :param key - cache key of the parameter
        :param grad - current gradient of the parameter
        :param v_correction - 1 - beta1 ** t, divisor for the first moment
        :param s_correction - 1 - beta2 ** t, divisor for the second moment
        """
        self._cache_v[key] = self._beta1 * self._cache_v[key] + \
            (1 - self._beta1) * grad
        self._cache_s[key] = self._beta2 * self._cache_s[key] + \
            (1 - self._beta2) * np.square(grad)

        v_hat = self._cache_v[key] / v_correction
        s_hat = self._cache_s[key] / s_correction
        return v_hat / (np.sqrt(s_hat) + self._eps)

    def _init_cache(self, layers: List[Layer]) -> None:
        """
        Creates zero-filled first and second moment estimates for every layer
        that currently reports gradients.
        :param layers - layers of the model, in model order
        """
        for idx, layer in enumerate(layers):
            gradients = layer.gradients
            if gradients is None:
                continue

            dw, db = gradients
            dw_key, db_key = Adam._get_cache_keys(idx)

            self._cache_v[dw_key] = np.zeros_like(dw)
            self._cache_v[db_key] = np.zeros_like(db)
            self._cache_s[dw_key] = np.zeros_like(dw)
            self._cache_s[db_key] = np.zeros_like(db)

    @staticmethod
    def _get_cache_keys(idx: int) -> Tuple[str, str]:
        """
        :param idx - index of layer
        :output pair of cache keys (weights gradient key, bias gradient key)
        """
        return f"dw{idx}", f"db{idx}"


class GradientDescent(Optimizer):
    """
    Plain gradient descent: every parameter moves against its gradient,
    scaled by a fixed learning rate. No state is kept between steps.
    """

    def __init__(self, lr: float):
        """
        :param lr - learning rate
        """
        self._lr = lr

    def update(self, layers: List[Layer]) -> None:
        """
        Performs one optimization step on every layer that exposes both
        weights and gradients; all other layers are left untouched.
        :param layers - layers of the model
        """
        for layer in layers:
            params, grads = layer.weights, layer.gradients
            if params is not None and grads is not None:
                weight, bias = params
                grad_w, grad_b = grads
                layer.set_wights(
                    w=weight - self._lr * grad_w,
                    b=bias - self._lr * grad_b
                )


In [None]:
def get_im2col_idx(
    array_shape: Tuple[int, int, int, int],
    filter_dim: Tuple[int, int] = (3, 3),
    pad: int = 0,
    stride: int = 1
) -> Tuple[np.array, np.array, np.array]:
    """
    Builds the index arrays that gather every filter-sized window of a padded
    input volume in a single fancy-indexing operation.
    :param array_shape - 4 element tuple (n, c, h_in, w_in)
    :param filter_dim - 2 element tuple (h_f, w_f)
    :param pad - padding along width and height of input volume
    :param stride - stride along width and height of input volume
    :output tuple (k, i, j) of index arrays:
        k - channel indices with shape (c * h_f * w_f, 1)
        i - row indices with shape (c * h_f * w_f, h_out * w_out)
        j - column indices with shape (c * h_f * w_f, h_out * w_out)
    ------------------------------------------------------------------------
    n - number of examples in batch
    w_in - width of input volume
    h_in - height of input volume
    w_out - width of output volume
    h_out - height of output volume
    c - number of channels of the input volume
    """
    _, channels, height_in, width_in = array_shape
    filter_h, filter_w = filter_dim

    out_h = (height_in + 2 * pad - filter_h) // stride + 1
    out_w = (width_in + 2 * pad - filter_w) // stride + 1

    # offsets of every element inside one window, repeated for each channel
    row_offsets = np.tile(np.repeat(np.arange(filter_h), filter_w), channels)
    col_offsets = np.tile(np.arange(filter_w), filter_h * channels)
    channel_idx = np.repeat(np.arange(channels), filter_h * filter_w)

    # top-left corner of every window, scanned row by row
    row_starts = stride * np.repeat(np.arange(out_h), out_w)
    col_starts = stride * np.tile(np.arange(out_w), out_h)

    # broadcast: one row per window element, one column per window position
    rows = row_offsets[:, np.newaxis] + row_starts[np.newaxis, :]
    cols = col_offsets[:, np.newaxis] + col_starts[np.newaxis, :]
    return channel_idx[:, np.newaxis], rows, cols


def im2col(
    array: np.array,
    filter_dim: Tuple[int, int] = (3, 3),
    pad: int = 0,
    stride: int = 1
) -> np.array:
    """
    Unrolls every filter-sized window of the zero-padded input into one
    column, so that a convolution can be written as a matrix product.
    :param array - 4D tensor with shape (n, c, h_in, w_in)
    :param filter_dim - 2 element tuple (h_f, w_f)
    :param pad - padding along width and height of input volume
    :param stride - stride along width and height of input volume
    :output 2D tensor with shape (h_f * w_f * c, h_out * w_out * n)
    ------------------------------------------------------------------------
    n - number of examples in batch
    w_in - width of input volume
    h_in - height of input volume
    w_out - width of output volume
    h_out - height of output volume
    w_f - width of filter volume
    h_f - height of filter volume
    c - number of channels of the input volume
    """
    _, channels, _, _ = array.shape
    filter_h, filter_w = filter_dim

    # zero padding is applied to the two spatial axes only
    spatial_pad = (pad, pad)
    padded = np.pad(
        array,
        ((0, 0), (0, 0), spatial_pad, spatial_pad),
        mode='constant'
    )

    channel_idx, row_idx, col_idx = get_im2col_idx(
        array_shape=array.shape,
        filter_dim=filter_dim,
        pad=pad,
        stride=stride
    )

    # (n, c * h_f * w_f, h_out * w_out) -> (c * h_f * w_f, h_out * w_out, n)
    windows = padded[:, channel_idx, row_idx, col_idx]
    windows = windows.transpose(1, 2, 0)
    return windows.reshape(filter_h * filter_w * channels, -1)


def col2im(
    cols: np.array,
    array_shape: Tuple[int, int, int, int],
    filter_dim: Tuple[int, int] = (3, 3),
    pad: int = 0,
    stride: int = 1
) -> np.array:
    """
    Scatters window columns back onto an input-shaped volume, summing the
    contributions of windows that overlap, then strips the padding.
    :param cols - 2D tensor with shape (h_f * w_f * c, h_out * w_out * n)
    :param array_shape - 4 element tuple (n, c, h_in, w_in)
    :param filter_dim - 2 element tuple (h_f, w_f)
    :param pad - padding along width and height of input volume
    :param stride - stride along width and height of input volume
    :output 4D tensor with shape (n, c, h_in, w_in)
    ------------------------------------------------------------------------
    n - number of examples in batch
    w_in - width of input volume
    h_in - height of input volume
    w_out - width of output volume
    h_out - height of output volume
    w_f - width of filter volume
    h_f - height of filter volume
    c - number of channels of the input volume
    """
    batch, channels, height_in, width_in = array_shape
    filter_h, filter_w = filter_dim

    padded_shape = (batch, channels, height_in + 2 * pad, width_in + 2 * pad)
    padded = np.zeros(padded_shape, dtype=cols.dtype)

    channel_idx, row_idx, col_idx = get_im2col_idx(
        array_shape=array_shape,
        filter_dim=filter_dim,
        pad=pad,
        stride=stride
    )

    # (c * h_f * w_f, h_out * w_out * n) -> (n, c * h_f * w_f, h_out * w_out)
    per_example = cols.reshape(channels * filter_h * filter_w, -1, batch)
    per_example = per_example.transpose(2, 0, 1)

    # np.add.at accumulates over repeated indices, so overlapping windows add up
    np.add.at(padded, (slice(None), channel_idx, row_idx, col_idx), per_example)

    row_window = slice(pad, pad + height_in)
    col_window = slice(pad, pad + width_in)
    return padded[:, :, row_window, col_window]

In [None]:
class SequentialModel:
    """
    Stack of layers trained with mini-batches. The forward pass runs the
    layers in list order, the backward pass runs them in reverse order and
    the optimizer is applied to all layers after every batch. Accuracy and
    loss on the train and test data are recorded once per epoch.
    """

    def __init__(self, layers: List[Layer], optimizer: Optimizer):
        """
        :param layers - layers of the model, applied in list order
        :param optimizer - optimizer whose update(layers=...) is called after every batch
        """
        self._layers = layers
        self._optimizer = optimizer

        # one entry is appended to each list at the end of every epoch
        self._train_acc = []
        self._test_acc = []
        self._train_loss = []
        self._test_loss = []

    def train(
        self,
        x_train: np.array,
        y_train: np.array,
        x_test: np.array,
        y_test: np.array,
        epochs: int,
        bs: int = 64,
        verbose: bool = False
    ) -> None:
        """
        :param x_train - ND feature tensor with shape (n_train, ...)
        :param y_train - 2D one-hot labels tensor with shape (n_train, k)
        :param x_test - ND feature tensor with shape (n_test, ...)
        :param y_test - 2D one-hot labels tensor with shape (n_test, k)
        :param epochs - number of epochs used during model training
        :param bs - size of batch used during model training
        :param verbose - if set to True, model will produce logs during training
        ------------------------------------------------------------------------
        n_train - number of examples in train data set
        n_test - number of examples in test data set
        k - number of classes
        """

        for epoch in range(epochs):
            if verbose:
                print(f"epoch {epoch}")
            epoch_start = time.time()
            # Always a float buffer: np.zeros_like(y_train) would inherit an
            # integer dtype from integer one-hot labels and silently truncate
            # the predicted probabilities stored below.
            y_hat = np.zeros(y_train.shape, dtype=np.float64)
            for idx, (x_batch, y_batch) in enumerate(generate_batches(x_train, y_train, bs)):
                y_hat_batch = self._forward(x_batch, training=True)
                # the prediction error is the signal propagated backwards
                activation = y_hat_batch - y_batch
                self._backward(activation)
                self._update()
                # NOTE(review): predictions are stored by batch position, which
                # assumes generate_batches yields consecutive, unshuffled slices
                # of the train data - confirm against generate_batches.
                n_start = idx * bs
                n_end = n_start + y_hat_batch.shape[0]
                y_hat[n_start:n_end, :] = y_hat_batch
                if verbose and idx % 1000 == 0:
                    print("idx:", idx)

            self._train_acc.append(softmax_accuracy(y_hat, y_train))
            self._train_loss.append(softmax_cross_entropy(y_hat, y_train))

            # the whole test set is evaluated in a single forward pass
            y_hat = self._forward(x_test, training=False)
            test_acc = softmax_accuracy(y_hat, y_test)
            self._test_acc.append(test_acc)
            test_loss = softmax_cross_entropy(y_hat, y_test)
            self._test_loss.append(test_loss)

            if verbose:
                epoch_time = format_time(start_time=epoch_start, end_time=time.time())
                print("iter: {:05} | test loss: {:.5f} | test accuracy: {:.5f} | time: {}"
                      .format(epoch+1, test_loss, test_acc, epoch_time))

    def predict(self, x: np.array) -> np.array:
        """
        :param x - ND feature tensor with shape (n, ...)
        :output output of the last layer for x, computed with training=False
        ------------------------------------------------------------------------
        n - number of examples in data set
        """
        return self._forward(x, training=False)

    def _forward(self, x: np.array, training: bool) -> np.array:
        """
        Feeds x through all layers in list order.
        :param x - input of the first layer
        :param training - flag forwarded to every layer's forward_pass
        :output output of the last layer
        """
        activation = x
        for layer in self._layers:
            activation = layer.forward_pass(a_prev=activation, training=training)
        return activation

    def _backward(self, x: np.array) -> None:
        """
        Feeds x through backward_pass of all layers in reverse order.
        :param x - value passed to backward_pass of the last layer
        """
        activation = x
        for layer in reversed(self._layers):
            activation = layer.backward_pass(da_curr=activation)

    def _update(self) -> None:
        """
        Lets the optimizer update the parameters of all layers.
        """
        self._optimizer.update(layers=self._layers)

    @property
    def history(self) -> Dict[str, List[float]]:
        """
        :output per-epoch metrics recorded by train(), keyed by metric name;
            the lists are the model's own lists, not copies
        """
        return {
            "train_acc": self._train_acc,
            "test_acc": self._test_acc,
            "train_loss": self._train_loss,
            "test_loss": self._test_loss
        }

In [None]:
def format_time(start_time: float, end_time: float) -> str:
    """
    :param start_time - beginning of time period, in seconds (e.g. time.time())
    :param end_time - ending of time period, in seconds (e.g. time.time())
    :output - string in HH:MM:SS.ss format
    ----------------------------------------------------------------------------
    HH - hours
    MM - minutes
    SS - seconds
    ss - hundredths of a second
    """
    # Round to whole hundredths BEFORE splitting into fields. Rounding only
    # while formatting the seconds could not carry into the minutes, so
    # 59.999 s was rendered as "00:00:60.00" instead of "00:01:00.00".
    total_hundredths = int(round((end_time - start_time) * 100))
    hours, rem = divmod(total_hundredths, 3600 * 100)
    minutes, hundredths = divmod(rem, 60 * 100)
    return "{:0>2}:{:0>2}:{:05.2f}".format(hours, minutes, hundredths / 100)

In [None]:
#INITIALIZE MODEL
# The network is assembled stage by stage, in the order the layers are applied.
# Shape notes below are written as (N, height, width, channels).
layers = []

# Stage 1 - two 32-filter convolutions, then pooling and dropout:
# (N, 28, 28, 1) -> (N, 28, 28, 32) -> (N, 14, 14, 32)
layers.extend([
    FastConvLayer2D.initialize(filters=32, kernel_shape=(3, 3, 1), stride=1, padding="same"),
    ReluLayer(),
    FastConvLayer2D.initialize(filters=32, kernel_shape=(3, 3, 32), stride=1, padding="same"),
    ReluLayer(),
    MaxPoolLayer(pool_size=(2, 2), stride=2),
    DropoutLayer(keep_prob=0.75),
])

# Stage 2 - two 64-filter convolutions, then pooling and dropout:
# (N, 14, 14, 32) -> (N, 14, 14, 64) -> (N, 7, 7, 64)
layers.extend([
    FastConvLayer2D.initialize(filters=64, kernel_shape=(3, 3, 32), stride=1, padding="same"),
    ReluLayer(),
    FastConvLayer2D.initialize(filters=64, kernel_shape=(3, 3, 64), stride=1, padding="same"),
    ReluLayer(),
    MaxPoolLayer(pool_size=(2, 2), stride=2),
    DropoutLayer(keep_prob=0.75),
])

# Classifier head - flatten, then three dense layers:
# (N, 7, 7, 64) -> (N, 7 * 7 * 64) -> (N, 256) -> (N, 32) -> (N, n_classes)
layers.extend([
    FlattenLayer(),
    DenseLayer.initialize(units_prev=7 * 7 * 64, units_curr=256),
    ReluLayer(),
    DenseLayer.initialize(units_prev=256, units_curr=32),
    ReluLayer(),
    DenseLayer.initialize(units_prev=32, units_curr=n_classes),
    SoftmaxLayer(),
])

optimizer = Adam(lr=0.003)

model = SequentialModel(layers=layers, optimizer=optimizer)

In [None]:
# Train for a single epoch with progress logs enabled; the test split is
# scored at the end of the epoch.
model.train(
    x_train, y_train,
    x_test, y_test,
    epochs=1, bs=256, verbose=True,
)

epoch 0
idx: 0
iter: 00001 | test loss: 2.29911 | test accuracy: 0.12600 | time: 00:09:54.52


In [None]:
# Per-epoch train/test accuracy and loss lists recorded by the model during training.
rez = model.history

In [None]:
# Test accuracy after each epoch (one entry per trained epoch).
rez["test_acc"]

[0.126]