Used the StanfordCars Dataset with around 8000 images in the training set

In [None]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("jutrera/stanford-car-dataset-by-classes-folder")

print("Path to dataset files:", path)

In [None]:
# Investingating the dataset with visualization

import math
import torch
import torchvision
import matplotlib.pyplot as plt

def show_images(dataset, num_samples = 20, cols = 4):
    plt.figure(figsize=(15,15))
    for i, img in enumerate(dataset):
        if i == num_samples:
            break
        plt.subplot(num_samples//cols + 1, cols, i+1)
        plt.imshow(img[0])
root = r"C:\Users\danie\.cache\kagglehub\datasets\jutrera\stanford-car-dataset-by-classes-folder\versions\2\car_data\car_data"
train = torchvision.datasets.ImageFolder(root= rf"{root}\train")
test = torchvision.datasets.ImageFolder(root = rf"{root}\test")
show_images(train)

The Forward Process (Noise Scheduler)

In [None]:
import torch.nn.functional as F

def linear_beta_schedule(timesteps, start=0.0001, end =0.02):
    return torch.linspace(start, end, timesteps)

def get_index_from_list(vals, t, x_shape):
    batch_size = t.shape[0]
    out=vals.gather(-1, t.to("cuda"))
    return out.reshape(batch_size, *((1,) * (len(x_shape) -1))).to("cuda")

def forward_diffusion_sample(x_0, t, device="cuda"):
    # returns noisy version of an image at certain timestep t
    noise = torch.randn_like(x_0)
    sqrt_alphas_cumprod_t = get_index_from_list(sqrt_alphas_cumprod, t, x_0.shape)
    sqrt_one_minus_alphas_cumprod_t = get_index_from_list(sqrt_one_minus_alphas_cumprod, t, x_0.shape)
    
    return sqrt_alphas_cumprod_t.to(device) * x_0.to(device) \
    + sqrt_one_minus_alphas_cumprod_t.to(device) * noise.to(device), noise.to(device)

# Define beta schedule
T = 200
betas = linear_beta_schedule(timesteps=T)

# Pre-calculated different terms
alphas = 1. - betas
alphas_cumprod = torch.cumprod(alphas, axis = 0)
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1,0), value = 1.0)
sqrt_recip_alphas = torch.sqrt(1.0/alphas)
sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)
alphas = alphas.to("cuda")
betas = betas.to("cuda")
alphas_cumprod = alphas_cumprod.to("cuda")
alphas_cumprod_prev = alphas_cumprod_prev.to("cuda")
sqrt_recip_alphas = sqrt_recip_alphas.to("cuda")
sqrt_alphas_cumprod = sqrt_alphas_cumprod.to("cuda")
sqrt_one_minus_alphas_cumprod = sqrt_one_minus_alphas_cumprod.to("cuda")
posterior_variance = posterior_variance.to("cuda")



In [None]:
from torchvision import transforms
from torch.utils.data import DataLoader
import numpy as np

IMG_SIZE = 64
BATCH_SIZE = 128

def load_transformed_dataset(isTraining=True):
    data_transforms = [
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(), # Scales data into [0,1]
        transforms.Lambda(lambda t: (t*2 -1)) # Scale between [-1,1]
    ]
    data_transform = transforms.Compose(data_transforms)
    
    root = r"C:\Users\danie\.cache\kagglehub\datasets\jutrera\stanford-car-dataset-by-classes-folder\versions\2\car_data\car_data"
    train = torchvision.datasets.ImageFolder(root= rf"{root}\train", transform=data_transform)
    test = torchvision.datasets.ImageFolder(root = rf"{root}\test", transform=data_transform)
    
    if isTraining:
        return train
    else:
        return test

def show_tensor_image(image):
    reverse_transforms = transforms.Compose([
        transforms.Lambda(lambda t: (t+1)/2),
        transforms.Lambda(lambda t: t.permute(1,2,0)),
        transforms.Lambda(lambda t: t * 255),
        transforms.Lambda(lambda t: t.byte().cpu().numpy()),
        transforms.ToPILImage()
    ])
    
    # Take first image of batch
    if len(image.shape)==4:
        image = image[0, :, :, :]
    plt.imshow(reverse_transforms(image))
data = load_transformed_dataset()
dataloader = DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, drop_last=True, pin_memory=torch.cuda.is_available(), num_workers=0)

