
# Minimal Diffusion Lab (Beginner-Friendly)

**Goal:** Learn diffusion by **building a tiny model from scratch** using only **NumPy** and **Matplotlib**.

**What you'll do:**
1. Make a simple 2D toy dataset (a spiral).
2. Add Gaussian noise step-by-step (forward diffusion).
3. Train a tiny neural network to predict the noise (denoising).
4. Generate new samples by reversing diffusion (sampling).
5. Visualize everything and compare to the original data.

> Time budget: ~5 hours. Keep the code simple and readable.



## Rules & Notes
- **Libraries:** Only use **NumPy** and **Matplotlib**.
- **Keep it simple:** Small dataset, tiny network, clear visuals.
- **Focus on understanding:** Read comments, run cells one-by-one, and observe the plots.
- **Notation & idea:** We follow the core DDPM idea: train a network to predict noise $\epsilon$ added during forward diffusion.


In [None]:

import numpy as np
import matplotlib.pyplot as plt

# For reproducibility
rng = np.random.default_rng(42)



## 1) Build a tiny 2D toy dataset (a spiral)
We'll generate a simple **2D spiral** because it's easy to see if our samples look right.


In [None]:


def make_spiral(n_samples=1500, noise=0.02):
    # Parameter t controls angle and radius
    t = rng.uniform(0, 4*np.pi, size=(n_samples,))
    # radius grows linearly with angle
    r = t / (4*np.pi) * 1 
    x = r * np.cos(t)
    y = r * np.sin(t)
    # small noise so it's not perfectly clean
    x += rng.normal(0, noise, size=x.shape)
    y += rng.normal(0, noise, size=y.shape)
    data = np.stack([x, y], axis=1).astype(np.float32)
    # Very important!: Normalize to roughly unit variance for stability 
    data = (data - data.mean(axis=0)) / (data.std(axis=0) + 1e-7)
    return data

X = make_spiral(n_samples=1500, noise=0.02)

plt.figure(figsize=(4,4))
plt.scatter(X[:,0], X[:,1], s=5)
plt.title("Spiral dataset (normalized)")
plt.xlabel("x")
plt.ylabel("y")
plt.axis("equal")
plt.show()



## 2) Make a simple **noise schedule** (linear $\beta_t$)
We keep things tiny with just $T=100$ diffusion steps and linearly increase $\beta_t$ from $10^{-4}$ to $5 \times 10^{-3}$. That amount of noise is easy to grasp yet still teaches the reverse process something non-trivial.
From $\beta_t$ we derive:
- $\alpha_t = 1 - \beta_t$
- $\bar{\alpha}_t = \prod_{s=1}^t \alpha_s$

We'll also plot the schedule to see how noise grows over time.


In [None]:

T = 100

def make_linear_beta_schedule(T=50, beta_start=1e-4, beta_end=5e-3):
    betas = np.linspace(beta_start, beta_end, T, dtype=np.float32)
    alphas = 1 - betas
    alpha_bars = np.cumprod(alphas, axis=0)
    return betas, alphas, alpha_bars

betas, alphas, alpha_bars = make_linear_beta_schedule(T=T)

plt.figure(figsize=(5,4))
plt.plot(betas)
plt.title("Linear beta schedule")
plt.xlabel("t")
plt.ylabel("beta_t")
plt.show()

plt.figure(figsize=(5,4))
plt.plot(alpha_bars)
plt.title("Cumulative product: alpha_bar_t")
plt.xlabel("t")
plt.ylabel("alpha_bar_t")
plt.show()



## 3) Forward diffusion: make $x_t$ from $x_0$
The formula:
$$ x_t = \sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1-\bar{\alpha}_t}\, \epsilon, \quad \epsilon \sim \mathcal{N}(0,I) $$

Let's write a helper to sample $x_t$ for any time $t$.


In [None]:


def q_sample(x0, t, alpha_bars):
    # t can be a scalar or array of integers in [1, T]
    t_arr = np.asarray(t, dtype=np.int32)
    if t_arr.ndim == 0:
        ab = alpha_bars[int(t_arr) - 1]
        mean = np.sqrt(ab) * x0
        std = np.sqrt(1 - ab)
    else:
        ab = alpha_bars[t_arr - 1]
        mean = np.sqrt(ab)[:, None] * x0
        std = np.sqrt(1 - ab)[:, None]
    eps = rng.normal(0, 1, size=x0.shape).astype(np.float32)
    x_t = mean + std * eps
    return x_t, eps

