In [2]:
import numpy as np

**Module** is an abstract class which defines fundamental methods necessary for a training a neural network. You do not need to change anything here, just read the comments.

In [3]:
class Module(object):
    """
    Basically, you can think of a module as of a something (black box)
    which can process `input` data and produce `ouput` data.
    This is like applying a function which is called `forward`:

        output = module.forward(input)

    The module should be able to perform a backward pass: to differentiate the `forward` function.
    Moreover, it should be able to differentiate it if is a part of chain (chain rule).
    The latter implies there is a gradient from previous step of a chain rule.

        input_grad = module.backward(input, output_grad)
    """
    def __init__ (self):
        self._output = None
        self._input_grad = None
        self.training = True

    def forward(self, input):
        """
        Takes an input object, and computes the corresponding output of the module.
        """
        self._output = self._compute_output(input)
        return self._output

    def backward(self, input, output_grad):
        """
        Performs a backpropagation step through the module, with respect to the given input.

        This includes
         - computing a gradient w.r.t. `input` (is needed for further backprop),
         - computing a gradient w.r.t. parameters (to update parameters while optimizing).
        """
        self._input_grad = self._compute_input_grad(input, output_grad)
        self._update_parameters_grad(input, output_grad)
        return self._input_grad


    def _compute_output(self, input):
        """
        Computes the output using the current parameter set of the class and input.
        This function returns the result which will be stored in the `_output` field.

        Example: in case of identity operation:

        output = input
        return output
        """
        raise NotImplementedError


    def _compute_input_grad(self, input, output_grad):
        """
        Returns the gradient of the module with respect to its own input.
        The shape of the returned value is always the same as the shape of `input`.

        Example: in case of identity operation:
        input_grad = output_grad
        return input_grad
        """

        raise NotImplementedError

    def _update_parameters_grad(self, input, output_grad):
        """
        Computing the gradient of the module with respect to its own parameters.
        No need to override if module has no parameters (e.g. ReLU).
        """
        pass

    def zero_grad(self):
        """
        Zeroes `gradParams` variable if the module has params.
        """
        pass

    def get_parameters(self):
        """
        Returns a list with its parameters.
        If the module does not have parameters return empty list.
        """
        return []

    def get_parameters_grad(self):
        """
        Returns a list with gradients with respect to its parameters.
        If the module does not have parameters return empty list.
        """
        return []

    def train(self):
        """
        Sets training mode for the module.
        Training and testing behaviour differs for Dropout, BatchNorm.
        """
        self.training = True

    def evaluate(self):
        """
        Sets evaluation mode for the module.
        Training and testing behaviour differs for Dropout, BatchNorm.
        """
        self.training = False

    def __repr__(self):
        """
        Pretty printing. Should be overrided in every module if you want
        to have readable description.
        """
        return "Module"

# Layers

