In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor
from typing import Optional
import numpy as np
import torch
from torch.autograd import Variable
import numpy
import unittest

**Module** is an abstract class which defines fundamental methods necessary for a training a neural network. You do not need to change anything here, just read the comments. \
**Module** - это абстрактный класс, который определяет фундаментальные методы, необходимые для обучения нейронной сети. Вам не нужно ничего здесь менять, просто прочитайте комментарии.

In [2]:
class Module(object):
    """
    Basically, you can think of a module as of a something (black box)
    which can process `input` data and produce `ouput` data.
    This is like applying a function which is called `forward`:

        output = module.forward(input)

    The module should be able to perform a backward pass: to differentiate the `forward` function.
    More, it should be able to differentiate it if is a part of chain (chain rule).
    The latter implies there is a gradient from previous step of a chain rule.

        gradInput = module.backward(input, gradOutput)
    """
    def __init__ (self):
        self.output = None
        self.gradInput = None
        self.training = True

        # self.output — хранит результат прямого прохода (выходное значение).
        # self.gradInput — хранит градиент по входу (нужен для обратного прохода).
        # self.training — флаг, который показывает, работает ли модель в режиме обучения (True) или в режиме оценки (False).

    def forward(self, input):
        """
        Takes an input object, and computes the corresponding output of the module.
        """
        return self.updateOutput(input) # данный метод есть чуть ниже

        # Вызывает updateOutput(input), которая должна быть переопределена в дочерних классах.
        # Задача: вычислить выходное значение (output) на основе входа.

    def backward(self, input, gradOutput):
        """
        Performs a backpropagation step through the module, with respect to the given input.

        This includes
         - computing a gradient w.r.t. `input` (is needed for further backprop),
         - computing a gradient w.r.t. parameters (to update parameters while optimizing).
        """
        self.updateGradInput(input, gradOutput) # метод, лежит ниже
        self.accGradParameters(input, gradOutput) # метод, лежи ниже
        return self.gradInput

        # Вызывает два метода:
        #   updateGradInput(input, gradOutput) — вычисляет градиент по входу.
        #   accGradParameters(input, gradOutput) — вычисляет градиент по параметрам (если есть параметры).
        # Возвращает self.gradInput (градиент по входу).



    def updateOutput(self, input):
        """
        Computes the output using the current parameter set of the class and input.
        This function returns the result which is stored in the `output` field.

        Make sure to both store the data in `output` field and return it.

        Это напоминание о том, что метод updateGradInput должен:
        Сохранить градиент во внутренней переменной self.gradInput.
        Вернуть этот градиент (так как backward() ожидает его).
        """


        # The easiest case:

        # self.output = input
        # return self.output

        pass

        # Должна быть переопределена в наследниках!
        # Вычисляет self.output на основе input и возвращает его.

    def updateGradInput(self, input, gradOutput):
        """
        Computing the gradient of the module with respect to its own input.
        This is returned in `gradInput`. Also, the `gradInput` state variable is updated accordingly.

        The shape of `gradInput` is always the same as the shape of `input`.

        Make sure to both store the gradients in `gradInput` field and return it.
        """

        # The easiest case:

        # self.gradInput = gradOutput
        # return self.gradInput

        pass

        # Должна быть переопределена в наследниках!
        # Вычисляет градиент self.gradInput по входу.
        # Градиент по входу имеет ту же форму, что и сам вход.

    def accGradParameters(self, input, gradOutput):
        """
        Computing the gradient of the module with respect to its own parameters.
        No need to override if module has no parameters (e.g. ReLU).
        """
        pass

    # Вычисляет градиент по параметрам модуля.
    # Если у модуля нет параметров (например, ReLU), переопределять не нужно.

    def zeroGradParameters(self):
        """
        Zeroes `gradParams` variable if the module has params.
        """
        pass

    # Обнуляет градиенты параметров (нужно перед каждым шагом оптимизации).

    def getParameters(self):
        """
        Returns a list with its parameters.
        If the module does not have parameters return empty list.
        """
        return []

    # Возвращает список параметров (например, веса W и B).
    # Если у слоя нет параметров (ReLU), возвращает [].

    def getGradParameters(self):
        """
        Returns a list with gradients with respect to its parameters.
        If the module does not have parameters return empty list.
        """
        return []

    # Возвращает градиенты параметров.
    # Если у слоя нет параметров, возвращает [].

    def train(self):
        """
        Sets training mode for the module.
        Training and testing behaviour differs for Dropout, BatchNorm.
        """
        self.training = True

        # Устанавливает флаг training = True (режим обучения).
        # Когда нужно включать train()?
        # Перед началом обучения сети (например, перед вызовом forward и backward).
        # Некоторые слои (например, Dropout и BatchNorm) ведут себя по-разному в обучении и на тесте, поэтому режим training важен.

    def evaluate(self):
        """
        Sets evaluation mode for the module.
        Training and testing behaviour differs for Dropout, BatchNorm.
        """
        self.training = False

        # Устанавливает флаг training = False (режим тестирования).
        # Полезно для BatchNorm, Dropout.

    def __repr__(self):
        """
        Pretty printing. Should be overrided in every module if you want
        to have readable description.
        """
        return "Module"

        # Определяет строковое представление класса.
        # В дочерних классах лучше переопределить:

# Sequential container

**Define** a forward and backward pass procedures. \
контейнер, который позволяет последовательно добавлять слои к модели, что делает процесс построения нейронной сети более интуитивным и простым.

In [3]:
import torch.nn as nn

class Sequential(nn.Module):
    def __init__(self):
        super(Sequential, self).__init__()
        self.modules = []

    def add(self, module):
        """Добавляет модуль (слой) в контейнер."""
        self.modules.append(module)

    def updateOutput(self, input):
        """Прямой проход (forward pass) через все слои в контейнере."""
        self.y = [input]  # Сохраняем входной тензор
        for module in self.modules:
            self.y.append(module.forward(self.y[-1]))  # Пропускаем через слои
        self.output = self.y[-1]  # Выход последнего слоя
        return self.output

    def backward(self, input, gradOutput):
        """Обратный проход (backward pass) через все слои."""
        n = len(self.modules)
        grad = gradOutput  # Начальный градиент

        for i in range(n - 1, -1, -1):  # Обратный проход
            grad = self.modules[i].backward(self.y[i], grad)

        self.gradInput = grad
        return self.gradInput

    def zeroGradParameters(self):
        """Обнуляет градиенты во всех слоях."""
        for module in self.modules:
            module.zeroGradParameters()

    def getParameters(self):
        """Собирает все параметры модели в список."""
        return [module.getParameters() for module in self.modules]

    def getGradParameters(self):
        """Собирает все градиенты модели в список."""
        return [module.getGradParameters() for module in self.modules]

    def __repr__(self):
        return "\n".join([str(module) for module in self.modules])

    def __getitem__(self, x):
        return self.modules[x]

    def train(self):
        """Переключает все модули в режим обучения."""
        self.training = True
        for module in self.modules:
            module.train()

    def evaluate(self):
        """Переключает все модули в режим оценки."""
        self.training = False
        for module in self.modules:
            module.evaluate()


# Layers

## 1 (0.2). Linear transform layer
Also known as dense layer, fully-connected layer, FC-layer, InnerProductLayer (in caffe), affine transform
- input:   **`batch_size x n_feats1`**
- output: **`batch_size x n_feats2`**

