In [1]:
#!/usr/bin/env python
"""
Project Title: Benchmarking Generative Models for Image Synthesis and Enhancement in Computer Vision

===============================================================================
1. Project Type
===============================================================================
This project reproduces baseline implementations of three generative models –
Generative Adversarial Networks (GANs), Variational Autoencoders (VAEs), and
Diffusion Models – using custom code. The goal is to train and test these models
on publicly available datasets (e.g., CIFAR-10) and benchmark them with standard
metrics such as FID and Inception Score.

===============================================================================
2. Project Introduction
===============================================================================
In this project we implement three types of generative models from scratch:
• A GAN (DCGAN architecture) for adversarial image synthesis.
• A VAE for probabilistic latent space encoding.
• A Diffusion Model (with an advanced approach including attention mechanisms)
  for iterative image denoising.
We will train these models on the CIFAR-10 dataset (as chosen by the user) and
compare their performance both quantitatively (using FID/IS) and qualitatively.
Refer to [1] for related state-of-the-art references.

===============================================================================
3. Project Motivation
===============================================================================
Benchmarking multiple generative models on CIFAR-10 will provide a deep
understanding of the computational trade-offs and architectural differences
between GANs, VAEs, and Diffusion Models. In addition, incorporating attention
mechanisms in the diffusion model offers insights into improving image quality
and diversity.

===============================================================================
4. Project Plan
===============================================================================
4.1. Baseline Approach:
     - Implement baseline GAN (DCGAN), VAE, and a simple Diffusion Model.
     - Train using CIFAR-10 images (upscaled to 64x64 if necessary).

4.2. Advanced Approach:
     - Enhance the diffusion model with integrated self-attention layers.

4.3. Validation Plan:
     - Evaluate generated images using FID and Inception Score.
     - Perform qualitative visual assessments on saved image outputs.

4.4. Computational Resources:
     - Use Python with PyTorch, running on GPU if available.

4.5. Libraries/Tools:
     - Python, PyTorch, torchvision, matplotlib, OpenCV, scikit-learn.

4.6. Estimated Baseline Runtime:
     - Expect around 1-2 hours per baseline model for training; evaluation may
       take a full day depending on cross-validation and experimental runs.

===============================================================================
5. Workload
===============================================================================
The project involves extensive implementation from scratch, hyperparameter
tuning, literature review, and experiments comparing different models.

===============================================================================
6. Ideal Result & Insights
===============================================================================
The ultimate outcome is a comprehensive benchmarking report and a novel diffusion
model variant leveraging attention that demonstrates improved performance.
Insights will cover computational trade-offs and model-specific strengths.

===============================================================================
7. Potential Risks
===============================================================================
Potential challenges include model instability, insufficient GPU resources,
and difficulty matching the performance of highly optimized existing codebases.

===============================================================================
8. Duplication Statement
===============================================================================
This project is an original effort with no duplication of prior research work.

===============================================================================
References:
[1] Maxime Oquab, Timothée Darcet, Théo Moutakanni, et al. Dinov2: Learning robust visual features without supervision. arXiv preprint arXiv:2304.07193, 2023.
===============================================================================
"""
!pip install pytorch-gan-metrics
# ===========================
# Section 1: Configuration and Setup
# ===========================
import os
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, utils as vutils
import matplotlib.pyplot as plt

# Set random seed for reproducibility.
seed = 42
random.seed(seed)
torch.manual_seed(seed)

# Hyperparameters and configuration.
DATA_PATH     = './data'
BATCH_SIZE    = 128
IMAGE_SIZE    = 64            # Upscale CIFAR-10 from 32x32 to 64x64 for compatibility.
NOISE_DIM     = 100           # Dimension of noise vector for GAN.
NUM_EPOCHS    = 50            # Number of training epochs (modify as needed).
LR            = 0.0002
BETA1         = 0.5
CHANNELS_IMG  = 3             # CIFAR-10 images are RGB.
OUTPUT_DIR    = './output'
os.makedirs(OUTPUT_DIR, exist_ok=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ===========================
# Section 2: Data Loading & Preprocessing (CIFAR-10)
# ===========================
def get_cifar10_dataloader(data_path=DATA_PATH, batch_size=BATCH_SIZE, image_size=IMAGE_SIZE):
    transform = transforms.Compose([
        transforms.Resize(image_size),                              # Resize images to 64x64.
        transforms.ToTensor(),                                      # Convert to PyTorch tensors.
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))      # Scale to [-1, 1].
    ])
    dataset = datasets.CIFAR10(root=data_path, train=True, transform=transform, download=True)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=2)

