[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/pranavm19/SBI-Tutorial/blob/main/notebooks/02_NFlows.ipynb)

## Leveraging normalizing flows for SBI
**Pranav Mamidanna, PhD** (p.mamidanna22@imperial.ac.uk), April 2025

In the previous tutorial, we saw how to use ABC to estimate the posterior distribution over the parameters of a simulator. However, ABC is known to be inefficient in high dimensions, and its results depend on the choice of several hyperparameters.

In this notebook:
1. We will see how we can use normalizing flows to estimate the posterior distribution of the parameters of a simulator.
2. We will train an affine coupling layer from scratch, and use it to build a normalizing flow.
3. We will use the normalizing flow to directly estimate the "two moons" posterior, an approach known as neural posterior estimation (NPE).

In [None]:
# NOTE: Takes ~3 min to run
!python -m pip install sbi corner

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

import numpy as np
from scipy.interpolate import PchipInterpolator
from scipy.stats import norm
import matplotlib.pyplot as plt
import seaborn as sns

from IPython.display import clear_output, display
from ipywidgets import interact, FloatSlider

In [None]:
# Recall the two moons model
def two_moons_sbi(theta, sigma=0.01):
    """Simulate two-moons observations x for a batch of parameters theta (shape [batch, 2])"""
    batch_size = theta.shape[0]
    alpha = torch.rand(batch_size) * torch.pi - torch.pi/2  # Uniform(-pi/2, pi/2)
    r = sigma * torch.randn(batch_size) + 1  # Normal(1, sigma)

    x_1 = r * torch.cos(alpha) + 0.5 - torch.abs(theta[:, 0] + theta[:, 1])/torch.sqrt(torch.tensor(2))
    x_2 = r * torch.sin(alpha) + (- theta[:, 0] + theta[:, 1])/torch.sqrt(torch.tensor(2))

    x =  torch.stack([x_1, x_2], dim=-1)

    return x

### **What are normalizing flows?**  
Normalizing flows are a class of generative models that can learn and represent complex probability distributions. They do this by starting with a simple “base” distribution (e.g., a standard normal) and applying a series of invertible transformations to map samples from that base distribution into the target (or data) distribution. Importantly, because these transformations are invertible, we can also evaluate the probability density of any sample by mapping it back to the base distribution!

Let's see how that works below. We first build some intuition for what normalizing flows do by looking at how the distribution of $x = f(z)$ changes for a given base density and transform $f$. Here, we will use the tanh function to transform the base density.


In [None]:
def NF_intuition_plot(scale=1.0):
    # Get samples from base density
    n_samples = 5000
    np.random.seed(129)
    base_samples = np.random.randn(n_samples) / 2  # Normal(0, 0.5): standard normal scaled by 1/2

    # Define the transform using tanh
    transform = lambda z: np.tanh(scale * z)

    # Generate a set of x, y values for plotting
    x_vals = np.linspace(-3, 3, 200)
    y_vals = transform(x_vals)

    # Transform the base samples
    transformed_samples = transform(base_samples)

    fig, axes = plt.subplots(1, 3, figsize=(16, 4))

    # Plot base density
    sns.kdeplot(base_samples, ax=axes[0], fill=True, color='C0')
    axes[0].plot(base_samples, np.full_like(base_samples, -0.02), '|', color='C0')
    axes[0].set_title('Base Density')
    axes[0].set_ylim([-0.05, None]); axes[0].set_xlim([-2, 2])

    # Plot transformation function
    axes[1].plot(x_vals, y_vals, label='tanh transform')
    axes[1].set_xlim([-2, 2]); axes[1].set_ylim([-1, 1])
    axes[1].legend(); axes[1].grid(True)
    axes[1].set_title(f'Transform (scale={scale})')

    # Plot transformed density
    sns.kdeplot(transformed_samples, ax=axes[2], fill=True, color='C1')
    axes[2].plot(transformed_samples, np.full_like(transformed_samples, -0.02), '|', color='C1')
    axes[2].set_title('Transformed Density')
    axes[2].set_ylim([-0.05, None]); axes[2].set_xlim([-2, 2])

    plt.tight_layout()
    plt.show()


# Create an interactive widget that lets you control the scale parameter
scale_slider = FloatSlider(value=1.0, min=0.1, max=2.0, step=0.1, description='Scale:')
interact(NF_intuition_plot, scale=scale_slider)

It's very easy to see how to draw samples. But how do we get the probabilities?

**Change of variables formula**  
The magic basically happens within the simple "change of variables" formula. It tells us how densities transform under invertible mappings.

Let $z$ be a sample from a known "base" density, like a normal distribution, whose probability density is $p_Z(z)$. Then the transformed sample $x = f(z)$ has probability density $$p_X(x) = p_Z\bigl(f^{-1}(x)\bigr)\left| \det \left( \frac{\partial f^{-1}(x)}{\partial x} \right) \right|$$ That is, if we have an invertible $f$ whose inverse $f^{-1}$ maps $x$ back to $z$, we can evaluate the probability density of any given $x$ under the transformed distribution.
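
As a quick numerical sanity check of the formula, here is a minimal sketch using an affine map instead of tanh. The values of `a` and `b` and the helper `normal_pdf` are illustrative assumptions, not part of the tutorial code. For a standard normal base, $x = a z + b$ is known to follow a normal distribution with mean $b$ and standard deviation $|a|$, so the two densities should agree.

```python
import numpy as np

def normal_pdf(v, mean=0.0, std=1.0):
    """Density of a univariate normal distribution (hypothetical helper)."""
    return np.exp(-0.5 * ((v - mean) / std) ** 2) / (std * np.sqrt(2.0 * np.pi))

# Hypothetical affine transform x = f(z) = a * z + b, with a != 0
a, b = 2.0, 1.0
f_inverse = lambda x: (x - b) / a                    # z = f^{-1}(x)
inv_jacobian = lambda x: np.full_like(x, 1.0 / a)    # d f^{-1}(x) / dx

# Change of variables: p_X(x) = p_Z(f^{-1}(x)) * |d f^{-1}(x) / dx|
p_x = lambda x: normal_pdf(f_inverse(x)) * np.abs(inv_jacobian(x))

x_grid = np.linspace(-9.0, 11.0, 2001)
reference = normal_pdf(x_grid, mean=b, std=abs(a))   # closed-form density of a*z + b
max_abs_error = np.max(np.abs(p_x(x_grid) - reference))
print(f"max abs. difference to Normal(b, |a|): {max_abs_error:.2e}")
```

The same three ingredients (inverse, derivative of the inverse, base density) are all that is needed for any other invertible one-dimensional transform.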

> **Task 2.1** Implement the change of variables formula to estimate the probability density of $x$.



In [None]:
def NF_intuition_plot(scale=1.0):
    n_samples = 5000
    np.random.seed(129)
    base_samples = np.random.randn(n_samples) / 2.0   # σ = 0.5

    ## TODO: Implement the following lambda functions
    # transforms
    transform = lambda z: np.tanh(scale * z)
    inverse = lambda x:
    inv_jac = lambda x:

    # densities
    p_z = lambda z:
    p_x = lambda x:

    # evaluate on grid for smooth curve
    x_grid = np.linspace(-2, 2, n_samples)
    px_grid = p_x(x_grid)

    # transform samples
    transformed_samples = transform(base_samples)

    fig, axes = plt.subplots(1, 3, figsize=(16, 4))

    sns.kdeplot(base_samples, ax=axes[0], fill=True, color='C0')
    axes[0].plot(base_samples, np.full_like(base_samples, -0.02), '|', color='C0')
    axes[0].set_title('Base Density'); axes[0].set_ylim([-0.05, None]); axes[0].set_xlim([-2, 2])

    xx = np.linspace(-2, 2, n_samples)
    axes[1].plot(xx, transform(xx), label='tanh'); axes[1].grid(True)
    axes[1].set_title(f'Transform (scale={scale})'); axes[1].set_xlim([-2, 2]); axes[1].set_ylim([-1.1, 1.1])

    sns.kdeplot(transformed_samples, ax=axes[2], fill=True, color='C1', label='KDE')
    axes[2].plot(x_grid, px_grid, 'r', lw=2, label='Analytical')
    axes[2].set_title('Transformed density'); axes[2].set_xlim([-2, 2]); axes[2].set_ylim([-0.05, None])
    axes[2].legend()

    plt.tight_layout(); plt.show()

# Create an interactive widget that lets you control the scale parameter
scale_slider = FloatSlider(value=1.0, min=0.5, max=2.0, step=0.1, description='Scale:')
interact(NF_intuition_plot, scale=scale_slider)
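
If you get stuck on Task 2.1, the following is one possible way to fill in the lambdas, written as a standalone sketch rather than the reference solution. It assumes the base density is a normal with standard deviation 0.5 (matching `randn / 2`), and it fixes `scale = 1.0` in place of the slider value. The normal density is written out by hand; `scipy.stats.norm.pdf` would work equally well.

```python
import numpy as np

scale = 1.0   # plays the role of the slider value
sigma = 0.5   # std of the base samples (np.random.randn / 2)

# transforms
transform = lambda z: np.tanh(scale * z)
inverse = lambda x: np.arctanh(x) / scale              # z = f^{-1}(x), defined for |x| < 1
inv_jac = lambda x: 1.0 / (scale * (1.0 - x ** 2))     # d f^{-1}(x) / dx

# densities
p_z = lambda z: np.exp(-0.5 * (z / sigma) ** 2) / (sigma * np.sqrt(2.0 * np.pi))
p_x = lambda x: p_z(inverse(x)) * np.abs(inv_jac(x))   # change of variables

# tanh maps into (-1, 1), so the transformed density is only defined there
x_grid = np.linspace(-0.999, 0.999, 4001)
px_grid = p_x(x_grid)
dx = x_grid[1] - x_grid[0]
total_mass = px_grid.sum() * dx
print(f"integral of p_x over (-1, 1): {total_mass:.4f}")
```

Note that the plotting cell evaluates `p_x` on a grid from -2 to 2. With this solution, grid points with $|x| > 1$ should evaluate to `nan` (with a NumPy warning), and matplotlib leaves them out of the curve.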

#### **The API**

So far, we have seen how a bijective function can be used to transform a given "base" density. The real strength of normalizing flows comes from the fact that we can basically do the reverse - we can start with an unknown data density and learn the bijective function that transforms it into a known base density!

There are two components to a normalizing flow: (1) the bijective transform (or a sequence of them), and (2) the base distribution (called `prior` in the code below). Once we have these, at train time, we compute $z$ from a batch of ($x$, context) and evaluate the loss, the negative log-likelihood of the batch. At test time, we draw $z$ from the base distribution and, together with a context, map it through the inverse transform to generate samples $x$ from the learned data distribution (and we can evaluate their probability density too!).

To do this, we will need:

`NF = NormalizingFlow(flows, prior)`
- `NF.forward(x, context) -> z, ldj`
- `NF.inverse(z, context) -> x, ldj`
- `NF.sample(num_samples, context) -> x`
- `NF.log_prob(x, context) -> log_prob`

The `flows` object is usually a list of layers each of which is a bijective transform:

`T = FlowTransform()`
- `T.forward(x, context) -> z, ldj`
- `T.inverse(z, context) -> x, ldj`
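
To make the interface concrete, here is a minimal sketch of a transform that follows the `forward`/`inverse` convention above. `ToyScaleShift` is a hypothetical class invented for illustration: it applies a single learnable elementwise scale and shift and ignores the context. The log-density of $x$ is then the base log-density of $z$ plus the log-determinant returned by `forward`.

```python
import math
import torch
import torch.nn as nn

class ToyScaleShift(nn.Module):
    """Hypothetical transform z = (x - shift) * exp(-log_scale); ignores the context."""
    def __init__(self, dim):
        super().__init__()
        self.log_scale = nn.Parameter(torch.zeros(dim))
        self.shift = nn.Parameter(torch.zeros(dim))

    def forward(self, x, context=None):
        z = (x - self.shift) * torch.exp(-self.log_scale)
        ldj = -self.log_scale.sum().expand(x.shape[0])   # log|det dz/dx|, one value per sample
        return z, ldj

    def inverse(self, z, context=None):
        x = z * torch.exp(self.log_scale) + self.shift
        ldj = self.log_scale.sum().expand(z.shape[0])    # log|det dx/dz|
        return x, ldj

torch.manual_seed(0)
T = ToyScaleShift(dim=2)
with torch.no_grad():                                    # set x = 2 * z + 1 by hand
    T.log_scale.fill_(math.log(2.0))
    T.shift.fill_(1.0)

base = torch.distributions.MultivariateNormal(torch.zeros(2), torch.eye(2))
x = torch.randn(4, 2)
z, ldj = T(x)
log_prob = base.log_prob(z) + ldj                        # change of variables in log space
x_back, ldj_inv = T.inverse(z)

# Reference: each coordinate of x = 2 * z + 1 is Normal(1, 2)
reference = torch.distributions.Normal(1.0, 2.0).log_prob(x).sum(dim=-1)
print(torch.allclose(log_prob, reference, atol=1e-5))
```

Returning the log-determinant from both directions keeps the bookkeeping simple when several transforms are chained: the per-layer values are just summed.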

#### **Why are they suitable for Simulation-Based Inference?**

Here, we will specifically deal with the case of Neural Posterior Estimation.

In the previous tutorial, we have seen that SBI basically deals with estimating the posterior distribution, i.e., estimating the density $p(\theta \mid x_{\text{obs}})$. One way to do this with normalizing flows is to draw samples from a simple base density and learn a transformation, conditioned on the observation, that maps them directly into posterior samples! This is the idea behind neural posterior estimation.

With a simulator in hand, generating the training data is easy: sample $\theta$ from the prior distribution and plug it into the simulator to obtain $x$. We then train a conditional normalizing flow on the resulting $(\theta, x)$ pairs so that, given $x$ as context, it transforms the base density into $p(\theta \mid x)$!

### **Real NVP - Affine Coupling Flows**

A popular implementation of normalizing flows is **Real NVP** (Dinh et al., 2017). The core of Real NVP is the **affine coupling** transformation.

Suppose we split the data $x$ of dimension $D$ into two parts, $x = [x_{1:d},\, x_{(d+1):D}]$. In a **coupling layer**, we leave one part unchanged and apply a learnable affine transformation to the other part. Specifically, let:

$$
    y_{1:d} \;=\; x_{1:d},
$$
$$
    y_{(d+1):D} \;=\; x_{(d+1):D}\,\odot \exp\bigl(s_\theta(x_{1:d})\bigr)\;+\; t_\theta(x_{1:d}),
$$

where $\odot$ denotes elementwise multiplication. The functions $s_\theta(\cdot)$ and $t_\theta(\cdot)$ (the "scale" and "shift" networks) are typically small neural networks that depend on the "frozen" part $x_{1:d}$.

- **Invertibility**: This transformation is **invertible** because you can solve for $x_{(d+1):D}$ by reversing the shift and scale operations:
  $$
    x_{(d+1):D}
     = \Bigl(y_{(d+1):D} - t_\theta(y_{1:d})\Bigr)\,\odot \exp\Bigl(- s_\theta(y_{1:d})\Bigr).
  $$
  The log-determinant of the Jacobian, $\log \left\lvert \det \frac{\partial y}{\partial x} \right\rvert$, is simply
  $$
    \sum_{j=1}^{D-d} s_\theta(x_{1:d})_j,
  $$
  because the scaling is diagonal in the sub-block.

- **Why is it non-linear?**
  Although the transformation is written as an *affine* function for the second block, the parameters of that affine transformation are themselves neural-network outputs, i.e., $s_\theta(\cdot)$ and $t_\theta(\cdot)$. This makes the overall mapping
  $$
    x \mapsto y
  $$
  non-linear in $x$. The "frozen" part $x_{1:d}$ is feeding through a neural network to produce scale and shift factors, which can be highly non-linear functions of $x_{1:d}$.

- **Coupling layers and permutations**: In Real NVP, we often interleave such coupling layers with permutation layers to ensure that over multiple layers, each dimension eventually appears in the "frozen" part and the "transformed" part. This broadens the flexibility of the flow, letting it model complex dependencies across all dimensions.
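
The log-determinant stated above follows in two steps. Because $y_{1:d}$ copies $x_{1:d}$, and $y_{(d+1):D}$ depends on $x_{(d+1):D}$ only through an elementwise scaling, the Jacobian is block lower-triangular:

$$
    \frac{\partial y}{\partial x}
    = \begin{bmatrix}
        I_d & 0 \\
        \dfrac{\partial y_{(d+1):D}}{\partial x_{1:d}} & \operatorname{diag}\Bigl(\exp\bigl(s_\theta(x_{1:d})\bigr)\Bigr)
      \end{bmatrix}.
$$

The determinant of a triangular matrix is the product of its diagonal entries, so the lower-left block (which involves derivatives of the neural networks) never has to be computed:

$$
    \log \left\lvert \det \frac{\partial y}{\partial x} \right\rvert
    = \log \prod_{j=1}^{D-d} \exp\bigl(s_\theta(x_{1:d})_j\bigr)
    = \sum_{j=1}^{D-d} s_\theta(x_{1:d})_j.
$$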

In summary, Real NVP is a straightforward yet powerful example of how normalizing flows combine tractable Jacobians (via affine coupling) with flexible function approximators (neural networks for scale and shift).


> **Task 2.2.** We have seen above that for NPE we need to train a conditional normalizing flow, where $x$ are the conditioning variables (also called the context) and $\theta$ are the primary variables that get normalized. However, the equations above contain no conditioning variables. Can you modify the forward and inverse equations to show how the context is used?

> **Task 2.3.** With all the equations to implement a conditional normalizing flow at hand, complete the AffineCouplingLayer class below...

In [None]:
class AffineCouplingLayer(nn.Module):
    def __init__(self, input_dim, context_dim):
        super().__init__()
        self.input_dim = input_dim
        self.context_dim = context_dim
        self.split_idx = input_dim - (input_dim // 2) # first part gets more dims if input_dim is odd

        # Define scale and shift networks
        self.scale_net = nn.Sequential(
            nn.Linear(self.split_idx + context_dim, 64),
            nn.LeakyReLU(),
            nn.Linear(64, 64),
            nn.LeakyReLU(),
            nn.Linear(64, input_dim - self.split_idx)
        )
        self.shift_net = nn.Sequential(
            nn.Linear(self.split_idx + context_dim, 64),
            nn.LeakyReLU(),
            nn.Linear(64, 64),
            nn.LeakyReLU(),
            nn.Linear(64, input_dim - self.split_idx)
        )

    def forward(self, x, context):
        # Split input tensor along the last dimension
        x_identity = x[..., :self.split_idx]
        x_transform = x[..., self.split_idx:]

        # # Concatenate identity and context, pass them into the networks
        # identity_context =
        # scale =
        # shift =

        # Compute log-determinant of the Jacobian
        # ldj =

        # Affine transformation on x_transform
        # z_transform =

        # Concatenate unchanged part with transformed part
        z = torch.cat((x_identity, z_transform), dim=-1)
        return z, ldj

    def inverse(self, z, context):
        # Inverse transform: split z into identity and transformed parts
        z_identity = z[..., :self.split_idx]
        z_transform = z[..., self.split_idx:]

        # # Concatenate identity and context, pass them into the networks
        # identity_context =
        # scale =
        # shift =

        # Compute log-determinant of the Jacobian
        # ldj =

        # Inverse affine transformation
        # x_transform =

        # Concatenate identity and transformed parts
        x = torch.cat((z_identity, x_transform), dim=-1)
        return x, ldj
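
If you want to check your work on Tasks 2.2 and 2.3, here is one possible completion, written as a standalone sketch rather than the reference solution. It assumes the context simply enters by concatenation with the identity part before the scale and shift networks, so that both become functions of (identity part, context). The class name `AffineCouplingSketch`, the `hidden_dim` argument and the helper `_scale_shift` are introduced here for illustration only.

```python
import torch
import torch.nn as nn

class AffineCouplingSketch(nn.Module):
    """One possible conditional affine coupling layer (illustrative sketch)."""
    def __init__(self, input_dim, context_dim, hidden_dim=64):
        super().__init__()
        self.input_dim = input_dim
        self.split_idx = input_dim - (input_dim // 2)
        in_features = self.split_idx + context_dim
        out_features = input_dim - self.split_idx

        def make_net():
            return nn.Sequential(
                nn.Linear(in_features, hidden_dim), nn.LeakyReLU(),
                nn.Linear(hidden_dim, hidden_dim), nn.LeakyReLU(),
                nn.Linear(hidden_dim, out_features),
            )
        self.scale_net = make_net()
        self.shift_net = make_net()

    def _scale_shift(self, identity, context):
        # Scale and shift depend on the unchanged part AND the context
        identity_context = torch.cat((identity, context), dim=-1)
        return self.scale_net(identity_context), self.shift_net(identity_context)

    def forward(self, x, context):
        x_identity, x_transform = x[..., :self.split_idx], x[..., self.split_idx:]
        scale, shift = self._scale_shift(x_identity, context)
        z_transform = x_transform * torch.exp(scale) + shift
        ldj = scale.sum(dim=-1)                      # log|det dz/dx|, shape [batch]
        return torch.cat((x_identity, z_transform), dim=-1), ldj

    def inverse(self, z, context):
        z_identity, z_transform = z[..., :self.split_idx], z[..., self.split_idx:]
        scale, shift = self._scale_shift(z_identity, context)
        x_transform = (z_transform - shift) * torch.exp(-scale)
        ldj = -scale.sum(dim=-1)                     # log|det dx/dz|, the negated forward value
        return torch.cat((z_identity, x_transform), dim=-1), ldj

# Round-trip check: in NPE the flow input is theta and the context is x
torch.manual_seed(0)
layer = AffineCouplingSketch(input_dim=2, context_dim=2)
theta, x_context = torch.randn(5, 2), torch.randn(5, 2)
z, ldj = layer(theta, x_context)
theta_rec, ldj_inv = layer.inverse(z, x_context)
print(torch.allclose(theta, theta_rec, atol=1e-5), torch.allclose(ldj, -ldj_inv))
```

In practice the raw scale output is often squashed (for example with a `tanh`) to keep `exp(scale)` from blowing up early in training. That is a common stabilisation trick, not something this tutorial prescribes.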


Subsequently, we can build a normalizing flow by stacking multiple coupling layers, and a prior distribution.

One last missing piece is a permutation layer that ensures that each dimension is eventually used as the "identity" part.

In [None]:
class NormalizingFlow(nn.Module):
    """
    A normalizing flow model composed of a sequence of affine coupling layers and a prior distribution.
    """
    def __init__(self, flows, prior=None):
        super().__init__()
        self.flows = nn.ModuleList(flows)
        self.dim = self.flows[0].input_dim
        # Default base distribution: a standard normal. Its tensors are created on the
        # default device (CPU); `sample` moves the drawn z to the model's device.
        if prior is None:
            self.prior = torch.distributions.MultivariateNormal(
                torch.zeros(self.dim), torch.eye(self.dim))
        else:
            self.prior = prior

        self.train_loss = []

    def forward(self, x, context):
        """
        Applies a sequence of flow transformations and accumulates the log-determinants.
        """
        ldj = torch.zeros(x.shape[0], device=x.device)
        for flow in self.flows:
            x, ldj_ = flow(x, context)
            ldj += ldj_
        return x, ldj

    def inverse(self, z, context):
        """
        Inverts the flow transformation from latent space back to the input space.
        """
        ldj = torch.zeros(z.shape[0], device=z.device)
        for flow in reversed(self.flows):
            z, ldj_ = flow.inverse(z, context)
            ldj += ldj_  # log-determinants are already negated in inverse
        return z, ldj

    @torch.no_grad()
    def sample(self, num_samples, context):
        """
        Generate samples from the model given a context.
        """
        device = next(self.parameters()).device
        z = self.prior.sample((num_samples,)).to(device)
        x, _ = self.inverse(z, context)
        return x

    def log_prob(self, x, context):
        """
        Compute the log probability of x under the flow model.
        """
        z, ldj = self(x, context)
        log_pz = self.prior.log_prob(z)
        return log_pz + ldj


In [None]:
class PermutationLayer(nn.Module):
    def __init__(self, num_features):
        super().__init__()
        # Create a random permutation for the feature indices.
        perm = torch.randperm(num_features)
        self.register_buffer("perm", perm)
        self.register_buffer("inv_perm", torch.argsort(perm))

    def forward(self, x, context):
        # Permuting the features; no effect on the log-determinant.
        x_permuted = x[..., self.perm]
        # Log-determinant is zero for a permutation
        log_det = torch.zeros(x.size(0), device=x.device)
        return x_permuted, log_det

    def inverse(self, x, context):
        # Inverse permutation
        x_inv = x[..., self.inv_perm]
        log_det = torch.zeros(x.size(0), device=x.device)
        return x_inv, log_det
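
One caveat for the two-dimensional problem used below: `torch.randperm(2)` returns the identity ordering about half of the time, and in that case the layer does not swap which dimension is left unchanged. A deterministic alternative is to reverse the feature order. The class below is a hypothetical drop-in sketch with the same `forward`/`inverse` interface, not part of the original tutorial code.

```python
import torch
import torch.nn as nn

class ReversePermutationSketch(nn.Module):
    """Hypothetical alternative to a random permutation: reverse the feature order."""
    def __init__(self, num_features):
        super().__init__()
        perm = torch.arange(num_features - 1, -1, -1)     # e.g. [1, 0] for two features
        self.register_buffer("perm", perm)
        self.register_buffer("inv_perm", torch.argsort(perm))

    def forward(self, x, context=None):
        # A permutation only reorders coordinates, so its log-determinant is zero
        return x[..., self.perm], torch.zeros(x.size(0), device=x.device)

    def inverse(self, z, context=None):
        return z[..., self.inv_perm], torch.zeros(z.size(0), device=z.device)

layer = ReversePermutationSketch(num_features=2)
x = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
z, ldj = layer(x)            # features swapped
x_back, _ = layer.inverse(z) # original order restored
```

With two features, alternating coupling layers with this reversal guarantees that each dimension is transformed in every other layer.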


Now, let's put everything together and train a normalizing flow on the two moons model!

In [None]:
# Handle dimensions
input_dim = 2
context_dim = 2
n_layers = 4
flows = []

# Define the model and optimizer
for i in range(n_layers):
    flows.append(AffineCouplingLayer(input_dim, context_dim))
    flows.append(PermutationLayer(input_dim))

model = NormalizingFlow(flows)
optimizer = optim.Adam(model.parameters(), lr=3e-4)

In [None]:
# Fix an observed data point x_obs
x_obs = torch.tensor([[0.0, 0.0]], dtype=torch.float32)

# Create a grid of theta-values over which to evaluate the posterior
batch_size = 100
theta0_vals = torch.linspace(-2, 2, batch_size)
theta1_vals = torch.linspace(-2, 2, batch_size)
TH0, TH1 = torch.meshgrid(theta0_vals, theta1_vals, indexing='xy')
theta_grid = torch.cat([TH0.reshape(-1,1), TH1.reshape(-1,1)], dim=1)

# Training settings
num_iter = 5000
num_update_iter = 100
batch_size = 100
losses = []

# Prepare figure
fig, (ax_loss, ax_posterior) = plt.subplots(1, 2, figsize=(10, 4))
plt.ion()

for i in range(num_iter):
    # Generate data
    theta = torch.rand((batch_size, 2), dtype=torch.float32) * 4 - 2
    x = two_moons_sbi(theta)

    # Standard training step: maximize log q(theta | x), i.e. theta is the
    # flow input and the simulated x is the context (neural posterior estimation)
    optimizer.zero_grad()
    loss = -model.log_prob(theta, x).mean()
    loss.backward()
    optimizer.step()
    losses.append(loss.item())

    # Update plots interactively
    if i % num_update_iter == 0:
        ax_loss.cla()
        ax_posterior.cla()

        # Training loss
        ax_loss.plot(losses, label='Train Loss')
        ax_loss.set_title('Training Loss')
        ax_loss.set_xlabel('Iteration')
        ax_loss.set_ylabel('Negative Log-Likelihood')
        ax_loss.legend()

        # Approximate posterior
        with torch.no_grad():
            # Replicate x_obs for every point in theta_grid so the shape matches
            x_obs_tiled = x_obs.repeat(theta_grid.shape[0], 1)

            # Posterior density q(theta | x_obs) evaluated on the theta grid
            post_vals = model.log_prob(theta_grid, x_obs_tiled).exp()
            post_2d = post_vals.view(batch_size, batch_size)

        # Contour-plot the posterior in theta-space
        c = ax_posterior.contourf(
            TH0.numpy(), TH1.numpy(), post_2d.numpy(),
            levels=50, alpha=0.8
        )
        ax_posterior.set_title(f'Posterior at iteration {i}')
        ax_posterior.set_xlabel(r'$\theta_0$')
        ax_posterior.set_ylabel(r'$\theta_1$')

        clear_output(wait=True)
        display(fig)

plt.ioff();

In [None]:
# The really cool thing is that we don't need to re-run the algorithm
# to infer posterior for a new data point! (more or less...)
x_obs = torch.tensor([[0.1, 0.1]], dtype=torch.float32)

# Create a grid of theta-values over which to evaluate the posterior
batch_size = 100
theta0_vals = torch.linspace(-3, 3, batch_size)
theta1_vals = torch.linspace(-3, 3, batch_size)
TH0, TH1 = torch.meshgrid(theta0_vals, theta1_vals, indexing='xy')
theta_grid = torch.cat([TH0.reshape(-1,1), TH1.reshape(-1,1)], dim=1)

fig, ax = plt.subplots(1, 1, figsize=[4, 4])

with torch.no_grad():
    # Replicate x_obs for every point in theta_grid so the shape matches
    x_obs_tiled = x_obs.repeat(theta_grid.shape[0], 1)

    # Posterior ~ exp(log p(theta | x_obs))
    post_vals = model.log_prob(theta_grid, x_obs_tiled).exp()
    post_2d = post_vals.view(batch_size, batch_size)

# Contour-plot the posterior in theta-space
c = ax.contourf(
    TH0.numpy(), TH1.numpy(), post_2d.numpy(),
    levels=50, alpha=0.8
)
ax.set_title('Posterior for the new x_obs')
ax.set_xlabel(r'$\theta_0$')
ax.set_ylabel(r'$\theta_1$')

plt.show()

### **Repeat using `sbi` toolbox**

Some semantics relevant to the `sbi` toolbox: it requires a `prior` and `simulator` to build two objects - an `inference` object that does the density estimation and a `posterior` from which you can sample and calculate log probabilities.

> **Task 2.4.** Follow along the code below to see how we can use the `sbi` toolbox to train a normalizing flow.


In [None]:
from sbi.analysis import pairplot
from sbi.inference import NPE
from sbi.utils import BoxUniform
from sbi.utils.user_input_checks import (
    check_sbi_inputs,
    process_prior,
    process_simulator,
)

In [None]:
# Define prior
num_dim = 2
prior = BoxUniform(-3 * torch.ones(num_dim), 3 * torch.ones(num_dim))

# Define simulator
# We have already done this at the top (two_moons_sbi())

In [None]:
# Check prior
prior, num_parameters, prior_returns_numpy = process_prior(prior)

# Check simulator
simulator = process_simulator(two_moons_sbi, prior, prior_returns_numpy)

# Consistency check after making ready for sbi.
check_sbi_inputs(simulator, prior)

In [None]:
# Generate samples from the prior
num_simulations = 10000
theta = prior.sample((num_simulations,))
x = simulator(theta)
print("theta.shape", theta.shape)
print("x.shape", x.shape)

In [None]:
# Make an inference object and train it!
inference = NPE(prior=prior)
inference = inference.append_simulations(theta, x)
density_estimator = inference.train()

In [None]:
# Build the posterior
posterior = inference.build_posterior(density_estimator)

In [None]:
samples = posterior.sample((10000,), x=torch.tensor([0.0, 0.0]))  # pass x_obs as a float tensor
_ = pairplot(
    samples,
    limits=[[-2, 2], [-2, 2]],
    figsize=(6, 6),
    labels=[r"$\theta_1$", r"$\theta_2$"]
)

### **Outro**
We have successfully trained an affine coupling flow on the two moons model from scratch, and compared it to the `sbi` toolbox. Now, let's see how we can apply this to a real-world problem! Click below to open the next notebook.

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/pranavm19/SBI-Tutorial/blob/main/notebooks/03_NFlows_BioMime.ipynb)

#### Additional reading

[1] [How do distributions transform under a change of variables?](https://theoryandpractice.org/stats-ds-book/distributions/change-of-variables.html), Kyle Cranmer  
[2] [Density estimation using Real NVP](https://arxiv.org/abs/1605.08803), Dinh et al., 2017