# Forward Process

In diffusion, the forward process is a Markov chain that gradually adds noise to the data. If we add enough noise to the data, eventually it becomes indistinguishable from random noise.


$q(\mathbf{y} \mid \mathbf{x}) := \mathcal{N} \left( \mathbf{y}; \sqrt{1 - \beta} \, \mathbf{x}, \beta \mathbf{I} \right)$

$q(\mathbf{x}_t \mid \mathbf{x}_{t-1}) := \mathcal{N} \left( \mathbf{x}_t; \sqrt{1 - \beta_t} \, \mathbf{x}_{t-1}, \beta_t \mathbf{I} \right)$



## Markov Forward Process

In the Markov forward process, we begin with an initial state $X_0$ and gradually add noise at each timestep $t$, moving towards a fully noisy state $X_T$. This process can be represented as a sequence of random variables:

$$
X_T \rightarrow X_{T-1} \rightarrow \cdots \rightarrow X_t \rightarrow X_{t-1} \rightarrow \cdots \rightarrow X_0
$$

where each $X_t$ is a progressively noisier version of the previous state, with transitions defined by Gaussian distributions. The following probabilities describe the transitions in both directions:

### Forward Process (Adding Noise)
- At each step $t$, we transition from $X_{t-1}$ to $X_t$ by adding a small amount of Gaussian noise.
- The forward transition probability is given by:

$$
q(X_t | X_{t-1}) = \mathcal{N}(X_t; \sqrt{1 - \beta_t} \cdot X_{t-1}, \beta_t \cdot \mathbf{I})
$$

where $\beta_t$ is a noise variance schedule that increases with $t$.

### Reverse Process (Denoising)
- In the reverse process, we try to estimate the reverse transition $p_\theta(X_{t-1} | X_t)$, effectively denoising $X_t$ to approximate $X_{t-1}$.
- This reverse transition is parameterized by a model $\theta$, and can be expressed as:

$$
p_\theta(X_{t-1} | X_t) = \mathcal{N}(X_{t-1}; \mu_\theta(X_t, t), \Sigma_\theta(X_t, t))
$$

where $\mu_\theta$ and $\Sigma_\theta$ are the mean and variance predicted by the model.

### Entire Process in Context
The process can be seen as iteratively denoising a noisy sample $X_T$ until we reach a clean sample $X_0$, aiming to generate a coherent output from noise.

$$
X_T \rightarrow X_{T-1} \rightarrow \cdots \rightarrow X_1 \rightarrow X_0
$$

The Markov forward process is thus a crucial component in generative models like diffusion models, where noise is gradually added in the forward process and removed in the reverse process to produce samples from a complex data distribution.


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
import ipywidgets as widgets
import io
from PIL import Image
from tqdm import tqdm

# Define parameters for each Gaussian component
weights = [0.5, 0.3, 0.2]
means = [2, 5, -3]
std_devs = [1, 0.5, 1.5]

# Generate x values for plotting
x = np.linspace(-10, 10, 1000)

n_steps = 1000
beta_t = np.linspace(1e-4, 0.02, n_steps)
alpha_t = 1 - beta_t
# alpha_t_hat = product of alpha_t from 0 to t
alpha_t_hat = np.cumprod(alpha_t)
alpha_t_hat = np.insert(alpha_t_hat, 0, 1)

# List of images for animation
frames = []
for alpha in tqdm(alpha_t_hat):
    # Adjusted means and variances for q(y|x) when x is sampled from p(x)
    adjusted_means = [np.sqrt(alpha) * mu for mu in means]
    adjusted_std_devs = [np.sqrt(alpha * (sigma ** 2) + (1-alpha)) for sigma in std_devs]


    # Generate x values for plotting
    x = np.linspace(-10, 10, 1000)

    # Calculate the density of the new distribution q(y)
    q_y_density = sum(w * norm.pdf(x, mu, sigma) for w, mu, sigma in zip(weights, adjusted_means, adjusted_std_devs))

    # Create the plot figure and add it to frames
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(x, q_y_density, label='Mixture of Gaussians', color='black')
    ax.set_title("Example Mixture of Gaussians (MoG)")
    ax.set_xlabel("x")
    ax.set_ylabel("Density")
    ax.set_ylim(0, 0.5)
    ax.legend()
    ax.grid(True)

    # Save the plot to a JPG in memory
    buf = io.BytesIO()
    fig.savefig(buf, format='jpg')
    buf.seek(0)
    frames.append(Image.open(buf))
    plt.close(fig)

