# Lab 4, Fast Sampling

지금까지 sampling을 통해 denoising하는 방식에 대해 알아봤습니다.  
하지만 사실 지금까지 공부한 sampling 방식은 굉장히 느립니다.  
본 강의에서는 속도를 개선한 sampling 방식을 다룹니다.

In [None]:
from typing import Dict, Tuple
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import models, transforms
from torchvision.utils import save_image, make_grid
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
import numpy as np
from IPython.display import HTML
from diffusion_utilities import *

# Setting Things Up

In [None]:
class ContextUnet(nn.Module):
    def __init__(self, in_channels, n_feat=256, n_cfeat=10, height=28):  # cfeat - context features
        super(ContextUnet, self).__init__()

        # 입력 채널의 수, 중간 피쳐맵의 수, 클래스의 수
        self.in_channels = in_channels
        self.n_feat = n_feat
        self.n_cfeat = n_cfeat
        self.h = height  # h == w 라고 가정합니다. 단, 28,24,20처럼, 반드시 4로 나누어 떨어져야 합니다. 

        # 초기 컨볼루션 레이어를 초기화합니다.
        self.init_conv = ResidualConvBlock(in_channels, n_feat, is_res=True)

        # U-Net을 두 단계에 걸쳐 다운샘플링합니다.
        self.down1 = UnetDown(n_feat, n_feat)        # down1 #[10, 256, 8, 8]
        self.down2 = UnetDown(n_feat, 2 * n_feat)    # down2 #[10, 256, 4,  4]
        
        # 기존: self.to_vec = nn.Sequential(nn.AvgPool2d(7), nn.GELU())
        self.to_vec = nn.Sequential(nn.AvgPool2d((4)), nn.GELU())

        # 한 층의 fully connected neural network에 타입 스텝과 context label을 삽입합니다.
        self.timeembed1 = EmbedFC(1, 2*n_feat)
        self.timeembed2 = EmbedFC(1, 1*n_feat)
        self.contextembed1 = EmbedFC(n_cfeat, 2*n_feat)
        self.contextembed2 = EmbedFC(n_cfeat, 1*n_feat)

        # 세 단계에 걸쳐 U-Net을 업샘플링합니다.
        self.up0 = nn.Sequential(
            nn.ConvTranspose2d(2 * n_feat, 2 * n_feat, self.h//4, self.h//4), # up-sample  
            nn.GroupNorm(8, 2 * n_feat), # normalize                       
            nn.ReLU(),
        )
        self.up1 = UnetUp(4 * n_feat, n_feat)
        self.up2 = UnetUp(2 * n_feat, n_feat)

        # 마지막 컨볼루션 레이어를 입력 이미지의 채널 수와 일치할 수 있도록 초기화합니다.
        self.out = nn.Sequential(
            nn.Conv2d(2 * n_feat, n_feat, 3, 1, 1), # 피쳐맵의 수를 줄입니다.   #in_channels, out_channels, kernel_size, stride=1, padding=0
            nn.GroupNorm(8, n_feat), # 정규화
            nn.ReLU(),
            nn.Conv2d(n_feat, self.in_channels, 3, 1, 1), # 입력과 동일한 채널로 맞춰줍니다.
        )

    def forward(self, x, t, c=None):
        """
        x : (batch, n_feat, h, w) : input image
        t : (batch, n_cfeat)      : time step
        c : (batch, n_classes)    : context label
        """
        # x는 입력 이미지, c는 context label, t는 타임 스텝입니다. 

        # 초기 컨볼루션 레이어를 통해 입력 이미지를 전달합니다.
        x = self.init_conv(x)
        # 다운 샘플링을 통해 결과를 전달합니다.
        down1 = self.down1(x)       #[10, 256, 8, 8]
        down2 = self.down2(down1)   #[10, 256, 4, 4]
        
        # 피쳐맵을 벡터로 전환하고 활성화함수를 적용합니다.
        hiddenvec = self.to_vec(down2)
        
        # 만약 context_mask가 1이라면 마스크를 지웁니다.
        if c is None:
            c = torch.zeros(x.shape[0], self.n_cfeat).to(x)
            
        # context와 타임 스텝을 삽입합니다.
        cemb1 = self.contextembed1(c).view(-1, self.n_feat * 2, 1, 1)     # (batch, 2*n_feat, 1,1)
        temb1 = self.timeembed1(t).view(-1, self.n_feat * 2, 1, 1)
        cemb2 = self.contextembed2(c).view(-1, self.n_feat, 1, 1)
        temb2 = self.timeembed2(t).view(-1, self.n_feat, 1, 1)
        #print(f"uunet forward: cemb1 {cemb1.shape}. temb1 {temb1.shape}, cemb2 {cemb2.shape}. temb2 {temb2.shape}")


        up1 = self.up0(hiddenvec)
        up2 = self.up1(cemb1*up1 + temb1, down2)  # 임베딩을 더하고 곱해줍니다.
        up3 = self.up2(cemb2*up2 + temb2, down1)
        out = self.out(torch.cat((up3, x), 1))
        return out


In [None]:
# hyperparameters

# diffusion hyperparameters
timesteps = 500
beta1 = 1e-4
beta2 = 0.02

# network hyperparameters
device = torch.device("cuda:0" if torch.cuda.is_available() else torch.device('cpu'))
n_feat = 64 # 64 hidden dimension feature
n_cfeat = 5 # context vector is of size 5
height = 16 # 16x16 image
save_dir = './weights/'

# training hyperparameters
batch_size = 100
n_epoch = 32
lrate=1e-3

In [None]:
# construct DDPM noise schedule
b_t = (beta2 - beta1) * torch.linspace(0, 1, timesteps + 1, device=device) + beta1
a_t = 1 - b_t
ab_t = torch.cumsum(a_t.log(), dim=0).exp()    
ab_t[0] = 1

In [None]:
# construct model
nn_model = ContextUnet(in_channels=3, n_feat=n_feat, n_cfeat=n_cfeat, height=height).to(device)

# Fast Sampling

기존의 DDPM 대신 DDIM을 사용하여 denoising합니다.  
어떤 차이점을 지녔는지에 대해 깊게 공부하고 싶다면 따로 검색해봐야 합니다.

In [None]:
# DDIM를 위한 샘플링 함수를 정의합니다.
# DDIM을 사용하여 노이즈를 제거합니다.
def denoise_ddim(x, t, t_prev, pred_noise):
    ab = ab_t[t]
    ab_prev = ab_t[t_prev]
    
    x0_pred = ab_prev.sqrt() / ab.sqrt() * (x - (1 - ab).sqrt() * pred_noise)
    dir_xt = (1 - ab_prev).sqrt() * pred_noise

    return x0_pred + dir_xt

In [None]:
# 모델 가중치를 불러오고 평가 모드로 세팅합니다.
nn_model.load_state_dict(torch.load(f"{save_dir}/model_31.pth", map_location=device))
nn_model.eval() 
print("Loaded in Model without context")

In [None]:
# DDIM를 사용하여 빠르게 샘플링하기
@torch.no_grad()
def sample_ddim(n_sample, n=20):
    # x_T ~ N(0, 1), 초기 노이즈를 샘플링합니다.
    samples = torch.randn(n_sample, 3, height, height).to(device)  

    # 생성된 스텝을 추적하여 그래프로 그리기 위한 배열
    intermediate = [] 
    step_size = timesteps // n
    for i in range(timesteps, 0, -step_size):
        print(f'sampling timestep {i:3d}', end='\r')

        # 시간 텐서 모양 조정
        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)

        eps = nn_model(samples, t)    # predict noise e_(x_t,t)
        samples = denoise_ddim(samples, i, i - step_size, eps)
        intermediate.append(samples.detach().cpu().numpy())

    intermediate = np.stack(intermediate)
    return samples, intermediate