In [None]:
# Forward Diffusion
image = next(iter(dataloader))[0]

plt.figure(figsize=(10,5))
plt.axis('off')
num_images = 10
stepsize = int(T/num_images)

for idx in range(0, T, stepsize):
    t = torch.Tensor([idx]).type(torch.int64)
    plt.subplot(1, num_images+1, int(idx/stepsize)+1)
    image, noise = forward_diffusion_sample(image, t)
    show_tensor_image(image)

Backward Process

In [None]:
from torch import nn
import math

class Block(nn.Module):
    def __init__(self, in_ch, out_ch, time_emb_dim, up=False):
        super().__init__()
        self.time_mlp = nn.Linear(time_emb_dim, out_ch)
        if up:
            self.conv1 = nn.Conv2d(2*in_ch, out_ch, 3, padding=1)
            self.transform = nn.ConvTranspose2d(out_ch, out_ch, 4, 2, 1)
        else:
            self.conv1 = nn.Conv2d(in_ch, out_ch, 3, padding = 1)
            self.transform = nn.Conv2d(out_ch, out_ch, 4, 2, 1)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, padding=1)
        self.pool = nn.MaxPool2d(3, stride = 2)
        self.bnorm = nn.BatchNorm2d(out_ch)
        self.relu = nn.ReLU()
    
    def forward(self, x, t, ):
        h = self.bnorm(self.relu(self.conv1(x)))
        time_emb = self.relu(self.time_mlp(t))
        time_emb = time_emb[(..., ) + (None, ) * 2]
        h = h + time_emb
        h = self.bnorm(self.relu(self.conv2(h)))
        return self.transform(h)

