**Follow these steps in order to get the required data and checkpoints**:

1. Click [this link](https://drive.google.com/drive/folders/1kogf2w-wmwVz4vK6sh0fel6KLAPvNCC1?usp=sharing).  
2. In the top-left, click the folder name **`SAMSUNG_AIExpert`** → **`Organize`**(**정리**) → **`Add shortcut to Drive`**(**바로가기에 추가**) → choose your desired ❗location❗ to add the folder.

3. In the cell right below, run `drive.mount('/content/drive')` and set `FOLDER_PATH` to the ❗location❗ where you added the folder in step 2.

In [1]:
!nvidia-smi

Tue Aug 12 05:12:42 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 565.57.01              Driver Version: 565.57.01      CUDA Version: 12.7     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA GeForce RTX 3090        On  |   00000000:3D:00.0 Off |                  N/A |
| 30%   27C    P8             24W /  350W |       2MiB /  24576MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [1]:
# from google.colab import drive
from pathlib import Path
import os, json, re, shutil, zipfile

# drive.mount('/content/drive')

In [2]:
FOLDER_PATH = f"./" ## 👈 Edit here and set FOLDER_PATH to the ❗location❗
FOLDER_PATH = Path(FOLDER_PATH)

In [3]:
FOLDER_PATH

PosixPath('.')
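Before moving on, it can help to confirm that `FOLDER_PATH` really points at the shared folder. The check below is a minimal sketch: `missing_inputs` is a hypothetical helper (it is not part of the course repository), and the two file names are taken from the dataset cell later in this notebook, which reads `data/images.zip` and `data/captions.json` under `FOLDER_PATH`.

```python
from pathlib import Path

# Files that the dataset cell later reads from FOLDER_PATH / "data".
EXPECTED_FILES = ("data/images.zip", "data/captions.json")


def missing_inputs(folder, expected=EXPECTED_FILES):
    """Return the expected files that are absent under `folder` (hypothetical helper)."""
    folder = Path(folder)
    return [rel for rel in expected if not (folder / rel).is_file()]


# Check the current directory; replace "." with your own FOLDER_PATH.
missing = missing_inputs(".")
if missing:
    print("Missing files, check FOLDER_PATH:", missing)
else:
    print("All expected files found.")
```

If the list is not empty, the download cell will most likely stop with a `FileNotFoundError`, so fix `FOLDER_PATH` first.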

We will implement and train a DDPM model to generate small 128 x 128 images conditioned on text prompts. First, we will implement the forward noising process based on Eq. (4) of the paper [1]. Then we will build a UNet model that takes $x_t$ and $t$ as inputs (optionally with other conditioning, such as a text prompt) and outputs a tensor of the same shape as $x_t$. Finally, we will implement the denoising objective and train our DDPM model.

We use the text encoder from a pretrained CLIP[2] model to encode input text into a 512-dimensional vector. To speed up training, we've already pre-encoded the text data from the training set.

[1] Denoising Diffusion Probabilistic Models. Jonathan Ho, Ajay Jain, Pieter Abbeel. [Link](https://arxiv.org/pdf/2006.11239)  
[2] Learning Transferable Visual Models From Natural Language Supervision. Radford et al. [Link](https://github.com/openai/CLIP)

# Set up & Install Packages
Run the cells below in order.

In [5]:
!git clone https://github.com/rlawldud53/AIExpert_Samsung.git

Cloning into 'AIExpert_Samsung'...
remote: Enumerating objects: 27, done.
remote: Counting objects: 100% (27/27), done.
remote: Compressing objects: 100% (21/21), done.
remote: Total 27 (delta 7), reused 0 (delta 0), pack-reused 0 (from 0)
Receiving objects: 100% (27/27), 13.46 KiB | 4.49 MiB/s, done.
Resolving deltas: 100% (7/7), done.


In [4]:
!pip install git+https://github.com/openai/CLIP.git
!pip install gdown

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-_jjcz_ed
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-_jjcz_ed
  Resolved https://github.com/openai/CLIP.git to commit dcba3cb2e2827b402d2701e7e1c7d9fed8a20ef1
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Collecting ftfy (from clip==1.0)
  Downloading ftfy-6.3.1-py3-none-any.whl.metadata (7.3 kB)
Collecting regex (from clip==1.0)
  Downloading regex-2025.7.34-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (40 kB)
Collecting tqdm (from clip==1.0)
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Downloading ftfy-6.3.1-py3-none-any.whl (44 kB)
Downloading regex-2

In [6]:
%load_ext autoreload
%autoreload 2

import numpy as np
import torch
import random
import matplotlib.pyplot as plt
import torchvision.utils as tv_utils
from tqdm.auto import tqdm
import os

def rel_error(x, y):
    """Returns relative error."""
    return np.max(np.abs(x - y) / (np.maximum(1e-10, np.abs(x) + np.abs(y))))

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


  from .autonotebook import tqdm as notebook_tqdm


# Download Dataset

We will be using the **CelebA-Dialog** dataset.  
CelebA-Dialog is an extension of the CelebA dataset, where each facial image is paired with a **textual description** (caption) that provides information about the person's facial attributes, appearance, or other visual characteristics.  

- **Images**: High-quality celebrity face images.  
- **Captions**: Human-written descriptions that include various attributes such as hair color, hairstyle, facial expressions, accessories, and more.  
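
The download cell below flattens the captions into a JSONL file with one `{"image": ..., "caption": ...}` record per line. As a minimal sketch of that layout, the snippet below parses such text with the standard library only. `read_jsonl` is a hypothetical helper and the two records are made up for illustration; they are not real dataset entries.

```python
import json


def read_jsonl(text):
    """Parse JSONL text: one JSON object per non-empty line (hypothetical helper)."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Made-up records in the {"image", "caption"} layout used by the download cell.
sample = "\n".join([
    json.dumps({"image": "000001.jpg", "caption": "a portrait photo"}),
    json.dumps({"image": "000002.jpg", "caption": "a portrait photo"}),
])

records = read_jsonl(sample)
print(len(records), records[0]["image"], "->", records[0]["caption"])
```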

In [12]:
####### Dataset Download Path #######
ROOT = Path("./data/celeba_dialog")
RAW        = FOLDER_PATH / "data"

tmp_extract =  ROOT / "_unzipped"
IMAGES_DIR = ROOT / "_unzipped" / "image"
TEXT_DIR   = ROOT / "text"
cap_json_path = RAW / "captions.json"
img_zip_path  = RAW / "images.zip"

if tmp_extract.exists():
    shutil.rmtree(tmp_extract)
with zipfile.ZipFile(img_zip_path) as z:
    members = [m for m in z.infolist() if not m.is_dir()]
    for m in tqdm(members, desc="Unzipping", unit="file"):
        z.extract(m, tmp_extract)
files = [p for ext in ("*.jpg","*.jpeg","*.png") for p in tmp_extract.rglob(ext)]

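# Make sure the target folder exists before copying (the zip may not contain an "image" folder)
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
moved = 0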
for src in tqdm(files, desc="Organizing images", unit="img"):
    dst = IMAGES_DIR / src.name
    if not dst.exists():
        shutil.copy2(src, dst)
        moved += 1

print(f"[OK] images organized: {moved} files")

jsonl_path = ROOT / "celeba_dialog.jsonl"

if not jsonl_path.exists():
    with open(cap_json_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    imgset = {p.name for ext in ("*.jpg","*.jpeg","*.png") for p in IMAGES_DIR.glob(ext)}
    final = []
    for img, value_dict in data.items():
        if not img or img not in imgset: continue

        if isinstance(value_dict.get("overall_caption"), str):
            caps = [value_dict["overall_caption"].strip()]
        else:
            caps = ["a portrait photo"]
        for c in caps:
            final.append({"image": img, "caption": c})

    if not final:
        final = [{"image": img, "caption": "a portrait photo"} for img in sorted(imgset)]

    with open(jsonl_path, "w", encoding="utf-8") as f:
        f.write("\n".join(json.dumps(r, ensure_ascii=False) for r in final))
    print(f"[OK] JSONL written: {len(final)} → {jsonl_path}")

else:
    print(f"[SKIP] JSONL exists: {jsonl_path}")

FileNotFoundError: [Errno 2] No such file or directory: 'data/images.zip'

# Look into Dataset

- Load and inspect a few samples from the CelebA-Dialog dataset.
- Verify that image–caption pairs are correctly aligned.

In [11]:
import json, re
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms as T
from pathlib import Path
from ai_expert.celebadialog_dataset import ClipEmbed, CelebADialogDataset

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
clip_embedder = ClipEmbed(device)

image_size = 128
JSONL_PATH = f"./data/celeba_dialog/celeba_dialog.jsonl"
dataset = CelebADialogDataset(IMAGES_DIR, JSONL_PATH, image_size=image_size,
                                  normalize=False, clip_embedder=clip_embedder)

100%|███████████████████████████████████████| 338M/338M [00:12<00:00, 28.2MiB/s]


FileNotFoundError: [Errno 2] No such file or directory: './data/celeba_dialog/celeba_dialog.jsonl'

In [None]:
def visualize_samples(dataset, num_samples=16, grid_size=(4, 4), max_text_len=50):
    # Randomly sample indices
    indices = random.sample(range(len(dataset)), num_samples)
    samples = [dataset[i] for i in indices]

    # Inspect one sample
    img_shape = list(samples[0][0].shape)
    emb_shape = list(samples[0][1]["text_emb"].shape)
    print(f"One sample: (image: {img_shape}, {{ \"text_emb\": {emb_shape}, \"text\": string }})")

    # Extract images and texts
    images = torch.stack([sample[0] for sample in samples])  # Stack images
    texts = [sample[1]["text"] for sample in samples]  # Extract text descriptions

    # Create a grid of images
    grid_img = tv_utils.make_grid(images, nrow=grid_size[1], padding=2)

    # Convert to numpy for plotting
    grid_img = grid_img.permute(1, 2, 0).numpy()

    # Plot the images
    fig, ax = plt.subplots(figsize=(15, 15))
    ax.imshow(grid_img)
    ax.axis("off")

    # Add text annotations
    grid_w, grid_h = grid_size
    img_w, img_h = grid_img.shape[1] // grid_w, grid_img.shape[0] // grid_h

    for i, text in enumerate(texts):
        row, col = divmod(i, grid_w)
        x, y = col * img_w, row * img_h

        # Wrap text
        wrapped_text = ""
        words = text.split()
        current_line = ""
        for word in words:
            if len(current_line) + len(word) + 1 <= max_text_len:
                if current_line:
                    current_line += " "
                current_line += word
            else:
                wrapped_text += current_line + "\n"
                current_line = word
        wrapped_text += current_line

        ax.text(x+5, y+5, wrapped_text, fontsize=6, color='white', bbox=dict(facecolor='black', alpha=0.5))

    plt.show()

visualize_samples(dataset)

# Implement Gaussian Diffusion (Q1, Q2, Q4, Q5, Q6)

From this point on, you will implement several methods in the `GaussianDiffusion` class.  
Fill in the sections marked with `# TODO` comments and run the cells.  
If an implementation is incorrect, the checks in the following cells will fail.


In [None]:
import torch
import torch.nn as nn
from tqdm.auto import tqdm
import math


class GaussianDiffusion(nn.Module):
    def __init__(
        self,
        model,
        *,
        image_size,
        timesteps=1000,
        objective="pred_noise",
        beta_schedule="sigmoid",
    ):
        super().__init__()

        self.model = model
        self.channels = 3
        self.image_size = image_size
        self.objective = objective
        assert objective in {
            "pred_noise",
            "pred_x_start",
        }, "objective must be either pred_noise (predict noise) or pred_x_start (predict image start)"

        # A helper function to register some constants as buffers to ensure that
        # they are on the same device as model parameters.
        # See https://pytorch.org/docs/stable/generated/torch.nn.Module.html
        # Each buffer can be accessed as `self.name`
        register_buffer = lambda name, val: self.register_buffer(name, val.float())

        #############################################################################
        # Noise schedule beta and alpha values
        #############################################################################
        betas = get_beta_schedule(beta_schedule, timesteps)
        self.num_timesteps = int(betas.shape[0])
        alphas = 1.0 - betas
        alphas_cumprod = torch.cumprod(alphas, dim=0)  # alpha_bar_t
        register_buffer("betas", betas)  # can be accessed as self.betas
        register_buffer("alphas", alphas)  # can be accessed as self.alphas
        register_buffer("alphas_cumprod", alphas_cumprod)  # self.alphas_cumprod

        #############################################################################
        # Other coefficients needed to transform between x_t, x_0, and noise
        # Note that according to Eq. (4) and its reparameterization in Eq. (14),
        # x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * noise
        # where noise is sampled from N(0, 1)
        #############################################################################
        register_buffer("sqrt_alphas_cumprod", torch.sqrt(alphas_cumprod))
        register_buffer(
            "sqrt_one_minus_alphas_cumprod", torch.sqrt(1.0 - alphas_cumprod)
        )
        # register_buffer("sqrt_recip_alphas_cumprod", torch.sqrt(1.0 / alphas_cumprod))
        # register_buffer(
        #     "sqrt_recipm1_alphas_cumprod", torch.sqrt(1.0 / alphas_cumprod - 1)
        # )

        #############################################################################
        # For posterior q(x_{t-1} | x_t, x_0) according to Eq. (6) and (7) of the paper.
        #############################################################################
        # alpha_bar_{t-1}
        alphas_cumprod_prev = nn.functional.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
        register_buffer(
            "posterior_mean_coef1",
            betas * torch.sqrt(alphas_cumprod_prev) / (1.0 - alphas_cumprod),
        )
        register_buffer(
            "posterior_mean_coef2",
            (1.0 - alphas_cumprod_prev) * torch.sqrt(alphas) / (1.0 - alphas_cumprod),
        )
        posterior_var = betas * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)
        posterior_std = torch.sqrt(posterior_var.clamp(min=1e-20))
        register_buffer("posterior_std", posterior_std)

        #################################################################
        # loss weight
        #################################################################
        snr = alphas_cumprod / (1 - alphas_cumprod)
        loss_weight = torch.ones_like(snr) if objective == "pred_noise" else snr
        register_buffer("loss_weight", loss_weight)

    def normalize(self, img):
        return img * 2 - 1

    def unnormalize(self, img):
        return (img + 1) * 0.5

    def predict_start_from_noise(self, x_t, t, noise):
        """Get x_start from x_t and noise according to Eq. (14) of the paper.
        Args:
            x_t: (b, *) tensor. Noisy image.
            t: (b,) tensor. Time step.
            noise: (b, *) tensor. Noise from N(0, 1).
        Returns:
            x_start: (b, *) tensor. Starting image.
        """
        x_start = None
        ####################################################################
        # Q2.TODO:
        # Transform x_t and noise to get x_start according to Eq.(4) and Eq.(14).
        # Look at the coeffs in `__init__` method and use the `extract` function.
        ####################################################################

        # YOUR CODE FROM HERE


        ####################################################################
        return x_start

    def predict_prev_from_noise_ddim(self, x_t, t, t_prev, noise, eta: float = 0.0):
        a_t = extract(self.alphas_cumprod, t, x_t.shape)
        a_prev = extract(self.alphas_cumprod, t_prev, x_t.shape)
        z = torch.randn_like(x_t) if (eta > 0) else torch.zeros_like(x_t) # random noise
        x_prev = None
        ####################################################################
        # Q6. TODO:
        # Compute x_{t-1} from (x_t, predicted noise) via DDIM update (recover x0, then apply DDIM formula).
        ####################################################################
        # YOUR CODE FROM HERE
        ####################################################################
        return x_prev

    def predict_noise_from_start(self, x_t, t, x_start):
        """Get noise from x_t and x_start according to Eq. (14) of the paper.
        Args:
            x_t: (b, *) tensor. Noisy image.
            t: (b,) tensor. Time step.
            x_start: (b, *) tensor. Starting image.
        Returns:
            pred_noise: (b, *) tensor. Predicted noise.
        """
        pred_noise = None
        ####################################################################
        # Q2.TODO:
        # Transform x_t and x_start to get noise according to Eq.(4) and Eq.(14).
        # Look at the coeffs in `__init__` method and use the `extract` function.
        ####################################################################

        # YOUR CODE FROM HERE
        

        ####################################################################
        return pred_noise

    def predict_prev_from_start_ddim(self, x_t, t, t_prev, x_start, eta: float = 0.0):
        a_t = extract(self.alphas_cumprod, t, x_t.shape)
        a_prev = extract(self.alphas_cumprod, t_prev, x_t.shape)
        z = torch.randn_like(x_t) if (eta > 0) else torch.zeros_like(x_t)
        x_prev = None
        ####################################################################
        # Q6.TODO: Compute x_{t-1} from (x_t, predicted x0) via DDIM update.
        ####################################################################
        # YOUR CODE FROM HERE
        ####################################################################

        return x_prev


    def q_posterior(self, x_start, x_t, t):
        """Get the posterior q(x_{t-1} | x_t, x_0) according to Eq. (6) and (7) of the paper.
        Args:
            x_start: (b, *) tensor. Predicted start image.
            x_t: (b, *) tensor. Noisy image.
            t: (b,) tensor. Time step.
        Returns:
            posterior_mean: (b, *) tensor. Mean of the posterior.
            posterior_std: (b, *) tensor. Std of the posterior.
        """
        posterior_mean = None
        posterior_std = None
        ####################################################################
        # We have already implemented this method for you.
        c1 = extract(self.posterior_mean_coef1, t, x_t.shape)
        c2 = extract(self.posterior_mean_coef2, t, x_t.shape)
        posterior_mean = c1 * x_start + c2 * x_t
        posterior_std = extract(self.posterior_std, t, x_t.shape)
        ####################################################################
        return posterior_mean, posterior_std


    @torch.no_grad()
    def p_sample(self, x_t, t: int, model_kwargs={}):
        """Sample from p(x_{t-1} | x_t) according to Eq. (6) of the paper. Used only during inference.
        Args:
            x_t: (b, *) tensor. Noisy image.
            t: int. Sampling time step.
            model_kwargs: additional arguments for the model.
        Returns:
            x_tm1: (b, *) tensor. Sampled image.
        """
        t = torch.full((x_t.shape[0],), t, device=x_t.device, dtype=torch.long)  # (b,)
        x_tm1 = None  # sample x_{t-1} from p(x_{t-1} | x_t)

        ##################################################################
        # Q5. TODO: Implement the sampling step p(x_{t-1} | x_t) according to Eq. (6):
        #
        # - Steps:
        #   1. Get the model prediction by calling self.model with appropriate args.
        #   2. The model output can be either noise or x_start depending on self.objective.
        #      You can recover the other by calling self.predict_start_from_noise or
        #      self.predict_noise_from_start as needed.
        #   3. Clamp predicted x_start to the valid range [-1, 1]. This ensures the
        #      generation remains stable during denoising iterations.
        #   4. Get the mean and std for q(x_{t-1} | x_t, x_0) using self.q_posterior,
        #      and sample x_{t-1}.
        ##################################################################

        # YOUR CODE FROM HERE

        # Call model to predict x_start (or noise)

        if self.objective == "pred_noise":
            pass
            # Model predicted noise, not x_start; we need to convert

        # Get the posterior mean and standard deviation, sample backwards

        ##################################################################
        return x_tm1

    @torch.no_grad()
    def p_sample_ddim(self, x_t, t:int, t_prev:int, eta:float=0.0, model_kwargs={}):
        t = torch.full((x_t.shape[0],), t, device=x_t.device, dtype=torch.long)
        t_prev = torch.full((x_t.shape[0],), t_prev, device=x_t.device, dtype=torch.long)
        x_tm1 = None # sample x_{t-1} from p(x_{t-1} | x_t)
        ####################################################################
        # Q6. TODO:
        # Predict noise or x0 from x_t, then compute x_{t-1} using DDIM.
        ####################################################################

        # YOUR CODE FROM HERE

        ####################################################################

        return x_tm1


    @torch.no_grad()
    def sample(self, batch_size=16, return_all_timesteps=False, model_kwargs={}):

        shape = (batch_size, self.channels, self.image_size, self.image_size)
        img = torch.randn(shape, device=self.betas.device)
        imgs = [img]

        for t in tqdm(
            reversed(range(0, self.num_timesteps)),
            desc="sampling ddpm loop time step",
            total=self.num_timesteps,
        ):
            img = self.p_sample(img, t, model_kwargs=model_kwargs)
            imgs.append(img)

        res = img if not return_all_timesteps else torch.stack(imgs, dim=1)
        res = self.unnormalize(res)
        return res

    @torch.no_grad()
    def sample_ddim(self, batch_size=16, eta:float=0.0, return_all_timesteps=False, steps=None, model_kwargs={}):

        if steps is None:
            steps = self.num_timesteps

        assert 1 <= steps <= self.num_timesteps, "steps must be in [1, num_timesteps]"

        shape = (batch_size, self.channels, self.image_size, self.image_size)
        img = torch.randn(shape, device=self.betas.device)

        t_seq = torch.linspace(self.num_timesteps - 1, 0, steps+1, device=img.device)
        t_seq = torch.round(t_seq).long().tolist()

        imgs = [img]
        for i in tqdm(
            range(len(t_seq) - 1),
            desc="sampling ddim loop time step",
            total=steps
        ):
            t = int(t_seq[i])
            t_prev = int(t_seq[i+1])
            img = self.p_sample_ddim(img, t, t_prev, eta=eta, model_kwargs=model_kwargs)
            imgs.append(img)

        res = img if not return_all_timesteps else torch.stack(imgs,dim=1)
        res = self.unnormalize(res)

        return res

    def q_sample(self, x_start, t, noise):
        """Sample from q(x_t | x_0) according to Eq. (4) of the paper.

        Args:
            x_start: (b, *) tensor. Starting image.
            t: (b,) tensor. Time step.
            noise: (b, *) tensor. Noise from N(0, 1).
        Returns:
            x_t: (b, *) tensor. Noisy image.
        """

        x_t = None
        ####################################################################
        # Q1. TODO:
        # Implement sampling from q(x_t | x_0) according to Eq. (4) of the paper.
        # Hints: (1) Look at the `__init__` method to see precomputed coefficients.
        # (2) Use the `extract` function (defined in the next cell) to extract the coefficients
        # for the given time step `t`. (3) Recall that sampling from N(mu, sigma^2)
        # can be done as: x_t = mu + sigma * noise where noise is sampled from N(0, 1).
        # Approximately 3 lines of code.
        ###################################################################
        alpha_sqrt = extract(self.sqrt_alphas_cumprod, t, x_start.shape)
        sigma = extract(self.sqrt_one_minus_alphas_cumprod, t, x_start.shape)
        x_t = alpha_sqrt * x_start + sigma * noise  # mu + sigma * noise, with mu = sqrt(alpha_bar_t) * x_0

        

        ####################################################################
        return x_t

    def p_losses(self, x_start, model_kwargs={}):
        b, nts = x_start.shape[0], self.num_timesteps
        t = torch.randint(0, nts, (b,), device=x_start.device).long()  # (b,)
        x_start = self.normalize(x_start)  # (b, *)
        noise = torch.randn_like(x_start)  # (b, *)
        target = noise if self.objective == "pred_noise" else x_start  # (b, *)
        loss_weight = extract(self.loss_weight, t, target.shape)  # (b, *)
        loss = None

        ####################################################################
        # Q4. TODO:
        # Implement the loss function according to Eq. (14) of the paper.
        # First, sample x_t from q(x_t | x_0) using the `q_sample` function.
        # Then, get model predictions by calling self.model with appropriate args.
        # Finally, compute the weighted MSE loss.
        # Approximately 3-4 lines of code.
        ####################################################################

        # YOUR CODE FROM HERE

        ####################################################################

        return loss

In [None]:
def extract(a, t, x_shape):
    """
    Extracts the appropriate coefficient values based on the given timesteps.

    This function gathers the values from the coefficient tensor `a` according to
    the given timesteps `t` and reshapes them to match the required shape such that
    it supports broadcasting with the tensor of given shape `x_shape`.

    Args:
        a (torch.Tensor): A tensor of shape (T,), containing coefficient values for all timesteps.
        t (torch.Tensor): A tensor of shape (b,), representing the timesteps for each sample in the batch.
        x_shape (tuple): The shape of the input image tensor, usually (b, c, h, w).

    Returns:
        torch.Tensor: A tensor of shape (b, 1, 1, 1), containing the extracted coefficient values
                      from a for corresponding timestep of each batch element, reshaped accordingly.
    """
    b, *_ = t.shape  # Extract batch size from the timestep tensor
    out = a.gather(-1, t)  # Gather the coefficient values from `a` based on `t`
    out = out.reshape(
        b, *((1,) * (len(x_shape) - 1))
    )  # Reshape to (b, 1, 1, 1) for broadcasting
    return out


def linear_beta_schedule(timesteps):
    """
    linear schedule, proposed in original ddpm paper
    """
    scale = 1000 / timesteps
    beta_start = scale * 0.0001
    beta_end = scale * 0.02
    return torch.linspace(beta_start, beta_end, timesteps, dtype=torch.float64)


def cosine_beta_schedule(timesteps, s=0.008):
    """
    cosine schedule
    as proposed in https://openreview.net/forum?id=-NEXDKk8gZ
    """
    steps = timesteps + 1
    t = torch.linspace(0, timesteps, steps, dtype=torch.float64) / timesteps
    alphas_cumprod = torch.cos((t + s) / (1 + s) * math.pi * 0.5) ** 2
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    return torch.clip(betas, 0, 0.999)


def sigmoid_beta_schedule(timesteps, start=-3, end=3, tau=1, clamp_min=1e-5):
    """
    sigmoid schedule
    proposed in https://arxiv.org/abs/2212.11972 - Figure 8
    better for images > 64x64, when used during training
    """
    steps = timesteps + 1
    t = torch.linspace(0, timesteps, steps, dtype=torch.float64) / timesteps
    v_start = torch.tensor(start / tau).sigmoid()
    v_end = torch.tensor(end / tau).sigmoid()
    alphas_cumprod = (-((t * (end - start) + start) / tau).sigmoid() + v_end) / (
        v_end - v_start
    )
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    return torch.clip(betas, 0, 0.999)


def get_beta_schedule(beta_schedule, timesteps):
    if beta_schedule == "linear":
        beta_schedule_fn = linear_beta_schedule
    elif beta_schedule == "cosine":
        beta_schedule_fn = cosine_beta_schedule
    elif beta_schedule == "sigmoid":
        beta_schedule_fn = sigmoid_beta_schedule
    else:
        raise ValueError(f"unknown beta schedule {beta_schedule}")

    betas = beta_schedule_fn(timesteps)
    return betas

## **Q1. q_sample**

Now we will define the forward noising process. Consult the original DDPM paper [1] for the equations. Implement the `q_sample` method and test it below. You should see **zero** relative error.

$$
q(x_t \mid x_0) = \mathcal{N} \left( x_t \; ; \; \sqrt{\bar{\alpha}_t} \, x_0, \; (1 - \bar{\alpha}_t) \, \mathbf{I} \right)
$$
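
To build intuition for this distribution, here is a minimal scalar sketch in plain Python. It assumes the linear beta schedule from the DDPM paper (the notebook's default is `"sigmoid"`), and `linear_alpha_bars` and `q_sample_scalar` are illustrative names, not part of the `GaussianDiffusion` class. It shows how $\bar{\alpha}_t$ shrinks with $t$, so that $x_t$ moves from almost pure signal to almost pure noise.

```python
import math


def linear_alpha_bars(timesteps=1000, beta_start=1e-4, beta_end=0.02):
    """Cumulative products alpha_bar_t for a linear beta schedule (illustrative)."""
    scale = 1000 / timesteps
    lo, hi = scale * beta_start, scale * beta_end
    alpha_bars, running = [], 1.0
    for i in range(timesteps):
        beta = lo + (hi - lo) * i / max(timesteps - 1, 1)
        running *= 1.0 - beta  # alpha_bar_t = prod_{s <= t} (1 - beta_s)
        alpha_bars.append(running)
    return alpha_bars


def q_sample_scalar(x0, eps, alpha_bar):
    """x_t = sqrt(alpha_bar) * x0 + sqrt(1 - alpha_bar) * eps for one scalar value."""
    return math.sqrt(alpha_bar) * x0 + math.sqrt(1.0 - alpha_bar) * eps


alpha_bars = linear_alpha_bars()
for t in (0, 500, 999):
    x_t = q_sample_scalar(x0=0.5, eps=1.0, alpha_bar=alpha_bars[t])
    print(f"t={t:4d}  alpha_bar={alpha_bars[t]:.5f}  x_t={x_t:.5f}")
```

The squared coefficients always sum to one, $\bar{\alpha}_t + (1 - \bar{\alpha}_t) = 1$, so unit-variance data keeps unit variance at every timestep.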



In [None]:
# Test GaussianDiffusion.q_sample method
sz = 2
b = 3 # batch size

diffusion = GaussianDiffusion(
      model=None,
      image_size=sz,
      timesteps=1000,
      beta_schedule="sigmoid",
)

t = torch.tensor([0, 300, 999]).long()
x_start = torch.linspace(-0.9, 0.6, b*3*sz*sz).view(b, 3, sz, sz)
noise = torch.linspace(-0.7, 0.8, b*3*sz*sz).view(b, 3, sz, sz)
x_t = diffusion.q_sample(x_start, t, noise)

expected_x_t = np.array([
    [
        [[-0.9119949, -0.86840147], [-0.8248081, -0.7812148]],
        [[-0.7376214, -0.694028], [-0.65043473, -0.6068413]],
        [[-0.563248, -0.51965463], [-0.47606122, -0.43246788]],
    ],
    [
        [[-0.42800453, -0.37039882], [-0.31279305, -0.2551873]],
        [[-0.19758154, -0.1399758], [-0.08237009, -0.024764337]],
        [[0.032841414, 0.090447165], [0.14805292, 0.20565866]],
    ],
    [
        [[0.32864183, 0.37152246], [0.41440308, 0.45728368]],
        [[0.50016433, 0.5430449], [0.5859255, 0.6288062]],
        [[0.67168677, 0.7145674], [0.757448, 0.8003287]],
    ],
]).astype(np.float32)

# Should see zero relative error
error = rel_error(x_t.numpy(), expected_x_t)
print("x_t error: ", rel_error(x_t.numpy(), expected_x_t))

if error == 0.0 : print("Passed! You did it")
else : print("Failed! Try again")

In [None]:
# Let's visualize the noisy images at various timesteps.
diffusion = GaussianDiffusion(
      model=None,
      image_size=image_size,
      timesteps=1000,
)

B = 10
img = dataset[770][0]  # 3 x H x W
x_start = img[None].repeat(B, 1, 1, 1)  # B x 3 x H x W
noise = torch.randn_like(x_start)  # B x 3 x H x W
t = torch.linspace(0, 1000-1, B).long()

x_start = diffusion.normalize(x_start)
x_t = diffusion.q_sample(x_start, t, noise)
x_t = diffusion.unnormalize(x_t).clamp(0, 1)
grid_img = tv_utils.make_grid(x_t, nrow=5, padding=2)
grid_img = grid_img.permute(1, 2, 0).cpu().numpy()
fig, ax = plt.subplots(figsize=(10, 10))
ax.imshow(grid_img)
ax.axis("off")
plt.show()

# **Q2. Predict Noise/Clean Image**
A diffusion model can be trained to predict either the clean image or the noise, as one can be derived from the other.

The forward noising process can be written as:

$$
x_t = \sqrt{\bar{\alpha}_t} \; x_0 + \sigma_t \; \epsilon, \quad \epsilon \sim \mathcal{N}(0, \mathbf{I})
$$
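
Because this relation is linear, any two of $x_t$, $x_0$, and $\epsilon$ determine the third. Assuming $\sigma_t = \sqrt{1 - \bar{\alpha}_t}$ (the standard deviation of $q(x_t \mid x_0)$ in Q1), rearranging the equation gives:

$$
x_0 = \frac{x_t - \sigma_t \, \epsilon}{\sqrt{\bar{\alpha}_t}}, \qquad
\epsilon = \frac{x_t - \sqrt{\bar{\alpha}_t} \; x_0}{\sigma_t}
$$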

Implement the `predict_start_from_noise` and `predict_noise_from_start` methods and test them below. You should see a relative error less than **1e-5**.

In [None]:
# Test `predict_noise_from_start` and `predict_start_from_noise`
sz = 2
b = 3

diffusion = GaussianDiffusion(
      model=None,
      image_size=sz,
      timesteps=1000,
      beta_schedule="sigmoid",
)

t = torch.tensor([1, 300, 998]).long()
x_start = torch.linspace(-0.91, 0.67, b*3*sz*sz).view(b, 3, sz, sz)
noise = torch.linspace(-0.73, 0.81, b*3*sz*sz).view(b, 3, sz, sz)
x_t = diffusion.q_sample(x_start, t, noise)

pred_noise = diffusion.predict_noise_from_start(x_t, t, x_start)
pred_x_start = diffusion.predict_start_from_noise(x_t, t, noise)

# Should see relative errors around 1e-5 or less
noise_error = rel_error(pred_noise.numpy(), noise.numpy())
print("noise error: ", noise_error)

if noise_error < 1e-5 : print("Passed! You did it")
else : print("Failed! Try again")

x_start_error = rel_error(pred_x_start.numpy(), x_start.numpy())
print("x_start error: ", x_start_error )

if x_start_error < 1e-5 : print("Passed! You did it")
else : print("Failed! Try again")

# **Q3. UNet for Denoising**

So far, we have focused on the **`forward process`**, which gradually adds noise to the clean image.
Now, we turn to the **`reverse process`**—progressively removing noise to recover the original image.
We will use a **UNet** model for denoising the input image during this reverse diffusion process.

**What is UNet?**  
UNet is a neural network architecture originally designed for image-to-image tasks such as segmentation, style transfer, and inpainting.  
It consists of:  
- **Encoder (Downsampling path):** progressively reduces the spatial resolution while increasing the number of feature channels to extract high-level representations.  
- **Decoder (Upsampling path):** progressively restores the spatial resolution, mirroring the encoder’s structure.  
- **Skip connections:** directly connect encoder and decoder layers at the same scale, allowing the decoder to recover fine-grained details without relying solely on the bottleneck features.
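
The three components above can be traced with a toy shape walk-through. This is only a sketch under strong simplifications: it uses NumPy, replaces the learned convolutional stages with average pooling and nearest-neighbour upsampling, and the helper names `downsample` and `upsample` are hypothetical (they are not the `Downsample` and `Upsample` modules from `ai_expert.unet`). Because nothing is learned here, channels grow only through concatenation.

```python
import numpy as np

def downsample(x):
    """Halve H and W with 2x2 average pooling (stand-in for a learned encoder stage)."""
    b, c, h, w = x.shape
    return x.reshape(b, c, h // 2, 2, w // 2, 2).mean(axis=(3, 5))

def upsample(x):
    """Double H and W by nearest-neighbour repetition (stand-in for a learned decoder stage)."""
    return x.repeat(2, axis=2).repeat(2, axis=3)

x = np.random.default_rng(0).standard_normal((1, 3, 8, 8))

# Encoder: keep the features at each resolution for the skip connections.
e1 = x                        # (1, 3, 8, 8)
e2 = downsample(e1)           # (1, 3, 4, 4)
bottleneck = downsample(e2)   # (1, 3, 2, 2)

# Decoder: upsample, then concatenate the same-scale encoder features on the channel axis.
d2 = np.concatenate([upsample(bottleneck), e2], axis=1)   # (1, 6, 4, 4)
d1 = np.concatenate([upsample(d2), e1], axis=1)           # (1, 9, 8, 8)

print(e2.shape, bottleneck.shape, d2.shape, d1.shape)
```

The last three channels of `d1` are the untouched input, which illustrates how a skip connection hands full-resolution detail to the decoder without passing it through the bottleneck.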

**Why UNet here?**  

Here is a conceptual question for you:
> 💡 **Question:** Why do you think UNet is a good choice for this denoising task?

**Answer** : WRITE YOUR OWN ANSWER HERE

In a later lecture, we will explore **DiT** (Diffusion Transformer), which replaces UNet with a pure transformer-based architecture for image generation.


# **Q4. p_losses**

Now that the model implementation is done, let's write DDPM's denoising training step. As mentioned before, optimizing the denoising loss amounts to minimizing a (reweighted) variational bound on the expected negative log-likelihood of the dataset. Complete the `GaussianDiffusion.p_losses` method and test it below. You should see a relative error less than 1e-6.

$$
\mathcal{L} = \mathbb{E}_{t, \mathbf{x}_0, \boldsymbol{\epsilon}}
\left[ \left\| \boldsymbol{\epsilon} - \boldsymbol{\epsilon}_\theta \left( \mathbf{x}_t, t \right) \right\|_2^2 \right]
$$
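
One Monte Carlo estimate of this objective can be sketched as follows. This is an illustrative NumPy version, not the expected `p_losses` solution: the linear beta schedule, the placeholder `toy_model`, the unconditional call signature, and the mean reduction over all elements are assumptions made for the example.

```python
import numpy as np

rng = np.random.default_rng(0)

T = 1000
betas = np.linspace(1e-4, 0.02, T)            # hypothetical linear schedule
alphas_cumprod = np.cumprod(1.0 - betas)      # \bar{alpha}_t

def toy_model(x_t, t):
    """Placeholder for eps_theta(x_t, t); in the notebook this role is played by the UNet."""
    return np.zeros_like(x_t)

def p_losses_sketch(model, x_start, rng):
    b = x_start.shape[0]
    t = rng.integers(0, T, size=b)                    # t ~ Uniform{0, ..., T-1}
    noise = rng.standard_normal(x_start.shape)        # eps ~ N(0, I)
    abar = alphas_cumprod[t].reshape(b, 1, 1, 1)
    # Forward process: x_t = sqrt(abar_t) * x_0 + sqrt(1 - abar_t) * eps
    x_t = np.sqrt(abar) * x_start + np.sqrt(1.0 - abar) * noise
    pred_noise = model(x_t, t)
    # Squared error between true and predicted noise (mean reduction assumed).
    return np.mean((noise - pred_noise) ** 2)

x0 = rng.uniform(-1, 1, size=(64, 3, 4, 4))
loss = p_losses_sketch(toy_model, x0, rng)
print(loss)
```

A model that always predicts zero yields a loss close to 1, the variance of the standard normal noise, so a trained model should score well below that.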


In [None]:
from ai_expert.unet import Unet, ResnetBlock, Downsample, Upsample

np.random.seed(231)
torch.manual_seed(231)

dim = 4
condition_dim = 4
dim_mults = (2, 4)
unet = Unet(dim=dim, condition_dim=condition_dim, dim_mults=dim_mults)

h = w = 4
b = 3
diffusion = GaussianDiffusion(
      model=unet,
      image_size=h,
      timesteps=1000,
      beta_schedule="sigmoid",
      objective="pred_x_start",
)

inp_x = torch.randn(b, 3, h, w)
inp_model_kwargs = {"text_emb": torch.randn(b, condition_dim)}
out = diffusion.p_losses(inp_x, inp_model_kwargs)
expected_out = 30.0732689

forward_error = rel_error(out.item(), expected_out)
print("forward error: ", forward_error)

if forward_error < 1e-6 : print("Passed! You did it")
else : print("Failed! Try again")

# **Q5. p_sample**

One final ingredient remains. DDPM generates samples by iteratively performing the reverse process. Each iteration of this reverse process involves sampling from $p(x_{t-1}|x_t)$. Implement the `GaussianDiffusion.p_sample` method by following Equation (6) from the paper. This equation describes sampling from the posterior of the forward process, conditioned on $x_t$ and $x_0$, where $x_0$ can be derived from the denoising model's output. We have already implemented the `sample` method, which iteratively calls `p_sample` to generate images from input texts.

Test your implementation of `p_sample` below. You should see relative errors less than 1e-6.

$$
p_\theta(x_{t-1} \mid x_t) =
\mathcal{N}\left(
x_{t-1} \; ; \;
\mu_\theta(x_t, t), \; \sigma_t^2 \mathbf{I}
\right)
$$

> 🔎 **Reference:** Consult the original DDPM paper [1] around **Equation (6)** (posterior mean/variance)
> and **Equation (11)** (reparameterization using the model prediction).

In [None]:
np.random.seed(231)
torch.manual_seed(231)

dim = 4
condition_dim = 4
dim_mults = (2,)
unet = Unet(dim=dim, condition_dim=condition_dim, dim_mults=dim_mults)

h = w = 4
b = 1
inp_x_t = torch.randn(b, 3, h, w)
inp_model_kwargs = {"text_emb": torch.randn(b, condition_dim)}
t = 231

# test 1
diffusion = GaussianDiffusion(
      model=unet,
      image_size=h,
      timesteps=1000,
      beta_schedule="sigmoid",
      objective="pred_x_start",
)
out = diffusion.p_sample(inp_x_t, t, inp_model_kwargs).detach().numpy()
expected_out = np.array(
    [[[[ 1.1339471 ,  0.12097352, -0.7175048 ,  1.3196243 ],
         [-0.27657282,  0.4899886 ,  1.0170169 , -0.8242867 ],
         [-0.18946372,  0.9899801 ,  0.01498353,  0.39722288],
         [-0.97995025, -0.5947938 , -0.07796463, -0.07311387]],

        [[ 0.0739838 , -1.5537696 ,  0.43128064, -0.7395982 ],
         [-1.0517508 , -1.7030833 ,  0.79073197, -1.217138  ],
         [-0.5314434 ,  0.9862699 ,  0.6568664 , -0.4559122 ],
         [-0.17322278,  0.51251256, -0.75741345, -0.3967054 ]],

        [[ 0.8546979 ,  1.6186953 ,  1.9930652 ,  0.57347   ],
         [ 0.20219846,  0.5374655 , -0.81597316,  1.9089762 ],
         [ 0.7327057 ,  1.19275   ,  1.8593936 , -1.4582647 ],
         [ 0.68447256, -0.9056745 ,  0.7863245 ,  0.14455058]]]])
forward_error = rel_error(out, expected_out)
print("forward error: ", forward_error)
if forward_error < 1e-6 : print("Passed! You did it")
else : print("Failed! Try again")

# test 2
diffusion = GaussianDiffusion(
      model=unet,
      image_size=h,
      timesteps=1000,
      beta_schedule="cosine",
      objective="pred_noise",
)
out = diffusion.p_sample(inp_x_t, t, inp_model_kwargs).detach().numpy()
expected_out = np.array(
    [[[[ 1.1036711 ,  0.08143333, -0.6856102 ,  1.3826138 ],
         [-0.25455472,  0.514572  ,  1.104592  , -0.75972646],
         [-0.22729763,  0.9837706 ,  0.05891411,  0.52049375],
         [-1.0331786 , -0.5416254 , -0.01623197, -0.04838388]],

        [[ 0.08324978, -1.545468  ,  0.41357145, -0.63511896],
         [-1.1362139 , -1.7128816 ,  0.8694859 , -1.2297069 ],
         [-0.49168122,  1.0043695 ,  0.6759953 , -0.5297671 ],
         [-0.10931232,  0.52347076, -0.80946106, -0.5015002 ]],

        [[ 0.7437265 ,  1.590004  ,  1.9481117 ,  0.5656144 ],
         [ 0.22895451,  0.5289113 , -0.8511001 ,  1.8864397 ],
         [ 0.72863096,  1.2271638 ,  1.892699  , -1.5199479 ],
         [ 0.64346373, -0.86913294,  0.7869012 ,  0.12637165]]]])

forward_error = rel_error(out, expected_out)
print("forward error: ", forward_error)
if forward_error < 1e-6 : print("Passed! You did it")
else : print("Failed! Try again")

# **Training**

We now have all the ingredients needed for DDPM training, and we can train the model on our CelebADialog dataset. You don't have to code anything here, but we encourage you to look at the training code at `ai_expert/ddpm_trainer.py` in [this link](https://github.com/rlawldud53/AIExpert_Samsung/tree/main/ai_expert).

For the rest of the notebook, we will use a pretrained model that has already been trained for many iterations on this dataset. However, you are free to train your own model on a Colab GPU (make sure to change the `results_folder`). Note that it may take more than 12 hours on a T4 GPU before you start seeing reasonable generations.

In [None]:
from ai_expert.ddpm_trainer import Trainer

dim = 48
image_size = 128
results_folder = FOLDER_PATH / "pretrained_model"
condition_dim = 512

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model = Unet(
    dim=dim,
    dim_mults=(1, 2, 4, 8),
    condition_dim=condition_dim,
)
print("Number of parameters:", sum(p.numel() for p in model.parameters()))

diffusion = GaussianDiffusion(
    model,
    image_size=image_size,
    timesteps=100,  # number of diffusion steps
    objective="pred_noise",  # "pred_x_start" or "pred_noise"
)

dataset = CelebADialogDataset(
    image_size=image_size,
    img_dir="./data/celeba_dialog/images",
    ann_jsonl="./data/celeba_dialog/celeba_dialog.jsonl",
    clip_embedder=clip_embedder,
)

trainer = Trainer(
    diffusion,
    dataset,
    device,
    train_batch_size=256,
    weight_decay=0.0,
    train_lr=1e-3,
    train_num_steps=50000,
    results_folder=results_folder,
)

# trainer.train() # If you want to train your own model from scratch, uncomment this

In [None]:
trainer.load(7000)

# **Q6. DDIM Sampling**  

In this section, you will implement **DDIM sampling**, an alternative to DDPM sampling that allows for **fewer denoising steps** while maintaining high sample quality.  

Unlike DDPM, where we sample from the stochastic posterior $p(x_{t-1} \mid x_t)$ with variance from the forward process, DDIM introduces a deterministic path when $\eta = 0$, and controlled stochasticity when $\eta > 0$.


Your task is to implement the **`TODO`** parts in `GaussianDiffusion`:  

- **`predict_prev_from_noise_ddim`**:  
Computes $x_{t-1}$ given $x_t$, timestep $t$, noise prediction, and $\eta$, following the DDIM update rule.  


- **`predict_prev_from_start_ddim`**:  
Computes $x_{t-1}$ given $x_t$, timestep $t$, predicted $x_0$, and $\eta$.  


- **`p_sample_ddim`**:  
  Calls the model to obtain predictions, then chooses the correct function above depending on whether the model predicts **noise** or **$x_0$**.

> 💡 **Hint:**  
> - When $\eta = 0$, DDIM becomes deterministic (no additional random noise is added).  
> - When $\eta > 0$, noise is reintroduced proportionally to $\eta$.  
> - You can reuse your implementations of `predict_start_from_noise` and `predict_noise_from_start` here.  


**Expected Outcome:**  
When implemented correctly, DDIM sampling should generate coherent images **with fewer steps** compared to DDPM.  

**DDIM Update Equation:**  

> 🔎 **Reference:** Consult the original DDIM paper [[2]](https://arxiv.org/abs/2010.02502) around **Equation (12)** and **Equation (16)**
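
In the $\bar{\alpha}_t$ notation used in this notebook (the DDIM paper writes $\alpha_t$ for the same quantity), Equations (12) and (16) read:

$$
x_{t-1} = \sqrt{\bar{\alpha}_{t-1}} \; \hat{x}_0 + \sqrt{1 - \bar{\alpha}_{t-1} - \sigma_t^2} \; \epsilon_\theta(x_t, t) + \sigma_t \, \epsilon,
\qquad
\sigma_t = \eta \sqrt{\frac{1 - \bar{\alpha}_{t-1}}{1 - \bar{\alpha}_t}} \sqrt{1 - \frac{\bar{\alpha}_t}{\bar{\alpha}_{t-1}}}
$$

where $\hat{x}_0 = \left(x_t - \sqrt{1 - \bar{\alpha}_t} \; \epsilon_\theta(x_t, t)\right) / \sqrt{\bar{\alpha}_t}$ and $\sigma_t$ denotes the DDIM sampling noise scale. The block below is a minimal NumPy sketch of a single update. The function name `ddim_step` and its signature are hypothetical and do not match the `predict_prev_from_noise_ddim` method you are asked to complete, and the values of $\bar{\alpha}_t$ and $\bar{\alpha}_{t-1}$ are made up for the demonstration.

```python
import numpy as np

def ddim_step(x_t, pred_noise, abar_t, abar_prev, eta, rng):
    """One DDIM update from x_t to x_{t-1}, given the predicted noise."""
    # Estimate of the clean image from the predicted noise.
    x0_pred = (x_t - np.sqrt(1.0 - abar_t) * pred_noise) / np.sqrt(abar_t)
    # Noise scale: eta = 0 is deterministic, eta = 1 matches the DDPM posterior variance.
    sigma = eta * np.sqrt((1.0 - abar_prev) / (1.0 - abar_t)) * np.sqrt(1.0 - abar_t / abar_prev)
    # "Direction pointing to x_t" term.
    dir_xt = np.sqrt(1.0 - abar_prev - sigma**2) * pred_noise
    noise = rng.standard_normal(x_t.shape) if eta > 0 else 0.0
    return np.sqrt(abar_prev) * x0_pred + dir_xt + sigma * noise

rng = np.random.default_rng(0)
abar_t, abar_prev = 0.5, 0.8                       # made-up values for illustration
x0 = rng.uniform(-1, 1, size=(2, 3, 4, 4))
eps = rng.standard_normal(x0.shape)
x_t = np.sqrt(abar_t) * x0 + np.sqrt(1.0 - abar_t) * eps

# With the true noise as the "prediction" and eta = 0, the step lands exactly on
# the forward-process sample at the previous noise level.
x_prev = ddim_step(x_t, eps, abar_t, abar_prev, eta=0.0, rng=rng)
x_prev_stochastic = ddim_step(x_t, eps, abar_t, abar_prev, eta=1.0, rng=rng)
```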

In [None]:
def get_text_emb(text):
    return trainer.ds._encode(text)

# Helper function to visualize generations.
def show_images(img):
    # img: B x T x 3 x H x W
    plt.figure(figsize=(10, 10))
    img2 = img.clamp(0, 1).permute(0, 3, 1, 4, 2).flatten(0, 1).flatten(1, 2).cpu().numpy()
    plt.imshow(img2)
    plt.axis('off')

    plt.show()

In [None]:
text = "This old lady has extremely frowning face with yellow hair. There are eyeglasses on her face"  # edit here freely!
text_emb = get_text_emb(text)
text_emb = text_emb[None].expand(5, -1).to(device)

torch.manual_seed(0)


### DDPM sampling
DDPM progressively denoises random Gaussian noise through all timesteps until a clean image emerges.  

In [None]:
# Sample
# `return_all_timesteps=True` returns the intermediate results at each timestep.
ddpm_imgs = trainer.diffusion_model.sample(
    batch_size=5,
    model_kwargs={"text_emb": text_emb},
    return_all_timesteps=True
)

In [None]:
show_images(ddpm_imgs[:, ::20])

### DDIM Sampling
Unlike DDPM, DDIM can use fewer denoising steps (e.g., `steps=10`) while maintaining quality, thanks to its non-Markovian update rule.
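
Skipping timesteps amounts to running the update on a shorter, evenly spaced sub-sequence of the training timesteps. The sketch below shows one common convention for building that sub-sequence. The helper `ddim_time_pairs` is hypothetical, and the notebook's `sample_ddim` may select timesteps differently.

```python
def ddim_time_pairs(total_timesteps, steps):
    """Return (t, t_prev) pairs from t = T-1 down to t_prev = -1 (fully denoised)."""
    stride = total_timesteps / steps
    # Evenly spaced times between -1 and T-1, listed from noisiest to cleanest.
    times = [round(-1 + i * stride) for i in range(steps + 1)][::-1]
    return list(zip(times[:-1], times[1:]))

pairs = ddim_time_pairs(total_timesteps=100, steps=10)
for t, t_prev in pairs:
    print(t, "->", t_prev)
```

With `steps` equal to the number of training timesteps, the pairs cover every timestep and no skipping takes place.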

In [None]:
# `eta=0.0` makes the process deterministic (no additional noise is added).
# `steps` sets the number of reverse steps. Choosing fewer steps than the training
# `timesteps` (100 in the setup above), e.g. `steps=10`, skips timesteps and makes
# generation faster but potentially less accurate.
ddim_imgs = trainer.diffusion_model.sample_ddim(
    batch_size=5,
    steps=100, # you can edit here for faster sampling
    eta=0.0,
    model_kwargs={"text_emb": text_emb},
    return_all_timesteps=True
)

In [None]:
show_images(ddim_imgs[:, ::5])