<font size=7>Normalising Flow Experiments</font>

____

<font size=5>This source code contains all the experiments we did throughout the project. The main important sections are the 1D, 2D, 3D and Image models. 

The 1D Model is located under the 'Experimental 1D Single Flow Model' heading \
The 2D Model is located under the '2D Flow-based Model Pipeline' heading\
The 3D Model is located under the '3D Flow-based Model Pipeline' heading\
The Image Model is located under the 'Image Flow-based Model Pipeline' heading\

Wherever there is "Path", Please input an appropriate path to save the loss history.
<font size=5>

____

<font size=6>Some generative model theory<font size=6>

Change of Variable Theorem

Assume $x = T(z)$, where $T$ is a Transformation

According to probability $\int _{x}{P_x(X)dx} = \int _{z}{P_z(Z)dz}$

Therefore $|P_x(X)dx| = |P_z(Z)dz|$

$\qquad \qquad \quad \ \ P_x(X) = P_z(Z)|\frac{dz}{dx}|$

$\qquad \qquad \quad \ \ P_x(X) = P_z(Z)|\frac{\delta T^{-1}(x)}{\delta x}|$

$\qquad \qquad \quad \ \ P_x(X) = P_z(T^{-1}(x))|detJ_{T^{-1}}(x)|$

$\qquad \qquad \quad \ \ P_x(X) = P_z(Z)|detJ_{T}(Z)|^{-1}$

Likelihood Function for the Target Distribution

In normal flow parlance, $p_Z(z;\psi)$ is called the base distribution and $p_X(x)$ the target distribution. 
If $p_Z$ is a Gaussian, then $\psi = (\boldsymbol{\mu}, \boldsymbol{\Sigma})$

$T$ depends on parameters $\phi$. We can consider the likelihood of the data under the target distribution. Denoting the parameters by $\theta$, the likelihood is given by:

$P(\boldsymbol{X} \mid \theta) = \Pi^N_{i=1} P_x(\boldsymbol{x}_i \mid \theta)$

for $N$ data points $\boldsymbol{x}_i$, collectively denoted by $\boldsymbol{X}$.

For the log likelihood,

$ln P(X|\theta) = \displaystyle \sum^{N}_{i = 1}ln P_x(x_i|\theta)$ 

$\qquad \qquad \ \ = \displaystyle \sum^{N}_{i = 1} ln |detJ_T^{-1}(x_i;\phi)| + lnP_Z(T^{-1}(x_i;\phi);\psi)$

Typically, the base distribution is known and easy to sample from. Here, we take

$P_Z(z) = \mathcal{N}(\mu,\Sigma) = 2 \pi^{-D/2}|{det}(\Sigma)^{-\frac{1}{2}}| \exp \{-\frac{1}{2}(x - \mu)^T \Sigma^{-1}(x - \mu)\}$

Calculating loss by KL divergence and log likelihood, where n real data we can sample from $P^{*}(X)$, $P_{x}(X;\theta)$ is a flow-based model.

$\mathcal{L}(\theta) = \mathcal{D}_{KL}[P^{*}(X)||P_{x}(X;\theta)]$

$\qquad \ =-E_{P^{*}(X)}[\log P_{x}(X;\theta)] + const$

$\qquad \ =-E_{P^{*}(X)}[\log P_{z}(T^{-1}(x;\theta)) + \log|det J_{T^{-1}}(x;\theta)|] + const$

$\qquad \ \approx -\frac{1}{N} \displaystyle \sum^{n}_{i = 1} [\log P_{z}(T^{-1}(x_i;\theta)) + \log|det J_{T^{-1}}(x_i;\theta)|] + const$

____

<font size=6>2D uniform transform</font>

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.utils.data as data
import torchvision

from torch import nn, optim
from numpy.random import default_rng

In [None]:
def draw_2D_graph(data,color="steelblue",size=5):
    fig = plt.figure(figsize = (3*6.4,1*6.4))
    ax = plt.subplot(1,3,1)
    plt.title("Total")
    ax = plt.scatter([i[0] for i in data],[i[1] for i in data],s=size,c=color)
    ax = plt.subplot(1,3,2)
    plt.title("X")
    ax = plt.hist([i[0] for i in data],20,density=True,color=color)
    ax = plt.subplot(1,3,3)
    plt.title("Y")
    ax = plt.hist([i[1] for i in data],20,density=True,color=color)

In [None]:
def compare_graph(data1,data2,title1,title2,color="steelblue",size=7):
    fig = plt.figure(figsize = (2*9.6,1*9.6))
    ax = plt.subplot(1,2,1)
    plt.title(title1)
    ax = plt.scatter(data1.T[0],data1.T[1],s=size,c=color)
    ax = plt.subplot(1,2,2)
    plt.title(title2)
    ax = plt.scatter(data2.T[0],data2.T[1],s=size,c=color)

In [None]:
def compare_3D_graph(data1,data2,title1,title2):
    fig = plt.figure(figsize=(2 * 9.6, 2 * 9.6))

    ax = fig.add_subplot(1, 2, 1, projection='3d')
    ax.scatter3D(data1.T[0], data1.T[1], data1.T[2], c = np.abs(data1.T[0]) + np.abs(data1.T[1]) + np.abs(data1.T[2]), cmap =plt.get_cmap('rainbow_r'))
    plt.title(title1)

    ax = fig.add_subplot(1, 2, 2, projection='3d')
    ax.scatter3D(data2.T[0], data2.T[1], data2.T[2], c = np.abs(data2.T[0]) + np.abs(data2.T[1]) + np.abs(data2.T[2]), cmap =plt.get_cmap('rainbow_r'))
    plt.title(title2)
    
    plt.show()

In [None]:
def compare_image(data1,data2,title1,title2):
    fig = plt.figure(figsize = (2*9.6,1*9.6))
    ax = plt.subplot(1,2,1)
    plt.title(title1)
    plt.imshow(np.transpose(data1.detach().numpy(),(1,2,0)),cmap = 'gray')
    ax = plt.subplot(1,2,2)
    plt.title(title2)
    plt.imshow(np.transpose(data2.detach().numpy(),(1,2,0)),cmap = 'gray')

In [None]:
def loss_graph(path,pos):
    data = torch.load(path)
    history = data["stats"]
    fig = plt.figure(figsize = (3*6.4,1*6.4))
    ax1 = plt.subplot()
    ax1.plot(history[0][:pos], 'r', label='Training Loss')
    ax1.plot(history[1][:pos], 'orange', label='Testing Loss')
    ax1.set_ylabel('loss')
    ax1.set_xlabel('epoch')
    ax1.legend(loc='upper right')
    plt.title('Training and Testing loss')
    plt.show()

In [None]:
def validation(model,test_loader,loss_fn,target_distribution):
    epoch_loss = []
    with torch.no_grad():
        for x in test_loader:
            z, dz_by_dx = model(x)
            loss = loss_fn(target_distribution,z,dz_by_dx)
            epoch_loss.append(loss)
    return np.mean(epoch_loss)

Generate uniform distrbution

In [None]:
rng = default_rng()

In [None]:
x_vals = rng.uniform(-1,1,1000)
y_vals = rng.uniform(-1,1,1000)

Combine two arrays and test

In [None]:
vals = []
for i in range(len(x_vals)):
    vals.append([x_vals[i],y_vals[i]])

draw_2D_graph(vals)

Change the weight to transform the uniform distrbution

In [None]:
W = [[2,1],[1,2]]
trans_vals = np.dot(vals,W)

draw_2D_graph(trans_vals)

____

<font size=6>2D Gaussian Transformation</font>

Generate 2D Gaussian distribution

In [None]:
x_vals = rng.normal(0,1,1000)
y_vals = rng.normal(0,1,1000)

vals = []
for i in range(len(x_vals)):
    vals.append([x_vals[i],y_vals[i]])

draw_2D_graph(vals)

Set transform function, where $s(x)y = 5x^2y$ and $b(x) = 2x$, therefore $z = s(x)y + b(x)$

In [None]:
def transform (data):
    data_trans = []

    for data_point in data:
        data_y = 5 * data_point[0] * data_point[0] * data_point[1] + 2 * data_point[0]
        data_trans.append([data_point[0],data_y])

    return data_trans

Apply function and draw the output

In [None]:
result = transform(vals)

draw_2D_graph(result,"darkcyan")

Set inverse transform function, where original $y = \frac{s(x)-2x}{5x^2}$

In [None]:
def inverse_transform (data):
    data_inverse_trans = []

    for data_point in data:
        data_y = (data_point[1] - 2 * data_point[0]) / (5 * data_point[0] * data_point[0])
        data_inverse_trans.append([data_point[0],data_y])

    return data_inverse_trans

Apply function and draw the output

In [None]:
inverse_result = inverse_transform(result)

draw_2D_graph(inverse_result,"forestgreen")

____

<font size=6>SVD</font>

SVD ($A=UWV^T and\ A^{-1}=VW^{-1}U^T $), $A$ in SVD represent the model parameters

Set an original matrix $A$

In [None]:
m = np.mat("1,2;0,1")
m

Define $A^{-1}$ calculating function

In [None]:
def inverse_weight_matrix(m):
    u,sigma,vt = np.linalg.svd(m,full_matrices=True)

    s_inv = np.zeros([len(m),len(m)])
    for i in range(len(m)):
        s_inv[i][i] = 1/sigma[i]

    return np.dot(vt.T,s_inv).dot(u.T)

Calculate $A^{-1}$

In [None]:
np.around(inverse_weight_matrix(m))

____

<font size=6>Implementation of SVD</font>

Generate the original data

In [None]:
x_vals = rng.normal(0,1,1000)
y_vals = rng.normal(0,1,1000)

vals = []
for i in range(len(x_vals)):
    vals.append([x_vals[i],y_vals[i]])

draw_2D_graph(vals)

Define a funtion by matrix, i.e. $T(z) = 2x+3y$

In [None]:
W = [
    [1,0],
    [2,3]
]

W = np.array(W)

Calculating the transform result

In [None]:
result = np.dot(vals,W.T)

draw_2D_graph(result,"darkcyan")

Inverse the Matrix to calculate original data

In [None]:
W_inv = inverse_weight_matrix(W.T)

draw_2D_graph(inverse_result,"forestgreen")

____

<font size=6>FCL transform pipeline with KL Divergence</font>

Generate the original data (Uniform Distribution) / Convert original data to tensor and flatten it.

In [None]:
x_vals = rng.uniform(-1,1,1000)
y_vals = rng.uniform(-1,1,1000)

uniform = []
for i in range(len(x_vals)):
    uniform.append([x_vals[i],y_vals[i]])

draw_2D_graph(uniform,size=10)

input = torch.tensor(uniform) #input shape is (1000,2)
input = nn.Flatten(0,-1)(input) #input shape is (2000)

Convert target data to tensor and flatten it / Draw the target data.

In [None]:
target = torch.tensor(trans_vals[:1000]) #target shape is (1000,2)
target = nn.Flatten(0,-1)(target) #target shape is (2000)

draw_2D_graph(trans_vals[:1000],size=10)

Define a very simple model with one layer

In [None]:
flow = nn.Sequential(
    nn.Linear(2000,2000),
)

Train the model, apply KL divergence to calculating loss 

In [None]:
loss_fn = nn.KLDivLoss(reduction='sum')
optimizer = optim.Adam(flow.parameters(), lr=0.001)

for epoch in range(50):

    optimizer.zero_grad()

    output = flow(input.float())
    loss = loss_fn(output.softmax(dim=-1).log(), target.float().softmax(dim=-1))
    loss.backward()

    optimizer.step()

    print(f"Iter:{epoch+1}   KLDivLoss:{loss.item(): .3f}")

Input the original data to genterate output from the model / Draw the predict output

In [None]:
output = flow(input.float())

tmp_output = nn.Unflatten(0,(1000,2))(output)
tmp_output = tmp_output.detach().numpy()

draw_2D_graph(tmp_output,size=10)

Get parameters from the model, parameters are in matrix form.

In [None]:
parameter_matrix,bias = flow.parameters()

The output is actually calculated like $Input \cdot ParametersMatrix = Output $ i.e. $[1\times2000]\cdot[2000\times2000]=[1\times2000]$, the parameters matrix is $f$.

In [None]:
np.matmul(input,parameter_matrix.detach().numpy().T) + bias.detach().numpy()

Applying SVD to get inverse parameters matrix, which is the $f^{-1}$.

In [None]:
inverse_matrix = inverse_weight_matrix(parameter_matrix.detach().numpy().T)

Apply output as input into $f^{-1}$ and try to get original data.

In [None]:
tmp_input = np.matmul((output.detach().numpy() - bias.detach().numpy()),inverse_matrix)
predict_origin = torch.tensor(tmp_input).unflatten(0,(1000,2)).numpy()

draw_2D_graph(predict_origin,size=10)