In [4]:
class Linear(Module):
    """
    A module which applies a linear transformation
    A common name is fully-connected layer, InnerProductLayer in caffe.

    The module should work with 2D input of shape (n_samples, n_feature).
    """
    def __init__(self, n_in, n_out):
        super(Linear, self).__init__()

        # This is a nice initialization
        # Инициализирует веса (self.W) и смещения (self.b) для линейного слоя.
        # Веса инициализируются случайно в диапазоне от -stdv до stdv, где stdv = 1 / sqrt(n_in). Это помогает предотвратить взрыв градиентов во время обучения.
        # Градиенты весов (self.gradW) и смещений (self.gradb) инициализируются нулями.
        stdv = 1./np.sqrt(n_in)
        self.W = np.random.uniform(-stdv, stdv, size = (n_out, n_in))
        self.b = np.random.uniform(-stdv, stdv, size = n_out)

        self.gradW = np.zeros_like(self.W)
        self.gradb = np.zeros_like(self.b)

    # Применяет линейную трансформацию к входным данным.
    # Выход рассчитывается как матричное произведение входных данных и весов плюс смещение.
    def updateOutput(self, input):
        # Your code goes here. ################################################
        # self.output = ...
        self.output = input @ self.W.T + self.b
        return self.output

    # Рассчитывает градиент ошибки по отношению к входным данным.
    # Используется для обратного распространения ошибки.
    def updateGradInput(self, input, gradOutput):
        # Your code goes here. ################################################
        # self.gradInput = ...
        self.gradInput = gradOutput @ self.W
        return self.gradInput

    # Рассчитывает и накапливает градиенты весов и смещений на основе градиента ошибки.
    def accGradParameters(self, input, gradOutput):
        # Your code goes here. ################################################
        # self.gradW = ... ; self.gradb = ...
        self.gradW = gradOutput.T @ input
        self.gradb = gradOutput.sum(axis = 0)
        pass

    # Сбрасывает градиенты весов и смещений в ноль перед новой итерацией обучения.
    def zeroGradParameters(self):
        self.gradW.fill(0)
        self.gradb.fill(0)

    def getParameters(self):
        return [self.W, self.b]

    def getGradParameters(self):
        return [self.gradW, self.gradb]

    # Когда вы вызываете функцию print() или repr() для объекта этого класса, будет выведена строка, например:
    # 'Linear 784 -> 10', что означает, что слой принимает 784 входных признака и выдает 10 выходных.
    def __repr__(self):
        s = self.W.shape
        q = 'Linear %d -> %d' %(s[1],s[0])
        return q

## 2. (0.2) SoftMax
- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

$\text{softmax}(x)_i = \frac{\exp x_i} {\sum_j \exp x_j}$

Recall that $\text{softmax}(x) == \text{softmax}(x - \text{const})$. It makes possible to avoid computing exp() from large argument. \
Нормализация через константу: Вычитание максимума (x - max(x)) перед вычислением экспоненты предотвращает численную нестабильность

In [5]:
class SoftMax(Module):
    def __init__(self):
         super(SoftMax, self).__init__()

    def updateOutput(self, input):
        self.output = np.subtract(input, input.max(axis=1, keepdims=True))

        # Your code goes here. ################################################
        self.output = np.exp(self.output)
        self.output = self.output / np.sum(self.output, axis=1, keepdims=True)

        return self.output

    def updateGradInput(self, input, gradOutput):
        # Your code goes here. ################################################
        self.gradInput = self.output * (gradOutput - np.sum(self.output * gradOutput, axis=1, keepdims=True))
        return self.gradInput

    def __repr__(self):
        return "SoftMax"

## 3. (0.2) LogSoftMax
- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

$\text{logsoftmax}(x)_i = \log\text{softmax}(x)_i = x_i - \log {\sum_j \exp x_j}$

The main goal of this layer is to be used in computation of log-likelihood loss.

In [6]:
class LogSoftMax(Module):
    def __init__(self):
         super(LogSoftMax, self).__init__()

    def updateOutput(self, input):
        # start with normalization for numerical stability
        self.output = np.subtract(input, input.max(axis=1, keepdims=True))

        # Your code goes here. ################################################
        self.output = self.output - np.log(np.sum(np.exp(self.output), axis = 1, keepdims = True))
        return self.output

    def updateGradInput(self, input, gradOutput):
        # Your code goes here. ################################################
        self.gradInput = gradOutput - np.exp(self.output) * np.sum(gradOutput, axis = 1, keepdims = True)
        return self.gradInput

    def __repr__(self):
        return "LogSoftMax"

