In [None]:
import numpy as np

**Module** is an abstract class which defines fundamental methods necessary for a training a neural network.

In [None]:
class Module(object):
    """
    Basically, you can think of a module as of a something (black box) 
    which can process `input` data and produce `ouput` data.
    This is like applying a function which is called `forward`: 
        
        output = module.forward(input)
    
    The module should be able to perform a backward pass: to differentiate the `forward` function. 
    Moreover, it should be able to differentiate it if is a part of chain (chain rule).
    The latter implies there is a gradient from previous step of a chain rule. 
    
        input_grad = module.backward(input, output_grad)
    """
    def __init__ (self):
        self._output = None
        self._input_grad = None
        self.training = True
    
    def forward(self, input):
        """
        Takes an input object, and computes the corresponding output of the module.
        """
        self._output = self._compute_output(input)
        return self._output

    def backward(self, input, output_grad):
        """
        Performs a backpropagation step through the module, with respect to the given input.
        
        This includes 
         - computing a gradient w.r.t. `input` (is needed for further backprop),
         - computing a gradient w.r.t. parameters (to update parameters while optimizing).
        """
        self._input_grad = self._compute_input_grad(input, output_grad)
        self._update_parameters_grad(input, output_grad)
        return self._input_grad
    

    def _compute_output(self, input):
        """
        Computes the output using the current parameter set of the class and input.
        This function returns the result which will be stored in the `_output` field.

        Example: in case of identity operation:
        
        output = input 
        return output
        """
        raise NotImplementedError
        

    def _compute_input_grad(self, input, output_grad):
        """
        Returns the gradient of the module with respect to its own input. 
        The shape of the returned value is always the same as the shape of `input`.
        
        Example: in case of identity operation:
        input_grad = output_grad
        return input_grad
        """
        
        raise NotImplementedError
    
    def _update_parameters_grad(self, input, output_grad):
        """
        Computing the gradient of the module with respect to its own parameters.
        No need to override if module has no parameters (e.g. ReLU).
        """
        pass
    
    def zero_grad(self): 
        """
        Zeroes `gradParams` variable if the module has params.
        """
        pass
        
    def get_parameters(self):
        """
        Returns a list with its parameters. 
        If the module does not have parameters return empty list. 
        """
        return []
        
    def get_parameters_grad(self):
        """
        Returns a list with gradients with respect to its parameters. 
        If the module does not have parameters return empty list. 
        """
        return []
    
    def train(self):
        """
        Sets training mode for the module.
        Training and testing behaviour differs for Dropout, BatchNorm.
        """
        self.training = True
    
    def evaluate(self):
        """
        Sets evaluation mode for the module.
        Training and testing behaviour differs for Dropout, BatchNorm.
        """
        self.training = False
    
    def __repr__(self):
        """
        Pretty printing. Should be overrided in every module if you want 
        to have readable description. 
        """
        return "Module"

# Layers

## 1. Batch normalization
- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

The layer works as follows. While training (`self.training == True`) it transforms input as $$y = \frac{x - \mu}  {\sqrt{\sigma + \epsilon}}$$
where $\mu$ and $\sigma$ — mean and variance of feature values in **batch** and $\epsilon$ is just a small number for numericall stability. Also during training, layer maintains exponential moving average values for mean and variance: 
```
    self.moving_mean = self.moving_mean * alpha + batch_mean * (1 - alpha)
    self.moving_variance = self.moving_variance * alpha + batch_variance * (1 - alpha)
```
During testing (`self.training == False`) the layer normalizes input using moving_mean and moving_variance. 

Note that decomposition of batch normalization on normalization itself and channelwise scaling here is just a common **implementation** choice. In general "batch normalization" always assumes normalization + scaling.

### Backpropagation
Suppose $X$ has shape $n×m$ where $n$ is number of samples in a batch and $m$ is number of nodes in the layer , $J$ is matrix of ones (default with shape $n×n$). Then the centered $X$, $\: X_c$, is: 

$$X_c = (X - \frac{1}{n} J X)$$ 

The broadcasted standard deviation matrix of $X$, $\: X_s$, is:

