In [1]:
# Clone the assignment repository into the current working directory.
!git clone https://github.com/RahulV2478/gen-vis-assignment-4.git

Cloning into 'gen-vis-assignment-4'...
remote: Enumerating objects: 18, done.[K
remote: Counting objects: 100% (18/18), done.[K
remote: Compressing objects: 100% (14/14), done.[K
remote: Total 18 (delta 9), reused 13 (delta 4), pack-reused 0 (from 0)[K
Receiving objects: 100% (18/18), 3.43 MiB | 20.53 MiB/s, done.
Resolving deltas: 100% (9/9), done.


In [2]:
# Switch into the cloned repository so later cells read and write files there
# (the %%writefile cells below target diffusion.py / train_diffusion.py by relative path).
%cd gen-vis-assignment-4
# List the repository contents to confirm the checkout.
!ls

/content/gen-vis-assignment-4
Assignment4.ipynb  Assignment4.pdf  DDPM.png  diffusion.py


In [7]:
# Fast-forward the local checkout to the latest commit on the remote.
!git pull

remote: Enumerating objects: 4, done.[K
remote: Counting objects:  25% (1/4)[Kremote: Counting objects:  50% (2/4)[Kremote: Counting objects:  75% (3/4)[Kremote: Counting objects: 100% (4/4)[Kremote: Counting objects: 100% (4/4), done.[K
remote: Compressing objects:  50% (1/2)[Kremote: Compressing objects: 100% (2/2)[Kremote: Compressing objects: 100% (2/2), done.[K
remote: Total 3 (delta 1), reused 3 (delta 1), pack-reused 0 (from 0)[K
Unpacking objects:  33% (1/3)Unpacking objects:  66% (2/3)Unpacking objects: 100% (3/3)Unpacking objects: 100% (3/3), 1.32 KiB | 1.32 MiB/s, done.
From https://github.com/RahulV2478/gen-vis-assignment-4
   ceb8016..8aa1854  main       -> origin/main
Updating ceb8016..8aa1854
Fast-forward
 train_diffusion.py | 96 [32m++++++++++++++++++++++++++++++++++++++++++++++++++++++[m
 1 file changed, 96 insertions(+)
 create mode 100644 train_diffusion.py


In [None]:
%%writefile diffusion.py
import math
import torch
import torch.nn as nn
import torch.nn.functional as F


class DoubleConv(nn.Module):
    """Two back-to-back 3x3 convolution stages that keep the spatial size.

    Each stage is ``Conv2d(kernel_size=3, padding=1) -> BatchNorm2d -> SiLU``.
    The first stage maps ``in_channels`` to ``out_channels``; the second stage
    keeps ``out_channels``.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both stages.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        stage_in = in_channels
        for _ in range(2):
            stages.extend(
                (
                    nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1),
                    nn.BatchNorm2d(out_channels),
                    nn.SiLU(),
                )
            )
            # Only the first stage changes the channel count.
            stage_in = out_channels
        # A single Sequential called ``block`` keeps the parameter names
        # (block.0.weight, block.1.weight, ...) stable for saved checkpoints.
        self.block = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both convolution stages to ``x`` and return the result."""
        features = self.block(x)
        return features


class DiffusionModel(nn.Module):
    """Small U-Net that predicts the noise contained in an image at timestep ``t``.

    Layout: an initial 3x3 convolution, one ``DoubleConv`` per entry of
    ``hidden_dims`` (with 2x max-pooling between them), a bottleneck
    ``DoubleConv`` with twice the last width, and a mirrored decoder made of
    transposed convolutions plus skip connections. The timestep is looked up
    in a learned embedding table, passed through a small MLP, projected to the
    channel count of each decoder stage and added to the upsampled features.

    Note: the timestep is only injected in decoder stages. With a single entry
    in ``hidden_dims`` there are no decoder stages, so the output does not
    depend on ``t``.

    Args:
        image_size: Side length of the (square) input images. Stored for
            reference; the layers themselves are size-agnostic.
        channels: Number of image channels (input and output).
        hidden_dims: Channel widths of the encoder stages. Defaults to
            ``[32, 64]``.
        channles: Misspelled alias of ``channels`` kept for existing callers;
            used only when ``channels`` is ``None``.
        max_time: Number of rows in the timestep embedding table. Timesteps
            greater than or equal to this value are clamped to the last row.

    Raises:
        ValueError: If neither ``channels`` nor ``channles`` is given, if
            ``hidden_dims`` is empty, or if ``max_time`` is smaller than 1.
    """

    def __init__(self, image_size, channels=None, hidden_dims=None, channles=None, max_time=1000):
        super().__init__()

        if channels is None:
            channels = channles
        if channels is None:
            raise ValueError("You must provide `channels` or `channles`.")

        if hidden_dims is None:
            hidden_dims = [32, 64]
        # Copy so later mutation of the caller's list cannot change our record.
        hidden_dims = list(hidden_dims)
        if not hidden_dims:
            raise ValueError("`hidden_dims` must contain at least one channel width.")
        if max_time < 1:
            raise ValueError("`max_time` must be at least 1.")

        self.image_size = image_size
        self.in_channels = channels
        self.hidden_dims = hidden_dims

        self.max_time = max_time
        time_dim = hidden_dims[0]

        # Learned per-timestep embedding followed by a two-layer MLP.
        self.time_embed = nn.Embedding(self.max_time, time_dim)
        self.time_mlp = nn.Sequential(
            nn.Linear(time_dim, time_dim * 4),
            nn.SiLU(),
            nn.Linear(time_dim * 4, time_dim),
        )

        self.init_conv = nn.Conv2d(self.in_channels, hidden_dims[0], kernel_size=3, padding=1)

        # Encoder: one DoubleConv per width, pooling after all but the last.
        self.down_blocks = nn.ModuleList()
        self.downsamples = nn.ModuleList()

        in_ch = hidden_dims[0]
        for i, out_ch in enumerate(hidden_dims):
            self.down_blocks.append(DoubleConv(in_ch, out_ch))
            if i != len(hidden_dims) - 1:
                self.downsamples.append(nn.MaxPool2d(kernel_size=2, stride=2))
            in_ch = out_ch

        bottleneck_channels = hidden_dims[-1] * 2
        self.bottleneck = DoubleConv(hidden_dims[-1], bottleneck_channels)

        # Decoder widths mirror the encoder widths that produced a skip tensor.
        decoder_dims = list(reversed(hidden_dims[:-1]))

        self.up_trans = nn.ModuleList()
        self.up_blocks = nn.ModuleList()
        self.time_projs = nn.ModuleList()

        in_ch = bottleneck_channels
        for out_ch in decoder_dims:
            self.up_trans.append(
                nn.ConvTranspose2d(in_ch, out_ch, kernel_size=2, stride=2)
            )
            # The DoubleConv sees upsampled features concatenated with the skip.
            self.up_blocks.append(DoubleConv(out_ch * 2, out_ch))
            self.time_projs.append(nn.Linear(time_dim, out_ch))
            in_ch = out_ch

        self.final_conv = nn.Conv2d(hidden_dims[0], self.in_channels, kernel_size=1)

    def forward(self, x, t):
        """Predict the noise for a batch of noisy images.

        Args:
            x: Noisy images, ``(batch, channels, height, width)``.
            t: Integer timesteps, either one per batch element or a 0-dim
                tensor that is broadcast over the batch. Values above
                ``max_time - 1`` are clamped.

        Returns:
            Tensor with the same shape as ``x``.
        """
        if t.dim() == 0:
            t = t.unsqueeze(0)
        t = t.long().clamp(max=self.max_time - 1)
        t_embed = self.time_embed(t)
        t_embed = self.time_mlp(t_embed)

        out = self.init_conv(x)

        skips = []
        for i, down_block in enumerate(self.down_blocks):
            out = down_block(out)
            if i != len(self.down_blocks) - 1:
                skips.append(out)
                out = self.downsamples[i](out)

        out = self.bottleneck(out)

        for i, up in enumerate(self.up_trans):
            out = up(out)
            skip = skips[-(i + 1)]

            # Max-pooling floors odd sizes (e.g. 7 -> 3), so the 2x upsample
            # (3 -> 6) can be one pixel short of the skip tensor. Resize to the
            # skip's size so the concatenation below is always valid.
            if out.shape[-2:] != skip.shape[-2:]:
                out = F.interpolate(out, size=skip.shape[-2:], mode="nearest")

            # Per-channel timestep bias, broadcast over the spatial dims.
            time_feat = self.time_projs[i](t_embed)
            time_feat = time_feat.view(time_feat.size(0), -1, 1, 1)
            out = out + time_feat

            out = torch.cat([out, skip], dim=1)
            out = self.up_blocks[i](out)

        out = self.final_conv(out)
        return out


class DiffusionProcess:
    """DDPM-style forward and reverse diffusion around a ``DiffusionModel``.

    Builds the beta schedule and the derived alpha tensors, owns the
    noise-prediction network and its Adam optimizer (lr ``2e-4``), and exposes:

    * ``add_noise``: draw ``x_t`` from ``x_0`` in closed form;
    * ``train_step``: one optimisation step on the noise-prediction MSE;
    * ``sample`` / ``sample_from_noise``: step-by-step reverse sampling.

    Args:
        image_size: Side length of the square images.
        channels: Number of image channels.
        hidden_dims: Encoder widths forwarded to ``DiffusionModel``
            (default ``[32, 64]``).
        beta_start: First beta of the schedule.
        beta_end: Last beta of the schedule.
        noise_steps: Number of diffusion timesteps ``T``.
        beta_schedule: ``"linear"`` for linearly spaced betas, or ``"cosine"``
            for betas that ramp from ``beta_start`` to ``beta_end`` along
            ``0.5 * (1 - cos(pi * s))``, ``s`` in ``[0, 1]``. The latter eases
            the betas themselves; it is not the alpha-bar cosine schedule of
            Nichol & Dhariwal.
        device: Device for the schedule tensors and the model. ``None`` picks
            CUDA when available, otherwise CPU.

    Raises:
        ValueError: If ``noise_steps`` is smaller than 1 or ``beta_schedule``
            is not recognised.
    """

    def __init__(
        self,
        image_size,
        channels,
        hidden_dims=None,
        beta_start=1e-4,
        beta_end=0.02,
        noise_steps=1000,
        beta_schedule="linear",
        device=torch.device("cpu"),
    ):
        if hidden_dims is None:
            hidden_dims = [32, 64]
        if noise_steps < 1:
            # Zero steps would only fail later inside train_step/sampling.
            raise ValueError(f"noise_steps must be >= 1, got {noise_steps}")

        self.image_size = image_size
        self.channels = channels
        self.hidden_dims = hidden_dims
        self.beta_start = beta_start
        self.beta_end = beta_end
        self.noise_steps = noise_steps

        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.device = device

        if beta_schedule == "linear":
            betas = torch.linspace(beta_start, beta_end, noise_steps, device=self.device)
        elif beta_schedule == "cosine":
            steps = torch.linspace(0.0, 1.0, noise_steps, device=self.device)
            cosine = 0.5 * (1.0 - torch.cos(torch.pi * steps))
            betas = beta_start + (beta_end - beta_start) * cosine
        else:
            raise ValueError(f"Unknown beta_schedule: {beta_schedule}")

        self.betas = betas
        self.alphas = 1.0 - self.betas
        self.alpha_cumprod = torch.cumprod(self.alphas, dim=0)

        # Coefficients of the closed-form forward process used by add_noise.
        self.sqrt_alpha_cumprod = torch.sqrt(self.alpha_cumprod)
        self.sqrt_one_minus_alpha_cumprod = torch.sqrt(1.0 - self.alpha_cumprod)

        self.model = DiffusionModel(
            image_size=image_size,
            channels=channels,
            hidden_dims=hidden_dims,
        ).to(self.device)
        # Must run before the optimizer is created so that a replaced
        # embedding table is part of the optimised parameters.
        self._ensure_time_embedding_covers(noise_steps)

        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=2e-4)

    def _ensure_time_embedding_covers(self, noise_steps):
        """Give every timestep its own row in the model's embedding table.

        The model clamps timesteps to ``max_time - 1``. If ``noise_steps``
        exceeds the table size, all later timesteps would share one embedding
        and the network could not tell them apart, so the table is replaced by
        a freshly initialised one with ``noise_steps`` rows.
        """
        table = getattr(self.model, "time_embed", None)
        if not isinstance(table, nn.Embedding) or table.num_embeddings >= noise_steps:
            return
        self.model.time_embed = nn.Embedding(noise_steps, table.embedding_dim).to(self.device)
        self.model.max_time = noise_steps

    def add_noise(self, x, t):
        """Sample ``x_t`` from clean images ``x`` at timesteps ``t``.

        Args:
            x: Clean images, ``(batch, channels, height, width)``.
            t: Timesteps in ``[0, noise_steps)``: a tensor with one entry per
                batch element, or a single int / 0-dim tensor applied to the
                whole batch.

        Returns:
            ``(noisy_x, noise)`` where ``noise`` is the standard-normal noise
            that was mixed in; both have the shape of ``x``.
        """
        x = x.to(self.device)
        # as_tensor also accepts plain Python ints, not only tensors.
        t = torch.as_tensor(t, device=self.device).long()

        sqrt_alpha_hat = self.sqrt_alpha_cumprod[t].view(-1, 1, 1, 1)
        sqrt_one_minus_alpha_hat = self.sqrt_one_minus_alpha_cumprod[t].view(-1, 1, 1, 1)

        noise = torch.randn_like(x)
        noisy_x = sqrt_alpha_hat * x + sqrt_one_minus_alpha_hat * noise
        return noisy_x, noise

    @torch.no_grad()
    def sample_from_noise(self, x_T):
        """Run the full reverse process starting from the given noise ``x_T``.

        Puts the model in eval mode (and leaves it there), then walks the
        timesteps from ``noise_steps - 1`` down to ``0``.

        Args:
            x_T: Starting noise, ``(num_samples, channels, height, width)``.

        Returns:
            The denoised samples, same shape as ``x_T``.
        """
        self.model.eval()
        x = x_T.to(self.device)

        num_samples = x.size(0)
        for t in reversed(range(self.noise_steps)):
            t_batch = torch.full((num_samples,), t, device=self.device, dtype=torch.long)
            eps_theta = self.model(x, t_batch)

            beta_t = self.betas[t]
            alpha_t = self.alphas[t]
            alpha_hat_t = self.alpha_cumprod[t]

            # Posterior mean: (x - (1 - alpha_t) / sqrt(1 - alpha_hat_t) * eps) / sqrt(alpha_t)
            coef1 = 1.0 / torch.sqrt(alpha_t)
            coef2 = (1.0 - alpha_t) / torch.sqrt(1.0 - alpha_hat_t)

            x = coef1 * (x - coef2 * eps_theta)

            # No noise is added on the final step (t == 0).
            if t > 0:
                sigma_t = torch.sqrt(beta_t)
                x = x + sigma_t * torch.randn_like(x)

        return x

    @torch.no_grad()
    def sample(self, num_samples=16):
        """Generate ``num_samples`` images from fresh standard-normal noise."""
        x_T = torch.randn(
            num_samples,
            self.channels,
            self.image_size,
            self.image_size,
            device=self.device,
        )
        return self.sample_from_noise(x_T)

    def train_step(self, x):
        """Run one optimisation step on a batch of clean images.

        Draws a uniform random timestep per image, noises the batch, and
        minimises the MSE between predicted and true noise.

        Args:
            x: Clean images, ``(batch, channels, height, width)``.

        Returns:
            The batch loss as a Python float.
        """
        self.model.train()
        x = x.to(self.device)
        batch_size = x.size(0)

        t = torch.randint(
            low=0,
            high=self.noise_steps,
            size=(batch_size,),
            device=self.device,
        )

        noisy_x, noise = self.add_noise(x, t)
        pred_noise = self.model(noisy_x, t)

        loss = F.mse_loss(pred_noise, noise)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return float(loss.item())


In [None]:
%%writefile train_diffusion.py
import os
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import save_image

from diffusion import DiffusionProcess


def get_mnist_dataloader(batch_size=128, image_size=28):
    """Build a shuffled DataLoader over the MNIST training split.

    Images are resized to ``image_size``, converted to tensors and normalized
    with mean 0.5 / std 0.5. The dataset is downloaded to ``./data`` if it is
    not already there. Incomplete final batches are dropped.

    Args:
        batch_size: Number of images per batch.
        image_size: Target size passed to ``transforms.Resize``.

    Returns:
        A ``DataLoader`` yielding ``(images, labels)`` batches.
    """
    preprocessing_steps = [
        transforms.Resize(image_size),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
    mnist_train = datasets.MNIST(
        root="./data",
        train=True,
        transform=transforms.Compose(preprocessing_steps),
        download=True,
    )
    return DataLoader(
        mnist_train,
        batch_size=batch_size,
        shuffle=True,
        num_workers=2,
        drop_last=True,
    )


def train_diffusion(
    num_epochs=5,
    batch_size=128,
    noise_steps=1000,
    device=None,
    sample_every=1,
    num_sample_images=16,
    out_dir="outputs",
    beta_schedule="linear",
):
    """Train a diffusion model on MNIST and periodically save sample grids.

    Args:
        num_epochs: Number of passes over the training set.
        batch_size: Images per optimisation step.
        noise_steps: Number of diffusion timesteps.
        device: Device string or ``torch.device``; ``None`` selects CUDA when
            available, otherwise CPU.
        sample_every: Save a sample grid every this many epochs. ``0`` or
            ``None`` disables sampling.
        num_sample_images: Number of images per saved grid.
        out_dir: Directory for the sample grids and the final checkpoint
            (created if missing).
        beta_schedule: Schedule name forwarded to ``DiffusionProcess``.

    Returns:
        ``(diffusion, loss_history)``: the trained ``DiffusionProcess`` and the
        list of per-epoch average losses.

    Raises:
        ValueError: If the dataloader yields no batches (for example when
            ``batch_size`` exceeds the dataset size and partial batches are
            dropped).
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device)

    os.makedirs(out_dir, exist_ok=True)

    dataloader = get_mnist_dataloader(batch_size=batch_size, image_size=28)
    batches_per_epoch = len(dataloader)
    if batches_per_epoch == 0:
        # Without this the epoch average below would divide by zero.
        raise ValueError(
            f"The dataloader is empty for batch_size={batch_size}; nothing to train on."
        )

    diffusion = DiffusionProcess(
        image_size=28,
        channels=1,
        noise_steps=noise_steps,
        beta_start=1e-4,
        beta_end=0.02,
        beta_schedule=beta_schedule,
        device=device,
    )

    # A falsy sample_every (0 / None) means "never sample"; previously 0
    # crashed on the modulo below.
    sampling_enabled = bool(sample_every) and num_sample_images > 0
    # Grid width: floor of the square root, but never 0 columns.
    grid_columns = max(1, int(num_sample_images ** 0.5))

    loss_history = []

    for epoch in range(num_epochs):
        running_loss = 0.0

        for x, _ in dataloader:
            loss = diffusion.train_step(x)
            running_loss += loss

        avg_loss = running_loss / batches_per_epoch
        loss_history.append(avg_loss)
        print(f"[Epoch {epoch+1}/{num_epochs}] loss={avg_loss:.4f}")

        if sampling_enabled and (epoch + 1) % sample_every == 0:
            diffusion.model.eval()
            with torch.no_grad():
                samples = diffusion.sample(num_samples=num_sample_images)
                # Map from the training range [-1, 1] back to [0, 1] for saving.
                samples = (samples.clamp(-1, 1) + 1) / 2.0
                save_path = os.path.join(out_dir, f"samples_epoch_{epoch+1}.png")
                save_image(samples, save_path, nrow=grid_columns)
                print(f"Saved samples to {save_path}")

    model_path = os.path.join(out_dir, "diffusion_mnist.pth")
    torch.save(diffusion.model.state_dict(), model_path)
    print(f"Saved model to {model_path}")

    return diffusion, loss_history


# Assignment 4: Diffusion Model

In this assignment, you will implement a diffusion model from scratch and train it on the MNIST dataset. Diffusion models are a class of generative models that learn to gradually denoise random noise to generate realistic images. This assignment will guide you through the core components and training process of diffusion models.

Useful links:
1. [What are Diffusion Models?](https://lilianweng.github.io/posts/2021-07-11-diffusion-models/)
2. [Denoising Diffusion Probabilistic Models](https://arxiv.org/abs/2006.11239)

Please:
* Fill out the code marked with `TODO` or `Your code here`. You are allowed to split functions or visualizations into different files for more flexibility, as long as your output includes what we asked for.
* Reuse or modify visualization code from Assignment 2 to create the necessary visualizations.
* Submit the notebook with all original outputs. If an output is included from another file, please include that file in your folder.
* Answer the questions at the end of the notebook. Write your answers in the notebook.

**Please reserve enough time for this assignment given the potential amount of time for training.**

In [3]:
import torch

## Part 1: Implementing the U-Net (30 pt)

In this part, you will implement a U-Net style model that serves as the backbone for the diffusion process. The model takes noisy images and their corresponding timesteps as input and predicts the noise that was added to the original images.

Please fill out the code in `diffusion.DiffusionModel`, then run the following code to test it. For the time embedding, you may use only one embedding layer and concatenate it with the features. Attention layers are not required, given the limited computational resources.

In [4]:
from diffusion import DiffusionModel

def check_diffusion_model(model_class):
    """Smoke-test a DiffusionModel implementation.

    Instantiates ``model_class`` for 1-channel 28x28 images, runs one forward
    pass on a random batch of 4 with random timesteps below 1000, and checks
    that the output has the same shape as the input.

    Returns:
        ``True`` when the check passes; ``False`` (after printing the reason)
        when construction, the forward pass or the shape assertion fails.
    """
    channels, image_size, noise_steps, batch_size = 1, 28, 1000, 4
    try:
        # The keyword is spelled `channles` on purpose: that is the argument
        # name this check passes to the model.
        model = model_class(image_size=image_size, channles=channels)

        # Random inputs for a single forward pass.
        x = torch.randn(batch_size, channels, image_size, image_size)
        t = torch.randint(0, noise_steps, (batch_size,))
        output = model(x, t)

        # The predicted noise must match the input shape.
        expected_shape = (batch_size, channels, image_size, image_size)
        assert output.shape == expected_shape, f"Expected output shape {expected_shape}, got {output.shape}"
    except Exception as e:
        print(f"DiffusionModel check failed: {str(e)}")
        return False
    else:
        print("DiffusionModel implementation is correct!")
        return True

# Run the shape check on the implemented model; the returned bool is the cell's displayed value.
check_diffusion_model(DiffusionModel)

DiffusionModel implementation is correct!


True

## Part 2: Implementing the Diffusion Process (30 pt)

In this part, you will implement the core diffusion process, including the forward diffusion (adding noise) and the denoising process. This includes setting up the noise schedule and implementing functions for noise addition and sampling.

Please fill out the code in `diffusion.DiffusionProcess`, then run the following code to test it. Note that this test only checks the format of the outputs. You need to be careful about the actual math.

In [6]:
from diffusion import DiffusionProcess

def check_diffusion_process(diffusion_class):
    """Smoke-test a DiffusionProcess implementation.

    Builds ``diffusion_class`` for 1-channel 28x28 images with 1000 noise
    steps, then checks that ``add_noise`` returns two tensors shaped like its
    input and that ``train_step`` returns a Python float. Only output formats
    are checked, not the math.

    Returns:
        ``True`` when every check passes; ``False`` (after printing the
        reason) when anything raises or an assertion fails.
    """
    channels, image_size, noise_steps, batch_size = 1, 28, 1000, 4
    try:
        diffusion = diffusion_class(image_size=image_size, channels=channels, noise_steps=noise_steps)

        # Random batch and timesteps for the forward-noising check.
        x = torch.randn(batch_size, channels, image_size, image_size)
        t = torch.randint(0, noise_steps, (batch_size,))

        noisy_x, noise = diffusion.add_noise(x, t)
        assert noisy_x.shape == x.shape, f"Expected noisy_x shape {x.shape}, got {noisy_x.shape}"
        assert noise.shape == x.shape, f"Expected noise shape {x.shape}, got {noise.shape}"

        # One optimisation step must report its loss as a plain float.
        loss = diffusion.train_step(x)
        assert isinstance(loss, float), f"Expected loss to be a float, got {type(loss)}"
    except Exception as e:
        print(f"DiffusionProcess check failed: {str(e)}")
        return False
    else:
        print("DiffusionProcess implementation is correct!")
        return True

# Run the format check on the implemented process; the returned bool is the cell's displayed value.
check_diffusion_process(DiffusionProcess)

DiffusionProcess implementation is correct!


True

## Part 3: Training and Sampling (20 points)

In this part, you will implement the training loop for the diffusion model and the functions for generating and visualizing samples. Please follow the code you have written and use the `DiffusionModel` and `DiffusionProcess` above to write your training function. You should write your training code in a standalone Python file.

Please include the training curves and the sampled results below. You can reuse the visualization code we provided in the GAN assignment.

You can include an image like:

![image](./DDPM.png)


In [10]:

from train_diffusion import train_diffusion

# Settings shared by both runs; only the epoch count, output directory and
# beta schedule differ between them.
common_train_kwargs = dict(
    batch_size=128,
    noise_steps=1000,
    device=None,
    sample_every=1,
    num_sample_images=16,
)

# NOTE(review): the output recorded under this cell reports paths under
# "outputs/", which does not match out_dir="outputs_linear" - the saved output
# looks like it came from an earlier version of train_diffusion.py; re-run to confirm.
diffusion_linear, loss_linear = train_diffusion(
    num_epochs=20,
    out_dir="outputs_linear",
    beta_schedule="linear",
    **common_train_kwargs,
)

diffusion_cosine, loss_cosine = train_diffusion(
    num_epochs=10,
    out_dir="outputs_cosine",
    beta_schedule="cosine",
    **common_train_kwargs,
)


[Epoch 1/20] loss=0.1005
Saved samples to outputs/samples_epoch_1.png
[Epoch 2/20] loss=0.0528
Saved samples to outputs/samples_epoch_2.png
[Epoch 3/20] loss=0.0472
Saved samples to outputs/samples_epoch_3.png
[Epoch 4/20] loss=0.0442
Saved samples to outputs/samples_epoch_4.png
[Epoch 5/20] loss=0.0417
Saved samples to outputs/samples_epoch_5.png
[Epoch 6/20] loss=0.0399
Saved samples to outputs/samples_epoch_6.png
[Epoch 7/20] loss=0.0388
Saved samples to outputs/samples_epoch_7.png
[Epoch 8/20] loss=0.0379
Saved samples to outputs/samples_epoch_8.png
[Epoch 9/20] loss=0.0373
Saved samples to outputs/samples_epoch_9.png
[Epoch 10/20] loss=0.0363
Saved samples to outputs/samples_epoch_10.png
[Epoch 11/20] loss=0.0356
Saved samples to outputs/samples_epoch_11.png
[Epoch 12/20] loss=0.0350
Saved samples to outputs/samples_epoch_12.png
[Epoch 13/20] loss=0.0339
Saved samples to outputs/samples_epoch_13.png
[Epoch 14/20] loss=0.0341
Saved samples to outputs/samples_epoch_14.png
[Epoch 15/

In [None]:

import matplotlib.pyplot as plt

# Figure 1: per-epoch training loss of the linear-schedule run.
fig_linear, ax_linear = plt.subplots(figsize=(5, 3))
ax_linear.plot(loss_linear, marker="o")
ax_linear.set_xlabel("Epoch")
ax_linear.set_ylabel("Loss")
ax_linear.set_title("Diffusion training loss (linear β)")
ax_linear.grid(True, alpha=0.3)
plt.show()

# Figure 2: both schedules on shared axes (the runs have different lengths).
fig_compare, ax_compare = plt.subplots(figsize=(6, 3))
for curve, curve_label in ((loss_linear, "linear β"), (loss_cosine, "cosine β")):
    ax_compare.plot(range(len(curve)), curve, marker="o", label=curve_label)
ax_compare.set_xlabel("Epoch")
ax_compare.set_ylabel("Loss")
ax_compare.set_title("Diffusion training loss: linear vs cosine β")
ax_compare.legend()
ax_compare.grid(True, alpha=0.3)
plt.show()


In [None]:

from PIL import Image
import os

# Sample grids written at the last epoch of each training run.
final_samples_path = os.path.join("outputs_linear", "samples_epoch_20.png")
cosine_samples_path = os.path.join("outputs_cosine", "samples_epoch_10.png")

grids_to_show = (
    (final_samples_path, "Diffusion samples (epoch 20, linear β)"),
    (cosine_samples_path, "Diffusion samples (epoch 10, cosine β)"),
)

for grid_path, grid_title in grids_to_show:
    # Report a missing grid and move on to the next one.
    if not os.path.exists(grid_path):
        print("Could not find:", grid_path)
        continue
    img = Image.open(grid_path)
    plt.figure(figsize=(4, 4))
    plt.imshow(img, cmap="gray")
    plt.axis("off")
    plt.title(grid_title)
    plt.show()


In [None]:
import torch
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Evaluate on whichever device the trained model's parameters live on.
device = next(diffusion_linear.model.parameters()).device

# Use the same value range as training: the training pipeline normalizes with
# mean 0.5 / std 0.5 (i.e. [0, 1] -> [-1, 1]). Feeding un-normalized [0, 1]
# images here would measure the loss on inputs the model was never trained on.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])
mnist_train = datasets.MNIST(root="./data", train=True, transform=transform, download=True)
loader = DataLoader(mnist_train, batch_size=64, shuffle=True)

def compute_loss_by_timestep(diffusion, data_loader, num_batches=2, num_ts=50):
    """Average the noise-prediction MSE at evenly spaced timesteps.

    For each of ``num_ts`` timesteps spread over ``[0, noise_steps - 1]``, the
    first ``num_batches`` batches of ``data_loader`` are noised to that
    timestep and the model's MSE against the true noise is accumulated.
    The model is put in eval mode and no gradients are tracked.

    Args:
        diffusion: Object exposing ``model``, ``noise_steps`` and
            ``add_noise(x, t)``.
        data_loader: Iterable of ``(images, labels)`` batches.
        num_batches: Maximum number of batches to evaluate.
        num_ts: Number of timesteps to probe.

    Returns:
        ``(ts, losses)``: CPU tensors holding the probed timesteps and the
        mean loss at each of them.

    Raises:
        ValueError: If no batch was evaluated (empty loader or
            ``num_batches <= 0``); the average would otherwise be NaN.
    """
    diffusion.model.eval()
    noise_steps = diffusion.noise_steps

    # Take the device from the model being evaluated instead of relying on a
    # notebook-level `device` variable that may belong to another model.
    first_param = next(diffusion.model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    ts = torch.linspace(0, noise_steps - 1, num_ts).long().to(model_device)
    losses = torch.zeros(num_ts, device=model_device)
    batch_count = 0

    with torch.no_grad():
        for batch_idx, (x, _) in enumerate(data_loader):
            if batch_idx >= num_batches:
                break
            x = x.to(model_device)
            b = x.size(0)

            for i, t in enumerate(ts):
                # Same timestep for every image in the batch.
                t_batch = t.repeat(b)
                noisy_x, true_noise = diffusion.add_noise(x, t_batch)
                pred_noise = diffusion.model(noisy_x, t_batch)
                loss = F.mse_loss(pred_noise, true_noise, reduction="mean")
                losses[i] += loss

            batch_count += 1

    if batch_count == 0:
        raise ValueError("No batches were evaluated; check data_loader and num_batches.")

    losses = (losses / batch_count).cpu()
    return ts.cpu(), losses

# Probe the linear-schedule model and plot its loss as a function of timestep.
ts, losses = compute_loss_by_timestep(diffusion_linear, loader)

fig_ts, ax_ts = plt.subplots(figsize=(6, 3))
ax_ts.plot(ts, losses, marker="o", linewidth=1)
ax_ts.set_xlabel("timestep t")
ax_ts.set_ylabel("avg MSE loss")
ax_ts.set_title("Per-timestep noise-prediction loss (linear β)")
ax_ts.grid(True, alpha=0.3)
plt.show()

## Part 4: Analysis and Visualization (20 points)

Answer the questions with your analysis. Most of the questions are open-ended. We are looking for your own observations from the experiments you did.

1. How does the choice of noise schedule (beta values) affect the training stability and sample quality? Try at least one alternative to the linear schedule (e.g., cosine or quadratic) and compare the results.

[Answer]:

2. Based on your observations, at which timesteps (early, middle, or late in the diffusion process) does the model seem to struggle the most with accurately predicting the noise (looking into loss)? Why do you think this occurs?

[Answer]:

3. Perform interpolation between two noise vectors and analyze the resulting generated images. Is the transition smooth? What does this tell you about the model's learned latent space?

[Answer]:

4. Recall that in Assignment 2 we implemented a GAN. Compare your diffusion model with GANs in terms of:
* Training stability
* Sample quality
* Diversity of samples
* Computational requirements
* Anything else you find interesting

[Answer]: