# Практическое задание 2



## Замечания

* Задание необходимо сдать боту до 06.12.2021
* Соблюдаем кодекс чести (по нулям и списавшему, и давшему списать)
* Можно (и нужно!) применять для реализации только библиотеку **Numpy**
* Ничего, крому Numpy, нельзя использовать для реализации 
* **Keras** используется только для тестирования Вашей реализации
* Если какой-то из классов не проходит приведенные тесты, то соответствующее задание не оценивается
* Возможно использование дополнительных (приватных) тестов
 

## Реализация собственного нейросетевого пакета для запуска и обучения нейронных сетей

Задание состоит из трёх частей:
1. Реализация прямого вывода нейронной сети (первое практическое задание)
2. Реализация градиентов по входу и распространения градиента по сети (back propagation)
3. Реализация градиентов по параметрам и метода обратного распространения ошибки с обновлением парметров сети

###  1. Реализация вывода собственной нейронной сети

1.1 Внимательно ознакомьтесь с интерфейсом слоя. Любой слой должен содержать как минимум три метода:
- конструктор
- прямой вывод 
- обратный вывод, производные по входу и по параметрам

In [1]:
class Layer(object):
    def __init__(self):
        self.name = 'Layer'       
    def forward(self, input_data):
        pass
    def backward(self, input_data):
        return [self.grad_x(input_data), self.grad_param(input_data)]
    
    def grad_x(self, input_data):
        pass
    def grad_param(self, input_data):
        return []
    
    def update_param(self, grads, learning_rate):
        pass


1.2 Ниже предствален интерфейс класса  Network. Обратите внимание на реализацию метода predict, который последовательно обрабатывает входные данные слой за слоем.

In [2]:
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import tqdm

class Network(object):
    def __init__(self, layers, loss=None):
        self.name = 'Network'
        self.layers = layers
        self.loss = loss
    
    def forward(self, input_data):
        return self.predict(input_data)

    def grad_x(self, input_data, labels):
        gradients_x = []
        current_input = input_data
        for layer in self.layers:
            gradients_x.append(layer.grad_x(current_input))
            current_input = layer.forward(current_input) 
        gradient_x = self.loss.grad_x(current_input, labels)
        gradient_x = np.reshape(gradient_x, (gradient_x.shape[0], 1, gradient_x.shape[1]))
        for gradient_x_layer in reversed(gradients_x):
            gradient_x = np.matmul(gradient_x, gradient_x_layer)
        gradient_x = np.reshape(gradient_x, (gradient_x.shape[0], gradient_x.shape[2]))
        return gradient_x

    def grad_param(self, input_data, labels):
        gradients_param = []
        current_input = input_data
        for layer in self.layers:
            gradients_param.append(layer.backward(current_input))
            current_input = layer.forward(current_input) 
        gradients_param.append([self.loss.grad_x(current_input, labels), []])
        return gradients_param

    def update(self, grad_list, learning_rate):
        gradient_x = grad_list[-1][0]
        gradient_x = np.reshape(gradient_x, (gradient_x.shape[0], 1, gradient_x.shape[1]))
        for i in range(len(grad_list)-2, -1, -1):
            layer = self.layers[i]
            if len(grad_list[i][1]) != 0:
                layer.update_param([np.matmul(gradient_x, grad_list[i][1][0]), np.matmul(gradient_x, grad_list[i][1][1])], learning_rate)
            gradient_x = np.matmul(gradient_x, grad_list[i][0])
    
    def predict(self, input_data):
        current_input = input_data
        for layer in self.layers:
            current_input = layer.forward(current_input)     
        return current_input
    
    def calculate_loss(self, input_data, labels):
        return self.loss.forward(self.predict(input_data), labels)
    
    def train_step(self, input_data, labels, learning_rate=0.001):
        grad_list = self.grad_param(input_data, labels)
        self.update(grad_list, learning_rate)
    
    def fit(self, trainX, trainY, validation_split=0.25, 
            batch_size=1, nb_epoch=1, learning_rate=0.01):
        
        train_x, val_x, train_y, val_y = train_test_split(trainX, trainY, 
                                                          test_size=validation_split,
                                                          random_state=42)
        for epoch in range(nb_epoch):
            #train one epoch
            for i in tqdm(range(int(len(train_x)/batch_size))):
                batch_x = train_x[i*batch_size: (i+1)*batch_size]
                batch_y = train_y[i*batch_size: (i+1)*batch_size]
                self.train_step(batch_x, batch_y, learning_rate)
            #validate
            val_accuracy = self.evaluate(val_x, val_y)
            print("\n",'%d epoch: val %.2f' %(epoch+1, val_accuracy), "\n")
            
    def evaluate(self, testX, testY):
        y_pred = np.argmax(self.predict(testX), axis=1)            
        y_true = np.argmax(testY, axis=1)
        val_accuracy = np.sum((y_pred == y_true))/(len(y_true))
        return val_accuracy

#### 1.1 (6 баллов) Необходимо реализовать метод forward для вычисления следующих слоёв:

- DenseLayer
- ReLU
- Softmax
- FlattenLayer
- MaxPooling

In [3]:
#импорты
import numpy as np

In [4]:
class DenseLayer(Layer):
    def __init__(self, input_dim, output_dim, W_init=None, b_init=None):
        self.name = 'Dense'
        self.input_dim = input_dim
        self.output_dim = output_dim
        if W_init is None or b_init is None:
            self.W = np.random.normal(0, np.sqrt(2. / input_dim), size = (input_dim, output_dim))
            self.b = np.zeros(output_dim, 'float32')
        else:
            self.W = W_init
            self.b = b_init
    def forward(self, input_data):
        assert input_data.shape[1] == self.W.shape[0]
        out = np.matmul(input_data, self.W) + self.b
        return out
    def grad_x(self, input_data):
        (input_dim, output_dim) = (self.input_dim, self.output_dim)
        out = np.zeros(shape = (input_data.shape[0], output_dim, input_dim))
        for batch_id in range(input_data.shape[0]):
            array = input_data[batch_id]
            for out_id in range(output_dim):
                for in_id in range(input_dim):
                    out[batch_id, out_id, in_id] = self.W[in_id, out_id]
        return out
    def grad_b(self, input_data):
        (input_dim, output_dim) = (self.input_dim, self.output_dim)
        out = np.zeros(shape = (input_data.shape[0], output_dim, output_dim))
        for batch_id in range(input_data.shape[0]):
            for out_id in range(output_dim):
                out[batch_id, out_id, out_id] = 1
        return out
    def grad_W(self, input_data):
        (input_dim, output_dim) = (self.input_dim, self.output_dim)
        out = np.zeros(shape = (input_data.shape[0], output_dim, input_dim * output_dim))
        for batch_id in range(input_data.shape[0]):
            for k in range(output_dim):
                # d out_j / d w_s_t, if j != t zero
                for j in range(input_dim):
                    out[batch_id, k, j*output_dim + k] = input_data[batch_id, j]
        return out
    
    def update_W(self, grad, learning_rate):
        self.W -= learning_rate * np.mean(grad, axis=0).reshape(self.W.shape)
    
    def update_b(self, grad,  learning_rate):
        self.b -= learning_rate * np.mean(grad, axis=0).reshape(self.b.shape)
        
    def update_param(self, params_grad, learning_rate):
        self.update_W(params_grad[0], learning_rate)
        self.update_b(params_grad[1], learning_rate)
    
    def grad_param(self, input_data):
        return [self.grad_W(input_data), self.grad_b(input_data)]
    

