In [None]:
#package imports

import torch
import torch.nn as nn
from torch import Tensor
from torch.distributions import Normal
import scipy.optimize
import matplotlib.pyplot as plt
import numpy as np
from typing import Tuple
from typing import Callable
import imageio
from pathlib import Path
import timeit as time

In [None]:
#the planar flow class chains the planar flow layers using nn.Sequential

class PlanarFlow(nn.Module):
    """Composition of K planar-flow layers applied one after another.

    The layer objects are stored twice: as a plain Python list in
    ``self.layers`` (this is what ``forward`` iterates over) and inside an
    ``nn.Sequential`` in ``self.model`` (assigning that container as an
    attribute is what makes the layers' parameters visible through
    ``parameters()``).
    """

    def __init__(self, K: int = 6): #K is the number of planar flow layers chained together
        super().__init__()

        #build the K planar transforms one at a time, in order
        self.layers = []
        for _ in range(K):
            self.layers.append(PlanarTransform())

        #wrap the very same layer objects in a sequential container
        self.model = nn.Sequential(*self.layers)

    def forward(self, z: Tensor) -> Tuple[Tensor, float]:
        """Push ``z`` through every layer and accumulate the log-Jacobians.

        Args:
            z: points to transform.

        Returns:
            A pair ``(z_K, log_det_sum)``: the transformed points and the sum
            over layers of each layer's ``log_det_J``. With zero layers the
            sum is the integer ``0``.
        """
        total_log_det = 0
        current = z

        for transform in self.layers:

            #the log-Jacobian is evaluated at the layer's input, so it must be
            #computed before the points are moved
            total_log_det = total_log_det + transform.log_det_J(current)

            #move the points through this layer
            current = transform(current)

        return current, total_log_det

In [None]:
#the planar transform class contains the functions for the planar flow

class PlanarTransform(nn.Module):
    """One scalar planar-flow layer ``f(z) = z + u*tanh(w*z + b)``.

    ``u``, ``w`` and ``b`` are learnable one-element parameters. Before the
    layer is evaluated, ``u`` is overwritten in place whenever ``u*w < -1`` so
    that the map stays invertible.
    """

    def __init__(self):
        super().__init__()

        #randomly assign starting values (mean 0, std 0.1) to u, w, b in that order
        for name in ("u", "w", "b"):
            setattr(self, name, nn.Parameter(torch.randn(1).normal_(0, 0.1)))

    def _enforce_invertibility(self) -> None:
        """Replace ``u`` by its corrected value when ``u*w < -1``."""
        if self.u*self.w < -1:
            self.get_u_hat()

    def forward(self, z: Tensor) -> Tensor:
        """Apply the planar map to ``z`` (after the invertibility check)."""
        self._enforce_invertibility()

        activation = torch.tanh(z*self.w + self.b)
        return z + self.u*activation

    def log_det_J(self, z: Tensor) -> Tensor:
        """Return ``log(1e-10 + |1 + u*psi(z)|)`` with ``psi = (1 - tanh(w*z + b)**2)*w``.

        The small constant keeps the logarithm finite when the derivative is zero.
        """
        self._enforce_invertibility()

        pre_activation = z*self.w + self.b
        psi = (1 - torch.tanh(pre_activation)**2)*self.w
        derivative_magnitude = (1 + self.u*psi).abs()

        return torch.log(1e-10 + derivative_magnitude)

    def get_u_hat(self) -> None:
        """Overwrite ``u`` with ``u + (m(u*w) - u*w)*w/w**2`` where ``m(x) = -1 + log(1 + exp(x))``.

        The new value is written through ``.data``, so the update itself is
        not recorded by autograd.
        """
        dot = self.u*self.w
        m_of_dot = torch.log(1 + torch.exp(dot)) - 1
        shift = (m_of_dot - dot)*self.w/self.w**2
        self.u.data = self.u + shift

In [None]:
#the target distribution class holds the definitions for the 1D target distributions used in the examples