$$X_s = (\frac{1}{n} J X_c^{\circ^2} + \epsilon J)^{\circ^{\frac{1}{2}}}$$
where $\circ$ denotes element-wise power. $\epsilon$  is a tiny scalar to avoid division by zero.

Then the normalized $X$, $\: X_n$ is:

$$X_n = X_c \odot X_s^{\circ^{-1}}$$

where $\odot$ is element-wise product.

For column vectors $\gamma$ and $\beta$, the transformed normalized $X$,  the output of batch normalization $\hat{X}$ is:

$$\hat{X} = X_n \odot (1_n \gamma^T) + 1_n \beta^T$$

where $1_n$ is column vector of ones with proper shape.

Several trace tricks:
- $a)\: X : Y = tr(X^T Y)$
- $b)\: x = tr(x)$
- $c) x = x^T$
- $d)\: tr(aX + bY) = a tr(X) + b tr(Y)$
- $e)\: tr(XY) = tr(YX)$
- $f)\: tr(X^T) = tr(X)$
- $g)\: tr(X(Y \odot Z)) = tr((X \odot Y^T)Z)$
- $h)\: (xy^T) \odot Z = D_x Z D_y; \: D_x$ denotes diagonal matrix with $x$ on diagonal
- $i)\: X(Y \odot (1_n z^T)) = (X Y) \odot (1_n z^T)$
- $j)\: ((z 1_n^T) \odot X) Y = (z 1_n^T) \odot (X Y)$

$\mathcal{L} \:$ denotes output scalar value of a loss function

Calculating Frechet derivative of $\mathcal{L}$ with respect to $\beta$:

$
D_{\beta_0}[\mathcal{L}](h) \\ = tr(\nabla_{\hat{X}} \mathcal{L}^T \cdot D_{\beta_0}[1_n \beta^T](h)) \\ = tr(\nabla_{\hat{X}} \mathcal{L}^T \cdot 1_n h^T) \\ = tr(1_n^T \cdot \nabla_{\hat{X}} \mathcal{L} \cdot h)
$

$
\nabla_{\beta} \mathcal{L} = (1_n^T \cdot \nabla_{\hat{X}} \mathcal{L})^T 
$

Calculating Frechet derivative of $\mathcal{L}$ with respect to $\gamma$:

$
D_{\gamma_0}[\mathcal{L}](h) \\ = tr(\nabla_{\hat{X}} \mathcal{L}^T \cdot D_{\gamma_0}[X_n \odot (1_n \gamma^T)](h)) \\ = tr(\nabla_{\hat{X}} \mathcal{L}^T \cdot (X_n \odot (1_n h^T))) \\ = tr((\nabla_{\hat{X}} \mathcal{L}^T \odot X_n^T) \cdot (1_n h^T)) \\ = tr(1_n^T \cdot (\nabla_{\hat{X}} \mathcal{L} \odot X_n) \cdot h)
$

$
\nabla_{\gamma} \mathcal{L} = (1_n^T \cdot (\nabla_{\hat{X}} \mathcal{L} \odot X_n))^T
$

Calculating Frechet derivative of $\mathcal{L}$ with respect to $X_n$:

$
D_{X_{n_0}}[\mathcal{L}](H) \\ =  tr(\nabla_{\hat{X}} \mathcal{L}^T \cdot D_{X_{n_0}}[X_n \odot (1_n \gamma^T)](H)) \\ = tr(\nabla_{\hat{X}} \mathcal{L}^T \cdot (H \odot (1_n \gamma^T))) \\ = tr((\nabla_{\hat{X}} \mathcal{L}^T \odot (1_n \gamma^T)^T) \cdot H) \\ = tr((\nabla_{\hat{X}} \mathcal{L} \odot (1_n \gamma^T))^T \cdot H)
$

$
\nabla_{X_n} \mathcal{L} = \nabla_{\hat{X}} \mathcal{L} \odot (1_n \gamma^T)
$

Calculating Frechet derivative of $X_n$ with respect to $X_c$:

