# Introduction

Denoise Diffusion Probabilistic Models (DDPMs) are generative models based on the idea of reversing a noising process. The idea is fairly simple: Given a dataset, make it more and more noisy with a deterministic process. Then, learn a model that can undo this process.

DDPM-based models have recently drawn a lot of attention due to their high-quality samples. In this notebook, I re-implement the first and most fundamental paper to be familiar with when dealing with DDPMs: <i>Denoising Diffusion Probabilistic Models</i> (https://arxiv.org/pdf/2006.11239.pdf) by Ho et. al.

DDPM은 주어진 이미지에 time에 따른 상수의 파라미터를 갖는 작은 가우시안 노이즈를 time에 대해 더해나가는데, 

image가 destroy하게 되면 결국 noise의 형태로 남을것이다. (normal distribution을 따른다.) 

이런 상황에서 normal distribution 에 대한 noise가 주어졌을때 어떻게 복원할 것인가에 대한 문제이다.

그래서 주어진 Noise를 통해서 완전히 이미지를 복구가 된다면 image generation하는 것이 된다. 

이 논문에서는 diffusion probabilistic models의 과정을 보여준다. 

diffusion model은 유한한 시간 뒤에 이미지를 생성하는  variational inference을 통해 훈련된 Markov chain을 parameterized한 형태이다. 

Markov Chain은 이전의 샘플링이 현재 샘플링에 영향을 미치는 $𝑝(x_t∣x_{t-1})$ 형식을 의미한다. 

그래서 이 diffusion model에서의 한 방향에 대해서는 주어진 이미지에 작은 gaussian noise를 점진적으로 계속 더해서 완전히 image가 destroy 되게하는 과정을 의미한다.

![nn](DDPM.png)

`Forward Diffusion Process`

우선 주어진 이미지를 X0라고 정의함. 서서히 noise를 추가해가는 과정을 q라고한다면 X0에 noise를 적용하여 x1을 만드는 것을 $q(x_1∣x_0)$라고 표현할 수 있다.

이를 time t에 대해 General하게 표현한다면 $q(x_t∣x_{t-1})$ 으로 표현할 수 있다. 이를 forward process(diffusion process)라고 부른다. 이 때 우리가 지정한 time 수를 T이라 하였을 때,

완전히 noise에 의해 destroy된 형태인 XT를 구할수 있으며 이는 정규 분포 `N(XT;0,I)` 를 따름

즉, XT는 X0 에서 정규분포 노이즈 추가를 통하여 만들어 낼 수 있다는 걸 전제로 한다.

`Backward Diffusion Process`

Backward Process 는 제목 그대로 `q`와는 반대로 noise를 점진적으로 걷어내는 denoising process입니다.

그래서  $q(x_t∣x_{t-1})$ 와는 반대로 time이 뒤바뀐  $𝑝(x_{t-1}∣x_t)$ 라고 표현이 된다.

`Objective Function (𝐿)`

이제 이 모델의 목적을 생각을 해보면, 결국 **주어진 noise에 대해 어떻게 noise를 점진적으로 걷어낼 것이냐**의 문제이기 때문에 

우리는 그 방법을 위의 p를 통해서 해결할려고 한다.

<span style="color:#F8876D">**X𝑡가 들어왔을 때 Xt-1을 예측할 수 있게 된다면, 우리는 X0 또한 예측할 수 있다.**</span>

![nn](DDPM_loss.png)

그림을 보면 Loss function이 매우 복잡하다. 

쉽게 설명을 하자면 KL divergence(확률분포 차이 계산)을 통하여 

임의로 생성한 $\epsilon_\Theta$ 값과 $x_T$를 이용하여 정규분포 loss를 계산한다.

위 loss 값을 개선시켜 나가며 학습을 진행한다.

# Installs

필요한 패키지는 **einops** 와 **imageio** 패키지입니다. 

두 라이브러리는 DDPM 모델의 GIF 애니메이션 및 이미지 처리를 진행할 때 사용됩니다.

In [None]:
!pip3 install --upgrade pip
!pip3 install einops
!pip3 install imageio

# Imports and Definitions

In [None]:
# Import of libraries
import random
import imageio
import numpy as np
import math
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import einops
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision.transforms import Compose, ToTensor, Lambda

from IPython import display
import torchvision.transforms as transforms
import torchvision.transforms.functional as F
from glob import glob
from PIL import Image
# Setting reproducibility
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

## Execution options

설정해야 할 몇 가지 옵션은 다음과 같습니다.

- `params` 이라는 변수는 딕셔너리 정보값을 포함하고 있습니다. 딕셔너리란 key와 value 가 쌍을 이루는 구조로 데이터 관리가 용이하다는 장점이 있습니다.
 <br>ex) params['<span style="color:green">image_size</span>']==128

