<a href="https://colab.research.google.com/github/c-mulliken/csci2952n/blob/main/mini_project_normalizing_flows.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Stencil code for Mini Project Normalizing Flows. **Please make a copy before modifying the code.**

Created by Calvin Luo (calvin_luo@brown.edu)

## Imports and Set Up Code

In [None]:
# Plotting, qol
import imageio
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from IPython.display import Image

# ML stuff
import tensorflow.compat.v2 as tf
tf.enable_v2_behavior()
import tensorflow_probability as tfp
import tensorflow.keras.layers as tfkl
from tensorflow.keras.utils import Progbar

# Target distribution loader
from sklearn.datasets import make_moons

In [None]:
# Define a source distribution - in this case, a simple 2D Gaussian will suffice.
source_dist = tfp.distributions.MultivariateNormalDiag(loc=(0,0))

# Define a noise value for the moons distribution (how much deviation from the two semi-circle lines)
moon_noise = 0.1

In [None]:
# Helper function to generate visualization GIFs.
def generate_gif(frames, fname='temp.gif', time_per_frame=500, final_frame_time=2000):
  # Pause the GIF at the end by extending the duration of the last frame
  duration = [time_per_frame] * (len(rendered)-1) + [final_frame_time]
  imageio.mimwrite(fname, rendered, loop=0, duration=duration)

## Step 1: Normalizing Flow Models

There's nothing to edit in the first block, but give it a read and run it!
`Transform` is just a base class for the actual networks to inherit from.

In the second block, we will begin our implementation of an Affine Transformation Block.  Down the road, we will stack multiple of these for expressibility, creating a full normalizing flow implementation.

The third block has implemented the Affine Flow for you, which stacks multiple `AffineTransformBlocks`, and computes the forward and inverse operations along with dimension shifting for expressibility.

**Task 1.1** Implement the transformation functions of `AffineTransformBlock`.

Fill in the `transform` function and `inverse_transform` appropriately.  They should implement an affine transformation and its inverse (where you can choose which is which).  As a reminder, an affine transform will use provided parameters to scale input $x$ and shift it.  Make sure to design the affine transformation in a way that guarantees invertibility.