$
D_{X_{c_0}}[X_n](H) \\ = D_{X_{c_0}}[X_c \odot X_s^{\circ^{-1}}](H) \\ = D_{X_{c_0}}[X_c](H) \odot X_{s_0}^{\circ^{-1}} + X_{c_0} \odot D_{X_{c_0}}[X_s^{\circ^{-1}}](H) \\ = H \odot X_{s_0}^{\circ^{-1}} - X_{c_0} \odot X_{s_0}^{\circ^{-2}} \odot D_{X_{c_0}}[(\frac{1}{n} J X_c^{\circ^2} + \epsilon J)^{\circ^{\frac{1}{2}}}](H) \\ = H \odot X_{s_0}^{\circ^{-1}} - X_{c_0} \odot X_{s_0}^{\circ^{-2}} \odot (\frac{1}{2} X_{s_0}^{\circ^{-1}} \odot D_{X_{c_0}}[\frac{1}{n} J X_c^{\circ^2} + \epsilon J](H)) \\ = H \odot X_{s_0}^{\circ^{-1}} - X_{c_0} \odot X_{s_0}^{\circ^{-2}} \odot (\frac{1}{2} X_{s_0}^{\circ^{-1}} (2 \frac{1}{n} J \cdot (X_{c_0} \odot D_{X_{c_0}}[X_c](H)))) \\ = H \odot X_{s_0}^{\circ^{-1}} - \frac{1}{n} X_{n_0} \odot X_{s_0}^{\circ^{-1}} \odot (X_{s_0}^{\circ^{-1}} \odot J \cdot (X_{c_0} \odot H)) \\ = H \odot X_{s_0}^{\circ^{-1}} - \frac{1}{n} X_{n_0} \odot X_{s_0}^{\circ^{-1}} \odot J \cdot (X_{c_0} \odot X_{s_0}^{\circ^{-1}} \odot H) \\ = H \odot X_{s_0}^{\circ^{-1}} - \frac{1}{n} X_{n_0} \odot X_{s_0}^{\circ^{-1}} \odot J \cdot (X_{n_0} \odot H) \\ = X_{s_0}^{\circ^{-1}} \odot (H - \frac{1}{n} X_{n_0} \odot (J \cdot (X_{n_0} \odot H)))
$

Now we can calculate derivative of $\mathcal{L}$ with respect to $X_c$:

$
D_{X_{c_0}}[\mathcal{L}](H) \\ =  tr(\nabla_{X_n} \mathcal{L}^T \cdot D_{X_{c_0}}[X_n](H)) \\ = tr(\nabla_{X_n} \mathcal{L}^T \cdot (X_{s_0}^{\circ^{-1}} \odot (H - \frac{1}{n} X_{n_0} \odot (J \cdot (X_{n_0} \odot H))))) \\ = tr(\nabla_{X_n} \mathcal{L}^T \cdot (X_{s_0}^{\circ^{-1}} \odot H)) - \frac{1}{n} tr(\nabla_{X_n} \mathcal{L}^T \cdot (X_{s_0}^{\circ^{-1}} \odot X_{n_0} \odot (J \cdot (X_{n_0} \odot H)))) \\ = tr((\nabla_{X_n} \mathcal{L} \odot X_{s_0}^{\circ^{-1}})^T \cdot H) - \frac{1}{n} tr((\nabla_{X_n} \mathcal{L} \odot X_{s_0}^{\circ^{-1}} \odot X_{n_0})^T \cdot (J \cdot (X_{n_0} \odot H))) \\ = tr((\nabla_{X_n} \mathcal{L} \odot X_{s_0}^{\circ^{-1}})^T \cdot H) - \frac{1}{n} tr(J \cdot (\nabla_{X_n} \mathcal{L} \odot X_{s_0}^{\circ^{-1}} \odot X_{n_0})^T \cdot (X_{n_0} \odot H)) \\ = tr((\nabla_{X_n} \mathcal{L} \odot X_{s_0}^{\circ^{-1}})^T \cdot H) - \frac{1}{n} tr((X_{s_0}^{\circ^{-1}} \odot J (\nabla_{X_n} \mathcal{L} \odot X_{n_0}))^T \cdot (X_{n_0} \odot H)) \\ = tr((\nabla_{X_n} \mathcal{L} \odot X_{s_0}^{\circ^{-1}})^T \cdot H) - \frac{1}{n} tr((X_{s_0}^{\circ^{-1}} \odot X_{n_0} \odot J (\nabla_{X_n} \mathcal{L} \odot X_{n_0}))^T \cdot H) \\ = tr((X_{s_0}^{\circ^{-1}} \odot (\nabla_{X_n} \mathcal{L} - \frac{1}{n} X_{n_0} \odot J (X_{n_0} \odot \nabla_{X_n} \mathcal{L})))^T \cdot H)
$

