# Домашнее задание 2.1

В этом задании вы должны:
1. Написать слой Conv2d на Numpy и определить в нем forward-backward методы
2. Определить слой MaxPool2d
3. Написать всю необходимую обвязку для обучения: оптимизатор с адаптивным шагом и класс, позволяющий изменять расписание для learning rate'а

> Обратите внимание, что в этом задании больше нет тестов.
> Вы должны сами проверять свой код.  
> Это можно сделать так:
> 1. Написать юнит-тесты с помощью Pytorch. То есть, ваш модудь должен повторять поведение torch'а
> 2. Проверять архитектуру не на всем датасете (одна эпоха в наивной имплементации будет занимать около двух часов), а на подвыборке

## Numpy-имплементация сверточной нейронной сети


Вставьте сюда имплементацию из первого домашнего задания.



> Обратите внимание, что обновление весов теперь производится с помощью специального класса **Optimizer**



In [1]:
import numpy as np

In [2]:
import os
import sys

def load_mnist_legacy(flatten=False):
    """taken from https://github.com/Lasagne/Lasagne/blob/master/examples/mnist.py"""
    # We first define a download function, supporting both Python 2 and 3.
    if sys.version_info[0] == 2:
        from urllib import urlretrieve
    else:
        from urllib.request import urlretrieve

    def download(filename, source='http://yann.lecun.com/exdb/mnist/'):
        print("Downloading %s" % filename)
        urlretrieve(source + filename, filename)

    # We then define functions for loading MNIST images and labels.
    # For convenience, they also download the requested files if needed.
    import gzip

    def load_mnist_images(filename):
        if not os.path.exists(filename):
            download(filename)
        # Read the inputs in Yann LeCun's binary format.
        with gzip.open(filename, 'rb') as f:
            data = np.frombuffer(f.read(), np.uint8, offset=16)
        # The inputs are vectors now, we reshape them to monochrome 2D images,
        # following the shape convention: (examples, channels, rows, columns)
        data = data.reshape(-1, 1, 28, 28)
        # The inputs come as bytes, we convert them to float32 in range [0,1].
        # (Actually to range [0, 255/256], for compatibility to the version
        # provided at http://deeplearning.net/data/mnist/mnist.pkl.gz.)
        return data / np.float32(256)

    def load_mnist_labels(filename):
        if not os.path.exists(filename):
            download(filename)
        # Read the labels in Yann LeCun's binary format.
        with gzip.open(filename, 'rb') as f:
            data = np.frombuffer(f.read(), np.uint8, offset=8)
        # The labels are vectors of integers now, that's exactly what we want.
        return data

    # We can now download and read the training and test set images and labels.
    X_train = load_mnist_images('train-images-idx3-ubyte.gz')
    y_train = load_mnist_labels('train-labels-idx1-ubyte.gz')
    X_test = load_mnist_images('t10k-images-idx3-ubyte.gz')
    y_test = load_mnist_labels('t10k-labels-idx1-ubyte.gz')

    # We reserve the last 10000 training examples for validation.
    X_train, X_val = X_train[:-10000], X_train[-10000:]
    y_train, y_val = y_train[:-10000], y_train[-10000:]

    if flatten:
        X_train = X_train.reshape([X_train.shape[0], -1])
        X_val = X_val.reshape([X_val.shape[0], -1])
        X_test = X_test.reshape([X_test.shape[0], -1])

    # We just return all the arrays in order, as expected in main().
    # (It doesn't matter how we do this as long as we can read them again.)
    return X_train, y_train, X_val, y_val, X_test, y_test

In [3]:
from sklearn.datasets import fetch_openml

def load_mnist(flatten):
    X, y = fetch_openml("mnist_784", version=1, return_X_y=True, as_frame=False)
    X_train, X_val, X_test = X[:50000], X[50000:60000], X[60000:]
    y_train, y_val, y_test = y[:50000], y[50000:60000], y[60000:]

    X_train = X_train.reshape([X_train.shape[0], -1])
    X_val = X_val.reshape([X_val.shape[0], -1])
    X_test = X_test.reshape([X_test.shape[0], -1])
    
    return X_train, y_train, X_val, y_val, X_test, y_test