class TargetDistribution:
    """Callable wrapper around the 1D target densities used in the examples.

    Supported names are ``"MotivatingBimodal"``, ``"SymmetricBimodal"`` and
    ``"AsymmetricBimodal"``. The annealing value ``t`` multiplies the exponent
    of each density; the leading constants are not changed with ``t``.
    """

    def __init__(self, name: str, t: int, a: int = 1): #t is the annealing value, a is the mean for symmetric/asymmetric GMM

        #look up the density function for the requested target distribution
        self.func = self.get_target_distribution(name, t, a)

    def __call__(self, z: Tensor) -> Tensor:
        """Evaluate the selected density at the points ``z``."""
        return self.func(z)

    @staticmethod
    def get_target_distribution(name: str, t: int, a=1) -> Callable[[Tensor], Tensor]:
        """Build the density function for ``name``.

        Args:
            name: one of the supported distribution names.
            t: annealing value multiplying the exponent.
            a: mode location used by the symmetric/asymmetric mixtures
               (ignored by ``"MotivatingBimodal"``).

        Returns:
            A function mapping a tensor of points to density values.

        Raises:
            ValueError: if ``name`` is not a supported distribution. Without
                this the lookup would return ``None`` and only fail later,
                when the density is first evaluated.
        """
        #weight 0.5 times the Gaussian normalising constant 1/sqrt(pi/8)
        half_norm = 0.5*1/np.sqrt(np.pi/8)

        if name == "MotivatingBimodal":

            def MotivatingBimodal(z):
                f = 0.953973*torch.exp(-t*((z + 2)**2 - 3)**2)
                return f

            return MotivatingBimodal

        if name == "SymmetricBimodal":

            #two equally weighted bumps centred at -a and +a
            def SymmetricBimodal(z):
                f = half_norm*torch.exp(-8*t*(z + a)**2) + half_norm*torch.exp(-8*t*(z - a)**2)
                return f

            return SymmetricBimodal

        if name == "AsymmetricBimodal":

            #two equally weighted bumps centred at -a and 0
            def AsymmetricBimodal(z):
                f = half_norm*torch.exp(-8*t*(z + a)**2) + half_norm*torch.exp(-8*t*(z)**2)
                return f

            return AsymmetricBimodal

        raise ValueError(
            f"Unknown target distribution {name!r}; expected 'MotivatingBimodal', "
            "'SymmetricBimodal' or 'AsymmetricBimodal'"
        )

In [None]:
#free energy loss function used

class VariationalLoss(nn.Module):
    """Free-energy loss between a flow's output and a target density.

    The loss is the batch mean of
    ``log q0(z0) - log(p(z) + 1e-10) - sum_log_det_J`` where ``q0`` is a
    ``Normal(mean, std)`` base distribution and ``p`` is the target density.
    """

    def __init__(self, distribution: TargetDistribution, mean: int, std: int):
        super().__init__()

        #the starting distribution
        self.base_distr = Normal(mean, std)

        #the target distribution (any callable returning density values)
        self.distr = distribution

    def forward(self, z0: Tensor, z: Tensor, sum_log_det_J: float) -> float:
        """Compute the free energy for one batch.

        Args:
            z0: samples drawn from the starting distribution.
            z: the same samples after passing through the flow.
            sum_log_det_J: summed log-Jacobians of the flow for those samples.

        Returns:
            The mean free energy over the batch.
        """
        #log-density of the starting distribution at the initial points
        log_q0 = self.base_distr.log_prob(z0)

        #log of the target density at the final points; the offset avoids log(0)
        log_target = torch.log(self.distr(z) + 1e-10)

        free_energy = log_q0 - log_target - sum_log_det_J
        return free_energy.mean()

In [None]:
#function definitions required for plotting purposes