____

<font size=6>Experimental 1D Single Flow Model</font>

Define a function to generate original data

In [None]:
def mixture_gaussian(number_of_points):
    n = number_of_points // 2
    gaussian_1 = np.random.normal(loc=-1,scale=0.25,size=(n,))
    gaussian_2 = np.random.normal(loc=0.5,scale=0.5,size=(number_of_points - n,))
    return np.concatenate([gaussian_1,gaussian_2])

Define a Dataset class

In [None]:
class Dataset(data.Dataset):

    def __init__(self,array):
        super().__init__()
        self.array = array
    
    def __len__(self):
        return len(self.array)

    def __getitem__(self,index):
        return self.array[index]

Define the model by using CDF calculations

In [None]:
class flow_1d(nn.Module):

    def __init__(self,n_components):
        super(flow_1d,self).__init__()
        self.mus = nn.Parameter(torch.randn(n_components),requires_grad=True)
        self.log_sigmas = nn.Parameter(torch.zeros(n_components),requires_grad=True)
        self.weight_logits = nn.Parameter(torch.ones(n_components),requires_grad=True)

    def forward(self,x):
        x = x.view(-1,1)
        weights = self.weight_logits.softmax(dim=0).view(1,-1)
        distribution = torch.distributions.Normal(self.mus,self.log_sigmas.exp())
        z = (distribution.cdf(x) * weights).sum(dim=1)
        dz_by_dx = (distribution.log_prob(x).exp() * weights).sum(dim=1)
        return z,dz_by_dx

Define loss function by negative log-likelihood

In [None]:
def loss_fn(target_distribution,z,dz_by_dx):
    log_likelihood = target_distribution.log_prob(z) + dz_by_dx.log()
    return -log_likelihood.mean()

Define the train function

In [None]:
def train(model,train_loader,test_loader,optimizer,target_distribution):
    epoch = 100
    history = np.zeros((2,epoch))
    model.train()
    for i in range(epoch):
        epoch_loss = []
        for x in train_loader:
            z, dz_by_dx = model(x)
            loss = loss_fn(target_distribution,z,dz_by_dx)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss.append(loss)

        with torch.no_grad():
            train_loss = np.mean(epoch_loss)
            test_loss = validation(model,test_loader,loss_fn,target_distribution)
            print(f"Iter: {i+1} Train Loss: {train_loss: .3f} Test Loss: {test_loss: .3f}")

        history[:,i] = (train_loss,test_loss)

    torch.save({"stats": history}, "Path")

Create dataset and dataloader

In [None]:
n_train, n_test = 2000, 2000
train_data = mixture_gaussian(n_train)
test_data = mixture_gaussian(n_test)

train_loader = data.DataLoader(Dataset(train_data),batch_size=128,shuffle=True)
test_loader = data.DataLoader(Dataset(test_data),batch_size=2000,shuffle=True)

Set target distribution which is Uniform

In [None]:
target_distribution = torch.distributions.Uniform(low=0,high=1)

Initiate the flow model and optimizer

In [None]:
flow = flow_1d(n_components=3)

optimizer = optim.Adam(flow.parameters(),lr=0.01)

Train the model

In [None]:
train(flow,train_loader,test_loader,optimizer,target_distribution)

In [None]:
loss_graph("Path",100)

Draw distribution of train data

In [None]:
plt.figure(figsize = (1*6.4,1*6.4))
_ = plt.hist(train_data,bins=50)

Draw distribution of test data

In [None]:
plt.figure(figsize = (1*6.4,1*6.4))
_ = plt.hist(test_data,bins=50)

Draw distribution of target (Uniform)

In [None]:
plt.figure(figsize = (1*6.4,1*6.4))
_ = plt.hist(target_distribution.sample([2000]).detach().numpy(),bins=50)

Draw generated distribution after training

In [None]:
flow.eval()
z, dz_by_dx = flow(next(iter(test_loader)))

plt.figure(figsize = (1*6.4,1*6.4))
_ = plt.hist(z.detach().numpy(),bins=50)

In [None]:
plt.figure(figsize = (1*6.4,1*6.4))
_ = plt.hist(z.detach().numpy(),bins=50,label='Generated distribution')
_ = plt.hist(target_distribution.sample([2000]).detach().numpy(),bins=50,color='darkorange',alpha=0.8,label='Target distribution')
_ = plt.legend(loc='upper right')

____

<font size=6>Single transform pipeline with log-likelihood 2-D</font>

Define a function to generate original data

In [None]:
def mixture_gaussian_2d(number_of_points):
    n = number_of_points // 2
    gaussian_1 = np.random.multivariate_normal(mean=[-2.,2.],cov=[[2.,-1.],[-1.,2.]],size=(n))
    gaussian_2 = np.random.multivariate_normal(mean=[2.,-2.],cov=[[4.,2.],[2.,4.]],size=(number_of_points - n))
    return np.concatenate([gaussian_1,gaussian_2])

Define a Dataset class

In [None]:
class Dataset(data.Dataset):

    def __init__(self,array):
        super().__init__()
        self.array = array
    
    def __len__(self):
        return len(self.array)

    def __getitem__(self,index):
        return self.array[index]

Define the model by using polynomial funtion $s(x) \times y+b(x)$ where $s(x)$ and $b(x)$ are 3rd polynomial

In [None]:
class flow_2d(nn.Module):

    def __init__(self,n_components):
        super(flow_2d,self).__init__()
        self.n_components = n_components
        self.phi = 0
        self.sweights = nn.Parameter(torch.randn(n_components,dtype=torch.double).view(-1,1),requires_grad=True)
        self.bweights = nn.Parameter(torch.randn(n_components,dtype=torch.double).view(-1,1),requires_grad=True)

    def forward(self,X):
        x = X.T[0].view(-1,1)
        y = X.T[1].view(-1,1)

        self.phi = x.T**0
        for i in range(self.n_components-1):
            self.phi = torch.vstack((self.phi,x.T**(i+1)))
        self.phi = self.phi.T

        s = torch.sigmoid(self.phi@self.sweights.view(-1,1))
        b = torch.sigmoid(self.phi@self.bweights.view(-1,1))
        y_new = s * y + b
        
        z = torch.vstack((x.T,y_new.T)).T

        return z,s

Define loss function by negative log-likelihood

In [None]:
def loss_fn(target_distribution,z,log_dz_by_dx):
    log_likelihood = target_distribution.log_prob(z).view(-1,1) + log_dz_by_dx
    return -log_likelihood.mean()