class ReLU(Layer):
    def __init__(self):
        self.name = 'ReLU'
    def forward(self, input_data):
        out = input_data.copy()
        out[out < 0] = 0
        return out
    def grad_x(self, input_data):
        if len(input_data.shape) == 2:
            (batch_size, logits) = input_data.shape
            out = np.zeros(shape = (batch_size, logits, logits))
            for batch_id in range(batch_size):
                for i in range(logits):
                    if input_data[batch_id, i] > 0:
                        out[batch_id, i, i] = 1
            return out
        batch_size = input_data.shape[0]
        input_channels = input_data.shape[1]
        n = input_data.shape[2]
        m = input_data.shape[3]
        out = np.zeros(shape = (batch_size, input_channels*n*m, input_channels*n*m))
        for batch_id in range(batch_size):
            for in_ch in range(input_channels):
                for x in range(n):
                    for y in range(m):
                        if input_data[batch_id, in_ch, x, y] > 0:
                            out[batch_id, in_ch*n*m+x*m+y, in_ch*n*m+x*m+y] = 1
        return out
    
    
class Softmax(Layer):
    def __init__(self):
        self.name = 'Softmax'
    def forward(self, input_data):
        def g(array):
            array = np.exp(array)
            array /= np.sum(array)
            return array
        return np.apply_along_axis(g, -1, input_data)
    def grad_x(self, input_data):
        (batch_size, logits) = input_data.shape
        out = np.zeros(shape = (batch_size, logits, logits))
        for batch_id in range(batch_size):
            array = np.exp(input_data[batch_id])
            sum_of_array = np.sum(array)
            for i in range(logits):
                for j in range(logits):
                    out[batch_id, i, j] = - (array[i] * array[j]) / sum_of_array**2
                    if i == j:
                         out[batch_id, i, j] += array[i] / sum_of_array

        return out
    

class FlattenLayer(Layer):
    def __init__(self):
        self.name = 'Flatten'
    def forward(self, input_data):
        length = input_data.shape[1] * input_data.shape[2] \
            * input_data.shape[3]
        out = np.zeros((input_data.shape[0], length))
        for batch_id in range(input_data.shape[0]):
            out[batch_id] = np.transpose(input_data[batch_id], axes = (1, 2, 0)).reshape(length)
        return out
    def grad_x(self, input_data):
        length = input_data.shape[1] * input_data.shape[2] \
            * input_data.shape[3]
        out = np.zeros((input_data.shape[0], length, length))
        for batch_id in range(input_data.shape[0]):
            for i in range(length):
                out[batch_id, i, i] = 1
        return out


class MaxPooling(Layer):
    def __init__(self, pool_size=(2, 2), strides=2):
        self.name = 'MaxPooling'
        self.pool_size = pool_size
        self.strides = 2
    def forward(self, input_data):
        input = input_data.copy()
        (k1, k2) = self.pool_size
        s = self.strides
        (batch_count, channels_count) = (input.shape[0], input.shape[1])
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k1 + 1 + s - 1) // s, (m - k2 + 1 + s - 1) // s)
        assert out_n >= 0
        assert out_m >= 0
        out = np.zeros([batch_count, channels_count, out_n, out_m])
        for x in range(0, n - k1 + 1, s):
            for y in range(0, m - k2 + 1, s):
                a = input[:, :, x:x + k1, y:y + k2]
                out[:, :, x // s, y // s] = np.amax(a, axis =(2,3))
        return out    
    def grad_x(self, input_data):
        input = input_data.copy()
        (k1, k2) = self.pool_size
        s = self.strides
        (batch_count, channels_count) = (input.shape[0], input.shape[1])
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k1 + 1 + s - 1) // s, (m - k2 + 1 + s - 1) // s)
        assert out_n >= 0
        assert out_m >= 0
        out = np.zeros([batch_count, channels_count*out_n*out_m, channels_count*n*m])
        for batch_id in range(batch_count):
            for out_ch in range(channels_count):
                for x in range(0, n - k1 + 1, s):
                    for y in range(0, m - k2 + 1, s):
                        mymax = np.amax(input[batch_id, out_ch, x:x+k1, y:y+k2])
                        for nx in range(x,x+k1):
                            for ny in range(y,y+k2):
                                if input[batch_id, out_ch, nx, ny] == mymax:
                                    id1 = out_ch*out_n*out_m + (x//s)*out_m + (y//s)
                                    id2 = out_ch*n*m + nx*m + ny
                                    out[batch_id, id1, id2] = 1
        return out    

#### 1.2 (3 балла) Реализуйте теперь свёртночный слой   (опционально)

In [5]:
class Conv2DLayer(Layer):
    def __init__(self, kernel_size=3, input_channels=2, output_channels=3, 
                 stride=1, K_init=None, b_init=None):
      # padding: 'valid'
      # Работаем с квадратными ядрами, поэтому kernel_size - одно число
      # Работаем с единообразным сдвигом, поэтому stride - одно число
      # Фильтр размерности [kernel_size, kernel_size, input_channels, output_channels]
      self.name = 'Conv2D'
      self.kernel_size = kernel_size
      self.input_channels = input_channels
      self.output_channels = output_channels
      self.stride = stride
      if K_init is None or b_init is None:
          self.kernel = np.random.normal(0, np.sqrt(2. / (kernel_size*kernel_size*output_channels)), size =  (kernel_size,kernel_size,input_channels,output_channels))
          self.bias = np.zeros(output_channels, 'float32')
      else:
          self.kernel = K_init
          self.bias = b_init
            
    def forward(self, input_data):
        def dot(a, b):
            a = np.moveaxis(a, 0, -1)
            assert a.shape == b.shape
            return (a*b).sum()

        input = input_data.copy()
        assert self.input_channels == input.shape[1]
        k = self.kernel_size
        s = self.stride
        (batch_count, channels_count) = (input.shape[0], input.shape[1])
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k + 1 + s - 1) // s, (m - k + 1 + s - 1) // s)
        assert out_n >= 0
        assert out_m >= 0
        out = np.zeros([batch_count, self.output_channels, out_n, out_m])
        for batch_id in range(batch_count):
            for out_ch in range(self.output_channels):
                for x in range(0, n - k + 1, s):
                    for y in range(0, m - k + 1, s):
                        a = input[batch_id, :, x:x + k, y:y + k]
                        b = self.kernel[:, :, :, out_ch]
                        out[batch_id, out_ch, x // s, y // s] = dot(a, b) \
                            + self.bias[out_ch]
        return out
    def grad_x(self, input_data):
        input = input_data.copy()
        k = self.kernel_size
        s = self.stride
        (batch_count, input_channels) = (input.shape[0], input.shape[1])
        output_channels = self.output_channels
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k + 1 + s - 1) // s, (m - k + 1 + s - 1) // s)
        out = np.zeros(shape = (batch_count, output_channels*out_n*out_m, input_channels*n*m))
        for batch_id in range(batch_count):
            for out_ch in range(self.output_channels):
                for x in range(0, n - k + 1, s):
                    for y in range(0, m - k + 1, s):
                        for in_ch in range(input_channels):
                            for nx in range(x,x+k):
                                for ny in range(y,y+k):
                                    dx, dy = nx-x, ny-y
                                    id1 = out_ch*out_n*out_m + x*out_m + y
                                    id2 = in_ch*n*m + nx*m + ny
                                    out[batch_id, id1, id2] += self.kernel[dx, dy, in_ch, out_ch]
        return out
    def grad_kernel(self, input_data):
        input = input_data.copy()
        k = self.kernel_size
        s = self.stride
        (batch_count, input_channels) = (input.shape[0], input.shape[1])
        output_channels = self.output_channels
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k + 1 + s - 1) // s, (m - k + 1 + s - 1) // s)
        out = np.zeros(shape = (batch_count, output_channels*out_n*out_m, k*k*input_channels*output_channels))
        for batch_id in range(batch_count):
            for out_ch in range(self.output_channels):
                for x in range(0, n - k + 1, s):
                    for y in range(0, m - k + 1, s):
                        for in_ch in range(input_channels):
                            for nx in range(x,x+k):
                                for ny in range(y,y+k):
                                    dx, dy = nx-x, ny-y
                                    id1 = out_ch*out_n*out_m + x*out_m + y
                                    id2 = dx*k*input_channels*output_channels + dy*input_channels*output_channels + in_ch*output_channels + out_ch
                                    out[batch_id, id1, id2] += input[batch_id, in_ch, nx, ny]
        return out
    
    def grad_bias(self, input_data):
        input = input_data.copy()
        k = self.kernel_size
        s = self.stride
        (batch_count, input_channels) = (input.shape[0], input.shape[1])
        output_channels = self.output_channels
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k + 1 + s - 1) // s, (m - k + 1 + s - 1) // s)
        out = np.zeros(shape = (batch_count, output_channels*out_n*out_m, output_channels))
        for batch_id in range(batch_count):
            for out_ch in range(self.output_channels):
                for x in range(0, n - k + 1, s):
                    for y in range(0, m - k + 1, s):
                        id1 = out_ch*out_n*out_m + x*out_m + y
                        out[batch_id, id1, out_ch] += 1
        return out
        
    def update_kernel(self, grad, learning_rate):
        self.kernel -= learning_rate * np.mean(grad, axis=0).reshape(self.kernel.shape)
    
    def update_bias(self, grad,  learning_rate):
        self.bias -= learning_rate * np.mean(grad, axis=0).reshape(self.bias.shape)
        
    def update_param(self, params_grad, learning_rate):
        self.update_kernel(params_grad[0], learning_rate)
        self.update_bias(params_grad[1], learning_rate)
    
    def grad_param(self, input_data):
        return [self.grad_kernel(input_data), self.grad_bias(input_data)]

