# Lab 4, Fast Sampling

In [1]:
# from google.colab import drive
# drive.mount("/gdrive")
# !ln -s "/gdrive/MyDrive/Doutorado/diffusion-utils" "/content/diffusion-utils"
#
# import sys
# sys.path.insert(0, "/content/diffusion-utils")

In [2]:
from typing import Dict, Tuple
from tqdm.notebook import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import models, transforms
from torchvision.utils import save_image, make_grid
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
import numpy as np
from IPython.display import HTML
from diffusion_utilities import *

In [3]:
print(torch.__version__)

2.5.1+cu124


# Setting Things Up

In [4]:
class ContextUnet(nn.Module):
    def __init__(self, in_channels, n_feat=64, n_cfeat=5, height=64):  # cfeat - context features
        super(ContextUnet, self).__init__()

        # number of input channels, number of intermediate feature maps and number of classes
        self.in_channels = in_channels
        self.n_feat = n_feat
        self.n_cfeat = n_cfeat
        self.h = height  #assume h == w. must be divisible by 4, so 28,24,20,16...

        # Initialize the initial convolutional layer
        self.init_conv = ResidualConvBlock(in_channels, n_feat, is_res=True)
                                                     # init_ #[b, 64, 64, 64]

        # Initialize the down-sampling path of the U-Net with two levels
        self.down1 = UnetDown(  n_feat, n_feat)        # down1 #[b,  64, 32, 32]
        self.down2 = UnetDown(  n_feat, 2 * n_feat)    # down2 #[b, 128, 16, 16]
        self.down3 = UnetDown(2 * n_feat, 4 * n_feat)    # down3 #[b, 256,  8,  8]
        self.down4 = UnetDown(4 * n_feat, 8 * n_feat)    # down4 #[b, 512,  4,  4]

         # original: self.to_vec = nn.Sequential(nn.AvgPool2d(7), nn.GELU())
        self.to_vec = nn.Sequential(nn.AvgPool2d(4), nn.GELU())
                                                     # to_ve #[b, 512,  1,  1]

        # Embed the timestep and context labels with a one-layer fully connected neural network
        self.timeembed1 = EmbedFC(1, 8*n_feat)
        self.timeembed2 = EmbedFC(1, 4*n_feat)
        self.timeembed3 = EmbedFC(1, 2*n_feat)
        self.timeembed4 = EmbedFC(1, 1*n_feat)
        self.contextembed1 = EmbedFC(n_cfeat, 8*n_feat)
        self.contextembed2 = EmbedFC(n_cfeat, 4*n_feat)
        self.contextembed3 = EmbedFC(n_cfeat, 2*n_feat)
        self.contextembed4 = EmbedFC(n_cfeat, 1*n_feat)

        # Initialize the up-sampling path of the U-Net with four levels
        self.up0 = nn.Sequential(
            nn.ConvTranspose2d(8 * n_feat, 8 * n_feat, 4, 4),  # alterei!! h//4 para 4
            nn.GroupNorm(8, 8 * n_feat), # normalize
            nn.ReLU(),
        )
        #up0 [b, 512,  1,  1]
        self.up1 = UnetUp(16 * n_feat, 4 * n_feat) #[b, 256,  4,  4]
        self.up2 = UnetUp( 8 * n_feat, 2 * n_feat) #[b, 128,  8,  8]
        self.up3 = UnetUp( 4 * n_feat, 1 * n_feat) #[b,  64, 16, 16]
        self.up4 = UnetUp( 2 * n_feat, 1 * n_feat) #[b,  32, 32, 32]

        # Initialize the final convolutional layers to map to the same number of channels as the input image
        self.out = nn.Sequential(
            nn.Conv2d(2 * n_feat, n_feat, 3, 1, 1), # reduce number of feature maps   #in_channels, out_channels, kernel_size, stride=1, padding=0
            nn.GroupNorm(8, n_feat), # normalize
            nn.ReLU(),
            nn.Conv2d(n_feat, self.in_channels, 3, 1, 1), # map to same number of channels as input
        )

    def forward(self, x, t, c=None):
        """
        x : (batch, n_feat, h, w) : input image
        t : (batch, n_cfeat)      : time step
        c : (batch, n_classes)    : context label
        """
        # x is the input image, c is the context label, t is the timestep, context_mask says which samples to block the context on

        # pass the input image through the initial convolutional layer
        x = self.init_conv(x)
        # pass the result through the down-sampling path
        down1 = self.down1(x)       #[b,  64, 32, 32]
        down2 = self.down2(down1)   #[b, 128, 16, 16]
        down3 = self.down3(down2)   #[b, 256,  8,  8]
        down4 = self.down4(down3)   #[b, 512,  4,  4]
        # print("down1.shape", down1.shape)
        # print("down2.shape", down2.shape)
        # print("down3.shape", down3.shape)
        # print("down4.shape", down4.shape)

        # convert the feature maps to a vector and apply an activation
        hiddenvec = self.to_vec(down4)
        # print("hiddenvec.shape", hiddenvec.shape)
                                    #[b, 128,  1,  1]

        # mask out context if context_mask == 1
        if c is None:
            c = torch.zeros(x.shape[0], self.n_cfeat).to(x)

        # embed context and timestep
        cemb1 = self.contextembed1(c).view(-1, self.n_feat * 8, 1, 1)     # (batch, 2*n_feat, 1,1)
        temb1 = self.timeembed1(t).view(-1, self.n_feat * 8, 1, 1)
        cemb2 = self.contextembed2(c).view(-1, self.n_feat * 4, 1, 1)
        temb2 = self.timeembed2(t).view(-1, self.n_feat * 4, 1, 1)
        cemb3 = self.contextembed3(c).view(-1, self.n_feat * 2, 1, 1)
        temb3 = self.timeembed3(t).view(-1, self.n_feat * 2, 1, 1)
        cemb4 = self.contextembed4(c).view(-1, self.n_feat, 1, 1)
        temb4 = self.timeembed4(t).view(-1, self.n_feat, 1, 1)
        #print(f"uunet forward: cemb1 {cemb1.shape}. temb1 {temb1.shape}, cemb2 {cemb2.shape}. temb2 {temb2.shape}")

        # print("bf up0")
        #[b, 128,  4,  4]
        up1 = self.up0(hiddenvec)
        # print("up1.shape", up1.shape)
        # print("(cemb1*up1 + temb1).shape", (cemb1*up1 + temb1).shape)
        up2 = self.up1(cemb1*up1 + temb1, down4)
        # print("up2.shape", up2.shape)
        up3 = self.up2(cemb2*up2 + temb2, down3)
        # print("up3.shape", up3.shape)
        up4 = self.up3(cemb3*up3 + temb3, down2)
        # print("up4.shape", up4.shape)
        up5 = self.up4(cemb4*up4 + temb4, down1)
        # print("up5.shape", up5.shape)
        out = self.out(torch.cat((up5, x), 1))
        # print("out.shape", out.shape)
        return out