## 1. Batch normalization
One of the most significant recent ideas that impacted NNs a lot is [**Batch normalization**](http://arxiv.org/abs/1502.03167). The idea is simple, yet effective: the features should be whitened ($mean = 0$, $std = 1$) all the way through NN. This improves the convergence for deep models letting it train them for days but not weeks. **You are** to implement the first part of the layer: features normalization. The second part (`ChannelwiseScaling` layer) is implemented below.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

The layer should work as follows. While training (`self.training == True`) it transforms input as $$y = \frac{x - \mu}  {\sqrt{\sigma + \epsilon}}$$
where $\mu$ and $\sigma$ — mean and variance of feature values in **batch** and $\epsilon$ is just a small number for numericall stability. Also during training, layer should maintain exponential moving average values for mean and variance:
```
    self.moving_mean = self.moving_mean * alpha + batch_mean * (1 - alpha)
    self.moving_variance = self.moving_variance * alpha + batch_variance * (1 - alpha)
```
During testing (`self.training == False`) the layer normalizes input using moving_mean and moving_variance.

Note that decomposition of batch normalization on normalization itself and channelwise scaling here is just a common **implementation** choice. In general "batch normalization" always assumes normalization + scaling.

In [4]:
class BatchNormalization(Module):
    EPS = 1e-3

    def __init__(self, alpha=0.):
        super(BatchNormalization, self).__init__()
        self.alpha = alpha
        self.moving_mean = 0.
        self.moving_variance = 1.

    def _compute_output(self, input):
        input_mean = np.mean(input, axis=0)
        input_var = np.var(input, axis=0)
        if self.training:
            self.moving_mean = self.moving_mean * self.alpha + input_mean * (1 - self.alpha)
            self.moving_variance = self.moving_variance * self.alpha + input_var * (1 - self.alpha)
            output = (input - input_mean) / np.sqrt(BatchNormalization.EPS + input_var)
        else:
            output = (input - self.moving_mean) / np.sqrt(BatchNormalization.EPS + self.moving_variance)
        return output


    def _compute_input_grad(self, input, output_grad):
        m = input.shape[0]
        input_mean = np.mean(input, axis=0)
        input_var = np.var(input, axis=0)
        output = (input - input_mean) / np.sqrt(BatchNormalization.EPS + input_var)
        grad_input = (1 / np.sqrt(input_var + BatchNormalization.EPS) / m) * \
         (output_grad * m - np.sum(output_grad, axis=0) - output * np.sum(output_grad * output, axis=0))
        return grad_input

    def __repr__(self):
        return "BatchNormalization"

In [5]:
class ChannelwiseScaling(Module):
    """
       Implements linear transform of input y = \gamma * x + \beta
       where \gamma, \beta - learnable vectors of length x.shape[-1]
    """
    def __init__(self, n_out):
        super(ChannelwiseScaling, self).__init__()

        stdv = 1./np.sqrt(n_out)
        self.gamma = np.random.uniform(-stdv, stdv, size=n_out)
        self.beta = np.random.uniform(-stdv, stdv, size=n_out)

        self.gradGamma = np.zeros_like(self.gamma)
        self.gradBeta = np.zeros_like(self.beta)

    def _compute_output(self, input):
        output = input * self.gamma + self.beta
        return output

    def _compute_input_grad(self, input, output_grad):
        grad_input = output_grad * self.gamma
        return grad_input

    def _update_parameters_grad(self, input, output_grad):
        self.gradBeta = np.sum(output_grad, axis=0)
        self.gradGamma = np.sum(output_grad*input, axis=0)

    def zero_grad(self):
        self.gradGamma.fill(0)
        self.gradBeta.fill(0)

    def get_parameters(self):
        return [self.gamma, self.beta]

    def get_parameters_grad(self):
        return [self.gradGamma, self.gradBeta]

    def __repr__(self):
        return "ChannelwiseScaling"

Practical notes. If BatchNormalization is placed after a linear transformation layer (including dense layer, convolutions, channelwise scaling) that implements function like `y = weight * x + bias`, than bias adding become useless and could be omitted since its effect will be discarded while batch mean subtraction. If BatchNormalization (followed by `ChannelwiseScaling`) is placed before a layer that propagates scale (including ReLU, LeakyReLU) followed by any linear transformation layer than parameter `gamma` in `ChannelwiseScaling` could be freezed since it could be absorbed into the linear transformation layer.

## 2. Dropout
Implement [**dropout**](https://www.cs.toronto.edu/~hinton/absps/JMLRdropout.pdf). The idea and implementation is really simple: just multimply the input by $Bernoulli(p)$ mask. Here $p$ is probability of an element to be zeroed.

This has proven to be an effective technique for regularization and preventing the co-adaptation of neurons.

While training (`self.training == True`) it should sample a mask on each iteration (for every batch), zero out elements and multiply elements by $1 / (1 - p)$. The latter is needed for keeping mean values of features close to mean values which will be in test mode. When testing this module should implement identity transform i.e. `output = input`.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

In [6]:
class Dropout(Module):
    def __init__(self, p=0.5):
        super(Dropout, self).__init__()
        self.p = p
        self.mask = []
        self.coef = 1 / (1-self.p)

    def _compute_output(self, input):
        if self.training:
            self.mask = np.random.binomial(1, 1 - self.p, size=input.shape)
            output = input * self.mask * self.coef
        else:
            output = input
        return output

    def _compute_input_grad(self, input, output_grad):
        grad_input = output_grad * self.mask * self.coef  if self.training else output_grad
        return grad_input

    def __repr__(self):
        return "Dropout"

## 3. Conv2d

* input: `batch_size x in_channels x h x w`
* output: `batch_size x out_channels x h x w`

You should implement something like pytorch Conv2d layer with `stride=1` and zero-padding outside of image using `scipy.signal.correlate` function.

**Practical notes:**

* While the layer name is "convolution", the most of neural network frameworks (including tensorflow and pytorch) implement operation that is called [correlation](https://en.wikipedia.org/wiki/Cross-correlation#Cross-correlation_of_deterministic_signals) in signal processing theory. **So don't use** `scipy.signal.convolve` since it implements convolution in terms of signal processing.
* It may be convenient to use numpy.pad for zero-padding.
* It's rather ok to implement convolution over 4d array using 2 nested loops: one over batch size dimension and another one over output filters dimension

In [45]:
import scipy as sp
import scipy.signal
import skimage

class Conv2d(Module):
    def __init__(self, in_channels, out_channels, kernel_size):
        super(Conv2d, self).__init__()
        assert kernel_size % 2 == 1, kernel_size

        stdv = 1./np.sqrt(in_channels)
        self.W = np.random.uniform(-stdv, stdv, size = (out_channels, in_channels, kernel_size, kernel_size))
        self.b = np.random.uniform(-stdv, stdv, size=(out_channels,))
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size

        self.gradW = np.zeros_like(self.W)
        self.gradb = np.zeros_like(self.b)

    def _compute_output(self, input):
        pad_size = self.kernel_size // 2
        batch_size, in_channels, h, w = input.shape
        input_pad = np.pad(input, ((0, 0), (0, 0), (pad_size, pad_size), (pad_size, pad_size)), 'constant', constant_values=0)
        self._output = np.zeros((batch_size, self.out_channels, h, w))
        for i in np.arange(batch_size):
            for o in np.arange(self.out_channels):
                out_map = np.zeros((h, w))
                for j in np.arange(in_channels):
                    out_map += scipy.signal.correlate(input_pad[i, j], self.W[o, j], mode='valid')
                self._output[i, o] = out_map + self.b[o]
        return self._output


    def _compute_input_grad(self, input, gradOutput):
        pad_size = self.kernel_size // 2
        batch_size, in_channels, h, w = input.shape
        _, out_channels, h_out, w_out = gradOutput.shape
        gradOutput_pad = np.pad(gradOutput, ((0, 0), (0, 0), (pad_size, pad_size), (pad_size, pad_size)), 'constant')
        self._input_grad = np.zeros((batch_size, in_channels, h, w))

        for i in range(batch_size):
            for j in range(in_channels):
                input_grad_channel = np.zeros((h, w))
                for o in range(out_channels):
                    flipped_weights = np.flip(self.W[o, j], axis=(0, 1))
                    input_grad_channel += scipy.signal.correlate(gradOutput_pad[i, o], flipped_weights, mode='valid')
                self._input_grad[i, j] = input_grad_channel

        return self._input_grad


    def accGradParameters(self, input, gradOutput):
        pad_size = self.kernel_size // 2
        batch_size, in_channels, h, w = input.shape
        _, out_channels, h_out, w_out = gradOutput.shape
        input_pad = np.pad(input, ((0, 0), (0, 0), (pad_size, pad_size), (pad_size, pad_size)), 'constant')

        self.gradW = np.zeros((out_channels, in_channels, self.kernel_size, self.kernel_size))
        self.gradb = np.zeros_like(self.b)

        for i in np.arange(batch_size):
            for o in np.arange(out_channels):
                for j in np.arange(in_channels):
                    self.gradW[o, j] += scipy.signal.correlate(input_pad[i, j], gradOutput[i, o], mode='valid')
                self.gradb[o] += np.sum(gradOutput[i, o])


    def zeroGradParameters(self):
        self.gradW.fill(0)
        self.gradb.fill(0)

    def getParameters(self):
        return [self.W, self.b]

    def getGradParameters(self):
        return [self.gradW, self.gradb]

    def __repr__(self):
        s = self.W.shape
        q = 'Conv2d %d -> %d' %(s[1],s[0])
        return q

In [47]:
import torch
from torch.autograd import Variable
import numpy as np
import unittest

In [48]:
class TestLayers(unittest.TestCase):

    def test_BatchNormalization(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in = 32, 16
        for _ in range(100):
            # layers initialization
            slope = np.random.uniform(0.01, 0.05)
            alpha = 0.9
            custom_layer = BatchNormalization(alpha)
            custom_layer.train()
            torch_layer = torch.nn.BatchNorm1d(n_in, eps=custom_layer.EPS, momentum=1.-alpha, affine=False)
            custom_layer.moving_mean = torch_layer.running_mean.numpy().copy()
            custom_layer.moving_variance = torch_layer.running_var.numpy().copy()

            layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)

            # 1. check layer output
            custom_layer_output = custom_layer.forward(layer_input)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

            # 2. check layer input grad
            custom_layer_grad = custom_layer.backward(layer_input, next_layer_grad)
            torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
            torch_layer_grad_var = layer_input_var.grad
            # please, don't increase `atol` parameter, it's garanteed that you can implement batch norm layer
            # with tolerance 1e-5
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-5))

            # 3. check moving mean
            self.assertTrue(np.allclose(custom_layer.moving_mean, torch_layer.running_mean.numpy()))
            # we don't check moving_variance because pytorch uses slightly different formula for it:
            # it computes moving average for unbiased variance (i.e var*N/(N-1))
            #self.assertTrue(np.allclose(custom_layer.moving_variance, torch_layer.running_var.numpy()))

            # 4. check evaluation mode
            custom_layer.moving_variance = torch_layer.running_var.numpy().copy()
            custom_layer.evaluate()
            custom_layer_output = custom_layer.forward(layer_input)
            torch_layer.eval()
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

    def test_Dropout(self):
        np.random.seed(42)

        batch_size, n_in = 2, 4
        for _ in range(100):
            # layers initialization
            p = np.random.uniform(0.3, 0.7)
            layer = Dropout(p)
            layer.train()

            layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)

            # 1. check layer output
            layer_output = layer.forward(layer_input)
            self.assertTrue(np.all(np.logical_or(np.isclose(layer_output, 0),
                                        np.isclose(layer_output*(1.-p), layer_input))))

            # 2. check layer input grad
            layer_grad = layer.backward(layer_input, next_layer_grad)
            self.assertTrue(np.all(np.logical_or(np.isclose(layer_grad, 0),
                                        np.isclose(layer_grad*(1.-p), next_layer_grad))))

            # 3. check evaluation mode
            layer.evaluate()
            layer_output = layer.forward(layer_input)
            self.assertTrue(np.allclose(layer_output, layer_input))

            # 4. check mask
            p = 0.0
            layer = Dropout(p)
            layer.train()
            layer_output = layer.forward(layer_input)
            self.assertTrue(np.allclose(layer_output, layer_input))

            p = 0.5
            layer = Dropout(p)
            layer.train()
            layer_input = np.random.uniform(5, 10, (batch_size, n_in)).astype(np.float32)
            next_layer_grad = np.random.uniform(5, 10, (batch_size, n_in)).astype(np.float32)
            layer_output = layer.forward(layer_input)
            zeroed_elem_mask = np.isclose(layer_output, 0)
            layer_grad = layer.backward(layer_input, next_layer_grad)
            self.assertTrue(np.all(zeroed_elem_mask == np.isclose(layer_grad, 0)))

            # 5. dropout mask should be generated independently for every input matrix element, not for row/column
            batch_size, n_in = 1000, 1
            p = 0.8
            layer = Dropout(p)
            layer.train()

            layer_input = np.random.uniform(5, 10, (batch_size, n_in)).astype(np.float32)
            layer_output = layer.forward(layer_input)
            self.assertTrue(np.sum(np.isclose(layer_output, 0)) != layer_input.size)

            layer_input = layer_input.T
            layer_output = layer.forward(layer_input)
            self.assertTrue(np.sum(np.isclose(layer_output, 0)) != layer_input.size)

    def test_Conv2d(self):
        np.random.seed(42)
        torch.manual_seed(42)

        batch_size, n_in, n_out = 2, 3, 4
        h,w = 5,6
        kern_size = 3
        for _ in range(100):
            # layers initialization
            torch_layer = torch.nn.Conv2d(n_in, n_out, kern_size, padding=1)
            custom_layer = Conv2d(n_in, n_out, kern_size)
            custom_layer.W = torch_layer.weight.data.numpy() # [n_out, n_in, kern, kern]
            custom_layer.b = torch_layer.bias.data.numpy()

            layer_input = np.random.uniform(-1, 1, (batch_size, n_in, h,w)).astype(np.float32)
            next_layer_grad = np.random.uniform(-1, 1, (batch_size, n_out, h, w)).astype(np.float32)

            # 1. check layer output
            custom_layer_output = custom_layer._compute_output(layer_input)
            layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
            torch_layer_output_var = torch_layer(layer_input_var)
            self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))

            # 2. check layer input grad
            custom_layer_grad = custom_layer._compute_input_grad(layer_input, next_layer_grad)
            torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
            torch_layer_grad_var = layer_input_var.grad
            self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))

            # 3. check layer parameters grad
            custom_layer.accGradParameters(layer_input, next_layer_grad)
            weight_grad = custom_layer.gradW
            bias_grad = custom_layer.gradb
            torch_weight_grad = torch_layer.weight.grad.data.numpy()
            torch_bias_grad = torch_layer.bias.grad.data.numpy()
            #m = ~np.isclose(torch_weight_grad, weight_grad, atol=1e-5)
            self.assertTrue(np.allclose(torch_weight_grad, weight_grad, atol=1e-6, ))
            self.assertTrue(np.allclose(torch_bias_grad, bias_grad, atol=1e-6))

suite = unittest.TestLoader().loadTestsFromTestCase(TestLayers)
unittest.TextTestRunner(verbosity=2).run(suite)

test_BatchNormalization (__main__.TestLayers) ... ok
test_Conv2d (__main__.TestLayers) ... ok
test_Dropout (__main__.TestLayers) ... ok

----------------------------------------------------------------------
Ran 3 tests in 1.062s

OK


<unittest.runner.TextTestResult run=3 errors=0 failures=0>