## DDPM 기초 구현

### 구현해야될 항목
  - 각종 변수들 ($\alpha, \tilde\alpha, \mu, ...$)
  - 훈련 코드
  - 샘플링 코드

## 참조
참고한 코드 출처: https://github.com/CodingVillainKor/SimpleDeepLearning/blob/main/DDPM_notebook.ipynb 

위 레포지토리의 원본 구현을 참고하여 수정하였습니다. 

### 라이브러리

In [1]:
# torch
import torch
import torch.nn as nn
from torch.nn import init
import torch.nn.functional as F
import math

# dataset
from torchvision.datasets import CIFAR10
from torchvision import transforms

# check cuda
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    torch.device("cpu")
print(" Device: ", device)

 Device:  cuda:0


### 기본 계수 코드
<details>
  <summary> torch 예제 코드 (cumprod, pad 함수) </summary>
    
1. cumprod 함수 예제
    
    ``` python
    
    a = torch.tensor([5, 7, 10])
    torch.cumprod(a, dim = 0)
    # 출력: tensor([  5,  35, 350])
    
    ```

---

2. pad 함수 예제

    ```python
    a = torch.tensor([1,2,3,4])
    
    # 앞쪽에 2개, 뒤쪽에 3개 패딩 추가 , 기본 값 0 -> 9
    F.pad(a, (2, 3), value = 9)
    # 출력:: tensor([9, 9, 1, 2, 3, 4, 9, 9, 9])
    ```
</details>

- 기본 계수들은 $T$ 크기만큼 계산이 완료가 되어있는 tensor의 상태로 생각하면 된다.

In [2]:
# time t
T = 1000 

# beta: linear하게 증가
betas = torch.linspace(1e-4, 0.02, T).to(device) 

# alpha는 beta의 변형이므로
alphas = 1 - betas

# alpha bar는 alhpa의 누적 합 -> torch의 cumprod 함수
alphas_bar = torch.cumprod(alphas, dim = 0, ).to(device)

# alpha bar 의 t-1도 sampling 과정에서 필요하다. 
# 맨 처음에 alpha가 하나도 없었다는 뜻으로 맨 앞에 1을 추가.
alphas_bar_prev = F.pad(alphas_bar[:-1], (1, 0), value = 1)

#training에 필요한 변수
sqrt_alphas_bar = torch.sqrt(alphas_bar).to(device)
sqrt_one_minus_alphas_bar = torch.sqrt(1. - alphas_bar).to(device)

# sampling에 필요한 변수
reciprocal_alphas_sqrt = torch.sqrt(1. / alphas_bar).to(device)
reciprocal_alphasm1_sqrt = torch.sqrt(1. / alphas_bar - 1.).to(device)
posterior_mean_coef1 = torch.sqrt(alphas_bar_prev) * betas / (1. - alphas_bar).to(device)
posterior_mean_coef2 = torch.sqrt(alphas) * (1. - alphas_bar_prev) / (1. - alphas_bar).to(device)
sigmas = (betas * (1. - alphas_bar_prev) / (1. - alphas_bar)).to(device) # 시그마는 betas를 사용해도 무관함

### 훈련 코드

1. **repeat**:
   - \($ \mathbf{x}_0 \sim q(\mathbf{x}_0) $\)  (데이터 분포에서 샘플링)
   - \($ t \sim \text{Uniform}(\{1, \dots, T\})$ \)  (랜덤한 시간 스텝 샘플링)
   - \($ \epsilon \sim \mathcal{N}(\mathbf{0}, \mathbf{I})$ \)  (정규분포에서 노이즈 샘플링)
   - Gradient descent step on:

   - $
     \nabla_\theta \left\| \epsilon - \epsilon_\theta \left( \sqrt{\bar{\alpha}_t} \mathbf{x}_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon, t \right) \right\|^2
     $

2. **until converged**

<details>
  <summary> torch 예제 코드 (gather, view 함수) </summary>