class SinusoidalPositionEmbeddings(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim
        
    def forward(self, time):
         device = time.device
         half_dim = self.dim // 2
         embeddings = math.log(10000) / (half_dim-1)
         embeddings = torch.exp(torch.arange(half_dim, device=device) * -embeddings)
         embeddings = time[:, None] * embeddings[None, :]
         embeddings = torch.cat((embeddings.sin(), embeddings.cos()), dim=-1)
         return embeddings

class SimpleUnet(nn.Module):
    # Simplified variant of the Unet architecture
    def __init__(self):
        super().__init__()
        image_size = IMG_SIZE
        image_channels = 3
        down_channels = (64, 128, 256, 512, 1024)
        up_channels = (1024, 512, 256, 128, 64)
        
        # Kernel size
        out_dim = 1
        
        # Sinusoidal Embedding length
        time_emb_dim = 32
        
        self.time_mlp = nn.Sequential(
            SinusoidalPositionEmbeddings(time_emb_dim),
            nn.Linear(time_emb_dim, time_emb_dim),
            nn.ReLU()
        )
        
        self.conv0 = nn.Conv2d(image_channels, down_channels[0], 3, padding=1)
        
        self.downs = nn.ModuleList([Block(down_channels[i], down_channels[i+1], time_emb_dim) for i in range(len(down_channels)-1)])
        
        self.ups = nn.ModuleList([Block(up_channels[i], up_channels[i+1], time_emb_dim, up=True) for i in range(len(up_channels)-1)])
        
        self.output = nn.Conv2d(up_channels[-1], 3, out_dim)
        
    def forward(self, x, timestep):
        t = self.time_mlp(timestep)
        x = self.conv0(x)
        residual_inputs = []
        for down in self.downs:
            x = down(x, t)
            residual_inputs.append(x)
        for up in self.ups:
            residual_x = residual_inputs.pop()
            x = torch.cat((x, residual_x), dim=1)
            x = up(x, t)
        return self.output(x)
model = SimpleUnet()
model.to("cuda")
print("Num params: ", sum(p.numel() for p in model.parameters()))
model


Loss Function

In [None]:
def get_loss(model, x_0, t):
    x_noisy, noise = forward_diffusion_sample(x_0, t)
    noise_pred = model(x_noisy, t)
    return F.l1_loss(noise, noise_pred)

Sampling

In [None]:
@torch.no_grad()
def sample_timestep(x, t):
    """
    Calls model to predict noise in the image and returns the denoised image.
    Applies noise to this image, if we are not in the last step yet 
    """
    betas_t = get_index_from_list(betas, t, x.shape)
    sqrt_one_minus_alphas_cumprod_t = get_index_from_list(sqrt_one_minus_alphas_cumprod, t, x.shape)
    sqrt_recip_alphas_t = get_index_from_list(sqrt_recip_alphas, t, x.shape)
    
    # Call model (current image - noise prediction)
    model_mean = sqrt_recip_alphas_t * (
        x - betas_t * model(x, t)/ sqrt_one_minus_alphas_cumprod_t
    )
    
    posterior_variance_t = get_index_from_list(posterior_variance, t, x.shape)
    
    if t == 0:
        return model_mean
    else:
        noise = torch.randn_like(x)
        return model_mean + torch.sqrt(posterior_variance_t) * noise

@torch.no_grad()
def sample_plot_image():
    img_size = IMG_SIZE
    img = torch.randn((1, 3, img_size, img_size), device = "cuda")
    plt.figure(figsize=(15,15))
    plt.axis('off')
    num_images = 10
    stepsize = int(T/num_images)
    
    for i in range(0, T)[::-1]:
        t = torch.full((1, ), i, device="cuda", dtype=torch.long)
        img = sample_timestep(img, t)
        if i % stepsize == 0:
            plt.subplot(1, num_images, i//stepsize +1)
            show_tensor_image(img.detach().to("cuda"))
    plt.show()

Training

In [None]:
from torch.optim import Adam

device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
model.train()
optimizer = Adam(model.parameters(), lr = 0.001)
epochs = 100
for epoch in range(epochs):
    for step, batch in enumerate(dataloader):
        x0 = batch[0].to("cuda" , non_blocking=True)
        optimizer.zero_grad()
        
        t = torch.randint(0, T, (BATCH_SIZE,), device="cuda").long()
        loss = get_loss(model, x0, t)
        loss.backward()
        optimizer.step()
        
        if step == 0:
            print(f"Epoch {epoch} | step {step:02d} Loss: {loss.item()} ")
            sample_plot_image()
        print(f"Epoch: {epoch}, step: {step}")

In [None]:
import torch; print(torch.__version__, torch.version.cuda, torch.cuda.is_available())


In [None]:
from torch.utils.data import DataLoader
from torch.utils.data._utils.collate import default_collate
from torchvision import transforms
from PIL import Image
import torch

IMG_SIZE = (128, 128)   # e.g., (64, 64) or (256, 256)

# Resize + ToTensor for PIL images
_img_transform = transforms.Compose([
    transforms.Resize(IMG_SIZE, antialias=True),   # force consistent HxW
    transforms.ToTensor(),
])

def pil_to_tensor_collate(batch):
    def convert(sample):
        if isinstance(sample, Image.Image):
            return _img_transform(sample)

        if isinstance(sample, (tuple, list)) and len(sample) > 0:
            first = sample[0]
            if isinstance(first, Image.Image):
                first = _img_transform(first)
            return type(sample)([first, *sample[1:]])

        if isinstance(sample, dict):
            s = dict(sample)
            if 'image' in s and isinstance(s['image'], Image.Image):
                s['image'] = _img_transform(s['image'])
            elif 'x' in s and isinstance(s['x'], Image.Image):
                s['x'] = _img_transform(s['x'])
            return s

        return sample

    converted = [convert(s) for s in batch]
    return default_collate(converted)


# make sure you created test_loader earlier like train_loader
test_loader = DataLoader(test, batch_size=BATCH_SIZE, shuffle=False, drop_last=True, pin_memory=torch.cuda.is_available(), collate_fn=pil_to_tensor_collate, num_workers=0)

def evaluate_model(model, dataloader, T, device="cuda"):
    model.eval()  # set to eval mode
    total_loss = 0.0
    num_batches = 0
    print(device)
    with torch.no_grad():  # no gradients for evaluation
        for step, batch in enumerate(dataloader):
            x0 = batch[0].to(device, non_blocking=True)
            
            # random timesteps for this batch
            t = torch.randint(0, T, (x0.size(0),), device=device).long()
            
            # compute loss (noise prediction error)
            loss = get_loss(model, x0, t)
            
            total_loss += loss.item()
            num_batches += 1
            print(f"step is {step}")

    avg_loss = total_loss / num_batches
    return avg_loss

# Usage after training
test_loss = evaluate_model(model, test_loader, T)
print(f"Average test loss: {test_loss:.6f}")


In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt

@torch.no_grad()
def estimate_x0_from_xt(x_t, t, eps_pred):

    s1mc_t = get_index_from_list(sqrt_one_minus_alphas_cumprod, t, x_t.shape)  # √(1 - a_bar_t)
    sqrt_a_bar_t = get_index_from_list(sqrt_alphas_cumprod, t, x_t.shape)      # √(a_bar_t)
    x0_hat = (x_t - s1mc_t * eps_pred) / (sqrt_a_bar_t + 1e-8)
    return x0_hat

@torch.no_grad()
def denoise_batch_examples(model, dataloader, T, device, max_samples):

    model.eval()
    out = []
    taken = 0
    for batch in dataloader:
        x0 = batch[0].to(device, non_blocking=True)  # clean images in [-1, 1]
        B = x0.size(0)

        # random t per image
        t = torch.randint(0, T, (B,), device=device).long()

        # create noisy x_t
        x_t, true_noise = forward_diffusion_sample(x0, t)

        # predict noise with the model
        eps_pred = model(x_t, t)

        # estimate x0
        x0_hat = estimate_x0_from_xt(x_t, t, eps_pred)

        for i in range(B):
            out.append((x0[i].detach().cpu(), x_t[i].detach().cpu(), x0_hat[i].detach().cpu(), int(t[i].item())))
            taken += 1
            if taken >= max_samples:
                return out
    return out

def tensor_to_uint8(image_tensor):
    """
    Convert a single image tensor in [-1,1], shape (C,H,W), to uint8 HWC for PSNR.
    """
    img = (image_tensor.clamp(-1, 1) + 1) / 2.0  # to [0,1]
    img = (img * 255.0).round().byte().permute(1, 2, 0).numpy()
    return img

def compute_metrics(x0, x0_hat):

    l1 = F.l1_loss(x0, x0_hat).item()
    mse = F.mse_loss(x0, x0_hat).item()

    # PSNR computed in [0,255] space
    x0_u8     = tensor_to_uint8(x0)
    x0_hat_u8 = tensor_to_uint8(x0_hat)
    mse_255 = np.mean((x0_u8.astype(np.float32) - x0_hat_u8.astype(np.float32))**2)
    if mse_255 <= 1e-10:
        psnr = float('inf')
    else:
        psnr = 10.0 * np.log10((255.0**2) / mse_255)
    return l1, mse, psnr

def plot_denoise_triplets(triplets):

    n = len(triplets)
    plt.figure(figsize=(12, 4 * n))
    all_l1, all_mse, all_psnr = [], [], []

    for i, (x0, x_t, x0_hat, t_int) in enumerate(triplets, start=1):
        l1, mse, psnr = compute_metrics(x0, x0_hat)
        all_l1.append(l1); all_mse.append(mse); all_psnr.append(psnr)

        # Clean
        ax = plt.subplot(n, 3, 3*(i-1) + 1)
        ax.set_title(f"Clean x0")
        ax.axis('off')
        show_tensor_image(x0.unsqueeze(0))  # your helper supports batch or single

        # Noisy
        ax = plt.subplot(n, 3, 3*(i-1) + 2)
        ax.set_title(f"Noisy x_t (t={t_int})")
        ax.axis('off')
        show_tensor_image(x_t.unsqueeze(0))

        # Reconstructed
        ax = plt.subplot(n, 3, 3*(i-1) + 3)
        ax.set_title(f"Reconstruction x0_hat\nL1={l1:.4f}  MSE={mse:.4f}  PSNR={psnr:.2f} dB")
        ax.axis('off')
        show_tensor_image(x0_hat.unsqueeze(0))

    plt.tight_layout()
    # Print batch averages
    print(f"[Batch metrics]  L1: {np.mean(all_l1):.4f}  MSE: {np.mean(all_mse):.4f}  PSNR: {np.mean(all_psnr):.2f} dB")

@torch.no_grad()
def evaluate_and_visualize(model, test_loader, T, device, max_samples):

    triplets = denoise_batch_examples(model, test_loader, T, device, max_samples=max_samples)
    plot_denoise_triplets(triplets)


In [None]:
evaluate_and_visualize(model, test_loader, T, "cuda", 10)