#### 1.4 Теперь настало время теста. 
#### Если вы всё сделали правильно, то запустив следующие ячейки у вас должна появиться надпись: Test PASSED

Переходить к дальнейшим заданиям не имеем никакого смысла, пока вы не добьётесь прохождение теста
    

#### Чтение данных

In [6]:
import numpy as np
np.random.seed(123)  # for reproducibility
from keras.utils import np_utils
from keras.datasets import mnist
 
(X_train, y_train), (X_test, y_test) = mnist.load_data()

X_train = X_train.reshape(X_train.shape[0], 1, 28, 28)
X_test = X_test.reshape(X_test.shape[0], 1, 28, 28)
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')
X_train /= 255
X_test /= 255
 

Y_train = np_utils.to_categorical(y_train, 10)
Y_test = np_utils.to_categorical(y_test, 10)
print(X_train.shape, Y_train.shape, X_test.shape, Y_test.shape)

(60000, 1, 28, 28) (60000, 10) (10000, 1, 28, 28) (10000, 10)


#### Подготовка моделей

In [7]:
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Activation, Flatten, Input
from keras.layers import Convolution2D, Conv2D, MaxPooling2D

def get_keras_model():
    input_image = Input(shape=(1, 28, 28))
    pool1 = MaxPooling2D(pool_size=(2,2), data_format='channels_first')(input_image)
    flatten = Flatten()(pool1)
    dense1 = Dense(10, activation='softmax')(flatten)
    model = Model(inputs=input_image, outputs=dense1)

    from tensorflow.keras.optimizers import Adam, SGD
    sgd = SGD(learning_rate=0.01, momentum=0.9, nesterov=True)
    model.compile(loss='categorical_crossentropy',
                  optimizer=sgd,
                  metrics=['accuracy'])

    history = model.fit(X_train, Y_train, validation_split=0.25, 
                        batch_size=32, epochs=2, verbose=1)
    return model

In [None]:
def get_our_model(keras_model):
    maxpool = MaxPooling()
    flatten = FlattenLayer()
    dense = DenseLayer(196, 10, W_init=keras_model.get_weights()[0],
                       b_init=keras_model.get_weights()[1])
    softmax = Softmax()
    net = Network([maxpool, flatten, dense, softmax])
    return net

In [None]:
keras_model = get_keras_model()
our_model = get_our_model(keras_model)

Epoch 1/2
Epoch 2/2


In [None]:
our_model = get_our_model(keras_model)

In [None]:
keras_prediction = keras_model.predict(X_test)
our_model_prediction = our_model.predict(X_test)

In [None]:
if np.sum(np.abs(keras_prediction - our_model_prediction)) < 0.01:
    print('Test PASSED')
else:
    print('Something went wrong!')

Test PASSED


### 2. Вычисление производных по входу для слоёв нейронной сети

#### 2.1 (1 балл) Реализуйте метод forward для класса CrossEntropy
Напоминание: $$ crossentropy = L(p, y) =  - \sum\limits_i y_i log p_i, $$
где вектор $(p_1, ..., p_k) $ -  выход классификационного алгоритма, а $(y_1,..., y_k)$ - правильные метки класса в унарной кодировке (one-hot encoding)

In [8]:
class CrossEntropy(object):
    def __init__(self, eps=0.00001):
        self.name = 'CrossEntropy'
        self.eps = eps
    
    def forward(self, input_data, labels):
        out = np.zeros(shape = input_data.shape[0])
        for batch_id in range(input_data.shape[0]):
            for i in range(input_data.shape[1]):
                out[batch_id] -= labels[batch_id, i] * np.log(input_data[batch_id, i]+self.eps)
        return out
    
    def calculate_loss(self,input_data, labels):
        return self.forward(input_data, labels)
    
    def grad_x(self, input_data, labels):
        out = np.zeros_like(input_data, dtype = np.float64)
        for batch_id in range(input_data.shape[0]):
            for i in range(input_data.shape[1]):
                out[batch_id, i] = (-labels[batch_id, i] / (input_data[batch_id, i]+self.eps))
        return out

#### 2.2 (2 баллa) Реализуйте метод grad_x класса CrossEntropy, который возвращает $\frac{\partial L}{\partial p}$

Проверить работоспособность кода поможет следующий тест:

