# DDPM (Denoising Diffusion Probabilistic Models) Demo

This notebook demonstrates the complete workflow of training a DDPM and sampling from it.

## Overview
- Load and preprocess data
- Initialize DDPM model and U-Net
- Train the model
- Generate samples
- Visualize results

In [None]:
import os
import sys
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import yaml
from PIL import Image

# Add src to path
sys.path.append('../src')

from model.unet import UNet
from model.diffusion import DDPM
from data.dataset import get_dataset
from training.trainer import DDPMTrainer
from utils.scheduler import NoiseScheduler
from utils.visualization import plot_samples, plot_loss_curve, plot_noise_schedule
from utils.helpers import set_seed, save_checkpoint, load_checkpoint

# Select the compute device: use CUDA when it is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

## 1. Load Configuration

In [None]:
# Load configuration
# The encoding is passed explicitly so the file is decoded the same way on every
# platform instead of depending on the locale's default encoding.
with open('../config/config.yaml', 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

# yaml.safe_load returns None for an empty document; fail here with a clear message
# rather than later with an AttributeError/TypeError on the first lookup.
if not isinstance(config, dict):
    raise ValueError(
        "../config/config.yaml must contain a top-level mapping, "
        f"got {type(config).__name__}"
    )

print("Configuration:")
for key, value in config.items():
    print(f"{key}: {value}")

# Set random seed for reproducibility
set_seed(config['training']['seed'])

## 2. Load Dataset

In [None]:
# Build the training split; all dataset options come from the 'data' config section
train_dataset = get_dataset(
    train=True,
    data_dir=config['data']['data_dir'],
    image_size=config['data']['image_size'],
    dataset_name=config['data']['dataset'],
)

# Shuffling loader over the training split; batch size is a training option,
# the worker count is a data option
train_loader = DataLoader(
    dataset=train_dataset,
    num_workers=config['data']['num_workers'],
    batch_size=config['training']['batch_size'],
    shuffle=True,
)

# One summary line per setting
print(
    f"Dataset: {config['data']['dataset']}",
    f"Number of training samples: {len(train_dataset)}",
    f"Image size: {config['data']['image_size']}",
    f"Batch size: {config['training']['batch_size']}",
    sep="\n",
)

## 3. Visualize Sample Data

In [None]:
# Visualize some sample images
sample_batch = next(iter(train_loader))
# The DataLoader's default collate function returns (image, label) samples as a
# *list* [images, labels], not a tuple, so both sequence types are unpacked here.
# A batch that is already a bare tensor is used as-is.
images = sample_batch[0] if isinstance(sample_batch, (list, tuple)) else sample_batch

fig, axes = plt.subplots(2, 4, figsize=(12, 6))
axes = axes.flatten()

# Turn the axes off on every panel first so unused panels stay blank when the
# batch holds fewer than 8 images.
for ax in axes:
    ax.axis('off')

# zip() stops at the shorter input, so a small batch cannot raise an IndexError
for ax, image in zip(axes, images):
    img = image.permute(1, 2, 0).cpu().numpy()  # channels-first -> channels-last for imshow
    img = (img + 1) / 2  # Denormalize from [-1, 1] to [0, 1]
    img = np.clip(img, 0, 1)
    ax.imshow(img)

plt.suptitle('Sample Training Images')
plt.tight_layout()
plt.show()

## 4. Initialize Models

In [None]:
# Build the U-Net; each constructor argument is read from the 'model' config
# section under the same name as the keyword it is passed to.
model = UNet(**{
    name: config['model'][name]
    for name in (
        'in_channels',
        'out_channels',
        'base_channels',
        'channel_mults',
        'num_res_blocks',
        'time_emb_dim',
        'dropout',
    )
}).to(device)

# Build the noise scheduler the same way from the 'diffusion' config section
noise_scheduler = NoiseScheduler(**{
    name: config['diffusion'][name]
    for name in ('num_timesteps', 'beta_start', 'beta_end', 'schedule')
})

# Tie the network and the scheduler together in the DDPM wrapper
ddpm = DDPM(model=model, noise_scheduler=noise_scheduler, device=device)

# Report the model size and the length of the diffusion chain
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")
print(f"Timesteps: {config['diffusion']['num_timesteps']}")

## 5. Visualize Noise Schedule

In [None]:
# Plot noise schedule
# plot_noise_schedule is the project helper imported from utils.visualization; it is
# given the NoiseScheduler instance created from the 'diffusion' config section.
# NOTE(review): which quantities the helper draws is not visible here - confirm in
# utils/visualization.
plot_noise_schedule(noise_scheduler)
plt.show()  # Render the figure in the notebook

## 6. Demonstrate Forward Process

In [None]:
# Demonstrate forward diffusion process
sample_image = images[0].unsqueeze(0).to(device)  # First image, with a batch dimension added

# Keep only the timesteps that exist in the configured schedule, so a configuration
# with 800 or fewer timesteps never asks q_sample for a step beyond the schedule.
num_timesteps = config['diffusion']['num_timesteps']
timesteps_to_show = [t for t in (0, 50, 100, 200, 400, 800) if t < num_timesteps]

# squeeze=False keeps `axes` two-dimensional even if a single panel remains
fig, axes = plt.subplots(1, len(timesteps_to_show), figsize=(15, 3), squeeze=False)

for ax, t in zip(axes[0], timesteps_to_show):
    if t == 0:
        # t=0 shows the untouched input image
        noisy_image = sample_image
    else:
        t_tensor = torch.tensor([t], device=device)
        noise = torch.randn_like(sample_image)
        noisy_image = ddpm.q_sample(sample_image, t_tensor, noise)

    img = noisy_image[0].permute(1, 2, 0).cpu().numpy()  # channels-last for imshow
    img = (img + 1) / 2  # Denormalize
    img = np.clip(img, 0, 1)  # Noisy values fall outside [0, 1]; clip for display

    ax.imshow(img)
    ax.set_title(f't={t}')
    ax.axis('off')

plt.suptitle('Forward Diffusion Process')
plt.tight_layout()
plt.show()

## 7. Training

In [None]:
# Create the trainer from the DDPM wrapper, the training loader and the full config
trainer = DDPMTrainer(ddpm=ddpm, train_loader=train_loader, config=config, device=device)

# Run training; train() returns the recorded losses
print("Starting training...")
losses = trainer.train()

# Show the loss history
plot_loss_curve(losses)
plt.show()

## 8. Generate Samples

In [None]:
# Draw a batch of new images from the trained model
print("Generating samples...")
ddpm.model.eval()

# Per-sample shape: (channels, size, size) - the image size is used for both spatial dims
sample_shape = (config['model']['in_channels'],) + (config['data']['image_size'],) * 2

# No gradients are needed while sampling
with torch.no_grad():
    samples = ddpm.sample(batch_size=16, shape=sample_shape)

# Show the 16 samples, four per row
plot_samples(samples, nrow=4, title="Generated Samples")
plt.show()

## 9. Visualize Sampling Process

In [None]:
# Visualize the reverse sampling process
print("Visualizing sampling process...")

num_timesteps = ddpm.noise_scheduler.num_timesteps

# Timesteps at which a snapshot is taken, in the order the sampling loop visits
# them (descending). The first snapshot uses the schedule's last timestep instead
# of a hard-coded 999, and steps outside the schedule are dropped. Without this, a
# schedule that is not exactly 1000 steps long would collect fewer snapshots than
# there are labels, and the plot titles would be attached to the wrong images.
timesteps_to_show = sorted(
    {
        t
        for t in (num_timesteps - 1, 800, 600, 400, 200, 100, 50, 0)
        if 0 <= t < num_timesteps
    },
    reverse=True,
)
capture_at = set(timesteps_to_show)  # Set lookup inside the sampling loop
intermediate_samples = []

ddpm.model.eval()
with torch.no_grad():
    # Start with pure noise
    shape = (1, config['model']['in_channels'],
             config['data']['image_size'],
             config['data']['image_size'])

    x = torch.randn(shape, device=device)

    # A descending range has a length (reversed(range(...)) does not), so tqdm can
    # display the total and an ETA.
    for i in tqdm(range(num_timesteps - 1, -1, -1), desc="Sampling"):
        t = torch.full((1,), i, device=device, dtype=torch.long)
        x = ddpm.p_sample(x, t)

        # Keep a copy of the state right after the step at timestep i
        if i in capture_at:
            intermediate_samples.append(x.clone())

# Plot intermediate samples. Every entry of timesteps_to_show lies inside the
# schedule and the loop runs in the same descending order, so the snapshots and
# labels line up one-to-one.
# squeeze=False keeps `axes` two-dimensional even with a single panel.
fig, axes = plt.subplots(1, len(timesteps_to_show), figsize=(16, 2), squeeze=False)

for ax, sample, t in zip(axes[0], intermediate_samples, timesteps_to_show):
    img = sample[0].permute(1, 2, 0).cpu().numpy()
    img = (img + 1) / 2  # Denormalize
    img = np.clip(img, 0, 1)

    ax.imshow(img)
    ax.set_title(f't={t}')
    ax.axis('off')

plt.suptitle('Reverse Sampling Process')
plt.tight_layout()
plt.show()

## 10. Compare Real vs Generated

In [None]:
# Compare real and generated images
num_columns = 8
fig, axes = plt.subplots(2, num_columns, figsize=(16, 4))

# (row label, image batch) for the top and bottom rows
rows = (('Real', images), ('Generated', samples))

for row_axes, (label, batch) in zip(axes, rows):
    for col, ax in enumerate(row_axes):
        # Leave the panel empty instead of raising IndexError when a batch holds
        # fewer than `num_columns` images.
        if col < len(batch):
            img = batch[col].permute(1, 2, 0).cpu().numpy()
            img = (img + 1) / 2  # Denormalize
            img = np.clip(img, 0, 1)
            ax.imshow(img)

        # Hide ticks and the frame individually. axis('off') would also hide the
        # y-label, which is why the row labels below would otherwise never show.
        ax.set_xticks([])
        ax.set_yticks([])
        for spine in ax.spines.values():
            spine.set_visible(False)

    # Label the row on its first panel
    row_axes[0].set_ylabel(label, rotation=0, ha='right', va='center')

plt.suptitle('Real vs Generated Images')
plt.tight_layout()
plt.show()

## 11. Save Model and Results

In [None]:
# Write the checkpoint: model weights plus the config and loss history of this run
checkpoint_path = '../checkpoints/ddpm_demo.pth'
os.makedirs('../checkpoints', exist_ok=True)

checkpoint_state = {
    'model_state_dict': ddpm.model.state_dict(),
    'config': config,
    'losses': losses,
}
save_checkpoint(checkpoint_state, checkpoint_path)

print(f"Model saved to {checkpoint_path}")

# Write the generated batch as one image grid, four images per row;
# pixel values are rescaled from the (-1, 1) range when saving
os.makedirs('../results', exist_ok=True)
torchvision.utils.save_image(
    samples, '../results/generated_samples.png',
    nrow=4, normalize=True, value_range=(-1, 1),
)

print("Generated samples saved to ../results/generated_samples.png")

## 12. Model Analysis

In [None]:
# Analyze model performance
print("\nModel Analysis:")

# An empty loss history (e.g. a run configured with zero epochs) would make
# losses[-1] raise IndexError, so report that case explicitly.
if losses:
    print(f"Final training loss: {losses[-1]:.4f}")
else:
    print("Final training loss: n/a (no losses recorded)")

# Walk the parameters once and derive both counts from the same list
parameters = list(ddpm.model.parameters())
print(f"Total parameters: {sum(p.numel() for p in parameters):,}")
print(f"Trainable parameters: {sum(p.numel() for p in parameters if p.requires_grad):,}")

# Memory usage (bytes converted to GiB)
if torch.cuda.is_available():
    print(f"GPU memory allocated: {torch.cuda.memory_allocated()/1024**3:.2f} GB")
    print(f"GPU memory cached: {torch.cuda.memory_reserved()/1024**3:.2f} GB")

# Training statistics
print("\nTraining Statistics:")
print(f"Number of epochs: {config['training']['epochs']}")
print(f"Batch size: {config['training']['batch_size']}")
print(f"Learning rate: {config['training']['learning_rate']}")
# NOTE(review): this is len(losses); whether the trainer records one loss per step
# or per epoch is decided in DDPMTrainer.train() - confirm the label matches.
print(f"Total training steps: {len(losses)}")

## Summary

This notebook demonstrated:
1. Loading and preprocessing data
2. Initializing DDPM model components
3. Training the denoising model
4. Generating new samples
5. Visualizing the diffusion process
6. Comparing real vs generated images

The DDPM learns to reverse the forward noising process by predicting the noise added at each timestep; sampling then starts from pure noise and removes the predicted noise step by step, enabling generation of high-quality samples.