Define the train function

In [None]:
def train(model,epoch,train_loader,test_loader,optimizer,target_distribution):
    model.train()
    for i in range(epoch):
        epoch_loss = []
        for x in train_loader:
            z, dz_by_dx = model(x)
            loss = loss_fn(target_distribution,z,dz_by_dx)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss.append(loss)

        with torch.no_grad():
            train_loss = np.mean(epoch_loss)
            test_loss = validation(model,test_loader,loss_fn,target_distribution)
            print(f"Iter: {i+1} Train Loss: {train_loss: .3f} Test Loss: {test_loss: .3f}")

Create dataset and dataloader

In [None]:
n_train, n_test = 2000, 2000
train_data = mixture_gaussian_2d(n_train)
test_data = mixture_gaussian_2d(n_test)

train_loader = data.DataLoader(Dataset(train_data),batch_size=256,shuffle=True)
test_loader = data.DataLoader(Dataset(test_data),batch_size=n_test,shuffle=True)

Initiate the flow model and optimizer

In [None]:
flow = flow_2d(3)
optimizer = optim.Adam(flow.parameters(),lr=0.001)

Set the target distribution which is Multivariate Normal

In [None]:
target_distribution = torch.distributions.MultivariateNormal(torch.zeros(2), torch.eye(2))

Compare original distribution and before trained model output

In [None]:
before_train = next(iter(test_loader))
flow.eval()
z,dz_by_dx = flow(before_train)

compare_graph(test_data,z.detach().numpy(),"P(X)","Before Trained T(X)")

In [None]:
train(flow,1000,train_loader,optimizer,target_distribution)

Compare target distribution and generated distribution

In [None]:
after_train = next(iter(test_loader))
flow.eval()
z,dz_by_dx = flow(after_train)

compare_graph(target_distribution.sample([2000]).detach().numpy(),z.detach().numpy(),"gaussian","Trained T(X)")

Compare original distribution and generated distribution

In [None]:
compare_graph(test_data,z.detach().numpy(),"P(X)","Learned P(Z)")

____

<font size=6>2D Flow-based Model Pipeline</font>

Define a funtion to generate original data

In [None]:
def mixture_gaussian_2d(number_of_points):
    n = number_of_points // 3
    gaussian_1 = np.random.multivariate_normal(mean=[-2.,4.],cov=[[4.,-2.],[-2.,4.]],size=(n))
    gaussian_2 = np.random.multivariate_normal(mean=[-1.,0.],cov=[[1.,0.],[0.,4.]],size=(n))
    gaussian_3 = np.random.multivariate_normal(mean=[2.,2.],cov=[[4.,2.],[2.,4.]],size=(number_of_points - 2 * n))
    return np.concatenate([gaussian_1,gaussian_2,gaussian_3])

Define a Dataset class

In [None]:
class Dataset(data.Dataset):

    def __init__(self,array):
        super().__init__()
        self.array = array
    
    def __len__(self):
        return len(self.array)

    def __getitem__(self,index):
        return self.array[index]

Define the model

In [None]:
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, num_hidden_layers, output_size):
        super(MLP, self).__init__()
        layers = [nn.Linear(input_size, hidden_size, dtype=torch.double)]
        for _ in range(num_hidden_layers - 1):
            layers.append( nn.Linear(hidden_size, hidden_size, dtype=torch.double) )
            layers.append( nn.ReLU() )
        layers.append( nn.Linear(hidden_size, output_size, dtype=torch.double) )
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        return self.layers(x)


class flow_2d(nn.Module):
    def __init__(self, pos, hidden_size=128, num_hidden_layers=6):
        super(flow_2d, self).__init__()
        self.mlp = MLP(2, hidden_size, num_hidden_layers, 2)
        if pos == 1: 
            self.mask = torch.tensor([1,0],dtype=torch.double) 
        else:
            self.mask = torch.tensor([0,1],dtype=torch.double)
        self.mask = self.mask.view(1,-1)
        self.scale_weight = nn.Parameter(torch.zeros(1,dtype=torch.double), requires_grad=True)
        self.bias_weight = nn.Parameter(torch.zeros(1,dtype=torch.double), requires_grad=True)

    def forward(self, x, reverse=False):
        x_masked = x * self.mask
        log_scale, bias = self.mlp(x_masked).chunk(2, dim=1)
        log_scale = log_scale.tanh() * self.scale_weight + self.bias_weight
        bias = bias  * (1-self.mask)
        log_scale = log_scale * (1-self.mask)
        if reverse:
            x = (x - bias) * torch.exp(-log_scale)
        else:
            x = x * torch.exp(log_scale) + bias
        return x, log_scale

Define a model to compose flows and pass results

In [None]:
class compose_flow(nn.Module):
    def __init__(self, flow_list):
        super(compose_flow, self).__init__()
        self.flow_list = nn.ModuleList(flow_list)

    def forward(self, x):
        z, log_det_jacobian = x, torch.zeros_like(x)
        for flow in self.flow_list:
            z, log_scale = flow(z)
            log_det_jacobian += log_scale
        return z, log_det_jacobian

    def inverse(self, z):
        list = []
        for flow in self.flow_list[::-1]:
            z, _ = flow(z, reverse=True)
            list.append(z)
        return list

Define loss function by negative log-likelihood

In [None]:
def loss_fn(target_distribution,z,sum_log_dz_by_dx):
    log_likelihood = target_distribution.log_prob(z).view(-1,1) + sum_log_dz_by_dx
    return -log_likelihood.mean()

Define the train function

In [None]:
def train(model,epoch,train_loader,test_loader,target_distribution,lr=0.001):
    optimizer = optim.AdamW(flows.parameters(),lr=lr)
    model.train()
    history = np.zeros((2,epoch))
    for i in range(epoch):
        epoch_loss = []
        for x in train_loader:
            z, sum_log_dz_by_dx = model(x)
            loss = loss_fn(target_distribution,z,sum_log_dz_by_dx)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss.append(loss)

        with torch.no_grad():
            train_loss = np.mean(epoch_loss)
            test_loss = validation(model,test_loader,loss_fn,target_distribution)
            if (i + 1) % 100 == 0:
                print(f"Iter: {i+1} Train Loss: {train_loss: .3f} Test Loss: {test_loss: .3f}")
        history[:,i] = (train_loss,test_loss)

    torch.save({"stats": history}, "Path")

Create dataset and dataloader

In [None]:
train_data = mixture_gaussian_2d(1024)
test_data = mixture_gaussian_2d(1024)