# ===========================
# Section 3: Baseline Approaches
# ---------------------------
# 3.1. Baseline GAN (DCGAN)
# ===========================
class GANGenerator(nn.Module):
    def __init__(self, noise_dim=NOISE_DIM, feature_map_size=64, channels=CHANNELS_IMG):
        super(GANGenerator, self).__init__()
        self.model = nn.Sequential(
            # Input: latent vector Z reshaped as (noise_dim x 1 x 1)
            nn.ConvTranspose2d(noise_dim, feature_map_size * 8, kernel_size=4, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(feature_map_size * 8),
            nn.ReLU(True),
            # State: (feature_map_size*8) x 4 x 4
            nn.ConvTranspose2d(feature_map_size * 8, feature_map_size * 4, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(feature_map_size * 4),
            nn.ReLU(True),
            # State: (feature_map_size*4) x 8 x 8
            nn.ConvTranspose2d(feature_map_size * 4, feature_map_size * 2, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(feature_map_size * 2),
            nn.ReLU(True),
            # State: (feature_map_size*2) x 16 x 16
            nn.ConvTranspose2d(feature_map_size * 2, feature_map_size, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(feature_map_size),
            nn.ReLU(True),
            # State: (feature_map_size) x 32 x 32
            nn.ConvTranspose2d(feature_map_size, channels, kernel_size=4, stride=2, padding=1, bias=False),
            nn.Tanh()  # Output scaled to [-1, 1]
        )
    def forward(self, x):
        return self.model(x)

class GANDiscriminator(nn.Module):
    def __init__(self, feature_map_size=64, channels=CHANNELS_IMG):
        super(GANDiscriminator, self).__init__()
        self.model = nn.Sequential(
            # Input: (channels) x 64 x 64
            nn.Conv2d(channels, feature_map_size, kernel_size=4, stride=2, padding=1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(feature_map_size, feature_map_size * 2, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(feature_map_size * 2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(feature_map_size * 2, feature_map_size * 4, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(feature_map_size * 4),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(feature_map_size * 4, feature_map_size * 8, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(feature_map_size * 8),
            nn.LeakyReLU(0.2, inplace=True),
            # Final layer: Output a single probability value (real/fake)
            nn.Conv2d(feature_map_size * 8, 1, kernel_size=4, stride=1, padding=0, bias=False),
            nn.Sigmoid()
        )
    def forward(self, x):
        return self.model(x)

# ---------------------------
# 3.2. Baseline VAE
# ---------------------------
class VAE(nn.Module):
    def __init__(self, img_channels=CHANNELS_IMG, latent_dim=128, feature_dim=64):
        super(VAE, self).__init__()
        # Encoder: Convolutional layers to extract features.
        self.encoder = nn.Sequential(
            nn.Conv2d(img_channels, feature_dim, kernel_size=4, stride=2, padding=1),  # 64 -> 32
            nn.ReLU(),
            nn.Conv2d(feature_dim, feature_dim*2, kernel_size=4, stride=2, padding=1),   # 32 -> 16
            nn.BatchNorm2d(feature_dim*2),
            nn.ReLU(),
            nn.Conv2d(feature_dim*2, feature_dim*4, kernel_size=4, stride=2, padding=1),   # 16 -> 8
            nn.BatchNorm2d(feature_dim*4),
            nn.ReLU(),
            nn.Flatten()   # Flatten for the FC layers.
        )
        # Calculate flattened feature size assuming input image size 64x64.
        self.fc_mu    = nn.Linear(feature_dim*4*8*8, latent_dim)
        self.fc_logvar= nn.Linear(feature_dim*4*8*8, latent_dim)
        # Decoder: Map latent vector back to image space.
        self.decoder_input = nn.Linear(latent_dim, feature_dim*4*8*8)
        self.decoder = nn.Sequential(
            nn.Unflatten(1, (feature_dim*4, 8, 8)),
            nn.ConvTranspose2d(feature_dim*4, feature_dim*2, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(feature_dim*2),
            nn.ReLU(),
            nn.ConvTranspose2d(feature_dim*2, feature_dim, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(feature_dim),
            nn.ReLU(),
            nn.ConvTranspose2d(feature_dim, img_channels, kernel_size=4, stride=2, padding=1),
            nn.Tanh()
        )

    def encode(self, x):
        h = self.encoder(x)
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        h = self.decoder_input(z)
        return self.decoder(h)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decode(z)
        return x_recon, mu, logvar

# ---------------------------
# 3.3. Baseline Diffusion Model (Advanced Approach) with Integrated Attention
# ---------------------------
# Self-Attention Module:
class SelfAttention(nn.Module):
    def __init__(self, in_channels):
        super(SelfAttention, self).__init__()
        self.query = nn.Conv2d(in_channels, in_channels // 8, kernel_size=1)
        self.key   = nn.Conv2d(in_channels, in_channels // 8, kernel_size=1)
        self.value = nn.Conv2d(in_channels, in_channels, kernel_size=1)
        self.gamma = nn.Parameter(torch.zeros(1))
        self.softmax = nn.Softmax(dim=-1)
    def forward(self, x):
        b, c, w, h = x.size()
        proj_query = self.query(x).view(b, -1, w*h).permute(0, 2, 1)
        proj_key   = self.key(x).view(b, -1, w*h)
        energy = torch.bmm(proj_query, proj_key)
        attention = self.softmax(energy)
        proj_value = self.value(x).view(b, -1, w*h)
        out = torch.bmm(proj_value, attention.permute(0, 2, 1))
        out = out.view(b, c, w, h)
        out = self.gamma * out + x
        return out

# UNet Block with optional attention
class UNetBlock(nn.Module):
    def __init__(self, in_channels, out_channels, use_attention=False):
        super(UNetBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.bn1   = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2   = nn.BatchNorm2d(out_channels)
        self.relu  = nn.ReLU(inplace=True)
        self.use_attention = use_attention
        if use_attention:
            self.attn = SelfAttention(out_channels)
    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        if self.use_attention:
            x = self.attn(x)
        return x

# Simplified Diffusion Model (U-Net Style)
class DiffusionModel(nn.Module):
    def __init__(self, img_channels=CHANNELS_IMG, feature_maps=64):
        super(DiffusionModel, self).__init__()
        # Down-sampling path.
        self.down1 = UNetBlock(img_channels, feature_maps, use_attention=False)
        self.pool1 = nn.MaxPool2d(2)
        self.down2 = UNetBlock(feature_maps, feature_maps*2, use_attention=True)
        self.pool2 = nn.MaxPool2d(2)
        # Bottleneck with attention.
        self.bottleneck = UNetBlock(feature_maps*2, feature_maps*4, use_attention=True)
        # Up-sampling path with skip connections.
        self.up1 = nn.ConvTranspose2d(feature_maps*4, feature_maps*2, kernel_size=2, stride=2)
        self.up_block1 = UNetBlock(feature_maps*4, feature_maps*2, use_attention=True)
        self.up2 = nn.ConvTranspose2d(feature_maps*2, feature_maps, kernel_size=2, stride=2)
        self.up_block2 = UNetBlock(feature_maps*2, feature_maps, use_attention=False)
        self.final_conv = nn.Conv2d(feature_maps, img_channels, kernel_size=1)
    def forward(self, x, t=None):
        # 't' represents the timestep for the diffusion process (optional embedding).
        d1 = self.down1(x)
        p1 = self.pool1(d1)
        d2 = self.down2(p1)
        p2 = self.pool2(d2)
        bn = self.bottleneck(p2)
        u1 = self.up1(bn)
        # First skip connection.
        u1 = torch.cat([u1, d2], dim=1)
        u1 = self.up_block1(u1)
        u2 = self.up2(u1)
        # Second skip connection.
        u2 = torch.cat([u2, d1], dim=1)
        u2 = self.up_block2(u2)
        out = self.final_conv(u2)
        return out

# ===========================
# Section 4: Validation Plan (Evaluation and Metrics)
# ===========================
import os
import torch
from torchvision import utils as vutils

def evaluate_generated_images(generator, fixed_noise, output_dir=OUTPUT_DIR, epoch=0, stats_path='path/to/statistics.npz'):
    """
    Generate images using the fixed noise vector, save a composite grid,
    and compute Inception Score (IS) and Frechet Inception Distance (FID)
    using the pytorch-gan-metrics package.

    Parameters:
      generator: Trained generator model.
      fixed_noise: A fixed noise tensor (e.g., [N, noise_dim, 1, 1]).
      output_dir: Directory for saving images.
      epoch: Current epoch number (used in filenames).
      stats_path: Path to the NPZ file containing FID statistics for real images.
                  This file must be prepared using the package command line tool.

    Requirements:
      pip install pytorch-gan-metrics

    Note:
      The generator is assumed to produce images in the range [-1, 1]. They
      are converted to [0, 1] before metric calculation.
    """

    # Generate fake images.
    with torch.no_grad():
        fake_images = generator(fixed_noise).detach().cpu()

    # Save composite image grid (normalized for visualization).
    composite_filename = os.path.join(output_dir, f"gan_epoch_{epoch:03d}.png")
    vutils.save_image(fake_images, composite_filename, normalize=True)
    print(f"Saved composite generated images to {composite_filename}")

    # Convert images from [-1, 1] to [0, 1] for the metric calculations.
    fake_images_01 = (fake_images + 1) / 2.0

    # Import metric functions from pytorch-gan-metrics.
    try:
        from pytorch_gan_metrics import get_inception_score, get_fid
    except ImportError:
        print("Please install pytorch-gan-metrics via 'pip install pytorch-gan-metrics'")
        return

    # Compute Inception Score.
    try:
        IS, IS_std = get_inception_score(fake_images_01, batch_size=32, resize=True, splits=10)
        print(f"Inception Score: {IS:.2f} ± {IS_std:.2f}")
    except Exception as e:
        print(f"Error computing Inception Score: {e}")

    # Compute FID Score if the statistics file exists.
    if not os.path.exists(stats_path):
        print(f"Statistics file not found at {stats_path}. Cannot compute FID.")
    else:
        try:
            FID = get_fid(fake_images_01, stats_path)
            print(f"FID: {FID:.2f}")
        except Exception as e:
            print(f"Error computing FID: {e}")


# ===========================
# Section 5: Training Utilities and Main Execution
# ===========================
def train_gan(dataloader, num_epochs=NUM_EPOCHS):
    # Initialize baseline GAN models.
    netG = GANGenerator().to(device)
    netD = GANDiscriminator().to(device)
    criterion = nn.BCELoss()
    optimizerD = optim.Adam(netD.parameters(), lr=LR, betas=(BETA1, 0.999))
    optimizerG = optim.Adam(netG.parameters(), lr=LR, betas=(BETA1, 0.999))
    fixed_noise = torch.randn(64, NOISE_DIM, 1, 1, device=device)

    print("Starting GAN Training (Baseline Approach)...")
    for epoch in range(num_epochs):
        for i, (real_data, _) in enumerate(dataloader):
            # ------------------------------
            # (a) Update Discriminator:
            # Maximize log(D(x)) + log(1-D(G(z)))
            # ------------------------------
            netD.zero_grad()
            real_data = real_data.to(device)
            b_size = real_data.size(0)
            label_real = torch.full((b_size,), 1.0, device=device)
            output_real = netD(real_data).view(-1)
            lossD_real = criterion(output_real, label_real)
            lossD_real.backward()

            noise = torch.randn(b_size, NOISE_DIM, 1, 1, device=device)
            fake_data = netG(noise)
            label_fake = torch.full((b_size,), 0.0, device=device)
            output_fake = netD(fake_data.detach()).view(-1)
            lossD_fake = criterion(output_fake, label_fake)
            lossD_fake.backward()
            lossD = lossD_real + lossD_fake
            optimizerD.step()

            # ------------------------------
            # (b) Update Generator:
            # Maximize log(D(G(z))) by "tricking" the discriminator.
            # ------------------------------
            netG.zero_grad()
            label_gen = torch.full((b_size,), 1.0, device=device)
            output_gen = netD(fake_data).view(-1)
            lossG = criterion(output_gen, label_gen)
            lossG.backward()
            optimizerG.step()

            if i % 100 == 0:
                print(f"[Epoch {epoch+1}/{num_epochs}][Batch {i}/{len(dataloader)}]: Loss_D: {lossD.item():.4f}, Loss_G: {lossG.item():.4f}")

        # Evaluate and save generated images at the end of each epoch.
        evaluate_generated_images(netG, fixed_noise, epoch=epoch+1)
    print("GAN Training Complete.")

def train_vae(dataloader, num_epochs=NUM_EPOCHS):
    print("Starting VAE Training (Baseline Approach)...")
    vae = VAE().to(device)
    optimizerVAE = optim.Adam(vae.parameters(), lr=LR)
    reconstruction_loss_fn = nn.MSELoss(reduction='sum')

    for epoch in range(num_epochs):
        running_loss = 0.0
        for i, (data, _) in enumerate(dataloader):
            vae.zero_grad()
            data = data.to(device)
            x_recon, mu, logvar = vae(data)
            recon_loss = reconstruction_loss_fn(x_recon, data)
            # KL Divergence Loss.
            kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
            loss = recon_loss + kl_loss
            loss.backward()
            optimizerVAE.step()
            running_loss += loss.item()
            if i % 100 == 0:
                avg_loss = loss.item() / data.size(0)
                print(f"[VAE][Epoch {epoch+1}/{num_epochs}][Batch {i}/{len(dataloader)}]: Loss per image: {avg_loss:.4f}")
        epoch_loss = running_loss / len(dataloader.dataset)
        print(f"[VAE] Epoch [{epoch+1}/{num_epochs}] Average Loss per Image: {epoch_loss:.4f}")

        # Evaluate: reconstruct a fixed batch and save.
        with torch.no_grad():
            sample_batch, _ = next(iter(dataloader))
            sample_batch = sample_batch.to(device)
            recon_images, _, _ = vae(sample_batch)
        vutils.save_image(recon_images.detach().cpu(), os.path.join(OUTPUT_DIR, f"vae_epoch_{epoch+1:03d}.png"), normalize=True)
    print("VAE Training Complete.")

def train_diffusion(dataloader, num_epochs=NUM_EPOCHS, noise_std=0.1):
    print("Starting Diffusion Model Training (Advanced Approach)...")
    diffusion = DiffusionModel().to(device)
    optimizerDiff = optim.Adam(diffusion.parameters(), lr=LR)
    mse_loss = nn.MSELoss()

    for epoch in range(num_epochs):
        running_loss = 0.0
        for i, (data, _) in enumerate(dataloader):
            diffusion.zero_grad()
            data = data.to(device)
            # Simulate noisy input.
            noise = torch.randn_like(data) * noise_std
            noisy_data = data + noise
            # For this simplified example, the model learns to recover the original image.
            pred = diffusion(noisy_data)
            loss = mse_loss(pred, data)
            loss.backward()
            optimizerDiff.step()
            running_loss += loss.item()
            if i % 100 == 0:
                print(f"[Diffusion][Epoch {epoch+1}/{num_epochs}][Batch {i}/{len(dataloader)}]: Loss: {loss.item():.4f}")
        epoch_loss = running_loss / len(dataloader)
        print(f"[Diffusion] Epoch [{epoch+1}/{num_epochs}] Average Loss: {epoch_loss:.4f}")
        # Evaluate: Save denoised images from the last batch.
        with torch.no_grad():
            denoised = diffusion(noisy_data).detach().cpu()
        vutils.save_image(denoised, os.path.join(OUTPUT_DIR, f"diffusion_epoch_{epoch+1:03d}.png"), normalize=True)
    print("Diffusion Model Training Complete.")


def main():
    # Load CIFAR-10 dataloader (Section 2).
    dataloader = get_cifar10_dataloader()

    # 4.1. Baseline Approach: Train the baseline GAN.
    train_gan(dataloader)

    train_vae(dataloader)

    train_diffusion(dataloader)

if __name__ == "__main__":
    main()

Collecting pytorch-gan-metrics
  Downloading pytorch_gan_metrics-0.5.4-py3-none-any.whl.metadata (7.6 kB)
Collecting pytorch-image-generation-metrics (from pytorch-gan-metrics)
  Downloading pytorch_image_generation_metrics-0.6.1-py3-none-any.whl.metadata (7.1 kB)
Downloading pytorch_gan_metrics-0.5.4-py3-none-any.whl (7.9 kB)
Downloading pytorch_image_generation_metrics-0.6.1-py3-none-any.whl (25 kB)
Installing collected packages: pytorch-image-generation-metrics, pytorch-gan-metrics
Successfully installed pytorch-gan-metrics-0.5.4 pytorch-image-generation-metrics-0.6.1


100%|██████████| 170M/170M [00:14<00:00, 12.1MB/s]


Starting GAN Training (Baseline Approach)...
[Epoch 1/50][Batch 0/391]: Loss_D: 1.4084, Loss_G: 3.2505
[Epoch 1/50][Batch 100/391]: Loss_D: 0.4596, Loss_G: 3.6500
[Epoch 1/50][Batch 200/391]: Loss_D: 0.2984, Loss_G: 2.9557
[Epoch 1/50][Batch 300/391]: Loss_D: 0.2308, Loss_G: 3.9310
Saved composite generated images to ./output/gan_epoch_001.png
Please install pytorch-gan-metrics via 'pip install pytorch-gan-metrics'
[Epoch 2/50][Batch 0/391]: Loss_D: 0.4260, Loss_G: 5.7747
[Epoch 2/50][Batch 100/391]: Loss_D: 1.2222, Loss_G: 3.2112
[Epoch 2/50][Batch 200/391]: Loss_D: 0.6578, Loss_G: 1.8010
[Epoch 2/50][Batch 300/391]: Loss_D: 0.2014, Loss_G: 2.4670
Saved composite generated images to ./output/gan_epoch_002.png
Please install pytorch-gan-metrics via 'pip install pytorch-gan-metrics'
[Epoch 3/50][Batch 0/391]: Loss_D: 0.5236, Loss_G: 2.7999
[Epoch 3/50][Batch 100/391]: Loss_D: 0.9169, Loss_G: 2.5793
[Epoch 3/50][Batch 200/391]: Loss_D: 0.1177, Loss_G: 3.4534
[Epoch 3/50][Batch 300/391]: 

In [None]:
# Save each model's state dictionary.
torch.save(gan_generator.state_dict(), os.path.join(OUTPUT_DIR, 'gan_generator.pth'))
torch.save(gan_discriminator.state_dict(), os.path.join(OUTPUT_DIR, 'gan_discriminator.pth'))
torch.save(vae_model.state_dict(), os.path.join(OUTPUT_DIR, 'vae.pth'))
torch.save(diffusion_model.state_dict(), os.path.join(OUTPUT_DIR, 'diffusion.pth'))
print("Models saved successfully.")

NameError: name 'gan_generator' is not defined

In [2]:
import os
import shutil

# Define the output folder.
OUTPUT_DIR = './output'

# Define subdirectories for each model.
gan_folder = os.path.join(OUTPUT_DIR, 'gan')
vae_folder = os.path.join(OUTPUT_DIR, 'vae')
diffusion_folder = os.path.join(OUTPUT_DIR, 'diffusion')

# Create the subdirectories if they don't exist.
os.makedirs(gan_folder, exist_ok=True)
os.makedirs(vae_folder, exist_ok=True)
os.makedirs(diffusion_folder, exist_ok=True)

# Iterate over all items in the output directory.
for file_name in os.listdir(OUTPUT_DIR):
    full_path = os.path.join(OUTPUT_DIR, file_name)
    # Skip directories
    if not os.path.isfile(full_path):
        continue

    # Check filename prefix and move accordingly.
    if file_name.startswith('gan_'):
        destination = os.path.join(gan_folder, file_name)
        shutil.move(full_path, destination)
        print(f"Moved {file_name} to {gan_folder}")
    elif file_name.startswith('vae_'):
        destination = os.path.join(vae_folder, file_name)
        shutil.move(full_path, destination)
        print(f"Moved {file_name} to {vae_folder}")
    elif file_name.startswith('diffusion_'):
        destination = os.path.join(diffusion_folder, file_name)
        shutil.move(full_path, destination)
        print(f"Moved {file_name} to {diffusion_folder}")

print("File reorganization complete.")

Moved vae_epoch_037.png to ./output/vae
Moved diffusion_epoch_020.png to ./output/diffusion
Moved diffusion_epoch_031.png to ./output/diffusion
Moved gan_epoch_026.png to ./output/gan
Moved diffusion_epoch_034.png to ./output/diffusion
Moved vae_epoch_046.png to ./output/vae
Moved diffusion_epoch_022.png to ./output/diffusion
Moved diffusion_epoch_028.png to ./output/diffusion
Moved diffusion_epoch_013.png to ./output/diffusion
Moved gan_epoch_015.png to ./output/gan
Moved diffusion_epoch_026.png to ./output/diffusion
Moved vae_epoch_005.png to ./output/vae
Moved diffusion_epoch_027.png to ./output/diffusion
Moved vae_epoch_003.png to ./output/vae
Moved vae_epoch_034.png to ./output/vae
Moved vae_epoch_018.png to ./output/vae
Moved diffusion_epoch_044.png to ./output/diffusion
Moved diffusion_epoch_048.png to ./output/diffusion
Moved diffusion_epoch_021.png to ./output/diffusion
Moved vae_epoch_049.png to ./output/vae
Moved gan_epoch_045.png to ./output/gan
Moved diffusion_epoch_039.pn

In [None]:
import os
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import save_image

DATA_PATH = './data'             # Where CIFAR-10 tar/pickle files are located
IMAGE_SIZE = 32                  # Native CIFAR-10 resolution; set to 64
OUTPUT_REAL_IMAGES = './data/real_images'   # Folder to store extracted images

os.makedirs(OUTPUT_REAL_IMAGES, exist_ok=True)

def export_cifar10_as_images():
    # Transform pipeline
    transform_pipe = transforms.Compose([
        transforms.Resize(IMAGE_SIZE),
        transforms.ToTensor(),
    ])

    # Load CIFAR-10 training or test set
    cifar_dataset = datasets.CIFAR10(root=DATA_PATH, train=True, download=True, transform=transform_pipe)
    dataloader = DataLoader(cifar_dataset, batch_size=1, shuffle=False)

    count = 0
    for images, labels in dataloader:
        # images: [1, 3, H, W]
        # Save image to disk as a PNG (in [0,1] range by default when using transforms.ToTensor())
        save_image(images[0], os.path.join(OUTPUT_REAL_IMAGES, f"img_{count:05d}.png"))
        count += 1

    print(f"Exported {count} images to {OUTPUT_REAL_IMAGES}")

if __name__ == "__main__":
    export_cifar10_as_images()

Exported 50000 images to ./data/real_images


In [None]:
%whos

Variable                    Type        Data/Info
-------------------------------------------------
BATCH_SIZE                  int         128
BETA1                       float       0.5
CHANNELS_IMG                int         3
DATA_PATH                   str         ./data
DataLoader                  type        <class 'torch.utils.data.dataloader.DataLoader'>
DiffusionModel              type        <class '__main__.DiffusionModel'>
GANDiscriminator            type        <class '__main__.GANDiscriminator'>
GANGenerator                type        <class '__main__.GANGenerator'>
IMAGE_SIZE                  int         32
LR                          float       0.0002
NOISE_DIM                   int         100
NUM_EPOCHS                  int         50
OUTPUT_DIR                  str         ./output
OUTPUT_REAL_IMAGES          str         ./data/real_images
SelfAttention               type        <class '__main__.SelfAttention'>
UNetBlock                   type        <class '__main

In [4]:
#!/usr/bin/env python
"""
Evaluate Generated Images from Subdirectories using pytorch_image_generation_metrics

This script computes the Inception Score (IS) and Frechet Inception Distance (FID)
for generated images stored in subdirectories of the output directory. It assumes
that your images are stored as follows:

    ./output/gan/
    ./output/vae/
    ./output/diffusion/

The script uses the functions:
    - get_inception_score_from_directory
    - get_fid_from_directory
    - get_inception_score_and_fid_from_directory

A reference NPZ file (with precomputed real-image statistics) is required for FID.
For CIFAR-10, you might use something like './data/cifar10_real_stats.npz'.
"""

import os
from pytorch_image_generation_metrics import (
    get_inception_score_from_directory,
    get_fid_from_directory,
    get_inception_score_and_fid_from_directory
)

# ---------------------------------------------------------------------------
# Directory and Reference File Configuration
# ---------------------------------------------------------------------------
OUTPUT_DIR = './output'
GAN_DIR = os.path.join(OUTPUT_DIR, "gan")
VAE_DIR = os.path.join(OUTPUT_DIR, "vae")
DIFFUSION_DIR = os.path.join(OUTPUT_DIR, "diffusion")

# Path to the NPZ file with real-image statistics.
FID_REF = './data/cifar10_real_stats.npz'

# ---------------------------------------------------------------------------
# Evaluation Function
# ---------------------------------------------------------------------------
def evaluate_images_dir(model_name, img_dir, fid_ref):
    """
    Evaluate generated images in a directory using pytorch_image_generation_metrics.

    Parameters:
        model_name (str): Identifier for the model type (e.g., "GAN", "VAE", "Diffusion").
        img_dir (str): Directory path containing generated images.
        fid_ref (str): Path to the NPZ file with precomputed statistics for real images.
    """
    print(f"\nEvaluating {model_name} images from directory: {img_dir}\n")

    # Compute the Inception Score from the directory.
    try:
        IS, IS_std = get_inception_score_from_directory(img_dir, batch_size=32, resize=True, splits=10)
        print(f"[{model_name}] Inception Score: {IS:.2f} ± {IS_std:.2f}")
    except Exception as e:
        print(f"Error computing Inception Score for {model_name}: {e}")

    # Compute the FID from the directory.
    try:
        FID = get_fid_from_directory(img_dir, fid_ref, batch_size=32, resize=True)
        print(f"[{model_name}] FID: {FID:.2f}")
    except Exception as e:
        print(f"Error computing FID for {model_name}: {e}")

    # Optionally, compute both metrics together.
    try:
        (IS2, IS_std2), FID2 = get_inception_score_and_fid_from_directory(img_dir, fid_ref, batch_size=32, resize=True, splits=10)
        print(f"[{model_name}] Combined -> IS: {IS2:.2f} ± {IS_std2:.2f}, FID: {FID2:.2f}")
    except Exception as e:
        print(f"Error computing combined metrics for {model_name}: {e}")

# ---------------------------------------------------------------------------
# Main Execution
# ---------------------------------------------------------------------------
def main():
    evaluate_images_dir("GAN", GAN_DIR, FID_REF)
    evaluate_images_dir("VAE", VAE_DIR, FID_REF)
    evaluate_images_dir("Diffusion", DIFFUSION_DIR, FID_REF)

if __name__ == "__main__":
    main()


Evaluating GAN images from directory: ./output/gan

Error computing Inception Score for GAN: get_inception_feature() got an unexpected keyword argument 'resize'
Error computing FID for GAN: get_inception_feature() got an unexpected keyword argument 'resize'
Error computing combined metrics for GAN: get_inception_feature() got an unexpected keyword argument 'resize'

Evaluating VAE images from directory: ./output/vae

Error computing Inception Score for VAE: get_inception_feature() got an unexpected keyword argument 'resize'
Error computing FID for VAE: get_inception_feature() got an unexpected keyword argument 'resize'
Error computing combined metrics for VAE: get_inception_feature() got an unexpected keyword argument 'resize'

Evaluating Diffusion images from directory: ./output/diffusion

Error computing Inception Score for Diffusion: get_inception_feature() got an unexpected keyword argument 'resize'
Error computing FID for Diffusion: get_inception_feature() got an unexpected keywo

In [6]:
!pip install
from pytorch_image_generation_metrics import (
    get_inception_score_from_directory,
    get_fid_from_directory
)

# Define directories for each model's output
GAN_DIR = './output/gan'
VAE_DIR = './output/vae'
DIFFUSION_DIR = './output/diffusion'

# Path to the NPZ file with precomputed real-image statistics (e.g., CIFAR-10 test set)
FID_REF = '/content/cifar10.test.npz'

# --------------------
# Evaluate GAN Outputs
# --------------------
IS_gan, IS_std_gan = get_inception_score_from_directory(GAN_DIR, batch_size=32, splits=10)
print(f"GAN Inception Score: {IS_gan:.2f} ± {IS_std_gan:.2f}")

FID_gan = get_fid_from_directory(GAN_DIR, FID_REF, batch_size=32)
print(f"GAN FID: {FID_gan:.2f}")

# --------------------
# Evaluate VAE Outputs
# --------------------
IS_vae, IS_std_vae = get_inception_score_from_directory(VAE_DIR, batch_size=32, splits=10)
print(f"VAE Inception Score: {IS_vae:.2f} ± {IS_std_vae:.2f}")

FID_vae = get_fid_from_directory(VAE_DIR, FID_REF, batch_size=32)
print(f"VAE FID: {FID_vae:.2f}")

# --------------------
# Evaluate Diffusion Outputs
# --------------------
IS_diff, IS_std_diff = get_inception_score_from_directory(DIFFUSION_DIR, batch_size=32, splits=10)
print(f"Diffusion Inception Score: {IS_diff:.2f} ± {IS_std_diff:.2f}")

FID_diff = get_fid_from_directory(DIFFUSION_DIR, FID_REF, batch_size=32)
print(f"Diffusion FID: {FID_diff:.2f}")

[31mERROR: You must give at least one requirement to install (see "pip help install")[0m[31m
[0m

Downloading: "https://github.com/w86763777/pytorch-image-generation-metrics/releases/download/v0.1.0/pt_inception-2015-12-05-6726825d.pth" to /root/.cache/torch/hub/checkpoints/pt_inception-2015-12-05-6726825d.pth
100%|██████████| 91.2M/91.2M [00:02<00:00, 39.2MB/s]


GAN Inception Score: 1.49 ± 0.16
GAN FID: 475.42
VAE Inception Score: 1.20 ± 0.08
VAE FID: 435.53
Diffusion Inception Score: 1.22 ± 0.05
Diffusion FID: 466.11


In [8]:
#!/usr/bin/env python
"""
Project Title: Benchmarking Generative Models on CelebA

This script unzips a local CelebA archive, trains a DCGAN, VAE, and
a simple Diffusion Model (with attention) on the CelebA images (resized
to 64×64, normalized to [-1,1]), and saves composite outputs after each epoch.

Requirements:
    pip install torch torchvision matplotlib pytorch-gan-metrics
"""

import os
import random
import zipfile
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, utils as vutils

# ---------------------
# 1. Configuration
# ---------------------
seed = 42
random.seed(seed)
torch.manual_seed(seed)

DATA_PATH    = './data'
CELEBA_ZIP   = os.path.join(DATA_PATH, 'img_align_celeba.zip')
CELEBA_DIR   = os.path.join(DATA_PATH, 'celeba')
OUTPUT_DIR   = './output1'   # Updated output directory
os.makedirs(CELEBA_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

BATCH_SIZE   = 128
IMAGE_SIZE   = 64
NOISE_DIM    = 100
NUM_EPOCHS   = 30
LR           = 0.0002
BETA1        = 0.5
CHANNELS_IMG = 3
device       = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ---------------------
# 2. Unzip CelebA
# ---------------------
if not os.listdir(CELEBA_DIR):
    print(f"Extracting {CELEBA_ZIP} → {CELEBA_DIR}")
    with zipfile.ZipFile(CELEBA_ZIP, 'r') as z:
        z.extractall(CELEBA_DIR)

# ---------------------
# 3. DataLoader
# ---------------------
def get_celeba_dataloader(root_dir=CELEBA_DIR, batch_size=BATCH_SIZE, image_size=IMAGE_SIZE):
    transform = transforms.Compose([
        transforms.CenterCrop(178),
        transforms.Resize(image_size),
        transforms.ToTensor(),
        transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))
    ])
    dataset = datasets.ImageFolder(root_dir, transform=transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=4)

# ---------------------
# 4. Model Definitions
# (Insert your GANGenerator, GANDiscriminator, VAE, UNetBlock, SelfAttention, DiffusionModel classes here)
# ---------------------

# ---------------------
# 5. Evaluation Utility
# ---------------------
def evaluate_generated_images(generator, fixed_noise, output_dir=OUTPUT_DIR, epoch=0):
    generator.eval()
    with torch.no_grad():
        imgs = generator(fixed_noise).cpu()
    path = os.path.join(output_dir, f"celeba_gan_epoch_{epoch:03d}.png")
    vutils.save_image(imgs, path, normalize=True, nrow=8)
    print(f"Saved {path}")

# ---------------------
# 6. Training Loops
# ---------------------
def train_gan(dataloader):
    netG = GANGenerator().to(device)
    netD = GANDiscriminator().to(device)
    optG = optim.Adam(netG.parameters(), lr=LR, betas=(BETA1,0.999))
    optD = optim.Adam(netD.parameters(), lr=LR, betas=(BETA1,0.999))
    criterion = nn.BCELoss()
    fixed_noise = torch.randn(64, NOISE_DIM, 1, 1, device=device)

    for epoch in range(1, NUM_EPOCHS+1):
        for real, _ in dataloader:
            b = real.size(0)
            real = real.to(device)
            # Discriminator real
            netD.zero_grad()
            label_real = torch.full((b,),1.,device=device)
            lossD_real = criterion(netD(real).view(-1), label_real)
            # Discriminator fake
            noise = torch.randn(b, NOISE_DIM,1,1,device=device)
            fake = netG(noise)
            label_fake = torch.full((b,),0.,device=device)
            lossD_fake = criterion(netD(fake.detach()).view(-1), label_fake)
            (lossD_real+lossD_fake).backward()
            optD.step()
            # Generator update
            netG.zero_grad()
            label_gen = torch.full((b,),1.,device=device)
            lossG = criterion(netD(fake).view(-1), label_gen)
            lossG.backward(); optG.step()

        print(f"GAN Epoch {epoch}: Loss_D {lossD_real+lossD_fake:.4f}, Loss_G {lossG:.4f}")
        evaluate_generated_images(netG, fixed_noise, epoch=epoch)

def train_vae(dataloader):
    vae = VAE().to(device)
    opt = optim.Adam(vae.parameters(), lr=LR)
    for epoch in range(1, NUM_EPOCHS+1):
        tot=0
        for x,_ in dataloader:
            x = x.to(device)
            recon, mu, logvar = vae(x)
            recon_loss = nn.MSELoss(reduction='sum')(recon,x)
            kl = -0.5*torch.sum(1+logvar-mu.pow(2)-logvar.exp())
            loss = recon_loss+kl
            opt.zero_grad(); loss.backward(); opt.step()
            tot+=loss.item()
        print(f"VAE Epoch {epoch}: AvgLoss {tot/len(dataloader.dataset):.4f}")
        with torch.no_grad():
            sample = next(iter(dataloader))[0][:64].to(device)
            imgs,_,_ = vae(sample)
            vutils.save_image(imgs.cpu(), os.path.join(OUTPUT_DIR,f"celeba_vae_{epoch:03d}.png"), normalize=True)

def train_diffusion(dataloader):
    diff = DiffusionModel().to(device)
    opt = optim.Adam(diff.parameters(), lr=LR)
    for epoch in range(1, NUM_EPOCHS+1):
        tot=0
        for x,_ in dataloader:
            x = x.to(device)
            noisy = x + 0.1*torch.randn_like(x)
            pred = diff(noisy)
            loss = nn.MSELoss()(pred,x)
            opt.zero_grad(); loss.backward(); opt.step()
            tot+=loss.item()
        print(f"Diff Epoch {epoch}: AvgLoss {tot/len(dataloader):.4f}")
        with torch.no_grad():
            noisy = next(iter(dataloader))[0][:64].to(device) + 0.1*torch.randn(64,CHANNELS_IMG,IMAGE_SIZE,IMAGE_SIZE,device=device)
            out = diff(noisy)
            vutils.save_image(out.cpu(), os.path.join(OUTPUT_DIR,f"celeba_diff_{epoch:03d}.png"), normalize=True)

# ---------------------
# 7. Main
# ---------------------
def main():
    celeba_loader = get_celeba_dataloader()
    train_gan(celeba_loader)
    train_vae(celeba_loader)
    train_diffusion(celeba_loader)

if __name__ == "__main__":
    main()

Extracting ./data/img_align_celeba.zip → ./data/celeba
GAN Epoch 1: Loss_D 0.3812, Loss_G 3.6866
Saved ./output1/celeba_gan_epoch_001.png
GAN Epoch 2: Loss_D 1.0079, Loss_G 1.8592
Saved ./output1/celeba_gan_epoch_002.png
GAN Epoch 3: Loss_D 0.6834, Loss_G 1.1695
Saved ./output1/celeba_gan_epoch_003.png
GAN Epoch 4: Loss_D 0.4188, Loss_G 3.2421
Saved ./output1/celeba_gan_epoch_004.png
GAN Epoch 5: Loss_D 0.5022, Loss_G 2.5650
Saved ./output1/celeba_gan_epoch_005.png
GAN Epoch 6: Loss_D 0.4067, Loss_G 5.3215
Saved ./output1/celeba_gan_epoch_006.png
GAN Epoch 7: Loss_D 0.4896, Loss_G 4.8496
Saved ./output1/celeba_gan_epoch_007.png
GAN Epoch 8: Loss_D 1.6375, Loss_G 4.0988
Saved ./output1/celeba_gan_epoch_008.png
GAN Epoch 9: Loss_D 1.2593, Loss_G 4.8197
Saved ./output1/celeba_gan_epoch_009.png
GAN Epoch 10: Loss_D 0.2019, Loss_G 3.6329
Saved ./output1/celeba_gan_epoch_010.png
GAN Epoch 11: Loss_D 0.2061, Loss_G 3.9808
Saved ./output1/celeba_gan_epoch_011.png
GAN Epoch 12: Loss_D 1.0059, L

In [9]:
import os
import shutil

# Define the output folder.
OUTPUT_DIR = './output1'

# Define subdirectories for each model.
gan_folder = os.path.join(OUTPUT_DIR, 'gan')
vae_folder = os.path.join(OUTPUT_DIR, 'vae')
diffusion_folder = os.path.join(OUTPUT_DIR, 'diffusion')

# Create the subdirectories if they don't exist.
os.makedirs(gan_folder, exist_ok=True)
os.makedirs(vae_folder, exist_ok=True)
os.makedirs(diffusion_folder, exist_ok=True)

# Iterate over all items in the output directory.
for file_name in os.listdir(OUTPUT_DIR):
    full_path = os.path.join(OUTPUT_DIR, file_name)
    # Skip directories
    if not os.path.isfile(full_path):
        continue

    # Check filename prefix and move accordingly.
    if file_name.startswith('gan_'):
        destination = os.path.join(gan_folder, file_name)
        shutil.move(full_path, destination)
        print(f"Moved {file_name} to {gan_folder}")
    elif file_name.startswith('vae_'):
        destination = os.path.join(vae_folder, file_name)
        shutil.move(full_path, destination)
        print(f"Moved {file_name} to {vae_folder}")
    elif file_name.startswith('diffusion_'):
        destination = os.path.join(diffusion_folder, file_name)
        shutil.move(full_path, destination)
        print(f"Moved {file_name} to {diffusion_folder}")

print("File reorganization complete.")

File reorganization complete.


In [13]:
import os
import shutil

BASE_DIR       = './output1'
GAN_DIR        = os.path.join(BASE_DIR, 'gan')
VAE_DIR        = os.path.join(BASE_DIR, 'vae')
DIFFUSION_DIR  = os.path.join(BASE_DIR, 'diffusion')

# 1. ensure folders exist
for d in (GAN_DIR, VAE_DIR, DIFFUSION_DIR):
    os.makedirs(d, exist_ok=True)

# 2. iterate and move
for fname in os.listdir(BASE_DIR):
    if not fname.lower().endswith('.png'):
        continue

    src = os.path.join(BASE_DIR, fname)
    key = fname.lower()
    if 'gan' in key:
        dst_dir = GAN_DIR
    elif 'vae' in key:
        dst_dir = VAE_DIR
    elif 'diff' in key:       # catches celeba_diff_*.png
        dst_dir = DIFFUSION_DIR
    else:
        continue  # or handle “others” if needed

    shutil.move(src, os.path.join(dst_dir, fname))
    print(f"Moved {fname} → {dst_dir}")


In [21]:
from pytorch_image_generation_metrics import (
    get_inception_score_from_directory,
    get_fid_from_directory
)

# Define directories for each model's output
GAN_DIR = './output1/gan'
VAE_DIR = './output1/vae'
DIFFUSION_DIR = './output1/diffusion'

# Path to the NPZ file with precomputed real-image statistics (e.g., CIFAR-10 test set)
FID_REF = '/content/celebahq.3k.128.npz'

# --------------------
# Evaluate GAN Outputs
# --------------------
IS_gan, IS_std_gan = get_inception_score_from_directory(GAN_DIR, batch_size=32, splits=10)
print(f"GAN Inception Score: {IS_gan:.2f} ± {IS_std_gan:.2f}")

FID_gan = get_fid_from_directory(GAN_DIR, FID_REF, batch_size=32)
print(f"GAN FID: {FID_gan:.2f}")

# --------------------
# Evaluate VAE Outputs
# --------------------
IS_vae, IS_std_vae = get_inception_score_from_directory(VAE_DIR, batch_size=32, splits=10)
print(f"VAE Inception Score: {IS_vae:.2f} ± {IS_std_vae:.2f}")

FID_vae = get_fid_from_directory(VAE_DIR, FID_REF, batch_size=32)
print(f"VAE FID: {FID_vae:.2f}")

# --------------------
# Evaluate Diffusion Oacutputs
# --------------------
IS_diff, IS_std_diff = get_inception_score_from_directory(DIFFUSION_DIR, batch_size=32, splits=10)
print(f"Diffusion Inception Score: {IS_diff:.2f} ± {IS_std_diff:.2f}")

FID_diff = get_fid_from_directory(DIFFUSION_DIR, FID_REF, batch_size=32)
print(f"Diffusion FID: {FID_diff:.2f}")

GAN Inception Score: 1.20 ± 0.07
GAN FID: 529.68
VAE Inception Score: 1.12 ± 0.03
VAE FID: 451.77
Diffusion Inception Score: 1.29 ± 0.18
Diffusion FID: 483.38