$
\nabla_{X_c} \mathcal{L} = X_{s_0}^{\circ^{-1}} \odot (\nabla_{X_n} \mathcal{L} - \frac{1}{n} X_{n_0} \odot J (X_{n_0} \odot \nabla_{X_n} \mathcal{L}))
$

Calculating Frechet derivative of $\mathcal{L}$ with respect to $X$:

$
D_{X_0}[\mathcal{L}](H) \\ =  tr(\nabla_{X_c} \mathcal{L}^T \cdot D_{X_0}[X_c](H)) \\ = tr(\nabla_{X_c} \mathcal{L}^T \cdot D_{X_0}[X - \frac{1}{n} J X](H)) \\ = tr(\nabla_{X_c} \mathcal{L}^T \cdot (I - \frac{1}{n} J) \cdot H) \\ = tr(((I - \frac{1}{n} J) \cdot \nabla_{X_c} \mathcal{L})^T \cdot H)
$

$
\nabla_{X} \mathcal{L} = (I - \frac{1}{n} J) \cdot \nabla_{X_c} \mathcal{L} = (I - \frac{1}{n} J) \cdot (\nabla_{X_n} \mathcal{L} - \frac{1}{n} X_{n_0} \odot J (X_{n_0} \odot \nabla_{X_n} \mathcal{L})) \odot X_{s_0}^{\circ^{-1}}
$

Notice that:

$J \cdot ((J(X_{n_0} \odot \nabla_{X_n} \mathcal{L})) \odot X_{n_0}) = (J(X_{n_0} \odot \nabla_{X_n} \mathcal{L})) \odot (J \cdot X_{n_0}) = 0 \:$, as $J \cdot X_{n_0} = 0$

So:

$
\nabla_{X} \mathcal{L} \\ = X_{s_0}^{\circ^{-1}} \odot ((I - \frac{1}{n} J) \cdot \nabla_{X_n} \mathcal{L} - \frac{1}{n} X_{n_0} \odot (J (X_{n_0} \odot \nabla_{X_n} \mathcal{L}))) \\ = X_{s_0}^{\circ^{-1}} \odot (((I - \frac{1}{n} J) (\nabla_{\hat{X}} \mathcal{L} \odot (1_n \gamma^T)) - \frac{1}{n} X_{n_0} \odot (J (X_{n_0} \odot (\nabla_{\hat{X}} \mathcal{L} \odot (1_n \cdot \gamma^T)))))) \\ = X_{s_0}^{\circ^{-1}} \odot (1_n \gamma^T) \odot ((I - \frac{1}{n} J) \nabla_{\hat{X}} \mathcal{L} - \frac{1}{n} X_{n_0} \odot (J (X_{n_0} \odot \nabla_{\hat{X}} \mathcal{L})))
$

In [None]:
class BatchNormalization(Module):
    def __init__(self, alpha=0.):
        super(BatchNormalization, self).__init__()
        self.EPS = 1e-3
        self.alpha = alpha
        self.moving_mean = 0.
        self.moving_variance = 1.

    def _compute_output(self, input):
        mean = input.mean(axis=0)
        variance = input.var(axis=0)
        if self.training:
            self.moving_mean = self.moving_mean * self.alpha + mean * (1 - self.alpha)
            self.moving_variance = self.moving_variance * self.alpha + variance * (1 - self.alpha)
        else:
            mean = self.moving_mean
            variance = self.moving_variance

        output = (input - mean) / np.sqrt(variance + self.EPS)

        return output

    def _compute_input_grad(self, input, output_grad):
        n = output_grad.shape[0]
        input_centered = input - input.mean(axis=0, keepdims=True)
        input_std = np.sqrt((input_centered ** 2).mean(axis=0, keepdims=True) + self.EPS)
        input_normalized = input_centered / input_std
        
        output_grad_centered = output_grad - output_grad.mean(axis=0, keepdims=True)

        input_normalized_x_output_grad = np.sum(input_normalized * output_grad,  axis=0, keepdims=True)
        
        grad_input = (output_grad_centered - input_normalized * input_normalized_x_output_grad / n) / input_std
        return grad_input

    def __repr__(self):
        return "BatchNormalization"

