In [None]:
# Import Breakdown
# PyTorch Libraries:

# torch - Core PyTorch library for tensor operations and neural network training
# torch.nn - Neural network modules and building blocks

# Numerical & Utilities:

# numpy - Numerical computing library for array operations
# tqdm - Progress bar utility for tracking iterations during training/inference loops

# Dataset:

# keras.datasets.mnist.load_data - Loads the MNIST dataset (handwritten digits 0-9, 28x28 grayscale images)

# Model Architecture:

# unet.UNet - Imports a U-Net model (a custom implementation from a local file)

# U-Net is a convolutional neural network architecture commonly used in image segmentation and diffusion models
# Features encoder-decoder structure with skip connections
# Well-suited for image-to-image tasks like denoising



# Visualization:

# matplotlib.pyplot - Plotting library for visualizing images, training curves, or generated samples

# Typical Use Case
# This setup is commonly used for:

# Training a diffusion model to generate MNIST digits
# Denoising images using a U-Net architecture
# Experimenting with generative models on simple image data
import torch
import torch.nn as nn
import numpy as np
from tqdm import tqdm
from keras.datasets.mnist import load_data

from unet import UNet
import matplotlib.pyplot as plt

In [None]:
# Load the MNIST dataset
# trainX: training images (60,000 samples, 28x28 pixels)
# trainy: training labels (digits 0-9)
# testX: test images (10,000 samples, 28x28 pixels)
# testy: test labels (digits 0-9)
(trainX, trainy), (testX, testy) = load_data()

# Normalize pixel values from [0, 255] to [0, 1] range
# Convert to float32 for efficient computation with neural networks
# Division by 255 scales the pixel intensities to the range [0, 1]
trainX = np.float32(trainX) / 255.
testX = np.float32(testX) / 255.

def sample_batch(batch_size, device):
    """
    Creates a random batch of training images for model training.
    
    Args:
        batch_size: Number of images to sample from the training set
        device: PyTorch device (cpu or cuda) where tensors will be stored
    
    Returns:
        Batch of images as a tensor with shape (batch_size, 1, 32, 32)
    """
    
    # Generate random indices to select images from the training set
    # torch.randperm creates a random permutation of numbers from 0 to trainX.shape[0]-1
    # [:batch_size] takes only the first 'batch_size' random indices
    indices = torch.randperm(trainX.shape[0])[:batch_size]
    
    # Extract the selected images and convert to PyTorch tensor
    # .unsqueeze(1) adds a channel dimension: (batch_size, 28, 28) -> (batch_size, 1, 28, 28)
    # This is needed because PyTorch expects images in format: (batch, channels, height, width)
    # .to(device) moves the tensor to the specified device (GPU or CPU)
    data = torch.from_numpy(trainX[indices]).unsqueeze(1).to(device)
    
    # Resize images from 28x28 to 32x32 pixels using bilinear interpolation
    # This is common in diffusion models as 32x32 works better with certain architectures
    # interpolate() performs the upsampling operation
    return torch.nn.functional.interpolate(data, 32)