# Set up an interactive display widget
def show_frame(frame_index):
    display_frame = frames[frame_index]
    display(display_frame)

# Slider widget
slider = widgets.IntSlider(value=0, min=0, max=len(frames)-1, step=1, description="Frame")
play_button = widgets.Play(value=0, min=0, max=len(frames)-1, step=1, interval=100)

# Link play button and slider
widgets.jslink((play_button, 'value'), (slider, 'value'))

# Combine play button and slider in the interactive display
interactive_display = widgets.interactive(show_frame, frame_index=slider)
display(widgets.HBox([play_button, interactive_display.children[0]]))  # Attach slider from interactive
display(interactive_display.children[1])  # Display the frame itself


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from scipy.stats import norm

# Define parameters for each Gaussian component
weights = [0.5, 0.3, 0.2]
means = [0, 5, -3]
std_devs = [1, 0.5, 1.5]

# Generate x values for plotting
x = np.linspace(-10, 10, 1000)

# Animation parameters
n_steps = 1000  # Reduced steps for smoother animation
beta_t = np.linspace(1e-4, 0.02, n_steps)
alpha_t = 1 - beta_t
alpha_t_hat = np.insert(np.cumprod(alpha_t), 0, 1)

# Initialize plot
fig, ax = plt.subplots(figsize=(10, 6))
ax.set_xlim(-10, 10)
ax.set_ylim(0, 0.5)
line, = ax.plot([], [], color='black')
ax.set_title("Example Mixture of Gaussians (MoG)")
ax.set_xlabel("x")
ax.set_ylabel("Density")
ax.grid(True)

# Initialization function
def init():
    line.set_data([], [])
    return line,

# Animation function
def update(frame):
    alpha = alpha_t_hat[frame]
    # Adjust means and variances for q(y|x)
    adjusted_means = [np.sqrt(alpha) * mu for mu in means]
    adjusted_std_devs = [np.sqrt(alpha * (sigma ** 2) + (1 - alpha)) for sigma in std_devs]

    # Calculate q(y) density for the current frame
    q_y_density = sum(w * norm.pdf(x, mu, sigma) for w, mu, sigma in zip(weights, adjusted_means, adjusted_std_devs))
    line.set_data(x, q_y_density)
    return line,

# Create the animation
ani = FuncAnimation(fig, update, frames=n_steps, init_func=init, blit=True, interval=50)
plt.close(fig)  # Close the figure to prevent static display
HTML(ani.to_jshtml())

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from mpl_toolkits.mplot3d import Axes3D
from scipy.stats import multivariate_normal

matplotlib.rcParams['animation.embed_limit'] = 1000 * 2**20  # Increase the animation size limit to 1000 MB

# Define parameters for each Gaussian component in 2D
# weights = [0.5, 0.3, 0.2]
# means = [[0, 0], [5, 5], [-3, -3]]
# covariances = [np.array([[1, 0], [0, 1]]), np.array([[0.5, 0], [0, 0.5]]), np.array([[1.5, 0], [0, 1.5]])]
weights = [0.2, 0.2, 0.2, 0.2, 0.2]
means = [[5, 1.5], [0, 5], [-5, 1.5], [-4.5, -5.5], [3, -4]]
covariances = [
    np.array([[2, 0.8], [0.8, 1]]),
    np.array([[1.5, -0.7], [-0.7, 1.2]]),
    np.array([[1, 0.4], [0.4, 2]]),
    np.array([[1, -0.5], [-0.5, 1.5]]),
    np.array([[2, 0], [0, 2]])
]