In [5]:
# hyperparameters

# diffusion hyperparameters
timesteps = 500
beta1 = 1e-4
beta2 = 0.02

# network hyperparameters
device = torch.device("cuda:0" if torch.cuda.is_available() else torch.device('cpu'))
print("Device:", device)
n_feat = 64 # 64 hidden dimension feature
n_cfeat = 5 # context vector is of size 5
height = 64 # 16x16 image
save_dir = './weights/'

# training hyperparameters
batch_size = 100
n_epoch = 5
lrate=1e-3

Device: cuda:0


In [6]:
# construct DDPM noise schedule
b_t = (beta2 - beta1) * torch.linspace(0, 1, timesteps + 1, device=device) + beta1
a_t = 1 - b_t
ab_t = torch.cumsum(a_t.log(), dim=0).exp()
ab_t[0] = 1

In [7]:
# construct model
nn_model = ContextUnet(in_channels=4, n_feat=n_feat, n_cfeat=n_cfeat, height=height).to(device)
total_params = sum(p.numel() for p in nn_model.parameters() if p.requires_grad)

print(f'Trainable parameters: {total_params:,}')

Trainable parameters: 20,674,244


In [8]:
# load dataset and construct optimizer
dataset = CustomDataset("./data/train-images-FRONT.npy", "./data/train-classes-FRONT.npy", transform, null_context=False)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=1)
optim = torch.optim.Adam(nn_model.parameters(), lr=lrate)

sprite shape: (12074, 64, 64, 4)
labels shape: (12074, 5)


In [9]:
# helper function: perturbs an image to a specified noise level
def perturb_input(x, t, noise):
    return ab_t.sqrt()[t, None, None, None] * x + (1 - ab_t[t, None, None, None]) * noise

In [None]:
# training with context code
# set into train mode
nn_model.train()

for ep in (pbar_epochs := tqdm(range(n_epoch))):
    # print(f'epoch {ep}/{n_epoch}')
    pbar_epochs.set_description(f'Training Epoch {ep}')

    # linearly decay learning rate
    optim.param_groups[0]['lr'] = lrate*(1-ep/n_epoch)

    for x, c in (pbar_batches := tqdm(dataloader, mininterval=2, leave=False)):
        pbar_batches.set_description(f'Minibatches')
        optim.zero_grad()
        x = x.to(device)
        c = c.to(x)

        # randomly mask out c
        context_mask = torch.bernoulli(torch.zeros(c.shape[0]) + 0.9).to(device)
        c = c * context_mask.unsqueeze(-1)

        # perturb data
        noise = torch.randn_like(x)
        t = torch.randint(1, timesteps + 1, (x.shape[0],)).to(device)
        x_pert = perturb_input(x, t, noise)

        # use network to recover noise
        pred_noise = nn_model(x_pert, t / timesteps, c=c)

        # loss is mean squared error between the predicted and true noise
        loss = F.mse_loss(pred_noise, noise)
        loss.backward()

        optim.step()

    # save model periodically
    if ep%4==0 or ep == int(n_epoch-1):
        if not os.path.exists(save_dir):
            os.mkdir(save_dir)
        torch.save(nn_model.state_dict(), save_dir + f"context_model_{ep}.pth")
        print('saved model at ' + save_dir + f"context_model_{ep}.pth")

  0%|          | 0/5 [00:00<?, ?it/s]

  0%|          | 0/121 [00:00<?, ?it/s]