#the starting function
def StartingFunction(x, mean, std):
    """Density of the Normal(mean, std) starting distribution evaluated at x.

    Args:
        x: scalar or NumPy array of evaluation points.
        mean: mean of the Gaussian.
        std: standard deviation of the Gaussian.

    Returns:
        ``exp(-(x - mean)**2 / (2*std**2)) / sqrt(2*pi*std**2)`` with the same
        shape as ``x``.
    """
    #the Gaussian is centred at +mean, so the exponent must use (x - mean)
    return 1/np.sqrt(2*(std**2)*np.pi)*np.exp(-(x-mean)**2/(2*std**2))

#the jacobian calculation using tanh
def Jacobian(x, P):
    """Absolute derivative of the planar map ``x + P[0]*tanh(P[1]*x + P[2])`` at x.

    Args:
        x: scalar or NumPy array of evaluation points.
        P: indexable triple holding the layer parameters in the order used above.

    Returns:
        ``|1 + P[0]*(1 - tanh(P[1]*x + P[2])**2)*P[1]|``.
    """
    u, w, b = P[0], P[1], P[2]

    #derivative of tanh at the pre-activation
    tanh_slope = 1 - np.tanh(w*x + b)**2

    return np.abs(1 + u*tanh_slope*w)

#compute the inverse
def H(x, P):
    """Planar map ``x + P[0]*tanh(P[1]*x + P[2])`` evaluated with NumPy."""
    return x + P[0]*np.tanh(P[1]*x + P[2])

def computeInverse(z, P):
    """Numerically invert ``H(., P)`` at every entry of ``z``.

    Args:
        z: 1D sequence/array of values of ``H``.
        P: indexable triple of layer parameters, as used by ``H``.

    Returns:
        A new float array ``x`` of the same length as ``z`` with
        ``H(x[j], P)`` approximately equal to ``z[j]``. ``z`` is not modified.
    """
    z = np.asarray(z, dtype=float)

    #initial guess: solve the equation with tanh(a) replaced by a
    hInverse = (z - P[0]*P[2])/(1 + P[0]*P[1])

    for j in range(len(hInverse)):

        #bind the current target as a default so the residual does not depend on the loop variable
        def optFun(x, target=z[j]):
            return H(x, P) - target

        #fsolve returns a length-1 array; take the scalar explicitly, since assigning
        #the array into a scalar slot is deprecated in recent NumPy versions
        root = scipy.optimize.fsolve(optFun, hInverse[j])
        hInverse[j] = root[0]

    return hInverse

In [None]:
#the definitions for plotting

#global matplotlib defaults: Arial font and extra-small tick labels on both axes
plt.rcParams.update({
    'font.family': 'Arial',
    'xtick.labelsize': 'x-small',
    'ytick.labelsize': 'x-small',
})
    
def plot_density(density, xlim=5):
    """Plot ``density`` on 1000 evenly spaced points in ``[-xlim, xlim]``.

    Args:
        density: callable taking a tensor of points and returning density values.
        xlim: half-width of the plotting interval.
    """
    #evaluation grid and the density values on it
    grid = np.linspace(-xlim, xlim, 1000)
    values = density(torch.tensor(grid))

    #draw the curve
    fig, ax = plt.subplots(figsize=(6, 4), dpi=300)
    ax.plot(grid, values, 'darkmagenta')
    ax.set_title('Target PDF')
    #plt.savefig(f'Target PDF.png', bbox_inches='tight')
    plt.show()
    