In [None]:
def numerical_diff_net(net, x, labels):
    eps = 0.00001
    right_answer = []
    for i in range(len(x[0])):
        delta = np.zeros(len(x[0]))
        delta[i] = eps
        diff = (net.calculate_loss(x + delta, labels) - net.calculate_loss(x-delta, labels)) / (2*eps)
        right_answer.append(diff)
    return np.array(right_answer).T

def test_net(net):
    x = np.array([[1, 2, 3], [2, 3, 4]])
    labels = np.array([[0.3, 0.2, 0.5], [0.3, 0.2, 0.5]])
    num_grad = numerical_diff_net(net, x, labels)
    grad = net.grad_x(x, labels)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradiend is ')
        print(grad)
        
loss = CrossEntropy()
test_net(loss)

Test PASSED


#### 2.3 (2 балла)   Реализуйте метод grad_x класса Softmax, который возвращает $\frac{\partial Softmax}{\partial x}$

Проверить работоспособность кода поможет следующий тест:

In [None]:
def numerical_diff_layer(layer, x):
    eps = 0.00001
    right_answer = []
    for i in range(len(x[0])):
        delta = np.zeros(len(x[0]))
        delta[i] = eps
        diff = (layer.forward(x + delta) - layer.forward(x-delta)) / (2*eps)
        right_answer.append(diff.T)
    return np.array(right_answer).T

def test_layer(layer):
    x = np.array([[1, 2, 3], [2, -3, 4]])
    num_grad = numerical_diff_layer(layer, x)
    grad = layer.grad_x(x)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradiend is ')
        print(grad)
        
layer = Softmax()
test_layer(layer)

Test PASSED


#### 2.4 (5 баллов) Реализуйте метод grad_x для классов ReLU и DenseLayer

In [None]:
layer = ReLU()
test_layer(layer)

Test PASSED


In [None]:
layer = DenseLayer(3,4)
test_layer(layer)

Test PASSED


#### 2.5 (4 балла) Для класса Network реализуйте метод grad_x, который должен реализовывать взятие производной от лосса по входу

In [None]:
net = Network([DenseLayer(3, 10), ReLU(), DenseLayer(10, 3), Softmax()], loss=CrossEntropy())
test_net(net)

Test PASSED


### 3. Реализация градиентов по параметрам и метода обратного распространения ошибки с обновлением парметров сети

#### 3.1 (4 балла) Реализуйте функции grad_b и grad_W. При подготовке теста grad_W предполагается, что W является отномерным вектором.

In [None]:
def numerical_grad_b(input_size, output_size, b, W, x):
    eps = 0.00001
    right_answer = []
    for i in range(len(b)):
        delta = np.zeros(b.shape)
        delta[i] = eps
        dense1 = DenseLayer(input_size, output_size, W_init=W, b_init=b+delta)
        dense2 = DenseLayer(input_size, output_size, W_init=W, b_init=b-delta)
        diff = (dense1.forward(x) - dense2.forward(x)) / (2*eps)
        right_answer.append(diff.T)
    return np.array(right_answer).T

def test_grad_b():
    input_size = 3
    output_size = 4 
    W_init = np.random.random((input_size, output_size))
    b_init = np.random.random((output_size,))
    x = np.random.random((2, input_size))
    
    dense = DenseLayer(input_size, output_size, W_init, b_init)
    grad = dense.grad_b(x)

    num_grad = numerical_grad_b(input_size, output_size, b_init, W_init, x)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradiend is ')
        print(grad)

test_grad_b()

Test PASSED


In [None]:
def numerical_grad_W(input_size, output_size, b, W, x):
    eps = 0.00001
    right_answer = []
    for i in range(W.shape[0]):
        for j in range(W.shape[1]):
            delta = np.zeros(W.shape)
            delta[i, j] = eps
            dense1 = DenseLayer(input_size, output_size, W_init=W+delta, b_init=b)
            dense2 = DenseLayer(input_size, output_size, W_init=W-delta, b_init=b)
            diff = (dense1.forward(x) - dense2.forward(x)) / (2*eps)
            right_answer.append(diff.T)
    return np.array(right_answer).T

def test_grad_W():
    input_size = 3
    output_size = 4 
    W_init = np.random.random((input_size, output_size))
    b_init = np.random.random((4,))
    x = np.random.random((2, input_size))
        
    dense = DenseLayer(input_size, output_size, W_init, b_init)
    grad = dense.grad_W(x)

    num_grad = numerical_grad_W(input_size, output_size, b_init, W_init, x)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradiend is ')
        print(grad)

test_grad_W()

Test PASSED


#### 3.2 (4 балла) Полностью реализуйте метод обратного распространения ошибки в функции train_step класса Network


Рекомендуем реализовать сначала функцию Network.grad_param(), которая возвращает список длиной в количество слоёв и элементом которого является список градиентов по параметрам.
После чего, имея список градиентов, написать функцию обновления параметров для каждого слоя. 

Совет: рекомендуем написать тест для кода подсчета градиента по параметрам, чтобы быть уверенным в том, что градиент через всю сеть считается правильно
    

#### 3.3 Ознакомьтесь с реализацией функции fit класса Network. Запустите обучение модели. Если всё работает правильно, то точность на валидации должна будет возрастать

In [None]:
net = Network([DenseLayer(784, 10), Softmax()], loss=CrossEntropy())
trainX = X_train.reshape(len(X_train), -1)
net.fit(trainX[::3], Y_train[::3], validation_split=0.25, 
            batch_size=16, nb_epoch=5, learning_rate=0.01)

100%|██████████| 937/937 [03:25<00:00,  4.57it/s]



 1 epoch: val 0.85 



100%|██████████| 937/937 [03:24<00:00,  4.58it/s]



 2 epoch: val 0.87 



100%|██████████| 937/937 [03:24<00:00,  4.59it/s]



 3 epoch: val 0.88 



100%|██████████| 937/937 [03:23<00:00,  4.60it/s]



 4 epoch: val 0.89 



100%|██████████| 937/937 [03:24<00:00,  4.58it/s]


 5 epoch: val 0.89 






In [None]:
net = Network([DenseLayer(784, 20), ReLU(), DenseLayer(20, 10), Softmax()], loss=CrossEntropy())
trainX = X_train.reshape(len(X_train), -1)
net.fit(trainX[::6], Y_train[::6], validation_split=0.25, 
            batch_size=16, nb_epoch=5, learning_rate=0.01)    

100%|██████████| 468/468 [03:06<00:00,  2.51it/s]



 1 epoch: val 0.81 



100%|██████████| 468/468 [03:07<00:00,  2.49it/s]



 2 epoch: val 0.85 



100%|██████████| 468/468 [03:07<00:00,  2.50it/s]



 3 epoch: val 0.87 



100%|██████████| 468/468 [03:07<00:00,  2.50it/s]



 4 epoch: val 0.88 



100%|██████████| 468/468 [03:07<00:00,  2.50it/s]


 5 epoch: val 0.88 






In [None]:
net = Network([DenseLayer(784, 20), ReLU(), DenseLayer(20, 10), Softmax()], loss=CrossEntropy())
trainX = X_train.reshape(len(X_train), -1)
net.fit(trainX[::6], Y_train[::6], validation_split=0.25, 
            batch_size=16, nb_epoch=5, learning_rate=0.01)    

