# Sampling

In [None]:
from typing import Dict, Tuple 
from tqdm import tqdm 
import torch
import torch.nn as nn

import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import transforms, models
from torchvision.utils import save_image, make_grid
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
import numpy as np
from diffusion_utilities import *

### Setting things up


In [None]:
class ContextUnet(nn.Module):
    def __init__(self, in_channels, n_feat=256, n_cfeat=10, height=28) -> None:
        # c_feat means the number of context features
        super().__init__()
        
        # number of input channels, number of intermediate feature maps and number of classes
        self.in_channels = in_channels
        self.n_feat = n_feat
        self.n_cfeat = n_cfeat
        self.h = height # assume w == h, the value of height must be divisible by 4
        
        # initialize the initial convolustional layer
        self.init_conv = ResidualConvBlock(in_channels, n_feat, is_res=True)
        
        # initialize the down-sampling path of the U-Net
        self.down1 = UnetDown(n_feat, n_feat)
        self.down2 = UnetDown(n_feat, 2* n_feat)
        
        self.to_vec = nn.Sequential(nn.AvgPool2d((4)), nn.GELU())
        
        # embed the timestep and context labels with a one-layer fully connected neural network
        self.timeembed1 = EmbedFC(1, 2*n_feat)
        self.timeembed2 = EmbedFC(1, 1*n_feat)
        self.contextembed1 = EmbedFC(n_cfeat, 2*n_feat)
        self.contextembed2 = EmbedFC(n_cfeat, 1*n_feat)
        
        # initialize the up-sampling path of the U-net with three levels
        self.up0 = nn.Sequential(
            nn.ConvTranspose2d(2 * n_feat, 2 * n_feat, self.h //4, self.h//4),
            nn.GroupNorm(8, 2 * n_feat),
            nn.ReLU()
        )
        self.up1 = UnetUp(4 * n_feat, n_feat)
        self.up2 = UnetUp(2 * n_feat, n_feat)
        
        # initialize the final convolutional layers to map to the same number of channels as the input image
        self.out = nn.Sequential(
            nn.Conv2d(2*n_feat, n_feat, 3, 1, 1),
            nn.GroupNorm(8, n_feat),
            nn.ReLU(),
            nn.Conv2d(n_feat, self.in_channels, 3, 1, 1)
        )
        
    def forward(self, x, t, c=None):
        """
        x: (batch, n_feat, h, w) : input image
        t: (batch, n_cfeat) : time step
        c: (batch, n_classes) : context label
        """
        x = self.init_conv(x)
        down1 = self.down1(x)
        down2 = self.down2(down1)
        
        hiddenvec = self.to_vec(down2)
        
        if c is None:
            c = torch.zeros(x.shape[0], self.n_cfeat).to(x)
            
        # embed context and timestep
        cemb1 = self.contextembed1(c).view(-1, self.n_feat * 2, 1, 1)
        temb1 = self.timeembed1(t).view(-1, self.n_feat * 2, 1, 1)
        cemb2 = self.contextembed2(c).view(-1, self.n_feat, 1, 1)
        temb2 = self.timeembed2(t).view(-1, self.n_feat, 1, 1)
        
        up1 = self.up0(hiddenvec)
        up2 = self.up1(cemb1*up1 + temb1, down2)
        up3 = self.up2(cemb2 * up2 + temb2, down1)
        out = self.out(torch.cat((up3, x), 1))
        
        return out
        
        

In [None]:
# Training hyperparameters

# diffusion hyperparameters
timesteps = 500
beta1 = 1e-4
beta2 = 0.02

# network hyperparameters
device = torch.device("cuda:0" if torch.cuda.is_available() else torch.device('cpu'))
n_feat = 64
n_cfeat = 5
height = 16
save_dir = './weights/'

In [None]:
b_t = (beta2 - beta1) * torch.linspace(0, 1, timesteps + 1, device=device) + beta1
a_t = 1 - b_t
ab_t = torch.cumsum(a_t.log(), dim=0).exp()
ab_t[0] = 1

In [None]:
# construct model
nn_model = ContextUnet(in_channels=3, n_feat=n_feat, n_cfeat=n_cfeat, height=height).to(device)

## Sampling

In [None]:
# helper function; remvoes the predicted noise (but adds some noise back in to avoid collapse)
def denoise_add_noise(x, t, pred_noise, z=None):
    if z is None:
        z = torch.randn_like(x)
    noise = b_t.sqrt()[t] * z
    mean = (x - pred_noise * ((1 - a_t[t]) / (1 - ab_t[t]).sqrt())) / a_t[t].sqrt()
    return mean + noise

In [None]:
# load in model weights and set to eval mode
nn_model.load_state_dict(torch.load(f"{save_dir}/model_trained.pth", map_location=device))
nn_model.eval()
print("loaded in model")

In [None]:
@torch.no_grad()
def sample_ddqm(n_sample, save_rate=20):
    samples = torch.randn(n_sample, 3, height, height).to(device)
    
    intermediate = []
    for i in range(timesteps, 0, -1):
        print(f"sampling timestep {i:3d}", end="\r")
        t = torch.tensor([i / timesteps])[: None, None, None].to(device)
        z = torch.randn_like(samples) if i > 1 else 0
        eps = nn_model(samples, t)
        samples = denoise_add_noise(samples, i, eps, z)
        if (i % save_rate ) == 0 or i == timesteps or i < 8:
            intermediate.append(samples.detach().cpu().numpy())
            
    intermediate = np.stack(intermediate)
    return samples, intermediate
            
        

In [None]:
plt.clf()
samples, intermediate_ddqm = sample_ddqm(32)
animation_ddqm = plot_sample(intermediate_ddqm, 32, 4, save_dir, "ani_run", None, save=False)
HTML(animation_ddqm.to_jshtml())

##### Demonstrate incorrectly sample without adding the 'extra noise'

In [None]:
@torch.no_grad()
def sample_ddqm_incorrect(n_sample):
    samples = torch.randn(n_sample, 3, height, height).to(device)
    intermediate = []
    for i in range(timesteps, 0, -1):
        print(f'sampling timestep {i:3d}', end='\r')
        # reshape time tensor
        t = torch.tensor([i / timesteps])[:None, None, None].to(device)
        
        # don't add back the noise
        z = 0
        eps = nn_model(samples, t)
        samples = denoise_add_noise(samples, i, eps, z)
        if i%20 ==0 or i == timesteps or i < 8:
            intermediate.append(samples.detatch().cpu().numpy())
            
    intermediate = np.stack(intermediate)
    return samples, intermediate

In [None]:
plt.clf()
samples, intermediate = sample_ddqm_incorrect(32)
animation = plot_sample(intermediate, 32, 4, save_dir, "ani_run", None, save=False)
HTML(animation.to_jshtml())