# Домашнее задание 2.1. Сверточные сети



В этом задании вы должны:
1. Написать слой Conv2d на Numpy и определить в нем forward-backward методы
2. Определить слой MaxPool2d
3. Написать всю необходимую обвязку для обучения: оптимизатор с адаптивным шагом и класс, позволяющий изменять расписание для learning rate'а



> Обратите внимание, что в этом задании больше нет тестов.
> Вы должны сами проверять свой код.  
> Это можно сделать так:
> 1. Написать юнит-тесты с помощью Pytorch. То есть, ваш модудь должен повторять поведение torch'а
> 2. Проверять архитектуру не на всем датасете, а на подвыборке: при наивной имплементации слоев одна эпоха на всем датасете будет занимать около двух часов

## Numpy-имплементация сверточной нейронной сети


Вставьте сюда имплементацию из первого домашнего задания.



> Обратите внимание, что обновление весов теперь производится с помощью специального класса **Optimizer**



In [2]:
import os
import sys
import numpy as np

def load_mnist(flatten=False):
    """taken from https://github.com/Lasagne/Lasagne/blob/master/examples/mnist.py"""
    # We first define a download function, supporting both Python 2 and 3.
    if sys.version_info[0] == 2:
        from urllib import urlretrieve
    else:
        from urllib.request import urlretrieve

    def download(filename, source='http://yann.lecun.com/exdb/mnist/'):
        print("Downloading %s" % filename)
        urlretrieve(source + filename, filename)

    # We then define functions for loading MNIST images and labels.
    # For convenience, they also download the requested files if needed.
    import gzip

    def load_mnist_images(filename):
        if not os.path.exists(filename):
            download(filename)
        # Read the inputs in Yann LeCun's binary format.
        with gzip.open(filename, 'rb') as f:
            data = np.frombuffer(f.read(), np.uint8, offset=16)
        # The inputs are vectors now, we reshape them to monochrome 2D images,
        # following the shape convention: (examples, channels, rows, columns)
        data = data.reshape(-1, 1, 28, 28)
        # The inputs come as bytes, we convert them to float32 in range [0,1].
        # (Actually to range [0, 255/256], for compatibility to the version
        # provided at http://deeplearning.net/data/mnist/mnist.pkl.gz.)
        return data / np.float32(256)

    def load_mnist_labels(filename):
        if not os.path.exists(filename):
            download(filename)
        # Read the labels in Yann LeCun's binary format.
        with gzip.open(filename, 'rb') as f:
            data = np.frombuffer(f.read(), np.uint8, offset=8)
        # The labels are vectors of integers now, that's exactly what we want.
        return data

    # We can now download and read the training and test set images and labels.
    X_train = load_mnist_images('train-images-idx3-ubyte.gz')
    y_train = load_mnist_labels('train-labels-idx1-ubyte.gz')
    X_test = load_mnist_images('t10k-images-idx3-ubyte.gz')
    y_test = load_mnist_labels('t10k-labels-idx1-ubyte.gz')

    # We reserve the last 10000 training examples for validation.
    X_train, X_val = X_train[:-10000], X_train[-10000:]
    y_train, y_val = y_train[:-10000], y_train[-10000:]

    if flatten:
        X_train = X_train.reshape([X_train.shape[0], -1])
        X_val = X_val.reshape([X_val.shape[0], -1])
        X_test = X_test.reshape([X_test.shape[0], -1])

    # We just return all the arrays in order, as expected in main().
    # (It doesn't matter how we do this as long as we can read them again.)
    return X_train, y_train, X_val, y_val, X_test, y_test

In [3]:
class Layer:
    """
    A building block. Each layer is capable of performing two things:

    - Process input to get output:           output = layer.forward(input)

    - Propagate gradients through itself:    grad_input = layer.backward(input, grad_output)

    Some layers also have learnable parameters which they update during layer.backward.
    """
    def __init__(self):
        """Here you can initialize layer parameters (if any) and auxiliary stuff."""
        pass

    def forward(self, input):
        """
        Takes input data of shape [batch, input_units], returns output data [batch, output_units]
        """
        return input

    def backward(self, input, grad_output):
        """
        Performs a backpropagation step through the layer, with respect to the given input.

        To compute loss gradients w.r.t input, you need to apply chain rule (backprop):

        d loss / d x  = (d loss / d layer) * (d layer / d x)

        Luckily, you already receive d loss / d layer as input, so you only need to multiply it by d layer / d x.

        If your layer has parameters (e.g. dense layer), you also need to update them here using d loss / d layer
        """
        input_dim = input.shape[1]

        d_layer_d_input = np.eye(input_dim)

        return np.dot(grad_output, d_layer_d_input) # chain rule