1. gather 함수
    
    ```python
        # 원본 텐서
        coeff = torch.tensor([10, 20, 30, 40, 50])
        
        # 특정 인덱스를 가져올 t 텐서
        t = torch.tensor([0, 2, 4])  # 인덱스 0, 2, 4를 가져오도록 지정
        
        # gather 사용 (dim=0)
        coeff_t = torch.gather(coeff, index=t, dim=0)
        # 출력: tensor([10, 30, 50])
    ```
    
2. view 함수

    ```python
    # 예제 텐서
    coeff_t = torch.tensor([1, 2, 3, 4])  # Shape: (4,)
    
    # 변환할 차원 리스트
    dims = [2, 2]  # 여기서 len(dims) = 2
    
    # 새로운 차원으로 변환
    B = coeff_t.shape[0] # 4
    reshaped_tensor = coeff_t.view([B] + [1] * len(dims))
    # 출력: torch.Size([4, 1, 1])
    ```
</details>

In [3]:
def gather_and_expand(coeff, t, xshape):
    ''' 
    t시간에 해당하는 계수(인덱스)를 계수 텐서(coeff)에서 가져오고, 
    해당 계수들을 batch size에 맞게 확장하는 함수 
    '''
    # 입력 텐서의 차원 분리
    batch_size, *dims = xshape # batch_size는 첫 번째 차원, dims는 나머지 차원

     # t시간에 해당하는 계수 가져오기
    coeff_t = torch.gather(coeff, index = t, dim = 0) # Shape: (len(t), ) = (batch_size, )
    
    # 차원 확장
    coeff_t = coeff_t.view([batch_size] + [1] * len(dims)) # 나머지 차원들에 맞게 확장 -> 이후의 계산 차원을 맞추기 위하여
    return coeff_t
    
def train(model, x_0): # x_0: 데이터 분포에서 샘플링한 입력 데이터
    # 랜덤한 시간 스텝에서 배치 사이즈(x_0.shape[0])만큼 샘플링
    t = torch.randint(T, size = (x_0.shape[0], ), device = x_0.device)\
    
    # 정규분포에서 노이즈 샘플링, shape이 batch와 같게
    eps = torch.randn_like(x_0)
    
    # model input, batch 들간 계수
    # 모든 batch가 같은 t를 사용하지 않기 때문에 해당 함수를 사용해야함
    x_t = gather_and_expand(sqrt_alphas_bar, t, x_0.shape)*x_0 + gather_and_expand(sqrt_one_minus_alphas_bar, t, x_0.shape) * eps
    
    # eps와 model output mse 구하기
    loss = F.mse_loss(model(x_t, t), eps)
    return loss

### 샘플링 코드
1. **Initialize**:  
       - \($ \mathbf{x}_T \sim \mathcal{N}(\mathbf{0}, \mathbf{I}) $\)  (시작 샘플은 표준 정규분포에서 샘플링)
    
2. **For \( t = T, ..., 1 \) do**:
   - \($ \mathbf{z} \sim \mathcal{N}(\mathbf{0}, \mathbf{I})$ \) if \($ t > 1 $\), else \($ \mathbf{z} = 0 $\)  
     (마지막 단계가 아니면 가우시안 노이즈 추가)

   - 업데이트:
     $
     \mathbf{x}_{t-1} = \frac{1}{\sqrt{\alpha_t}} 
     \left( \mathbf{x}_t - \frac{1 - \alpha_t}{\sqrt{1 - \bar{\alpha}_t}} \epsilon_\theta(\mathbf{x}_t, t) \right) 
     + \sigma_t \mathbf{z}
     $
       - 구현 시엔 $\mu_\theta(x_t,t) = \tilde{\mu}_t \left( \mathbf{x}_t, \frac{1}{\sqrt{\bar{\alpha}_t}} \left( \mathbf{x}_t - \sqrt{1 - \bar{\alpha}_t} \epsilon_{\theta} (\mathbf{x}_t) \right) \right)$ 의 수식을 사용하여 구현한다.
       - 따라서 다음과 같아서 $ \tilde{\mu}_t (\mathbf{x}_t, \mathbf{x}_0) := 
