# NICE: Non-linear Independent Components Estimation

We will apply this algorithm to the MNIST data set to generate new handwritten digits.

## Table of Contents

1. [Hyperparameters](#1st)
2. [Import libraries, define classes and functions](#2nd)
3. [Data preprocessing](#3rd)
4. [Model](#4th)
5. [Training](#5th)
6. [Nearest neighbour](#6th)
7. [References](#7th)

<div id='1st'/>

## 1. Hyperparameters

In [22]:
cfg = {
  'MODEL_SAVE_PATH': './save/models/',

  'USE_CUDA': False, #True,

  'TRAIN_BATCH_SIZE': 256,

  'TRAIN_EPOCHS': 2, # 75 initially!!

  'NUM_COUPLING_LAYERS': 4,

  'NUM_NET_LAYERS': 4,  # 6 initially # neural net layers for each coupling layer

  'NUM_HIDDEN_UNITS': 500 # 1000
}

<div id='2nd'/>

## 2. Import libraries, define classes and functions

In [10]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Distribution, Uniform
import numpy as np
import os
import torch.optim as optim
from torchvision import transforms, datasets
from torchvision.utils import save_image
import sys
import matplotlib.pyplot as plt
import pylab
os.environ['KMP_DUPLICATE_LIB_OK']='True'

The **additive coupling layer**, after deciding the partitions of the image space $I_1$ and $I_2$, basically goes as follows:
$$y_{I_1} = x_{I_1}$$
$$y_{I_2} = x_{I_2} + m(x_{I_1})$$
This makes sure that the transformation is invertible. The way this is done in the class is by using masks so that we can basically choose any convenient partition we want.

In [23]:
class CouplingLayer(nn.Module):
  """
  Implementation of the additive coupling layer from section 3.2 of the NICE
  paper.
  """

  def __init__(self, data_dim, hidden_dim, mask, num_layers=4):
    super().__init__()

    assert data_dim % 2 == 0

    self.mask = mask

    modules = [nn.Linear(data_dim, hidden_dim), nn.LeakyReLU(0.2)]
    for i in range(num_layers - 2):
      modules.append(nn.Linear(hidden_dim, hidden_dim))
      modules.append(nn.LeakyReLU(0.2))
    modules.append(nn.Linear(hidden_dim, data_dim))

    # the function m of the coupling layer is a neural net.
    self.m = nn.Sequential(*modules)

  def forward(self, x, logdet, invert=False):
    if not invert:
      x1, x2 = self.mask * x, (1. - self.mask) * x
      y1, y2 = x1, x2 + (self.m(x1) * (1. - self.mask))
      return y1 + y2, logdet

    # Inverse additive coupling layer
    y1, y2 = self.mask * x, (1. - self.mask) * x
    x1, x2 = y1, y2 - (self.m(y1) * (1. - self.mask))
    return x1 + x2, logdet

The **scaling layer** is only scaling each element of the transformation by a factor:
$$y_i = S_i x_i$$

In [26]:
class ScalingLayer(nn.Module):
  """
  Implementation of the scaling layer from section 3.3 of the NICE paper.
  """
  def __init__(self, data_dim):
    super().__init__()
    self.log_scale_vector = nn.Parameter(torch.randn(1, data_dim, requires_grad=True))

  def forward(self, x, logdet, invert=False):
    log_det_jacobian = torch.sum(self.log_scale_vector)

    if invert:
        return torch.exp(- self.log_scale_vector) * x, logdet - log_det_jacobian

    return torch.exp(self.log_scale_vector) * x, logdet + log_det_jacobian

In [25]:
class LogisticDistribution(Distribution):
  def __init__(self):
    super().__init__()

  def log_prob(self, x):
    return -(F.softplus(x) + F.softplus(-x))

  def sample(self, size):
    if cfg['USE_CUDA']:
      z = Uniform(torch.cuda.FloatTensor([0.]), torch.cuda.FloatTensor([1.])).sample(size)
    else:
      z = Uniform(torch.FloatTensor([0.]), torch.FloatTensor([1.])).sample(size)

    return torch.log(z) - torch.log(1. - z)

<div id='3rd'/>

## 3. Data preprocessing

In [20]:
# Data preprocessing:
transform = transforms.ToTensor()

dataset = datasets.MNIST(root='./data/mnist', train=True, transform=transform, download=True)
test = datasets.MNIST(root='./data/mnist', train=False, transform=transform, download=True)

dataloader = torch.utils.data.DataLoader(dataset=dataset, batch_size=cfg['TRAIN_BATCH_SIZE'],
                                         shuffle=True, pin_memory=True)
testloader = torch.utils.data.DataLoader(dataset=test, batch_size=cfg['TRAIN_BATCH_SIZE'],
                                         shuffle=True, pin_memory=True)

<div id='4th'/>

## 4. Model

In [27]:
class NICE(nn.Module):
  def __init__(self, data_dim, num_coupling_layers=3):
    super().__init__()

    self.data_dim = data_dim
    # alternating mask orientations for consecutive coupling layers
    # The mask will be used as the partition of each additive coupling layer...
    # For each layer we change orientation :)
    masks = [self._get_mask(data_dim, orientation=(i % 2 == 0))
                                            for i in range(num_coupling_layers)]

    self.coupling_layers = nn.ModuleList([CouplingLayer(data_dim=data_dim,
                                hidden_dim=cfg['NUM_HIDDEN_UNITS'],
                                mask=masks[i], num_layers=cfg['NUM_NET_LAYERS'])
                              for i in range(num_coupling_layers)])

    self.scaling_layer = ScalingLayer(data_dim=data_dim)

    self.prior = LogisticDistribution()

  def forward(self, x, invert=False):
    if not invert:
      z, log_det_jacobian = self.f(x)
      log_likelihood = torch.sum(self.prior.log_prob(z), dim=1) + log_det_jacobian
      return z, log_likelihood

    return self.f_inverse(x)

  def f(self, x):
    z = x
    log_det_jacobian = 0
    for i, coupling_layer in enumerate(self.coupling_layers):
      z, log_det_jacobian = coupling_layer(z, log_det_jacobian)
    z, log_det_jacobian = self.scaling_layer(z, log_det_jacobian)
    return z, log_det_jacobian

  def f_inverse(self, z):
    x = z
    x, _ = self.scaling_layer(x, 0, invert=True)
    for i, coupling_layer in reversed(list(enumerate(self.coupling_layers))):
      x, _ = coupling_layer(x, 0, invert=True)
    return x

  def sample(self, num_samples):
    z = self.prior.sample([num_samples, self.data_dim]).view(num_samples, self.data_dim) # self.samples before...
    return self.f_inverse(z)

  def _get_mask(self, dim, orientation=True):
    mask = np.zeros(dim)
    mask[::2] = 1.
    if orientation:
      mask = 1. - mask     # flip mask orientation
    mask = torch.tensor(mask)
    if cfg['USE_CUDA']:
      mask = mask.cuda()
    return mask.float()

In [28]:
# Model and criterion:
model = NICE(data_dim=784, num_coupling_layers=cfg['NUM_COUPLING_LAYERS'])
if cfg['USE_CUDA']:
  device = torch.device('cuda')
  model = model.to(device)

In [30]:
# Load model
state_dict = torch.load('save/models/59.pt')
model.load_state_dict(state_dict)

<All keys matched successfully>

<div id='5th'/>

## 5. Training

In [31]:
opt = optim.Adam(model.parameters())

In [32]:
# TRAINING:
testL = np.zeros(cfg['TRAIN_EPOCHS'])
trainL = np.zeros(cfg['TRAIN_EPOCHS'])

In [33]:
for epoch in range(cfg['TRAIN_EPOCHS']):
  mean_likelihood = 0.0
  num_minibatches = 0

  model.train()

  # TRAINING
  for batch_id, (x, _) in enumerate(dataloader):
      x = x.view(-1, 784) + torch.rand(784) / 256. # Uniform noise between 0 and 1/256 at most!
      if cfg['USE_CUDA']:
        x = x.cuda()

      x = torch.clamp(x, 0, 1) # Make sure values between 0 and 1.

      z, likelihood = model(x)
      loss = -torch.mean(likelihood)   # NLL

      loss.backward()
      opt.step()
      model.zero_grad()

      mean_likelihood -= loss
      num_minibatches += 1

      #print(mean_likelihood / num_minibatches)
      #print(batch_id)

  mean_likelihood /= num_minibatches
  trainL[epoch] = mean_likelihood
  print('Epoch {} completed. Log Likelihood: {}'.format(epoch, mean_likelihood))
  
  # TEST EVALUATION
  ml = 0
  num_minibatches = 0
  for batch_id, (x, _) in enumerate(testloader):
      x = x.view(-1, 784)
      z, likelihood = model(x)
      l = -torch.mean(likelihood)

      ml -= l
      num_minibatches += 1

  ml /= num_minibatches
  testL[epoch] = ml
  print('Test Log Likelihood: {}'.format(ml))

  # OTHERS  
  plt.figure()
  pylab.xlim(0, cfg['TRAIN_EPOCHS'] + 1)
  plt.plot(range(1, cfg['TRAIN_EPOCHS'] + 1), testL, label='test loss')
  plt.plot(range(1, cfg['TRAIN_EPOCHS'] + 1), trainL, label='train loss')
  plt.legend()
  plt.savefig(os.path.join('save/values_graphs', 'loss.pdf'))
  plt.close()

  if epoch % 5 == 0:
    save_path = os.path.join(cfg['MODEL_SAVE_PATH'], '{}.pt'.format(epoch))
    torch.save(model.state_dict(), save_path)

    # SOME SAMPLING:
    model.train(False)

    fake_images = model.sample(20).round() # round to 0, 1
    fake_images = fake_images.view(fake_images.size(0), 1, 28, 28)
    sample_dir = 'samples'
    save_image(fake_images.data, os.path.join(sample_dir, 'fake_images-{}.png'.format(epoch)))

Epoch 0 completed. Log Likelihood: 2038.4735107421875
Test Log Likelihood: 1882.8900146484375
Epoch 1 completed. Log Likelihood: 2046.1549072265625
Test Log Likelihood: 1896.568115234375


<div id='6th'/>

## 6. Nearest Neighbours

In [21]:
NN = True

In [None]:
# NEAREST NEIGHBOURS:
# Images 28x28, search the closest one.
# function(generated_image) --> closest training_image
if NN == True:
  aux_data_loader = torch.utils.data.DataLoader(dataset=dataset,
                                                batch_size=1,
                                                shuffle=False)

  def nearest_gt(generated_image):
      min_d = 0
      closest = False
      for i, (image, _) in enumerate(aux_data_loader):
          image = image.view(1, 28, 28).round() # all distances in binary...
          d = torch.dist(generated_image,image) # must be torch tensors (1,28,28)
          if i == 0 or d < min_d:
              min_d = d
              closest = image

      return closest

  fake_images = model.sample(24).round() # round to 0, 1
  fake_images = fake_images.view(24, 1, 28, 28)
  save_image(fake_images, './samples/f24.png')
  NN = torch.zeros(24, 1, 28, 28)
  for i in range(0,24):
        NN[i] = nearest_gt(fake_images[i])
        print(i)
  save_image(NN.data, './samples/NN24.png')

<div id='7th'/>

## 7. References

[1] https://github.com/DakshIdnani/pytorch-nice