100%|██████████| 468/468 [03:03<00:00,  2.55it/s]



 1 epoch: val 0.78 



100%|██████████| 468/468 [03:02<00:00,  2.56it/s]



 2 epoch: val 0.84 



100%|██████████| 468/468 [03:03<00:00,  2.55it/s]



 3 epoch: val 0.86 



100%|██████████| 468/468 [03:03<00:00,  2.55it/s]



 4 epoch: val 0.87 



100%|██████████| 468/468 [03:02<00:00,  2.56it/s]


 5 epoch: val 0.88 






#### 3.5 (2 балла) Продемонстрируйте, что ваша реализация позволяет обучать более глубокие нейронные сети 

In [8]:
net = Network([DenseLayer(784, 80), ReLU(), DenseLayer(80, 40), ReLU(), DenseLayer(40, 20), ReLU(), DenseLayer(20, 10), Softmax()], loss=CrossEntropy())
trainX = X_train.reshape(len(X_train), -1)
net.fit(trainX[::6], Y_train[::6], validation_split=0.25, 
            batch_size=16, nb_epoch=5, learning_rate=0.01)    

100%|██████████| 468/468 [13:04<00:00,  1.68s/it]



 1 epoch: val 0.81 



100%|██████████| 468/468 [13:05<00:00,  1.68s/it]



 2 epoch: val 0.86 



100%|██████████| 468/468 [13:05<00:00,  1.68s/it]



 3 epoch: val 0.88 



100%|██████████| 468/468 [13:05<00:00,  1.68s/it]



 4 epoch: val 0.89 



100%|██████████| 468/468 [13:06<00:00,  1.68s/it]


 5 epoch: val 0.90 






Я решил реализовать градиенты для слоя свертки, а также обратное распространение ошибки для сети, для слоев свертки.
Сначала я сделал это, считая grad_x, также grad_bias и grad_kernel. 
Также я добавил в слой ReLU вариант grad_x, когда на вход подают четырехмерный тензор.

Код, который был написан для этого - это все, что сверху.

Потом я заметил, что если я ставлю сеть с большим количество сверток, то заканчивается ОЗУ.
Причина была в том, что grad_x для слоя свертки имел очень большой размер, когда сверток много.

Я вот рассказал эту проблему Вам на паре. Одно из решений - это использовать разреженные матрицы. Но, зная локальность свертки, я подумал, что может быть, зная градиент следующего слоя, можно пересчитать градиент текущего без перемножения матриц конкретно для слоев свертки (и также макс-пулинг изменил потом).

Пусть у нас есть dLoss/d(выхода слоя свертки) и нам надо посчитать
dLoss/d(вход слоя свертки).
Для каждого выхода мы знали, какие элементы входа участвовали для его получения (окно свертки) - тогда зная градиент конкретного выхода, можно было прибавить его, домноженный на соответствующий коэффициент ядра в градиент входа.
Получается, что это что-то похожее на транспонированную свертку... Я не смог это себе представить, как будет выглядеть в операциях свертки, но написал вот то, что описывал - "зная градиент конкретного выхода, можно было прибавить его, домноженный на соответствующий коэффициент ядра в градиент входа".

Это позволило решить проблему с ОЗУ, так как я избавился от матричного умножения в слоях свертки.

Для того, чтобы это было возможно - я в метод grad_param не вычисляю grad_x, а запоминаю вход, который был для конкретного слоя. Также для слоя ReLU я не вычисляю grad_x, а проделываю вручную - в элементах, где на входе был отрицательный элемент - я зануляю градиент.

In [8]:
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import tqdm

class Network(object):
    def __init__(self, layers, loss=None):
        self.name = 'Network'
        self.layers = layers
        self.loss = loss
    
    def forward(self, input_data):
        return self.predict(input_data)

    def grad_x(self, input_data, labels):
        gradients_x = []
        current_input = input_data
        for layer in self.layers:
            gradients_x.append(layer.grad_x(current_input))
            current_input = layer.forward(current_input) 
        gradient_x = self.loss.grad_x(current_input, labels)
        gradient_x = np.reshape(gradient_x, (gradient_x.shape[0], 1, gradient_x.shape[1]))
        for gradient_x_layer in reversed(gradients_x):
            gradient_x = np.matmul(gradient_x, gradient_x_layer)
        gradient_x = np.reshape(gradient_x, (gradient_x.shape[0], gradient_x.shape[2]))
        return gradient_x

    def grad_param(self, input_data, labels):
        gradients_param = []
        current_input = input_data
        for layer in self.layers:
            gradients_param.append([current_input.copy(), layer.grad_param(current_input)])
            current_input = layer.forward(current_input) 
        gradients_param.append([self.loss.grad_x(current_input, labels), []])
        return gradients_param

    def update(self, grad_list, learning_rate):
        gradient_x = grad_list[-1][0]
        gradient_x = np.reshape(gradient_x, (gradient_x.shape[0], 1, gradient_x.shape[1]))
        for i in range(len(grad_list)-2, -1, -1):
            layer = self.layers[i]
            if layer.name == 'Dense':
                layer.update_param([np.matmul(gradient_x, grad_list[i][1][0]), np.matmul(gradient_x, grad_list[i][1][1])], learning_rate)
                gradient_x = np.matmul(gradient_x, layer.grad_x(grad_list[i][0]))
            elif layer.name == 'Softmax':
                gradient_x = np.matmul(gradient_x, layer.grad_x(grad_list[i][0]))
            elif layer.name == 'MaxPooling':
                gradient_x = layer.grad_loss_x(grad_list[i][0], gradient_x)
            elif layer.name == 'ReLU':
                input_image = grad_list[i][0]
                input_image[input_image<0] = 0
                input_image[input_image>=0] = 1
                input_image = np.reshape(input_image, gradient_x.shape)
                gradient_x = gradient_x * input_image
            elif layer.name == 'Conv2D':
                layer.update_param([np.matmul(gradient_x, grad_list[i][1][0]), np.matmul(gradient_x, grad_list[i][1][1])], learning_rate)
                gradient_x = layer.grad_loss_x(grad_list[i][0], gradient_x)
            elif layer.name == 'Flatten':
                gradient_x = gradient_x #nothing changed, gradient_x * E
            else:
                print("BAD")
    
    def predict(self, input_data):
        current_input = input_data
        for layer in self.layers:
            current_input = layer.forward(current_input)     
        return current_input
    
    def calculate_loss(self, input_data, labels):
        return self.loss.forward(self.predict(input_data), labels)
    
    def train_step(self, input_data, labels, learning_rate=0.001):
        grad_list = self.grad_param(input_data, labels)
        self.update(grad_list, learning_rate)
    
    def fit(self, trainX, trainY, validation_split=0.25, 
            batch_size=1, nb_epoch=1, learning_rate=0.01):
        
        train_x, val_x, train_y, val_y = train_test_split(trainX, trainY, 
                                                          test_size=validation_split,
                                                          random_state=42)
        for epoch in range(nb_epoch):
            #train one epoch
            for i in tqdm(range(int(len(train_x)/batch_size))):
                batch_x = train_x[i*batch_size: (i+1)*batch_size]
                batch_y = train_y[i*batch_size: (i+1)*batch_size]
                self.train_step(batch_x, batch_y, learning_rate)
            #validate
            val_accuracy = self.evaluate(val_x, val_y)
            print("\n",'%d epoch: val %.2f' %(epoch+1, val_accuracy), "\n")
            
    def evaluate(self, testX, testY):
        y_pred = np.argmax(self.predict(testX), axis=1)            
        y_true = np.argmax(testY, axis=1)
        val_accuracy = np.sum((y_pred == y_true))/(len(y_true))
        return val_accuracy

