In [None]:
import torch
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.animation as animation
from tqdm import tqdm
from scipy.stats import gaussian_kde
import os

torch.manual_seed(42)

# Parameters for different Gaussian distributions
num_samples = 1000
mean_1, std_1 = 0, 1
mean_2, std_2 = 5, 2
mean_3, std_3 = -3, 0.5

# Generate random samples
gaussian_data_1 = torch.normal(mean_1, std_1, size=(num_samples,))
gaussian_data_2 = torch.normal(mean_2, std_2, size=(num_samples,))
gaussian_data_3 = torch.normal(mean_3, std_3, size=(num_samples,))
x_0 = torch.cat([gaussian_data_1, gaussian_data_2, gaussian_data_3])

# Beta scheduler functions
def linear_beta_schedule(T, beta_min, beta_max):
    return torch.linspace(beta_min, beta_max, T)

def constant_beta_schedule(T, beta_value):
    return torch.full((T,), beta_value)

def exponential_beta_schedule(T, beta_min, beta_max):
    return torch.exp(torch.linspace(torch.log(torch.tensor(beta_min)), torch.log(torch.tensor(beta_max)), T))

# KL divergence calculation function
def calculate_kl_divergence(true_dist, approx_dist):
    kde_true = gaussian_kde(true_dist, bw_method='silverman')
    kde_approx = gaussian_kde(approx_dist, bw_method='silverman')
    x_grid = np.linspace(min(true_dist.min(), approx_dist.min()), max(true_dist.max(), approx_dist.max()), 1000)
    p_true = kde_true(x_grid)
    p_approx = kde_approx(x_grid)
    p_true = p_true / np.sum(p_true)
    p_approx = p_approx / np.sum(p_approx)
    return np.sum(p_true * np.log(p_true / p_approx))

# Forward diffusion function
def forward_diffusion(x_0, betas, T):
    sqrt_one_minus_betas = torch.sqrt(1 - betas)
    sqrt_betas = torch.sqrt(betas)
    x_t = x_0.clone()
    trajectory = [x_t]
    for t in range(T):
        noise = torch.randn_like(x_t)
        x_t = sqrt_one_minus_betas[t] * x_t + sqrt_betas[t] * noise
        trajectory.append(x_t)
    return trajectory

# Reverse denoising function
def reverse_denoising2(trajectory, betas, T, min_beta_tilde=1e-4, noise_floor=1e-4):
    alphas = 1 - betas
    alpha_bar = torch.cumprod(alphas, dim=0)
    xt = trajectory[-1]
    reverse_trajectory = [xt]
    for t in tqdm(reversed(range(1, T))):
        alpha_bar_t_1 = alpha_bar[t - 1]
        alpha_bar_t = alpha_bar[t]
        beta_t = betas[t - 1]
        sqrt_alpha_bar_t_1 = torch.sqrt(alpha_bar_t_1)
        sqrt_alpha_bar_t = torch.sqrt(alpha_bar_t)
        one_minus_alpha_bar_t = 1 - alpha_bar_t
        one_minus_alpha_bar_t_1 = 1 - alpha_bar_t_1
        mu_t = (sqrt_alpha_bar_t_1 / sqrt_alpha_bar_t) * beta_t * x_0 + (torch.sqrt(alphas)[t] * one_minus_alpha_bar_t_1 / one_minus_alpha_bar_t) * xt
        # beta_t_tilde = torch.maximum((1 - alpha_bar_t_1) / (1 - alpha_bar_t) * beta_t, torch.tensor(min_beta_tilde))
        beta_t_tilde = (1 - alpha_bar_t_1) / (1 - alpha_bar_t) * beta_t
        noise = torch.randn_like(xt) * torch.sqrt(beta_t_tilde + noise_floor)
        xt = mu_t + noise
        reverse_trajectory.append(xt)
    return reverse_trajectory

# Update function for the forward diffusion plot
def update_plot1(frame, trajectory, x_0, ax):
    ax.clear()
    kl_divergence = calculate_kl_divergence(x_0.numpy(), trajectory[frame].numpy())
    ax.hist(trajectory[frame].numpy(), bins=50, density=True, alpha=0.6, label=f"KL Divergence: {kl_divergence:.4f}")
    ax.hist(x_0.numpy(), bins=50, density=True, alpha=0.2, color='gray', label="Original Distribution")
    ax.set_title(f"q(x{frame}) - KL Divergence: {kl_divergence:.4f}")
    ax.set_xlim(-10, 10)
    ax.set_ylim(0, 0.4)
    ax.set_xlabel('x')
    ax.set_ylabel('Density')
    ax.legend()