In [4]:
class ReLU(Layer):
    def __init__(self):
        """ReLU layer simply applies elementwise rectified linear unit to all inputs"""
        pass

    def forward(self, input):
        """Apply elementwise ReLU to [batch, input_units] matrix"""
        output = np.array(input)
        output[output < 0] = 0
        return output

    def backward(self, input, grad_output):
        """Compute gradient of loss w.r.t. ReLU input"""
        relu_grad_mask = np.zeros_like(input)
        relu_grad_mask[input > 0] = 1
        return grad_output * relu_grad_mask

In [None]:
class Dense(Layer):
    def __init__(self, input_units, output_units, learning_rate=0.1):
        """
        A dense layer is a layer which performs a learned affine transformation:
        f(x) = <W*x> + b
        """
        self.learning_rate = learning_rate

        # initialize weights with small random numbers from normal distribution
        # you can change the intializtion method
        self.weights = np.random.randn(input_units, output_units)*0.01
        self.biases = np.zeros(output_units)

    def forward(self,input):
        """
        Perform an affine transformation:
        f(x) = <W*x> + b

        input shape: [batch, input_units]
        output shape: [batch, output_units]
        """
        return np.dot(input, self.weights) + self.biases

    def backward(self, input, grad_output):

        # compute d f / d x = d f / d dense * d dense / d x
        # where d dense/ d x = weights transposed
        # grad_output is a derivative of the next (in forward) layer of prediction
        # this result is needed for the next layer
        grad_input = np.dot(grad_output, np.transpose(self.weights))

        # compute gradient w.r.t. weights and biases
        grad_weights = np.dot(np.transpose(input), grad_output)
        grad_biases = np.sum(grad_output, axis=0)

        assert grad_weights.shape == self.weights.shape and grad_biases.shape == self.biases.shape
        # Here we perform a stochastic gradient descent step.
        # or step of another gradient method
        self.weights = self.weights - self.learning_rate * grad_weights
        self.biases = self.biases - self.learning_rate * grad_biases

        return grad_input

In [33]:
class Conv2d(Layer):
    def __init__(self, input_channels, output_channels, kernel_size):

        self.weights = np.random.randn(input_channels, output_channels, kernel_size, kernel_size)*0.01
        self.biases = np.zeros(output_channels)
        self.output_chanels = output_channels
        self.kernel_size = kernel_size
        
        self.grad_weights = None
        self.grad_biases = None

    def forward(self, input):
        """
        Perform an convolution:

        output_height = input_height - kernel_size + 1
        output_width = input_width - kernel_size + 1

        input shape: [batch, input_channels, input_height, input_width]
        output shape: [batch, output_channels, output_height, output_width]
        """
        # Output dimention is determined by the number of filters in the layer.
        # Each filter is a tensor with the size (input_channels, kernel_size, kernel_size)
        # (In this example second axis is an index of filter)
        # Such a filter produces one number per convolution 
        #   and this convolution is performed for each part of the input tensor.
        batch_size = np.shape(input)[0]
        output_height = np.shape(input)[2] - self.kernel_size + 1
        output_width = np.shape(input)[3] - self.kernel_size + 1

        # This is the result of convollution of each filter
        filters = np.empty((self.output_chanels, output_height, output_width, batch_size))
        for filter_idx in range(self.output_chanels):
            for i in range(output_height):
                for j in range(output_width):
                    filters[filter_idx, i, j] = \
                        np.tensordot(input[:, :, i:i+self.kernel_size, j:j+self.kernel_size],
                                     self.weights[:, filter_idx, :, :],
                                     axes=([1, 2, 3], [0, 1, 2]))
        return np.rollaxis(filters, 3, 0)
        

    def backward(self, input, grad_output):
        #grad_input = <your code here>

        #grad_weights = <your code here>
        #grad_biases = <your code here>

        assert grad_weights.shape == self.weights.shape and grad_biases.shape == self.biases.shape

        self.grad_weights = grad_weights
        self.grad_biases = grad_biases

        return grad_input