**Task 1.2** Implement an Affine Transform Block, in [Masked Autoregressive Flow (MAF)](https://arxiv.org/pdf/1705.07057) style where conditioning is performed on data samples for faster training.

The conditioner is already provided as a LSTM that produces 2-dimensional outputs, which can be interpreted as your affine transformation parameters.

Fill in the `forward` function, which maps Gaussian samples to data samples.  As per MAF, we will need to compute $x_i$ before utilizing it as subsequent conditioning; sampling is therefore autoregressive and slow.  We therefore must iteratively step through our LSTM calls, manipulating the outputs accordingly before passing it back in as input.

Fill in the `inverse` function, which maps data samples to Gaussian samples.  As per MAF, we have access to the entire $x_i$ input needed to calculate the transformation parameters.  Technically this is still autoregressive (due to the conditioner being an LSTM), but we do not need to manipulate the outputs of the conditioner.  Do not forget to compute an accurate log of the determinant of the jacobian term!  Whereas the forward sampling pass doesn't need this, the inverse call does in order to facilitate maximum likelihood training.

In [None]:
#@title Base Class

class Transform:
  def __init__(self):
    self._conditioner = None

  def transform(self, z, params):
    pass

  def inverse_transform(self, x, params):
    pass

  def forward(self, x):
    pass

  def inverse(self, x):
    pass

### Autoregressive Affine Flow

In [None]:
'''
AUTOREGRESSIVE FLOW SPECIFICATION
  Transformer: Affine
  Conditioner: LSTM
'''

class AffineTransformBlock(Transform):
  def __init__(self):
    # Conditioner only outputs 2 values, the weight and bias.
    self._conditioner = tfkl.LSTM(2, return_sequences=True, return_state=True)

  def transform(self, z, w, b):
    # TODO (Step 1.1): implement an affine transformation
    # Hint: exponentiate the weight to guarantee invertibility
    pass

  def inverse_transform(self, x, w, b):
    # TODO (Step 1.1): implement inverse of the above affine transformation
    # Hint: exponentiate the weight to guarantee invertibility
    pass

  def forward(self, z):
    # Input z should be shape [b, dim]
    # As a reminder, tfkl.LSTM takes in inputs of dimension 3: (batch, timesteps, feature)
    # Return x should be shape [b, dim], representing the predicted transformed sample.
    b, dim = z.shape
    dtype = z.dtype

    # Initial conditioning information is set to default as a constant (e.g. 1).
    # You can modify this if you like, along with its initialization shape.
    ar_input = tf.ones((b), dtype=dtype)
    prev, state = self._conditioner.cell.get_initial_state(batch_size=b)

    # TODO (Step 1.2): implement forward function
    # As per MAF, write a loop that iteratively calculates the transformed x_i.
    # Hint: look at the call function: https://github.com/keras-team/keras/blob/v3.3.3/keras/src/layers/rnn/lstm.py#L559
    # to get a clue as to how to manually perform calls of your LSTM.
    x = None
    return x

  def inverse(self, x):
    # Input x should be shape [b, dim]
    # As a reminder, tfkl.LSTM takes in inputs of dimension 3: (batch, timesteps, feature)
    # Return z should be shape [b, dim], representing the predicted inverted sample.
    b, dim = x.shape
    dtype = x.dtype

    # Initial conditioning information is set to default as a constant (e.g. 1).
    # The rest should be the corresponding x dimensions, as per MAF.
    ar_input = tf.concat([tf.ones((b, 1, 1), dtype=dtype), x[:,:-1,None]], axis=1)

    # TODO (Step 1.2): implement inverse function
    # As per MAF, compute affine parameters by autoregressively conditioning on inputs x.
    # During inversion, we need to compute the log jacobian determinant, for subsequent use in likelihood estimation.
    # Hint: do we need a for loop?  And can we compute the transformation all at once?
    z = None
    log_jacob_det = None
    return z, log_jacob_det

In [None]:
class AffineFlow:
  '''
  Implements AffineFlow with arbitrary number of blocks,
  and naively rolling the dimensions by one each step.
  '''
  def __init__(self, num_blocks):
    self.blocks = [AffineTransformBlock() for _ in range(num_blocks)]

  def forward(self, z):
    for i in range(len(self.blocks)):
      z = self.blocks[i].forward(tf.roll(z, 1, -1))
    x = tf.roll(z, -i-1, -1)
    return x

  def inverse(self, x):
    accumulated_ljd = 0
    x = tf.roll(x, len(self.blocks)+1, -1)
    for i in range(len(self.blocks)):
      x, log_jacob_det = self.blocks[-i-1].inverse(tf.roll(x, -1, -1))
      accumulated_ljd += log_jacob_det
    z = tf.roll(x, -1, -1)
    return z, accumulated_ljd

  def get_weights(self):
    return sum([block._conditioner.trainable_weights for block in self.blocks], [])

### Sanity Checks!

We need to test two main things:


1.   The `forward` and `inverse` functions are exact inverses of each other.
2.   That the Jacobian Determinant computation is equivalent to the one computed via autodifferentiation.

Applying the forward and inverse flows in sequence to a sample should implement the identity function.  The same argument holds for applying the inverse and forward flows in sequence.  We use this fact to verify the correctness of our implementations, for a random sample.

The TensorFlow GradientTape can perform a batch Jacobian computation, which we can follow with a determinant calculation (though it is the expensive $O(D^3)$ version to handle general Jacobians).  We can compare this against the tractable Jacobian determinant computation in our implementation that leverages the autoregressive modeling design.

The maximum difference for all these checks should be sufficiently small (e.g. < 1e-5)

**[NOTE] Please do not proceed with the project until these sanity checks are satisfied.**

In [None]:
# Randomly initiate the parameters of an arbitrary Affine Autoregressive Flow for robustness.
batch_size = 500
dim = np.random.randint(10, 20)
num_blocks = np.random.randint(5, 10)

# Define our testing flow and random sample.
test_flow = AffineFlow(num_blocks)
random_sample = tf.random.normal((batch_size, dim))

# Verify the flow's forward and inverse functions act as inverses of each other.
forward_base = test_flow.forward(random_sample)
inverted_base, _ = test_flow.inverse(forward_base)
print(f'Max Diff Between Inverse of Forward Sample and Sample: {np.max(np.abs(inverted_base - random_sample))}')
invert_target, _ = test_flow.inverse(random_sample)
forward_target = test_flow.forward(invert_target)
print(f'Max Diff Between Forward of Inverse Sample and Sample: {np.max(np.abs(forward_target - random_sample))}')

# Verify that the jacobian is correctly computed.
# We compare against a ground-truth computation, which is not as tractable as our direct computation.
with tf.GradientTape() as tape:
  tape.watch(random_sample)
  inverted, log_det_jac = test_flow.inverse(random_sample)
jacobian = tape.batch_jacobian(inverted, random_sample)
gt_log_det_jac = tf.math.log(tf.abs(tf.linalg.det(jacobian)))
print(f'Max Diff Between Computed Jacobian Determinant and Ground Truth Jacobian Determinant: {np.max(np.abs(gt_log_det_jac - log_det_jac))}')

## Step 2: Training Normalizing Flows

**Task 2.1** Implement the missing loss computation below, to perform training.  Recall that with normalizing flows, we can compute likelihoods for the data sample under the current parameterization; we therefore can train our model using maximum likelihood directly!

In [None]:
# Set seed for reproducibility
tf.random.set_seed(1337)

# Training hyperparameters; 20 blocks with 200 steps should solve the Moon_Target<=>Normal_Base problem.
# 10 blocks is a decent approximation.
# Please modify these parameters as you please.
num_blocks = 20
batch_size = 500
iterations = 200
lr = 0.05

flow = AffineFlow(num_blocks)
opt = tf.keras.optimizers.Adam(learning_rate=lr)

# Training Loop
pb = Progbar(iterations, stateful_metrics=['loss'])
for i in range(iterations):
  moon, _ = make_moons(batch_size, noise=moon_noise)

  # TODO (Step 2.1): Please implement the loss computation, as maximum likelihood averaged over the batch.
  # [NOTE:] you must not use the tape.batch_jacobian function!
  # [NOTE:] determine how to appropriately utilize log_jacobian_det term - it can vary depending on your implementation choices above!
  with tf.GradientTape() as tape:
    loss = None
  grads = tape.gradient(loss, flow.get_weights())
  opt.apply_gradients(zip(grads, flow.get_weights()))
  pb.add(1, values=[('loss', loss)])

## Step 3: Visualizing Normalizing Flows

The rest of this notebook is pure visualization of your learned normalizing flow model - there is nothing more to code up!  The below blocks behave as visual sanity checks - if you have implemented everything above correctly, then you should see some beautiful flow interpolations between source and target distributions!

Run through these code blocks and marvel at the flows you've learned!

In [None]:
#@title Forward Flow Path Visualization
# Forward flowing of samples from the source distribution, with intermediate caching.
sample = source_dist.sample(500)
samples = [sample]
for i in range(len(flow.blocks)):
  sample = flow.blocks[i].forward(tf.roll(sample, 1, -1))
  samples.append(tf.roll(sample, -i-1, -1))

# Plotting Logic
rendered = []
for i in range(len(samples)):
  fig, axes = plt.subplots(1, 2, sharex=True, sharey=True)
  fig.set_size_inches(8, 4)
  fig.tight_layout(pad=2)

  ax = plt.subplot(1, 2, 1)
  ax.set_title(f'Forward Flow Distribution, Step {i}')
  sns.kdeplot(x=samples[i][:,0], y=samples[i][:,1], fill=True, cmap='PuBu')

  ax = plt.subplot(1, 2, 2)
  ax.set_title(f'Forward Flow Scatter, Step {i}')
  ax.scatter(samples[i][:, 0], samples[i][:, 1])

  fig.canvas.draw()
  rendered.append(np.array(fig.canvas.renderer._renderer))
  plt.close()

fname = 'forward_flow.gif'
generate_gif(rendered, fname)
Image(fname)

In [None]:
#@title Inverse Flow Path Visualization
# Inverse flowing of samples from the target distribution, with intermediate caching.
sample = tf.cast(moon, tf.float32)
samples = [sample]
sample = tf.roll(sample, len(flow.blocks)+1, -1)
for i in range(len(flow.blocks)):
  sample, _ = flow.blocks[-i-1].inverse(tf.roll(sample, -1, -1))
  samples.append(tf.roll(sample, len(flow.blocks) - i, -1))

# Plotting Logic
rendered = []
for i in range(len(samples)):
  fig, axes = plt.subplots(1, 2, sharex=True, sharey=True)
  fig.set_size_inches(8, 4)
  fig.tight_layout(pad=2)

  ax = plt.subplot(1, 2, 1)
  ax.set_title(f'Inverse Flow Distribution, Step {i}')
  sns.kdeplot(x=samples[i][:,0], y=samples[i][:,1], fill=True, cmap='PuBu')

  ax = plt.subplot(1, 2, 2)
  ax.set_title(f'Inverse Flow Scatter, Step {i}')
  ax.scatter(samples[i][:, 0], samples[i][:, 1])

  fig.canvas.draw()
  rendered.append(np.array(fig.canvas.renderer._renderer))
  plt.close()

fname = 'inverse_flow.gif'
generate_gif(rendered, fname)
Image(fname)

In [None]:
#@title Forward Flow Comparison Against Ground Truth Target Samples
# Visualize a forward pass from the trained model
sample = source_dist.sample(500) # Normal samples
forwarded = flow.forward(sample) # Forwarded samples
moon, _ = make_moons(batch_size, noise=moon_noise) # True samples

fig = plt.figure(figsize=(16, 8), frameon=False)

ax = fig.add_subplot(2, 3, 1)
ax.set_title('Source Samples')
ax.scatter(sample[:,0], sample[:,1])

ax = fig.add_subplot(2, 3, 2)
ax.set_title('Source Samples Forward Flowed')
ax.scatter(forwarded[:,0], forwarded[:,1])

ax = fig.add_subplot(2, 3, 3)
ax.set_title('Target Samples')
ax.scatter(moon[:,0], moon[:,1])

ax = fig.add_subplot(2, 3, 4)
sns.kdeplot(x=sample[:,0], y=sample[:,1], fill=True, cmap='PuBu')

ax = fig.add_subplot(2, 3, 5)
sns.kdeplot(x=forwarded[:,0], y=forwarded[:,1], fill=True, cmap='PuBu')

ax = fig.add_subplot(2, 3, 6)
sns.kdeplot(x=moon[:,0], y=moon[:,1], fill=True, cmap='PuBu')

fig.tight_layout(pad=1)
fig.canvas.draw()

In [None]:
#@title Inverse Flow Comparison Against Ground Truth Source Samples
# Visualize a forward pass from the trained model
sample = source_dist.sample(500) # Normal samples
moon, _ = make_moons(batch_size, noise=moon_noise) # True samples
inverted, _ = flow.inverse(tf.cast(moon, tf.float32)) # Inverted samples

fig = plt.figure(figsize=(16, 8), frameon=False)

ax = fig.add_subplot(2, 3, 1)
ax.set_title('Target Samples')
ax.scatter(moon[:,0], moon[:,1])

ax = fig.add_subplot(2, 3, 2)
ax.set_title('Target Samples Inverse Flowed')
ax.scatter(inverted[:,0], inverted[:,1])

ax = fig.add_subplot(2, 3, 3)
ax.set_title('Source Samples')
ax.scatter(sample[:,0], sample[:,1])

ax = fig.add_subplot(2, 3, 4)
sns.kdeplot(x=moon[:,0], y=moon[:,1], fill=True, cmap='PuBu')

ax = fig.add_subplot(2, 3, 5)
sns.kdeplot(x=inverted[:,0], y=inverted[:,1], fill=True, cmap='PuBu')

ax = fig.add_subplot(2, 3, 6)
sns.kdeplot(x=sample[:,0], y=sample[:,1], fill=True, cmap='PuBu')

fig.tight_layout(pad=1)
fig.canvas.draw()