# Setup

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import tqdm.auto as tqdm
import torch
%matplotlib widget

In [None]:
def grab(x: torch.Tensor) -> np.ndarray:
    """Convert a torch Tensor to a numpy array.

    The tensor is detached from the autograd graph and moved to host
    memory first, so tensors that require grad or that live on another
    device can be converted as well.

    Args:
        x: tensor to convert.

    Returns:
        The tensor's values as a numpy array.
    """
    # .cpu() returns the tensor itself when it is already on the CPU;
    # without it .numpy() raises for tensors stored on other devices.
    return x.detach().cpu().numpy()

In [None]:
def wrap(x):
    """Map angles onto the principal branch [-pi, pi).

    The upper end point is excluded: an input of exactly pi comes back
    as -pi.  Only ``+``, ``%`` and ``-`` are applied to ``x``.
    """
    half_turn = np.pi
    full_turn = 2*np.pi
    # shift so the branch cut sits at 0, reduce modulo 2*pi, shift back
    shifted = x + half_turn
    return shifted % full_turn - half_turn

# Action
We will consider a simple family of theories on a space of two angles $(\theta_1, \theta_2)$. The general form of the action is
$$
S(\theta_1, \theta_2; \alpha, \beta) := -\beta \cos(\theta_1 - \theta_2) - \alpha \cos(\theta_1) + \alpha \cos(\theta_2)
$$

In [None]:
def action(th, *, alpha, beta):
    """Evaluate the two-angle action.

    S = -beta*cos(th1 - th2) - alpha*cos(th1) + alpha*cos(th2)

    Args:
        th: tensor whose last axis has length 2 and holds (th1, th2);
            any leading axes are treated as batch axes.
        alpha: coefficient of the single-angle cosine terms.
        beta: coefficient of the cosine of the angle difference.

    Returns:
        Tensor of action values with the last axis of ``th`` removed.
    """
    assert th.shape[-1] == 2
    th1, th2 = th.unbind(dim=-1)
    coupling = -beta * torch.cos(th1 - th2)
    return coupling - alpha * torch.cos(th1) + alpha * torch.cos(th2)

def make_action(alpha, beta):
    """Return a one-argument callable ``th -> action(th)`` with the
    couplings ``alpha`` and ``beta`` fixed."""
    def bound_action(th):
        return action(th, alpha=alpha, beta=beta)
    return bound_action

# Couplings of the target theory; other cells below read these names.
beta_target = 3.0  # coefficient of the cos(theta_1 - theta_2) term
alpha_target = 1.0  # coefficient of the cos(theta_1) and cos(theta_2) terms
# Action with the target couplings bound: call as target_action(th).
target_action = make_action(alpha_target, beta_target)

It will be useful to have samples from the target distribution. There are many possible ways to build this ensemble; here we just do a crude importance sampling with a single resampling step according to the computed weights.

In [None]:
def sample_inds(weights):
    """Resample indices with replacement, with probability given by weights.

    Args:
        weights: 1-D sequence of non-negative weights.  They need not be
            normalised and may be integers or a plain list; the input is
            never modified.

    Returns:
        Integer numpy array of ``len(weights)`` indices drawn from
        ``range(len(weights))`` using numpy's global random state.

    Raises:
        ValueError: if the weights do not sum to a positive finite number.
    """
    # Normalise in float64: an in-place divide fails on integer input, and
    # low-precision weights can miss numpy's "sums to 1" tolerance.
    p = np.asarray(weights, dtype=np.float64)
    total = p.sum()
    if p.size and not (np.isfinite(total) and total > 0):
        raise ValueError("weights must sum to a positive finite value")
    p = p / total
    return np.random.choice(len(p), p=p, size=len(p))

def sample(batch_size, action, *, beta0):
    """Draw approximate samples of exp(-action) by importance resampling.

    Proposal: theta_1 is uniform on [0, 2*pi), the difference
    delta = theta_1 - theta_2 is von Mises distributed with mean 0 and
    concentration ``beta0``.  Each proposed pair gets the weight
    exp(-action) / q(delta), and the batch is then resampled once, with
    replacement, according to those weights.

    Args:
        batch_size: number of proposals drawn and of samples returned.
        action: callable mapping a (batch_size, 2) tensor of angles to a
            (batch_size,) tensor of action values.
        beta0: concentration of the von Mises proposal for the difference.

    Returns:
        Tensor of shape (batch_size, 2) holding the resampled angles.
    """
    shape = (batch_size,)
    dist = torch.distributions.VonMises(0.0, beta0)
    delta = dist.sample(shape)
    # log proposal density; the uniform theta_1 factor is a constant and
    # drops out when the weights are normalised
    logq = dist.log_prob(delta)
    th1 = 2*np.pi*torch.rand(size=shape)
    th2 = (th1 - delta) % (2*np.pi)
    th = torch.stack([th1, th2], axis=-1)
    # importance weight = target / proposal, i.e. log w = -S - log q.
    # (Adding log q instead would resample towards q**2 * exp(-S).)
    logw = -action(th) - logq
    logw -= torch.logsumexp(logw, dim=0)
    weight = np.exp(grab(logw))
    # resample
    inds = sample_inds(weight)
    return th[inds]

**EXERCISE:** Implement a more principled sampling function, like MCMC, rejection sampling, or inverse CDF sampling.

Set up some utilities to plot distributions of samples or analytic action over the two-dimensional plane of angles.

In [None]:
def make_th_grid(steps):
    """Build a square grid of angle pairs covering [-pi, pi] x [-pi, pi].

    ``steps`` evenly spaced edges are laid out per axis and the grid
    points are the ``steps - 1`` midpoints between neighbouring edges.

    Returns:
        Tensor of shape (steps-1, steps-1, 2); entry [i, j] holds
        (theta_1, theta_2) = (midpoint i, midpoint j).
    """
    edges = torch.linspace(-np.pi, np.pi, steps=steps)
    centers = (edges[1:] + edges[:-1]) / 2
    grid_1, grid_2 = torch.meshgrid(centers, centers, indexing='ij')
    return torch.stack((grid_1, grid_2), dim=-1)
def plot_dist(action, *, ax, nsteps=60):
    """Filled contour plot of the unnormalised density exp(-action) on the
    angle grid returned by make_th_grid(nsteps), drawn on ``ax``."""
    grid = make_th_grid(nsteps)
    density = np.exp(-grab(action(grid)))
    grid_np = grab(grid)
    th1, th2 = grid_np[...,0], grid_np[...,1]
    ax.contourf(th1, th2, density)
def plot_samples(th, *, ax, nbins=60):
    """2-D histogram of angle samples on ``ax``.

    The samples are converted to numpy and wrapped before binning; both
    axes use ``nbins`` equal-width bins spanning -pi to pi.
    """
    angles = wrap(grab(th))
    edges = np.linspace(-np.pi, np.pi, num=nbins+1)
    ax.hist2d(angles[...,0], angles[...,1], bins=edges)

In [None]:
# Ensemble of 100000 target samples from the importance-resampling
# helper; `samples` is reused by the diffusion section below.
samples = sample(100000, action=target_action, beta0=0.5)
# Left panel: analytic exp(-S) on a grid.  Right panel: sample histogram.
fig, axes = plt.subplots(1,2, figsize=(6,3))
plot_dist(target_action, ax=axes[0])
plot_samples(samples, ax=axes[1])
for ax in axes:
    # equal scaling for the two angle axes
    ax.set_aspect(1.0)
plt.show()

# Action coefficients

For this simple theory, we can expand any action (our target, or intermediate learned actions) in a Fourier basis:
$$
\tilde{S}(k_1, k_2) \sim \int_0^{2\pi} \frac{d\theta_1}{2\pi} \frac{d\theta_2}{2\pi} e^{-i k \cdot \theta} S(\theta).
$$
We can think of these coefficients as some kind of Wilson coefficients in a systematic expansion. They will provide a way to see how we move through the (infinite-dimensional) space of distributions.

In [None]:
def measure_coeffs_grid(S):
    """Extract the lowest Fourier ("Wilson-like") coefficients of S.

    Args:
        S: 2-D array of action values on a periodic grid; axis 0 is
            treated as theta_1 and axis 1 as theta_2.

    Returns:
        dict of complex numbers built from ``np.fft.ifft2(S)``:
        ``c`` is the zero mode, and ``a1``, ``a2``, ``b1``, ``b2`` are
        the sums of the mode pairs (0, +-1), (+-1, 0), +-(1, 1) and
        +-(1, -1).  When the first grid point sits at angle 0 these are
        the coefficients of cos(theta_2), cos(theta_1),
        cos(theta_1 + theta_2) and cos(theta_1 - theta_2); a shifted grid
        origin multiplies each mode by a phase.
    """
    modes = np.fft.ifft2(S)

    def pair(k1, k2):
        # a mode plus its partner at the negated wave vector
        return modes[k1, k2] + modes[-k1, -k2]

    return {
        'c': modes[0, 0],
        'a1': pair(0, 1),
        'a2': pair(1, 0),
        'b1': pair(1, 1),
        'b2': pair(1, -1),
    }

In [None]:
def measure_coeffs(action):
    """Evaluate ``action`` on the make_th_grid(200) angle grid and return
    the Fourier coefficients computed by measure_coeffs_grid."""
    grid = make_th_grid(200)
    return measure_coeffs_grid(grab(action(grid)))
# For example, the coefficients of our target action: a1, a2 and b2 recover
# the alpha, -alpha and beta terms in magnitude (b1 and c vanish).  The
# signs follow the grid origin, which sits near -pi rather than 0.
measure_coeffs(target_action)

# Annealing / trivializing flow
Let's first look at the path through the space of distributions described by annealing / the trivializing flow. This is just linear interpolation in the parameters:

In [None]:
# 51 interpolation parameters from t=0 (both couplings zero) to t=1 (target).
ts = np.linspace(0, 1, num=51)
# One action per t, with both couplings scaled linearly by t.
actions = [
    make_action(t*alpha_target, t*beta_target)
    for t in ts
]
# Fourier coefficients of every interpolated action.
coeffs = [measure_coeffs(S) for S in actions]

In [None]:
def plot_coeffs(ts, coeffs, x='a1', y='b2', *, ax, cmap, marker='.', label=None):
    """Scatter one pair of coefficients along a path, coloured by time.

    Args:
        ts: sequence of values passed to the colormap to colour each point.
        coeffs: sequence of dicts as returned by measure_coeffs_grid,
            one per entry of ``ts``.
        x, y: keys of the coefficients shown on the horizontal and
            vertical axis.
        ax: matplotlib axes to draw on.
        cmap: colormap name (or object) handed to ``plt.get_cmap``.
        marker: matplotlib marker style.
        label: legend label for the scatter.
    """
    # The coefficients come out of an FFT as complex numbers; plot the real
    # part explicitly instead of relying on an implicit complex-to-real cast.
    pts = np.real(np.stack([(coeff[x], coeff[y]) for coeff in coeffs], axis=1))
    cmap = plt.get_cmap(cmap)
    ax.scatter(*pts, marker=marker, s=3, color=cmap(ts), label=label)

Unsurprisingly, the extracted Wilson coefficients are linearly interpolated.

In [None]:
# Path of the annealed actions in the plane of the 'a1' and 'b2'
# coefficients (the plot_coeffs defaults), labelled alpha and beta.
fig, ax = plt.subplots(1,1, figsize=(3, 3), tight_layout=True)
plot_coeffs(ts, coeffs, ax=ax, cmap='Reds_r', marker='o', label='Triv flow')
ax.set_xlabel(r'$\alpha$')
ax.set_ylabel(r'$\beta$')
ax.legend()
plt.show()

# Diffusion

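The diffusion path requires implementing the **Langevin SDE** (here in its simplest, driftless form: the forward process only adds Gaussian noise, $d\theta = \sqrt{2}\, g\, dW$). We can use a simple Euler-Maruyama integrator, starting from samples from the target distribution to simulate the forward process.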

In [None]:
def forward(th, *, g=1.5, nsteps=1000, save_freq=10):
    """Simulate the forward (noising) process with Euler-Maruyama steps.

    The time interval [0, 1] is split into ``nsteps`` steps of size
    ``dt = 1/nsteps``.  Every step adds independent Gaussian noise with
    standard deviation ``sqrt(2 * dt * g**2)`` to each angle; there is no
    drift term.

    Args:
        th: tensor of starting angles.  It is copied on entry, so the
            caller's tensor is not modified.
        g: noise strength.
        nsteps: number of integration steps.
        save_freq: a snapshot is stored after every ``save_freq`` steps.

    Returns:
        dict with ``samples`` (list of wrapped snapshots, the first being
        the starting configuration) and ``ts`` (the matching times,
        starting at 0.0).
    """
    # work on a private copy: the update below is in place
    th = th.clone()
    dt = 1/nsteps
    # per-step noise scale, identical for every step
    noise_scale = float(np.sqrt(2*dt*g**2))
    ts = [0.0]
    samples = [wrap(th.clone())]
    for i in tqdm.tqdm(range(nsteps)):
        th += noise_scale * torch.randn_like(th)
        if (i+1) % save_freq == 0:
            samples.append(wrap(th.clone()))
            ts.append((i+1)*dt)
    return dict(samples=samples, ts=ts)

In [None]:
# Run the forward process on a copy so that `samples` stays the target
# ensemble.
diff_res = forward(samples.clone())
diff_samples = diff_res['samples']
diff_ts = diff_res['ts']
# 11 edges -> 10 equal-width bins per angle.
bins = np.linspace(-np.pi, np.pi, num=11)
# For each snapshot, take -log of the binned sample density as an effective
# action on the 10x10 grid and extract its Fourier coefficients.
# NOTE: an empty bin gives log(0) and therefore non-finite coefficients.
diff_coeffs = [measure_coeffs_grid(
    -np.log(np.histogram2d(th[...,0], th[...,1], bins=bins, density=True)[0])
) for th in diff_samples]
print(f'{len(diff_ts)=}')

As the forward process proceeds, **noise is added** until we converge towards the **uniform distribution**.

In [None]:
# Histograms of four stored snapshots of the forward process.
fig, axes = plt.subplots(1, 4, figsize=(8, 3), tight_layout=True)
# Snapshot indices; with forward's defaults (nsteps=1000, save_freq=10)
# these correspond to t = 0, 0.25, 0.5 and 1.
inds = [0, 25, 50, 100]
for ind, ax in zip(inds, axes):
    t = diff_ts[ind]
    plot_samples(diff_samples[ind], ax=ax)
    ax.set_title(rf'$t = {t}$')
    ax.set_aspect(1.0)
plt.show()

Compared to the annealing path, diffusion takes a **non-linear path in the space of couplings**. It terminates at (or close to) the uniform distribution with zero couplings.

In [None]:
# Overlay the annealing path and the diffusion path in the plane of the
# 'a1' and 'b2' coefficients (the plot_coeffs defaults).
fig, ax = plt.subplots(1,1, figsize=(3, 3), tight_layout=True)
plot_coeffs(ts, coeffs, ax=ax, cmap='Reds_r', marker='o', label='Triv flow')
plot_coeffs(diff_ts, diff_coeffs, ax=ax, cmap='Blues_r', marker='s', label='Diffusion')
ax.set_xlabel(r'$\alpha$')
ax.set_ylabel(r'$\beta$')
ax.legend()
plt.show()

# Normalizing flow

Finally, we implement a simple **hard-coded flow** (no machine learning yet!). To evaluate the flow, we just use a simple Euler integrator. The coefficients of the flow are arbitrarily tuned to approximately reproduce the target distribution.

In [None]:
def flow(th, nsteps=1000, save_freq=10):
    """Transport angles along a hard-coded velocity field with Euler steps.

    The time interval [0, 1] is split into ``nsteps`` steps of size
    ``dt = 1/nsteps``; step ``i`` moves the angles by ``dt * velocity``
    with the velocity evaluated at time ``(i+1)*dt``.

    Args:
        th: tensor of starting angles with (theta_1, theta_2) on the last
            axis.  It is copied on entry, so the caller's tensor is not
            modified.
        nsteps: number of integration steps.
        save_freq: a snapshot is stored after every ``save_freq`` steps.

    Returns:
        dict with ``samples`` (list of wrapped snapshots, the first being
        the starting configuration) and ``ts`` (the matching times,
        starting at 0.0).
    """
    # work on a private copy: the update below is in place
    th = th.clone()
    dt = 1/nsteps
    def velocity(th, t):
        """Velocity field at angles ``th`` and time ``t``."""
        th1, th2 = th[...,0], th[...,1]
        return (
            # pulls theta_1 and theta_2 towards each other
            5*t*(1-t) * torch.stack([
                -torch.sin(th1 - th2), -torch.sin(th2 - th1)], axis=-1)
            # pushes theta_1 towards 0 and theta_2 towards pi
            + t**2 * torch.stack([-torch.sin(th1), torch.sin(th2)], axis=-1)
        )
    samples = [wrap(th.clone())]
    ts = [0.0]
    for i in tqdm.tqdm(range(nsteps)):
        t = (i+1)*dt
        th += dt * velocity(th, t)
        if (i+1) % save_freq == 0:
            samples.append(wrap(th.clone()))
            ts.append(t)
    return dict(samples=samples, ts=ts)

In [None]:
# 100000 starting pairs with each angle uniform on [0, 2*pi).
prior_th = 2*np.pi*torch.rand(size=(100000, 2))
flow_res = flow(prior_th)
flow_ts = flow_res['ts']
flow_samples = flow_res['samples']
# 11 edges -> 10 equal-width bins per angle.
bins = np.linspace(-np.pi, np.pi, num=11)
# For each snapshot, take -log of the binned sample density as an effective
# action on the 10x10 grid and extract its Fourier coefficients.
# NOTE: an empty bin gives log(0) and therefore non-finite coefficients.
flow_coeffs = [measure_coeffs_grid(
    -np.log(np.histogram2d(th[...,0], th[...,1], bins=bins, density=True)[0])
) for th in flow_samples]

The samples converge towards something similar to our target distribution, as shown in the histograms of the density below.

In [None]:
# Histograms of four stored snapshots of the flow.
fig, axes = plt.subplots(1, 4, figsize=(8, 3), tight_layout=True)
# Snapshot indices; with flow's defaults (nsteps=1000, save_freq=10)
# these correspond to t = 0, 0.25, 0.5 and 1.
inds = [0, 25, 50, 100]
for ind, ax in zip(inds, axes):
    t = flow_ts[ind]
    plot_samples(flow_samples[ind], ax=ax)
    ax.set_title(rf'$t = {t}$')
    ax.set_aspect(1.0)
plt.show()

**EXERCISE:** Compute the probability density of the flow by integrating the divergence of the flow field. Compare this against the sample density above.

In [None]:
# Overlay all three paths (annealing, diffusion, flow) in the plane of the
# 'a1' and 'b2' coefficients (the plot_coeffs defaults).
fig, ax = plt.subplots(1,1, figsize=(3, 3), tight_layout=True)
plot_coeffs(ts, coeffs, ax=ax, cmap='Reds_r', marker='o', label='Triv flow')
plot_coeffs(diff_ts, diff_coeffs, ax=ax, cmap='Blues_r', marker='s', label='Diffusion')
plot_coeffs(flow_ts, flow_coeffs, ax=ax, cmap='Greens_r', marker='^', label='Flow')
ax.set_xlabel(r'$\alpha$')
ax.set_ylabel(r'$\beta$')
ax.legend()
plt.show()