based on https://github.com/uygarkurt/DDPM-Image-Generation/blob/main/DDPM_Image_Generartion.ipynb

In [149]:
from IPython.display import display, Markdown, HTML, clear_output

In [150]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import AdamW
from torch.optim import SGD
from torch.utils.data import DataLoader, TensorDataset

from torchvision import transforms
from torchvision.models import inception_v3
from torchvision.transforms import ToTensor, Resize, Normalize, Compose


from diffusers import UNet2DModel, DDPMScheduler, DDPMPipeline
from diffusers.optimization import get_cosine_schedule_with_warmup

from datasets import load_dataset

from accelerate import Accelerator

from PIL import Image
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import numpy as np
import random
import timeit
import json

import os
import time

import numpy as np
from scipy.linalg import sqrtm


# ignore UserWarning
import warnings
warnings.filterwarnings("ignore")

In [151]:
RANDOM_SEED = 42
IMG_SIZE = 64
DATASET_PERCENTAGE = 0.005
BATCH_SIZE = 4
LEARNING_RATE = 1e-4
NUM_EPOCHS = 50
NUM_GENERATE_IMAGES = 9
NUM_TIMESTEPS = 500
MIXED_PRECISION = "fp16"
GRADIENT_ACCUMULATION_STEPS = 1

random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed_all(RANDOM_SEED)
# torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

device = "cuda" if torch.cuda.is_available() else "cpu"

In [152]:
local_dataset_path = f"data/square{IMG_SIZE}_random{str(DATASET_PERCENTAGE)}/"
dataset = load_dataset("imagefolder", data_dir=local_dataset_path)
dataset = dataset["train"]
clear_output()

preprocess = transforms.Compose(
    [
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5]),
    ]
)

def transform(examples):
    images = [preprocess(image.convert("RGB")) for image in examples["image"]]
    return {"images": images}


dataset.set_transform(transform)

In [153]:
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

In [154]:
from tqdm.notebook import tqdm

In [155]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
from diffusers import UNet2DModel, DDPMPipeline
import numpy as np
import random


In [156]:
model = UNet2DModel(
    sample_size=64,  # CIFAR-10 images are 32x32
    in_channels=3,   # RGB images
    out_channels=3,  # RGB images
    layers_per_block=2,
    block_out_channels=(128, 128, 256, 256),
    down_block_types=(
        "DownBlock2D", "DownBlock2D", "DownBlock2D", "DownBlock2D"
    ),
    up_block_types=(
        "UpBlock2D", "UpBlock2D", "UpBlock2D", "UpBlock2D"
    ),
)


In [157]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Check if CUDA is available and set the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# MNIST dataset
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
train_dataset = datasets.MNIST(root='mnist_data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)



In [158]:
from diffusers import UNet2DModel
import torch.nn as nn

# Define the UNet model
model = UNet2DModel(
    sample_size=28,       # Size of the images
    in_channels=1,        # Number of input channels (grayscale for MNIST)
    out_channels=1,       # Number of output channels
    layers_per_block=2,   # Number of layers per block
    block_out_channels=(64, 128, 256),  # Number of output channels for each block
    down_block_types=("DownBlock2D", "DownBlock2D", "DownBlock2D"),  # Types of down blocks
    up_block_types=("UpBlock2D", "UpBlock2D", "UpBlock2D"),          # Types of up blocks
).to(device)  # Move model to GPU

# Define the DDPM pipeline
from diffusers import DDPMPipeline

pipeline = DDPMPipeline(unet=model, scheduler=None)



In [162]:
import torch.optim as optim

# Hyperparameters
epochs = 1
learning_rate = 1e-4

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Loss function
criterion = nn.MSELoss()

# Training loop
for epoch in range(epochs):
    for batch in tqdm(train_loader, total=len(train_loader), desc=f"Epoch [{epoch + 1}/{epochs}]"):
        images, _ = batch
        images = images.to(device).to(torch.float32)  # Move images to GPU

        # Add noise
        noise = torch.randn_like(images, device=device)  # Create noise on GPU
        noised_images = images + noise

        # Generate random timesteps for each image in the batch
        timesteps = torch.randint(0, 1000, (images.shape[0],), device=images.device).float()

        # Forward pass
        optimizer.zero_grad()
        outputs = model(noised_images, timesteps).sample

        # Ensure the output shape matches the input shape
        outputs = outputs.view_as(images)

        # Calculate loss
        loss = criterion(outputs, images)
        loss.backward()
        optimizer.step()

    print(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")




Epoch [1/1]:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch [1/1], Loss: 0.0292


In [165]:
import torch.nn.functional as F

def extract_latent(image, timestep):
    """
    Extracts the latent representation from the UNet model.
    """
    timestep = timestep.float()  # Convert timestep to float and expand dimensions
    timestep = timestep[:, None]  # Expand to match the required dimensions

    x = model.conv_in(image)
    latents = []

    for down_block in model.down_blocks:
        x = down_block(x, timestep)
        latents.append(x)
    
    x = model.mid_block(x, timestep)
    return latents, x

def generate_from_latent(latents, mid_latent, timestep):
    """
    Generates an image from the latent representation.
    """
    timestep = timestep.float()  # Convert timestep to float and expand dimensions
    timestep = timestep[:, None]  # Expand to match the required dimensions

    x = mid_latent

    for up_block in model.up_blocks:
        x = up_block(x, timestep)
    
    x = model.conv_out(x)
    return torch.tanh(x)

# Example usage
image, _ = next(iter(train_loader))
image = image.to(device).to(torch.float32)  # Move image to GPU
timestep = torch.tensor([500], device=image.device).float()  # Example timestep converted to float

latents, mid_latent = extract_latent(image, timestep)
generated_image = generate_from_latent(latents, mid_latent, timestep)

# Display the generated image
import matplotlib.pyplot as plt

plt.imshow(generated_image[0].detach().cpu().numpy().squeeze(), cmap='gray')
plt.show()



RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x1 and 256x64)