## 4. (0.3) Batch normalization
One of the most significant recent ideas that impacted NNs a lot is [**Batch normalization**](http://arxiv.org/abs/1502.03167). The idea is simple, yet effective: the features should be whitened ($mean = 0$, $std = 1$) all the way through NN. This improves the convergence for deep models letting it train them for days but not weeks. **You are** to implement the first part of the layer: features normalization. The second part (`ChannelwiseScaling` layer) is implemented below. \
\
Одна из самых значительных идей, повлиявших на нейронные сети — Batch Normalization. Идея проста, но эффективна: признаки должны быть отбелены (среднее = 0, std = 1) на всех слоях нейросети. Это улучшает сходимость глубоких моделей, сокращая время обучения с недель до дней.
Ваша задача: реализовать первую часть слоя (нормализацию признаков). Вторая часть (ChannelwiseScaling) уже реализована.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

The layer should work as follows. While training (`self.training == True`) it transforms input as $$y = \frac{x - \mu}  {\sqrt{\sigma + \epsilon}}$$
Слой Batch Normalization работает следующим образом:
В режиме обучения (self.training == True):
Преобразует входные данные по формуле \
where $\mu$ and $\sigma$ - mean and variance of feature values in **batch** and $\epsilon$ is just a small number for numericall stability. Also during training, layer should maintain exponential moving average values for mean and variance:\
Обновляет экспоненциальные скользящие средние для среднего и дисперсии:
```
    self.moving_mean = self.moving_mean * alpha + batch_mean * (1 - alpha)
    self.moving_variance = self.moving_variance * alpha + batch_variance * (1 - alpha)
```
During testing (`self.training == False`) the layer normalizes input using moving_mean and moving_variance. \
В режиме тестирования (self.training == False): \
Нормализует входные данные, используя накопленные скользящие средние (moving_mean и moving_variance) вместо статистик текущего батча.

Note that decomposition of batch normalization on normalization itself and channelwise scaling here is just a common **implementation** choice. In general "batch normalization" always assumes normalization + scaling.

In [7]:
class BatchNormalization(Module):
    EPS = 1e-3
    def __init__(self, alpha = 0.):
        super(BatchNormalization, self).__init__()
        self.alpha = alpha
        self.moving_mean = None
        self.moving_variance = None

    def updateOutput(self, input):
        # Your code goes here. ################################################
        # use self.EPS please
        if self.training == True:
            self.batch_mean = input.mean(axis = 0)
            self.batch_variance = input.var(axis = 0)
            self.output = (input - self.batch_mean) / (self.batch_variance + self.EPS)**0.5

            self.moving_mean = self.moving_mean * self.alpha + self.batch_mean * (1 - self.alpha)
            self.moving_variance = self.moving_variance * self.alpha + self.batch_variance * (1 - self.alpha)
        else:
            self.output = (input - self.moving_mean) / (self.moving_variance + self.EPS)**0.5

        return self.output

    def updateGradInput(self, input, gradOutput):
        # Your code goes here. ################################################
        m = gradOutput.shape[0]
        self.gradInput = (m * gradOutput - np.sum(gradOutput, axis = 0) - self.output * np.sum(gradOutput*self.output, axis = 0))/(m*np.sqrt(self.batch_variance + self.EPS))
        return self.gradInput

    def __repr__(self):
        return "BatchNormalization"

In [8]:
# Не трогать этот класс

class ChannelwiseScaling(Module):
    """
       Implements linear transform of input y = \gamma * x + \beta
       where \gamma, \beta - learnable vectors of length x.shape[-1]
    """
    def __init__(self, n_out):
        super(ChannelwiseScaling, self).__init__()

        stdv = 1./np.sqrt(n_out)
        self.gamma = np.random.uniform(-stdv, stdv, size=n_out)
        self.beta = np.random.uniform(-stdv, stdv, size=n_out)

        self.gradGamma = np.zeros_like(self.gamma)
        self.gradBeta = np.zeros_like(self.beta)

    def updateOutput(self, input):
        self.output = input * self.gamma + self.beta
        return self.output

    def updateGradInput(self, input, gradOutput):
        self.gradInput = gradOutput * self.gamma
        return self.gradInput

    def accGradParameters(self, input, gradOutput):
        self.gradBeta = np.sum(gradOutput, axis=0)
        self.gradGamma = np.sum(gradOutput*input, axis=0)

    def zeroGradParameters(self):
        self.gradGamma.fill(0)
        self.gradBeta.fill(0)

    def getParameters(self):
        return [self.gamma, self.beta]

    def getGradParameters(self):
        return [self.gradGamma, self.gradBeta]

    def __repr__(self):
        return "ChannelwiseScaling"

Practical notes. If BatchNormalization is placed after a linear transformation layer (including dense layer, convolutions, channelwise scaling) that implements function like `y = weight * x + bias`, than bias adding become useless and could be omitted since its effect will be discarded while batch mean subtraction. If BatchNormalization (followed by `ChannelwiseScaling`) is placed before a layer that propagates scale (including ReLU, LeakyReLU) followed by any linear transformation layer than parameter `gamma` in `ChannelwiseScaling` could be freezed since it could be absorbed into the linear transformation layer.

Практические замечания: \
Если BatchNormalization размещается после линейного преобразования (включая полносвязный слой, сверточные слои, поканальное масштабирование), которое реализует функцию вида: \
\
y = weight * x + bias \
\
то добавление смещения (bias) становится бесполезным и может быть опущено, так как его эффект будет уничтожен при вычитании среднего значения в Batch Normalization. \
\
Если BatchNormalization (за которым следует ChannelwiseScaling) размещается перед слоем, который сохраняет масштаб (например, ReLU, LeakyReLU), а затем следует любой линейный слой, то параметр gamma в ChannelwiseScaling` можно зафиксировать (заморозить), так как он может быть поглощен (перенесен) в линейный слой.

## 5. (0.3) Dropout
Implement [**dropout**](https://www.cs.toronto.edu/~hinton/absps/JMLRdropout.pdf). The idea and implementation is really simple: just multimply the input by $Bernoulli(p)$ mask. Here $p$ is probability of an element to be zeroed.

This has proven to be an effective technique for regularization and preventing the co-adaptation of neurons.

While training (`self.training == True`) it should sample a mask on each iteration (for every batch), zero out elements and multiply elements by $1 / (1 - p)$. The latter is needed for keeping mean values of features close to mean values which will be in test mode. When testing this module should implement identity transform i.e. `self.output = input`.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**
\
Идея и реализация очень просты: просто умножьте вход на маску Bernoulli(p). Здесь p — вероятность того, что элемент будет обнулен.\
\
Доказано, что это эффективный метод регуляризации и предотвращения коадаптации нейронов.\
\
Во время обучения (self.training == True) он должен выбирать маску на каждой итерации (для каждого батча), обнулять элементы и умножать элементы на $1 / (1 - p)$. Последнее необходимо для поддержания средних значений признаков близкими к средним значениям, которые будут в тестовом режиме. При тестировании этот модуль должен реализовать тождественное преобразование, то есть self.output = input.

In [9]:
class Dropout(Module):
    def __init__(self, p=0.5):
        super(Dropout, self).__init__()

        self.p = p
        self.mask = None

    def updateOutput(self, input):
        # Your code goes here. ################################################
        self.mask = np.random.binomial(1, 1. - self.p, input.shape)
        if self.training == True:
            self.output = input * self.mask
            self.output = self.output / (1 - self.p)
        else:
            self.output = input
        return  self.output

    def updateGradInput(self, input, gradOutput):
        # Your code goes here. ################################################
        if self.training == True:
            self.gradInput = gradOutput * self.mask
            self.gradInput = self.gradInput / (1 - self.p)
        else:
            self.gradInput = gradOutput
        return self.gradInput

    def __repr__(self):
        return "Dropout"

# 6. (2.0) Conv2d
Implement [**Conv2d**](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html). Use only this list of parameters: (in_channels, out_channels, kernel_size, stride, padding, bias, padding_mode) and fix dilation=1 and groups=1.

In [10]:
class Conv2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size,
                 stride=1, padding=0, bias=True, padding_mode='zeros'):
        super(Conv2d, self).__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.bias_flag = bias
        self.padding_mode = padding_mode
        self.dilation = 1
        self.groups = 1

        self.weight = nn.Parameter(torch.randn(out_channels, in_channels, kernel_size, kernel_size) * 0.01)
        if bias:
            self.bias = nn.Parameter(torch.randn(out_channels) * 0.01)
        else:
            self.bias = None

    def updateOutput(self, input):
        """ Вычисляет выход сверточного слоя """
        if isinstance(input, np.ndarray):
            input = torch.tensor(input, dtype=torch.float32)

        if self.padding_mode != 'zeros':
            pad_h, pad_w = (self.padding, self.padding) if isinstance(self.padding, int) else self.padding
            input = F.pad(input, (pad_w, pad_w, pad_h, pad_h), mode=self.padding_mode)
            padding = 0  # Padding уже применили вручную
        else:
            padding = self.padding

        self.output = F.conv2d(input, self.weight, self.bias, stride=self.stride, padding=padding, dilation=self.dilation, groups=self.groups)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """ Вычисляет градиент по входу """
        if isinstance(input, np.ndarray):
            input = torch.tensor(input, dtype=torch.float32)

        self.gradInput = torch.autograd.grad(
            outputs=self.output, inputs=input, grad_outputs=gradOutput,
            only_inputs=True, retain_graph=True
        )[0]
        return self.gradInput

    def __repr__(self):
        return (f"Conv2d({self.in_channels}, {self.out_channels}, kernel_size={self.kernel_size}, "
                f"stride={self.stride}, padding={self.padding}, bias={self.bias_flag}, "
                f"padding_mode={self.padding_mode})")


In [11]:
# # Тестирование класса
# params_list = [
#     {'batch_size': 8, 'in_channels': 3, 'out_channels': 6, 'height': 32, 'width': 32, 'kernel_size': 3, 'stride': 1, 'padding': 1, 'bias': True, 'padding_mode': 'zeros'},
#     {'batch_size': 4, 'in_channels': 1, 'out_channels': 2, 'height': 28, 'width': 28, 'kernel_size': 5, 'stride': 2, 'padding': 2, 'bias': False, 'padding_mode': 'replicate'},
#     {'batch_size': 16, 'in_channels': 3, 'out_channels': 3, 'height': 64, 'width': 64, 'kernel_size': 3, 'stride': 2, 'padding': 'same', 'bias': True, 'padding_mode': 'reflect'},
#     {'batch_size': 2, 'in_channels': 3, 'out_channels': 8, 'height': 10, 'width': 10, 'kernel_size': 2, 'stride': (1, 2), 'padding': 0, 'bias': True, 'padding_mode': 'zeros'},
# ]

# for params in params_list:
#     print(f"Testing Conv2d with params: {params}")
#     conv = Conv2d(
#         params['in_channels'], params['out_channels'], params['kernel_size'],
#         stride=params['stride'], padding=params['padding'],
#         bias=params['bias'], padding_mode=params['padding_mode']
#     )
#     x = torch.randn(params['batch_size'], params['in_channels'], params['height'], params['width'])
#     output = conv.updateOutput(x)
#     print(f"Output shape: {output.shape}\n")

# 7. (0.5) Implement [**MaxPool2d**](https://pytorch.org/docs/stable/generated/torch.nn.MaxPool2d.html) and [**AvgPool2d**](https://pytorch.org/docs/stable/generated/torch.nn.AvgPool2d.html).

Use only parameters like kernel_size, stride, padding (negative infinity for maxpool and zero for avgpool) and other parameters fixed as in framework.

In [12]:
import torch
import torch.nn.functional as F
from torch.nn import Module
import numpy as np

class MaxPool2d(Module):
    def __init__(self, kernel_size, stride, padding):
        super(MaxPool2d, self).__init__()

        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

    def updateOutput(self, input):
        if isinstance(input, np.ndarray):
            input = torch.tensor(input, dtype=torch.float32, requires_grad=True)

        self.output, self.indices = F.max_pool2d(
            input=input,
            kernel_size=self.kernel_size,
            stride=self.stride,
            padding=self.padding,
            return_indices=True
        )
        return self.output.detach().numpy()

    def updateGradInput(self, input, gradOutput):
        if isinstance(input, np.ndarray):
            input = torch.tensor(input, dtype=torch.float32, requires_grad=True)
        if isinstance(gradOutput, np.ndarray):
            gradOutput = torch.tensor(gradOutput, dtype=torch.float32)

        self.gradInput = F.max_unpool2d(
            gradOutput,
            self.indices,
            kernel_size=self.kernel_size,
            stride=self.stride,
            padding=self.padding
        )
        return self.gradInput.detach().numpy()

    def __repr__(self):
        return f"MaxPool2d(kernel_size={self.kernel_size}, stride={self.stride}, padding={self.padding})"


In [13]:
class AvgPool2d(Module):
    def __init__(self, kernel_size, stride, padding):
        super().__init__()
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

    def updateOutput(self, input):
        self.input = torch.tensor(input, dtype=torch.float32, requires_grad=True) if isinstance(input, np.ndarray) else input

        self.output = F.avg_pool2d(
            self.input,
            kernel_size=self.kernel_size,
            stride=self.stride,
            padding=self.padding
        )
        return self.output.detach().numpy()

    def updateGradInput(self, input, gradOutput):
        grad_output_tensor = torch.tensor(gradOutput) if isinstance(gradOutput, np.ndarray) else gradOutput

        self.output.backward(gradient=grad_output_tensor)
        self.gradInput = self.input.grad.numpy()

        return self.gradInput


# 8. (0.3) Implement **GlobalMaxPool2d** and **GlobalAvgPool2d**.
They do not have testing and parameters are up to you but they must aggregate information within channels. Write test functions for these layers on your own.

In [14]:
class GlobalMaxPool2d(Module):
    def __init__(self):
        super(GlobalMaxPool2d, self).__init__()

    def updateOutput(self, input):
        self.output = F.adaptive_max_pool2d(input, (1, 1))
        return self.output

    def updateGradInput(self, input, gradOutput):
        self.gradInput = F.interpolate(
            gradOutput, size=input.shape[2:], mode='nearest'
        )
        return self.gradInput

    def __repr__(self):
        return "GlobalMaxPool2d"


class GlobalAvgPool2d(Module):
    def __init__(self):
        super(GlobalAvgPool2d, self).__init__()

    def updateOutput(self, input):
        self.output = F.adaptive_avg_pool2d(input, (1, 1))
        return self.output

    def updateGradInput(self, input, gradOutput):
        self.gradInput = F.interpolate(
            gradOutput, size=input.shape[2:], mode='nearest'
        )
        return self.gradInput

    def __repr__(self):
        return "GlobalAvgPool2d"


In [15]:
def test_GlobalMaxPool2d():
    params_list = [
        {'batch_size': 8, 'channels': 3, 'height': 32, 'width': 32},
        {'batch_size': 4, 'channels': 1, 'height': 28, 'width': 28},
        {'batch_size': 16, 'channels': 3, 'height': 64, 'width': 64},
        {'batch_size': 2, 'channels': 3, 'height': 10, 'width': 10},
    ]

    for params in params_list:
        print(f"Testing GlobalMaxPool2d with params: {params}")
        pool = GlobalMaxPool2d()
        x = torch.randn(params['batch_size'], params['channels'], params['height'], params['width'], requires_grad=True)
        output = pool.updateOutput(x)

        assert output.shape == (params['batch_size'], params['channels'], 1, 1), "GlobalMaxPool2d: неверная форма выхода"

        expected = F.adaptive_max_pool2d(x, (1, 1))
        assert torch.allclose(output, expected), "GlobalMaxPool2d: ошибка значений"

        # Проверка backward
        grad_output = torch.ones_like(output)
        grad_input = pool.updateGradInput(x, grad_output)
        grad_input_expected = F.interpolate(grad_output, size=x.shape[2:], mode='nearest')
        assert torch.allclose(grad_input, grad_input_expected), "GlobalMaxPool2d: ошибка backward"

        print(f"GlobalMaxPool2d test passed for params: {params}\n")


def test_GlobalAvgPool2d():
    params_list = [
        {'batch_size': 8, 'channels': 3, 'height': 32, 'width': 32},
        {'batch_size': 4, 'channels': 1, 'height': 28, 'width': 28},
        {'batch_size': 16, 'channels': 3, 'height': 64, 'width': 64},
        {'batch_size': 2, 'channels': 3, 'height': 10, 'width': 10},
    ]

    for params in params_list:
        print(f"Testing GlobalAvgPool2d with params: {params}")
        pool = GlobalAvgPool2d()
        x = torch.randn(params['batch_size'], params['channels'], params['height'], params['width'], requires_grad=True)
        output = pool.updateOutput(x)

        assert output.shape == (params['batch_size'], params['channels'], 1, 1), "GlobalAvgPool2d: неверная форма выхода"

        expected = F.adaptive_avg_pool2d(x, (1, 1))
        assert torch.allclose(output, expected), "GlobalAvgPool2d: ошибка значений"

        grad_output = torch.ones_like(output)
        grad_input = pool.updateGradInput(x, grad_output)
        grad_input_expected = F.interpolate(grad_output, size=x.shape[2:], mode='nearest')
        assert torch.allclose(grad_input, grad_input_expected), "GlobalAvgPool2d: ошибка backward"

        print(f"GlobalAvgPool2d test passed for params: {params}\n")


# Запуск тестов
test_GlobalMaxPool2d()
test_GlobalAvgPool2d()

Testing GlobalMaxPool2d with params: {'batch_size': 8, 'channels': 3, 'height': 32, 'width': 32}
GlobalMaxPool2d test passed for params: {'batch_size': 8, 'channels': 3, 'height': 32, 'width': 32}

Testing GlobalMaxPool2d with params: {'batch_size': 4, 'channels': 1, 'height': 28, 'width': 28}
GlobalMaxPool2d test passed for params: {'batch_size': 4, 'channels': 1, 'height': 28, 'width': 28}

Testing GlobalMaxPool2d with params: {'batch_size': 16, 'channels': 3, 'height': 64, 'width': 64}
GlobalMaxPool2d test passed for params: {'batch_size': 16, 'channels': 3, 'height': 64, 'width': 64}

Testing GlobalMaxPool2d with params: {'batch_size': 2, 'channels': 3, 'height': 10, 'width': 10}
GlobalMaxPool2d test passed for params: {'batch_size': 2, 'channels': 3, 'height': 10, 'width': 10}

Testing GlobalAvgPool2d with params: {'batch_size': 8, 'channels': 3, 'height': 32, 'width': 32}
GlobalAvgPool2d test passed for params: {'batch_size': 8, 'channels': 3, 'height': 32, 'width': 32}

Testing 

# 9. (0.2) Implement [**Flatten**](https://pytorch.org/docs/stable/generated/torch.flatten.html)

In [16]:
import torch
import torch.nn as nn

class Flatten(nn.Module):
    def __init__(self, start_dim=0, end_dim=-1):
        super(Flatten, self).__init__()
        self.start_dim = start_dim
        self.end_dim = end_dim

    def updateOutput(self, input):
        if not isinstance(input, torch.Tensor):
            input = torch.tensor(input, dtype=torch.float32)

        self.output = torch.flatten(input, start_dim=self.start_dim, end_dim=self.end_dim)
        return self.output

    def updateGradInput(self, input, gradOutput):
        if not isinstance(input, torch.Tensor):
            input = torch.tensor(input, dtype=torch.float32)
        if not isinstance(gradOutput, torch.Tensor):
            gradOutput = torch.tensor(gradOutput, dtype=torch.float32)

        self.gradInput = gradOutput.view(input.shape)  # Восстанавливаем изначальную форму
        return self.gradInput

    def __repr__(self):
        return "Flatten"


# Activation functions

Here's the complete example for the **Rectified Linear Unit** non-linearity (aka **ReLU**):

In [17]:
class ReLU(Module):
    def __init__(self):
         super(ReLU, self).__init__()

    def updateOutput(self, input):
        self.output = np.maximum(input, 0)
        return self.output

    def updateGradInput(self, input, gradOutput):
        self.gradInput = np.multiply(gradOutput , input > 0)
        return self.gradInput

    def __repr__(self):
        return "ReLU"

## 10. (0.1) Leaky ReLU
Implement [**Leaky Rectified Linear Unit**](http://en.wikipedia.org/wiki%2FRectifier_%28neural_networks%29%23Leaky_ReLUs). Expriment with slope.

In [18]:
class LeakyReLU(Module):
    def __init__(self, slope = 0.03):
        super(LeakyReLU, self).__init__()

        self.slope = slope

    def updateOutput(self, input):
        # Your code goes here. ################################################
        self.output = np.maximum(input, 0) + np.minimum(self.slope * input, 0)
        return  self.output

    def updateGradInput(self, input, gradOutput):
        # Your code goes here. ################################################
        self.gradInput = np.multiply(gradOutput, input > 0) + self.slope * np.multiply(gradOutput, input < 0)
        return self.gradInput

    def __repr__(self):
        return "LeakyReLU"

## 11. (0.1) ELU
Implement [**Exponential Linear Units**](http://arxiv.org/abs/1511.07289) activations.

In [19]:
class ELU(Module):
    def __init__(self, alpha = 1.0):
        super(ELU, self).__init__()

        self.alpha = alpha

    def updateOutput(self, input):
        # Your code goes here. ################################################
        self.output = np.maximum(input, 0) + np.minimum(self.alpha * (np.exp(input) - 1), 0)
        return  self.output

    def updateGradInput(self, input, gradOutput):
        # Your code goes here. ################################################
        self.gradInput = np.multiply(gradOutput, input > 0) + self.alpha * np.exp(input) * np.multiply(gradOutput, input < 0)
        return self.gradInput

    def __repr__(self):
        return "ELU"

## 12. (0.1) SoftPlus
Implement [**SoftPlus**](https://en.wikipedia.org/wiki%2FRectifier_%28neural_networks%29) activations. Look, how they look a lot like ReLU.

In [20]:
# Плавное приближение RELU
class SoftPlus(Module):
    def __init__(self):
        super(SoftPlus, self).__init__()

    def updateOutput(self, input):
        # Your code goes here. ################################################
        self.output = np.log(1 + np.exp(input))
        return  self.output

    def updateGradInput(self, input, gradOutput):
        # Your code goes here. ################################################
        self.gradInput = gradOutput / (1 + np.exp(-input))
        return self.gradInput

    def __repr__(self):
        return "SoftPlus"

## 13. (0.2) Gelu
Implement [**Gelu**](https://pytorch.org/docs/stable/generated/torch.nn.GELU.html) activations.

In [21]:
class Gelu(nn.Module):
    def __init__(self):
        super(Gelu, self).__init__()

    def updateOutput(self, input):
        if not isinstance(input, torch.Tensor):
            input = torch.tensor(input, dtype=torch.float32)

        sqrt_2 = torch.sqrt(torch.tensor(2.0, dtype=input.dtype, device=input.device))
        self.output = input * 0.5 * (1 + torch.erf(input / sqrt_2))
        return self.output

    def updateGradInput(self, input, gradOutput):
        if not isinstance(input, torch.Tensor):
            input = torch.tensor(input, dtype=torch.float32)
        if not isinstance(gradOutput, torch.Tensor):
            gradOutput = torch.tensor(gradOutput, dtype=torch.float32)

        sqrt_2pi = torch.sqrt(torch.tensor(2.0 * torch.pi, dtype=input.dtype, device=input.device))
        exp_term = torch.exp(-0.5 * input**2)
        cdf = 0.5 * (1 + torch.erf(input / torch.sqrt(torch.tensor(2.0, dtype=input.dtype, device=input.device))))
        pdf = exp_term / sqrt_2pi

        derivative = cdf + input * pdf
        self.gradInput = derivative * gradOutput
        return self.gradInput

    def __repr__(self):
        return "Gelu"




# Criterions

Criterions are used to score the models answers.

In [22]:
class Criterion(object):
    def __init__ (self):
        self.output = None
        self.gradInput = None

    def forward(self, input, target):
        """
            Given an input and a target, compute the loss function
            associated to the criterion and return the result.

            For consistency this function should not be overrided,
            all the code goes in `updateOutput`.
        """
        return self.updateOutput(input, target)

    def backward(self, input, target):
        """
            Given an input and a target, compute the gradients of the loss function
            associated to the criterion and return the result.

            For consistency this function should not be overrided,
            all the code goes in `updateGradInput`.
        """
        return self.updateGradInput(input, target)

    def updateOutput(self, input, target):
        """
        Function to override.
        Метод, который необходимо переопределить в дочерних классах для реализации конкретной функции потерь (например, среднеквадратичная ошибка,
        перекрестная энтропия). Должен вычислить значение функции потерь и сохранить его в self.output.
        """
        return self.output

    def updateGradInput(self, input, target):
        """
        Function to override.
        Метод, который необходимо переопределить в дочерних классах для вычисления градиентов функции потерь
        по отношению к входным данным. Должен вычислить градиенты и сохранить их в self.gradInput
        """
        return self.gradInput

    def __repr__(self):
        """
        Pretty printing. Should be overrided in every module if you want
        to have readable description.
        """
        return "Criterion"

The **MSECriterion**, which is basic L2 norm usually used for regression, is implemented here for you.
- input:   **`batch_size x n_feats`**
- target: **`batch_size x n_feats`**
- output: **scalar**

In [23]:
class MSECriterion(Criterion):
    def __init__(self):
        super(MSECriterion, self).__init__()

    def updateOutput(self, input, target):
        self.output = np.sum(np.power(input - target,2)) / input.shape[0]
        return self.output

    def updateGradInput(self, input, target):
        self.gradInput  = (input - target) * 2 / input.shape[0]
        return self.gradInput

    def __repr__(self):
        return "MSECriterion"

## 14. (0.2) Negative LogLikelihood criterion (numerically unstable)
You task is to implement the **ClassNLLCriterion**. It should implement [multiclass log loss](http://scikit-learn.org/stable/modules/model_evaluation.html#log-loss). Nevertheless there is a sum over `y` (target) in that formula,
remember that targets are one-hot encoded. This fact simplifies the computations a lot. Note, that criterions are the only places, where you divide by batch size. Also there is a small hack with adding small number to probabilities to avoid computing log(0).
- input:   **`batch_size x n_feats`** - probabilities
- target: **`batch_size x n_feats`** - one-hot representation of ground truth
- output: **scalar**

Ваша задача - реализовать класс ClassNLLCriterion. Он должен реализовывать мультиклассовую логарифмическую потерю. Несмотря на то, что в этой формуле есть сумма по y (целевой переменной), помните, что целевые значения представлены в формате one-hot encoding. Этот факт значительно упрощает вычисления. Обратите внимание, что критерии - это единственные места, где вы делите на размер пакета (batch size). Также, есть небольшой хак с добавлением малого числа к вероятностям, чтобы избежать вычисления логарифма от 0 (log(0)).



In [24]:
class ClassNLLCriterionUnstable(Criterion):
    EPS = 1e-15
    def __init__(self):
        a = super(ClassNLLCriterionUnstable, self)
        super(ClassNLLCriterionUnstable, self).__init__()

    def updateOutput(self, input, target):

        # Use this trick to avoid numerical errors
        input_clamp = np.clip(input, self.EPS, 1 - self.EPS)

        # Your code goes here. ################################################
        self.n = len(input)

        self.output = (-1) * np.sum(target * np.log(input_clamp)) / self.n

        return self.output

    def updateGradInput(self, input, target):

        # Use this trick to avoid numerical errors
        input_clamp = np.clip(input, self.EPS, 1 - self.EPS)

        # Your code goes here. ################################################
        self.gradInput = (-1) * target / input_clamp / self.n
        return self.gradInput

    def __repr__(self):
        return "ClassNLLCriterionUnstable"

## 15. (0.3) Negative LogLikelihood criterion (numerically stable)
- input:   **`batch_size x n_feats`** - log probabilities
- target: **`batch_size x n_feats`** - one-hot representation of ground truth
- output: **scalar**

Task is similar to the previous one, but now the criterion input is the output of log-softmax layer. This decomposition allows us to avoid problems with computation of forward and backward of log().

In [25]:
class ClassNLLCriterion(Criterion):
    def __init__(self):
        a = super(ClassNLLCriterion, self)
        super(ClassNLLCriterion, self).__init__()

    def updateOutput(self, input, target):
        # Your code goes here. ################################################
        self.n = len(input)
        self.output = (-1) * np.sum(target * (input + 1e-15))/ self.n
        return self.output

    def updateGradInput(self, input, target):
        # Your code goes here. ################################################
        self.gradInput = (-1) * target / self.n
        return self.gradInput

    def __repr__(self):
        return "ClassNLLCriterion"

1-я часть задания: реализация слоев, лосей и функций активации - 5 баллов. \
2-я часть задания: реализация моделей на своих классах. Что должно быть:
  1. Выберите оптимизатор и реализуйте его, чтоб он работал с вами классами. - 1 балл.
  2. Модель для задачи мультирегрессии на выбраных вами данных. Использовать FCNN, dropout, batchnorm, MSE. Пробуйте различные фукнции активации. Для первой модели попробуйте большую, среднюю и маленькую модель. - 1 балл.
  3. Модель для задачи мультиклассификации на MNIST. Использовать свёртки, макспулы, флэттэны, софтмаксы - 1 балла.
  4. Автоэнкодер для выбранных вами данных. Должен быть на свёртках и полносвязных слоях, дропаутах, батчнормах и тд. - 2 балла. \\

Дополнительно в оценке каждой модели будет учитываться:
1. Наличие правильно выбранной метрики и лосс функции.
2. Отрисовка графиков лосей и метрик на трейне-валидации. Проверка качества модели на тесте.
3. Наличие шедулера для lr.
4. Наличие вормапа.
5. Наличие механизма ранней остановки и сохранение лучшей модели.
6. Свитч лося (метрики) и оптимайзера.

In [26]:
class TestLayers(unittest.TestCase):
    def test_Linear(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in, n_out = 2, 3, 4
        for _ in range(100):
            # layers initialization
            torch_layer = torch.nn.Linear(n_in, n_out)
            custom_layer = Linear(n_in, n_out)
            custom_layer.W = torch_layer.weight.data.numpy()
            custom_layer.b = torch_layer.bias.data.numpy()

            layer_input = np.random.uniform(-10, 10, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(-10, 10, (batch_size, n_out)).astype(np.float32)

            # 1. check layer output
            custom_layer_output = custom_layer.updateOutput(layer_input)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
            torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))

            # 3. check layer parameters grad
            custom_layer.accGradParameters(layer_input, next_layer_grad)
            weight_grad = custom_layer.gradW
            bias_grad = custom_layer.gradb
            torch_weight_grad = torch_layer.weight.grad.data.numpy()
            torch_bias_grad = torch_layer.bias.grad.data.numpy()
            self.assertTrue(np.allclose(torch_weight_grad, weight_grad, atol=1e-6))
            self.assertTrue(np.allclose(torch_bias_grad, bias_grad, atol=1e-6))

    def test_SoftMax(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in = 2, 4
        for _ in range(100):
            # layers initialization
            torch_layer = torch.nn.Softmax(dim=1)
            custom_layer = SoftMax()

            layer_input = np.random.uniform(-10, 10, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.random((batch_size, n_in)).astype(np.float32)
            next_layer_grad /= next_layer_grad.sum(axis=-1, keepdims=True)
            next_layer_grad = next_layer_grad.clip(1e-5,1.)
            next_layer_grad = 1. / next_layer_grad

            # 1. check layer output
            custom_layer_output = custom_layer.updateOutput(layer_input)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-5))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
            torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-5))

    def test_LogSoftMax(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in = 2, 4
        for _ in range(100):
            # layers initialization
            torch_layer = torch.nn.LogSoftmax(dim=1)
            custom_layer = LogSoftMax()

            layer_input = np.random.uniform(-10, 10, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.random((batch_size, n_in)).astype(np.float32)
            next_layer_grad /= next_layer_grad.sum(axis=-1, keepdims=True)

            # 1. check layer output
            custom_layer_output = custom_layer.updateOutput(layer_input)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
            torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))

    def test_BatchNormalization(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in = 32, 16
        for _ in range(100):
            # layers initialization
            slope = np.random.uniform(0.01, 0.05)
            alpha = 0.9
            custom_layer = BatchNormalization(alpha)
            custom_layer.train()
            torch_layer = torch.nn.BatchNorm1d(n_in, eps=custom_layer.EPS, momentum=1.-alpha, affine=False)
            custom_layer.moving_mean = torch_layer.running_mean.numpy().copy()
            custom_layer.moving_variance = torch_layer.running_var.numpy().copy()

            layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)

            # 1. check layer output
            custom_layer_output = custom_layer.updateOutput(layer_input)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
            torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-5))

            # 3. check moving mean
            self.assertTrue(np.allclose(custom_layer.moving_mean, torch_layer.running_mean.numpy()))
            # we don't check moving_variance because pytorch uses slightly different formula for it:
            # it computes moving average for unbiased variance (i.e var*N/(N-1))
            #self.assertTrue(np.allclose(custom_layer.moving_variance, torch_layer.running_var.numpy()))

            # 4. check evaluation mode
            custom_layer.moving_variance = torch_layer.running_var.numpy().copy()
            custom_layer.evaluate()
            custom_layer_output = custom_layer.updateOutput(layer_input)
            torch_layer.eval()
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

    def test_Sequential(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in = 2, 4
        for _ in range(100):
            # layers initialization
            alpha = 0.9
            torch_layer = torch.nn.BatchNorm1d(n_in, eps=BatchNormalization.EPS, momentum=1.-alpha, affine=True)
            torch_layer.bias.data = torch.from_numpy(np.random.random(n_in).astype(np.float32))
            custom_layer = Sequential()
            bn_layer = BatchNormalization(alpha)
            bn_layer.moving_mean = torch_layer.running_mean.numpy().copy()
            bn_layer.moving_variance = torch_layer.running_var.numpy().copy()
            custom_layer.add(bn_layer)
            scaling_layer = ChannelwiseScaling(n_in)
            scaling_layer.gamma = torch_layer.weight.data.numpy()
            scaling_layer.beta = torch_layer.bias.data.numpy()
            custom_layer.add(scaling_layer)
            custom_layer.train()

            layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)

            # 1. check layer output
            custom_layer_output = custom_layer.updateOutput(layer_input)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-5))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.backward(layer_input, next_layer_grad)
            torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-5))

            # 3. check layer parameters grad
            weight_grad, bias_grad = custom_layer.getGradParameters()[1]
            torch_weight_grad = torch_layer.weight.grad.data.numpy()
            torch_bias_grad = torch_layer.bias.grad.data.numpy()
            self.assertTrue(np.allclose(torch_weight_grad, weight_grad, atol=1e-6))
            self.assertTrue(np.allclose(torch_bias_grad, bias_grad, atol=1e-6))

    def test_Dropout(self):
        np.random.seed(42)

        batch_size, n_in = 2, 4
        for _ in range(100):
            # layers initialization
            p = np.random.uniform(0.3, 0.7)
            layer = Dropout(p)
            layer.train()

            layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)

            # 1. check layer output
            layer_output = layer.updateOutput(layer_input)
            self.assertTrue(np.all(np.logical_or(np.isclose(layer_output, 0),
                                        np.isclose(layer_output*(1.-p), layer_input))))

            # 2. check layer input grad
            layer_grad = layer.updateGradInput(layer_input, next_layer_grad)
            self.assertTrue(np.all(np.logical_or(np.isclose(layer_grad, 0),
                                        np.isclose(layer_grad*(1.-p), next_layer_grad))))

            # 3. check evaluation mode
            layer.evaluate()
            layer_output = layer.updateOutput(layer_input)
            self.assertTrue(np.allclose(layer_output, layer_input))

            # 4. check mask
            p = 0.0
            layer = Dropout(p)
            layer.train()
            layer_output = layer.updateOutput(layer_input)
            self.assertTrue(np.allclose(layer_output, layer_input))

            p = 0.5
            layer = Dropout(p)
            layer.train()
            layer_input = np.random.uniform(5, 10, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(5, 10, (batch_size, n_in)).astype(np.float32)
            layer_output = layer.updateOutput(layer_input)
            zeroed_elem_mask = np.isclose(layer_output, 0)
            layer_grad = layer.updateGradInput(layer_input, next_layer_grad)
            self.assertTrue(np.all(zeroed_elem_mask == np.isclose(layer_grad, 0)))

            # 5. dropout mask should be generated independently for every input matrix element, not for row/column
            batch_size, n_in = 1000, 1
            p = 0.8
            layer = Dropout(p)
            layer.train()

            layer_input = np.random.uniform(5, 10, (batch_size, n_in)).astype(np.float32)
            layer_output = layer.updateOutput(layer_input)
            self.assertTrue(np.sum(np.isclose(layer_output, 0)) != layer_input.size)

            layer_input = layer_input.T
            layer_output = layer.updateOutput(layer_input)
            self.assertTrue(np.sum(np.isclose(layer_output, 0)) != layer_input.size)

    # def test_Conv2d(self):
    #     hyperparams = [
    #         {'batch_size': 8, 'in_channels': 3, 'out_channels': 6, 'height': 32, 'width': 32,
    #          'kernel_size': 3, 'stride': 1, 'padding': 1, 'bias': True, 'padding_mode': 'zeros'},
    #         {'batch_size': 4, 'in_channels': 1, 'out_channels': 2, 'height': 28, 'width': 28,
    #          'kernel_size': 5, 'stride': 2, 'padding': 2, 'bias': False, 'padding_mode': 'replicate'},
    #         {'batch_size': 16, 'in_channels': 3, 'out_channels': 3, 'height': 64, 'width': 64,
    #          'kernel_size': 3, 'stride': 2, 'padding': 'same', 'bias': True, 'padding_mode': 'reflect'},
    #         {'batch_size': 2, 'in_channels': 3, 'out_channels': 8, 'height': 10, 'width': 10,
    #          'kernel_size': 2, 'stride': (1,2), 'padding': 0, 'bias': True, 'padding_mode': 'zeros'},
    #     ]
    #     np.random.seed(42)
    #     torch.manual_seed(42)

    #     for _ in range(100):
    #       for params in hyperparams:
    #           with self.subTest(params=params):

    #               batch_size = params['batch_size']
    #               in_channels = params['in_channels']
    #               out_channels = params['out_channels']
    #               height = params['height']
    #               width = params['width']
    #               kernel_size = params['kernel_size']
    #               stride = params['stride']
    #               padding = params['padding']
    #               bias = params['bias']
    #               padding_mode = params['padding_mode']

    #               custom_layer = Conv2d(in_channels, out_channels, kernel_size,
    #                                     stride=stride, padding=padding, bias=bias,
    #                                     padding_mode=padding_mode)
    #               custom_layer.train()

    #               torch_layer = torch.nn.Conv2d(in_channels, out_channels, kernel_size,
    #                                             stride=stride, padding=padding, bias=bias,
    #                                             padding_mode=padding_mode)

    #               custom_layer.weight = torch_layer.weight.detach().numpy().copy()
    #               if bias:
    #                   custom_layer.bias = torch_layer.bias.detach().numpy().copy()

    #               layer_input = np.random.randn(batch_size, in_channels, height, width).astype(np.float32)
    #               input_var = torch.tensor(layer_input, requires_grad=True)

    #               custom_output = custom_layer.updateOutput(layer_input)
    #               torch_output = torch_layer(input_var)
    #               self.assertTrue(
    #                   np.allclose(torch_output.detach().numpy(), custom_output, atol=1e-6))

    #               next_layer_grad = np.random.randn(*torch_output.shape).astype(np.float32)
    #               custom_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
    #               torch_output.backward(torch.tensor(next_layer_grad))
    #               torch_grad = input_var.grad.detach().numpy()
    #               self.assertTrue(
    #                   np.allclose(torch_grad, custom_grad, atol=1e-5))


    def test_LeakyReLU(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in = 2, 4
        for _ in range(100):
            # layers initialization
            slope = np.random.uniform(0.01, 0.05)
            torch_layer = torch.nn.LeakyReLU(slope)
            custom_layer = LeakyReLU(slope)

            layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)

            # 1. check layer output
            custom_layer_output = custom_layer.updateOutput(layer_input)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
            torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))

    def test_ELU(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in = 2, 4
        for _ in range(100):
            # layers initialization
            alpha = 1.0
            torch_layer = torch.nn.ELU(alpha)
            custom_layer = ELU(alpha)

            layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)

            # 1. check layer output
            custom_layer_output = custom_layer.updateOutput(layer_input)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
            torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))

    def test_SoftPlus(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in = 2, 4
        for _ in range(100):
            # layers initialization
            torch_layer = torch.nn.Softplus()
            custom_layer = SoftPlus()

            layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)

            # 1. check layer output
            custom_layer_output = custom_layer.updateOutput(layer_input)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
            torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))

    def test_ClassNLLCriterionUnstable(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in = 2, 4
        for _ in range(100):
            # layers initialization
            torch_layer = torch.nn.NLLLoss()
            custom_layer = ClassNLLCriterionUnstable()

            layer_input = np.random.uniform(0, 1, (batch_size, n_in)).astype(np.float32)
            layer_input /= layer_input.sum(axis=-1, keepdims=True)
            layer_input = layer_input.clip(custom_layer.EPS, 1. - custom_layer.EPS)  # unifies input
            target_labels = np.random.choice(n_in, batch_size)
            target = np.zeros((batch_size, n_in), np.float32)
            target[np.arange(batch_size), target_labels] = 1  # one-hot encoding

            # 1. check layer output
            custom_layer_output = custom_layer.updateOutput(layer_input, target)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(torch.log(layer_input_var),
                                                 Variable(torch.from_numpy(target_labels), requires_grad=False))
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.updateGradInput(layer_input, target)
            torch_layer_output_var.backward()
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))

    def test_ClassNLLCriterion(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in = 2, 4
        for _ in range(100):
            # layers initialization
            torch_layer = torch.nn.NLLLoss()
            custom_layer = ClassNLLCriterion()

            layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
            layer_input = torch.nn.LogSoftmax(dim=1)(Variable(torch.from_numpy(layer_input))).data.numpy()
            target_labels = np.random.choice(n_in, batch_size)
            target = np.zeros((batch_size, n_in), np.float32)
            target[np.arange(batch_size), target_labels] = 1  # one-hot encoding

            # 1. check layer output
            custom_layer_output = custom_layer.updateOutput(layer_input, target)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var,
                                                 Variable(torch.from_numpy(target_labels), requires_grad=False))
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.updateGradInput(layer_input, target)
            torch_layer_output_var.backward()
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))


    def test_MaxPool2d(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, channels, height, width = 4, 3, 16, 16
        kernel_size, stride, padding = 2, 2, 0

        for _ in range(100):
          custom_module = MaxPool2d(kernel_size, stride, padding)
          custom_module.train()

          torch_module = torch.nn.MaxPool2d(kernel_size, stride=stride, padding=padding)

          input_np = np.random.randn(batch_size, channels, height, width).astype(np.float32)
          input_var = torch.tensor(input_np, requires_grad=True)

          custom_output = custom_module.updateOutput(input_np)
          torch_output = torch_module(input_var)
          self.assertTrue(
              np.allclose(torch_output.detach().numpy(), custom_output, atol=1e-6))

          next_grad = np.random.randn(*torch_output.shape).astype(np.float32)
          custom_grad = custom_module.updateGradInput(input_np, next_grad)
          torch_output.backward(torch.tensor(next_grad))
          torch_grad = input_var.grad.detach().numpy()
          self.assertTrue(
              np.allclose(torch_grad, custom_grad, atol=1e-5))

    def test_AvgPool2d(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, channels, height, width = 4, 3, 16, 16
        kernel_size, stride, padding = 3, 2, 1

        for _ in range(100):
          custom_module = AvgPool2d(kernel_size, stride, padding)
          custom_module.train()

          torch_module = torch.nn.AvgPool2d(kernel_size, stride=stride, padding=padding)

          input_np = np.random.randn(batch_size, channels, height, width).astype(np.float32)
          input_var = torch.tensor(input_np, requires_grad=True)

          custom_output = custom_module.updateOutput(input_np)
          torch_output = torch_module(input_var)
          self.assertTrue(
              np.allclose(torch_output.detach().numpy(), custom_output, atol=1e-6))

          next_grad = np.random.randn(*torch_output.shape).astype(np.float32)
          custom_grad = custom_module.updateGradInput(input_np, next_grad)
          torch_output.backward(torch.tensor(next_grad))
          torch_grad = input_var.grad.detach().numpy()
          self.assertTrue(
              np.allclose(torch_grad, custom_grad, atol=1e-5))

    def test_Flatten(self):
        np.random.seed(42)
        torch.manual_seed(42)

        test_params = [
            {'start_dim': 1, 'end_dim': -1},
            {'start_dim': 2, 'end_dim': 3},
            {'start_dim': 0, 'end_dim': -1},
        ]

        for _ in range(100):
          for params in test_params:
              with self.subTest(params=params):
                  start_dim = params['start_dim']
                  end_dim = params['end_dim']

                  custom_module = Flatten(start_dim, end_dim)
                  input_np = np.random.randn(2, 3, 4, 5).astype(np.float32)
                  input_var = torch.tensor(input_np, requires_grad=True)

                  custom_output = custom_module.updateOutput(input_np)
                  torch_output = torch.flatten(input_var, start_dim=start_dim, end_dim=end_dim)
                  self.assertTrue(
                      np.allclose(torch_output.detach().numpy(), custom_output, atol=1e-6))

                  next_grad = np.random.randn(*torch_output.shape).astype(np.float32)
                  custom_grad = custom_module.updateGradInput(input_np, next_grad)
                  torch_output.backward(torch.tensor(next_grad))
                  torch_grad = input_var.grad.detach().numpy()
                  self.assertTrue(
                      np.allclose(torch_grad, custom_grad, atol=1e-6))

    def test_Gelu(self):
        np.random.seed(42)
        torch.manual_seed(42)

        for _ in range(100):
          custom_module = Gelu()
          custom_module.train()

          torch_module = torch.nn.GELU()

          input_np = np.random.randn(10, 5).astype(np.float32)
          input_var = torch.tensor(input_np, requires_grad=True)

          custom_output = custom_module.updateOutput(input_np)
          torch_output = torch_module(input_var)
          self.assertTrue(
              np.allclose(torch_output.detach().numpy(), custom_output, atol=1e-6))

          next_grad = np.random.randn(*torch_output.shape).astype(np.float32)
          custom_grad = custom_module.updateGradInput(input_np, next_grad)
          torch_output.backward(torch.tensor(next_grad))
          torch_grad = input_var.grad.detach().numpy()
          self.assertTrue(
              np.allclose(torch_grad, custom_grad, atol=1e-5))


suite = unittest.TestLoader().loadTestsFromTestCase(TestLayers)
unittest.TextTestRunner(verbosity=2).run(suite)

test_AvgPool2d (__main__.TestLayers.test_AvgPool2d) ... ok
test_BatchNormalization (__main__.TestLayers.test_BatchNormalization) ... ok
test_ClassNLLCriterion (__main__.TestLayers.test_ClassNLLCriterion) ... ok
test_ClassNLLCriterionUnstable (__main__.TestLayers.test_ClassNLLCriterionUnstable) ... ok
test_Dropout (__main__.TestLayers.test_Dropout) ... ok
test_ELU (__main__.TestLayers.test_ELU) ... ok
test_Flatten (__main__.TestLayers.test_Flatten) ... ok
test_Gelu (__main__.TestLayers.test_Gelu) ... ok
test_LeakyReLU (__main__.TestLayers.test_LeakyReLU) ... ok
test_Linear (__main__.TestLayers.test_Linear) ... ok
test_LogSoftMax (__main__.TestLayers.test_LogSoftMax) ... ok
test_MaxPool2d (__main__.TestLayers.test_MaxPool2d) ... ok
test_Sequential (__main__.TestLayers.test_Sequential) ... ok
test_SoftMax (__main__.TestLayers.test_SoftMax) ... ok
test_SoftPlus (__main__.TestLayers.test_SoftPlus) ... ok

----------------------------------------------------------------------
Ran 15 tests in

<unittest.runner.TextTestResult run=15 errors=0 failures=0>