# Update function for the reverse denoising plot
def update_plot2(frame, reverse_trajectory, x_0, ax):
    ax.clear()
    kl_divergence = calculate_kl_divergence(x_0.numpy(), reverse_trajectory[frame].numpy())
    ax.hist(reverse_trajectory[frame].numpy(), bins=50, density=True, alpha=0.6, label=f"KL Divergence: {kl_divergence:.4f}")
    ax.hist(x_0.numpy(), bins=50, density=True, alpha=0.2, color='gray', label="Original Distribution")
    ax.set_title(f"Reverse Step {T - frame} - KL Divergence: {kl_divergence:.4f}")
    ax.set_xlim(-10, 10)
    ax.set_ylim(0, 0.4)
    ax.set_xlabel('x')
    ax.set_ylabel('Density')
    ax.legend()

# Main function to run experiments
def run_experiment(T, beta_scheduler, beta_params, result_folder, scheduler_name):
    os.makedirs(result_folder, exist_ok=True)
    
    if beta_scheduler == "linear":
        betas = linear_beta_schedule(T, **beta_params)
    elif beta_scheduler == "constant":
        betas = constant_beta_schedule(T, **beta_params)
    elif beta_scheduler == "exponential":
        betas = exponential_beta_schedule(T, **beta_params)
    else:
        raise ValueError("Invalid beta scheduler")

    trajectory = forward_diffusion(x_0, betas, T)
    print("Forward process completed. Running reverse denoising...")
    reverse_trajectory = reverse_denoising2(trajectory, betas, T)
    print("Reverse denoising completed.")
    frames_to_plot = range(0, T, max(1, T // 100))
     # Calculate the interval for 5 seconds
    total_duration = 5000  # 5 seconds in milliseconds
    interval = total_duration / T  # Time between frames in milliseconds
    
    # Forward diffusion animation
    fig, ax = plt.subplots(figsize=(6, 4))
    ani = animation.FuncAnimation(fig, update_plot1, frames=frames_to_plot, interval=interval, repeat=True, fargs=(trajectory, x_0, ax))
    ani.save(os.path.join(result_folder, f'forward_diffusion_{scheduler_name}_{T}.gif'), writer='imagemagick')
    plt.close(fig)
    print("Forward diffusion animation saved.")
    
    # Reverse denoising animation
    fig, ax = plt.subplots(figsize=(6, 4))
    ani = animation.FuncAnimation(fig, update_plot2, frames=frames_to_plot, interval=interval, repeat=True, fargs=(reverse_trajectory, x_0, ax))
    ani.save(os.path.join(result_folder, f'reverse_diffusion_{scheduler_name}_{T}.gif'), writer='imagemagick')
    plt.close(fig)
    print("Reverse denoising animation saved.")

# Example usage
T = 500
result_folder = "results"
run_experiment(T, "linear", {"beta_min": 0.0001, "beta_max": 0.1}, result_folder, "linear")
run_experiment(T, "constant", {"beta_value": 0.01}, result_folder, "constant")
run_experiment(T, "exponential", {"beta_min": 0.0001, "beta_max": 0.1}, result_folder, "exponential")


Forward process completed. Running reverse denoising...


19it [00:00, 3891.39it/s]


MovieWriter imagemagick unavailable; using Pillow instead.


Reverse denoising completed.


MovieWriter imagemagick unavailable; using Pillow instead.


Forward diffusion animation saved.


  return np.sum(p_true * np.log(p_true / p_approx))
  return np.sum(p_true * np.log(p_true / p_approx))


Reverse denoising animation saved.
Forward process completed. Running reverse denoising...


19it [00:00, 4914.09it/s]
MovieWriter imagemagick unavailable; using Pillow instead.


Reverse denoising completed.


MovieWriter imagemagick unavailable; using Pillow instead.


Forward diffusion animation saved.
Reverse denoising animation saved.
Forward process completed. Running reverse denoising...


19it [00:00, 5466.20it/s]
MovieWriter imagemagick unavailable; using Pillow instead.


Reverse denoising completed.


MovieWriter imagemagick unavailable; using Pillow instead.


Forward diffusion animation saved.


  return np.sum(p_true * np.log(p_true / p_approx))
  return np.sum(p_true * np.log(p_true / p_approx))


Reverse denoising animation saved.


In [None]:
T = 20
result_folder = "results"
run_experiment(T, "linear", {"beta_min": 0.01, "beta_max": 0.5}, result_folder, "linear")
run_experiment(T, "constant", {"beta_value": 0.1}, result_folder, "constant")
run_experiment(T, "exponential", {"beta_min": 0.01, "beta_max": 0.5}, result_folder, "exponential")
T = 10
result_folder = "results"
run_experiment(T, "linear", {"beta_min": 0.01, "beta_max": 0.5}, result_folder, "linear")
run_experiment(T, "constant", {"beta_value": 0.1}, result_folder, "constant")
run_experiment(T, "exponential", {"beta_min": 0.01, "beta_max": 0.5}, result_folder, "exponential")