# Generate x and y values for plotting
x = np.linspace(-10, 10, 100)
y = np.linspace(-10, 10, 100)
X, Y = np.meshgrid(x, y)

# Calculate the beta and alpha parameters for animation
n_steps = 1000  # Reduced steps for smoother animation
beta_t = np.linspace(1e-4, 0.02, n_steps)
alpha_t = 1 - beta_t
alpha_t_hat = np.insert(np.cumprod(alpha_t), 0, 1)

# Initialize 3D plot
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')


# Create a placeholder for the surface plot
surface = ax.plot_surface(X, Y, np.zeros_like(X), cmap='viridis')

# Initialization function
def init():
    surface.set_array(np.zeros_like(X).ravel())
    return surface,

# Animation function
def update(frame):
    # print(f'frame={frame}')
    alpha = alpha_t_hat[frame]

    # Adjust means and covariances for each Gaussian component in 2D
    Z = np.zeros_like(X)
    for w, mean, cov in zip(weights, means, covariances):
        adjusted_mean = np.sqrt(alpha) * np.array(mean)
        adjusted_cov = alpha * cov + (1 - alpha) * np.eye(2)
        rv = multivariate_normal(adjusted_mean, adjusted_cov)
        Z += w * rv.pdf(np.dstack((X, Y)))

    # Update the surface plot
    ax.clear()
    ax.set_xlim(-10, 10)
    ax.set_ylim(-10, 10)
    ax.set_zlim(0, 0.2)
    ax.set_xlabel("X-axis")
    ax.set_ylabel("Y-axis")
    ax.set_zlabel("Density")
    # change view angle
    ax.view_init(elev=20, azim=120)
    surface = ax.plot_surface(X, Y, Z, cmap='viridis', edgecolor='none', rstride=1, cstride=1, alpha=0.8)
    return surface,

idxes = np.linspace(0, n_steps, 10, dtype=int)
# print(f'idxes={idxes}')

# Create the animation
ani = FuncAnimation(fig, update, frames=idxes, init_func=init, blit=True, interval=50)

plt.close(fig)  # Close the static plot to avoid double display
display(HTML(ani.to_jshtml()))


In [None]:
import torch

def posterior_func(x, beta_t):
    # Use PyTorch's random sampling for GPU acceleration
    return torch.sqrt(1 - beta_t) * x + torch.randn_like(x, device=x.device) * torch.sqrt(beta_t)

def posterior_func_numpy(x, beta_t):
    return np.sqrt(1 - beta_t) * x + np.random.randn(*x.shape) * np.sqrt(beta_t)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from scipy.stats import multivariate_normal

# Step 1: Define the mean and covariance matrix of the bivariate normal distribution
mean = [0, 0]
cov = [[1, 0.5], [0.5, 1]]

# Step 2: Create a grid of x, y values
x = np.linspace(-3, 3, 100)
y = np.linspace(-3, 3, 100)
X, Y = np.meshgrid(x, y)
pos = np.dstack((X, Y))

# Step 3: Compute the PDF values over the grid
rv = multivariate_normal(mean, cov)
Z = rv.pdf(pos)

# Step 4: Set up the 3D plot
fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')

# Step 5: Create the surface plot
surf = ax.plot_surface(X, Y, Z, cmap='viridis', edgecolor='none', rstride=1, cstride=1, alpha=0.8)
ax.contour(X, Y, Z, zdir='z', offset=0, cmap='viridis')  # Optional: add contour lines at the base

# Step 6: Customize for aesthetics
ax.set_xlabel('X-axis')
ax.set_ylabel('Y-axis')
ax.set_zlabel('Probability Density')
ax.set_title('3D PDF Plot of a Bivariate Normal Distribution')
fig.colorbar(surf, ax=ax, shrink=0.5, aspect=5)  # Add color bar

# Optional: Set view angle
ax.view_init(elev=30, azim=135)

plt.show()