\frac{\sqrt{\bar{\alpha}_{t-1}} \beta_t}{1 - \bar{\alpha}_t} \mathbf{x}_0 
+ \frac{\sqrt{\alpha_t} (1 - \bar{\alpha}_{t-1})}{1 - \bar{\alpha}_t} \mathbf{x}_t$ , $x_0$은 다음과 같아진다 $x_0 = \frac{1}{\sqrt{\bar{\alpha}_t}} 
\left( \mathbf{x}_t - \sqrt{1 - \bar{\alpha}_t} \epsilon_{\theta} (\mathbf{x}_t) \right)$

3. **Return** \( $\mathbf{x}_0 $\)  (최종 생성된 샘플)

In [4]:
#기존
def sample(model, x_T): # x_T: noisy data
    x_t = x_T
    for time_step in reversed(range(T)): # T , ..., 1 반복 수행
        # 각 time_step를 batch size로 확장
        t = torch.full((x_T.shape[0], ), time_step, dtype=torch.long, device=device)
        
        # 마지막 단계가 아니면 가우시안 노이즈 샘플링
        z = torch.randn_like(x_t) if time_step else 0

        # 업데이트 과정
        eps = model(x_t, t) # 모델 예측

        # x_0을 구함
        print(f"기존 차원: {gather_and_expand(reciprocal_alphas_sqrt, t, eps.shape).shape} \n  \
                view 차원: {reciprocal_alphas_sqrt[t].view(eps.shape).shape}")
              
        x0_predicted = gather_and_expand(reciprocal_alphas_sqrt, t, eps.shape) * x_t - \
            gather_and_expand(reciprocal_alphasm1_sqrt, t, eps.shape) * eps
        
        # 위 x_0과 함께 평균을 구한다
        mean = gather_and_expand(posterior_mean_coef1, t, eps.shape) * x0_predicted + \
            gather_and_expand(posterior_mean_coef2, t, eps.shape) * x_t
        
        # 분산 구함
        var = torch.sqrt(gather_and_expand(sigmas, t, eps.shape)) * z

        x_t = mean + var
        
    # 마지막 결과 return 
    x_0 = x_t
    return x_0

# **Prepare Dataset/Dataloader**

In [5]:
dataset = CIFAR10(
    root="./data", train=True, download=True,
    transform=transforms.Compose([
        transforms.ToTensor()
    ])
)
dataloader = torch.utils.data.DataLoader(
    dataset, batch_size=32, shuffle=True, num_workers=4
)

Files already downloaded and verified


# **Model architecture**

https://github.com/w86763777/pytorch-ddpm/blob/master/model.py

위 github에서 copy함(\_\_name\_\_ == "\_\_name\_\_" 제외)

In [6]:
class Swish(nn.Module):
    def forward(self, x):
        return x * torch.sigmoid(x)

class TimeEmbedding(nn.Module):
    def __init__(self, T, d_model, dim):
        assert d_model % 2 == 0
        super().__init__()
        emb = torch.arange(0, d_model, step=2) / d_model * math.log(10000)
        emb = torch.exp(-emb)
        pos = torch.arange(T).float()
        emb = pos[:, None] * emb[None, :]
        emb = torch.stack([torch.sin(emb), torch.cos(emb)], dim=-1)
        emb = emb.view(T, d_model)

        self.timembedding = nn.Sequential(
            nn.Embedding.from_pretrained(emb),
            nn.Linear(d_model, dim),
            Swish(),
            nn.Linear(dim, dim),
        )
        self.initialize()

    def initialize(self):
        for module in self.modules():
            if isinstance(module, nn.Linear):
                init.xavier_uniform_(module.weight)
                init.zeros_(module.bias)

    def forward(self, t):
        emb = self.timembedding(t)
        return emb

