In [1]:
!pip install -U datasets diffusers einops

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting diffusers
  Downloading diffusers-0.33.1-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.2/491.2 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading diffusers-0.33.1-py3-none-any.whl (3.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.6/3.6 MB[0m 

# Simple Diffusion

This is a notebook to inspect some interesting aspects of Diffusion Model training from a simplified point of view. There are multiple superb blogs and papers discussing the underlying mathematics and derivations of diffusion models. However, sparingly few explain in layman terms how the hyper parameters they chose relate to this theory.

So this notebook will take the opposite approach. Under the assumption that the math is indeed correct and is correctly implemented by an underlying library (which we will attempt NOT to modify unless needed), we will take an empirical approach and try various things to try to make-or-break a diffusion training recipe.

Hopefully, this will help us determine sosme simplified ways to deal with trivial diffusion training tasks on toy data. From there, one can follow the literature and learnings from this process, and scale up to real world datasets.

In this notebook, we will develop a standard model, use a simple pipeline from Hugging Face Diffusers, and train a model on a single multivariate sequence, try to train it to convergence (MSE ~ 1e-3 or lower on CPU training), generate some samples from the trained model, and in due process, hopefully understand all minute details required to enable stable training (at least on a toy dataset).

**Note: This is NOT a notebook to understand the underlying mathematics of Diffusion Models.** As mentioned above, there are plenty of much more appropriate resources in various posts and papers.

Topics covered :

- Importance of model design
- Importance of normalization of data
- Role played by $\beta$ when setting up Diffusion Scheduler
- Creation of custom Sampling Pipeline for Sequence generation task
- How to overfit a single training sample with Diffusion Training
- Improving the efficiency of single sample fitting
- Sampling


In [2]:
import math
%matplotlib inline
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from einops import rearrange, reduce
from einops.layers.torch import Rearrange
from dataclasses import dataclass
from typing import Union, Optional

import torch
from torch import nn, einsum
import torch.nn.functional as F

# Model Architecture - GPT

Commonly, U-Nets are used in literature with various modifications made to either location of attention blocks, number of strides, number of filters in each layer, how to add conditional information etc.

Vision Transformers show that training on image patches and applying standard Transformer layers works as well. [Diffusion Transformer](https://arxiv.org/abs/2212.09748) takes this idea and applies Transformers to Diffusion Generative modelling task. It requires some minor modifications and is applied to latent space (effectively doing latent diffusion to avoid the cost of attention blocks on high resolution images; though the authors suggest it can be modified to apply directly on image diffusion rather than latents).

For a single sample, we do not need such complexity. The single datapoint will be a 12-channel 26-timestep sequence, and the memory cost is trivially small enough to not even require subsampling. So we will directly apply a very small GPT network to the problem. Note: The code for the model is ported from nanoGPT.

This still apparantly posed sufficient problems which I'll discuss below:

- ``SelfAttention`` : Causal padding will ensure model fails to train on even a single sample. Had to remove causal padding entirely.

- ``Block`` : Unlike Conv-based U-Nets where the Diffusion Time features are passed to every block, GPT does not need this. In fact, while it can be added back by uncommenting the line in forward - it will actually cause the model to converge slower.

- ``SinusoidalPosEmb`` : While the general positional embedding can be either Absolute Sinusodial embeddings, or learned embeddings, when discussing the embedding for the Diffusion timestep itself - Sinusodial PE was quite necessary. Learned embedding did far worse (at least for single sample diffusion). Interestingly, swapping sine() with cos() in the final concat step has seemingly no effect on training for a single sample.

- ``PositionalEmbedding`` : Some works apply a small FF network after the Sinusodial Positional Embedding. This was not strictly necessary for a single image, but does help if you try to fit an entire dataset with a tiny number of parameters.

- ``GPT - Diffusion Time Embedding`` : For the GPT code, you must provide some form of diffusion time embedding to denote which timestep the current sample is at. For example, if you follow the line ``time_emb = self.transformer.wpe(time).unsqueeze(1) # time embeddings of shape (time, 1, D); len(time) = B`` with ``time_emb = torch.zeros_like(time_emb)`` - you are guarenteed to be unable to train your model. Nothing I tried could make the model train.

- ``GPT - Positional Embedding (Sequence Embedding)`` : Even for this simple task of overfitting a single sample, positional encodings (learned or sinusodial) are mandatory. Try to zero them out (or simply remove seq_emb from the addition in ``x = self.transformer.drop(inp_emb + seq_emb)`` to completely halt training. Nothing I tried could make the loss reduce below a very high value.

- ``Param Count``: For a single sample, we can get away with very few trainable parameters (discounting the parameters of the large embedding matrices since we do not train on all timesteps).

Reference: https://github.com/karpathy/nanoGPT

In [3]:
def new_gelu(x):
    """
    Implementation of the GELU activation function currently in Google BERT repo (identical to OpenAI GPT).
    Reference: Gaussian Error Linear Units (GELU) paper: https://arxiv.org/abs/1606.08415
    """
    return 0.5 * x * (1.0 + torch.tanh(math.sqrt(2.0 / math.pi) * (x + 0.044715 * torch.pow(x, 3.0))))

class SelfAttention(nn.Module):

    def __init__(self, n_embd: int, n_head: int, dropout: float = 0.0):
        super().__init__()
        assert n_embd % n_head == 0
        # key, query, value projections for all heads, but in a batch
        self.c_attn = nn.Linear(n_embd, 3 * n_embd)
        # output projection
        self.c_proj = nn.Linear(n_embd, n_embd)
        # regularization
        self.attn_dropout = nn.Dropout(dropout)
        self.resid_dropout = nn.Dropout(dropout)

        self.n_head = n_head
        self.n_embd = n_embd

    def forward(self, x):
        B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)

        # calculate query, key, values for all heads in batch and move head forward to be the batch dim
        q, k ,v  = self.c_attn(x).split(self.n_embd, dim=2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)

        # causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        # att = att.masked_fill(self.bias[:,:,:T,:T] == 0, float('-inf'))
        att = F.softmax(att, dim=-1)
        att = self.attn_dropout(att)
        y = att @ v # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
        y = y.transpose(1, 2).contiguous().view(B, T, C) # re-assemble all head outputs side by side

        # output projection
        y = self.resid_dropout(self.c_proj(y))
        return y

class MLP(nn.Module):

    def __init__(self, n_embd: int, dropout: float = 0.0):
        super().__init__()
        self.c_fc    = nn.Linear(n_embd, 4 * n_embd)
        self.c_proj  = nn.Linear(4 * n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.c_fc(x)
        x = new_gelu(x)
        x = self.c_proj(x)
        x = self.dropout(x)
        return x

class Block(nn.Module):

    def __init__(self, n_embd: int, n_head: int, dropout: float = 0.0):
        super().__init__()
        self.ln_1 = nn.LayerNorm(n_embd)
        self.attn = SelfAttention(n_embd, n_head, dropout)
        self.ln_2 = nn.LayerNorm(n_embd)
        self.mlp = MLP(n_embd, dropout)

    def forward(self, x, t):
        # Try uncommenting below block to see if adding the diffusion timestep
        # embedding to every block benefits the quality of samples or not.
        # x = x + t
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x


# Added block
class SinusoidalPosEmb(nn.Module):
    def __init__(self, dim, max_timesteps: int = 10000):
        super().__init__()
        self.dim = dim
        self.max_timesteps = float(max_timesteps)

    def forward(self, x):
        device = x.device
        half_dim = self.dim // 2
        emb = math.log(self.max_timesteps) / (half_dim - 1)
        emb = torch.exp(torch.arange(half_dim, device=device) * -emb)
        emb = x[:, None] * emb[None, :]
        emb = torch.cat((emb.sin(), emb.cos()), dim=-1)
        return emb


class PositionalEmbedding(nn.Module):

    def __init__(self, dim, max_timesteps: int):
        super().__init__()
        self.pe = SinusoidalPosEmb(dim=dim, max_timesteps=max_timesteps)

        # Try uncommenting the follwing and the forward step to see whether learned
        # embeddings lead to a better result.

        # self.ff1 = nn.Linear(dim, dim * 4)
        # self.gelu = nn.GELU()
        # self.ff2 = nn.Linear(dim * 4, dim)

    def forward(self, x):
        out = self.pe(x)
        # out = self.ff2(self.gelu(self.ff1(out)))
        return out


@dataclass
class GPTConfig:
    dim: int
    block_size: int = 10000
    n_layer: int = 12
    n_head: int = 12
    n_embd: int = 128
    dropout: float = 0.1

class GPT(nn.Module):

    def __init__(self, config: GPTConfig):
        super().__init__()
        assert config.block_size is not None
        self.config = config

        self.transformer = nn.ModuleDict(dict(
            input_proj = nn.Linear(config.dim, config.n_embd, bias=False),
            wte = nn.Embedding(config.block_size, config.n_embd),
            wpe = PositionalEmbedding(config.n_embd, max_timesteps=config.block_size),
            drop = nn.Dropout(config.dropout),
            h = nn.ModuleList([Block(n_embd=config.n_embd, n_head=config.n_head, dropout=config.dropout) for _ in range(config.n_layer)]),
            ln_f = nn.LayerNorm(config.n_embd),
            output_proj = nn.Linear(config.n_embd, config.dim, bias=False),
        ))

        # report number of parameters
        n_params = sum(p.numel() for p in self.parameters())
        embd_params = sum(p.numel() for p in self.transformer.wte.parameters())
        print("Total number of parameters: %.2fM" % (n_params/1e6,))
        print("Non-Embedding number of parameters: %.2fM" % ((n_params - embd_params)/1e6,))
        print("Embedding number of parameters: %.2fM" % (embd_params/1e6,))

    def forward(self, x, time):
        device = x.device
        b, d, t = x.size()  # [B, C, T]

        assert d == self.config.dim, f"Expected input dim {self.config.dim} got {d}"

        time = time.to(dtype=torch.long, device=device) # torch.arange(0, t, dtype=torch.long, device=device).unsqueeze(0) # shape (1, t)
        if time.ndim == 0:
          time = time.unsqueeze(0).repeat(b)

        assert x.size(0) == time.size(0), f"Time ({len(time)}) and input batch size ({len(x)}) must match"

        # forward the GPT model itself
        x_input = x

        x = x.transpose(1, 2)  # [B, T, C]
        inp_emb = self.transformer.input_proj(x)  # [B, T, D]
        seq_emb = self.transformer.wte(torch.arange(0, t, dtype=torch.long, device=device).unsqueeze(0)) # [1, t, D]
        time_emb = self.transformer.wpe(time).unsqueeze(1) # time embeddings of shape (time, 1, D); len(time) = B

        # You can try to remove `seq_emb` from the following line to see the model fail to converge and generate any samples at all.
        x = self.transformer.drop(inp_emb + seq_emb)

        for block in self.transformer.h:
            # To see the necessity of the diffusion time emb, you can multiply it
            # with 0 to get a model that has no conditioning on the diffusion timestep.
            x = block(x, time_emb)
        x = self.transformer.ln_f(x)
        x_out = self.transformer.output_proj(x)
        x_out = x_out.transpose(1, 2)

        x = x_input + x_out

        return x

    # Add two properties used by HF Sampling Pipeline
    @property
    def dtype(self):
      return next(self.parameters()).dtype

    @property
    def device(self):
      return next(self.parameters()).device

# Preparing a Dataset

In this example, we will be using a tiny speech recognition dataset for Japanese vowels rather than the usual image diffusion. We chose such a task specifically because image diffusion has been studied extensively, and usually training even a single image diffusion model on CPU would take a non trivial amount of compute or memory.

We will use the [Japanese Vowels dataset](https://archive.ics.uci.edu/ml/datasets/Japanese+Vowels), where each sample is a multivariate time series comprising of 12 LPC cepstrum coefficients over a of 26 frames. The amount of compute required to train on this data is trivial compared to even a single 256x256 image.

Most importantly, though the dataset has 12 coefficients, we can easily visualize a single channel as a basic line plot, and we will later see that this sequence has a distinctive signature which we will task the diffusion model of learning.

## Downlaod processed dataset

This dataset is already processed as part of prior research for multivariate time series classification tasks at my repository, so we will simply download and use the processed numpy arrays.

In [4]:
import os

if not os.path.exists("/content/JapaneseVowels-20180329T000739Z-001.zip"):
  !wget https://github.com/titu1994/MLSTM-FCN/releases/download/v1.0/JapaneseVowels-20180329T000739Z-001.zip

--2025-04-20 20:27:57--  https://github.com/titu1994/MLSTM-FCN/releases/download/v1.0/JapaneseVowels-20180329T000739Z-001.zip
Resolving github.com (github.com)... 20.27.177.113
Connecting to github.com (github.com)|20.27.177.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/117737292/ff55440e-32be-11e8-96f9-99b45c89fb54?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250420%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250420T202758Z&X-Amz-Expires=300&X-Amz-Signature=2a06ba37ff45ef7113b187d4779b18bfa33f742db8b99a54f539c4d57d350b92&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3DJapaneseVowels-20180329T000739Z-001.zip&response-content-type=application%2Foctet-stream [following]
--2025-04-20 20:27:58--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/117737292/ff55440e-32be-11e8-96f9-99b45c8

In [5]:
import zipfile
import os

dataset_root = "dataset/"
os.makedirs(dataset_root, exist_ok=True)

with zipfile.ZipFile('JapaneseVowels-20180329T000739Z-001.zip', 'r') as zip_ref:
  zip_ref.extractall(dataset_root)

## Normalize the dataset

In almost all literature for diffusion models, the standard practice is to normalize the input to be within the range [-1, 1], so we will do the same for this dataset.

A simple study can be to change this to mean-std normalization instead.

In [6]:
import numpy as np

jp_root = os.path.join(dataset_root, "JapaneseVowels")

x_train = np.load(os.path.join(jp_root, "X_train.npy"))  # [B, C, T]
x_test = np.load(os.path.join(jp_root, "X_test.npy"))  # [B, C, T]

# Normalize dataset to [-1, 1] range
x_train = (x_train - x_train.min(-1, keepdims=True)) / (x_train.max(-1, keepdims=True) - x_train.min(-1, keepdims=True))
x_train = 2 * x_train - 1

x_test = (x_test - x_test.min(-1, keepdims=True)) / (x_test.max(-1, keepdims=True) - x_test.min(-1, keepdims=True))
x_test = 2 * x_test - 1

# Convert dataset to tensors
x_train = torch.tensor(x_train, dtype=torch.float32)
x_test = torch.tensor(x_test, dtype=torch.float32)

channels = x_train.shape[1]
timesteps = x_train.shape[2]

print("Loaded X Train :", x_train.shape, "Max", x_train.max(), "Min", x_train.min())
print("Loaded X Test  :", x_test.shape, "Max", x_test.max(), "Min", x_test.min())

Loaded X Train : torch.Size([270, 12, 26]) Max tensor(1.) Min tensor(-1.)
Loaded X Test  : torch.Size([370, 12, 26]) Max tensor(1.) Min tensor(-1.)


# Check model forward pass

Given a dataset and a model, lets make sure that the forward pass works as expected.

Note: For a single sample, we wont be using millions of parameters, a few hundred thousand (or less) will be sufficient.

In [7]:
config = GPTConfig(dim=channels, n_embd=32, n_head=4, n_layer=4, dropout=0.0)
model = GPT(config)

Total number of parameters: 0.37M
Non-Embedding number of parameters: 0.05M
Embedding number of parameters: 0.32M


In [8]:
with torch.no_grad():
  tmp_ip = x_train[0:8]
  out = model(tmp_ip, time=torch.tensor(list(range(8)), dtype=torch.long))
  print("Input  :", tmp_ip.shape)
  print("Output :", out.shape)

Input  : torch.Size([8, 12, 26])
Output : torch.Size([8, 12, 26])


# Diffusion Scheduler

We will use the simple diffusion scheduler from the [Denoising Diffusion Probabilistic Models](https://arxiv.org/abs/2006.11239) that has been implemented in [Hugging Face](https://huggingface.co/docs/diffusers/api/schedulers/ddpm).

We will make a small modification to the paper, which is to change the number of diffusion steps from 1000 to something manageable, such as 50 or 100. This is for two purposes -

1) There's not much examples out there exploring how to make the train schedule itself require smaller number of diffusion steps - its usually in the order of hundreds or sticks to the defaults in the paper (1000 or more steps).

2) Very few works mention exactly how they select the parameters of $\beta_{start}$ and $\beta_{end}$ for the linear scheduler, which is commonly used (although there is a shift towards continous time discretization which resolves the matter).

-----

For this example, since we want to train the model quickly on a single sample only, it would be preferable to limit the amount of compute and optimizer updates required to achieve this. One simple method is to reduce the size of the Markov chain that defines the diffusion process - simply reduce the diffusion timesteps.

It is to be noted that the model will learn a better result with more diffusion steps, usually. But it will also exacerbate the inference cost which will require thousands of passes of the model to generate a single batch of images.


## How to select $\beta$ range?

First we need to note that $\beta$ is not directly used for the forward process (when converting data to gaussian noise). Instead we use $\alpha$ which can be defined as

\begin{align}
  \alpha_t &= 1 - \beta_t \\
  \bar{\alpha}_t &= \prod_{s=1}^t \alpha_s
\end{align}

Another thing to note is that optimally, at step $t=T$ where $T$ is the number of diffusion steps, the input data should be nearly gaussian noise $\mathcal{N}(0, 1)$.

However, when we change $T$, we also substantially affect $\alpha_t$ and $\bar{\alpha}_t$, which can lead to the output of the diffusion process not being similar to gaussian noise, thereby making the reverse diffusion process more difficult (intuitively, if your image is not completely noise, you would need a partially noisy image as your initial guess when sampling).

In [9]:
from diffusers import DDPMScheduler
from diffusers.schedulers.scheduling_ddpm import DDPMSchedulerOutput
from diffusers.utils import randn_tensor

# Setup Constants
DIFFUSION_TIMESTEPS = 50  # Number of Diffusion Steps during training. Note, this is much smaller than ordinary (usually ~1000)
beta_start = 0.0001  # Default from paper
beta_end = 0.2  # NOTE: This is different to accomodate the much shorter DIFFUSION_TIMESTEPS (Usually ~1000). For 1000 diffusion timesteps, use 0.02.
clip_sample = True  # Used for better sample generation

noise_scheduler = DDPMScheduler(num_train_timesteps=DIFFUSION_TIMESTEPS, beta_end=beta_end, clip_sample=clip_sample)
noise_scheduler

ImportError: cannot import name 'randn_tensor' from 'diffusers.utils' (/usr/local/lib/python3.11/dist-packages/diffusers/utils/__init__.py)

## Checking the validity of the noise schedule

Since we change ``DIFFUSION_TIMESTEPS``, we must in turn modify $\beta_{end}$ to ensure the diffusion process results in gaussian noise as the output. Remember that from Equation 14 in the DDPM paper, we effectively optimize the following -

\begin{align}
  \| \left(  ϵ - ϵ_θ (\sqrt{\bar{\alpha_t}} x_0 + \sqrt{1 - \bar{\alpha_t}} ϵ, t ) \right) \|^2
\end{align}

A simple way to accomplish this is to first modify $T$ to a required value, then to modify $\beta_{end}$ in order to observe the final values of $\sqrt{\bar{\alpha_t}}$ and $\sqrt{1 - \bar{\alpha_t}}$ to ensure they are close to 0 and 1 respectively. In doing so, the above $ϵ_Θ(.)$ represents gaussian noise alone.

You may modify the ``DIFFUSION_TIMESTEPS`` ($T$) and $\beta_{end}$ parameters above to observe the printed values. If they are not close to 0 and 1 respectively, the model may not converge properly.

### Alpha Cumulative Product Ranges

In [None]:
print(torch.sqrt(noise_scheduler.alphas_cumprod))
print(torch.sqrt(1. - noise_scheduler.alphas_cumprod))

### Sampled Variance Range

An important side-effect of increasing $\beta_{end}$ is the variance added to generated samples during the reverse process.

Recall that for the reverse process, we need to compute the predicted variance $\beta_t$ (See Equation 6 and 7 in the DDPM paper) and sample from it to get the previous sample in the reverse process. We then scale this sample by  $\sqrt{\tilde{\beta}_t}$, such that $\tilde{\beta}_t$ is defined as follows -

\begin{equation}
   \tilde{\beta}_t = \frac{1 - \bar{α_{t-1}}}{1 - \bar{α_t}} \beta_t
\end{equation}

The primary issue is that by scaling up the value of $\beta_{end}$, we have also scaled up the value of $\tilde{β}_t$, which affects the reverse diffusion process.

For a trivial task as learning a single sample, it may be fine to use a large enough $\beta_t$, but for any real training of diffusion models, care must be taken to have an appropriate number of diffusion steps to manage the scale of $\tilde{\beta}_t$ during the backward process.

Note that the final value of $\tilde{\beta}_T$ will be equivalent to $\beta_{end}$, and so we can calculate whether the scaling of gaussian noise will be excessive or not.

-----

**Note**: Under the DDPM paper's guidance, $T=1000$, and so $β_{end} = 0.02$ so $\tilde{\beta}_T = \sqrt{0.02} = 0.1414 ⋯$.

In [None]:
variance_ = []
for t in range(noise_scheduler.num_train_timesteps):
    alpha_prod_t = noise_scheduler.alphas_cumprod[t]
    alpha_prod_t_prev = noise_scheduler.alphas_cumprod[t - 1] if t > 0 else noise_scheduler.one

    # For t > 0, compute predicted variance βt (see formula (6) and (7) from https://arxiv.org/pdf/2006.11239.pdf)
    # and sample from it to get previous sample
    # x_{t-1} ~ N(pred_prev_sample, variance) == add variance to pred_sample
    variance = (1 - alpha_prod_t_prev) / (1 - alpha_prod_t) * noise_scheduler.betas[t]
    variance_.append(variance)

variance_ = torch.tensor(variance_)
print("Pred sample Variance:", variance_.min(), " -> ", variance_.max())
print("Pred sample stdev   :", variance_.sqrt())

### Visual check of Forward Process of Diffusion

Next, lets make sure visually that the forward diffusion step is correct. Below, you can modify $t$ to any integer value between $0$ to $T-1$ and see the progressive addition of gaussian noise to the original sequence (here we select just one channel of the model to make visualization simpler).

In [None]:
# take time step as any value 0 < t < DIFFUSION_TIMESTEPS
# you can change this value to see the change of original sample to noise
t = torch.tensor([DIFFUSION_TIMESTEPS - 1])
sequence = x_train[0:1].clone()  # We will use just a single sample throughout this tutorial

g = torch.Generator(device='cpu').manual_seed(0)
noise = torch.randn(sequence.shape, generator=g)
x_noisy = noise_scheduler.add_noise(sequence, noise, t)

plt.plot(sequence[0, 0], label='Sequence Channel 0')
plt.plot(x_noisy[0, 0], label='Noisy Channel 0')
plt.legend()

# If at final step of forward process
# X_noisy should have approximately mean=0.0 and std=1.0
if t[0] == DIFFUSION_TIMESTEPS - 1:
  print("X noisy statistics :", "mean=", x_noisy.mean(), "std=", x_noisy.std())

-----

We can create an animation to see the transformation of the input to gaussian noise progressing.

Note that with large number of diffusion timesteps, this might take some time to generate.

In [None]:
# NOTE: The following cell can take a long time for large DIFFUSION_STEPS

from matplotlib.animation import FuncAnimation
from matplotlib import rc
from IPython.display import HTML

# create the figure and axes objects
fig, ax = plt.subplots()

def animate_forward_process(i):
  t = torch.tensor([i])

  g = torch.Generator(device='cpu').manual_seed(0)
  noise = torch.randn(sequence.shape, generator=g)
  x_noisy = noise_scheduler.add_noise(sequence, noise, t)

  ax.clear()
  ax.set_title(f"T={i}")
  ax.plot(sequence[0, 0], label='Sequence Channel 0')
  ax.plot(x_noisy[0, 0], label='Noisy Channel 0')
  ax.legend(loc="upper right")

# run the animation
anim = FuncAnimation(fig, animate_forward_process, frames=list(range(DIFFUSION_TIMESTEPS)), interval=200, repeat=False)

rc('animation', html='jshtml')
HTML(anim.to_jshtml())

# Custom Diffusion Pipeline for Time Series Generation

We will be using the [diffusers](https://huggingface.co/docs/diffusers/index) throughout the notebook in order to abstract away the diffusion process logic. The code for ``DDPM`` can be read in order to understand the paper better.

Here, we will write a custom ``DiffusionPipelin`` for time series data, which is mostly a replication of the ``DDPMPipeline`` with some additional arguments.

In [None]:
from diffusers import DiffusionPipeline
from typing import List, Optional, Tuple, Union

@dataclass
class SequenceDataOutput:
  sequences: List[torch.Tensor]
  history: List[torch.Tensor]


class SequenceDDPMPipeline(DiffusionPipeline):

  def __init__(self, model: GPT, scheduler: DDPMScheduler):
    super().__init__()
    self.register_modules(model=model, scheduler=scheduler)

  @torch.no_grad()
  def __call__(self,
        batch_size: int = 1,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        num_inference_steps: int = DIFFUSION_TIMESTEPS,
        return_dict: bool = True,
        return_all: bool = False,  # new flag to return all the diffusion reverse process steps as a list
        explicit_noise: torch.Tensor = None,  # We can pass in an explicit loss value to ensure diffusion backward process is working as expected
        **kwargs,
  ) -> SequenceDataOutput:
    self.scheduler: DDPMScheduler
    self.model: GPT

    # Get the sequence length from the notebook's globals as they were defined above when loading the dataset
    # Or use the user defined value if passed.
    # NOTE: Since we train on a single sample with exact seq_len and seq_dim, changing this will NOT work during inference.
    seq_len = kwargs.get('seq_len', timesteps)
    seq_dim = kwargs.get('channels', channels)

    # Sample noise as an init to the reverse process, if no noise was explicitly provided
    if explicit_noise is None:
      sequence_data = torch.randn(batch_size, seq_dim, seq_len, device=self.device, generator=generator)
    else:
      sequence_data = explicit_noise.to(device=device)

    # Set inference timesteps
    self.scheduler.set_timesteps(num_inference_steps=num_inference_steps)

    # Check whether to mantain the history of all reverse diffusion process steps
    if return_all:
      history = [sequence_data.cpu()]
    else:
      history = None

    for t in self.progress_bar(self.scheduler.timesteps):
        # 1. predict noise model_output
        model_output = self.model(sequence_data, t)

        # 2. compute previous image: x_t -> x_t-1  # prev_sample
        sequence_data = self.scheduler.step(model_output, t, sequence_data, generator=generator).prev_sample

        # Optional: Preserve history of intermediate step generation
        if history is not None:
          history.append(sequence_data.cpu().clone())

    # Clamp to allowed prediction range
    sequence_data = sequence_data.clamp(min=-1.0, max=1.0)

    if not return_dict:
        return (sequence_data,)

    return SequenceDataOutput(sequences=sequence_data, history=history)

# Dataloader with a Single Sample

To diffuse a single sample, we dont really need to create a PyTorch Dataset or DataLoader, but for ease of use where we may want to experiment with more than one sample (or even the entire dataset), we will write a simple data loader.

In [None]:
BATCHSIZE = 128  # used only if you select more than one sample below

In [None]:
from torch.utils.data import Dataset, DataLoader

class TimeSeriesDataset(Dataset):

  def __init__(self, X_train):
    super().__init__()

    self.data = X_train.clone()

  def __getitem__(self, idx):
    sample = self.data[idx]  # [C, T]
    return sample

  def __len__(self):
    return len(self.data)

In [None]:
sample_idx = 0

assert 0 <= sample_idx < len(x_train), "Sample index cannot be > number of samples in train set or < 0!"
dataset = TimeSeriesDataset(x_train[sample_idx: sample_idx + 1])
dataloader = DataLoader(dataset, batch_size=BATCHSIZE, shuffle=True, num_workers=0, pin_memory=True)
print("Dataset size :", len(dataset), "\nSingle Sample shape: ", dataset[0].shape)

# Prepare Model for training

The model will learn on a single sample, so we do not need to have too many parameters or too much hyper parameter optimization. Standard Adam optimizer with a cosine schedule to lower the LR for 1000 steps should be enough.

In [None]:
from torch.optim import Adam
from diffusers.optimization import get_cosine_schedule_with_warmup

epochs = int(1e3)  # 1000 'epochs' seems a lot, but with single sample in the dataset, 1 epoch = 1 update step
num_steps = len(dataloader) * epochs

device = "cuda" if torch.cuda.is_available() else "cpu"

# Create small model with ~ 400,000 params (non embedding)
# NOTE: Such a small model is OK since we are training on a single sample
#       We would need to increase model size to train on the entire dataset !
config = GPTConfig(dim=channels, n_embd=128, n_head=4, n_layer=2, dropout=0.0)
model = GPT(config)
model.to(device)

# Since we will train on a single sample, we can use very high LR to converge faster with certain tricks
optimizer = Adam(model.parameters(), lr=1e-2, betas=[0.9, 0.999])

lr_scheduler = get_cosine_schedule_with_warmup(
    optimizer=optimizer,
    num_warmup_steps=100,
    num_training_steps=num_steps,
)

-------

You can uncomment the following line and perhaps change the path to load a pretrained checkpoint (after training at least one model).

In [None]:
# Load previous checkpoint if necessary
# model.load_state_dict(torch.load("/content/checkpoints/jp_vowels.pt"))

# Training

Training of a DDPM model is relatively simple and can be broken down into 3 steps -

1) Sample a batch of input tensors $x$, a corresponding batch of integers corresponding to diffusion timesteps $t$, and gaussian noise $\mathcal{N}(0, 1)$ of the same shape as $x$ (lets call it $ϵ$).

------

2) Compute $x_t$ from $x_0$, $t$ and $ϵ$ as follows -

\begin{align}
  res &= ϵ_θ (\sqrt{\bar{\alpha_t}} x_0 + \sqrt{1 - \bar{\alpha_t}} ϵ, t)
\end{align}

where $θ$ is the parameters of the model that predicts previous noise $ϵ_θ$ given the conditioning input of diffusion timestep $t$ along with the forward diffusion process at current timestep.

------

3) Compute the loss as follows and backprop -

\begin{align}
  loss &= \| \left( ϵ - res  \right) \|
\end{align}

------




For a single sample, the diffusion model must learn the following (vastly simplified) -

* Map a single timestep $t \: ; 0 \le t < T$
* And a single input data point of shape $\mathbb{R}^{C \times T}$, to which noise $ϵ$ is added according to the diffusion schedule $β$
* To $ϵ$ itself

In this scenario, we have three sources of randomness -
* The sampled input datapoint
* The timestep $t$ that defines the diffusion timestep of this step
* The gaussian noise that must be predicted $ϵ$

During diffusion model training over real datasets, training is memory bound due to the size of the tensors for $x$ and $ϵ$, along with the model's forward-backward pass + gradients. Therefore there is no choice but to sample $t$.

Intuitively, it can be considered that for a single given $x_{sample}$ - you can sample *any* $t$, and independently sample *any* $\epsilon$ to perform a single gradient update.

While $t$ is constrained to a finite set of values, $T$ can be very large (usually $1000$) and so you would need a large number of update steps to sample all possible $t$.

Furthermore, for any given $t$, you can sample nearly infinite variations of gaussian noise $\epsilon$ - so the model would need a large number of update steps if we trained it on a single $t$ and a single $ϵ$.

Fortunately, in our case $x$ is a tiny tensor of just $C=12 \times L=26$ floating point values. Furthermore, we want to train on just one single sample, so both $x_{sample}$ and $ϵ$ are both quite small.

So we can dramatically improve the efficiency of training by simply computing gaussian noise value $ϵ_t$ for all given $t \in [1,2,⋯,T]$. This way, the batch of samples now includes **all** $t$, and $t$ number of gaussian noise samples $ϵ_t \in \mathbb{R}^{T \times C \times L }$, such that the batch size is now equal to the diffusion timesteps $T$.

In [None]:
# How many `t` to sample in a single step. Note that it can be used only when training a single sample at a time.
batch_repeat = noise_scheduler.num_train_timesteps

checkpoint_name = "jp_vowels.pt"

############################################################################
from tqdm import tqdm

model.train()

# Checkpoint saving
checkpoint_dir = '/content/checkpoints/'
if not os.path.exists(checkpoint_dir):
  os.makedirs(checkpoint_dir, exist_ok=True)


global_step = 0
for epoch in tqdm(range(epochs), total=num_steps, desc='Epoch'):
    for step, batch in enumerate(dataloader):
      optimizer.zero_grad()

      # Single sample optimization : Repeat the sample such that [1, C, T] -> [arange(0, DIFFUSION_TIMESTEPS), C, T]
      # In doing so, we can parallely apply 1 random Diffusion `t` and `noise` to the same sample in a single step
      # Significantly improves convergence speed by providing multiple `t` and `noise` to calculate the loss
      if batch_repeat > 0 and batch.size(0) == 1:
        batch = batch.expand(batch_repeat, -1, -1)

      batch_size = batch.size(0)
      batch = batch.to(device)

      # Normal algorithm - sample t from discrete uniform distribution
      # t = torch.randint(0, noise_scheduler.num_train_timesteps, (batch_size,), dtype=torch.long, device=device)

      # Single sample optimization : Compute all values of T in a single update step
      # Explicitly compute all possible T in one shot
      t = torch.arange(0, noise_scheduler.num_train_timesteps, dtype=torch.long, device=device)

      # Noise sampling per `t` - note that each `t_i` corresponds to a different noise value of shape `[1, C, T]` !
      noise = torch.randn(batch.shape, device=device)
      noisy_sequence = noise_scheduler.add_noise(batch, noise, t)

      noise_pred = model(noisy_sequence, t)
      loss = F.mse_loss(noise_pred, noise)

      if (global_step + 1) % 100 == 0:
        print(f"Step : {global_step + 1} - LR: {optimizer.param_groups[0]['lr']:0.6f} - Loss: {loss.item()}")
        torch.save(model.state_dict(), os.path.join(checkpoint_dir, checkpoint_name))

      loss_val = loss.item()
      if loss_val > 0.1:
        print("Step :", global_step + 1, "Loss:", loss_val)

      loss.backward()

      optimizer.step()
      lr_scheduler.step()

      global_step += 1

# Sampling to generate a single sequence

Now that the model has hopefully converged, we can sample from it using the custom pipeline.

Something to note : We trained the model for an exceedingly small amount of steps, and so loss is non-zero. We do expect to potentially see samples generated that do not exactly match the input sequence.

In [None]:
sample_batchsize = 4

# Uncomment below to have determinstic sampling
g = None
# g = torch.Generator(device=device)
# g.manual_seed(0)

# Can try changing `num_inference_steps` here to check if we can diffuse a sample with fewer than DIFFUSION_TIMESTEPS steps
pipeline = SequenceDDPMPipeline(model, noise_scheduler)
pipeline = pipeline.to(device)

pipe_output = pipeline(sample_batchsize, num_inference_steps=DIFFUSION_TIMESTEPS, generator=g, return_all=True)
samples = pipe_output.sequences

for idx, smp in enumerate(samples):
  plt.plot(smp.reshape(channels, timesteps)[0].cpu(), label=f'Sample {idx + 1}')
  plt.plot(sequence[0].reshape(channels, timesteps)[0].cpu(), label=f'OG Sequence {idx + 1}')
  plt.legend()
  plt.show()

## Plotting the generation history

We can also observe the entire reverse diffusion process by plotting the history of a single channel over all the diffusion timesteps.

**Note**: For large number of diffusion timesteps, this step could take a long time.

In [None]:
hist_sample_id = 1
channel_id = 0

################################################################################
history = pipe_output.history

assert hist_sample_id < len(history), "hist_sample_id cannot be greater than number of generated samples !"
assert channel_id < channels, f"channel_id {channel_id} cannot be greater than number of channels {channels}"

# create the figure and axes objects
fig, ax = plt.subplots()

def animate_sampling_history(i):
  h = history[i]

  ax.clear()
  ax.set_title(f"Sample Generated at Timestep {DIFFUSION_TIMESTEPS - i - 1}")
  ax.plot(h[hist_sample_id, channel_id].cpu(), label=f'Generated Sample')
  ax.plot(sequence[0, channel_id].cpu(), label='Original Sequence')
  ax.legend(loc="upper right")

# run the animation
anim = FuncAnimation(fig, animate_sampling_history, frames=list(range(DIFFUSION_TIMESTEPS)), interval=200, repeat=False)

rc('animation', html='jshtml')
HTML(anim.to_jshtml())

# Conclusion

With this, we can now begin more advanced applications of diffusion models, such as conditional diffusion models, classifier free guidance training, continious time schedules, stochastic sampling and so on.

Studying diffusion on a single sample allows us to understand the challenges of each technique, and how to successfully apply them.