## Perspectives

* Target data distribution as a manifold,

In [None]:
import io
import matplotlib
import functools
import ipywidgets as widgets
from IPython.display import display, Image as IPImage
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from tqdm import tqdm
import torch

# Set up the data
# Parameters
device = 'cuda' if torch.cuda.is_available() else 'cpu'
size = 512

img = Image.open('data/mandrill.png').convert('L').resize((size, size))
img = np.array(img).astype(np.float32) / 255
img = torch.tensor(img, device=device)
img = 2 * img - 1  # Normalize to [-1, 1]

frames = [{'data': img, 't': 0}]
T = 1000
beta = torch.linspace(1e-4, 0.02, T, device=device)
for i in tqdm(range(T)):
    img = posterior_func(img, beta[i])
    frames.append({'data': img, 't': i+1})

images = []
for frame in tqdm(frames):
    img = frame['data']
    img = img.cpu().numpy()
    img = np.clip(img, -1, 1)
    img = (img + 1) / 2  # Normalize to [0, 1]
    img = (img * 255).astype(np.uint8)
    img = Image.fromarray(img)
    images.append(img)
images[0].save("mandrill.gif", save_all=True, append_images=images[1:], duration=100, loop=0)

# Set up an interactive display widget
def show_frame(frame_index):
    display_frame = images[frame_index]
    display(display_frame)

# Slider widget
slider = widgets.IntSlider(value=0, min=0, max=len(frames)-1, step=1, description="Frame")
play_button = widgets.Play(value=0, min=0, max=len(frames)-1, step=1, interval=100)

# Link play button and slider
widgets.jslink((play_button, 'value'), (slider, 'value'))

# Combine play button and slider in the interactive display
interactive_display = widgets.interactive(show_frame, frame_index=slider)
display(widgets.HBox([play_button, interactive_display.children[0]]))  # Attach slider from interactive
display(interactive_display.children[1])  # Display the frame itself

In [None]:
import matplotlib.pyplot as plt
img = np.random.randn(size, size)
plt.imshow(img, cmap='gray', vmin=-1, vmax=1)

In [None]:
import matplotlib.pyplot as plt

t_min = 1.5 * np.pi * (1 + 2 * 0)
t_max = 1.5 * np.pi * (1 + 2 * 1)
print(f't_min={t_min}, t_max={t_max}')
x_min = t_min * np.cos(t_min)
x_max = t_max * np.cos(t_max)
z_min = t_min * np.sin(t_min)
z_max = t_max * np.sin(t_max)
print(f'x_min={x_min}, x_max={x_max}')
print(f'z_min={z_min}, z_max={z_max}')
t = np.linspace(t_min, t_max, 1000)
x = t * np.cos(t)
z = t * np.sin(t)
print(4.5 * np.pi)


In [None]:
x = np.random.randn(1000000)
y = np.random.randn(1000000)
# _range = [[-100, 100], [-100, 100]]
# 
# # Compute the histogram for the data distribution P
# hist, x_edges, y_edges = np.histogram2d(x, y, bins=1000, range=_range)

x_percentile = norm.cdf(x, loc=0, scale=1)
y_percentile = norm.cdf(y, loc=0, scale=1)
_range = [[0, 1], [0, 1]]
hist, x_edges, y_edges = np.histogram2d(x_percentile, y_percentile, bins=100, range=_range)

hist /= hist.sum()  # Normalize to make it a probability distribution

qx = x_edges[1:] - x_edges[:-1]
qy = y_edges[1:] - y_edges[:-1]
q = np.outer(qx, qy)

q /= q.sum()  # Normalize Q for consistency

print(f'hist.min={hist.min()}, hist.max={hist.max()}, hist.mean={hist.mean()}, hist.std={hist.std()}')
print(f'q.min={q.min()}, q.max={q.max()}, q.mean={q.mean()}, q.std={q.std()}')
# plt.imshow(v, extent=[0, 1, 0, 1], origin='lower')