In [None]:
class DiffusionModel():
    """
    Implementation of Denoising Diffusion Probabilistic Models (DDPM).
    This class handles both the forward diffusion process (adding noise) 
    and reverse process (denoising/generation).
    """
    
    def __init__(self, T : int, model : nn.Module, device : str):
        """
        Initialize the diffusion model.
        
        Args:
            T: Number of diffusion timesteps (e.g., 1000)
            model: Neural network (U-Net) that predicts noise at each timestep
            device: 'cuda' for GPU or 'cpu' for CPU computation
        """
        
        # Total number of diffusion steps
        self.T = T
        
        # The neural network (U-Net) that learns to predict noise
        # This is the core model that will be trained
        self.function_approximator = model.to(device)
        self.device = device
        
        # Beta schedule: controls how much noise is added at each timestep
        # Starts small (1e-4) and increases to 0.02 over T steps
        # Small values at start = gradual noise addition
        self.beta = torch.linspace(1e-4, 0.02, T).to(device)
        
        # Alpha = 1 - beta
        # Represents how much of the original signal is retained
        self.alpha = 1. - self.beta
        
        # Alpha_bar: cumulative product of alphas
        # Represents total signal retention from step 0 to step t
        # This allows us to jump directly to any timestep without iterating
        self.alpha_bar = torch.cumprod(self.alpha, dim=0)
    
    def training(self, batch_size, optimizer):
        """
        Training step following Algorithm 1 from DDPM paper.
        Trains the model to predict the noise that was added to images.
        
        Args:
            batch_size: Number of images to train on in this step
            optimizer: PyTorch optimizer (e.g., Adam) for updating model weights
            
        Returns:
            loss: The mean squared error between predicted and actual noise
        """
        
        # Step 1: Get a batch of clean images (x0) from the dataset
        x0 = sample_batch(batch_size, self.device)
        
        # Step 2: Randomly sample a timestep t for each image in the batch
        # Each image will have noise added corresponding to a random timestep
        # Range: 1 to T (inclusive)
        t = torch.randint(1, self.T + 1, (batch_size,), device=self.device, dtype=torch.long)
        
        # Step 3: Sample random noise (epsilon) with same shape as x0
        # This is the "true" noise that will be added to the images
        eps = torch.randn_like(x0)
        
        # Step 4: Get alpha_bar for the sampled timesteps
        # unsqueeze operations reshape it to broadcast correctly with image dimensions
        # Shape: (batch_size, 1, 1, 1) to match (batch_size, channels, height, width)
        alpha_bar_t = self.alpha_bar[t-1].unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
        
        # Step 5: Create noisy images using the closed-form forward diffusion equation
        # x_t = sqrt(alpha_bar_t) * x0 + sqrt(1 - alpha_bar_t) * epsilon
        # This adds the appropriate amount of noise for timestep t
        # The model receives both the noisy image and the timestep t
        eps_predicted = self.function_approximator(torch.sqrt(
            alpha_bar_t) * x0 + torch.sqrt(1 - alpha_bar_t) * eps, t-1)
        
        # Step 6: Calculate loss - how well did the model predict the noise?
        # MSE loss between actual noise (eps) and predicted noise (eps_predicted)
        loss = nn.functional.mse_loss(eps, eps_predicted)
        
        # Step 7: Backpropagation - standard PyTorch training steps
        optimizer.zero_grad()  # Clear previous gradients
        loss.backward()         # Compute gradients
        optimizer.step()        # Update model weights
        
        return loss.item()  # Return loss value for monitoring
    
    @torch.no_grad()  # Disable gradient calculation for efficiency during sampling
    def sampling(self, n_samples=1, image_channels=1, img_size=(32, 32), use_tqdm=True):
        """
        Generate new images by iteratively denoising random noise.
        Implements Algorithm 2 from DDPM paper (reverse diffusion process).
        
        Args:
            n_samples: Number of images to generate
            image_channels: Number of color channels (1 for grayscale, 3 for RGB)
            img_size: Tuple of (height, width) for generated images
            use_tqdm: Whether to show a progress bar
            
        Returns:
            Generated images as tensor of shape (n_samples, channels, height, width)
        """
        
        # Step 1: Start from pure random noise (x_T)
        # This is the completely noisy starting point
        x = torch.randn((n_samples, image_channels, img_size[0], img_size[1]), 
                         device=self.device)
        
        # Optional progress bar for visualization
        progress_bar = tqdm if use_tqdm else lambda x : x
        
        # Step 2: Iteratively denoise from timestep T down to 1
        # Going backwards: T, T-1, T-2, ..., 2, 1
        for t in progress_bar(range(self.T, 0, -1)):
            
            # Sample random noise z for stochastic sampling
            # At the last step (t=1), use zeros instead (deterministic final step)
            z = torch.randn_like(x) if t > 1 else torch.zeros_like(x)
            
            # Create a tensor of the current timestep for all samples in batch
            # Shape: (n_samples,) all filled with current timestep value
            t = torch.ones(n_samples, dtype=torch.long, device=self.device) * t 
            
            # Get diffusion parameters for current timestep t
            # Reshape to (n_samples, 1, 1, 1) for broadcasting
            beta_t = self.beta[t-1].unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
            alpha_t = self.alpha[t-1].unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
            alpha_bar_t = self.alpha_bar[t-1].unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
            
            # Step 3: Calculate the mean of the denoising distribution
            # This is the reverse diffusion equation from the DDPM paper
            # Uses the model's noise prediction to estimate the less noisy image
            mean = 1 / torch.sqrt(alpha_t) * (x - ((1 - alpha_t) / torch.sqrt(
                1 - alpha_bar_t)) * self.function_approximator(x, t-1))
            
            # Step 4: Calculate the standard deviation for adding stochasticity
            sigma = torch.sqrt(beta_t)
            
            # Step 5: Sample from the denoising distribution
            # x_{t-1} = mean + sigma * z
            # This gives us a slightly less noisy image
            x = mean + sigma * z
    
        # After all iterations, x should be a clean generated image
        return x
            

