# Normalising flows 

To read more about normalising flows, you can check out [Towards Data Science](https://towardsdatascience.com/introduction-to-normalizing-flows-d002af262a4b#:~:text=In%20simple%20words%2C%20normalizing%20flows,is%20not%20a%20reversible%20function.) there is a comprehensive summary of Normalising flows and the intuition behind normalising flows.

In [1]:
# Import libraries

import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn

from matplotlib import pylab as plt
import seaborn as sns

#from torch.utils.data import DataLoader, Subset
from torch.distributions import Normal
from torch.distributions.multivariate_normal import MultivariateNormal
from tqdm import trange

from IPython.display import Image
from types import MethodType

In [2]:
def sample_pinwheel(num_samples):
    rng = np.random.RandomState()
    radial_std = 0.3
    tangential_std = 0.1
    num_classes = 5
    rate = 0.25
    rads = np.linspace(0, 2*np.pi, num_classes, endpoint=False)

    features = rng.randn(num_samples, 2) \
            * np.array([radial_std, tangential_std])
    features[:, 0] += 1.
    labels = rng.randint(0, num_classes, num_samples)

    angles = rads[labels] + rate*np.exp(features[:, 0])
    rotations = np.stack([np.cos(angles), -np.sin(angles), np.sin(angles), np.cos(angles)])
    rotations = np.reshape(rotations.T, (-1, 2, 2))

    wheels = 2*rng.permutation(np.einsum("ti,tij->tj", features, rotations))

    return torch.tensor(wheels, dtype=torch.float32)

In [None]:
pin = sample_pinwheel(5000)
fig, ax = plt.subplots(1,2, figsize=(9,3), sharex=True, sharey=True)
x_0 = pin[:, 0].numpy()
x_1 = pin[:, 1].numpy()
ax[0].scatter(x_0, x_1, alpha=.2)

# This KDE plot function call constructs an estimate of the density from the sample
sns.kdeplot(x=x_0, y=x_1, cmap="Blues", shade=True, bw_adjust=.5, ax=ax[1])

ax[0].set_title('Pinwheel scatterplot')
ax[1].set_title('Pinwheel density estimate')

for axi in ax:
    axi.set_xlim((-4,4))
    axi.set_ylim((-4,4))
    axi.axes.xaxis.set_visible(False)
    axi.axes.yaxis.set_visible(False)
plt.subplots_adjust(wspace=0.05, hspace=0.05)
plt.show()

Each point in the pinwheel is randomly drawn independently and identically from the others, from an underlying distribution over $\mathbb{R}^2$ specified implicitly by the code above.

What we would now like to do is model this distribution.
Modelling a distribution is usually desirable to enable us to perform two main tasks, namely:

- calculate the density of an observation under the distribution; or
- sample new observations from the distribution.

In practice, we usually don't have access to the code/mechanism generating points: instead, we typically only have access to a sample of observations.

This section will use normalizing flows to tackle this problem.
Flows enable us to both evaluate the density function of a distribution, as well as sample points from it.
The key assumption we will make is that we can _associate_ this complicated pinwheel distribution, which we know little about, with a _simple_ distribution that we understand well (i.e. we can sample from it, and evaluate its density).
Once we understand this association, we can perform the tasks above for the complicated pinwheel distribution by relating it to the solutions to these tasks for the simple distribution.

In practice, the simple distribution is often chosen to be a standard multivariate normal distribution; for technical reasons, this distribution must be the same dimensionality as the data we are to model.
Since the pinwheel data has two dimensions, we will use a two-dimensional standard normal distribution as our simple distribution.

More specifically, the association between the two distributions we are seeking in this case is a function $F : \mathbb{R}^2 \rightarrow \mathbb{R}^2$ so that:

- if $(x, y)$ is a point drawn from the simple (multivariate Gaussian) distribution, then $F(x, y)$ is a point drawn from the pinwheel distribution; and
- $F(\cdot)$ is bijective, i.e. every point in the range of $F$ ("pinwheel space") is uniquely associated with a corresponding point in the domain of $F$ ("Gaussian space") and vice versa.

Because $F(\cdot)$ is bijective, its inverse $F^{-1}(\cdot)$ exists and if $(x, y)$ is a point drawn from the pinwheel distribution, then $F^{-1}(x, y)$ is a point drawn from the multivariate Gaussian.

Neural networks allow us to fit complex functions; the flows we develop in this practical will enhance multi-layer perceptrons (MLPs) - a simple form of feedforward neural network - to ensure that the resulting function $F$ will be bijective.
For a flow to be useful, we also need to be able to sample or perform density estimation with it, and these operations should be reasonably efficient.
This further constrains the choice of functions we will fit with our neural networks, and the specific form of flows used in practice depend on various tradeoffs regarding the planned usage of the fitted model.

The main tradeoffs involve:

- how well the flow is able to model the true mapping between the base and true distribution (_expressivity_);
- how tractable it is to calculate the flow and/or its inverse (although the flow is invertible, computing the inverse may not be tractable); and
- how tractable it is to calculate the Jacobian of the flow.

Computing the Jacobian of the flow is important because the change in density incurred by the transformation $F$ is tracked via the _change-of-variables_ formula.
Consider a random variable $\mathbf{z} = \mathbf{z}_0$ with a standard Gaussian distribution, $\mathbf{z}_0 \sim p_0 = \mathcal{N}(\mathbf{0}, I)$, to which we apply an invertible transformation to obtain the random variable $\mathbf{x} = F(\mathbf{z_0})$. The density of the resulting $\mathbf{x}$ is then given by

$$
\begin{aligned}
\log p(\mathbf{x}) &= \log p_0(\mathbf{z}_0) + \log \left|J_{F^{-1}}(\mathbf{x})\right| \qquad (\mbox{change-of-variables})\\
&=  \log p_0(\mathbf{z}_0) + \log \left|J_{F}(\mathbf{z}_0)\right|^{-1} \quad (\mbox{inverse function theorem}) \\
&= \log p_0(\mathbf{z}_0) - \log \left|J_{F}(\mathbf{z}_0)\right|
\end{aligned}
$$

where $J_F(a)$ denotes the Jacobian of the transformation $F$ at $a$. (See equation 1.27 in section 1.2.1 of \[[5](#References)\], and for a discussion on the derivation see [here](https://stats.stackexchange.com/questions/239588/derivation-of-change-of-variables-of-a-probability-density-function)).

### Normalizing Flows with RealNVP affine coupling layers

The form of the change-of-variables formula makes it possible to compose multiple simpler bijective transformations in order to obtain an overall transformation that is complex enough to provide an accurate mapping between the base and target distribution. So, let $F$ consist of a chain of simpler transformations:

$$
\mathbf{x} = F(\mathbf{z}_0) = f_T \circ \ldots \circ f_1(\mathbf{z}_0).
$$

Writing $\mathbf{z}_{t}$ for $f_t(\mathbf{z}_{t-1})$, we have $\mathbf{x} = \mathbf{z}^T$.
Then we have that $
\log p_T(\mathbf{x}) = \log p_0(\mathbf{z}_0) - \sum_{t=1}^T \log \left|J_{f_t}(\mathbf{z}_{t-1})\right|
$.

For this practical we will consider _coupling_ transformations as presented in the paper 'Density estimation using Real NVP' \[[7](#References)\].
In each simple bijection $f_t(\mathbf{z}_{t-1})$, one half of the input vector
( $\mathbf{z}_{t-1,{1:k}}$ where $k \lt K$ ) remains unchanged, while the rest of the input
( $\mathbf{z}_{t, k+1:K}$) is transformed using a function that is simple to invert, but which depends on $\mathbf{z}_{t-1,1:k}$ in a complex way.

$$
\begin{aligned}
\mathbf{z}_{t, 1:k} &= \mathbf{z}_{t-1, 1:k} \\
\mathbf{z}_{t, k+1:K} &= \mathbf{z}_{t-1, k+1:K} \odot \exp\left(s_t(\mathbf{z}_{t-1, 1:k})\right) + m_t(\mathbf{z}_{t-1, 1:k})
\end{aligned}
$$

Here, $s_t$ and $m_t$ are MLPs each outputting $K-k$-dimensional vectors encoding the complex dependency above for $f_t$: we perform an affine transformation on the second part of the input, with $m_t$ specifying the translation term and $s_t$ implicitly specifying a scaling transformation for each component.

This type of transformation is known as an _affine coupling layer_. Note that the transformation remains invertible for arbitrarily complex functions $s_t$ and $m_t$: the inverse is given by

$$
\begin{aligned}
\mathbf{z}_{t-1, 1:k} &= \mathbf{z}_{t, 1:k}  \\
\mathbf{z}_{t-1, k+1:K} &= \left(\mathbf{z}_{t, k+1:K} - m_t(\mathbf{z}_{t, 1:k})\right) \odot \exp\left( -s_t(\mathbf{z}_{t, 1:k})\right).
\end{aligned}
$$

To allow all the variables to influence each other, we will need to change which subset of variables remains constant and which subset gets updated at each successive transformation step.

In [None]:
class CouplingLayer(nn.Module):
    def __init__(self, dim, conditioner_hidden_dims, reverse=False):
        super().__init__()

        self.block1 = dim//2 # Use k = K//2 for coupling layer
        self.block2 = dim - self.block1

        self.reverse = reverse # Control which half is kept constant vs transformed
        if not self.reverse:
            # block1 remains constant
            # block 2 depends on block 1
            self.s_net = self.build_mlp(self.block1, conditioner_hidden_dims, self.block2)
            self.m_net = self.build_mlp(self.block1, conditioner_hidden_dims, self.block2)
        else:
            # block 2 remains constant
            # block 1 depends on block 2
            self.s_net = self.build_mlp(self.block2, conditioner_hidden_dims, self.block1)
            self.m_net = self.build_mlp(self.block2, conditioner_hidden_dims, self.block1)

    def build_mlp(self, in_dim, hidden_dims, out_dim):
        '''Construct a multilayer perceptron neural network with ReLU
        activation functions.

        Parameters:
        * in-dim: input width (int)
        * hidden_dims: hidden layer widths (list of ints)
        * out_dim: output width (int)
        '''
        pass

    # Note that the forward function is automatically executed when you
    # "call" the surrounding instance, and this is the typical way in
    # which this function is executed as part of a forward pass in a
    # network.  The "call" to the surrounding instance passed on any
    # arguments provided to the forward function call.  For example, to run
    # this forward function, if you have a variable cl that is an instance
    # of CouplingLayer, you should generally run its forward method as:
    #
    # cl(z)
    #
    # and not
    #
    # cl.forward(z)
    def forward(self, z):
        '''Applies this coupling transformation to z, and returns the result as well as the 
        log-Jacobian-determinant of the transformation. '''
        s, m = self.conditioner(z)
        z = self.coupling_transform(z, s, m)
        return z, s.sum(1)

    def inverse(self, z):
        '''Applies the inverse of this coupling transformation to z, and returns the 
        result well as the the log-Jacobian-determinant of the inverse transformation.

        If flow.forward(z) transforms z to z1, then flow.inverse(z1) should transform
        z1 to z.
        '''
        pass

    def conditioner(self, z):
        '''Apply and return the result of the conditioner functions s and m for the layer input z.'''
        pass

    def coupling_transform(self, z, s, m):
        '''Applies the transformer of the coupling layer, with conditioner outputs s and m, to z.'''
        if not self.reverse:
            z1 = z[:, :self.block1]
            z2 = torch.exp(s)*z[:, self.block1:] + m
        else:
            z1 = torch.exp(s)*z[:, :self.block1] + m
            z2 = z[:, self.block1:]
        return torch.cat((z1, z2), dim=1)

    def coupling_transform_inverse(self, z, s, m):
        '''Applies the inverse of the transformer of the coupling layer to z, assuming the conditioner
        outputs used to generate z were s and m.'''
        pass

As a starter we will implement the ```build_mlp``` method for you.

The method creates a new multilayer perceptron (MLP), which could be trained and used on its own; however, we use this in the CouplingLayer constructor to generate the MLPs for $m_t$ and $s_t$.
The parameters ```in_dim``` and ```out_dim``` are natural numbers specifying the number of nodes in the input and output layers of the MLP to be created.
```hidden_dims``` is a list of natural numbers that specifies the number of nodes in each hidden dimension.

In [None]:
def build_mlp(self, in_dim, hidden_dims, out_dim):
    layer_widths = [in_dim] + hidden_dims + [out_dim]
    layers = []
    for i in range(len(layer_widths)-1):
        in_dim = layer_widths[i]
        out_dim = layer_widths[i+1]
        layers += [nn.Linear(in_dim, out_dim), nn.ReLU()]
    layers.pop() # get rid of the last nn.ReLU()
    return nn.Sequential(*layers)

As a sanity check, let's check that this function produces sensible output, by checking whether the output of our MLP on valid input produces the correctly shaped output.

In [None]:
# batch shape: (batch_size, feature_dim)
random_batch = torch.randn(5, 9)

in_dim = None # TODO: change None to correct value
#BEGIN_REMOVE
in_dim = 9
#END_REMOVE
hidden_dims = [3, 2, 5]
out_dim = 4

# Create the MLP
test_mlp = build_mlp(None, in_dim, hidden_dims, out_dim)
# Predict with the MLP by applying the forward function of test_mlp to random_batch.
# Since test_mlp is of type nn.Sequential, the forward functions
# of each layer are executed in turn, with the output of each used
# as the input to the next.  This effectively composes the functions
# in the layers.
out_mlp = test_mlp(random_batch) 
batch_size, output_dim = out_mlp.shape

if batch_size == 5 and output_dim == 4:
    print('Great success!')
else:
    print('Failure :(')

The instruction in the next cell replaces the default `pass` implementation of `build_mlp` in the `CouplingLayer` class by the version we defined above. 

In [None]:
# Override the previously empty definition with our new definition.
CouplingLayer.build_mlp = build_mlp

Now that we've implemented the `build_mlp` function which `CouplingLayer`'s constructor required, we can create an instance of `CouplingLayer` for testing the rest of our method implementations.

In [None]:
test_dim = 16
test_conditioner_hidden_dims = [15, 20, 10]
test_cl = CouplingLayer(dim=test_dim, conditioner_hidden_dims=test_conditioner_hidden_dims)
test_z = torch.randn(1, test_dim) # random input to test functionality.

In order to get the output of a `CouplingLayer`, we need to be able to apply the `forward` function.  This requires the implementation of the `conditioner` function below.

In [None]:
def conditioner(self, z):
    '''Apply and return the result of the conditioner functions s and m for the layer input z.'''
    # TODO: Implement. Use self.reverse to identify the appropriate inputs to self.s_net and self.m_net,
    # and then compute and return the outputs of these MLPs on these inputs.
    #BEGIN_REMOVE
    if not self.reverse:
        input_ = z[:, :self.block1]
    else:
        input_ = z[:, self.block1:]
    s = self.s_net(input_)
    m = self.m_net(input_)
    #END_REMOVE
    return s, m

# Associate the function above with the CouplingLayer class
CouplingLayer.conditioner = conditioner

# test CouplingLayer - output of the first 8 dimensions must stay constant, others should change
out_cond, _ = test_cl(test_z)

if torch.equal(test_z[0, :8], out_cond[0, :8]) and not torch.equal(test_z[0, 8:], out_cond[0, 8:]):
    print('Great success!')
else:
    print('Failure :(')

The functionality defined so far is enough to use `CouplingLayers` in a normalizing flow that can be trained using the _reverse_ KL divergence and used for _sampling_ from the flow.  (In fact, we will return to this way of using the flow towards the end of the practical.)
However, we are now going to implement functionality for inverting the layer to allow _for density estimation_.

In [None]:
def coupling_transform_inverse(self, z, s, m):
    # TODO: Implement.  z denotes the usual output of the layer, given the outputs
    # of the s_net and m_net outputs were s and m respectively.  Return the input
    # that would have generated z.  Remember to take self.reverse into account.
    # The function torch.cat for concatenating tensors might be helpful.
    #BEGIN_REMOVE
    if not self.reverse:
        z_inv_1 = z[:, :self.block1]
        z_inv_2 = (z[:, self.block1:] - m)*torch.exp(-s)
    else:
        z_inv_1 = (z[:, :self.block1] - m)*torch.exp(-s)
        z_inv_2 = z[:, self.block1:]
    return torch.cat((z_inv_1, z_inv_2), dim=1)
    #END_REMOVE

CouplingLayer.coupling_transform_inverse = coupling_transform_inverse

In [None]:
def inverse(self, z):
    # TODO: Implement. z denotes the usual output of the layer. Determine
    # what the outputs of the s_net and m_net would have been and use them with
    # the function you implemented above.  Return the input that would have
    # generated z, as well as the log-Jacobian-determinant of this inverse
    # transformation. You may want to consult the implementation of forward().
    #BEGIN_REMOVE
    s, m = self.conditioner(z)
    z = self.coupling_transform_inverse(z, s, m)
    return z, -s.sum(1)
    #END_REMOVE

CouplingLayer.inverse = inverse

# Testing inverse and Jacobian
z_forward, j_forward = test_cl(test_z)
z_forward_inverse, j_forward_inverse = test_cl.inverse(z_forward)

if torch.norm(test_z - z_forward_inverse) < 1e-6:
    print('inverse calculated correctly')
else:
    print('inverse calculated incorrectly')
if torch.abs(j_forward + j_forward_inverse) < 1e-6:
    print('Jacobian calculated correctly')
else:
    print('Jacobian calculated incorrectly')

We now use the ```CouplingLayer``` module as a building block to construct our flow.

The skeleton is again given below.
You will have to complete the inverse function.

In [None]:
class RealNVPFlow(nn.Module):
    def __init__(self, dim, num_steps, conditioner_hidden_dims):
        super().__init__()
        self.num_steps = num_steps # number of CouplingLayers - generally use at least 3

        # Initialize coupling transformation steps
        steps = []
        reverse = False
        for i in range(num_steps):
            steps.append(CouplingLayer(dim, conditioner_hidden_dims, reverse))
            reverse = not reverse # Change which half is kept unchanged in each layer
        # We don't use nn.Sequential here, because we must use a custom forward() 
        self.steps = nn.ModuleList(steps)

    def forward(self, z0):
        '''Apply the flow transformation steps to the input z0. 
        Returns the final transformed variable z as well as
        the log Jacobian term in the change-of-variables formula.
        '''
        j_total = 0.0
        zT = z0
        for step in self.steps:
            zT, j = step(zT) # Call forward on the CouplingLayer step
            j_total += j

        return zT, j_total

    def inverse(self, zT):
        '''Apply the inverse flow transformation steps to the output zT. 
        Returns the initial input variable z0 as well as
        the log Jacobian term for the inverse transformation
        in the change-of-variables formula.
        '''
        pass

    def loglik(self, x):
        '''Return the log-density of x for this RealNVPFlow instance.'''
        pass

In [None]:
def inverse(self, zT):
    # TODO: Implement. You need to invert the flow transformation
    # implemented in RealNVPFlow.forward().
    #BEGIN_REMOVE
    j_total = 0.0
    z0 = zT
    for step in reversed(self.steps):
        z0, j = step.inverse(z0)
        j_total += j
    #END_REMOVE
    return z0, j_total

RealNVPFlow.inverse = inverse

# testing
test_flow = RealNVPFlow(test_dim, 2, test_conditioner_hidden_dims)

z_forward, j_forward = test_flow(test_z)
z_forward_inverse, j_forward_inverse = test_flow.inverse(z_forward)

if torch.norm(test_z - z_forward_inverse) < 1e-6:
    print('inverse calculated correctly')
else:
    print('inverse calculated incorrectly')

if torch.norm(j_forward + j_forward_inverse) < 1e-6:
    print('Jacobian calculated correctly')
else:
    print('Jacobian calculated incorrectly')

We will now train an instance of the `RealNVPFlow` class to model the pinwheel distribution.  We use 8 flow steps, and illustrate the flow by plotting a large number of samples at various stages of training.  While it is instructive to work through this code, the key takeaway is to see how the flow evolves over the course of training.  You may develop your insight by adjusting some of the parameters of the flow or training hyperparameters.

In order to train successfully, we need to specify the loss function to be optimized by the training loop.  In this case, we have training data, so minimizing the forward KL divergence, or equivalently maximizing the (log-)likelihood is applicable.  Implement the `loglik` function in the cell below for use in training.

In [None]:
# TODO: Implement the mean log likelihood over a batch, x.
def loglik(self, x):
    # normal distribution object for calculating loss
    p0 = MultivariateNormal(torch.zeros(x.shape[-1]), torch.eye(x.shape[-1]))
    # TODO: Complete
    #BEGIN_REMOVE
    u, log_j = self.inverse(x)
    #print("x")
    #print(x)
    #print("u")
    #print(u)
    #print("log_j")
    #print(log_j)
    ll = (p0.log_prob(u) + log_j).mean()
    #END_REMOVE
    return ll

RealNVPFlow.loglik = loglik

In [None]:
# setup plotting
cmap = 'brg'
alpha = .4
num_plots = 4
fig, ax = plt.subplots(1,2+num_plots, figsize=(18,3), sharex=True, sharey=True)

# create flow model
dim = 2
num_steps = 8
conditioner_hidden_dims = [20, 20, 20]
flow = RealNVPFlow(dim, num_steps, conditioner_hidden_dims)

# set training hyperparameters
num_epochs = 10*2**num_plots
batch_size = 2**7
optim = torch.optim.Adam(flow.parameters())

# sample training data
num_batches = 10
X_train = sample_pinwheel(num_batches*batch_size)

# plot validation set and reference latents
X_val = sample_pinwheel(5*batch_size)
x_0, x_1 = X_val[:, 0], X_val[:, 1]
ax[0].scatter(x_0, x_1, alpha=alpha, cmap=cmap)
ax[0].set_title('Data')

# produce and plot standard normal data
Z = torch.randn(10*batch_size, 2)
z_0, z_1 = Z[:, 0], Z[:, 1]
sample_noise_colors = np.zeros(shape=len(z_1), dtype=int)
cuts = [-np.inf, -1, 1]
for n, (low, high) in enumerate(zip(cuts, cuts[1:])):
    sample_noise_colors[(low <= z_1)&(z_1 <= high)] = n + 1
ax[1].scatter(z_0, z_1, alpha=alpha, c=sample_noise_colors, cmap=cmap)
ax[1].set_title(r'$p_0=\mathcal{N}(\mathbf{0},I)$')

# training loop
nf_val_loss = []
plot_epochs = [num_epochs//(2**(num_plots - i - 1)) for i in range(num_plots)]
epochs = trange(num_epochs)
for epoch in epochs:
    for i in range(num_batches):
        # Process the training set by batches.  (Later, we will use an object called
        # a DataLoader for managing this for us.)
        x = X_train[i*batch_size:(i+1)*batch_size]

        # take step in direction of negative gradient of loss
        loss = -flow.loglik(x)
        optim.zero_grad()
        loss.backward()
        optim.step()

    # for tracking validation loss, and producing nice plots of training flows
    if (epoch+1) in plot_epochs:
        with torch.no_grad():
            # track validation loss
            loss = -flow.loglik(X_val)
            nf_val_loss.append(loss.item)

            # plot flow output
            zT, _ = flow(Z)
            z_0, z_1 = zT[:, 0], zT[:, 1]
            k = 2 + plot_epochs.index(epoch+1)
            ax[k].scatter(z_0, z_1, alpha=alpha, c=sample_noise_colors, cmap=cmap)
            ax[k].set_title(f'Epoch {epoch+1}')

# tidy plots
for axi in ax:
    axi.set_xlim((-4,4))
    axi.set_ylim((-4,4))
    axi.axes.xaxis.set_visible(False)
    axi.axes.yaxis.set_visible(False)

plt.subplots_adjust(wspace=0.05, hspace=0.05)
plt.show()

Finally, we compare the shapes produced by different numbers of step sizes.

Instead of using scatter plots as above (which is faster), here we plot the resulting distribution using kernel density estimation (KDE).  (The details of KDE are not important; however, it is useful here because it gives some indication of how densely populated the points are in certain regions of space.)

We expect to find that more step sizes result in a better fit, with diminishing returns at some point.

The first cell below fits the models (fairly quickly); the second cell constructs and displays the KDE plots and is a bit slower.

In [None]:
flows = []
T = [2, 4, 8, 16]
# use more epochs this time to get the best possible fit for each model.
num_epochs = 250 # decrease to reduce changing time at the cost of a worse fit.
for t in T:
    print(f't = {t}:')
    flow = RealNVPFlow(dim, num_steps=t, conditioner_hidden_dims=conditioner_hidden_dims)
    optim = torch.optim.Adam(flow.parameters())

    epochs = trange(num_epochs)
    for epoch in epochs:
        for i in range(num_batches):
            x = X_train[i*batch_size:(i+1)*batch_size]
            # take step in direction of negative gradient of loss
            loss = -flow.loglik(x)
            optim.zero_grad()
            loss.backward()
            optim.step()
    flows.append(flow)

In [None]:
    # Density estimation by sampling, using kernel density estimation (KDE)
    # This function sometimes misbehaves or gives an error - this is typically
    # because one or more of the models trained in the previous step encountered
    # numerical problems.  You can skip this cell and view the alternative plots
    # below, or try retraining the models in the previous cell.

    kde_numpoints = 4000 # Reduce this for quicker, but more granular plots.

    plt.rcParams.update({'font.size': 16})
    fig, ax = plt.subplots(1,6, figsize=(18,3), sharex=True, sharey=True)

    ax[0].set_title('Data')
    X = X_train.numpy()
    x_0, x_1 = X[:, 0], X[:, 1]
    sns.kdeplot(x=x_0, y=x_1, cmap="Blues", shade=True, bw_adjust=.5, ax=ax[0])

    T = [2,4,8,16]
    Z = Normal(
            loc=torch.zeros(2, dtype=torch.float32), 
            scale=torch.ones(2, dtype=torch.float32)
        ).sample((kde_numpoints,))
    z_0, z_1 = Z[:, 0].numpy(), Z[:, 1].numpy()

    sns.kdeplot(x=z_0, y=z_1, cmap="Blues", shade=True, bw_adjust=.5, ax=ax[1])
    ax[1].set_title(r'$p_0=\mathcal{N}(\mathbf{0},I)$')

    with torch.no_grad():
        for t in range(len(T)):
            ax[t+2].set_title(r'$T=${}'.format(T[t]))
            model = flows[t]
            x, _ = model(Z)
            x_0, x_1 = x[:, 0].numpy(), x[:, 1].numpy()
            sns.kdeplot(x=x_0, y=x_1, cmap="Blues", shade=True, bw_adjust=.5, ax=ax[t+2])

    for axi in ax:
        axi.set_xlim((-4,4))
        axi.set_ylim((-4,4))
        axi.axes.xaxis.set_visible(False)
        axi.axes.yaxis.set_visible(False)

    plt.subplots_adjust(wspace=0.05, hspace=0.05)
    plt.show()

Since we have implemented inversion for this flow, we can do something else to plot the density: we can query the density at various points on a grid, and plot these to get a density surface directly.  This can give us information on the relative density of the flow at places where density is so low that sampling will not give us reliable information.

We have followed this approach below.

In [None]:
# Density estimation by querying the density at various points on a grid

kde_numpoints = 1000 # Reduce these for quicker, but more granular plots.
grid_resolution = 70

plt.rcParams.update({'font.size': 16})
fig, ax = plt.subplots(1,6, figsize=(18,3), sharex=True, sharey=True)

ax[0].set_title('Data')
X = X_train.numpy()
x_0, x_1 = X[:, 0], X[:, 1]
sns.kdeplot(x=x_0, y=x_1, cmap="Blues", shade=True, bw_adjust=.5, ax=ax[0])

T = [2,4,8,16]
Z = Normal(
        loc=torch.zeros(2, dtype=torch.float32), 
        scale=torch.ones(2, dtype=torch.float32)
    ).sample((kde_numpoints,))
z_0, z_1 = Z[:, 0].numpy(), Z[:, 1].numpy()

sns.kdeplot(x=z_0, y=z_1, cmap="Blues", shade=True, bw_adjust=.5, ax=ax[1])
ax[1].set_title(r'$p_0=\mathcal{N}(\mathbf{0},I)$')

densities = []
with torch.no_grad():
    x_0_coords = torch.linspace(-3.0, 3.0, grid_resolution)
    x_1_coords = torch.linspace(-3.0, 3.0, grid_resolution)
    gm0, gm1 = torch.meshgrid(x_0_coords, x_1_coords)
    for t in range(len(T)):
        ax[t+2].set_title(r'$T=${}'.format(T[t]))
        model = flows[t]
        density = torch.zeros((grid_resolution, grid_resolution))

        for i in range(grid_resolution):
            for j in range(grid_resolution):
                density[i,j] = torch.exp(model.loglik(torch.Tensor([[x_0_coords[i], x_1_coords[j]]])))

        densities.append(density.detach().numpy())
        mappable = ax[t+2].pcolormesh(gm0, gm1, densities[t], shading='auto')

for axi in ax:
    axi.set_xlim((-4,4))
    axi.set_ylim((-4,4))
    axi.axes.xaxis.set_visible(False)
    axi.axes.yaxis.set_visible(False)

plt.subplots_adjust(wspace=0.05, hspace=0.05)
plt.show()