In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Forward Process

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn
from sklearn.datasets import make_swiss_roll
import torch

In [4]:
def sample_batch(batch_size, device='cpu'):
    n_samples = 5000
    data, _ = make_swiss_roll(n_samples)
    data = data[:, [2,0]] / 10
    data = data * np.array([1, -1])
    return torch.from_numpy(data).to(device)

In [23]:
class MLP(nn.Module):

    def __init__(self, N=40, data_dim=2, hidden_dim=64):
        super(MLP, self).__init__()

        self.network_head = nn.Sequential(nn.Linear(data_dim, hidden_dim),
                                         nn.ReLU(),
                                         nn.Linear(hidden_dim, hidden_dim),
                                         nn.ReLU(),)
        self.network_tail = nn.ModuleList([nn.Sequential(nn.Linear(hidden_dim, hidden_dim),
                                         nn.ReLU(),
                                         nn.Linear(hidden_dim, data_dim *2),) for t in range(N)])

    def forward(self, x, t):
        h = self.network_head(x) #[batch size, data_dim]
        tmp = self.network_tail[t](h) #[batch size, data_dim *2]
        mu, h = torch.chunk(tmp, 2, dim=1)
        var = torch.exp(h)
        std = torch.sqrt(var)
        return mu, std

In [25]:
model = MLP()
t = 5
x = torch.randn((64, 2))
mu, std = model(x,t)

print(mu.shape)
print(std.shape)

torch.Size([64, 2])
torch.Size([64, 2])


In [26]:
model = torch.load("model_paper1")

FileNotFoundError: [Errno 2] No such file or directory: 'model_paper1'

In [10]:
class DiffusionModel():

    def __init__(self, T, model: nn.Module, dim=2):
        self.betas = torch.sigmoid(torch.linspace(-18,10,T)) * (3e-1 - 1e-5) + 1e-5
        self.alphas = 1 - self.betas
        self.alphas_bar = torch.cumprod(self.alphas, 0)

        self.T = T
        self.model = model
        self.dim = dim

    def forward_process(self, x0, t):
        """
        :param t: Number of diffusion steps
        """
        assert t>0 , 't should be greater than 0'
        assert self.T <= self.T, 't should be lower or equal than {self.T}'
        t = t-1
        mu = torch.sqrt(self.alphas_bar[t]) * x0
        std = torch.sqrt(1 - self.alphas_bar[t])
        epsilon = torch.randn_like(x0)

        return mu, std, mu + epsilon * std #data ~ N(mu, std)

    def reverse_process(self, x0, t):
        """
        :param t: Number of diffusion steps
        """

        assert t > 0 , 't should be greater than 0'
        assert self.T <= self.T, 't should be lower or equal than {self.T}'

        t = t-1  #we start indexing at 0
        mu, std = self.model(xt, t)
        epsilon = torch.randn_like(xt)

        return mu + epsilon * std #data ~ N(mu, std)

    def sample(self, batch_size):
        noise = torch.randn_like((batch_size, self.dim))
        x = noise
        samples = [x]

        for t in range(self.T,0,-1):
            if not(t==1):
                x = self.reverse_process(x, t)
            samples.append(x)

        return samples[::-1]

    def get_loss(self, x0):
        """
        :param x0: Batch [batch_size, self.dim]
        """
        t = torch.randint(1, 40+1, 1)
        mu_q, sigma_q, xt = self.forward_process(x0, t)
        mu_p, sigma_p, xt_minus1 = self.forward_process(xt, t)

        KL = (torch.log(sigma_p) - torch.log(sigma_q) + (sigma_q**2 + (mu_q-mu_p)**2) / (2 * sigma_p**2))
        K = -(KL).mean() #Should be maximized
        loss = - (K) #Should be minimized

NameError: name 'nn' is not defined

In [None]:
x0 = sample_batch(5_000)
mlp_model = torch.load("model_paper1")
model = DiffusionModel(40, mlp_model)
xT = model.forward_process(x0, 20)

In [None]:
samples = model.sample(1000)

In [None]:
t = 10
plt.scatter(samples[t][:,0].data.numpy(), samples[t][:,1].data.numpy())

In [None]:
print(xT.mean(0))
print(xT.std(0))

In [None]:
fontsize = 14
fig = plt.figure(figsize=(10,6))

N = 3_000
x0 = sample_batch(N)
samples = model.sample(N)

data = [x0, model.forward_process(x0, 20), model.forward_process(x0, 40)]
for i in range(3):
        
    plt.subplot(1,3,1+i)
    plt.scatter(data[i][:,0].data.numpy(), data[i][:,1].data.numpy(), alpha=0.1, s=1)
    plt.xlim([-2,2])
    plt.ylim([-2,2])
    plt.gca().set_aspect('equal')

    if i==0: plt.ylabel(r'$q(\mathbf{x}^{(0...T)})$', fontsize=fontsize)
    if i==0: plt.title(r'$t=0$', fontsize=fontsize)
    if i==1: plt.title(r'$t=\frac{T}{2}$', fontsize=fontsize)
    if i==2: plt.title(r'$t=T$', fontsize=fontsize)

time_steps = [0,20,40]
for i in range(3):
        
    plt.subplot(2,3,4+i)
    plt.scatter(samples[time_steps[i]][:,0].data.numpy(), samples[time_steps[i]][:,1].data.numpy(), alpha=0.1, c='r', s=1)
    plt.xlim([-2,2])
    plt.ylim([-2,2])
    plt.gca().set_aspect('equal')

    if i==0: plt.ylabel(r'$p(\mathbf{x}^{(0...T)})$', fontsize=fontsize)
    if i==0: plt.title(r'$t=0$', fontsize=fontsize)
    if i==1: plt.title(r'$t=\frac{T}{2}$', fontsize=fontsize)
    if i==2: plt.title(r'$t=T$', fontsize=fontsize)

plt.savefig('difussion_model_paper1_fig1.png', bbox_inches='tight')

## Reverse Process