def compute_opt(mean, std, xlim, flow=None):
    """Evaluate the density produced by a trained planar flow on a grid.

    The grid points are pulled back through the layers from last to first;
    at each layer the running density is divided by that layer's Jacobian at
    the pulled-back points, and finally multiplied by the starting density.

    Args:
        mean: mean of the starting Gaussian.
        std: standard deviation of the starting Gaussian.
        xlim: the grid is 1000 evenly spaced points in ``[-xlim, xlim]``.
        flow: module whose parameters are three scalars per layer, in layer
            order. Defaults to the notebook-level ``model``.

    Returns:
        ``(x, optimized_pdf)``: the grid and the flow density on it.
    """
    if flow is None:
        #fall back to the model created by the training cells
        flow = model

    #flatten the parameters into one float64 vector, three entries per layer
    pieces = [p.detach().cpu().numpy().astype(np.float64).reshape(-1) for p in flow.parameters()]
    P = np.concatenate(pieces) if pieces else np.empty(0)

    #derive the layer count from the parameters themselves so it cannot
    #drift out of sync with a separately edited global
    num_layers = len(P)//3

    #points
    x = np.linspace(-xlim, xlim, 1000)
    optimized_pdf = np.ones(len(x))

    #walk the layers backwards: last layer first
    xy = x
    for i in range(3*(num_layers - 1), -1, -3):
        layer_params = P[i:i+3]
        xy = computeInverse(xy, layer_params)
        optimized_pdf = optimized_pdf/Jacobian(xy, layer_params)

    optimized_pdf = optimized_pdf*StartingFunction(xy, mean, std)

    return x, optimized_pdf
    
def plot_optimized(mean, std, xlim=5):
    """Plot the density of the trained flow on ``[-xlim, xlim]``.

    Args:
        mean: mean of the starting Gaussian, forwarded to ``compute_opt``.
        std: standard deviation of the starting Gaussian, forwarded to ``compute_opt``.
        xlim: half-width of the plotting interval.
    """
    #grid and flow density on it
    grid, flow_pdf = compute_opt(mean, std, xlim)

    #draw the curve
    fig, ax = plt.subplots(figsize=(6, 4), dpi=300)
    ax.plot(grid, flow_pdf, 'darkmagenta')
    ax.set_title('Optimized PDF')
    #plt.savefig(f'Optimized PDF.png', bbox_inches='tight')
    plt.show()
    
    
def plot_comparison(mean, std, xlim=5):
    """Overlay the starting, target and flow densities on ``[-xlim, xlim]``.

    The target is built from the notebook-level ``target_distr`` and ``a``
    with annealing value ``t=1``.

    Args:
        mean: mean of the starting Gaussian.
        std: standard deviation of the starting Gaussian.
        xlim: half-width of the plotting interval.
    """
    #density of the trained flow and the grid it lives on
    grid, flow_pdf = compute_opt(mean, std, xlim)

    #density of the starting Gaussian
    start_pdf = StartingFunction(grid, mean, std)

    #true (t = 1) target density
    density_target = TargetDistribution(target_distr, t=1, a=a)
    target_pdf = density_target(torch.tensor(grid))

    #one line per curve; only the flow density is dashed
    fig, ax = plt.subplots(figsize=(6, 4), dpi=300)
    for values, colour, extra in (
        (start_pdf, 'gold', {'label': 'Starting'}),
        (target_pdf, 'tomato', {'label': 'Target'}),
        (flow_pdf, 'darkmagenta', {'ls': '--', 'label': 'Optimized'}),
    ):
        ax.plot(grid, values, colour, lw=1, **extra)
    ax.legend()
    ax.set_title('Comparison of PDFs')
    #plt.savefig(f'Comparison PDFs.png', bbox_inches='tight')
    plt.show()
    
def plot_tvals(tvals):
    """Plot the annealing values ``tvals`` against their 1-based step index."""
    #step numbers 1, 2, ..., len(tvals)
    n_steps = len(tvals)
    steps = np.linspace(1, n_steps, n_steps)

    #draw the schedule
    fig, ax = plt.subplots(figsize=(6, 4), dpi=300)
    ax.plot(steps, tvals, 'darkmagenta', lw=1)
    ax.set_title('Annealing Schedule')
    #plt.savefig(f'Annealing Schedule.png', bbox_inches='tight')
    plt.show()

In [None]:
#AdaAnn scheduler set-up
#
#Trains a planar flow while the annealing value t is raised from t0 to 1.
#After each training stage the next increment dt is chosen as
#tol / std(log target density at the current flow samples).

#list to collect t values
tvals = []