# Fast Sampling

In [None]:
# define sampling function for DDIM
# removes the noise using ddim
def denoise_ddim(x, t, t_prev, pred_noise):
    ab = ab_t[t]
    ab_prev = ab_t[t_prev]

    x0_pred = ab_prev.sqrt() / ab.sqrt() * (x - (1 - ab).sqrt() * pred_noise)
    dir_xt = (1 - ab_prev).sqrt() * pred_noise

    return x0_pred + dir_xt

In [None]:
# sample quickly using DDIM
@torch.no_grad()
def sample_ddim(n_sample, n=20):
    # x_T ~ N(0, 1), sample initial noise
    samples = torch.randn(n_sample, 3, height, height).to(device)

    # array to keep track of generated steps for plotting
    intermediate = []
    step_size = timesteps // n
    for i in range(timesteps, 0, -step_size):
        print(f'sampling timestep {i:3d}', end='\r')

        # reshape time tensor
        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)

        eps = nn_model(samples, t)    # predict noise e_(x_t,t)
        samples = denoise_ddim(samples, i, i - step_size, eps)
        intermediate.append(samples.detach().cpu().numpy())

    intermediate = np.stack(intermediate)
    return samples, intermediate

In [None]:
# load in model weights and set to eval mode
nn_model.load_state_dict(torch.load(f"{save_dir}/context_model_{n_epoch-1}.pth", map_location=device))
nn_model.eval()
print("Loaded in Context Model")

In [None]:
# fast sampling algorithm with context
@torch.no_grad()
def sample_ddim_context(n_sample, context, n=20):
    # x_T ~ N(0, 1), sample initial noise
    samples = torch.randn(n_sample, 4, height, height).to(device)

    # array to keep track of generated steps for plotting
    intermediate = []
    step_size = timesteps // n
    for i in range(timesteps, 0, -step_size):
        print(f'sampling timestep {i:3d}', end='\r')

        # reshape time tensor
        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)

        eps = nn_model(samples, t, c=context)    # predict noise e_(x_t,t)
        samples = denoise_ddim(samples, i, i - step_size, eps)
        intermediate.append(samples.detach().cpu().numpy())

    intermediate = np.stack(intermediate)
    return samples, intermediate

In [None]:
# visualize samples
plt.clf()
ctx = F.one_hot(torch.randint(0, 5, (32,)), 5).to(device=device).float()
samples, intermediate = sample_ddim_context(32, ctx)
animation_ddim_context = plot_sample(intermediate,32,4,save_dir, "ani_run", None, save=False)
HTML(animation_ddim_context.to_jshtml())

#### Compare DDPM, DDIM speed

In [None]:
# helper function; removes the predicted noise (but adds some noise back in to avoid collapse)
def denoise_add_noise(x, t, pred_noise, z=None):
    if z is None:
        z = torch.randn_like(x)
    noise = b_t.sqrt()[t] * z
    mean = (x - pred_noise * ((1 - a_t[t]) / (1 - ab_t[t]).sqrt())) / a_t[t].sqrt()
    return mean + noise

In [None]:
# sample with context using standard algorithm
@torch.no_grad()
def sample_ddpm_context(n_sample, context, save_rate=20):
    # x_T ~ N(0, 1), sample initial noise
    samples = torch.randn(n_sample, 4, height, height).to(device)

    # array to keep track of generated steps for plotting
    intermediate = []
    for i in range(timesteps, 0, -1):
        print(f'sampling timestep {i:3d}', end='\r')

        # reshape time tensor
        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)

        # sample some random noise to inject back in. For i = 1, don't add back in noise
        z = torch.randn_like(samples) if i > 1 else 0

        eps = nn_model(samples, t, c=context)    # predict noise e_(x_t,t, ctx)
        samples = denoise_add_noise(samples, i, eps, z)
        if i % save_rate==0 or i==timesteps or i<8:
            intermediate.append(samples.detach().cpu().numpy())

    intermediate = np.stack(intermediate)
    return samples, intermediate

In [None]:
# visualize samples
plt.clf()
ctx = F.one_hot(torch.randint(0, 5, (32,)), 5).to(device=device).float()
samples, intermediate = sample_ddpm_context(32, ctx)
animation_ddpm_context = plot_sample(intermediate,32,4,save_dir, "ani_run", None, save=False)
HTML(animation_ddpm_context.to_jshtml())

In [None]:
# %timeit -r 1 sample_ddim(32, n=25)
# %timeit -r 1 sample_ddpm(32, )

# Acknowledgments
Sprites by ElvGames, [FrootsnVeggies](https://zrghr.itch.io/froots-and-veggies-culinary-pixels) and  [kyrise](https://kyrise.itch.io/)   
This code is modified from, https://github.com/cloneofsimo/minDiffusion   
Diffusion model is based on [Denoising Diffusion Probabilistic Models](https://arxiv.org/abs/2006.11239) and [Denoising Diffusion Implicit Models](https://arxiv.org/abs/2010.02502)