class DownSample(nn.Module):
    def __init__(self, in_ch):
        super().__init__()
        self.main = nn.Conv2d(in_ch, in_ch, 3, stride=2, padding=1)
        self.initialize()

    def initialize(self):
        init.xavier_uniform_(self.main.weight)
        init.zeros_(self.main.bias)

    def forward(self, x, temb):
        x = self.main(x)
        return x

class UpSample(nn.Module):
    def __init__(self, in_ch):
        super().__init__()
        self.main = nn.Conv2d(in_ch, in_ch, 3, stride=1, padding=1)
        self.initialize()

    def initialize(self):
        init.xavier_uniform_(self.main.weight)
        init.zeros_(self.main.bias)

    def forward(self, x, temb):
        _, _, H, W = x.shape
        x = F.interpolate(
            x, scale_factor=2, mode='nearest')
        x = self.main(x)
        return x

class AttnBlock(nn.Module):
    def __init__(self, in_ch):
        super().__init__()
        self.group_norm = nn.GroupNorm(32, in_ch)
        self.proj_q = nn.Conv2d(in_ch, in_ch, 1, stride=1, padding=0)
        self.proj_k = nn.Conv2d(in_ch, in_ch, 1, stride=1, padding=0)
        self.proj_v = nn.Conv2d(in_ch, in_ch, 1, stride=1, padding=0)
        self.proj = nn.Conv2d(in_ch, in_ch, 1, stride=1, padding=0)
        self.initialize()

    def initialize(self):
        for module in [self.proj_q, self.proj_k, self.proj_v, self.proj]:
            init.xavier_uniform_(module.weight)
            init.zeros_(module.bias)
        init.xavier_uniform_(self.proj.weight, gain=1e-5)

    def forward(self, x):
        B, C, H, W = x.shape
        h = self.group_norm(x)
        q = self.proj_q(h)
        k = self.proj_k(h)
        v = self.proj_v(h)

        q = q.permute(0, 2, 3, 1).view(B, H * W, C)
        k = k.view(B, C, H * W)
        w = torch.bmm(q, k) * (int(C) ** (-0.5))
        assert list(w.shape) == [B, H * W, H * W]
        w = F.softmax(w, dim=-1)

        v = v.permute(0, 2, 3, 1).view(B, H * W, C)
        h = torch.bmm(w, v)
        assert list(h.shape) == [B, H * W, C]
        h = h.view(B, H, W, C).permute(0, 3, 1, 2)
        h = self.proj(h)

        return x + h

class ResBlock(nn.Module):
    def __init__(self, in_ch, out_ch, tdim, dropout, attn=False):
        super().__init__()
        self.block1 = nn.Sequential(
            nn.GroupNorm(32, in_ch),
            Swish(),
            nn.Conv2d(in_ch, out_ch, 3, stride=1, padding=1),
        )
        self.temb_proj = nn.Sequential(
            Swish(),
            nn.Linear(tdim, out_ch),
        )
        self.block2 = nn.Sequential(
            nn.GroupNorm(32, out_ch),
            Swish(),
            nn.Dropout(dropout),
            nn.Conv2d(out_ch, out_ch, 3, stride=1, padding=1),
        )
        if in_ch != out_ch:
            self.shortcut = nn.Conv2d(in_ch, out_ch, 1, stride=1, padding=0)
        else:
            self.shortcut = nn.Identity()
        if attn:
            self.attn = AttnBlock(out_ch)
        else:
            self.attn = nn.Identity()
        self.initialize()

    def initialize(self):
        for module in self.modules():
            if isinstance(module, (nn.Conv2d, nn.Linear)):
                init.xavier_uniform_(module.weight)
                init.zeros_(module.bias)
        init.xavier_uniform_(self.block2[-1].weight, gain=1e-5)

    def forward(self, x, temb):
        h = self.block1(x)
        h += self.temb_proj(temb)[:, :, None, None]
        h = self.block2(h)

        h = h + self.shortcut(x)
        h = self.attn(h)
        return h