In [9]:
class DenseLayer(Layer):
    def __init__(self, input_dim, output_dim, W_init=None, b_init=None):
        self.name = 'Dense'
        self.input_dim = input_dim
        self.output_dim = output_dim
        if W_init is None or b_init is None:
            self.W = np.random.normal(0, np.sqrt(2. / input_dim), size = (input_dim, output_dim))
            self.b = np.zeros(output_dim, 'float32')
        else:
            self.W = W_init
            self.b = b_init
    def forward(self, input_data):
        assert input_data.shape[1] == self.W.shape[0]
        out = np.matmul(input_data, self.W) + self.b
        return out
    def grad_x(self, input_data):
        (input_dim, output_dim) = (self.input_dim, self.output_dim)
        out = np.zeros(shape = (input_data.shape[0], output_dim, input_dim))
        for batch_id in range(input_data.shape[0]):
            array = input_data[batch_id]
            for out_id in range(output_dim):
                for in_id in range(input_dim):
                    out[batch_id, out_id, in_id] = self.W[in_id, out_id]
        return out
    def grad_b(self, input_data):
        (input_dim, output_dim) = (self.input_dim, self.output_dim)
        out = np.zeros(shape = (input_data.shape[0], output_dim, output_dim))
        for batch_id in range(input_data.shape[0]):
            for out_id in range(output_dim):
                out[batch_id, out_id, out_id] = 1
        return out
    def grad_W(self, input_data):
        (input_dim, output_dim) = (self.input_dim, self.output_dim)
        out = np.zeros(shape = (input_data.shape[0], output_dim, input_dim * output_dim))
        for batch_id in range(input_data.shape[0]):
            for k in range(output_dim):
                # d out_j / d w_s_t, if j != t zero
                for j in range(input_dim):
                    out[batch_id, k, j*output_dim + k] = input_data[batch_id, j]
        return out
    
    def update_W(self, grad, learning_rate):
        self.W -= learning_rate * np.mean(grad, axis=0).reshape(self.W.shape)
    
    def update_b(self, grad,  learning_rate):
        self.b -= learning_rate * np.mean(grad, axis=0).reshape(self.b.shape)
        
    def update_param(self, params_grad, learning_rate):
        self.update_W(params_grad[0], learning_rate)
        self.update_b(params_grad[1], learning_rate)
    
    def grad_param(self, input_data):
        return [self.grad_W(input_data), self.grad_b(input_data)]
    

class ReLU(Layer):
    def __init__(self):
        self.name = 'ReLU'
    def forward(self, input_data):
        out = input_data.copy()
        out[out < 0] = 0
        return out
    def grad_x(self, input_data):
        if len(input_data.shape) == 2:
            (batch_size, logits) = input_data.shape
            out = np.zeros(shape = (batch_size, logits, logits))
            for batch_id in range(batch_size):
                for i in range(logits):
                    if input_data[batch_id, i] > 0:
                        out[batch_id, i, i] = 1
            return out
        batch_size = input_data.shape[0]
        input_channels = input_data.shape[1]
        n = input_data.shape[2]
        m = input_data.shape[3]
        out = np.zeros(shape = (batch_size, input_channels*n*m, input_channels*n*m))
        for batch_id in range(batch_size):
            for in_ch in range(input_channels):
                for x in range(n):
                    for y in range(m):
                        if input_data[batch_id, in_ch, x, y] > 0:
                            out[batch_id, in_ch*n*m+x*m+y, in_ch*n*m+x*m+y] = 1
        return out
    
    
class Softmax(Layer):
    def __init__(self):
        self.name = 'Softmax'
    def forward(self, input_data):
        def g(array):
            array = np.exp(array)
            array /= np.sum(array)
            return array
        return np.apply_along_axis(g, -1, input_data)
    def grad_x(self, input_data):
        (batch_size, logits) = input_data.shape
        out = np.zeros(shape = (batch_size, logits, logits))
        for batch_id in range(batch_size):
            array = np.exp(input_data[batch_id])
            sum_of_array = np.sum(array)
            for i in range(logits):
                for j in range(logits):
                    out[batch_id, i, j] = - (array[i] * array[j]) / sum_of_array**2
                    if i == j:
                         out[batch_id, i, j] += array[i] / sum_of_array

        return out
    

class FlattenLayer(Layer):
    def __init__(self):
        self.name = 'Flatten'
    def forward(self, input_data):
        length = input_data.shape[1] * input_data.shape[2] \
            * input_data.shape[3]
        out = np.zeros((input_data.shape[0], length))
        for batch_id in range(input_data.shape[0]):
            out[batch_id] = np.transpose(input_data[batch_id], axes = (1, 2, 0)).reshape(length)
        return out
    def grad_x(self, input_data):
        length = input_data.shape[1] * input_data.shape[2] \
            * input_data.shape[3]
        out = np.zeros((input_data.shape[0], length, length))
        for batch_id in range(input_data.shape[0]):
            for i in range(length):
                out[batch_id, i, i] = 1
        return out