# Visualize noising at different t values
ts_to_show = [1, T//4, T//2, T]
fig, axes = plt.subplots(1, len(ts_to_show), figsize=(12,3))
for i, tt in enumerate(ts_to_show):
    xt, _ = q_sample(X, tt, alpha_bars)
    axes[i].scatter(xt[:,0], xt[:,1], s=5)
    axes[i].set_title(f"t={tt}")
    axes[i].set_aspect('equal', 'box')
plt.suptitle("Forward diffusion: adding noise over time")
plt.show()



## 4) A multi layer network to predict the noise $\hat{\epsilon}$
Inputs: $[x_t, y_t, \tilde{t}]$ where $\tilde{t} = t/T$ (scaled time).  
Outputs: $\hat{\epsilon}$ for the two coordinates.

We'll write everything **from scratch**:
- Linear layer: $y = xW + b$
- ReLU: $\max(0, x)$
- MSE loss
- Simple SGD updates


In [None]:


def relu(x):
    return np.maximum(0, x)

def relu_deriv(x):
    return (x > 0).astype(np.float32)

def MultilayerNetwork(input_dim=3, hidden=128, out_dim=2, rng=rng,  momentum=0.999):
    net = {
        "W1": rng.normal(0, 0.02, size=(input_dim, hidden)).astype(np.float32),
        "b1": np.zeros(hidden, dtype=np.float32),
        "W2": rng.normal(0, 0.02, size=(hidden, hidden)).astype(np.float32),
        "b2": np.zeros(hidden, dtype=np.float32),
        "W3": rng.normal(0, 0.02, size=(hidden, out_dim)).astype(np.float32),
        "b3": np.zeros(out_dim, dtype=np.float32),
        "momentum": momentum,
    }
    net["velocities"] = {
        "W1": np.zeros_like(net["W1"]),
        "b1": np.zeros_like(net["b1"]),
        "W2": np.zeros_like(net["W2"]),
        "b2": np.zeros_like(net["b2"]),
        "W3": np.zeros_like(net["W3"]),
        "b3": np.zeros_like(net["b3"]),
    }
    return net

def forward_network(net, x):
    z1 = x @ net["W1"] + net["b1"]
    h1 = relu(z1)
    z2 = h1 @ net["W2"] + net["b2"]
    h2 = relu(z2)
    out = h2 @ net["W3"] + net["b3"]
    cache = (x, z1, h1, z2, h2)
    return out, cache

def backward_network(net, cache, grad_out, lr=1e-3):
    x, z1, h1, z2, h2 = cache

    dW3 = h2.T @ grad_out
    db3 = grad_out.sum(axis=0)

    dh2 = grad_out @ net["W3"].T
    dz2 = dh2 * relu_deriv(z2)

    dW2 = h1.T @ dz2
    db2 = dz2.sum(axis=0)

    dh1 = dz2 @ net["W2"].T
    dz1 = dh1 * relu_deriv(z1)

    dW1 = x.T @ dz1
    db1 = dz1.sum(axis=0)

    grads = {
        "W3": dW3,
        "b3": db3,
        "W2": dW2,
        "b2": db2,
        "W1": dW1,
        "b1": db1,
    }

    for name, grad in grads.items():
        vel = net["velocities"][name]
        vel = net["momentum"] * vel - lr * grad
        net["velocities"][name] = vel
        net[name] += vel

def mse_loss(pred, target):
    diff = pred - target
    mse_value = np.mean(diff**2)
    # pred.shape[0] is the number of samples
    return mse_value, (2* diff / pred.shape[0])  # loss, grad pred



## 5) Train to predict noise
Each update still uses the entire dataset, but now the code avoids classes and is written in a very direct style.


In [None]:


network = MultilayerNetwork(input_dim=3, hidden=128, out_dim=2, rng=rng , momentum=0.999)

def train(network, X, T, betas, alphas, alpha_bars, steps=8000, lr=5e-2):
    losses = []
    running = None
    for step in range(1, steps+1):
        x0 = X
        t = rng.integers(1, T+1, size=(x0.shape[0],))
        xt, eps_true = q_sample(x0, t, alpha_bars)
        t_scaled = (t / T).astype(np.float32).reshape(-1, 1)
        eps_hat, cache_for_backward = forward_network(network, np.concatenate([xt, t_scaled], axis=1))
        loss, grad = mse_loss(eps_hat, eps_true)
        backward_network(network, cache_for_backward, grad, lr=lr)

        if running is None:
            running = float(loss)
        else:
            # Momentum tuning
            # running = 0.999 * running + 0.001 * float(loss)
            running = network["momentum"] * running + (1 - network["momentum"]) * float(loss)

        losses.append(running)
        if step % 1000 == 0:
            print("step" ,step ,"// smoothed loss MSE=" , running)
    return np.array(losses)

losses = train(network, X, T, betas, alphas, alpha_bars, steps=8000, lr=2e-3)

plt.figure(figsize=(5,4))
plt.plot(losses)
plt.title("Training loss (steepest descent)")
plt.xlabel("step")
plt.ylabel("MSE")
plt.show()



## 6) Sampling: generate new data by reversing diffusion
We start from pure noise $x_T \sim \mathcal{N}(0, I)$ and go **backwards** to $x_0$ using our network's noise predictions.
A simple DDPM step:
$$
x_{t-1} = \frac{1}{\sqrt{\alpha_t}}\left(x_t - \frac{1 - \alpha_t}{\sqrt{1 - \bar{\alpha}_t}}\, \hat{\epsilon}\right) + \sigma_t z, \quad \sigma_t = \sqrt{\beta_t},\; z \sim \mathcal{N}(0,I).
$$


In [None]:


def p_sample(network, x_t, t, betas, alphas, alpha_bars):
    t_scaled = np.full((x_t.shape[0],1), t/len(betas), dtype=np.float32)
    net_in = np.concatenate([x_t, t_scaled], axis=1)
    eps_hat, _ = forward_network(network, net_in)

    a_t = alphas[t-1]
    ab_t = alpha_bars[t-1]
    beta_t = betas[t-1]

    mean = (1 / np.sqrt(a_t)) * (x_t - ((1 - a_t)/np.sqrt(1 - ab_t)) * eps_hat)

    if t > 1:
        z = rng.normal(0, 1, size=x_t.shape).astype(np.float32)
        sigma = np.sqrt(beta_t)
        x_prev = mean + sigma * z
    else:
        x_prev = mean
    return x_prev

def p_sample_loop(network, n_samples, T, betas, alphas, alpha_bars,ts_to_show=None):
    x_t = rng.normal(0, 1, size=(n_samples, 2)).astype(np.float32)
    for t in range(T, 0, -1):
        x_t = p_sample(network, x_t, t, betas, alphas, alpha_bars)
        if ts_to_show is not None and t in ts_to_show:
            plt.figure(figsize=(4,4))
            plt.scatter(X[:,0], X[:,1], s=5, label="real")
            plt.scatter(x_t[:,0], x_t[:,1], s=5, label="generated")
            plt.title(f"Generated samples at t={t}")
            plt.xlabel("x")
            plt.ylabel("y")
            plt.axis("equal")
            plt.show()
    return x_t

gen = p_sample_loop(network, n_samples=1000, T=T, betas=betas, alphas=alphas, alpha_bars=alpha_bars,ts_to_show=ts_to_show)

plt.figure(figsize=(4,4))
plt.scatter(X[:,0], X[:,1], s=5, label="real")
plt.scatter(gen[:,0], gen[:,1], s=5, label="generated")
plt.legend()
plt.title("Final Real vs Generated (simple DDPM) at t=0")
plt.xlabel("x")
plt.ylabel("y")
plt.axis("equal")
plt.show()



## 7) Discussion & Experiments
Try:
- Change **T** (e.g., 50 vs 200).
- Try different **beta ranges** (e.g., end=0.01 vs 0.05).
- Change **hidden size** (32 or 128).
- Observe: training loss, intermediate noisy plots, and final samples.

**Questions to reflect on:**
- How does the **noise schedule** affect sample quality?
- Does a slightly larger network help?
- Which plots helped you understand the process the most?