train_loader = data.DataLoader(Dataset(train_data),batch_size=512,shuffle=True)
test_loader = data.DataLoader(Dataset(test_data),batch_size=1024,shuffle=True)

In [None]:
compare_graph(train_data,test_data,"","")

Initiate the flow model and optimizer

In [None]:
flows_list = [flow_2d(1), flow_2d(2),flow_2d(1), flow_2d(2),flow_2d(1), flow_2d(2)]
flows = compose_flow(flows_list)

Set the target distribution which is Multivariate Normal

In [None]:
target_distribution = torch.distributions.MultivariateNormal(torch.zeros(2), torch.eye(2))

Compare original distribution and before trained model output

In [None]:
before_train = next(iter(test_loader))
flows.eval()
z,dz_by_dx = flows(before_train)

compare_graph(test_data,z.detach().numpy(),"Original distribution","Before trained generated distribution")

Training

In [None]:
train(flows,2000,train_loader,test_loader,target_distribution,lr=0.001)

In [None]:
loss_graph("Path",500)

Compare target distribution and generated distribution

In [None]:
after_train = next(iter(test_loader))
flows.eval()
z,dz_by_dx = flows(after_train)

compare_graph(target_distribution.sample([1024]).detach().numpy(),z.detach().numpy(),"Target distribution (Gaussian)","Generated distribution")

In [None]:
t = target_distribution.sample([1024]).detach().numpy()
g = z.detach().numpy()

plt.figure(figsize = (1*9.6,1*9.6))
_ = plt.scatter(t.T[0],t.T[1],s=7,c="darkorange",label='Target distribution',alpha=0.6)
_ = plt.scatter(g.T[0],g.T[1],s=7,label='Generated distribution')
_ = plt.legend(loc='upper right')

Compare original distribution and generated distribution

In [None]:
compare_graph(test_data,z.detach().numpy(),"Original distribution","Generated distribution")

Draw a graph showing every inverse flow output (Original Distribution)

In [None]:
ls = flows.inverse(z)
fig = plt.figure(figsize = (4*4.8,3*4.8))
generated_from_trainning_result = 0

for f in range(len(ls)):
    ax = plt.subplot(3,4,f+1)
    plt.title("After {}th flow's inverse function".format(len(ls) - f))

    z_ = ls[f].detach().numpy()
    generated_from_trainning_result = z_
    x, y = z_.T[0], z_.T[1]
    ax = plt.scatter(x,y,s=1)

Draw a graph showing every inverse flow output (Reverse Generated Distribution)

In [None]:
g = target_distribution.sample([1024])
ls = flows.inverse(g)
fig = plt.figure(figsize = (4*4.8,3*4.8))
generated_from_gaussian = 0

for f in range(len(ls)):
    ax = plt.subplot(3,4,f+1)
    plt.title("After {}th flow's inverse function".format(len(ls) - f))

    z_ = ls[f].detach().numpy()
    generated_from_gaussian = z_
    x, y = z_.T[0], z_.T[1]
    ax = plt.scatter(x,y,s=1)

Comparison between orginal distribution and reverse generated distribution

In [None]:
t = generated_from_trainning_result
g = generated_from_gaussian

plt.figure(figsize = (1*9.6,1*9.6))
_ = plt.scatter(t.T[0],t.T[1],s=7,c="darkorange",label='Original distribution',alpha=0.5)
_ = plt.scatter(g.T[0],g.T[1],s=7,alpha=0.8,label='Generated from Gaussian sampling')
_ = plt.legend(loc='upper right')

____

<font size=6>Experimental 2D Model Pipeline with Mixture Inputs</font>

Define a function to generate original data

In [None]:
Beta = torch.distributions.Beta(torch.tensor([1.,1.]),torch.tensor([1.,1.]))
w1 = [[1.,2.],[2.,1.]]
w2 = [[1.,2.],[2.,1.]]
w3 = [[1.,2.],[-2.,1.]]

dist1 = (Beta.sample([1000])@torch.tensor(w1)).detach().numpy()
dist2 = (Beta.sample([1000])@torch.tensor(w2)@torch.tensor(w3)).detach().numpy()
dist3 = (Beta.sample([1000])@torch.tensor(w1)).detach().numpy()
dist4 = (Beta.sample([1000])@torch.tensor(w2)@torch.tensor(w3)).detach().numpy()

dist1, dist2, dist3, dist4 = np.double(dist1), np.double(dist2), np.double(dist3), np.double(dist4)

train_dist, test_dist = np.concatenate([dist1,dist2]), np.concatenate([dist3,dist4])

draw_2D_graph(train_dist)

Define a Dataset class

In [None]:
class Dataset(data.Dataset):

    def __init__(self,array):
        super().__init__()
        self.array = array
    
    def __len__(self):
        return len(self.array)

    def __getitem__(self,index):
        return self.array[index]

Define the model by using polynomial funtion $s(x) \times y+b(x)$ where $s(x)$ and $b(x)$ are 3rd polynomial and its inverse funtion

In [None]:
class flow_2d(nn.Module):

    def __init__(self,n_components,flows_number=1):
        super(flow_2d,self).__init__()
        self.n_components = n_components
        self.flows_number = flows_number
        self.phi = 0
        self.sweights = nn.Parameter(torch.randn(n_components,dtype=torch.double).view(-1,1),requires_grad=True)
        self.bweights = nn.Parameter(torch.randn(n_components,dtype=torch.double).view(-1,1),requires_grad=True)
        self.w1_weights = nn.Parameter(torch.tensor(1,dtype=torch.double).view(-1,1),requires_grad=True)
        self.w2_weights = nn.Parameter(torch.tensor(1,dtype=torch.double).view(-1,1),requires_grad=True)


    def forward(self,X):
        if self.flows_number % 2 == 1:
            x = X.T[0].view(-1,1)
            y = X.T[1].view(-1,1)
        else:
            x = X.T[1].view(-1,1)
            y = X.T[0].view(-1,1)

        self.phi = x.T**0
        for i in range(self.n_components-1):
            self.phi = torch.vstack((self.phi,x.T**(i+1)))
        self.phi = self.phi.T

        self.s = torch.sigmoid(self.phi@self.sweights.view(-1,1))
        self.b = torch.sigmoid(self.phi@self.bweights.view(-1,1))
        y_new = self.w1_weights * self.s * y + self.w2_weights * self.b
        
        if self.flows_number % 2 == 1:
            z = torch.vstack((x.T,y_new.T)).T
        else:
            z = torch.vstack((y_new.T,x.T)).T

        return z, (self.w1_weights * self.s).log()

    def inverse(self,Z):
        if self.flows_number % 2 == 1:
            x = Z.T[0].view(-1,1)
            y = Z.T[1].view(-1,1)
        else:
            x = Z.T[1].view(-1,1)
            y = Z.T[0].view(-1,1)

        y_old = (y - self.w2_weights * self.b) / (self.w1_weights * self.s)

        if self.flows_number % 2 == 1:
            z = torch.vstack((x.T,y_old.T)).T
        else:
            z = torch.vstack((y_old.T,x.T)).T

        return z