In [4]:
class Layer:
    """
    A building block. Each layer is capable of performing two things:

    - Process input to get output:           output = layer.forward(input)

    - Propagate gradients through itself:    grad_input = layer.backward(input, grad_output)

    Some layers also have learnable parameters which they update during layer.backward.
    """
    def __init__(self):
        """Here you can initialize layer parameters (if any) and auxiliary stuff."""
        pass

    def forward(self, input):
        """
        Takes input data of shape [batch, input_units], returns output data [batch, output_units]
        """
        return input

    def backward(self, input, grad_output):
        """
        Performs a backpropagation step through the layer, with respect to the given input.

        To compute loss gradients w.r.t input, you need to apply chain rule (backprop):

        d loss / d x  = (d loss / d layer) * (d layer / d x)

        Luckily, you already receive d loss / d layer as input, so you only need to multiply it by d layer / d x.

        If your layer has parameters (e.g. dense layer), you also need to update them here using d loss / d layer
        """
        input_dim = input.shape[1]

        d_layer_d_input = np.eye(input_dim)

        return np.dot(grad_output, d_layer_d_input) # chain rule

In [5]:
class ReLU(Layer):
    def __init__(self):
        """ReLU layer simply applies elementwise rectified linear unit to all inputs"""
        pass

    def forward(self, input):
        """Apply elementwise ReLU to [batch, input_units] matrix"""
        output = np.maximum(input,0)
        return output

    def backward(self, input, grad_output):
        """Compute gradient of loss w.r.t. ReLU input"""
        relu_grad_mask = input > 0
        return grad_output * relu_grad_mask

In [6]:
class Dense(Layer):
    def __init__(self, input_units, output_units):
        """
        A dense layer is a layer which performs a learned affine transformation:
        f(x) = <W*x> + b
        """
        self.weights = np.random.randn(input_units, output_units)*0.01
        self.biases = np.zeros(output_units)

        self.grad_weights = None
        self.grad_biases = None

    def forward(self,input):
        """
        Perform an affine transformation:
        f(x) = <W*x> + b

        input shape: [batch, input_units]
        output shape: [batch, output units]
        """
        return input @ self.weights + self.biases

    def backward(self,input,grad_output):
        grad_input = grad_output @ self.weights.T

        grad_weights = input.T @ grad_output
        grad_biases = grad_output.sum(axis=0)

        assert grad_weights.shape == self.weights.shape and grad_biases.shape == self.biases.shape

        self.grad_weights = grad_weights
        self.grad_biases = grad_biases

        return grad_input

In [None]:
class Conv2d(Layer):
    def __init__(self, input_channels, output_channels, kernel_size):

        self.weights = np.random.randn(input_channels, output_channels, kernel_size, kernel_size)*0.01
        self.biases = np.zeros(output_channels)

        self.grad_weights = None
        self.grad_biases = None

    def forward(self, input):
        """
        Perform an convolution:

        output_height = input_height - kernel_size + 1
        output_width = input_width - kernel_size + 1

        input shape: [batch, input_channels, input_height, input_width]
        output shape: [batch, output_channels, output_height, output_width]
        """
        batch_size, input_channels, input_height, input_width = input.shape
        output_channels, k_h, k_w = self.weight.shape[1:]

        output_height = input_height - self.weight.shape[2] + 1
        output_width = input_width - self.weight.shape[3] + 1
        output = np.empty((batch_size, output_channels, output_height, output_width))

        for out_h in range(output_height):
            for out_w in range(output_width):
                    step_h, step_w = out_h+k_h, out_w+k_w
                    input_region = input[..., None, out_h:step_h, out_w:step_w]
                    output[..., out_h, out_w] = np.sum(input_region * self.weight, axis=(1,3,4))

        output += self.bias[None, :, None, None]

        return output

    def backward(self, input, grad_output):
        """
        dL/df = conv(input, grad_output)
        
        dL/dX = F_i^T @ grad_output
        """
        
        #grad_input = <your code here>

        #grad_weights = <your code here>
        #grad_biases = <your code here>

        assert grad_weights.shape == self.weights.shape and grad_biases.shape == self.biases.shape

        self.grad_weights = grad_weights
        self.grad_biases = grad_biases

        return grad_input

