# Homework 1. Likelihood-based models

- **Task 1 (5 points): Warmup**
- Task 2 (10 points): PixelCNN
- Task 3 (10 points): Conditional PixelCNN
- Task 4 (10 points): RealNVP
- \* Bonus (10+++ points)

# Warmup task

In this task we will play with simplest likelihood-based models with both 1D and 2D data. The task consists of 2 parts:
- Likelihood model in 1D - fitting histogram using SGD
- Deep Autoregressive model in 2D

# Part 1. Fitting histogram

In this part we will build our first likelihood-based model for 1D data and will try to fit it using gradient methods.

In [None]:
%matplotlib inline

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
import torch
from torch import nn
from torch import optim
from torch.nn import functional as F
import torch.utils.data
import math
from sklearn.model_selection import train_test_split
import random

Choose your device: don't forget to switch to GPU runtime when working in colab with cuda.

In [None]:
device = 'cuda'

First, we define the procedure of data generation. It will generate a dataset of samples $x \in \{0 \dots 99\}$

In [None]:
def sample_data():
    count = 10000
    rand = np.random.RandomState(0)
    a = 0.3 + 0.1 * rand.randn(count)
    b = 0.8 + 0.05 * rand.randn(count)
    mask = rand.rand(count) < 0.5
    samples = np.clip(a * mask + b * (1 - mask), 0.0, 1.0)
    
    return np.digitize(samples, np.linspace(0.0, 1.0, 100))

We generate data and perform train/val/test split.

In [None]:
data = sample_data()
train_data, test_data = train_test_split(data, test_size = 0.3)
train_data, val_data = train_test_split(train_data, test_size = 0.3)

Let's plot and visualize the histogram of training data!

In [None]:
def plot_histogram(data):
    counts = Counter(data)
    keys = list(counts.keys())
    values = list(counts.values())
    plt.bar(keys, values)
    plt.show()

In [None]:
plot_histogram(train_data)

On lecture we have discussed how to build histogram model. But this model is not the best choice for high-dimensional data. So, we suggest you to implement the following parametrized model:

$$ p_\theta(x)_i = \frac{e^{\theta_i}}{\sum_j{e^{\theta_j}}} $$

Where $\theta=(\theta_0 \dots \theta_{99})$

We propose you to implement this model in the following class

In [None]:
class SimpleProbabilityModel(nn.Module):
    # Store all parameters of your model as class fields in constructor
    def __init__(self,  num_elements=100):
        super(SimpleProbabilityModel, self).__init__()
        
        ################
        # YOUR CODE HERE
        ###############
        
    # Forward should return vector of log probabilities for each element
    def forward(self):
        ################
        # YOUR CODE HERE
        ###############
    
    # Should sample element using probabilities, obtained from parameters. Return single number 0..99
    def sample(self):
        ################
        # YOUR CODE HERE
        ###############

We will train this model using negative log-likelihood optimization: $ L_i = -\log p_{y_i} $. Implement this loss calculation for your model given a batch of data samples.

In [None]:
# data: n.array of numbers from your training distribution
# model: instance of your SimpleProbabilityModel.
# should return: negative log-likelihood of your data given the model to perform backpropagation
def calc_loss(data, model):
    ################
    # YOUR CODE HERE
    ###############

Finally, we can create instance of our model and perform training. Note that if you calculated previous loss as classic natural logarithm, here we scale it to binary logarithm for logging likelihood in bits (which is better for interpretation and comparisons).

In [None]:
model = SimpleProbabilityModel().to(device)