#choose the starting distribution parameters
mean_starting = 0
std_starting = 2
    
#choose the target distribution and set a (for symmetric/asymmetric)
target_distr = 'MotivatingBimodal'
#target_distr = 'SymmetricBimodal'
#target_distr = 'AsymmetricBimodal'

a = 1                  #a needs to be defined for MotivatingBimodal but is not used

#plot target density
density_target = TargetDistribution(target_distr, t=1, a=a)
plot_density(density_target, 5)

#set the parameters
flow_length = 50       #number of planar flow layers K
lr = 0.005             #learning rate for the optimizer
t0 = 0.01              #starting t value
tol = 0.01             #KL divergence tolerance for AdaAnn
M = 1000               #number of sample points to compute step size
dt = 0

#set the number of samples in each batch
N = 100                #at t0 and each annealing step
N_1 = 1000             #at t = 1

#set the number of iterations
T_0 = 500              #at t0
T = 2                  #at each annealing step
T_1 = 8000             #at t = 1
  
#create the model and optimizer using Adam
model = PlanarFlow(K=flow_length)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

#the t = 1 target used to size every annealing step; it does not change inside
#the loop and must use the same a as the training target
density_dt = TargetDistribution(target_distr, t=1, a=a)

#start time
start = time.default_timer()

#optimization using AdaAnn
t = t0
while t < 1:
    
    #new t value (dt is 0 on the first pass, so training starts at t0)
    t = min(1, t + dt)
    tvals.append(t)
    
    #number of iterations and batch size at each annealing step
    num_iter = T
    batch_size = N
    
    #update parameters at t0
    if t == t0:
        num_iter = T_0
        
    #update parameters at t = 1 and include a learning rate scheduler
    if t == 1:
        num_iter = T_1
        batch_size = N_1
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1000, gamma=0.5)
            
    #update the target density and loss function with current t value
    density = TargetDistribution(target_distr, t, a=a)
    bound = VariationalLoss(density, mean=mean_starting, std=std_starting)
        
    #train the model  
    for iter_num in range(1, num_iter + 1):
            
        #get the batches from starting distribution
        batch = torch.zeros(batch_size).normal_(mean=mean_starting, std=std_starting)

        #pass the batch through the planar flow model
        zk, log_jacobians = model(batch)

        #compute the loss
        loss = bound(batch, zk, log_jacobians)

        #train the model
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
            
        #apply a learning rate scheduler when t = 1
        if t == 1:
            scheduler.step()
            
    #compute the dt value using M points; no gradients are needed for this estimate
    with torch.no_grad():
        zk, log_jacobians = model(torch.zeros(M).normal_(mean=mean_starting, std=std_starting))
        log_qk = torch.log(density_dt(zk) + 1e-10)
        #log_qk = np.log(0.953973) - ((zk + 2)**2 - 3)**2                #could use exact log calculation for motivating
        #.item() keeps dt (and therefore t) a plain Python float
        dt = (tol/torch.sqrt(log_qk.var())).item()

#compute time
end = time.default_timer()
opt_time = end - start

#store the schedule as an array
tvals = np.array(tvals)
    
#plot approximation and annealing schedule (the keyword is xlim, not lim)
plot_comparison(mean=mean_starting, std=std_starting, xlim=5)
plot_tvals(tvals)

In [None]:
#linear scheduler set-up
#
#Trains a planar flow while the annealing value t is raised from t0 to 1
#in constant increments of eps.

#list to collect t values
tvals = []

#choose the starting distribution parameters
mean_starting = 0
std_starting = 2
    
#choose the target distribution and set a (for symmetric/asymmetric)
target_distr = 'MotivatingBimodal'
#target_distr = 'SymmetricBimodal'
#target_distr = 'AsymmetricBimodal'

a = 1                  #a needs to be defined for MotivatingBimodal but is not used

#plot target density
density_target = TargetDistribution(target_distr, t=1, a=a)
plot_density(density_target, 5)