# Only consider non-zero values of P (to avoid log(0))
p = hist[hist > 0]
q = q[hist > 0]



# Compute the KL divergence
np.sum(p * (np.log2(p) - np.log2(q)))

In [None]:
from functools import partial
import matplotlib.pyplot as plt
import numpy as np
import matplotlib
from matplotlib.animation import FuncAnimation
from sklearn.datasets import make_swiss_roll
from IPython.display import HTML
from tqdm import tqdm
from scipy.stats import norm

matplotlib.rcParams['animation.embed_limit'] = 100 * 2**20  # Increase the animation size limit to 100 MB

# Set up the data
# Parameters
num_particles = 10000
num_steps = 50

# Generate a swiss roll dataset
points, _ = make_swiss_roll(n_samples=num_particles, noise=0.0, random_state=0)
x = points[:, 0] / (4.5 * np.pi)
y = points[:, 2] / (4.5 * np.pi)

# Create the figure
fig, ax = plt.subplots()
ax.set_xlim(-1, 1)
ax.set_ylim(-1, 1)
ax.set_aspect('equal')
scat = ax.scatter([], [], alpha=0.1)

def compute_entropy(data):
    # x, y = data
    # _range = [[-100, 100], [-100, 100]]
    # hist, _, _ = np.histogram2d(x, y, bins=1000, range=_range)

    x, y = data
    x_quartile = norm.cdf(x, loc=0, scale=1)
    y_quartile = norm.cdf(y, loc=0, scale=1)
    _range = [[0, 1], [0, 1]]
    hist, _, _ = np.histogram2d(x_quartile, y_quartile, bins=1000, range=_range)
    
    hist /= hist.sum()
    p = hist[hist > 0]
    return -np.sum(p * np.log2(p))

def kl_divergence(data):
    x, y = data
    # _range = [[-100, 100], [-100, 100]]
    # 
    # # Compute the histogram for the data distribution P
    # hist, x_edges, y_edges = np.histogram2d(x, y, bins=1000, range=_range)

    x_percentile = norm.cdf(x, loc=0, scale=1)
    y_percentile = norm.cdf(y, loc=0, scale=1)
    _range = [[0, 1], [0, 1]]
    hist, x_edges, y_edges = np.histogram2d(x_percentile, y_percentile, bins=1000, range=_range)
    # print(f'min(x_edges)={min(x_edges)}, max(x_edges)={max(x_edges)}')
    
    hist /= hist.sum()  # Normalize to make it a probability distribution
    
    # # Get the bin centers
    # x_centers = (x_edges[:-1] + x_edges[1:]) / 2
    # y_centers = (y_edges[:-1] + y_edges[1:]) / 2
    # xv, yv = np.meshgrid(x_centers, y_centers, indexing="ij")
    # 
    # # Compute Q (normal distribution probabilities) over the same grid
    # qx = norm.cdf(x_edges[1:], loc=0, scale=1) - norm.cdf(x_edges[:-1], loc=0, scale=1)    
    # qy = norm.cdf(y_edges[1:], loc=0, scale=1) - norm.cdf(y_edges[:-1], loc=0, scale=1)

    # Adjust for clipping
    # qx[0] += norm.cdf(x_edges[0], loc=0, scale=1)
    # qx[-1] += 1 - norm.cdf(x_edges[-1], loc=0, scale=1)
    # qy[0] += norm.cdf(y_edges[0], loc=0, scale=1)
    # qy[-1] += 1 - norm.cdf(y_edges[-1], loc=0, scale=1)
    # q = np.outer(qx, qy)
    
    qx = x_edges[1:] - x_edges[:-1]
    qy = y_edges[1:] - y_edges[:-1]
    q = np.outer(qx, qy)
    
    # q = norm.pdf(xv, loc=0, scale=1) * norm.pdf(yv, loc=0, scale=1)
    q /= q.sum()  # Normalize Q for consistency
    
    # Only consider non-zero values of P (to avoid log(0))
    p = hist[hist > 0]
    q = q[hist > 0]
    
    # Compute the KL divergence
    return np.sum(p * (np.log2(p) - np.log2(q)))