In [None]:
# Specify the device for computation
# 'cuda' = use GPU for faster training (requires NVIDIA GPU with CUDA support)
# Alternative: 'cpu' for CPU computation (slower but works on any machine)
device = 'cuda'

# Batch size: number of images processed simultaneously in each training step
# Larger batch sizes:
#   - Faster training (better GPU utilization)
#   - More memory required
#   - More stable gradients
# 64 is a common choice balancing speed and memory usage
batch_size = 64

# Initialize the U-Net neural network
# This is the core architecture that will learn to predict noise
# U-Net features:
#   - Encoder-decoder structure with skip connections
#   - Good at preserving spatial information
#   - Widely used in image generation and segmentation tasks
model = UNet()

# Initialize the Adam optimizer for training the model
# Adam is an adaptive learning rate optimizer (popular for deep learning)
# Parameters:
#   - model.parameters(): all trainable weights in the U-Net
#   - lr=2e-5: learning rate (0.00002)
#     * Small learning rate for stable training
#     * Prevents overshooting optimal weights
#     * Common for diffusion models
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)

# Create the diffusion model instance
# This wraps the U-Net and handles the diffusion process
# Parameters:
#   - 1000: Number of diffusion timesteps (T)
#     * More steps = smoother noise addition/removal
#     * Standard choice in DDPM paper
#     * Trade-off between quality and computation time
#   - model: The U-Net that predicts noise
#   - device: Where computations happen (GPU in this case)
diffusion_model = DiffusionModel(1000, model, device)

In [None]:
# List to store training loss values for monitoring progress
# Each epoch's loss will be appended here for visualization
training_loss = []

# Training loop: iterate for 40,000 epochs (training iterations)
# tqdm creates a progress bar showing training progress
# 40_000 uses underscore for readability (same as 40000)
for epoch in tqdm(range(40_000)):
    
    # Perform one training step
    # - Samples a batch of images
    # - Adds noise at random timesteps
    # - Trains model to predict the noise
    # - Returns the loss value (how well model predicted noise)
    loss = diffusion_model.training(batch_size, optimizer)
    
    # Store the loss for this epoch
    # Allows us to track if model is improving over time
    training_loss.append(loss)
    
    # Every 100 epochs, save loss plots to visualize training progress
    if epoch % 100 == 0:
        
        # Plot 1: Complete training history
        # Shows loss from epoch 0 to current epoch
        # Useful to see overall trend and convergence
        plt.plot(training_loss)
        plt.savefig('training_loss.png')
        plt.close()  # Close figure to free memory
        
        # Plot 2: Recent training history (last 1000 epochs)
        # Zoomed-in view of recent performance
        # Useful to see if loss is still decreasing or has plateaued
        plt.plot(training_loss[-1000:])  # [-1000:] takes last 1000 elements
        plt.savefig('training_loss_cropped.png')
        plt.close()
    
    # Every 5000 epochs, generate sample images and save model checkpoint
    if epoch % 5000 == 0:
        
        # Generate 81 images (9x9 grid)
        # This helps visually assess model quality during training
        nb_images = 81
        
        # Use the model to generate new images from random noise
        # use_tqdm=False: don't show progress bar for sampling (cleaner output)
        samples = diffusion_model.sampling(n_samples=nb_images, use_tqdm=False)
        
        # Create a large figure to display all generated images
        # figsize=(17, 17): 17 inches x 17 inches for clear visualization
        plt.figure(figsize=(17, 17))
        
        # Loop through all generated images and arrange in 9x9 grid
        for i in range(nb_images):
            plt.subplot(9, 9, 1 + i)  # Create subplot in 9x9 grid, position 1+i
            plt.axis('off')  # Hide axis labels and ticks for cleaner image
            
            # Display the image:
            # - .squeeze(0): Remove channel dimension (1, 32, 32) -> (32, 32)
            # - .clip(0, 1): Clamp pixel values to valid range [0, 1]
            # - .data: Access underlying tensor data
            # - .cpu(): Move tensor from GPU to CPU (required for numpy)
            # - .numpy(): Convert PyTorch tensor to numpy array (for matplotlib)
            # - cmap='gray': Display as grayscale image
            plt.imshow(samples[i].squeeze(0).clip(0, 1).data.cpu().numpy(), cmap='gray')
        
        # Save the grid of generated images with epoch number in filename
        # Allows tracking of generation quality improvement over training
        plt.savefig(f'samples_epoch_{epoch}.png')
        plt.close()
        
        # Save model checkpoint
        # model.cpu(): Move model to CPU before saving (reduces file size)
        # Checkpoint naming includes epoch for version tracking
        torch.save(model.cpu(), f'model_paper2_epoch_{epoch}')
        
        # Move model back to GPU for continued training
        model.cuda()