In [36]:
# Conv2D test
#from torch import nn
#import torch
#test_batch_size = 10
#test_input_chanels = 3
#test_output_chanels = 2
#test_input_height = 4
#test_input_width = 5
#test_kernel_size = 2
#
#test_conv = Conv2d(input_channels=test_input_chanels, 
#                   output_channels=test_output_chanels, 
#                   kernel_size=test_kernel_size)
#test_input = np.random.randn(test_batch_size, 
#                             test_input_chanels, 
#                             test_input_height,
#                             test_input_width)
#test_output = test_conv.forward(test_input)
#test_layer = nn.Conv2d(in_channels=test_input_chanels,
#                     out_channels=test_output_chanels,
#                     kernel_size=(test_kernel_size, test_kernel_size))
#test_ref = test_layer(torch.tensor(test_input, dtype=torch.float))
#print(test_ref)
#print(test_output)
#assert torch.equal(torch.tensor(test_output, dtype=torch.float), test_ref)


tensor([[[[-0.6838,  0.0923, -0.4602,  0.4486],
          [ 0.5164, -0.3346, -0.2623, -0.4785],
          [ 0.4358,  0.1483,  0.0891, -0.2875]],

         [[-1.1064,  0.2061, -0.6074,  1.4716],
          [ 0.2603, -0.6969,  0.0148,  0.8142],
          [ 0.2549, -1.5044,  0.6521,  0.1686]]],


        [[[-0.8534,  0.4540, -0.3437, -0.4620],
          [-0.3190,  0.2090, -0.1892, -0.3953],
          [-0.1837, -0.1451, -0.0466, -0.1749]],

         [[ 0.2089,  0.1504, -0.6972, -1.0789],
          [ 0.1066, -0.0919, -0.8480,  0.1634],
          [ 0.1517,  0.5613,  0.1536,  1.0877]]],


        [[[-0.5526, -0.8650,  0.1007, -0.7047],
          [-0.8038, -0.3513, -0.0599,  0.0467],
          [ 0.2070, -0.3426,  0.1781, -0.2867]],

         [[ 0.7698,  0.7011,  0.1731, -0.2240],
          [-0.0336,  0.1821, -0.5542, -0.6382],
          [ 1.1757, -1.3990, -0.0182,  0.7523]]],


        [[[ 0.0263, -0.7677,  0.0338, -0.4457],
          [-0.0618,  0.5206, -0.3122, -0.0073],
          [-0.1349, -0

AssertionError: 

In [None]:
class MaxPool2d(Layer):
    def __init__(self, kernel_size):
        self.kernel_size = kernel_size


    def forward(self, input):
        """
        Perform an pooling:

        output_height = input_height // kernel_size
        output_width = input_width // kernel_size

        input shape: [batch, input_channels, input_height, input_width]
        output shape: [batch, input_channels, output_height, output_width]
        """
        #<your code here>

    def backward(self, input, grad_output):
        #grad_input = <your code here>

        return grad_input

In [None]:
class Flatten(Layer):
    def forward(self, input):
          """
          Perform an flatten operation:

          input shape: [batch, input_channels, input_height, input_width]
          output shape: [batch, input_channels * output_height * output_width]
          """
          #<your code here>

    def backward(self, input, grad_output):
        #grad_input = <your code here>

        return grad_input

In [None]:
def softmax_crossentropy_with_logits(logits,reference_answers):
    """Compute crossentropy from logits[batch,n_classes] and ids of correct answers"""
    logits_for_answers = logits[np.arange(len(logits)),reference_answers]

    xentropy = - logits_for_answers + np.log(np.sum(np.exp(logits),axis=-1))

    return xentropy

def grad_softmax_crossentropy_with_logits(logits,reference_answers):
    """Compute crossentropy gradient from logits[batch,n_classes] and ids of correct answers"""
    ones_for_answers = np.zeros_like(logits)
    ones_for_answers[np.arange(len(logits)),reference_answers] = 1

    softmax = np.exp(logits) / np.exp(logits).sum(axis=-1,keepdims=True)

    return (- ones_for_answers + softmax) / logits.shape[0]

## Имплементация оптимизатора и изменения learning rate'a

В имплементации этих двух классов есть небольшие неточности.
Посмотрите, как сделана имплементация метода моментов в Pytorch и добавьте пропущенное.

> Добавлять моменты Нестерова не нужно!



In [None]:
class SGDOptimizer:
    def __init__(self, momentum=0.9, dampening=0.0, weight_decay=0.0):
        """
        Wrapper which perfoms weights update
        """
        self.momentum = momentum
        self.dampening = dampening
        self.weight_decay = weight_decay

        # здесь будем копить моментумы
        self.momentum_buffer = 0

    def step(self, weights, grad_weights, lr=0.1):
      """
      Update weights
      """
      self.momentum_buffer = self.momentum * self.momentum_buffer + (1 - self.dampening) * grad_weights

      #<your code here>

      return weights - lr * self.momentum_buffer

In [None]:
class LRScheduler:
    def __init__(self, lr):
        """
        Wrapper which perfoms learning rate updates
        """
        self.lr = lr
        self.current_step = 0

    def get_lr(self, step):
      """
      Update learing rate for current iteration
      """
      current_lr = self.lr / (self.current_step + 1)
      #<your code here>
      self.current_step += 1
      return current_lr

## Итоговая нейросеть

Все готово для запуска нейросети. Нейросеть будем тестировать на классическом датасете MNIST. Код ниже визуализирует несколько примеров из этого датасета.

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

X_train, y_train, X_val, y_val, X_test, y_test = load_mnist(flatten=False)

plt.figure(figsize=[6,6])
for i in range(4):
    plt.subplot(2,2,i+1)
    plt.title("Label: %i"%y_train[i])
    plt.imshow(X_train[i].reshape([28,28]),cmap='gray');

В нашей реализации сеть - просто список (Python-list) слоев.



> Обратите внимание, что у нас нет глобального пулинга. При изменении архитектуры сети вы должны поменять входую размерность в последнем Dense слое



In [None]:
network = []
network.append(Conv2d(1, 4, 5))
network.append(MaxPool2d(2))
network.append(ReLU())
network.append(Conv2d(4, 8, 5))
network.append(MaxPool2d(2))
network.append(ReLU())
network.append(Flatten())
network.append(Dense(5 * 5 * 8, 10))

In [None]:
learning_rate = 0.1

optimizer = SGDOptimizer()
scheduler = LRScheduler(learning_rate)

Реализуйте прямой проход по целой сети, последовательно вызывая .forward() для каждого слоя.

In [None]:
def forward(network, X):
    """
    Compute activations of all network layers by applying them sequentially.
    Return a list of activations for each layer.
    Make sure last activation corresponds to network logits.
    """
    activations = []
    input = X

    # <your code here>

    assert len(activations) == len(network)
    return activations

def predict(network, X):
    """
    Use network to predict the most likely class for each sample.
    """
    logits = forward(network, X)[-1]
    return logits.argmax(axis=-1)

In [None]:
def train(network,X,y):
    """
    Train your network on a given batch of X and y.
    You first need to run forward to get all layer activations.
    Then you can run layer.backward going from last to first layer.

    After you called backward for all layers, all Dense layers have already made one gradient step.
    """

    # Get the layer activations
    layer_activations = forward(network,X)
    layer_inputs = [X] + layer_activations  #layer_input[i] is an input for network[i]
    logits = layer_activations[-1]

    # Compute the loss and the initial gradient
    loss = softmax_crossentropy_with_logits(logits,y)
    loss_grad = grad_softmax_crossentropy_with_logits(logits,y)

    # propagate gradients through network layers using .backward
    # hint: start from last layer and move to earlier layers
    #<YOUR CODE>

    # update weights and biases with optimizer
    #<YOUR CODE>

    # update learning rate
    #<YOUR CODE>

    return np.mean(loss)

Все готово для запуска обучения. Если все реализовано корректно, то точность классификации на валидационном множестве **должна быть около** 99%.

In [None]:
from tqdm.auto import tqdm
def iterate_minibatches(inputs, targets, batchsize, shuffle=False):
    assert len(inputs) == len(targets)
    if shuffle:
        indices = np.random.permutation(len(inputs))
    for start_idx in tqdm(range(0, len(inputs) - batchsize + 1, batchsize)):
        if shuffle:
            excerpt = indices[start_idx:start_idx + batchsize]
        else:
            excerpt = slice(start_idx, start_idx + batchsize)
        yield inputs[excerpt], targets[excerpt]

In [None]:
from IPython.display import clear_output
train_log = []
val_log = []

In [None]:
for epoch in range(15):

    for x_batch,y_batch in iterate_minibatches(X_train, y_train, batchsize=32, shuffle=True):
        train(network, x_batch, y_batch)

    train_log.append(np.mean(predict(network, X_train) == y_train))
    val_log.append(np.mean(predict(network, X_val) == y_val))

    clear_output()
    print("Epoch",epoch)
    print("Train accuracy:",train_log[-1])
    print("Val accuracy:",val_log[-1])
    plt.plot(train_log,label='train accuracy')
    plt.plot(val_log,label='val accuracy')
    plt.legend(loc='best')
    plt.grid()
    plt.show()