data_entropy = []
data_kl = []

def func(frame):
    x, y = frame['data']
    t = frame['t']
    scat.set_offsets(np.c_[x, y])
    entropy = compute_entropy((x, y))
    kl = kl_divergence((x, y))
    ax.set_title(f'Frame {t}, Entropy={entropy:.2f}, KL_div={kl:.2f}')
    print(f't={t:03d}, entropy={entropy:.2f}, kl_div={kl:.2f}, mean={np.mean(x):.2f}, std={np.std(x):.2f}')
    data_entropy.append(entropy)
    data_kl.append(kl)
    return scat,

def gen_function(x, y):
    yield {'data': (x, y), 't': 0}

    T = 1000
    beta = np.linspace(1e-4, 0.02, T)

    # for i in tqdm(range(T)):
    for i in range(T):
        x = posterior_func_numpy(x, beta[i])
        y = posterior_func_numpy(y, beta[i])
        yield {'data': (x, y), 't': i+1}

# Run the animation
update = partial(gen_function, x, y)
for frame in update():
    func(frame)
    
# Create 2 subplots and set the title
fig, axs = plt.subplots(2)
fig.suptitle('Entropy and KL Divergence')
axs[0].plot(data_entropy)
axs[0].set_title('Entropy')
axs[0].grid()
# axs[0].set_yscale('log')
axs[1].plot(data_kl)
axs[1].set_title('KL Divergence')
axs[1].grid()
# axs[1].set_yscale('log')
plt.show()

# ani = FuncAnimation(fig, func, frames=update, interval=100, blit=True, cache_frame_data=False)
# 
# # Display the animation in the notebook
# plt.close(fig)  # Close the figure to prevent static display
# HTML(ani.to_jshtml())

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
from sklearn.datasets import make_s_curve
from IPython.display import HTML

# Create the figure
fig, ax = plt.subplots()
ax.set_xlim(-3, 3)
ax.set_ylim(-3, 3)
ax.set_aspect('equal')
scat = ax.scatter([], [], alpha=0.1)


def compute_entropy(data):
    _range = [[-100, 100], [-100, 100]]
    hist, _, _ = np.histogram2d(data[0], data[1], bins=1000, range=_range)
    hist /= hist.sum()
    p = hist[hist > 0]
    return -np.sum(p * np.log2(p))


def func(frame):
    x, y = frame['data']
    t = frame['t']
    scat.set_offsets(np.c_[x, y])
    entropy = compute_entropy((x, y))
    ax.set_title(f'Frame {t}, entropy={entropy:.2f}')

    return scat,


def gen_function():
    # Parameters
    num_particles = 10000
    num_steps = 50

    # Generate a swiss roll dataset
    points, _ = make_s_curve(n_samples=num_particles, noise=0.0, random_state=0)
    x = points[:, 0]
    y = points[:, 2] / 2

    t = 0
    yield {'data': (x, y), 't': t}

    for i in range(1, num_steps):
        x += np.random.randn(num_particles)
        y += np.random.randn(num_particles)
        yield {'data': (x, y), 't': i}


# Run the animation
ani = FuncAnimation(fig, func, frames=gen_function, interval=100, blit=True, cache_frame_data=False)

# Display the animation in the notebook
plt.close(fig)  # Close the figure to prevent static display
HTML(ani.to_jshtml())

In [None]:
from sklearn.datasets import load_digits
digits = load_digits(n_class=6)
X, y = digits.data, digits.target
n_samples, n_features = X.shape
transformer = TSNE(n_components=2,
                   max_iter=500, # default is 1000
                   n_iter_without_progress=150, # default is 300
                   n_jobs=2,
                   random_state=0)

In [None]:
t = np.linspace(-1.5 * np.pi, 1.5 * np.pi, 1000)
x = np.sin(t)
z = np.sign(t) * (np.cos(t) - 1)

plt.plot(t, x)