class MaxPooling(Layer):
    def __init__(self, pool_size=(2, 2), strides=2):
        self.name = 'MaxPooling'
        self.pool_size = pool_size
        self.strides = 2
    def forward(self, input_data):
        input = input_data.copy()
        (k1, k2) = self.pool_size
        s = self.strides
        (batch_count, channels_count) = (input.shape[0], input.shape[1])
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k1 + 1 + s - 1) // s, (m - k2 + 1 + s - 1) // s)
        assert out_n >= 0
        assert out_m >= 0
        out = np.zeros([batch_count, channels_count, out_n, out_m])
        for x in range(0, n - k1 + 1, s):
            for y in range(0, m - k2 + 1, s):
                a = input[:, :, x:x + k1, y:y + k2]
                out[:, :, x // s, y // s] = np.amax(a, axis =(2,3))
        return out    
    def grad_x(self, input_data):
        input = input_data.copy()
        (k1, k2) = self.pool_size
        s = self.strides
        (batch_count, channels_count) = (input.shape[0], input.shape[1])
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k1 + 1 + s - 1) // s, (m - k2 + 1 + s - 1) // s)
        assert out_n >= 0
        assert out_m >= 0
        out = np.zeros([batch_count, channels_count*out_n*out_m, channels_count*n*m])
        for batch_id in range(batch_count):
            for out_ch in range(channels_count):
                for x in range(0, n - k1 + 1, s):
                    for y in range(0, m - k2 + 1, s):
                        mymax = np.amax(input[batch_id, out_ch, x:x+k1, y:y+k2])
                        for nx in range(x,x+k1):
                            for ny in range(y,y+k2):
                                if input[batch_id, out_ch, nx, ny] == mymax:
                                    id1 = out_ch*out_n*out_m + (x//s)*out_m + (y//s)
                                    id2 = out_ch*n*m + nx*m + ny
                                    out[batch_id, id1, id2] = 1
        return out  
    def grad_loss_x(self, input_data, grad_loss_nxt):
        input = input_data.copy()
        (k1, k2) = self.pool_size
        s = self.strides
        (batch_count, channels_count) = (input.shape[0], input.shape[1])
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k1 + 1 + s - 1) // s, (m - k2 + 1 + s - 1) // s)
        out = np.zeros([batch_count, 1, channels_count*n*m])
        for batch_id in range(batch_count):
            for out_ch in range(channels_count):
                for x in range(0, n - k1 + 1, s):
                    for y in range(0, m - k2 + 1, s):
                        mymax = np.amax(input[batch_id, out_ch, x:x+k1, y:y+k2])
                        cntmx = 0
                        for nx in range(x,x+k1):
                            for ny in range(y,y+k2):
                                if input[batch_id, out_ch, nx, ny] == mymax:
                                    cntmx += 1
                        for nx in range(x,x+k1):
                            for ny in range(y,y+k2):
                                if input[batch_id, out_ch, nx, ny] == mymax:
                                    id1 = out_ch*out_n*out_m + (x//s)*out_m + (y//s)
                                    id2 = out_ch*n*m + nx*m + ny
                                    out[batch_id, 0, id2] = 1/cntmx * grad_loss_nxt[batch_id, 0, id1]
        return out  

In [10]:
class Conv2DLayer(Layer):
    def __init__(self, kernel_size=3, input_channels=2, output_channels=3, 
                 stride=1, K_init=None, b_init=None):
      # padding: 'valid'
      # Работаем с квадратными ядрами, поэтому kernel_size - одно число
      # Работаем с единообразным сдвигом, поэтому stride - одно число
      # Фильтр размерности [kernel_size, kernel_size, input_channels, output_channels]
      self.name = 'Conv2D'
      self.kernel_size = kernel_size
      self.input_channels = input_channels
      self.output_channels = output_channels
      self.stride = stride
      if K_init is None or b_init is None:
          self.kernel = np.random.normal(0, np.sqrt(2. / (kernel_size*kernel_size*output_channels)), size =  (kernel_size,kernel_size,input_channels,output_channels))
          self.bias = np.zeros(output_channels, 'float32')
      else:
          self.kernel = K_init
          self.bias = b_init
            
    def forward(self, input_data):
        def dot(a, b):
            a = np.moveaxis(a, 0, -1)
            assert a.shape == b.shape
            return (a*b).sum()

        input = input_data.copy()
        assert self.input_channels == input.shape[1]
        k = self.kernel_size
        s = self.stride
        (batch_count, channels_count) = (input.shape[0], input.shape[1])
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k + 1 + s - 1) // s, (m - k + 1 + s - 1) // s)
        assert out_n >= 0
        assert out_m >= 0
        out = np.zeros([batch_count, self.output_channels, out_n, out_m])
        for batch_id in range(batch_count):
            for out_ch in range(self.output_channels):
                for x in range(0, n - k + 1, s):
                    for y in range(0, m - k + 1, s):
                        a = input[batch_id, :, x:x + k, y:y + k]
                        b = self.kernel[:, :, :, out_ch]
                        out[batch_id, out_ch, x // s, y // s] = dot(a, b) \
                            + self.bias[out_ch]
        return out
    def grad_x(self, input_data):
        input = input_data.copy()
        k = self.kernel_size
        s = self.stride
        (batch_count, input_channels) = (input.shape[0], input.shape[1])
        output_channels = self.output_channels
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k + 1 + s - 1) // s, (m - k + 1 + s - 1) // s)
        out = np.zeros(shape = (batch_count, output_channels*out_n*out_m, input_channels*n*m))
        for batch_id in range(batch_count):
            for out_ch in range(self.output_channels):
                for x in range(0, n - k + 1, s):
                    for y in range(0, m - k + 1, s):
                        for in_ch in range(input_channels):
                            for nx in range(x,x+k):
                                for ny in range(y,y+k):
                                    dx, dy = nx-x, ny-y
                                    id1 = out_ch*out_n*out_m + x*out_m + y
                                    id2 = in_ch*n*m + nx*m + ny
                                    out[batch_id, id1, id2] += self.kernel[dx, dy, in_ch, out_ch]
        return out
    def grad_loss_x(self, input_data, grad_loss_nxt):
        input = input_data.copy()
        k = self.kernel_size
        s = self.stride
        (batch_count, input_channels) = (input.shape[0], input.shape[1])
        output_channels = self.output_channels
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k + 1 + s - 1) // s, (m - k + 1 + s - 1) // s)
        out = np.zeros(shape = (batch_count, 1, input_channels*n*m))
        for batch_id in range(batch_count):
            for out_ch in range(self.output_channels):
                for x in range(0, n - k + 1, s):
                    for y in range(0, m - k + 1, s):
                        for in_ch in range(input_channels):
                            for nx in range(x,x+k):
                                for ny in range(y,y+k):
                                    dx, dy = nx-x, ny-y
                                    id1 = out_ch*out_n*out_m + x*out_m + y
                                    id2 = in_ch*n*m + nx*m + ny
                                    out[batch_id, 0, id2] += self.kernel[dx, dy, in_ch, out_ch] * grad_loss_nxt[batch_id, 0, id1]
        return out
    def grad_kernel(self, input_data):
        input = input_data.copy()
        k = self.kernel_size
        s = self.stride
        (batch_count, input_channels) = (input.shape[0], input.shape[1])
        output_channels = self.output_channels
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k + 1 + s - 1) // s, (m - k + 1 + s - 1) // s)
        out = np.zeros(shape = (batch_count, output_channels*out_n*out_m, k*k*input_channels*output_channels))
        for batch_id in range(batch_count):
            for out_ch in range(self.output_channels):
                for x in range(0, n - k + 1, s):
                    for y in range(0, m - k + 1, s):
                        for in_ch in range(input_channels):
                            for nx in range(x,x+k):
                                for ny in range(y,y+k):
                                    dx, dy = nx-x, ny-y
                                    id1 = out_ch*out_n*out_m + x*out_m + y
                                    id2 = dx*k*input_channels*output_channels + dy*input_channels*output_channels + in_ch*output_channels + out_ch
                                    out[batch_id, id1, id2] += input[batch_id, in_ch, nx, ny]
        return out
    
    def grad_bias(self, input_data):
        input = input_data.copy()
        k = self.kernel_size
        s = self.stride
        (batch_count, input_channels) = (input.shape[0], input.shape[1])
        output_channels = self.output_channels
        (n, m) = (input.shape[2], input.shape[3])
        (out_n, out_m) = ((n - k + 1 + s - 1) // s, (m - k + 1 + s - 1) // s)
        out = np.zeros(shape = (batch_count, output_channels*out_n*out_m, output_channels))
        for batch_id in range(batch_count):
            for out_ch in range(self.output_channels):
                for x in range(0, n - k + 1, s):
                    for y in range(0, m - k + 1, s):
                        id1 = out_ch*out_n*out_m + x*out_m + y
                        out[batch_id, id1, out_ch] += 1
        return out
        
    def update_kernel(self, grad, learning_rate):
        self.kernel -= learning_rate * np.mean(grad, axis=0).reshape(self.kernel.shape)
    
    def update_bias(self, grad,  learning_rate):
        self.bias -= learning_rate * np.mean(grad, axis=0).reshape(self.bias.shape)
        
    def update_param(self, params_grad, learning_rate):
        self.update_kernel(params_grad[0], learning_rate)
        self.update_bias(params_grad[1], learning_rate)
    
    def grad_param(self, input_data):
        return [self.grad_kernel(input_data), self.grad_bias(input_data)]

Это была изначальная архитектура - так как для других переполнялась ОЗУ. До этого на работала примерно 35 минут за эпоху, после изменений стало 25 минут. Не сильное ускорение вышло - видимо потому, что я делаю вручную обновления градиента.

In [None]:
#(1,28,28) MaxPooling -> (1,14,14) Conv2dLayer -> (24, 12, 12) ReLU -> (24, 12, 12) MaxPooling -> (24, 6, 6) Flatten -> (864) DenseLayer -> (20) DenseLayer -> (10) -> Softmax -> (10) CrossEntropy -> (1)
net = Network([MaxPooling(pool_size=(2,2), strides=2), Conv2DLayer(3,1,24), ReLU(), MaxPooling(pool_size=(2,2), strides=2), FlattenLayer(), DenseLayer(864, 20), DenseLayer(20, 10), Softmax()], loss=CrossEntropy())

In [None]:
trainX = X_train.copy()
print(trainX.shape)
net.fit(trainX[::6], Y_train[::6], validation_split=0.25, 
            batch_size=16, nb_epoch=10, learning_rate=0.01)    

(60000, 1, 28, 28)


100%|██████████| 468/468 [25:54<00:00,  3.32s/it]



 1 epoch: val 0.59 



100%|██████████| 468/468 [25:54<00:00,  3.32s/it]



 2 epoch: val 0.70 



100%|██████████| 468/468 [25:52<00:00,  3.32s/it]



 3 epoch: val 0.80 



100%|██████████| 468/468 [25:52<00:00,  3.32s/it]



 4 epoch: val 0.84 



100%|██████████| 468/468 [25:50<00:00,  3.31s/it]



 5 epoch: val 0.86 



100%|██████████| 468/468 [25:46<00:00,  3.31s/it]



 6 epoch: val 0.87 



100%|██████████| 468/468 [25:56<00:00,  3.32s/it]



 7 epoch: val 0.88 



100%|██████████| 468/468 [25:26<00:00,  3.26s/it]



 8 epoch: val 0.88 



100%|██████████| 468/468 [25:25<00:00,  3.26s/it]



 9 epoch: val 0.88 



100%|██████████| 468/468 [25:31<00:00,  3.27s/it]



 10 epoch: val 0.88 



Реализация выше доходила до 0.92 в лучшем случае, но в колабе я случайно перезапустил ячейку и результат не сохранился

___

То, что ниже - показать, что более полноценную сверточную сеть можно обучать без переполнения ОЗУ. Но я оставил на ночь Colab и он перестал работать.
Как-то полноценно обучить не получилось из-за длительного времени работы моей реализации - Colab вылетал.

In [11]:
#(1,28,28) Conv2dLayer -> (24, 26, 26) ReLU -> (24, 26, 26) MaxPooling -> (24, 13, 13) Flatten -> (4056) DenseLayer -> (32) DenseLayer -> (10) -> Softmax -> (10) CrossEntropy -> (1)
net_conv = Network([Conv2DLayer(3,1,24), ReLU(), MaxPooling(pool_size=(2,2), strides=2), FlattenLayer(), DenseLayer(4056, 32), DenseLayer(32, 10), Softmax()], loss=CrossEntropy())

In [12]:
trainX = X_train.copy()
print(trainX.shape)
net_conv.fit(trainX[::6], Y_train[::6], validation_split=0.25, 
            batch_size=16, nb_epoch=10, learning_rate=0.01)    

  0%|          | 0/468 [00:00<?, ?it/s]

(60000, 1, 28, 28)


100%|██████████| 468/468 [1:29:12<00:00, 11.44s/it]
  0%|          | 0/468 [00:00<?, ?it/s]


 1 epoch: val 0.82 



100%|██████████| 468/468 [1:31:53<00:00, 11.78s/it]
  0%|          | 0/468 [00:00<?, ?it/s]


 2 epoch: val 0.86 



100%|██████████| 468/468 [1:31:22<00:00, 11.71s/it]
  0%|          | 0/468 [00:00<?, ?it/s]


 3 epoch: val 0.88 



100%|██████████| 468/468 [1:31:30<00:00, 11.73s/it]
  0%|          | 0/468 [00:00<?, ?it/s]


 4 epoch: val 0.89 



100%|██████████| 468/468 [1:31:12<00:00, 11.69s/it]
  0%|          | 0/468 [00:00<?, ?it/s]


 5 epoch: val 0.89 



100%|██████████| 468/468 [1:26:32<00:00, 11.10s/it]
  0%|          | 0/468 [00:00<?, ?it/s]


 6 epoch: val 0.89 



100%|██████████| 468/468 [1:23:23<00:00, 10.69s/it]
  0%|          | 0/468 [00:00<?, ?it/s]


 7 epoch: val 0.90 



100%|██████████| 468/468 [1:24:27<00:00, 10.83s/it]
  0%|          | 0/468 [00:00<?, ?it/s]


 8 epoch: val 0.90 



100%|██████████| 468/468 [1:25:57<00:00, 11.02s/it]
  0%|          | 0/468 [00:00<?, ?it/s]


 9 epoch: val 0.90 



100%|██████████| 468/468 [1:24:35<00:00, 10.84s/it]



 10 epoch: val 0.90 



In [11]:
#(1,28,28) Conv2dLayer -> (24, 26, 26) ReLU -> (24, 26, 26) MaxPooling -> (24, 13, 13) Flatten -> (4056) DenseLayer -> (32) DenseLayer -> (10) -> Softmax -> (10) CrossEntropy -> (1)
net_conv = Network([Conv2DLayer(3,1,16), ReLU(), Conv2DLayer(3,16,16), ReLU(), Conv2DLayer(3,16,16), ReLU(), Conv2DLayer(3,16,16), ReLU(), Conv2DLayer(3,16,16), ReLU(), Conv2DLayer(3,16,16), ReLU(), MaxPooling(pool_size=(16,16), strides=1), FlattenLayer(), DenseLayer(16, 10), Softmax()], loss=CrossEntropy())



In [12]:
trainX = X_train.copy()
print(trainX.shape)
net_conv.fit(trainX[::6], Y_train[::6], validation_split=0.25, 
            batch_size=16, nb_epoch=10, learning_rate=0.01)    

  0%|          | 0/468 [00:00<?, ?it/s]

(60000, 1, 28, 28)


  0%|          | 0/468 [02:45<?, ?it/s]


KeyboardInterrupt: 