In [None]:
class MaxPool2d(Layer):
    def __init__(self, kernel_size):
        self.kernel_size = kernel_size


    def forward(self, input):
        """
        Perform an pooling:

        output_height = input_height // kernel_size
        output_width = input_width // kernel_size

        input shape: [batch, input_channels, input_height, input_width]
        output shape: [batch, input_channels, output_height, output_width]
        """
        batch_size, input_channels, input_height, input_width = input.shape
        
        # lazy impl implying input_channels is multiple of kernel
        output_height = input_height // kernel_size
        output_width = input_width // kernel_size
        output_channels = input_channels
        output = np.empty((batch_size, output_channels, output_height, output_width))
        
        # 0 for h, 1 for w of max
        self.indices =  np.empty(output.shape))
        
        # i'm too stupid to vectorize argmax..
        for n in range(batch_size):
            for c in range(input_channels):
                for h in range(output_height):
                    for w in range(output_width):

                        shift_h = h * self.kernel.size
                        shift_w = w * self.kernel.size

                        input_region = input[n, c, shift_h:self.kernel_size, shift_w:self.kernel_size]
                        output[n, c, h, w] = np.max(input_region)

                        scalar_ind = np.argmax(input_region)
                        ind = np.unravel_index(scalar_ind, (self.kernel_size, self.kernel_size))

                        self.indices[n,c,h,w] = (ind[0]+shift_h, ind[1]+shift_w)
                    
        return output
                    
    def backward(self, input, grad_output):
        grad_input = np.zeros_like(input)
        batch_size, input_channels, output_height, output_width = grad_output.shape
        
        for n in range(batch_size):
            for c in range(input_channels):
                for h in range(output_height):
                    for w in range(output_width):
                        
                    ind = self.indices[n,c,h,w]
                    grad_input[n,c,ind[0],ind[1]] = grad_output[n,c,h,w] 
        return grad_input

In [None]:
class Flatten(Layer):
    def forward(self, input):
          """
          Perform an flatten operation:

          input shape: [batch, input_channels, input_height, input_width]
          output shape: [batch, input_channels * output_height * output_width]
          """
        batch_size = input.shape[0]
        return x.view(batch_size, -1)

    def backward(self, input, grad_output):
        
        grad_input = grad_output.reshape(input.shape)
        return grad_input

In [None]:
def softmax_crossentropy_with_logits(logits,reference_answers):
    """Compute crossentropy from logits[batch,n_classes] and ids of correct answers"""
    logits_for_answers = logits[np.arange(len(logits)),reference_answers]

    xentropy = - logits_for_answers + np.log(np.sum(np.exp(logits),axis=-1))

    return xentropy

def grad_softmax_crossentropy_with_logits(logits,reference_answers):
    """Compute crossentropy gradient from logits[batch,n_classes] and ids of correct answers"""
    ones_for_answers = np.zeros_like(logits)
    ones_for_answers[np.arange(len(logits)),reference_answers] = 1

    softmax = np.exp(logits) / np.exp(logits).sum(axis=-1,keepdims=True)

    return (- ones_for_answers + softmax) / logits.shape[0]

## Имплементация оптимизатора и изменения learning rate'a

В имплементации этих двух классов есть небольшие неточности.
Посмотрите, как сделана имплементация метода моментов в Pytorch и добавьте пропущенное.

Если хотите, можете имплементировать какой-нибудь другой оптимизатор - например, AdamW, Sophia, Lion, Adan, LAMB и т.п.

Также можете придумать нестандартный scheduling (подсказка: нестандартные расписания лучше всего искать во вполне стандартных научных статьях).

За это будут дополнительные баллы. Но обязательно нужно сравнить имплементированный вами оптимизатор со стандартным SGD и методом мом

> Добавлять моменты Нестерова не нужно!



In [None]:
class SGDOptimizer:
    def __init__(self, momentum=0.9, dampening=0.0, weight_decay=0.0, lr=0.1, params=1):
        """
        Wrapper which perfoms weights update
        """
        self.momentum = momentum
        self.dampening = dampening
        self.weight_decay = weight_decay
        self.lr = lr

        # здесь будем копить моментумы
        # и будем делать это для каждого слоя
        self.momentum_buffer = np.zeros(params)

    def step(self, layers):
        """
        Update weights
        in-place for each layer
        """
        for i in range(len(layers)):
            grad_weights = layers[i].grad_weights
            self.momentum_buffer[i] = self.momentum * self.momentum_buffer[i] + (1 - self.dampening) * grad_weights
            layers[i].weights = layers[i].weights - lr * self.momentum_buffer[i]

        #<your code here>

        #return weights - lr * self.momentum_buffer

In [None]:
class ExponentialLRScheduler:
    def __init__(self, lr, optimizer, gamma=0.9):
        """
        Wrapper which perfoms learning rate updates
        """
        self.lr = lr
        self.gamma = gamma
        self.current_step = 0
        self.optimizer = optimizer

    def step(self):
        """
        Update learing rate for current iteration
        """
        current_lr = self.lr * self.gamma
        
        optimizer.lr = current_lr
        
        #self.current_step += 1
        #return current_lr

