# Install and Import Libraries
Code to install the required libraries (diffusers, torch, torchvision, matplotlib) and import them.

In [None]:
# Install required libraries
!pip install diffusers torch torchvision matplotlib

# Import necessary libraries
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
from diffusers import DDPMScheduler, UNet2DModel
import matplotlib.pyplot as plt
import os
import numpy as np
from PIL import Image
import tempfile

# Prepare Dataset
Define transformations and create a dataset using ImageFolder with the specified dataroot.

In [None]:
# Preprocessing pipeline: resize, centre-crop to 64x64, convert to a tensor and
# normalise every channel with mean 0.5 / std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(64),
        transforms.CenterCrop(64),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

# Root directory of the image folder dataset
dataroot = "/kaggle/input/wiki-dataset/wiki"

# ImageFolder expects one sub-directory per class below the root
dataset = ImageFolder(dataroot, transform=transform)

# Report how many samples were found
print(f"Number of images in the dataset: {len(dataset)}")

# Setup Data Loader
Initialize the DataLoader with parameters like batch_size and shuffle for batching the dataset.

In [None]:
from torch.utils.data import DataLoader

# Batching configuration
batch_size = 16  # images per optimisation step
shuffle = True   # reshuffle the samples every epoch

# Wrap the dataset so it can be iterated in mini-batches
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle)

# Report how many mini-batches make up one epoch
num_batches = len(dataloader)
print(f"Number of batches: {num_batches}")

# Define Diffusion Model and Trainer
Instantiate the UNet2DModel, set up the noise scheduler, and define the optimizer and initial training configuration.

In [None]:
# Pick the compute device: GPU when available, CPU otherwise
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# U-Net that predicts the noise component of a 64x64 RGB image.
# Attention is used only in the deepest down block and the first up block.
model = UNet2DModel(
    sample_size=64,
    in_channels=3,
    out_channels=3,
    layers_per_block=2,
    block_out_channels=(128, 256, 512, 512),
    down_block_types=("DownBlock2D",) * 3 + ("AttnDownBlock2D",),
    up_block_types=("AttnUpBlock2D",) + ("UpBlock2D",) * 3,
)
model.to(device)

# DDPM noise schedule with 1000 training timesteps
noise_scheduler = DDPMScheduler(num_train_timesteps=1000)

# AdamW over all model parameters
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

# Number of passes over the dataset
num_epochs = 15

# Training Loop
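Iterate over the data batches; for each batch, add noise at randomly sampled timesteps, have the model predict that noise, and update the model weights by minimizing the mean squared error between the predicted and the actual noise. Note: this section appears twice in the notebook, but only one implementation is required.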

In [None]:
# Read the number of diffusion steps from the scheduler's config object;
# accessing config values directly as scheduler attributes is deprecated in diffusers.
num_train_timesteps = noise_scheduler.config.num_train_timesteps

# Ensure training mode even when this cell is re-run after a later cell has
# called model.eval().
model.train()

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    for step, (images, _) in enumerate(dataloader):
        # Move images to the device (class labels are unused)
        images = images.to(device)

        # Gaussian noise with the same shape, dtype and device as the images
        noise = torch.randn_like(images)

        # One random timestep per image in the batch
        timesteps = torch.randint(
            0, num_train_timesteps, (images.shape[0],), device=device, dtype=torch.long
        )

        # Forward diffusion: corrupt the clean images according to the schedule
        noisy_images = noise_scheduler.add_noise(images, noise, timesteps)

        # The model is trained to recover the noise that was added
        noise_pred = model(noisy_images, timesteps).sample

        # Mean squared error between predicted and true noise
        loss = torch.nn.functional.mse_loss(noise_pred, noise)

        # Backpropagation and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print loss every 100 steps
        if step % 100 == 0:
            print(f"Step {step}/{len(dataloader)}, Loss: {loss.item()}")

# Generate and Display Images
After training, set the model to evaluation mode, generate images using the reverse diffusion process, and display them using matplotlib.

In [None]:
# Generate a 4x4 grid of samples with the reverse diffusion process.
model.eval()  # Set the model to evaluation mode

num_images = 16    # Total images to generate
rows, cols = 4, 4  # 4x4 grid

# Read the step count from the scheduler config (direct attribute access on the
# scheduler is deprecated in diffusers).
num_train_timesteps = noise_scheduler.config.num_train_timesteps

with torch.no_grad():
    # Denoise all images as a single batch: one model call per timestep
    # instead of one call per image per timestep.
    samples = torch.randn(num_images, 3, 64, 64, device=device)

    # Reverse diffusion: walk the timesteps from the noisiest to the cleanest
    for t in reversed(range(num_train_timesteps)):
        noise_pred = model(samples, t).sample
        samples = noise_scheduler.step(noise_pred, t, samples).prev_sample

# Undo the (0.5, 0.5) normalisation and move channels last for matplotlib:
# (N, C, H, W) -> (N, H, W, C), values clipped to [0, 1]
generated = (samples.cpu().permute(0, 2, 3, 1).numpy() * 0.5 + 0.5).clip(0, 1)

fig, axes = plt.subplots(rows, cols, figsize=(cols * 3, rows * 3))
for ax, generated_image in zip(axes.flatten(), generated):
    ax.imshow(generated_image)
    ax.axis("off")

plt.tight_layout()
plt.show()

## FID (Fréchet Inception Distance)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize
from torch.utils.data import DataLoader, Dataset
import numpy as np
import os
from PIL import Image
import scipy.linalg

class InceptionV3Feature(nn.Module):
    """
    Pooled InceptionV3 feature extractor for FID calculation.

    The pretrained torchvision InceptionV3 is reduced to its convolutional
    trunk followed by the average-pooling layer. The module is put in eval
    mode and all of its parameters are frozen.
    """

    def __init__(self):
        super().__init__()
        backbone = models.inception_v3(weights=models.Inception_V3_Weights.DEFAULT)

        # Stem convolutions, with fresh max-pool layers between the stages
        stem = [
            backbone.Conv2d_1a_3x3,
            backbone.Conv2d_2a_3x3,
            backbone.Conv2d_2b_3x3,
            nn.MaxPool2d(kernel_size=3, stride=2),
            backbone.Conv2d_3b_1x1,
            backbone.Conv2d_4a_3x3,
            nn.MaxPool2d(kernel_size=3, stride=2),
        ]

        # Inception ("Mixed") blocks, in network order
        mixed_names = (
            "Mixed_5b", "Mixed_5c", "Mixed_5d",
            "Mixed_6a", "Mixed_6b", "Mixed_6c", "Mixed_6d", "Mixed_6e",
            "Mixed_7a", "Mixed_7b", "Mixed_7c",
        )
        mixed = [getattr(backbone, block_name) for block_name in mixed_names]

        # The final average pool gives [B, 2048, 1, 1]
        self.features = nn.Sequential(*stem, *mixed, backbone.avgpool)

        # Inference only: eval mode and no gradients for any parameter
        self.eval()
        self.requires_grad_(False)

    def forward(self, x):
        """Return one flattened feature vector per image in the batch ``x``."""
        spatial_size = (x.shape[2], x.shape[3])
        if spatial_size != (299, 299):
            # Bilinearly resize anything that is not already 299x299
            x = F.interpolate(x, size=(299, 299), mode='bilinear', align_corners=False)
        pooled = self.features(x)
        # Flatten from [B, 2048, 1, 1] to [B, 2048]
        return pooled.view(pooled.size(0), -1)

class ImageDataset(Dataset):
    """
    Dataset of all image files found recursively below a directory.

    Args:
        path: Directory that is searched (including sub-directories) for files
            whose name ends in one of ``IMAGE_EXTENSIONS`` (case-insensitive).
        transform: Optional callable applied to each loaded RGB PIL image.
            When omitted, images are resized and centre-cropped to 64x64,
            converted to tensors and normalised with mean 0.5 / std 0.5.

    Raises:
        ValueError: If no image file is found below ``path``.
    """

    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.webp')

    def __init__(self, path, transform=None):
        self.path = path

        # Recursively gather image files from subdirectories
        found = []
        for root, _, files in os.walk(self.path):
            for f in files:
                if f.lower().endswith(self.IMAGE_EXTENSIONS):
                    found.append(os.path.join(root, f))

        # os.walk yields entries in a filesystem-dependent order; sorting makes
        # the index -> file mapping reproducible across runs and machines.
        self.image_files = sorted(found)

        if len(self.image_files) == 0:
            raise ValueError(f"No images found in directory: {self.path}")

        self.transform = transform if transform is not None else Compose([
            Resize(64),
            CenterCrop(64),
            ToTensor(),
            Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])

    def __len__(self):
        """Number of image files discovered."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load the image at position ``idx`` as RGB and apply the transform."""
        img_path = self.image_files[idx]
        # The context manager closes the underlying file once the pixel data
        # has been converted into a separate RGB image.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image

def extract_features(model, dataloader, device):
    """
    Run ``model`` over every batch of ``dataloader`` and stack the outputs.

    The model is switched to eval mode and called without gradient tracking.
    Each batch is moved to ``device`` first; the per-batch outputs are brought
    back to the CPU as NumPy arrays and concatenated along the first axis.
    """
    model.eval()
    with torch.no_grad():
        per_batch_features = [
            model(images.to(device)).cpu().numpy() for images in dataloader
        ]
    return np.concatenate(per_batch_features, axis=0)

def calculate_statistics(features):
    """
    Summarise a feature matrix by its mean vector and covariance matrix.

    Rows of ``features`` are treated as observations and columns as variables
    (``rowvar=False``). Returns the tuple ``(mean, covariance)``.
    """
    covariance = np.cov(features, rowvar=False)
    mean_vector = np.mean(features, axis=0)
    return mean_vector, covariance

def calculate_frechet_distance(mu1, sigma1, mu2, sigma2, eps=1e-6):
    """
    Calculate the Fréchet distance between two multivariate Gaussians.

    The value returned is
    ``||mu1 - mu2||^2 + Tr(sigma1) + Tr(sigma2) - 2 * Tr(sqrt(sigma1 @ sigma2))``.

    Args:
        mu1, mu2: Mean vectors (array-like, same length).
        sigma1, sigma2: Covariance matrices (array-like, same shape).
        eps: Value added to the diagonal of both covariances when the matrix
            square root of their product is not finite.

    Returns:
        The Fréchet distance as a float-like NumPy scalar.

    Raises:
        ValueError: If the means or the covariances have mismatching shapes,
            or if the matrix square root has a non-negligible imaginary part.
    """
    # Accept lists/tuples/scalars as well as arrays
    mu1 = np.atleast_1d(mu1)
    mu2 = np.atleast_1d(mu2)
    sigma1 = np.atleast_2d(sigma1)
    sigma2 = np.atleast_2d(sigma2)

    if mu1.shape != mu2.shape:
        raise ValueError(f"Mean vectors have different shapes: {mu1.shape} vs {mu2.shape}")
    if sigma1.shape != sigma2.shape:
        raise ValueError(f"Covariances have different shapes: {sigma1.shape} vs {sigma2.shape}")

    # Difference between the means
    diff = mu1 - mu2

    # Matrix square root of the covariance product. `disp` is intentionally not
    # passed, so the call returns only the matrix (same form as the retry below).
    covmean = scipy.linalg.sqrtm(sigma1.dot(sigma2))
    if not np.isfinite(covmean).all():
        # Retry with a small constant added to both diagonals
        print("WARNING: FID calculation produces singular product; adding jitter to diagonal")
        offset = np.eye(sigma1.shape[0]) * eps
        covmean = scipy.linalg.sqrtm((sigma1 + offset).dot(sigma2 + offset))

    # Numerical precision issues can cause small imaginary parts
    if np.iscomplexobj(covmean):
        if not np.allclose(np.diagonal(covmean).imag, 0, atol=1e-3):
            m = np.max(np.abs(covmean.imag))
            raise ValueError(f"Imaginary component {m}")
        covmean = covmean.real

    tr_covmean = np.trace(covmean)

    return (diff.dot(diff) + np.trace(sigma1) + np.trace(sigma2) - 2 * tr_covmean)

def calculate_fid(real_image_dir, generated_images, batch_size=16, device='cuda', num_workers=4):
    """
    Compute the FID between a directory of real images and generated images.

    Args:
        real_image_dir: Directory searched recursively for the real images.
        generated_images: Sequence of arrays that ``PIL.Image.fromarray`` can
            read after scaling by 255; values are expected in [0, 1].
        batch_size: Batch size used for feature extraction.
        device: Device the feature extractor and the batches are moved to.
        num_workers: Number of DataLoader worker processes.

    Returns:
        The FID score.

    Raises:
        ValueError: If fewer than two generated images are supplied (a
            covariance matrix cannot be estimated), or if no image is found
            in ``real_image_dir``.
    """
    if len(generated_images) < 2:
        raise ValueError("At least two generated images are required to compute FID")

    feature_extractor = InceptionV3Feature().to(device)

    # Prepare real image dataset
    real_dataset = ImageDataset(real_image_dir)
    real_dataloader = DataLoader(
        real_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers
    )

    # Generated images are written to a temporary directory so that they pass
    # through exactly the same loading/transform path as the real images.
    with tempfile.TemporaryDirectory() as tmp_dir:
        for i, img in enumerate(generated_images):
            # Round to the nearest level and clip before casting: a bare cast
            # truncates and can wrap around for values outside [0, 1].
            pixels = np.clip(np.rint(np.asarray(img) * 255.0), 0, 255).astype(np.uint8)
            Image.fromarray(pixels).save(os.path.join(tmp_dir, f"generated_{i:04d}.png"))

        gen_dataset = ImageDataset(tmp_dir)
        gen_dataloader = DataLoader(
            gen_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers
        )

        # Extract features
        print("Extracting features from real images...")
        real_features = extract_features(feature_extractor, real_dataloader, device)

        print("Extracting features from generated images...")
        gen_features = extract_features(feature_extractor, gen_dataloader, device)

        # Calculate statistics
        print("Calculating statistics...")
        mu_real, sigma_real = calculate_statistics(real_features)
        mu_gen, sigma_gen = calculate_statistics(gen_features)

        # Calculate FID
        print("Calculating FID...")
        fid_score = calculate_frechet_distance(mu_real, sigma_real, mu_gen, sigma_gen)

    return fid_score

def generate_images_for_fid(model, noise_scheduler, device, num_images=100, batch_size=16):
    """
    Sample images from the diffusion model for FID evaluation.

    Args:
        model: Noise-prediction model; called as ``model(sample, timestep)``
            and expected to return an object with a ``.sample`` attribute.
        noise_scheduler: Scheduler providing ``config.num_train_timesteps``
            and ``step(model_output, t, sample)`` returning ``.prev_sample``.
        device: Device on which the samples are created.
        num_images: Total number of images to generate.
        batch_size: Number of images denoised together per pass (minimum 1).

    Returns:
        A list of ``num_images`` NumPy arrays of shape (64, 64, 3) with values
        clipped to [0, 1].
    """
    model.eval()
    generated_images = []

    # Read the step count from the scheduler config; direct attribute access on
    # the scheduler object is deprecated in diffusers.
    num_train_timesteps = noise_scheduler.config.num_train_timesteps
    batch_size = max(1, int(batch_size))

    print(f"Generating {num_images} images for FID evaluation...")
    with torch.no_grad():
        for start in range(0, num_images, batch_size):
            # The last batch may be smaller than batch_size
            current = min(batch_size, num_images - start)
            samples = torch.randn(current, 3, 64, 64, device=device)  # Start from random noise

            # Reverse diffusion over all timesteps, highest first
            for t in reversed(range(num_train_timesteps)):
                timestep = torch.full((current,), t, device=device, dtype=torch.long)
                noise_pred = model(samples, timestep).sample
                samples = noise_scheduler.step(noise_pred, t, samples).prev_sample

            # Denormalize and convert to numpy: (N, C, H, W) -> (N, H, W, C)
            batch_images = (samples.cpu().permute(0, 2, 3, 1).numpy() * 0.5 + 0.5).clip(0, 1)
            generated_images.extend(batch_images)

            print(f"Generated {len(generated_images)}/{num_images} images")

    return generated_images

# Example usage
def evaluate_diffusion_model(model, noise_scheduler, real_images_path, device, num_images=50):
    """
    Generate samples from the diffusion model and score them with FID.

    ``num_images`` samples are generated, compared against the images under
    ``real_images_path``, and the resulting score is printed and returned.
    """
    # Step 1: sample images from the model
    samples = generate_images_for_fid(
        model=model,
        noise_scheduler=noise_scheduler,
        device=device,
        num_images=num_images,
    )

    # Step 2: compare the samples with the real images
    fid_score = calculate_fid(
        real_image_dir=real_images_path,
        generated_images=samples,
        batch_size=16,
        device=device,
    )

    print(f"Final FID Score: {fid_score:.4f}")
    return fid_score

In [None]:
# Score the trained model with FID, using the training images as the real set
real_images_path = dataroot  # Path to real images
evaluate_diffusion_model(
    model=model,
    noise_scheduler=noise_scheduler,
    real_images_path=real_images_path,
    device=device,
    num_images=50,
)