In [None]:
def train_simple_model(model, train_data, val_data, num_epochs=10000, batch_size=4000, lr=0.01):
    optimizer = optim.SGD(model.parameters(), lr=lr)
    train_losses = []
    val_losses = []
    for i in range(num_epochs):
        for j in range(len(train_data) // batch_size):
            optimizer.zero_grad()
            batch = train_data[batch_size * j:batch_size * (j + 1)]
            l = calc_loss(batch, model)
            train_losses.append(l.item() / math.log(2))
            l.backward()
            optimizer.step()
        l = calc_loss(val_data, model)
        val_losses.append(l.item() / math.log(2))
    
    print("Train NLL(bits)")
    plt.plot(train_losses, color='green')
    plt.show()

    print("Val NLL(bits)")
    plt.plot(val_losses, color='red')
    plt.show()
    
    print("Final validation NLL(bits): {}".format(val_losses[-1]))

In [None]:
train_simple_model(model, train_data, val_data)

You can also tune your training parameters (number of epochs, batch size, learning rate, optimizer), to improve validation NLL. You should obtain something below 6.

Finally, let's sample values from our model and visualize histograms of our test data and our sample data.

In [None]:
sampled_data = [model.sample() for _ in range(len(test_data))]

In [None]:
plot_histogram(sampled_data)
plot_histogram(test_data)

Training here might not yield perfect results, but pictures should look at least similar.

# Part 2. 2D discrete data. Autoregressive model

In this part we will build our own autoregressive model to work with two-dimensional discrete data. 

We will load 2D distribution as is from file. It's a 200x200 numpy array with probabilities.

In [None]:
# For colab users: download file
! wget https://github.com/egiby/Generative-Models-MIPT/raw/main/module1-likelihood/distribution.npy

In [None]:
original_distribution = np.load("distribution.npy")

Let's define class to sample pair of numbers $(x,y) \in \{0 \dots 199\}^2$ from this distribution.

In [None]:
class SampleDist:
    def __init__(self, distribution):
        self.probabilities = distribution.flatten()
        self.rows, self.cols = distribution.shape
        self.values = np.array([[i // self.cols, i % self.cols] for i in range(self.rows * self.cols)])

    def sample(self):
        idx = np.random.choice(self.rows * self.cols, p = self.probabilities)
        
        return self.values[idx]

So, we define distribution, sample data and create train/val/test splits.

In [None]:
dist2D = SampleDist(original_distribution)

In [None]:
SIZE = 100000
sampled_data = np.array([dist2D.sample() for _ in range(SIZE)])

In [None]:
train_data, test_data = train_test_split(sampled_data, test_size = 0.2)
train_data, val_data = train_test_split(train_data, test_size = 0.2)

We will build our autoregressive model in $(x, y)$ as follows:

- Train marginal model $p(x)$ as in part 1
- Create and train conditional model $p(y|x)$ as multi-layer neural network

Here, create class for your conditional model $p(y|x)$. It should take $x$ as batch of integer inputs and return batch of probability distributions over $y$.

In [None]:
class ConditionalModel(nn.Module):
    # Store all your trainable layers as model fiels in constuctor
    def __init__(self):
        super(ConditionalModel, self).__init__()
        
        ################
        # YOUR CODE HERE
        ###############
    
    # Forward pass takes LongTensor x of shape (N,) and should return predicted logprobs of shape (N, 200)
    def forward(self, x):
        ################
        # YOUR CODE HERE
        ###############

Finally, create a model and train it.

In [None]:
cond_model = ConditionalModel().to(device)

In [None]:
def train_cond_model(cond_model, train_data, num_epochs=100, lr=0.001, batch_size=10000):
    dataset = torch.utils.data.TensorDataset(torch.LongTensor(train_data.T[0]).to(device), 
                                             torch.LongTensor(train_data.T[1]).to(device))
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
    loss = nn.NLLLoss()
    optimizer = optim.Adam(cond_model.parameters(), lr=lr)
    
    train_losses = []
    
    for i in range(num_epochs):
        for X_train, Y_train in dataloader:
            optimizer.zero_grad()
            predictions = cond_model(X_train)
            l = loss(predictions, Y_train)
            l.backward()
            optimizer.step()
            
            train_losses.append(l.item() / math.log(2))
    
    print("Train NLL(bits)")
    plt.plot(train_losses, color='green')
    plt.show()

In [None]:
train_cond_model(cond_model, train_data)

To build compound model, we will also need our simple model from part 1, trained on marginal data from our distribution (only x values).

In [None]:
x_model = SimpleProbabilityModel(num_elements=200).to(device)
train_simple_model(x_model, train_data.T[0], val_data.T[0], num_epochs=3000)

Finally, we are ready to build compound model for our total $(x, y)$ distribution modeling. Having two trained models implement sampling procedure and probability calculation.

In [None]:
class CompoundModel:
    def __init__(self, x_model, cond_model):
        self.x_model = x_model
        self.cond_model = cond_model
        
        self.x_model.eval()
        self.cond_model.eval()
    
    # Given two numbers x, y from 0 .. 199, return NLL value -log p(x,y)
    # Normalize in in the way it will return NLL in bits / dimention (binary log divided by two in our case)
    def get_logprob(self, x, y):
        ################
        # YOUR CODE HERE
        ###############
    
    # Implement sampling procedure. One call should return sample (x, y) as numpy array from two elements
    def sample(self):
        ################
        # YOUR CODE HERE
        ###############

In [None]:
compound_model = CompoundModel(x_model, cond_model)

Calculate total average NLL in bits / dimension on your validation data. Tune training parameters and conditional model architecture to boost your performance.

In [None]:
total_logprob = 0
for elem in val_data:
    logprob = compound_model.get_logprob(elem[0], elem[1])
    total_logprob += logprob
print("Total NLL on validation data per dimension: {}".format(total_logprob / val_data.shape[0]))

Check if sampling from your model works.

In [None]:
compound_model.sample()

Finally, get enough samples from your final model and display 2D histogram of the results. Compare them with the results you can get from your test data.

In [None]:
sampled_2d_data = np.array([compound_model.sample() for _ in range(test_data.shape[0])])

In [None]:
def plot_2dhistogram(data):
    plt.hist2d(data.T[0], data.T[1], bins=200, cmap='gray')
    plt.show()

In [None]:
# plot_2dhistogram(sampled_2d_data)
plot_2dhistogram(test_data)

Doesn't this picture resemble anything? (look at the rotated version of the histogram). Check out how your original distribution looks like!

In [None]:
plt.imshow(original_distribution, cmap="gray")
plt.show()