100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 40000/40000 [2:42:05<00:00,  4.11it/s]    


In [None]:
# Define the number of images to generate
# 81 images = 9 rows Ã— 9 columns (perfect square for grid display)
nb_images = 81

# Generate new images using the trained diffusion model
# This is the inference/sampling process:
# - Starts from pure random noise
# - Iteratively denoises through all 1000 timesteps
# - Returns clean generated images
# use_tqdm=False: Disables progress bar during sampling for cleaner output
samples = diffusion_model.sampling(n_samples=nb_images, use_tqdm=False)

# Create a large matplotlib figure to display all images
# figsize=(17, 17): Creates a 17Ã—17 inch figure
# Large size ensures each generated image is clearly visible
plt.figure(figsize=(17, 17))

# Loop through all generated images and arrange them in a grid
for i in range(nb_images):
    
    # Create a subplot in a 9Ã—9 grid layout
    # plt.subplot(rows, cols, position)
    # - 9, 9: Grid dimensions (9 rows, 9 columns)
    # - 1 + i: Position in grid (1-indexed, so starts at 1 not 0)
    plt.subplot(9, 9, 1 + i)
    
    # Remove axis labels, ticks, and frame for cleaner image display
    # Makes the grid look like a collection of images without distractions
    plt.axis('off')
    
    # Display the generated image with proper preprocessing:
    # samples[i]: Get the i-th generated image (shape: [1, 32, 32])
    # .squeeze(0): Remove channel dimension [1, 32, 32] â†’ [32, 32]
    #              (matplotlib expects 2D array for grayscale)
    # .clip(0, 1): Clamp pixel values to valid range [0.0, 1.0]
    #              (neural networks sometimes output values outside this range)
    # .data: Access the underlying tensor data
    # .cpu(): Move tensor from GPU memory to CPU memory
    #         (matplotlib requires CPU arrays, not GPU tensors)
    # .numpy(): Convert PyTorch tensor to NumPy array
    #           (matplotlib works with NumPy arrays)
    # cmap='gray': Use grayscale colormap for displaying single-channel images
    plt.imshow(samples[i].squeeze(0).clip(0, 1).data.cpu().numpy(), cmap='gray')

# Save the complete 9Ã—9 grid as a single image file
# f-string allows inserting the current epoch number in filename
# Example: 'samples_epoch_40000.png' if epoch is 40000
# This creates a visual record of generation quality at this training stage
plt.savefig(f'samples_epoch_{epoch}.png')

# Close the figure to free up memory
# Important in loops to prevent memory buildup from multiple figures
plt.close()

# Save the trained model as a checkpoint file
# model.cpu(): Move model from GPU to CPU before saving
#              - Reduces file size
#              - Makes checkpoint compatible with systems without GPU
# torch.save(): PyTorch's function to serialize and save model
# f'model_paper2_epoch_{epoch}': Filename includes epoch for version tracking
# Note: This saves the entire model object (architecture + weights)
torch.save(model.cpu(), f'model_paper2_epoch_{epoch}')

In [None]:
# How It Works (Big Picture)
# Forward Process (Training)

# Take clean MNIST image
# Add noise according to random timestep t
# Train U-Net to predict that noise
# Repeat 40,000 times

# Reverse Process (Generation)

# Start with pure random noise
# Use trained U-Net to predict noise
# Remove predicted noise
# Repeat 1000 times
# Result: Clean generated digit image


# Key Concepts
# Concept DescriptionDiffusion 
# Steps (T=1000)Number of gradual noise addition/removal steps
# U-Net Neural network that learns to predict noise
# Beta ScheduleControls how much noise is added at each step
# Training Teach model to recognize noise patterns
# Sampling Generate new images by denoising random noise
# Checkpoints Save model at intervals for backup/evaluation

# Expected Results

# Training Loss: Should decrease over time (model improving)
# Generated Images: Should look like handwritten digits (0-9)
# Training Time: Several hours on GPU for 40,000 epochs
# Output Files: Loss plots, sample grids, model checkpoints

# This is a complete implementation of the DDPM paper for generating MNIST-style digit images! ðŸŽ¨ðŸ”¢