- `ModelConfig` 은 Class 형식으로 값을 포함하고 있습니다. Class의 값을 불러오기 위해서는 밑의 예시와 같이 사용하면 됩니다.
<br>ex) <span style="color:yellow">ModelConfig</span>.TIME_EMB_MULT==4

In [None]:

params={'image_size':128,
        'lr':2e-5,
        'batch_size':8,
        'epochs':30,
        'data_path':'../notices/data/colon/image/**/',
        'n_step':1000,
        'image_channels':3,}
class ModelConfig:
    BASE_CH = 128  # 128, 256, 256, 512, 512
    BASE_CH_MULT = (1, 2, 2, 4, 4) # 128, 64, 32, 16, 8
    APPLY_ATTENTION = (False, False, False, True, False)
    DROPOUT_RATE = 0.1
    TIME_EMB_MULT = 4 # 128

# Utility functions

다음은 두 가지 유틸리티 함수입니다. 

`show_images`를 사용하면 사용자 정의 제목과 함께 정사각형 모양의 패턴으로 이미지를 표시할 수 있는 반면, 

`show_fist_batch`는 단순히 DataLoader 객체의 첫 번째 배치에 있는 이미지를 표시합니다.

In [None]:
def show_images(images, title=""):
    """Shows the provided images as sub-pictures in a square"""

    # Converting images to CPU numpy arrays
    if type(images) is torch.Tensor:
        images = images.detach().cpu().numpy()
        images=np.transpose(images, (0,2, 3, 1))
        images=images-images.min()
        images=images/images.max()*255
        images=images.astype(np.uint8)
    # Defining number of rows and columns
    fig = plt.figure(figsize=(8, 8))
    rows = int(len(images) ** (1 / 2))
    cols = round(len(images) / rows)

    # Populating figure with sub-plots
    idx = 0
    for r in range(rows):
        for c in range(cols):
            fig.add_subplot(rows, cols, idx + 1)

            if idx < len(images):
                plt.imshow(images[idx])
                idx += 1
    fig.suptitle(title, fontsize=30)

    # Showing the figure
    plt.show()

In [None]:
def show_first_batch(loader):
    for batch in loader:
        show_images(batch, "Images in the first batch")
        break
    

## Loading data
대장 내시경 데이터 세트를 사용하며 대장내시경의 이미지 사이즈는 우리가 사용할 (128,128,3)이 아닙니다. 그렇기 때문에 이미지 사이즈를 수정할 필요가 있으며 이를 위하여 `torchvision.transform`을 사용합니다

**참고**: 일반적으로 하는 것처럼 `[0,1]`이 아닌 `[-1,1]` 범위의 이미지를 정규화하는 것이 중요합니다. 이는 DDPM 네트워크가 잡음 제거 프로세스 전반에 걸쳐 정규 분포 잡음을 예측하기 때문입니다.

In [None]:
# Loading the data (converting each image into a tensor and normalizing between [-1, 1])

class CustomDataset(Dataset):
    def __init__(self, args):
        super(Dataset, self).__init__()
        self.w = self.h = args['image_size']

        # image and mask
        self.image_path =glob(args['data_path']+'*.png')
        self.trans_1 = transforms.Compose(
            [
                transforms.Resize((args['image_size'],args['image_size']), interpolation=transforms.InterpolationMode.NEAREST)
            ]
        )
        
    def __len__(self):
        return len(self.image_path)

    def __getitem__(self, index):
        # load image
        image = Image.open(self.image_path[index])
        
        image = self.trans_1(F.to_tensor(image)) * 2.0 - 1.0
        
        return image
    