Define a model to compose flows and pass results

In [None]:
class composed_flows(nn.Module):
    
    def __init__(self,models_list):
        super(composed_flows,self).__init__()
        self.models_list = nn.ModuleList(models_list)

    def forward(self,X):
        z, sum_log_dz_by_dx = X, 0
        for flow in self.models_list:
            z, log_dz_by_dx = flow(z)
            sum_log_dz_by_dx += log_dz_by_dx
        
        return z, sum_log_dz_by_dx

Define loss function by negative log-likelihood

In [None]:
def loss_fn(target_distribution,z,sum_log_dz_by_dx):
    log_likelihood = target_distribution.log_prob(z).view(-1,1) + sum_log_dz_by_dx
    return -log_likelihood.mean()

Define the train function

In [None]:
def train(model,epoch,train_loader,optimizer,target_distribution):
    model.train()
    for i in range(epoch):
        epoch_loss = []
        for x in train_loader:
            z, sum_log_dz_by_dx = model(x)
            loss = loss_fn(target_distribution,z,sum_log_dz_by_dx)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss.append(loss)

        with torch.no_grad():
            if (i + 1) % 10 == 0:
                print("Iter: {} Loss: {:.3f}".format(i+1,np.mean(epoch_loss)))

Create dataset and dataloader

In [None]:
train_data = train_dist
test_data = test_dist

train_loader = data.DataLoader(Dataset(train_data),batch_size=1024,shuffle=True)
test_loader = data.DataLoader(Dataset(test_data),batch_size=2000,shuffle=True)

Creating the flow list and initiate the model

In [None]:
flows_list = [flow_2d(3,1),flow_2d(3,2),flow_2d(3,3),flow_2d(3,4),flow_2d(3,5),flow_2d(3,6),flow_2d(3,7),flow_2d(3,8),flow_2d(3,9),flow_2d(3,10)]
flows = composed_flows(flows_list)

optimizer = optim.AdamW(flows.parameters(),lr=0.005)

Set the target distribution which is Multivariate Normal

In [None]:
target_distribution = torch.distributions.MultivariateNormal(torch.zeros(2), torch.eye(2))

Compare original distribution and before trained model output

In [None]:
before_train = next(iter(test_loader))
flows.eval()
z,dz_by_dx = flows(before_train)

compare_graph(test_data,z.detach().numpy(),"P(X)","Before trained T(X)")

Training

In [None]:
train(flows,10000,train_loader,optimizer,target_distribution)

Compare target distribution and generated distribution

In [None]:
after_train = next(iter(test_loader))
flows.eval()
z,dz_by_dx = flows(after_train)

compare_graph(target_distribution.sample([2000]).detach().numpy(),z.detach().numpy(),"Target distribution (Gaussian)","Trained T(X)")

Compare original distribution and generated distribution

In [None]:
compare_graph(test_data,z.detach().numpy(),"P(X)","Trained T(X)")

Draw a graph for every inverse flow output

In [None]:
fig = plt.figure(figsize = (4*4.8,3*4.8))

for f in range(len(flows_list)):
    ax = plt.subplot(3,4,f+1)
    plt.title("The {}th T^-1(Z)".format(len(flows_list) - f))

    z = flows_list[len(flows_list) - f - 1].inverse(z)
    z_ = z.detach().numpy()
    x, y = z_.T[0], z_.T[1]
    ax = plt.scatter(x,y,s=1)

In [None]:
dist1_loader = data.DataLoader(Dataset(dist1),batch_size=1000,shuffle=True)
dist2_loader = data.DataLoader(Dataset(dist2),batch_size=1000,shuffle=True)

In [None]:
dist1 = next(iter(dist1_loader))
flows.eval()
z,dz_by_dx = flows(dist1)

compare_graph(dist1,z.detach().numpy(),"One part of distribution ","T(X) according to the part")

In [None]:
dist2 = next(iter(dist2_loader))
flows.eval()
z,dz_by_dx = flows(dist2)

compare_graph(dist2,z.detach().numpy(),"Another part of distribution ","T(X) according to the part")

____

<font size=6>3D Flow-based Model Pipeline</font>

Define a function to generate original data

In [None]:
x = np.random.uniform(-3.,3.,2048)
y = np.random.uniform(-3.,3.,2048)
z = -(x**2 + y**2) - np.random.uniform(0.,5.,2048)

original_dist = np.vstack((x,y,z)).T

Define a Dataset class

In [None]:
class Dataset(data.Dataset):

    def __init__(self,array):
        super().__init__()
        self.array = array
    
    def __len__(self):
        return len(self.array)

    def __getitem__(self,index):
        return self.array[index]

Define the model

In [None]:
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, num_hidden_layers, output_size):
        super(MLP, self).__init__()
        layers = [nn.Linear(input_size, hidden_size, dtype=torch.double)]
        for _ in range(num_hidden_layers - 1):
            layers.append( nn.Linear(hidden_size, hidden_size, dtype=torch.double) )
            layers.append( nn.ReLU() )
        layers.append( nn.Linear(hidden_size, output_size, dtype=torch.double) )
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        return self.layers(x)

class flow_3d(nn.Module):
    def __init__(self, pos, hidden_size=128, num_hidden_layers=6):
        super(flow_3d, self).__init__()
        self.mlp = MLP(3, hidden_size, num_hidden_layers, 3)
        if pos == 1: 
            self.mask = torch.tensor([1,1,0],dtype=torch.double)
        elif pos == 2:
            self.mask = torch.tensor([1,0,1],dtype=torch.double)
        else:
            self.mask = torch.tensor([0,1,1],dtype=torch.double)
        self.mask = self.mask.view(1,-1)
        self.scale_weight1 = nn.Parameter(torch.zeros(1,dtype=torch.double), requires_grad=True)
        self.scale_weight2 = nn.Parameter(torch.zeros(1,dtype=torch.double), requires_grad=True)
        self.bias_weight = nn.Parameter(torch.zeros(1,dtype=torch.double), requires_grad=True)

    def forward(self, x, reverse=False):
        x_masked = x * self.mask
        log_scale1, log_scale2, bias = self.mlp(x_masked).chunk(3,dim=1)
        log_scale = log_scale1.tanh() * self.scale_weight1 + log_scale2.tanh() * self.scale_weight2 + self.bias_weight
        bias = bias  * (1-self.mask)
        log_scale = log_scale * (1-self.mask)
        if reverse:
            x = (x - bias) * torch.exp(-log_scale)
        else:
            x = x * torch.exp(log_scale) + bias
        return x, log_scale