In [None]:
class ChannelwiseScaling(Module):
    """
       Implements linear transform of input y = \gamma * x + \beta
       where \gamma, \beta - learnable vectors of length x.shape[-1]
    """
    def __init__(self, n_out):
        super(ChannelwiseScaling, self).__init__()

        stdv = 1./np.sqrt(n_out)
        self.gamma = np.random.uniform(-stdv, stdv, size=n_out)
        self.beta = np.random.uniform(-stdv, stdv, size=n_out)
        
        self.gradGamma = np.zeros_like(self.gamma)
        self.gradBeta = np.zeros_like(self.beta)

    def _compute_output(self, input):
        output = input * self.gamma + self.beta
        return output
        
    def _compute_input_grad(self, input, output_grad):
        grad_input = output_grad * self.gamma
        return grad_input
    
    def _update_parameters_grad(self, input, output_grad):
        self.gradBeta = np.sum(output_grad, axis=0)
        self.gradGamma = np.sum(output_grad*input, axis=0)
    
    def zero_grad(self):
        self.gradGamma.fill(0)
        self.gradBeta.fill(0)
        
    def get_parameters(self):
        return [self.gamma, self.beta]
    
    def get_parameters_grad(self):
        return [self.gradGamma, self.gradBeta]
    
    def __repr__(self):
        return "ChannelwiseScaling"

## 2. Dropout
Implementation of [**dropout**](https://www.cs.toronto.edu/~hinton/absps/JMLRdropout.pdf). The idea and implementation is really simple: just multimply the input by $Bernoulli(p)$ mask. Here $p$ is probability of an element to be zeroed.

This has proven to be an effective technique for regularization and preventing the co-adaptation of neurons.

While training (`self.training == True`) it samples a mask on each iteration (for every batch), zero out elements and multiply elements by $1 / (1 - p)$. The latter is needed for keeping mean values of features close to mean values which will be in test mode. When testing this module uses identity transform i.e. `output = input`.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

### Backpropagation

$
\hat{X} = X \odot M
$

$
D_{X_0}[\mathcal{L}](H) \\ =  tr(\nabla_{\hat{X}} \mathcal{L}^T \cdot D_{X_0}[\hat{X}](H)) \\ = tr(\nabla_{\hat{X}} \mathcal{L}^T \cdot D_{X_0}[X \odot M](H)) \\ = tr(\nabla_{\hat{X}} \mathcal{L}^T \cdot (M \odot H)) \\ = tr((\nabla_{\hat{X}} \mathcal{L}^T \odot M^T) \cdot H) \\ = tr((\nabla_{\hat{X}} \mathcal{L} \odot M)^T \cdot H))
$

$
\nabla_{X} \mathcal{L} = \nabla_{\hat{X}} \mathcal{L} \odot M
$

In [None]:
class Dropout(Module):
  
    def __init__(self, p=0.5):
        super(Dropout, self).__init__()
        
        self.p = p
        self.mask = []
        
    def _compute_output(self, input):
        n = input.shape[0]

        if self.training:
            self.mask = np.random.binomial(1, 1 - self.p, input.shape) / (1 - self.p)
        else:
            self.mask = np.ones_like(input)
        output = input * self.mask
        return output
    
    def _compute_input_grad(self, input, output_grad):
        grad_input = output_grad * self.mask
        return grad_input
        
    def __repr__(self):
        return "Dropout"

## 3. Conv2d
Implementation of something like pytorch Conv2d layer with `stride=1` and zero-padding outside of image using `scipy.signal.correlate` function.

* input: `batch_size x in_channels x h x w`
* output: `batch_size x out_channels x h x w`

### Forward