## Итоговая нейросеть

Все готово для запуска нейросети. Нейросеть будем тестировать на классическом датасете MNIST. Код ниже визуализирует несколько примеров из этого датасета.

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

X_train, y_train, X_val, y_val, X_test, y_test = load_mnist(flatten=False)

plt.figure(figsize=[6,6])
for i in range(4):
    plt.subplot(2,2,i+1)
    plt.title("Label: %i"%y_train[i])
    plt.imshow(X_train[i].reshape([28,28]),cmap='gray');

В нашей реализации сеть - просто список (Python-list) слоев.



> Обратите внимание, что у нас нет глобального пулинга. При изменении архитектуры сети вы должны поменять входую размерность в последнем Dense слое



In [None]:
network = []
hidden_layers_size = 10
network.append(Conv2d(1, 8, 5))
network.append(MaxPool2d(2))
network.append(ReLU())
network.append(Conv2d(8, 16, 3))
network.append(MaxPool2d(2))
network.append(ReLU())
network.append(Flatten())
network.append(Dense(6 * 16, 10))

In [None]:
learning_rate = 0.1

optimizer = SGDOptimizer(params=3)
scheduler = ExponentialLRScheduler(learning_rate, optimizer)

Реализуйте прямой проход по целой сети, последовательно вызывая .forward() для каждого слоя.

In [None]:
def forward(network, X):
    """
    Compute activations of all network layers by applying them sequentially.
    Return a list of activations for each layer.
    Make sure last activation corresponds to network logits.
    """
    activations = []
    input = X

    for layer in network:
        activations.append(layer.forward(input))
        input = activations[-1]

    assert len(activations) == len(network)
    return activations

def predict(network, X):
    """
    Use network to predict the most likely class for each sample.
    """
    logits = forward(network, X)[-1]
    return logits.argmax(axis=-1)

In [None]:
def train(network,X,y):
    """
    Train your network on a given batch of X and y.
    You first need to run forward to get all layer activations.
    Then you can run layer.backward going from last to first layer.

    After you called backward for all layers, all Dense layers have already made one gradient step.
    """

    # Get the layer activations
    layer_activations = forward(network,X)
    layer_inputs = [X] + layer_activations  #layer_input[i] is an input for network[i]
    logits = layer_activations[-1]

    # Compute the loss and the initial gradient
    loss = softmax_crossentropy_with_logits(logits,y)
    loss_grad = grad_softmax_crossentropy_with_logits(logits,y)

    # propagate gradients through network layers using .backward
    # hint: start from last layer and move to earlier layers
    for i in reversed(range(len(network))):
        loss_grad = network[i].backward(layer_inputs[i], loss_grad)

    # update weights and biases with optimizer
    optimizer.step([network[0], network[3], network[7]])

    # update learning rate
    scheduler.step()

    return np.mean(loss)

Все готово для запуска обучения. Если все реализовано корректно, то точность классификации на валидационном множестве **должна превысить** 99%.

In [None]:
from tqdm.auto import tqdm
def iterate_minibatches(inputs, targets, batchsize, shuffle=False):
    assert len(inputs) == len(targets)
    if shuffle:
        indices = np.random.permutation(len(inputs))
    for start_idx in tqdm(range(0, len(inputs) - batchsize + 1, batchsize)):
        if shuffle:
            excerpt = indices[start_idx:start_idx + batchsize]
        else:
            excerpt = slice(start_idx, start_idx + batchsize)
        yield inputs[excerpt], targets[excerpt]

In [None]:
from IPython.display import clear_output
train_log = []
val_log = []

In [None]:
for epoch in range(15):

    for x_batch,y_batch in iterate_minibatches(X_train, y_train, batchsize=32, shuffle=True):
        train(network, x_batch, y_batch)

    train_log.append(np.mean(predict(network, X_train) == y_train))
    val_log.append(np.mean(predict(network, X_val) == y_val))

    clear_output()
    print("Epoch",epoch)
    print("Train accuracy:",train_log[-1])
    print("Val accuracy:",val_log[-1])
    plt.plot(train_log,label='train accuracy')
    plt.plot(val_log,label='val accuracy')
    plt.legend(loc='best')
    plt.grid()
    plt.show()