Define a model to compose flows and pass results

In [None]:
class compose_flow(nn.Module):
    def __init__(self, flow_list):
        super(compose_flow, self).__init__()
        self.flow_list = nn.ModuleList(flow_list)

    def forward(self, x):
        z, log_det_jacobian = x, torch.zeros_like(x)
        for flow in self.flow_list:
            z, log_scale = flow(z)
            log_det_jacobian += log_scale
        return z, log_det_jacobian

    def inverse(self, z):
        list = []
        for flow in self.flow_list[::-1]:
            z, _ = flow(z, reverse=True)
            list.append(z)
        return list

Define loss function by negative log-likelihood

In [None]:
def loss_fn(target_distribution,z,sum_log_dz_by_dx):
    log_likelihood = target_distribution.log_prob(z).view(-1,1) + sum_log_dz_by_dx
    return -log_likelihood.mean()

Define the train function

In [None]:
def train(model,epoch,train_loader,test_loader,target_distribution,lr=0.001):
    optimizer = optim.AdamW(flows.parameters(),lr=lr)
    model.train()
    history = np.zeros((2,epoch))
    for i in range(epoch):
        epoch_loss = []
        for x in train_loader:
            z, sum_log_dz_by_dx = model(x)
            loss = loss_fn(target_distribution,z,sum_log_dz_by_dx)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss.append(loss)

        with torch.no_grad():
            train_loss = np.mean(epoch_loss)
            test_loss = validation(model,test_loader,loss_fn,target_distribution)
            if (i + 1) % 100 == 0:
                print(f"Iter: {i+1} Train Loss: {train_loss: .3f} Test Loss: {test_loss: .3f}")
        history[:,i] = (train_loss,test_loss)

    torch.save({"stats": history}, "Path")

Create dataset and dataloader

In [None]:
g1 = torch.distributions.MultivariateNormal(torch.tensor([5.,0.,0.],dtype=torch.double), torch.tensor([[1.,0.,0.],[0.,1.,0.],[0.,0.,1.]],dtype=torch.double))
g2 = torch.distributions.MultivariateNormal(torch.tensor([-5.,0.,0.],dtype=torch.double), torch.tensor([[1.,0.,0.],[0.,1.,0.],[0.,0.,1.]],dtype=torch.double))
train_data = np.concatenate([g1.sample([1024]).numpy(),g2.sample([1024]).numpy()])
test_data = np.concatenate([g1.sample([1024]).numpy(),g2.sample([1024]).numpy()])

In [None]:
train_loader = data.DataLoader(Dataset(train_data),batch_size=512,shuffle=True)
test_loader = data.DataLoader(Dataset(test_data),batch_size=2048,shuffle=True)

In [None]:
target_distribution = torch.distributions.MultivariateNormal(torch.zeros(3), torch.eye(3))

Creating the flow list and initiate the model

In [None]:
flows_list = [flow_3d(1), flow_3d(2),flow_3d(3), flow_3d(1), flow_3d(2), flow_3d(3), flow_3d(1), flow_3d(2), flow_3d(3)]
flows = compose_flow(flows_list)

In [None]:
flows.eval()
X = next(iter(test_loader))
z,d = flows(X)

compare_3D_graph(train_data,z.detach().numpy(),"Original distribution","Transform result before Trainning")

Training

In [None]:
train(flows,1500,train_loader,test_loader,target_distribution,lr=0.001)

In [None]:
loss_graph("Path",4000)

In [None]:
flows.eval()
X = next(iter(test_loader))
z,d = flows(X)

compare_3D_graph(target_distribution.sample([2048]).detach().numpy(),z.detach().numpy(),"Gaussian","Transform result after trainning")

In [None]:
g,t = target_distribution.sample([2048]).detach().numpy(),z.detach().numpy()

fig = plt.figure(figsize=(2 * 9.6, 2 * 9.6))
ax = fig.add_subplot(1, 2, 1, projection='3d')
ax.scatter3D(t.T[0], t.T[1], t.T[2], c = np.abs(t.T[0]) + np.abs(t.T[1]) + np.abs(t.T[2]), cmap =plt.get_cmap('Blues'),alpha=0.5)
ax.scatter3D(g.T[0], g.T[1], g.T[2], c = np.abs(g.T[0]) + np.abs(g.T[1]) + np.abs(g.T[2]), cmap =plt.get_cmap('Oranges'),alpha=0.5)
plt.title("Comparision of two distributions")

In [None]:
compare_3D_graph(train_data,z.detach().numpy(),"Original distribution","Transform result after trainning")

Draw a graph showing every inverse flow output (Original Distribution)

In [None]:
ls = flows.inverse(z)
fig = plt.figure(figsize = (4*4.8,3*4.8))

for f in range(len(ls)):
    ax = fig.add_subplot(3, 4, f+1, projection='3d')
    plt.title("The {}th T^-1(Z)".format(len(ls) - f))

    z_ = ls[f].detach().numpy()
    ax.scatter3D(z_.T[0], z_.T[1], z_.T[2], c = np.abs(z_.T[0]) + np.abs(z_.T[1]) + np.abs(z_.T[2]), cmap =plt.get_cmap('rainbow_r'))

Draw a graph showing every inverse flow output (Reverse Generated Distribution)

In [None]:
g = target_distribution.sample([2048])
ls = flows.inverse(g)
fig = plt.figure(figsize = (4*4.8,3*4.8))

for f in range(len(ls)):
    ax = fig.add_subplot(3, 4, f+1, projection='3d')
    plt.title("The {}th T^-1(Z)".format(len(ls) - f))

    z_ = ls[f].detach().numpy()
    ax.scatter3D(z_.T[0], z_.T[1], z_.T[2], c = np.abs(z_.T[0]) + np.abs(z_.T[1]) + np.abs(z_.T[2]), cmap =plt.get_cmap('rainbow_r'))

