In this recitation we will cover diffusion models.
This recitation has been adapted from [this](https://github.com/ThiagoLira/ToyDiffusionhttps://github.com/ThiagoLira/ToyDiffusion) repository.
Some of the key diffusion model functions have been reimplemented using ChatGPT and the goal is to debug.
The prompts used to generate the diffusion model code have been given in addition to the ChatGPT generated code.

In [None]:
import torch
import numpy as np
from utils import pack_data, unpack_1d_data, scatter_pixels
from tqdm import tqdm
!pip install celluloid

In [None]:
device = 'cuda' # It is highly recommended to use a GPU

## Diffusion Model ##

In [None]:
def denoise_with_mu(denoise_model, x_t, t, list_alpha, list_alpha_bar, DATA_SIZE, device):
    """
    Denoising function considering the denoising models tries to model the posterior mean
    """
    alpha_t = list_alpha[t]
    beta_t = 1 - alpha_t
    alpha_bar_t = list_alpha_bar[t]
    
    mu_theta = denoise_model(x_t,t)
    
    x_t_before = torch.distributions.MultivariateNormal(loc=mu_theta,covariance_matrix=torch.diag(beta_t.repeat(DATA_SIZE))).sample().to(device)
        
    return x_t_before

    
def position_encoding_init(n_position, d_pos_vec):
    ''' 
    Init the sinusoid position encoding table 
    n_position in num_timesteps and d_pos_vec is the embedding dimension
    '''
    # keep dim 0 for padding token position encoding zero vector
    position_enc = np.array([
        [pos / np.power(10000, 2*i/d_pos_vec) for i in range(d_pos_vec)]
        if pos != 0 else np.zeros(d_pos_vec) for pos in range(n_position)])

    position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i
    position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1
    return torch.from_numpy(position_enc).to(torch.float32)


class Denoising(torch.nn.Module):

    def __init__(self, x_dim, num_diffusion_timesteps):
        super(Denoising, self).__init__()

        self.linear1 = torch.nn.Linear(x_dim, x_dim)
        self.emb = position_encoding_init(num_diffusion_timesteps,x_dim)
        self.linear2 = torch.nn.Linear(x_dim, x_dim)
        self.linear3 = torch.nn.Linear(x_dim, x_dim)
        self.relu = torch.nn.ReLU()

    def forward(self, x_input, t):
        emb_t = self.emb[t]
        x = self.linear1(x_input+emb_t)
        x = self.relu(x)
        x = self.linear2(x)
        x = self.relu(x)
        x = self.linear3(x)
        return x

## Using ChatGPT to Write Our Code
#### Process to implement:
Forward noising at an arbitrary timestep

#### Equation to implement:
$$q\left(x_t|x_0\right) = \mathcal{N}\left(x_t;\sqrt{\bar{\alpha}_t}x_0, \left(1-\bar{\alpha}_t\right)\mathbb{I}\right)$$

####ChatGPT Prompt: 
Write me a diffusion model function that adds noise to a sample given a time, sample, and noise schedule in pytorch

In [None]:
def add_diffusion_noise(sample, t, noise_schedule):
    """
    Adds diffusion noise to the given sample at the specified time using the given noise schedule.

    Args:
        sample (torch.Tensor): The sample to add noise to.
        t (float): The time at which to add the noise.
        noise_schedule (torch.Tensor): The noise schedule to use. Should be a 1D tensor with the same length
            as the dimension of the sample.

    Returns:
        torch.Tensor: The sample with added diffusion noise.
    """
    # Compute the standard deviation of the noise at this time.
    std = (noise_schedule[t] - noise_schedule[t-1]).sqrt()

    # Generate the noise tensor with the same shape as the sample.
    noise = torch.randn_like(sample) * std

    # Add the noise to the sample and return the result.
    return sample + noise

#### Process to implement:
Denoising using our trainable model

#### Equations to implement:
$$
\begin{align}
    %\mu_{\theta} &= \sqrt{\bar{\alpha}_t}\beta_t + \sqrt{\alpha_t}\left(1 - \bar{\alpha}_{t}\right)x_{t}\\
    \mu_{\theta} &= \frac{1}{\sqrt{\alpha_t}}\left( x_t - \frac{\beta_t}{\sqrt{1 - \bar{\alpha}_t}}\epsilon_{\theta}\left(x_t, t\right)\right)\\
    \Sigma_{\theta} &= \sqrt{1 - \bar{\alpha}_t}\mathbb{I}
\end{align}
$$

####Prompt: 
Write me a diffusion model function that calculates the mean and variance of the posterior given noise schedule, starting sample, noisy sample, and time in pytorch

In [None]:
def compute_diffusion_posterior(noise_schedule, start_sample, noisy_sample, t):
    """
    Computes the mean and variance of the posterior distribution for the given diffusion process parameters
    and observed data.

    Args:
        noise_schedule (torch.Tensor): A 1D tensor containing the noise schedule for the diffusion process.
        start_sample (float): The starting sample value for the diffusion process.
        noisy_sample (float): The observed sample value at the specified time.
        t (int): The time at which the observation was made.

    Returns:
        Tuple[float, float]: A tuple containing the mean and variance of the posterior distribution.
    """
    # Compute the parameters of the prior distribution.
    alpha = start_sample
    beta = noise_schedule[0] ** 2 / 2

    # Compute the parameters of the likelihood function.
    y = noisy_sample
    sigma_sq = (noise_schedule[t] ** 2 - noise_schedule[t-1] ** 2) / 2

    # Compute the parameters of the posterior distribution.
    mean = (beta * y + sigma_sq * alpha) / (beta + sigma_sq)
    var = sigma_sq * beta / (beta + sigma_sq)

    return mean, var

## ATTENTION ##
###### If you do not have it, download this image https://www.infomoney.com.br/wp-content/uploads/2019/06/homer-simpson.jpg?resize=900%2C515&quality=50&strip=all and save it in this folder as 'homer.png'


In [None]:
x,y = scatter_pixels('homer.png')
x = [x/25 -3 for x in x]
y = [y/25 -2 for y in y]

## Scatter plot of data we will try to train the model to generate from random noise

In [None]:
import seaborn as sns
ax = sns.scatterplot(x=x, y=y)

In [None]:
## Store the ax to plot the result later
y_ax = ax.get_ylim()
x_ax = ax.get_xlim()
axes = (x_ax,y_ax)

In [None]:
# send data to device
one_d_data = pack_data(x,y)
x_init = torch.tensor(one_d_data).to(torch.float32).to(device)

DATA_SIZE = len(x_init)


# Diffusion Parameters

These parameters control the noise schedule.
They are critical in obtaining a good diffusion model.
Feel free to modify and see how the training process changes.
The given parameters should work well for this problem.

In [None]:
beta_start = .0004
beta_end = .02
num_diffusion_timesteps = 50

In [None]:
from operator import mul
from functools import reduce 

betas = np.linspace(beta_start ** 0.5, beta_end ** 0.5, num_diffusion_timesteps) ** 2
alphas = 1 - betas

# send parameters to device
betas = torch.tensor(betas).to(torch.float32).to(device)
alphas = torch.tensor(alphas).to(torch.float32).to(device)

# alpha_bar_t is the product of all alpha_ts from 0 to t
list_bar_alphas = [alphas[0]]
for t in range(1,num_diffusion_timesteps):
    list_bar_alphas.append(reduce(mul,alphas[:t]))
    
list_bar_alphas = torch.cumprod(alphas, axis=0).to(torch.float32).to(device)

## Training Procedure

In [None]:
import torch.nn as nn
import torch.optim as optim


training_steps_per_epoch = 40


criterion = nn.MSELoss()
denoising_model = Denoising(DATA_SIZE, num_diffusion_timesteps).to(device)

# hack to put embedding layer on 'device' as well
denoising_model.emb = denoising_model.emb.to(device)
optimizer = optim.AdamW(denoising_model.parameters())

In [None]:
pbar = tqdm(range(10))
for epoch in pbar:  # loop over the dataset multiple times
    
    running_loss = 0.0
    # sample a bunch of timesteps
    Ts = np.random.randint(1,num_diffusion_timesteps, size=training_steps_per_epoch)
    for _, t in enumerate(Ts):
        # produce corrupted sample
        q_t = add_diffusion_noise(x_init, t, list_bar_alphas)
                
        # calculate the mean and variance of the posterior forward distribution q(x_t-1 | x_t,x_0)
        #mu_t, cov_t = compute_diffusion_posterior(list_bar_alphas, x_init, q_t, t)
        mu_t, cov_t = compute_diffusion_posterior(alphas, list_bar_alphas, x_init, q_t, t)
        
        # get just first element from diagonal of covariance since they are all equal
        sigma_t = cov_t[0][0]
        # zero the parameter gradients
        optimizer.zero_grad()
  
        mu_theta = denoising_model(q_t , t)
        loss = criterion(mu_t, mu_theta)
        loss.backward()
        optimizer.step()
        running_loss += loss.detach()
    pbar.set_description('Epoch: {} Loss: {}'.format(epoch, running_loss/training_steps_per_epoch))
print('Finished Training')

### Reserve-Diffuse one Sample of Noise!

In [None]:
from tqdm import tqdm 
data = torch.distributions.MultivariateNormal(loc=torch.zeros(DATA_SIZE),covariance_matrix=torch.eye(DATA_SIZE)).sample().to(device)

for t in tqdm(range(0,num_diffusion_timesteps)):
    data = denoise_with_mu(denoising_model,data,num_diffusion_timesteps-t-1, alphas, list_bar_alphas, DATA_SIZE, device)

In [None]:
#data = data.detach().cpu().numpy()
x_new, y_new = unpack_1d_data(data.cpu())

from matplotlib import pyplot as plt
fig, ax = plt.subplots()
ax.scatter(x=x_new, y=y_new, s=5)
plt.show()

### Create an AWESOME HD 24fps GIF

In [None]:
import numpy as np
from celluloid import Camera
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import seaborn as sns

fig = plt.figure()
camera = Camera(fig)

# animation draws one data point at a time
data = torch.distributions.MultivariateNormal(loc=torch.zeros(DATA_SIZE),covariance_matrix=torch.eye(DATA_SIZE)).sample().to(device)
data_plot = data.detach().cpu().numpy()
x_new, y_new = unpack_1d_data(data_plot)
graph = sns.scatterplot(x=x_new, y=y_new, color='goldenrod')#,palette=['green'])
#graph = sns.scatterplot(y_new, color='green')#,palette=['green'])
graph.set_xlim(axes[0])
graph.set_ylim(axes[1])
camera.snap()
data_plot = data.detach().cpu().numpy()

#data = torch.Tensor(data)
for d in tqdm(range(1, num_diffusion_timesteps)):
    data = denoise_with_mu(denoising_model,data,num_diffusion_timesteps-d, alphas, list_bar_alphas, DATA_SIZE, device)
    data_plot = data.detach().cpu().numpy()
    x_new, y_new = unpack_1d_data(data_plot)
    graph = sns.scatterplot(x=x_new, y=y_new, color='goldenrod')#,palette=['green'])
    #graph = sns.scatterplot(y_new, color='green')#,palette=['green'])
    graph.set_xlim(axes[0])
    graph.set_ylim(axes[1])
    camera.snap()
    data_plot = data.detach().cpu().numpy()

anim = camera.animate(blit=False)
anim.save('output.gif',fps=6, dpi=120)
plt.show()

In [None]:
from IPython.display import Image
Image(open('output.gif','rb').read())