class UNet(nn.Module):
    def __init__(self, T, ch, ch_mult, attn, num_res_blocks, dropout):
        super().__init__()
        assert all([i < len(ch_mult) for i in attn]), 'attn index out of bound'
        tdim = ch * 4
        self.time_embedding = TimeEmbedding(T, ch, tdim)

        self.head = nn.Conv2d(3, ch, kernel_size=3, stride=1, padding=1)
        self.downblocks = nn.ModuleList()
        chs = [ch]  # record output channel when dowmsample for upsample
        now_ch = ch
        for i, mult in enumerate(ch_mult):
            out_ch = ch * mult
            for _ in range(num_res_blocks):
                self.downblocks.append(ResBlock(
                    in_ch=now_ch, out_ch=out_ch, tdim=tdim,
                    dropout=dropout, attn=(i in attn)))
                now_ch = out_ch
                chs.append(now_ch)
            if i != len(ch_mult) - 1:
                self.downblocks.append(DownSample(now_ch))
                chs.append(now_ch)

        self.middleblocks = nn.ModuleList([
            ResBlock(now_ch, now_ch, tdim, dropout, attn=True),
            ResBlock(now_ch, now_ch, tdim, dropout, attn=False),
        ])

        self.upblocks = nn.ModuleList()
        for i, mult in reversed(list(enumerate(ch_mult))):
            out_ch = ch * mult
            for _ in range(num_res_blocks + 1):
                self.upblocks.append(ResBlock(
                    in_ch=chs.pop() + now_ch, out_ch=out_ch, tdim=tdim,
                    dropout=dropout, attn=(i in attn)))
                now_ch = out_ch
            if i != 0:
                self.upblocks.append(UpSample(now_ch))
        assert len(chs) == 0

        self.tail = nn.Sequential(
            nn.GroupNorm(32, now_ch),
            Swish(),
            nn.Conv2d(now_ch, 3, 3, stride=1, padding=1)
        )
        self.initialize()

    def initialize(self):
        init.xavier_uniform_(self.head.weight)
        init.zeros_(self.head.bias)
        init.xavier_uniform_(self.tail[-1].weight, gain=1e-5)
        init.zeros_(self.tail[-1].bias)

    def forward(self, x, t):
        # Timestep embedding
        temb = self.time_embedding(t)
        # Downsampling
        h = self.head(x)
        hs = [h]
        for layer in self.downblocks:
            h = layer(h, temb)
            hs.append(h)
        # Middle
        for layer in self.middleblocks:
            h = layer(h, temb)
        # Upsampling
        for layer in self.upblocks:
            if isinstance(layer, ResBlock):
                h = torch.cat([h, hs.pop()], dim=1)
            h = layer(h, temb)
        h = self.tail(h)

        assert len(hs) == 0
        return h

## **Make model,optimizer, scheduler instance**

In [7]:
model = UNet(T=T, ch=128, ch_mult=[1, 2, 2, 1], attn=[1],
             num_res_blocks=2, dropout=0.1).to(device)
#ema_model = copy.deepcopy(model)
optim = torch.optim.Adam(model.parameters(), lr=2e-4)
#sched = torch.optim.lr_scheduler.LambdaLR(optim, lr_lambda=warmup_lr)

# **Train Iteration**

In [None]:
for e in range(1, 100+1):
    model.train()
    for i, (x, _) in enumerate(dataloader, 1):
        optim.zero_grad()
        x = x.to(device)
        loss = train(model, x)
        loss.backward()
        optim.step()
        print("\r[Epoch: {} , Iter: {}/{}]  Loss: {:.3f}".format(e, i, len(dataloader), loss.item()), end='')
    print("\n> Eval at epoch {}".format(e))
    model.eval()
    with torch.no_grad():
        x_T = torch.randn(5, 3, 32, 32).to(device)
        x_0 = sample(model, x_T)
        x_0 = x_0.permute(0, 2, 3, 1).clamp(0, 1).detach().cpu().numpy() * 255
        for i in range(5):
            cv2_imshow(x_0[i])
 

[Epoch: 1 , Iter: 397/1563]  Loss: 0.021