train_dataset=CustomDataset(params)
loader = DataLoader(train_dataset, params['batch_size'], shuffle=True)

In [None]:
len(train_dataset)

In [None]:
# Optionally, show a batch of regular images
show_first_batch(loader)

## Getting device

학습시 사용할 GPU를 설정합니다. CPU로 잡힐 경우 채팅 부탁드립니다. 정상적으로 GPU가 잡힌다면 `Using device: cuda	NVIDIA A100 80GB PCIe MIG 1g.10gb` 이렇게 떠야합니다

In [None]:
# Getting device
device = torch.device("cuda:3" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}\t" + (f"{torch.cuda.get_device_name(0)}" if torch.cuda.is_available() else "CPU"))

# Defining the DDPM module

DDPM PyTorch 모듈을 정의하고 진행합니다. 원칙적으로 DDPM 체계는 각 노이즈 제거 단계에 사용되는 모델 아키텍처와 독립적이므로 `network` 매개변수를 사용하여 구성되는 상위 수준 모델을 정의합니다.

- `n_steps`: 확산 단계 수 $T$;
- `min_beta`: 첫번째 $\beta_t$ 값 ($\beta_1$);
- `max_beta`: 마지막  $\beta_t$ 값 ($\beta_T$);
- `device`: 모델이 실행되는 장치입니다;
- `image_chw`: 이미지의 차원을 포함하는 튜플입니다.

MyDDPM의 `forward` 프로세스는 다음과 같은 속성의 이점을 얻습니다. 실제로 단계별로 천천히 노이즈를 추가할 필요는 없게 `alpha_bar` 계수를 사용하여 원하는 $t$ 단계로 직접 건너뛸 수 있습니다.

그래서 ,`backward` 프로세스에서 정규 분포 노이즈를 얻습니다.

이 구현에서 $t$는 `(N, 1)` 텐서로 가정됩니다. 여기서 `N`은 텐서 `x`에 있는 이미지 수(batch_size)입니다. 따라서 여러 이미지에 대해 서로 다른 시간 단계를 지원합니다.

In [None]:
# DDPM class
class MyDDPM(nn.Module):
    def __init__(self, network, n_steps=1000, min_beta=10 ** -4, max_beta=0.02, device=None, image_chw=(3, 128, 128)):
        super(MyDDPM, self).__init__()
        self.n_steps = n_steps
        self.device = device
        self.image_chw = image_chw
        self.network = network.to(device)
        self.betas = torch.linspace(min_beta, max_beta, n_steps).to(
            device)  # Number of steps is typically in the order of thousands
        self.alphas = 1 - self.betas
        self.alpha_bars = torch.tensor([torch.prod(self.alphas[:i + 1]) for i in range(len(self.alphas))]).to(device)

    def forward(self, x0, t, XTa=None):
        # Make input image more noisy (we can directly skip to the desired step)
        n, c, h, w = x0.shape
        a_bar = self.alpha_bars[t]

        if XTa is None:
            XTa = torch.randn(n, c, h, w).to(self.device)

        noisy = a_bar.sqrt().reshape(n, 1, 1, 1) * x0 + (1 - a_bar).sqrt().reshape(n, 1, 1, 1) * XTa
        return noisy

    def backward(self, x, t):
        # Run each image through the network for each timestep t in the vector t.
        # The network returns its estimation of the noise that was added.
        return self.network(x, t)

## Visualizing forward and backward

DDPM 모델의 고급 기능을 정의했으므로 몇 가지 관련 유틸리티 기능을 정의합니다

`show_forward`라는 함수는 `Forward Diffusion Process`를 출력합니다. (하나의 예시이며 영상처리 기법과 동일합니다.)

`generate_new_images`함수를 통하여 새로운 이미지를 생성하는데, GIF포맷으로 $t$의 변화마다 이미지를 생성합니다