In [None]:
# 샘플을 시각화합니다.
plt.clf()
samples, intermediate = sample_ddim(32, n=25)
animation_ddim = plot_sample(intermediate,32,4,save_dir, "ani_run", None, save=False)
HTML(animation_ddim.to_jshtml())

In [None]:
# 모델 가중치를 불러오고 평가 모드로 세팅합니다.
nn_model.load_state_dict(torch.load(f"{save_dir}/context_model_31.pth", map_location=device))
nn_model.eval() 
print("Loaded in Context Model")

In [None]:
# 컨텍스트를 포함한 fast 샘플링 알고리즘
@torch.no_grad()
def sample_ddim_context(n_sample, context, n=20):
    # x_T ~ N(0, 1), 초기 노이즈 샘플링
    samples = torch.randn(n_sample, 3, height, height).to(device)  

    # 생성된 스텝을 추적하여 그래프로 그리기 위한 배열
    intermediate = [] 
    step_size = timesteps // n
    for i in range(timesteps, 0, -step_size):
        print(f'sampling timestep {i:3d}', end='\r')

        # 시간 텐서 재조정
        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)

        eps = nn_model(samples, t, c=context)    # predict noise e_(x_t,t)
        samples = denoise_ddim(samples, i, i - step_size, eps)
        intermediate.append(samples.detach().cpu().numpy())

    intermediate = np.stack(intermediate)
    return samples, intermediate