____

<font size=6>Image Flow-based Model Pipeline</font>

Define the dataset

In [None]:
class Dataset(data.Dataset):

    def __init__(self,array):
        super().__init__()
        self.array = array
    
    def __len__(self):
        return len(self.array)

    def __getitem__(self,index):
        return self.array[index]

Define the model

In [None]:
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, num_hidden_layers, output_size):
        super(MLP, self).__init__()
        layers = [nn.Linear(input_size, hidden_size)]
        for _ in range(num_hidden_layers - 1):
            layers.append( nn.Linear(hidden_size, hidden_size) )
            layers.append( nn.ReLU() )
        layers.append( nn.Linear(hidden_size, output_size) )
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        return self.layers(x)

class flow_image(nn.Module):

    def __init__(self,flows_number=1):
        super(flow_image,self).__init__()
        self.mlp = MLP(4, 128, 4, 4)
        self.flows_number = flows_number
        self.scale_weight = nn.Parameter(torch.zeros((4,1)), requires_grad=True)
        self.bias_weight = nn.Parameter(torch.zeros((4,1)), requires_grad=True)

    def forward(self,X):
        if self.flows_number % 2 == 1:
            x = torch.chunk(X,2,dim=1)[0]
            y = torch.chunk(X,2,dim=1)[1]
        else:
            y = torch.chunk(X,2,dim=1)[0]
            x = torch.chunk(X,2,dim=1)[1]

        base = torch.concat([x**0,x**1,x**2,x**3],dim=0).T
        self.s = (self.mlp(base).sigmoid() @ self.scale_weight).T
        self.b = (self.mlp(base) @ self.bias_weight).T

        y_new = self.s.exp() * y + self.b
        
        if self.flows_number % 2 == 1:
            z = torch.concat([x,y_new],dim=1)
        else:
            z = torch.concat([y_new,x],dim=1)

        s = torch.concat([torch.zeros(1,392),self.s],dim=1)

        return z, s

    def inverse(self,Z):
        if self.flows_number % 2 == 1:
            x = torch.chunk(Z,2,dim=1)[0]
            y = torch.chunk(Z,2,dim=1)[1]
        else:
            y = torch.chunk(Z,2,dim=1)[0]
            x = torch.chunk(Z,2,dim=1)[1]

        y_old = (y - self.b) / self.s.exp()

        if self.flows_number % 2 == 1:
            z = torch.concat([x,y_old],dim=1)
        else:
            z = torch.concat([y_old,x],dim=1)
        
        return z

Define a model to compose flows and pass results

In [None]:
class composed_flows(nn.Module):
    
    def __init__(self,models_list):
        super(composed_flows,self).__init__()
        self.models_list = nn.ModuleList(models_list)

    def forward(self,X):
        z, sum_log_dz_by_dx = X, 0
        for flow in self.models_list:
            z, log_dz_by_dx = flow(z)
            sum_log_dz_by_dx += log_dz_by_dx
        
        return z, sum_log_dz_by_dx

Define loss function by negative log-likelihood

In [None]:
def loss_fn(target_distribution,z,sum_log_dz_by_dx):
    log_likelihood = target_distribution.log_prob(z) + sum_log_dz_by_dx
    return -log_likelihood.mean()

Define the train function

In [None]:
def train(model,epoch,train_loader,target_distribution,lr=0.001):
    optimizer = optim.Adam(flows.parameters(),lr=lr)
    model.train()
    for i in range(epoch):
        epoch_loss = []
        for x in train_loader:
            z, sum_log_dz_by_dx = model(x)
            loss = loss_fn(target_distribution,z.T,sum_log_dz_by_dx.T)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss.append(loss)

        with torch.no_grad():
            if i == 0:
                print("Iter: {} Loss: {:.3f}".format(i+1,np.mean(epoch_loss)))
            elif (i + 1) % 100 == 0:
                print("Iter: {} Loss: {:.3f}".format(i+1,np.mean(epoch_loss)))

Define the target distribution

In [None]:
target_distribution = torch.distributions.Normal(torch.tensor([0.]),torch.tensor([1.]))

Create dataset and dataloader

In [None]:
mnist = torchvision.datasets.MNIST(root="/Users/Siyuan/Documents/Learning/Msc Project/Data/mnist", train=True,transform=torchvision.transforms.ToTensor(), download=True)
img = mnist[2][0]

noise = target_distribution.sample([784]).T * 0.1
train_data = torch.flatten(img,start_dim=1) + noise
test_data = torch.flatten(img,start_dim=1) + noise

train_loader = data.DataLoader(Dataset(train_data),batch_size=1)
test_loader = data.DataLoader(Dataset(test_data),batch_size=1)

Creating the flow list and initiate the model

In [None]:
flows_list = [flow_image(1),flow_image(2),flow_image(1),flow_image(2),flow_image(1),flow_image(2),flow_image(1),flow_image(2),flow_image(1),flow_image(2)]
flows = composed_flows(flows_list)

Training

In [None]:
train(flows,1700,train_loader,target_distribution,lr=0.0001)

In [None]:
img = torch.nn.Unflatten(1,(28,28))(next(iter(train_loader)))
flows.eval()
z = flows(next(iter(train_loader)))[0]
result = torch.nn.Unflatten(1,(28,28))(flows(next(iter(train_loader)))[0])

compare_image(img,result,"Original image","Generated noise")

Draw a graph showing every inverse flow output (Original Distribution)

In [None]:
fig = plt.figure(figsize = (4*4.8,4*4.8))
for f in range(len(flows_list)):
    ax = plt.subplot(4,4,f+1)
    plt.title("After {}th flow's inverse function".format(len(flows_list) - f))

    z = flows_list[len(flows_list) - f - 1].inverse(z)
    z_ = torch.nn.Unflatten(1,(28,28))(z)
    plt.imshow(np.transpose(z_.detach().numpy(),(1,2,0)),cmap = 'gray')

Draw a graph showing every inverse flow output (Reverse Generated Distribution)

In [None]:
g = target_distribution.sample([784]).T
fig = plt.figure(figsize = (4*4.8,4*4.8))
for f in range(len(flows_list)):
    ax = plt.subplot(4,4,f+1)
    plt.title("After {}th flow's inverse function".format(len(flows_list) - f))

    g = flows_list[len(flows_list) - f - 1].inverse(g)
    z_ = torch.nn.Unflatten(1,(28,28))(g)
    plt.imshow(np.transpose(z_.detach().numpy(),(1,2,0)),"gist_gray")