우리가 실제로 관심을 갖는 손실 함수의 유일한 항은 $||\epsilon - \epsilon_\theta(\sqrt{\bar{\alpha}_t}x_0 + \sqrt{1 - \bar{\alpha}_t}\epsilon, t)||^2$,  여기서 $\epsilon$은 임의의 노이즈이고 $\epsilon_\theta$는 모델의 노이즈 예측입니다. 

이때, $x_T$는 $\epsilon$ 과 같습니다. 

결국 $\epsilon$ 와 $\epsilon_\theta$ 를 제외한 다른 항 같은 경우 값 정규화 및 $t$에 따른 가중치 부과를 위한 상수이므로 크게 신경쓰지 않아도 됩니다.

In [None]:
def show_forward(ddpm, loader, device):
    # Showing the forward process
    for batch in loader:
        imgs = batch

        show_images(imgs, "Original images")

        for percent in [0.25, 0.5, 0.75, 1]:
            show_images(
                ddpm(imgs.to(device),
                     [int(percent * ddpm.n_steps) - 1 for _ in range(len(imgs))]),
                f"DDPM Noisy images {int(percent * 100)}%"
            )
        break

In [None]:
def generate_new_images(ddpm, n_samples=16, device=None, frames_per_gif=100, gif_name="sampling.gif", c=3, h=128, w=128):
    """Given a DDPM model, a number of samples to be generated and a device, returns some newly generated samples"""
    frame_idxs = np.linspace(0, ddpm.n_steps, frames_per_gif).astype(np.uint)
    frames = []

    with torch.no_grad():
        if device is None:
            device = ddpm.device

        # Starting from random noise
        xt = torch.randn(n_samples, c, h, w).to(device)

        for idx, t in enumerate(list(range(ddpm.n_steps))[::-1]):
            # Estimating noise to be removed
            t=torch.tensor([t])
            time_tensor = (torch.ones(n_samples, 1) * t)
            time_tensor =time_tensor.squeeze().to(device).long()
            eta_theta = ddpm.backward(xt, time_tensor)

            alpha_t = ddpm.alphas[t]
            alpha_t_bar = ddpm.alpha_bars[t]

            # Partially denoising the image
            x = (1 / alpha_t.sqrt()) * (xt - (1 - alpha_t) / (1 - alpha_t_bar).sqrt() * eta_theta)

            if t > 0:
                z = torch.randn(n_samples, c, h, w).to(device)

                # Option 1: sigma_t squared = beta_t
                beta_t = ddpm.betas[t]
                sigma_t = beta_t.sqrt()

                x = x + sigma_t * z

            # Adding frames to the GIF
            if idx in frame_idxs or t == 0:
                # Putting digits in range [0, 255]
                normalized = x.clone()
                for i in range(len(normalized)):
                    normalized[i] -= torch.min(normalized[i])
                    normalized[i] *= 255 / torch.max(normalized[i])
                    
                # Reshaping batch (n, c, h, w) to be a (as much as it gets) square frame
                frame = einops.rearrange(normalized, "(b1 b2) c h w -> (b1 h) (b2 w) c", b1=int(n_samples ** 0.5))
                frame = frame.cpu().numpy().astype(np.uint8)
                
                # Rendering frame
                frames.append(frame)
    # Storing the gif
    with imageio.get_writer(gif_name, mode="I") as writer:
        for idx, frame in enumerate(frames):
            rgb_frame = frame
            writer.append_data(rgb_frame)

            # Showing the last frame for a longer time
            if idx == len(frames) - 1:
                last_rgb_frame = frames[-1]
                for _ in range(frames_per_gif // 3):
                    writer.append_data(last_rgb_frame)
    return x

# UNet architecture

지금까지 DDPM과 관련된 utility를 모두 구축하였습니다.

$x$와 $t$를 이용하여 $\epsilon_\theta$ 를 구하는 모델입니다.

이미지 $x$와 시간 간격 $t$를 나타내는 스칼라 값을 입력으로 사용하는 모델입니다.

이를 위해 각 시간 단계를 'time_emb_dim' 차원에 매핑하는 정현파 임베딩('SinusoidalPositionEmbeddings' 함수)을 사용합니다.


In [None]:
class SinusoidalPositionEmbeddings(nn.Module):
    def __init__(self, total_time_steps=1000, time_emb_dims=128, time_emb_dims_exp=512):
        super().__init__()

        half_dim = time_emb_dims // 2

        emb = math.log(10000) / (half_dim - 1)
        emb = torch.exp(torch.arange(half_dim, dtype=torch.float32) * -emb)

        ts = torch.arange(total_time_steps, dtype=torch.float32)

        emb = torch.unsqueeze(ts, dim=-1) * torch.unsqueeze(emb, dim=0)
        emb = torch.cat((emb.sin(), emb.cos()), dim=-1)

        self.time_blocks = nn.Sequential(
            nn.Embedding.from_pretrained(emb),
            nn.Linear(in_features=time_emb_dims, out_features=time_emb_dims_exp),
            nn.SiLU(),
            nn.Linear(in_features=time_emb_dims_exp, out_features=time_emb_dims_exp),
        )

    def forward(self, time):
        return self.time_blocks(time)

class AttentionBlock(nn.Module):
    def __init__(self, channels=64):
        super().__init__()
        self.channels = channels

        self.group_norm = nn.GroupNorm(num_groups=8, num_channels=channels)
        self.mhsa = nn.MultiheadAttention(embed_dim=self.channels, num_heads=4, batch_first=True)

    def forward(self, x):
        B, _, H, W = x.shape
        h = self.group_norm(x)
        h = h.reshape(B, self.channels, H * W).swapaxes(1, 2)  # [B, C, H, W] --> [B, C, H * W] --> [B, H*W, C]
        h, _ = self.mhsa(h, h, h)  # [B, H*W, C]
        h = h.swapaxes(2, 1).view(B, self.channels, H, W)  # [B, C, H*W] --> [B, C, H, W]
        return x + h

class ResnetBlock(nn.Module):
    def __init__(self, *, in_channels, out_channels, dropout_rate=0.1, time_emb_dims=512, apply_attention=False):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels

        self.act_fn = nn.SiLU()
        # Group 1
        self.normlize_1 = nn.GroupNorm(num_groups=8, num_channels=self.in_channels)
        self.conv_1 = nn.Conv2d(in_channels=self.in_channels, out_channels=self.out_channels, kernel_size=3, stride=1, padding="same")

        # Group 2 time embedding
        self.dense_1 = nn.Linear(in_features=time_emb_dims, out_features=self.out_channels)

        # Group 3
        self.normlize_2 = nn.GroupNorm(num_groups=8, num_channels=self.out_channels)
        self.dropout = nn.Dropout2d(p=dropout_rate)
        self.conv_2 = nn.Conv2d(in_channels=self.out_channels, out_channels=self.out_channels, kernel_size=3, stride=1, padding="same")

        if self.in_channels != self.out_channels:
            self.match_input = nn.Conv2d(in_channels=self.in_channels, out_channels=self.out_channels, kernel_size=1, stride=1)
        else:
            self.match_input = nn.Identity()

        if apply_attention:
            self.attention = AttentionBlock(channels=self.out_channels)
        else:
            self.attention = nn.Identity()

    def forward(self, x, t):
        # group 1
        h = self.act_fn(self.normlize_1(x))
        h = self.conv_1(h)

        # group 2
        # add in timestep embedding
        h += self.dense_1(self.act_fn(t))[:, :, None, None]

        # group 3
        h = self.act_fn(self.normlize_2(h))
        h = self.dropout(h)
        h = self.conv_2(h)

        # Residual and attention
        h = h + self.match_input(x)
        h = self.attention(h)
        
        return h

class DownSample(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.downsample = nn.Conv2d(in_channels=channels, out_channels=channels, kernel_size=3, stride=2, padding=1)

    def forward(self, x, *args):
        return self.downsample(x)

class UpSample(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.upsample = nn.Sequential(
            nn.Upsample(scale_factor=2, mode="nearest"),
            nn.Conv2d(in_channels=channels, out_channels=channels, kernel_size=3, stride=1, padding="same")
        )

    def forward(self, x, *args):
        return self.upsample(x)

class MyUNet(nn.Module):
    def __init__(
        self,
        input_channels=3,
        output_channels=3,
        num_res_blocks=2,
        base_channels=128,
        base_channels_multiples=(1, 2, 4, 8),
        apply_attention=(False, False, True, False),
        dropout_rate=0.1,
        time_multiple=4,
    ):
        super().__init__()

        time_emb_dims_exp = base_channels * time_multiple
        self.time_embeddings = SinusoidalPositionEmbeddings(time_emb_dims=base_channels, time_emb_dims_exp=time_emb_dims_exp)

        self.first = nn.Conv2d(in_channels=input_channels, out_channels=base_channels, kernel_size=3, stride=1, padding="same")

        num_resolutions = len(base_channels_multiples)

        # Encoder part of the UNet. Dimension reduction.
        self.encoder_blocks = nn.ModuleList()
        curr_channels = [base_channels]
        in_channels = base_channels

        for level in range(num_resolutions):
            out_channels = base_channels * base_channels_multiples[level]

            for _ in range(num_res_blocks):

                block = ResnetBlock(
                    in_channels=in_channels,
                    out_channels=out_channels,
                    dropout_rate=dropout_rate,
                    time_emb_dims=time_emb_dims_exp,
                    apply_attention=apply_attention[level],
                )
                self.encoder_blocks.append(block)
                
                in_channels = out_channels
                curr_channels.append(in_channels)

            if level != (num_resolutions - 1):
                self.encoder_blocks.append(DownSample(channels=in_channels))
                curr_channels.append(in_channels)

        # Bottleneck in between
        self.bottleneck_blocks = nn.ModuleList(
            (
                ResnetBlock(
                    in_channels=in_channels,
                    out_channels=in_channels,
                    dropout_rate=dropout_rate,
                    time_emb_dims=time_emb_dims_exp,
                    apply_attention=True,
                ),
                ResnetBlock(
                    in_channels=in_channels,
                    out_channels=in_channels,
                    dropout_rate=dropout_rate,
                    time_emb_dims=time_emb_dims_exp,
                    apply_attention=False,
                ),
            )
        )

        # Decoder part of the UNet. Dimension restoration with skip-connections.
        self.decoder_blocks = nn.ModuleList()

        for level in reversed(range(num_resolutions)):
            out_channels = base_channels * base_channels_multiples[level]

            for _ in range(num_res_blocks + 1):
                encoder_in_channels = curr_channels.pop()
                block = ResnetBlock(
                    in_channels=encoder_in_channels + in_channels,
                    out_channels=out_channels,
                    dropout_rate=dropout_rate,
                    time_emb_dims=time_emb_dims_exp,
                    apply_attention=apply_attention[level],
                )

                in_channels = out_channels
                self.decoder_blocks.append(block)

            if level != 0:
                self.decoder_blocks.append(UpSample(in_channels))

        self.final = nn.Sequential(
            nn.GroupNorm(num_groups=8, num_channels=in_channels),
            nn.SiLU(),
            nn.Conv2d(in_channels=in_channels, out_channels=output_channels, kernel_size=3, stride=1, padding="same"),
        )

    def forward(self, x, t):
        time_emb = self.time_embeddings(t)

        h = self.first(x)
        outs = [h]

        for layer in self.encoder_blocks:
            h = layer(h, time_emb)
            outs.append(h)

        for layer in self.bottleneck_blocks:
            h = layer(h, time_emb)

        for layer in self.decoder_blocks:
            if isinstance(layer, ResnetBlock):
                out = outs.pop()
                h = torch.cat([h, out], dim=1)
            h = layer(h, time_emb)

        h = self.final(h)

        return h

# model summary

현재 Network까지 다 구축을 하였습니다. 하지만 코드만 보고 확인하기엔 모델 구조가 어떻게 생긴지 알기 어렵습니다.

확인을 위해서 `pytorch_model_summary`라는 라이브러리를 설치하고 이 라이브러리로 모델 구조를 한번 확인해보도록 하겠습니다.

In [None]:
!pip install pytorch_model_summary

In [None]:
import pytorch_model_summary as tms
tms.summary(MyUNet(
    input_channels          = 3,
    output_channels         = 3,
    base_channels           = ModelConfig.BASE_CH,
    base_channels_multiples = ModelConfig.BASE_CH_MULT,
    apply_attention         = ModelConfig.APPLY_ATTENTION,
    dropout_rate            = ModelConfig.DROPOUT_RATE,
    time_multiple           = ModelConfig.TIME_EMB_MULT,
), torch.zeros(params['batch_size'],params['image_channels'], params['image_size'], params['image_size']),torch.randint(low=1, high=params['n_step'], size=(params['batch_size'],)), show_input=True,print_summary=True)

# Instantiating the model

이제 모델을 인스턴스화 할 차례입니다.

모델을 인스턴스화 하는 이유는 당연히 훈련을 위해서 입니다.



In [None]:
# Defining model
min_beta, max_beta =  10 ** -4, 0.02  
ddpm = MyDDPM(MyUNet(
    input_channels          = 3,
    output_channels         = 3,
    base_channels           = ModelConfig.BASE_CH,
    base_channels_multiples = ModelConfig.BASE_CH_MULT,
    apply_attention         = ModelConfig.APPLY_ATTENTION,
    dropout_rate            = ModelConfig.DROPOUT_RATE,
    time_multiple           = ModelConfig.TIME_EMB_MULT,
), n_steps=params['n_step'], min_beta=min_beta, max_beta=max_beta, device=device)

In [None]:
sum([p.numel() for p in ddpm.parameters()])

# Optional visualizations

여기엔 총 3개의 항이 있습니다.

첫번째 항은 모델을 학습하기 전 이미 학습되어 있는 모델 가중치를 불러와 추가 학습을 위한 코드입니다. 현재는 비활성화 시켰습니다. 

두번째 항은 `Forward Diffusion Process`를 시각적으로 표현합니다. t*(0,25,50,75,100)%

세번째 항은 `Backward Diffusion Process`를 시각적으로 표현합니다. gif형식의 이미지

In [None]:
# Optionally, load a pre-trained model that will be further trained
# ddpm.load_state_dict(torch.load('../../model/DDPM/ddpm_colon_3.pt', map_location=device))

In [None]:
# Optionally, show the diffusion (forward) process
show_forward(ddpm, loader, device)

In [None]:
# Optionally, show the denoising (backward) process
generated = generate_new_images(ddpm, gif_name="../../data/dataset/colon_all/result/before_training.gif")
show_images(generated, "Images generated before training")
display.Image(open("../../data/dataset/colon_all/result/before_training.gif",'rb').read())

# Training loop

The training loop is fairly simple. With each batch of our dataset, we run the forward process on the batch. We use a different timesteps $t$ for each of the `N` images in our `(N, C, H, W)` batch tensor to guarantee more training stability. The added noise is a `(N, C, H, W)` tensor $\epsilon$.

Once we obtained the noisy images, we try to predict $\epsilon$ out of them with our network. We optimize with a simple Mean-Squared Error (MSE) loss.

In [None]:
mse = nn.MSELoss()
best_loss = float("inf")
optim=Adam(ddpm.parameters(), params['lr'])
n_steps = ddpm.n_steps
scaler = torch.cuda.amp.GradScaler()
for epoch in tqdm(range(params['epochs']),desc='Epochs', leave=True):
    epoch_loss = 0.0
    count=0
    for batch in loader:
        # Loading data
        x0 = batch.to(device)
        n = len(x0)

        # Picking some noise for each of the images in the batch, a timestep and the respective alpha_bars
        XTf = torch.randn_like(x0).to(device)
        t = torch.randint(1, n_steps, (n,)).to(device)

        # Computing the noisy image based on x0 and the time-step (forward process)
        noisy_imgs = ddpm(x0, t, XTf)

        # Getting model estimation of noise based on the images and the time-step
        XTf_theta = ddpm.backward(noisy_imgs, t)

        # Optimizing the MSE between the noise plugged and the predicted noise
        loss = mse(XTf_theta, XTf)
        optim.zero_grad(set_to_none=True)
        scaler.scale(loss).backward()

            # scaler.unscale_(optimizer)
            # torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        scaler.step(optim)
        scaler.update()
        count+=1
        epoch_loss += loss.item() * len(x0) / len(loader.dataset)
    # Display images generated at this epoch
    if epoch%10 ==0:
        genaa=generate_new_images(ddpm, device=device,gif_name='../../data/dataset/colon_all/result/epoch_'+str(epoch+1)+'.gif')

    log_string = f"Loss at epoch {epoch + 1}: {epoch_loss:.3f}"
    # Storing the model
    if best_loss > epoch_loss:
        best_loss = epoch_loss
        torch.save(ddpm.state_dict(), '../../model/DDPM/ddpm_colon_check.pt')
        log_string += " --> Best model ever (stored)"
        print(log_string)

# Testing the trained model

Time to check how well our model does. We re-store the best performing model according to our training loss and set it to evaluation mode. Finally, we display a batch of generated images and the relative obtained and nice GIF.

In [None]:
# Loading the trained model
best_model = MyDDPM(MyUNet(), n_steps=n_steps, device=device)
best_model.load_state_dict(torch.load('../../model/DDPM/ddpm_colon_'+str(epoch)+'.pt', map_location=device))
best_model.eval()
print("Model loaded")

# Visualizing the diffusion

In [None]:
print("Generating new images")
generated = generate_new_images(
        best_model,
        n_samples=16,
        device=device,
        gif_name="../../data/dataset/colon_all/result/after_training.gif"
    )
show_images(generated, "Final result")
display.Image(open("../../data/dataset/colon_all/result/after_training.gif",'rb').read())

# Conclusion

In this notebook, we implemented a DDPM PyTorch module from scratch. We used a custom UNet-like architecture and the nice sinusoidal positional-embedding technique to condition the denoising process of the network on the particular time-step. We trained the model on the MNIST / Fashion-MNIST dataset and in only 20 epochs (08:47 minutes using a Tesla T4 GPU) we were able to generate new samples for these toy datasets.

# Further learning!

The vanilla DDPM (the one implemented in this notebook) got promptly improved by a couple of papers. Here, I refer the reader to some of them. Finally I would like to acknowledge the resources I personally used to learn more about DDPM and be able to come up with this notebook.

## Papers
- **Denoising Diffusion Implicit Models** by Song et. al. (https://arxiv.org/abs/2010.02502);
- **Improved Denoising Diffusion Probabilistic Models** by Nichol et. al. (https://arxiv.org/abs/2102.09672);
- **Hierarchical Text-Conditional Image Generation with CLIP Latents** by Ramesh et. al. (https://arxiv.org/abs/2204.06125);
- **Photorealistic Text-to-Image Diffusion Models with Deep Language Understanding** by Saharia et. al. (https://arxiv.org/abs/2205.11487);




## Acknowledgements

This notebook was possible thanks also to these amazing people out there on the web that helped me grasp the math and implementation of DDPMs. Make sure you check them out!

 - <b>Lilian Weng</b>'s [blog](https://lilianweng.github.io/posts/2021-07-11-diffusion-models/): <i>What are Diffusion Models?</i>
 - <b>abarankab</b>'s [Github repository](https://github.com/abarankab/DDPM)
 - <b>Jascha Sohl-Dickstein</b>'s [MIT class](https://www.youtube.com/watch?v=XCUlnHP1TNM&ab_channel=AliJahanian)
 - <b>Niels Rogge</b> and <b>Kashif Rasul</b> [Huggingface's blog](https://huggingface.co/blog/annotated-diffusion): <i>The Annotated Diffusion Model</i>
 - <b>Outlier</b>'s [Youtube video](https://www.youtube.com/watch?v=HoKDTa5jHvg&ab_channel=Outlier)
 - <b>AI Epiphany</b>'s [Youtube video](https://www.youtube.com/watch?v=y7J6sSO1k50&t=450s&ab_channel=TheAIEpiphany)