In [None]:
# 샘플 시각화
plt.clf()
ctx = F.one_hot(torch.randint(0, 5, (32,)), 5).to(device=device).float()
samples, intermediate = sample_ddim_context(32, ctx)
animation_ddpm_context = plot_sample(intermediate,32,4,save_dir, "ani_run", None, save=False)
HTML(animation_ddpm_context.to_jshtml())

#### 그렇다면 실제 DDPM과 DDIM의 속도를 비교해봅시다.

In [None]:
# helper function; 예측되는 노이즈를 제거하는 함수입니다.
# (하지만 이미지가 붕괴하는 것을 방지하기 위해 일부 노이즈는 다시 더합니다)
def denoise_add_noise(x, t, pred_noise, z=None):
    if z is None:
        z = torch.randn_like(x)
    noise = b_t.sqrt()[t] * z
    mean = (x - pred_noise * ((1 - a_t[t]) / (1 - ab_t[t]).sqrt())) / a_t[t].sqrt()
    return mean + noise

In [None]:
# 표준 알고리즘을 사용하여 샘플링합니다.
@torch.no_grad()
def sample_ddpm(n_sample, save_rate=20):
    # x_T ~ N(0, 1), sample initial noise
    samples = torch.randn(n_sample, 3, height, height).to(device)  

    # 그래프를 기리기 위해서 각 스텝을 트래킹합니다.
    intermediate = [] 
    for i in range(timesteps, 0, -1):
        print(f'sampling timestep {i:3d}', end='\r')

        # 시간 텐서의 모양을 조정합니다.
        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)

        # 일부 노이즈를 골라 더합니다. 만약 i가 1이라면 더하지 않습니다.
        z = torch.randn_like(samples) if i > 1 else 0

        eps = nn_model(samples, t)    # predict noise e_(x_t,t)
        samples = denoise_add_noise(samples, i, eps, z)
        if i % save_rate ==0 or i==timesteps or i<8:
            intermediate.append(samples.detach().cpu().numpy())

    intermediate = np.stack(intermediate)
    return samples, intermediate

In [None]:
%timeit -r 1 sample_ddim(32, n=25)
%timeit -r 1 sample_ddpm(32, )

# Acknowledgments
Sprites by ElvGames, [FrootsnVeggies](https://zrghr.itch.io/froots-and-veggies-culinary-pixels) and  [kyrise](https://kyrise.itch.io/)   
This code is modified from, https://github.com/cloneofsimo/minDiffusion   
Diffusion model is based on [Denoising Diffusion Probabilistic Models](https://arxiv.org/abs/2006.11239) and [Denoising Diffusion Implicit Models](https://arxiv.org/abs/2010.02502)