$$
\begin{cases}
  Y_1 = B_1 + X_1 \star K_{11} + ... + X_n \star K_{1n} \\
  Y_2 = B_2 + X_1 \star K_{21} + ... + X_n \star K_{2n} \\
  ... \\
  Y_d = B_d + X_1 \star K_{d1} + ... + X_n \star K_{dn} \\
\end{cases}
$$
where $\star$ denotes cross-correlation operator.


### Backpropagation

$$
\begin{cases}
  \dfrac{\partial \mathcal{L}}{\partial K_{ij}} = X_j \star \dfrac{\partial \mathcal{L}}{\partial Y_i} \\
  \dfrac{\partial \mathcal{L}}{\partial B_i} = \dfrac{\partial \mathcal{L}}{\partial Y_i} \\
  \dfrac{\partial \mathcal{L}}{\partial X_j} = \sum_{i=1}^{d} \dfrac{\partial \mathcal{L}}{\partial Y_i} \ast K_{ij}
\end{cases}
$$
where $\ast$ denotes full-convolution operator.

In [None]:
import scipy as sp
import scipy.signal
import skimage

class Conv2d(Module):
    def __init__(self, in_channels, out_channels, kernel_size):
        super(Conv2d, self).__init__()
        assert kernel_size % 2 == 1, kernel_size
       
        stdv = 1./np.sqrt(in_channels)
        self.W = np.random.uniform(-stdv, stdv, size = (out_channels, in_channels, kernel_size, kernel_size))
        self.b = np.random.uniform(-stdv, stdv, size=(out_channels,))
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        
        self.gradW = np.zeros_like(self.W)
        self.gradb = np.zeros_like(self.b)
        
    def _compute_output(self, input):
        pad_size = self.kernel_size // 2
        n, c, w, h = input.shape
        input = np.pad(input, ((0, 0), (0, 0), (pad_size, pad_size), (pad_size, pad_size)), 
                       'constant', constant_values=0)
        
        self._output = np.zeros((n, self.out_channels, w, h))
        self._output += np.repeat(self.b[np.newaxis, :, np.newaxis, np.newaxis], n, axis=0)

        for sample_index in range(n):
            for out_channel_index in range(self.out_channels):
                for in_channel_index in range(self.in_channels):
                    self._output[sample_index][out_channel_index] += \
                        scipy.signal.correlate2d(input[sample_index][in_channel_index], 
                                                 self.W[out_channel_index, in_channel_index], "valid")

        return self._output
    
    def _compute_input_grad(self, input, gradOutput):
        pad_size = self.kernel_size // 2
        n, c, w, h = input.shape
        self._input_grad = np.zeros_like(input)
        gradOutput = np.pad(gradOutput, ((0, 0), (0, 0), (pad_size, pad_size), (pad_size, pad_size)), 
                            'constant', constant_values=0)

        for sample_index in range(n):
            for out_channel_index in range(self.out_channels):
                for in_channel_index in range(self.in_channels):
                    self._input_grad[sample_index][in_channel_index] += \
                        scipy.signal.convolve2d(gradOutput[sample_index][out_channel_index], 
                                                self.W[out_channel_index, in_channel_index], "valid")

        return self._input_grad
    
    def accGradParameters(self, input, gradOutput):
        pad_size = self.kernel_size // 2
        n, c, w, h = input.shape
        input = np.pad(input, ((0, 0), (0, 0), (pad_size, pad_size), (pad_size, pad_size)), 
                       'constant', constant_values=0)

        for sample_index in range(n):
            for out_channel_index in range(self.out_channels):
                for in_channel_index in range(self.in_channels):
                    self.gradW[out_channel_index, in_channel_index] += \
                        scipy.signal.correlate2d(input[sample_index][in_channel_index], 
                                                 gradOutput[sample_index][out_channel_index], "valid")

        self.gradb = np.sum(gradOutput, axis=(0, 2, 3))
        pass
    
    def zeroGradParameters(self):
        self.gradW.fill(0)
        self.gradb.fill(0)
        
    def getParameters(self):
        return [self.W, self.b]
    
    def getGradParameters(self):
        return [self.gradW, self.gradb]
    
    def __repr__(self):
        s = self.W.shape
        q = 'Conv2d %d -> %d' %(s[1],s[0])
        return q