#set the parameters
flow_length = 50       #number of planar flow layers K
lr = 0.005             #learning rate for the optimizer
t0 = 0.01              #starting t value
eps = 1/10000          #set constant step size
dt = 0

#set the number of samples in each batch
N = 100                #at t0 and each annealing step
N_1 = 1000             #at t = 1

#set the number of iterations
T_0 = 500              #at t0
T = 1                  #at each annealing step
T_1 = 8000             #at t = 1
  
#create the model and optimizer using Adam
model = PlanarFlow(K=flow_length)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

#start time
start = time.default_timer()

#optimization using linear scheduler
t = t0
while t < 1:
    
    #new t value (dt is 0 on the first pass, so training starts at t0)
    t = min(1, t + dt)
    #append to a list; re-allocating an array on every one of the many steps is wasteful
    tvals.append(t)
    
    #number of iterations and batch size at each annealing step
    num_iter = T
    batch_size = N
    
    #update parameters at t0
    if t == t0:
        num_iter = T_0
        
    #update parameters at t = 1 and include a learning rate scheduler
    if t == 1:
        num_iter = T_1
        batch_size = N_1
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1000, gamma=0.5)
            
    #update the target density and loss function with current t value
    density = TargetDistribution(target_distr, t, a=a)
    bound = VariationalLoss(density, mean=mean_starting, std=std_starting)
        
    #train the model  
    for iter_num in range(1, num_iter + 1):
            
        #get the batches from starting distribution
        batch = torch.zeros(batch_size).normal_(mean=mean_starting, std=std_starting)

        #pass the batch through the planar flow model
        zk, log_jacobians = model(batch)

        #compute the loss
        loss = bound(batch, zk, log_jacobians)

        #train the model
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
            
        #apply a learning rate scheduler when t = 1
        if t == 1:
            scheduler.step()
                
    #every later step advances t by the constant eps
    dt = eps

#compute time
end = time.default_timer()
opt_time = end - start

#store the schedule as an array
tvals = np.array(tvals)
    
#plot approximation and annealing schedule (the keyword is xlim, not lim)
plot_comparison(mean=mean_starting, std=std_starting, xlim=5)
plot_tvals(tvals)

In [None]:
#no scheduler set-up
#
#Trains a planar flow directly against the t = 1 target, without annealing.

#choose the starting distribution parameters
mean_starting = 0
std_starting = 2
    
#choose the target distribution and set a (for symmetric/asymmetric)
target_distr = 'MotivatingBimodal'
#target_distr = 'SymmetricBimodal'
#target_distr = 'AsymmetricBimodal'

a = 1                  #a needs to be defined for MotivatingBimodal but is not used

#plot target density
density_target = TargetDistribution(target_distr, t=1, a=a)
plot_density(density_target, 5)

#set the parameters
flow_length = 50       #number of planar flow layers K
lr = 0.005             #learning rate for the optimizer

#set the number of samples in each batch
batch_size = 100

#set the number of iterations
num_iter = 8000
  
#create the model, optimizer using Adam, target density, and loss function
model = PlanarFlow(K=flow_length)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
density = TargetDistribution(target_distr, t=1, a=a)
bound = VariationalLoss(density, mean=mean_starting, std=std_starting)

#learning rate scheduler
#scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1000, gamma=0.5)

#start time
start = time.default_timer()

#optimization using no annealing scheduler
#train the model  
for iter_num in range(1, num_iter + 1):
            
    #get the batches from starting distribution
    batch = torch.zeros(batch_size).normal_(mean=mean_starting, std=std_starting)

    #pass the batch through the planar flow model
    zk, log_jacobians = model(batch)

    #compute the loss
    loss = bound(batch, zk, log_jacobians)

    #train the model
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
            
    #apply a learning rate scheduler
    #scheduler.step()

#compute time
end = time.default_timer()
opt_time = end - start
    
#plot approximation (the keyword is xlim, not lim)
plot_comparison(mean=mean_starting, std=std_starting, xlim=5)