# DPM()

In [3]:
import numpy as np
import os
import time
import math
import argparse

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

### utils

- `logit(u)`: 확률값(0~1)을 log-odds로 변환하는 log transform을 수행(Loss 계산 시 수행)
    
    - $logit(u) = log \frac{u}{1-u}$

- `get_norms(model)`: 모델 weight의 L2norm(유클리드 거리)계산. 각 가중치와 가중치 gradient의 L2norm을 계산하고, 변화량 확인을 위해 사용.

    1. `model.named_parameters()`를 이용해 모델의 모든 파라미터 가져오기

    2. 각 파라미터(`param`)에 대해 L2 norm($\sqrt{\sum w^2}/ elements$)을 계산

    3. 가중치(`param.grad`)에 대해 gradient norm을 계산

    4. 만약 `param.grad`가 없으면 `grad_norm = 0.0` 설정

    5. (`name`, `grad_norm`) 형태로 리스트에  저장하여 반환

- `create_log_dir(args, model_id)`: 로그를 저장할 디렉터리를 생성하고, 모델 ID와 실행 시간을 폴더명으로 하고, 폴더명 반환

    - `args.suffix`: 모델 ID

In [4]:
### get log transform
def logit(u):
    return torch.log(u / (1. - u))

### log transform at input numpy array
def logit_np(u):
    return torch.log(torch.tensor(u, dtype=torch.float32) / (1. - torch.tensor(u, dtype=torch.float32)))

### calculate L2norm of gradient of weights
def get_norms(model):
    """
    텐서의 원소 당 가중치 및 가중치의 기울기에 대한 정규화(L2norm)를 계산
    """
    norms = [] # 가중치의 L2norm 저장 컨테이너
    grad_norms = [] # 가중치 gradient의 L2norm 저장 컨테이너(없는 경우 0으로 저장)
    
    for param in model.parameters():
        # 가중치의 L2 norm 계산
        norm = torch.sqrt(torch.sum(torch.square(param))) / torch.prod(torch.tensor(param.shape, dtype=torch.float32))
        norms.append(norm)
        
        # 가중치 값을 가지는 경우, 가중치 gradient L2norm 계산
        if param.grad is not None:
            grad_norm = torch.sqrt(torch.sum(torch.square(param.grad))) / torch.prod(torch.tensor(param.shape, dtype=torch.float32))
            grad_norms.append(grad_norm)
        # 가중치 값이 없는 경우, grad_norms 리스트에 0 추가
        else:
            grad_norms.append(torch.tensor(0.0))
    
    return norms, grad_norms

### create log directory
def create_log_dir(args, model_id):
    model_id += args.suffix + time.strftime('-%y%m%dT%H%M%S') # ID + time
    model_dir = os.path.join(os.path.expanduser(args.output_dir), model_id) 
    os.makedirs(model_dir, exist_ok=True)
    return model_dir

### regression

- `class LeackyRelu`: 입력값이 0보다 크면 그대로 출력, 0 이하이면 0.05를 곱한 값을 출력하는 LeakyReLU 활성함수를 구현

- `class MultiscaleConvolution`: 입력된 이미지를 여러 해상도로 변환시키고, 합성곱 및 비선형 활성화를 수행하며, 그 결과를 업샘플링하여 누적 평균을 계산

    $\rightarrow$ 멀티스케일 Conv를 수행하는 이유는 timestep(`t`)별로 노이즈 추가로 인해 분석해야 할 이미지 해상도가 달라지기 때문

    (초기에는 고해상도 이미지를, 후기에는 노이즈가 추가된 이미지에서 저해상도 이미지 패턴을 더 중요시 해야 함)

    $\rightarrow$ 여기서 Isotropic Gaussian 초기화를 통해 Markov Kernel을 구현

    - **Params**

        - `num_channels(int)`: 입력 채널 수

        - `num_filters(int)`: 출력 채널 수
        
        - `spatial_width(int)`: 입력 이미지의 공간적 크기 (정사각형이라고 가정)

        - `num_scales(int)`: 사용할 스케일의 수

        - `filter_size (int)`: 컨볼루션 필터 크기 (정사각형)

        - `downsample_method (str)`: 다운샘플링 방식 ('meanout' 등, 여기서는 평균 풀링 사용)

        - `name (str)`: 레이어 이름(디버깅용)

In [5]:
##############################################
# 1. LeakyReLU 활성화 함수 
# 클래스 래퍼(조건에 따라 MultiscaleConvolution의 동작을 제어)
##############################################
class LeakyRelu(nn.Module):
    """
    음의 기울기를 가지는 LeakyReLU 함수.
    입력이 0보다 크면 그대로 출력하고, 0 이하이면 0.05를 곱한 값을 출력.
    """
    def __init__(self, negative_slope=0.05):
        super(LeakyRelu, self).__init__()
        self.negative_slope = negative_slope

    def forward(self, input):
        return F.leaky_relu(input, negative_slope=self.negative_slope)

# dense와 convolution에서 동일하게 사용
dense_nonlinearity = LeakyRelu(negative_slope=0.05)
conv_nonlinearity = LeakyRelu(negative_slope=0.05)

##############################################
# 2. MultiScaleConvolution 클래스
##############################################
class MultiScaleConvolution(nn.Module):
    """
    다중 스케일 컨볼루션 레이어.
    
    입력 이미지를 여러 스케일(해상도)로 변환한 후 각 스케일에 대해
    컨볼루션(및 비선형 활성화)을 수행하고, 결과를 업샘플링하여 누적 평균을 계산
    
    Args:
        num_channels (int): 입력 채널 수
        num_filters (int): 출력 채널(필터) 수
        spatial_width (int): 입력 이미지의 공간적 크기 (정사각형이라고 가정)
        num_scales (int): 사용할 스케일의 수
        filter_size (int): 컨볼루션 필터 크기 (정사각형)
        downsample_method (str): 다운샘플링 방식 ('meanout' 등, 여기서는 평균 풀링 사용)
        name (str): 레이어 이름(디버깅용)
    """
    def __init__(self, num_channels, num_filters, spatial_width, num_scales, filter_size, downsample_method='meanout', name=""):
        super(MultiScaleConvolution, self).__init__()
        self.num_scales = num_scales
        self.filter_size = filter_size
        self.num_filters = num_filters
        self.spatial_width = spatial_width
        self.downsample_method = downsample_method
        self.name = name
        # 'overshoot': full Conv.로 인해 생기는 여분의 가장자리 수
        self.overshoot = (filter_size - 1) // 2

        # 각 스케일마다 컨볼루션+활성화 블록 생성 (ModuleList에 저장)
        self.conv_layers = nn.ModuleList()
        for scale in range(num_scales):
            # PyTorch에서는 'full Convolution'을 padding=filter_size-1 로 흉내 낼 수 있음.
            conv = nn.Conv2d(
                in_channels=num_channels,
                out_channels=num_filters,
                kernel_size=filter_size,
                padding=filter_size - 1  # full convolution 효과
            )
            # 가중치를 초기화에 IsotropicGaussian 적용
            # DPM에서는 모든 픽셀에 대해 균등한 노이즈를 학습하도록 하는 역할 수행
            std = torch.sqrt(1.0 / num_filters) / (filter_size ** 2)
            self.isotropic_gaussian_init(conv.weight, std=std)
            
            # bias 초기화
            nn.init.constant_(conv.bias, 0)  # Bias를 0으로 초기화
            
            # 컨볼루션 후 비선형 활성화 적용
            layer = nn.Sequential(conv, conv_nonlinearity)
            # ModuleList에 합성곱+LeakyReLU 저장
            self.conv_layers.append(layer)

    def isotropic_gaussian_init(self, tensor, std=1.0):
        """ Isotropic Gaussian 초기화 """
        shape = tensor.shape
        num_elements = tensor.numel()  # 전체 원소 개수
        identity_cov = torch.eye(num_elements) * (std ** 2)  # 공분산 행렬: I * std^2

        # 다변량 정규분포 샘플링
        w = torch.distributions.MultivariateNormal(torch.zeros(num_elements), identity_cov).sample()

        # 원래 텐서 형태로 변형
        tensor.data = w.view(shape)
        
    def downsample(self, imgs, scale):
        """
        이미지 텐서를 주어진 스케일만큼 평균 풀링으로 다운샘플링.
        Diffusion Mechanism에서 coarse-to-fine 방식으로 학습하도록...
        
        Args:
            imgs (Tensor): (batch, channels, height, width) 형태의 이미지 텐서
            scale (int): 다운샘플링 스케일 (2**scale 만큼 축소)
            
        Returns:
            다운샘플링된 텐서
        """
        if scale == 0:
            return imgs
        kernel_size = 2 ** scale
        return F.avg_pool2d(imgs, kernel_size=kernel_size, stride=kernel_size)

    def forward(self, X):
        """
        입력 이미지에 대해 다중 스케일 컨볼루션을 적용하고,
        각 스케일의 결과를 업샘플하여 누적한 후 평균을 반환.
        
        Args:
            X (Tensor): (batch, channels, spatial_width, spatial_width) 형태.
            
        Returns:
            스케일 평균화된 결과 텐서 (입력과 동일한 공간 크기).
        """
        acc = None  # 결과 누적 변수
        # 스케일을 거칠수록 해상도가 낮아지므로, coarsest (최저 해상도)부터 순차적으로 진행
        for scale in reversed(range(self.num_scales)):
            # 입력 이미지를 해당 스케일로 다운샘플링
            X_down = self.downsample(X, scale)
            # 해당 스케일의 컨볼루션 레이어 적용
            out = self.conv_layers[scale](X_down)
            # full 컨볼루션으로 인한 여분의 가장자리(crop)를 제거하여 원래 크기로 맞춤
            if self.overshoot > 0:
                out = out[:, :, self.overshoot:-self.overshoot, self.overshoot:-self.overshoot]
            # 누적합산: 초기에는 단순히 할당, 이후에는 이전 결과와 더함.
            if acc is None:
                acc = out
            else:
                acc = acc + out
            # 현재 스케일이 최상해상도가 아니면, 다음 스케일과 맞추기 위해 2배 업샘플링 (최근접 보간)
            if scale > 0:
                acc = F.interpolate(acc, scale_factor=2, mode='nearest')
        # 각 스케일의 결과 평균을 반환
        return acc / self.num_scales

##############################################
# 3. MultiLayerConvolution 클래스
##############################################
class MultiLayerConvolution(nn.Module):
    """
    여러 개의 MultiScaleConvolution 레이어를 순차적으로 쌓은 모듈.
    
    Args:
        n_layers (int): 사용할 MultiScaleConvolution 레이어 수.
        n_hidden (int): 각 레이어의 출력 채널 수.
        spatial_width (int): 입력 이미지의 공간 크기.
        n_colors (int): 입력 이미지의 채널 수.
        n_scales (int): 각 MultiScaleConvolution 레이어에서 사용할 스케일 수.
        filter_size (int): 컨볼루션 필터 크기.
    """
    def __init__(self, n_layers, n_hidden, spatial_width, n_colors, n_scales, filter_size=3):
        super(MultiLayerConvolution, self).__init__()
        self.layers = nn.ModuleList()
        num_channels = n_colors
        for i in range(n_layers):
            layer = MultiScaleConvolution(
                num_channels=num_channels,
                num_filters=n_hidden,
                spatial_width=spatial_width,
                num_scales=n_scales,
                filter_size=filter_size,
                name=f"layer{i}_"
            )
            self.layers.append(layer)
            num_channels = n_hidden  # 다음 레이어의 입력 채널은 이전 레이어의 출력 채널

    def forward(self, X):
        """
        순차적으로 각 MultiScaleConvolution 레이어를 적용.
        
        Args:
            X (Tensor): 입력 이미지 (batch, n_colors, spatial_width, spatial_width)
            
        Returns:
            최종 컨볼루션 결과.
        """
        out = X
        for layer in self.layers:
            out = layer(out)
        return out

##############################################
# 4. MLP 생성 헬퍼 함수
##############################################
def make_mlp(layer_dims, activations):
    """
    주어진 층 크기와 활성화 함수 리스트로 MLP(완전연결 신경망)를 구성.
    
    Args:
        layer_dims (list of int): 각 층의 크기 (예: [input_dim, hidden_dim, output_dim]).
        activations (list of nn.Module): 각 층에 적용할 활성화 함수.
            (출력층에는 보통 활성화 함수가 없거나 nn.Identity() 사용)
            
    Returns:
        nn.Sequential: 구성된 MLP 모듈.
    """
    layers = []
    for i in range(len(layer_dims) - 1):
        layers.append(nn.Linear(layer_dims[i], layer_dims[i + 1]))
        if i < len(activations):
            layers.append(activations[i])
    return nn.Sequential(*layers)

##############################################
# 5. MLP_conv_dense 클래스
##############################################
class MLP_conv_dense(nn.Module):
    """
    입력 이미지에서 시간적 계수를 반영하고, 전역적 특성 반영을 위한 Convolution/지역적인 특성 반영을 위한 MLP 모듈을 사용.
    이를 수행하는 두 개의 분기가 있음:
      1) 하부 분기: 컨볼루션 기반 MLP (MultiLayerConvolution)와 (선택적으로) 완전연결 MLP
      => CNN으로 공간적 특징 추출 -> 전체 이미지를 1D 벡터로 변환하여 학습
      2) 상부 분기: 각 픽셀마다 독립적으로 적용되는 완전연결 MLP (실제로 1x1 컨볼루션과 유사)
      => mu, sigma 예측을 위한 학습
    
    Args:
        n_layers_conv (int): 컨볼루션 분기의 레이어 수.
        n_layers_dense_lower (int): 하부 완전연결 MLP의 레이어 수.
        n_layers_dense_upper (int): 상부 완전연결 MLP의 레이어 수.
        n_hidden_conv (int): 컨볼루션 분기의 은닉 채널 수.
        n_hidden_dense_lower (int): 하부 완전연결 MLP의 은닉 유닛 수.
        n_hidden_dense_lower_output (int): 하부 완전연결 MLP의 출력 채널 수 (공간 전체에 대해).
        n_hidden_dense_upper (int): 상부 완전연결 MLP의 은닉 유닛 수.
        spatial_width (int): 입력 이미지의 공간 크기.
        n_colors (int): 입력 이미지의 채널 수.
        n_scales (int): 컨볼루션 분기에서 사용할 스케일 수.
        n_temporal_basis (int): 시간적 기저(temporal basis)의 수 (출력 계수에 영향을 줌).
    """
    def __init__(self, n_layers_conv, n_layers_dense_lower, n_layers_dense_upper,
                 n_hidden_conv, n_hidden_dense_lower, n_hidden_dense_lower_output, 
                 n_hidden_dense_upper, spatial_width, n_colors, n_scales, n_temporal_basis):
        super(MLP_conv_dense, self).__init__()
        self.n_colors = n_colors
        self.spatial_width = spatial_width
        self.n_hidden_conv = n_hidden_conv
        self.n_hidden_dense_lower = n_hidden_dense_lower
        self.n_hidden_dense_lower_output = n_hidden_dense_lower_output
        self.n_layers_dense_lower = n_layers_dense_lower  # 선택적 분기 존재 여부 판단용

        # 하부 분기 - 컨볼루션 기반 MLP
        self.mlp_conv = MultiLayerConvolution(n_layers_conv, n_hidden_conv, spatial_width, n_colors, n_scales)

        # 하부 분기 - 완전연결 MLP (선택적)
        if n_hidden_dense_lower > 0 and n_layers_dense_lower > 0:
            n_input = n_colors * (spatial_width ** 2)
            n_output = n_hidden_dense_lower_output * (spatial_width ** 2)
            activations = [dense_nonlinearity] * (n_layers_dense_lower - 1)
            self.mlp_dense_lower = make_mlp(
                [n_input] + [n_hidden_dense_lower] * (n_layers_dense_lower - 1) + [n_output],
                activations
            )
        else:
            self.mlp_dense_lower = None
            self.n_hidden_dense_lower_output = 0

        # 상부 분기 - 각 픽셀마다 적용되는 완전연결 MLP (출력: mu와 sigma 각각에 대해)
        n_output = n_colors * n_temporal_basis * 2  # mu와 sigma 두 값을 위해 *2
        # 상부 MLP의 활성화: 마지막 층은 항등함수(nn.Identity()) 사용
        activations_upper = [dense_nonlinearity] * (n_layers_dense_upper - 1) + [nn.Identity()]
        self.mlp_dense_upper = make_mlp(
            [n_hidden_conv + self.n_hidden_dense_lower_output] + [n_hidden_dense_upper] * (n_layers_dense_upper - 1) + [n_output],
            activations_upper
        )

    def forward(self, X):
        """
        noisy 입력 이미지로부터 시간적 계수(temporal coefficients)를 생성.
        
        순서:
          1) 컨볼루션 분기를 통해 특징 추출 (mlp_conv).
          2) 특징 차원 변경: (batch, channels, H, W) → (batch, H, W, channels).
          3) (선택적) 하부 완전연결 분기를 통해 전역 정보를 추출한 후, 컨볼루션 분기의 결과와 결합.
          4) 상부 완전연결 MLP를 각 픽셀에 대해 적용하여 최종 출력을 생성.
          
        Args:
            X (Tensor): (batch, n_colors, spatial_width, spatial_width) 크기의 입력 이미지.
            
        Returns:
            최종 출력 텐서 (batch, spatial_width, spatial_width, n_output)
        """
        # 1) 컨볼루션 기반 특징 추출
        Y = self.mlp_conv(X)  # 시간(t)에서 노이즈가 추가된 이미지 입력
                              # (batch, n_hidden_conv, spatial_width, spatial_width)
        # 2) 채널 차원을 마지막 축으로 이동
        Y = Y.permute(0, 2, 3, 1)  # (batch, spatial_width, spatial_width, n_hidden_conv)
        
        # 3) 선택적 하부 완전연결 분기 적용 및 특징 결합
        if self.mlp_dense_lower is not None:
            batch_size = X.size(0)
            X_flat = X.view(batch_size, -1)  # (batch, n_colors * spatial_width^2)
            Y_dense = self.mlp_dense_lower(X_flat)  # (batch, n_hidden_dense_lower_output * spatial_width^2)
            Y_dense = Y_dense.view(batch_size, self.spatial_width, self.spatial_width, self.n_hidden_dense_lower_output)
            # 각 분기의 결과를 정규화하여 결합 (스케일 조정을 위해 각각 sqrt(유닛수)로 나눔)
            Y = torch.cat([
                Y / math.sqrt(self.n_hidden_conv),
                Y_dense / math.sqrt(self.n_hidden_dense_lower_output)
            ], dim=3)

        # 4) 상부 MLP를 각 픽셀에 대해 독립적으로 적용 (픽셀 단위 MLP. 1x1 컨볼루션과 유사)
        batch_size, H, W, channels = Y.shape
        Y_flat = Y.view(batch_size * H * W, channels)
        Z_flat = self.mlp_dense_upper(Y_flat)
        Z = Z_flat.view(batch_size, H, W, -1)
        return Z


### model

- `generate_beta_arr`:Diffusion 과정에서 timestep(`t`)의 수만큼 `beta_t`값을 생성하는 메서드

    - `beta_t`는 $T_\pi (y | y'; \beta) = N(y;\sqrt{1-\beta}\cdot y', \beta I)$ 에 따라 각 시간(`t`)에서 노이즈의 크기를 나타냄.

    - 초기 입력값은 `step1_beta`이며, 시간적 기저 함수(`__init__`에서 `generate_temporal_basis`함수에 의해 정의)에 의해 정의되고, 파라미터인 `beta_perturb_coefficients_values`와 곱해져 계산된 `beta_perturb`가 증가하는 `

- `get_t_weights`: 특정 시점(`t`)을 매개변수로 입력받아 해당 Index만 1이고, 나머지는 0인 1차원 텐서를 반환

    - t 시점의 정보를 뽑아내는 역할을 하며, 다른 함수에서 사용될 때 해당 시점만 선택

- `get_beta_forward`: 특정 시점(`t`)를 매개변수로 입력받아 해당 Index만 beta_t 값을 가지는 timestep 수 만큼의 1차원 텐서 반환

- `get_mu_sigma`: 역방향 과정에서의 평균 `μ_t`와 표준편차 `σ_t`를 계산하는 함수.

    - forward diffusion에서의 노이즈 추가 분포 식: $x^{(t)} = \sqrt{1-\beta_t} x^{(t-1)} + \sqrt{\beta_t}\epsilon, \quad \epsilon \thicksim N(0, I)$

        1) 위 식은 $x^{(t)} - \sqrt{\beta_t -1} x^{(t-1)} = \sqrt{\beta_t}\epsilon$ 으로 변형 가능

        2) 위 식에서 우측 항은 표준 정규분포(Gaussian distribution)을 따르므로 $(x^{(t)} - \sqrt{\beta_t -1} x^{(t-1)})\thicksim N(0, I)$ 와 같이 표현 가능

        3) 일반 Gaussian Distribution의 분포의 확률밀도함수는 $p(x) = \frac {1} {\sqrt{2 \pi \sigma^2}} exp\big(- \frac{(x-\mu)^2} {2 \sigma^2} \big)$ 이고, forward diffusion 분포 식을 확룰 밀도함수(P.D.F.)형태로 풀어쓰면 $p(x^{(t)}|x^{(t-1)}) = \frac {1} {\sqrt{2 \pi \beta_t}} exp\big( -\frac{(x^{(t)}-\sqrt{\beta_t x^{(t-1)}})^2}{2 \beta_t} \big)$ 가 된다.

        4) 따라서 평균 $(\mu) = \frac {1} {\sqrt{2 \pi \beta_t}} $, 분산 $(\sigma^2) = \beta_t$ 로 설명할 수 있음

    - reverse diffusion에서의 노이즈 추가 분포 유도(`μ_t` 및 `σ_t` 유도과정)

        1) 베이즈 정리를 이용한 reverse diff. 확률분포: $p(x^{(t-1)}|x^{(t)}) = p(x^{(t)}|x^{(t-1)})p(x^{(t-1)})/p(x^{(t)})$

        2) 이전 timestep(`t-1`)에서의 데이터($x^{(t-1)}$)는 가우시안 분포를 따른다고 가정하면, **사전(Prior)확률분포**는 $ p^{(x^{(t-1)})} = N(x^{(t-1)}; 0, \sigma^2_{t-1}I) $ 이고, 이를 일반적인 Gaussian Distribution 형태로 표현하면 $p(x^{(t-1)}) = 1 / \sqrt{2\pi \sigma^2_{t-1}} exp \big(  - \frac{x^2_{t-1}}{2 \sigma^2_{t-1}} \big) $ 임

        3) `1)`에 의해 $p(x^{(t-1)}|x^{(t)}) \propto p(x^{(t)}|x^{(t-1)})p(x^{(t-1)}) $ 와 같은 관계를 가지므로, **사후(posterior)확률**을 구하기 위해서는 $p(x^{(t)}|x^{(t-1)})$ 와 $p(x^{(t-1)})$ 두 개의 가우시안 분포를 곱해서 새로운 가우시안 분포로 만들 수 있음

        4) 각 분포의 **지수 부분**만 추출해서 곱하면 $ \exp \left(-\frac{\left(X^{(t)}-\sqrt{1-\beta_t} X^{(t-1)}\right)^2}{2 \beta_t}\right) \times \exp \left(-\frac{X_{t-1}^2}{2 \sigma_{t-1}^2}\right) = \exp \left(-\frac{\left(X_t-\sqrt{\alpha_t} X_{t-1}\right)^2}{2 \beta_t}-\frac{X_{t-1}^2}{2 \sigma_{t-1}^2}\right) $ 와 같이 정리 가능.

        5) `4)`식의 지수 내부를 전개 시 $ -\frac{\left(X_t^2-2 X_t \sqrt{\alpha_t} X_{t-1}+\alpha_t X_{t-1}^2\right)}{2 \beta_t}-\frac{X_{t-1}^2}{2 \sigma_{t-1}^2} $ 가 되고, $ x^{(t-1)} $ 항을 묶으면 $ -\frac{X_t^2}{2 \beta_t}+X_t X_{t-1} \frac{\sqrt{\alpha_t}}{\beta_t}-X_{t-1}^2\left(\frac{\alpha_t}{2 \beta_t}+\frac{1}{2 \sigma_{t-1}^2}\right) $ 이 된다. 
        
        6) 이를 완전제곱식($(x^{(t-1)}-\mu)^2$ 형태)으로 만들게 되면 $ -\frac{1}{2}\left(\frac{\alpha_t}{\beta_t}+\frac{1}{\sigma_{t-1}^2}\right)\left(X_{t-1}-\frac{\frac{X_t \sqrt{\alpha_t}}{\beta_t}}{\frac{\alpha_t}{\beta_t}+\frac{1}{\sigma_{t-1}^2}}\right)^2+\text { (상수항) } $ 가 되기 때문에 얻어내고자 했던 $\mu = 1 / (\frac{1}{\sigma_{t-1}^2} + \frac{\alpha_t}{\beta_t}) \big( \frac{X_t \sqrt{1- \beta_t }}{\beta_t}\big) $ 와 $ \sigma^2 = 1 / (\frac{1}{\sigma_{t-1}^2} + \frac{\alpha_t}{\beta_t}) $ 를 알아낼 수 있음

    - 구현: 시점(`t-1`) 에서의 데이터(`x`)대해 Reverse diff. process의 평균(`μ_t-1`)과 표준편차(`σ_t-1`)를 계산

        1) `mlp_conv_dense` 모델으로부터 timstep에 따른 Reverse과정에서의  `Z_t`를 얻어냄. 이는 `μ_t`와 `β_t`의 계수로 이뤄져있음
        
        2) `get_beta_forward`를 이용해 시점 t의 forward diff Noise 강도(`β_t`)를 얻어냄(t 시점의 beta값 외에는 모두 0으로 이뤄진 전체 timestep 길이만큼의 1차원 텐서)
        
        3) mlp에서 추출한 `beta_coeff`를  전체 timestep 길이에 맞게 스케일링(정규화)된 `beta_coeff_scaled`에 log(`beta_forward`)를 더한 값을 sigmoid 함수에 통과시켜 Reverse Diff. 에서 사용할 `β_t`값을 얻어냄

        4) bayes theorem에 따라 얻어진 porterior 확률분포의 `μ_t-1`를 계산. 여기서 `X_noisy * sqrt(1 - beta_forward)` 부분은 노이즈가 포함된 이미지 `X_t-1`를, `mu_coeff * sqrt(beta_forward)`는 MLP에서 계산한 `μ_t-1` 계수(모델이 학습한 결과)를 반영.

        5) 상기 유도된 두 가우시안 분포의 결합을 바탕으로 분산( $σ^2_{t-1}$ )을 계산. `beta_t-1`값이 커지면 `σ_t-1`도 커지면서 샘플링의 랜덤성이 증가하게 됨.

        - 정리: 베이즈 정리(상기 유도) 기반으로 Reverse Diff 과정의 t-1 시점의 평균 및 분산을 도출했고, 이를 통해 diffusion model이 역방향으로 데이터를 복원할 수 있도록 설계

- `generate_forward_diffusion_sample`: Noising 과정을 구현하고, 이 때의 `μ_t`와  `σ_t`를 계산

    

In [None]:
class DiffusionModel(nn.Module):
    """
    Diffusion 모델은 데이터의 분포를 점진적으로 변환하는 과정인 확산(또는 디퓨전) 과정을 학습.
    특히 역확산(Reverse Diffusion) 과정에서 새로운 샘플을 생성하는 데 사용됨

    Args:
        n_layers_conv (int): 컨볼루션 계층의 수 (하위 MLP와 함께 사용되는 레이어)
        n_layers_dense_lower (int): 하부 MLP에서 사용할 층의 수
        n_layers_dense_upper (int): 상부 MLP에서 사용할 층의 수
        n_hidden_conv (int): 각 컨볼루션 계층의 은닉 유닛 수
        n_hidden_dense_lower (int): 하부 MLP의 은닉 유닛 수
        n_hidden_dense_lower_output (int): 하부 MLP 출력 크기 (공간적으로 전역 정보 추출)
        n_hidden_dense_upper (int): 상부 MLP의 은닉 유닛 수
        spatial_width (int): 입력 이미지의 공간적 크기
        n_colors (int): 입력 이미지의 채널 수
        n_scales (int): 각 컨볼루션에서 사용할 스케일 수
        n_temporal_basis (int): 출력 시간적 계수의 수 (예: mu와 sigma를 예측하는 데 사용)
    """
    def __init__(self,
                 spatial_width,  # 이미지의 공간 크기 (예: 28x28)
                 n_colors,  # 이미지의 색상 채널 수 (예: 3 for RGB)
                 trajectory_length=1000,  # 확산(디퓨전) 경로의 길이
                 n_temporal_basis=10,  # 시간적 기저 함수의 개수
                 n_hidden_dense_lower=500,  # 하위 MLP의 은닉층 유닛 수
                 n_hidden_dense_lower_output=2,  # 하위 MLP의 출력 크기
                 n_hidden_dense_upper=20,  # 상위 MLP의 은닉층 유닛 수
                 n_hidden_conv=20,  # 컨볼루션 레이어의 은닉 유닛 수
                 n_layers_conv=4,  # 컨볼루션 네트워크의 레이어 수
                 n_layers_dense_lower=4,  # 하위 MLP의 레이어 수
                 n_layers_dense_upper=2,  # 상위 MLP의 레이어 수
                 n_t_per_minibatch=1,  # 한 미니배치에서의 타임스텝 수
                 n_scales=1,  # 스케일 수 (스케일 공간을 다루는 방식)
                 step1_beta=0.001,  # 첫 번째 베타 값
                 uniform_noise=0):  # 균일 분포 잡음
        super(DiffusionModel, self).__init__()

        # 주요 파라미터 초기화
        self.spatial_width = spatial_width
        self.n_colors = n_colors
        self.trajectory_length = trajectory_length
        self.n_temporal_basis = n_temporal_basis
        self.n_t_per_minibatch = n_t_per_minibatch
        self.uniform_noise = uniform_noise

        # MLP 모델 정의 (컨볼루션+MLP 혼합 아키텍처)
        self.mlp = MLP_conv_dense(n_layers_conv, n_layers_dense_lower, n_layers_dense_upper,
                                  n_hidden_conv, n_hidden_dense_lower, n_hidden_dense_lower_output,
                                  n_hidden_dense_upper, spatial_width, n_colors, n_scales, n_temporal_basis)
        
        # 시간적 기저 함수 생성
        self.temporal_basis = self.generate_temporal_basis(trajectory_length, n_temporal_basis)
        
        # 베타 값 계산 (확산 경로의 noise 강도)
        self.beta_arr = self.generate_beta_arr(step1_beta)

    def generate_beta_arr(self, step1_beta):
        """
        확산 경로에서 베타 값을 생성하는 함수.
        베타는 각 timestep마다 noise의 강도를 결정하며,
        이를 통해 데이터가 점진적으로 확산되는 과정을 모델링함.
        
        순서:
          1) 최소 베타값 초기화 및 timestep수에 대해 min_beta_val로 초기화
          2) 첫 번째 타임스텝에만 step1_beta 추가(최소값인 1e-6이 너무 작음)
          3) 베타 변화를 결정하는 보정값(beta_perturb) 계산
          4) 베타의 기저선을 계산한 뒤 보정값을 더하고 sigmoid 통과 
          5) 최대/최소값을 지키도록 보정하여 1차원 텐서로 반환
          
        Args:
            step1 beta(t=1일때 t=0일때 보다 Noise를 확실히 증가시키기 위함)
            
        Returns:
            최종 출력 텐서 (batch, spatial_width, spatial_width, n_output)
        """
        min_beta_val = 1e-6  # 최소 베타 값 초기화
        # 모든 timestep(trajectory_length)에 대해 min_beta_val로 초기화
        min_beta_values = np.ones((self.trajectory_length,)) * min_beta_val 
        min_beta_values[0] += step1_beta  # 첫 번째 타임스텝에만 step1_beta를 추가
                                          # t=0보다 확실히 많은 노이즈를 주기 위해 적용
        
        # 베타 변화를 결정하는 보정값(beta_perturb) 계산
        # 1) 시간 기저 함수(n_temporal_basis) 에 따른 베타값 변화(기본값 = 0)
        # TODO add beta_perturb_coefficients to the parameters to be learned
        beta_perturb_coefficients_values = np.zeros((self.n_temporal_basis,))
        
        # 2) 시간 기저 함수와 베타 변화 보정값을 합산(dot product)
        beta_perturb = torch.matmul(self.temporal_basis.T,
                                    torch.tensor(beta_perturb_coefficients_values, dtype=torch.float32))
        
        # 베타의 기저선을 계산
        # step 수에 따라 t=0 -> t=T 까지 선형 증가하는 배열 생성
        beta_baseline = 1. / np.linspace(self.trajectory_length, 2., self.trajectory_length)
        beta_baseline_offset = torch.tensor(np.log(beta_baseline), dtype=torch.float32) # 로그를 취해 스케일 조정

        # 최종 베타 값 계산 (sigmoid 함수를 통해 스케일링)
        # 베타 기저값에 보정값을 더해 sigmoid 함수에 통과(0~1사이값으로 변환)
        beta_arr = torch.sigmoid(beta_perturb + beta_baseline_offset)
        # sigmoid 통과 후 최소 베타값 및 최대값(1 미만)이 되도록 보정
        beta_arr = min_beta_val + beta_arr * (1 - min_beta_val - 1e-5) 
        return beta_arr.view(self.trajectory_length, 1) # t에 따른 베타값을 1차원 텐서로 반환

    def get_t_weights(self, t):
        """
        주어진 timestep(t)에 대해 다른 t와의 차이를 계산하여 가중치를 설정하는 함수.
        입력 시점(t)을 제외하고는 0으로 처리하여 t 시점의 정보만 뽑아내는 역할을 수행.
        
        순서:
          1) 0~(t-1)까지의 배열 생성
          2) 입력받은 t와 다른 step들의 차이를 절댓값으로 계산 후 1에서 뺌
          3) 최대값을 제외하고 모두 0으로 변경(입력받은 t의 Index = 1/ 나머지는 0)
          4) 입력된 t값을 제외한 다른 Index의 값이 0인 timestep 길이만큼의 배열을 반환
          
        Args:
            t(추출하고싶은 시점의 timestep)
        
        Returns:
            입력받은 t 시점을 제외하고 모두 0인 timestep 수 만큼의 1차원 텐서
        """
        n_seg = self.trajectory_length # 전체 timestep 수
        # 0부터 trajectory_length-1까지의 타임스텝 배열 초기화
        t_compare = torch.arange(n_seg, dtype=torch.float32).view(1, n_seg) 
        # 매개변수 t와 모든 timestep의 차이를 절대값으로 계산
        diff = torch.abs(t.view(1, 1) - t_compare)
        # 입력받은 시점(t)에 가까울수록 높은 가중치를 가지도록 함
        # 입력 시점을 제외하고는 0으로 처리하여 get_beta_forward 함수 등에서 get_t_weights(t)를 곱하면 t시점에서의 값을 강조
        t_weights = torch.max(torch.stack([(1 - diff).view(n_seg, 1), torch.zeros((n_seg, 1))]), dim=1)[0]
        return t_weights.view(-1, 1) # 1차원 배열로 반환

    def get_beta_forward(self, t):
        """
        get_t_weights를 이용하여 특정 timestep(t)에 해당하는 베타 값을 계산하고 반환함.
        학습 과정에서 사용함.
        
        Args:
          t(beta값을 얻고자 하는 시점(t))
        
        Returns:
          t 시점의 Beta 외에는 0으로 이뤄진 timestep 수 만큼의 1차원 텐서
        """
        t_weights = self.get_t_weights(t) # t 시점을 제외하고 모두 0인 timestep 수 만큼의 1차원 텐서 반환
        return torch.matmul(t_weights.T, self.beta_arr) # t 시점의 Beta 외에는 0으로 이뤄진 timestep 수 만큼의 1차원 텐서 반환

    def get_mu_sigma(self, X_noisy, t):
        """
        MLP를 통해 노이즈가 추가된 이미지 X_noisy와 타임스텝 t에 대해 mu와 sigma를 계산.
        계산된 mu와 sigma에 보정 분포를 더해 forward에 "근사"시켜 
        역확산 모델에서 사용할 평균과 표준편차를 계산.
        
        순서:
          1) forward process를 통해 얻어진 X_t 를 MLP에 통과하여 Reverse 과정에서의 mu_t와 beta_t 예측값을 얻어냄 
          2) forward에서의 beta_t에 log를 취한 값 + 정규화된 reverse Diff에서의 beta_t => sigmoid에 통과 => 확률값(beta_reverse) 얻어냄
          3) t 에서의 Noising된 데이터 + MLP를 통해 얻어낸 부분에서 sigma를 계산
          4) t 에서의 분산은 beta_t가 지배하므로, beta_t에 루트를 씌워 표준편차(sigma_t)를 구함
        
        Args:
            Noise가 추가된 X_t와 시점(t)
        Returns:
            보정에 의해 얻어진 mu와 sigma
        """
        Z = self.mlp(X_noisy)  # MLP를 통해 Reverse Diff의 mu, beta 계산
        mu_coeff, beta_coeff = self.temporal_readout(Z, t)  # t에 해당하는 에서 mu와 beta를 제외한 전체 step 길이의 1차원 텐서
        beta_forward = self.get_beta_forward(t) # 시점 t에서의 beta값 외에는 0으로 이뤄진 timestep 수 만큼의 1차원 텐서
        # 시간적 기저에서의 beta를 전체 trajectory 길이로 스케일링
        beta_coeff_scaled = beta_coeff / torch.sqrt(torch.tensor(self.trajectory_length, dtype=torch.float32))
        # sigmoid 함수를 통해(0~1사이의 valid한 값) 시점 t에서의 beta를 조정
        # -> reverse diffusion 과정에서의 노이즈 수준을 결정
        beta_reverse = torch.sigmoid(beta_coeff_scaled + torch.log(beta_forward))
        
        # mu와 sigma(근사치) 계산
        mu = X_noisy * torch.sqrt(1 - beta_forward) + mu_coeff * torch.sqrt(beta_forward)
        sigma = torch.sqrt(beta_reverse)
        return mu, sigma

    def generate_forward_diffusion_sample(self, X_noiseless):
        """
        주어진 X_noiseless 이미지에 대해 t 시점에 대해 Noising된 forward diffusion 샘플을 생성.
        동시에 forward 과정에서의 mu_t와 sigma_t도 계산.
        
        순서:
          1) 입력된 이미지를 (n_channels, w, h)형태로 변환, 
          2) 
          
        Args:
          원본 데이터(X_0)  
        Returns:
          t 시점에서의 노이징 된 이미지
          선택한 timestep
          Reverse에서 사용할 평균(mu) 및 표준편차(sigma)
        """
        X_noiseless = X_noiseless.view(-1, self.n_colors, self.spatial_width, self.spatial_width)
        n_images = X_noiseless.size(0)
        t = torch.floor(torch.rand(1) * (self.trajectory_length - 1) + 1)
        t_weights = self.get_t_weights(t)

        # 가우시안 노이즈 추가
        N = torch.normal(mean=torch.zeros((n_images, self.n_colors, self.spatial_width, self.spatial_width)),
                         std=torch.ones((n_images, self.n_colors, self.spatial_width, self.spatial_width)))

        beta_forward = self.get_beta_forward(t)
        alpha_forward = 1. - beta_forward
        alpha_arr = 1. - self.beta_arr
        alpha_cum_forward_arr = torch.cumprod(alpha_arr, dim=0).view(self.trajectory_length, 1)
        alpha_cum_forward = torch.matmul(t_weights.T, alpha_cum_forward_arr)
        
        beta_cumulative = 1. - alpha_cum_forward
        beta_cumulative_prior_step = 1. - alpha_cum_forward / alpha_forward

        X_uniformnoise = X_noiseless + (torch.rand_like(X_noiseless) - 0.5) * self.uniform_noise
        X_noisy = X_uniformnoise * torch.sqrt(alpha_cum_forward) + N * torch.sqrt(1. - alpha_cum_forward)

        # mu와 sigma 계산
        mu1_scl = torch.sqrt(alpha_cum_forward / alpha_forward)
        mu2_scl = 1. / torch.sqrt(alpha_forward)
        cov1 = 1. - alpha_cum_forward / alpha_forward
        cov2 = beta_forward / alpha_forward
        lam = 1. / cov1 + 1. / cov2
        
        mu = (X_uniformnoise * mu1_scl / cov1 + X_noisy * mu2_scl / cov2) / lam
        sigma = torch.sqrt(1. / lam).view(1, 1, 1, 1)

        return X_noisy, t, mu, sigma

    def get_beta_full_trajectory(self):
        """
        전체 확산 경로에 대한 베타 값을 계산합니다.
        """
        alpha_arr = 1. - self.beta_arr
        beta_full_trajectory = 1. - torch.exp(torch.sum(torch.log(alpha_arr)))
        return beta_full_trajectory

    def get_negL_bound(self, mu, sigma, mu_posterior, sigma_posterior):
        """
        모델의 손실 함수인 음의 로그 우도(negative log likelihood)를 계산합니다.
        이는 KL 발산과 엔트로피를 기반으로 한 바운드를 포함합니다.
        """
        KL = torch.log(sigma) - torch.log(sigma_posterior) + (sigma_posterior ** 2 + (mu_posterior - mu) ** 2) / (2 * sigma ** 2) - 0.5
        H_startpoint = (0.5 * (1 + np.log(2. * np.pi))).astype(torch.float32) + 0.5 * torch.log(self.beta_arr[0])
        H_endpoint = (0.5 * (1 + np.log(2. * np.pi))).astype(torch.float32) + 0.5 * torch.log(self.get_beta_full_trajectory())
        H_prior = (0.5 * (1 + np.log(2. * np.pi))).astype(torch.float32) + 0.5 * torch.log(torch.tensor(1.))
        
        negL_bound = KL * self.trajectory_length + H_startpoint - H_endpoint + H_prior
        negL_gauss = (0.5 * (1 + np.log(2. * np.pi))).astype(torch.float32) + 0.5 * torch.log(torch.tensor(1.))
        negL_diff = negL_bound - negL_gauss
        L_diff_bits = negL_diff / torch.log(torch.tensor(2.))
        L_diff_bits_avg = L_diff_bits.mean() * self.n_colors
        return L_diff_bits_avg

    def cost_single_t(self, X_noiseless):
        """
        주어진 타임스텝에 대해 하나의 비용을 계산하는 함수입니다.
        """
        X_noisy, t, mu_posterior, sigma_posterior = self.generate_forward_diffusion_sample(X_noiseless)
        mu, sigma = self.get_mu_sigma(X_noisy, t)
        negL_bound = self.get_negL_bound(mu, sigma, mu_posterior, sigma_posterior)
        return negL_bound

    def temporal_readout(self, Z, t):
        """
        MLP 출력을 기반으로 시간적 정보를 읽어들이는 함수.
        
        
        """
        n_images = Z.size(0) # mlp출력의 원소 수
        t_weights = self.get_t_weights(t) # 
        Z = Z.view(n_images, self.spatial_width, self.spatial_width, self.n_colors, 2, self.n_temporal_basis)
        coeff_weights = torch.matmul(self.temporal_basis, t_weights)
        concat_coeffs = torch.matmul(Z, coeff_weights)
        mu_coeff = concat_coeffs[:, :, :, :, 0].permute(0, 3, 1, 2)
        beta_coeff = concat_coeffs[:, :, :, :, 1].permute(0, 3, 1, 2)
        return mu_coeff, beta_coeff

    def generate_temporal_basis(self, trajectory_length, n_basis):
        """
        시간적 기저 함수(temporal basis)를 생성하는 함수.
        """
        temporal_basis = np.zeros((trajectory_length, n_basis))
        xx = np.linspace(-1, 1, trajectory_length)
        x_centers = np.linspace(-1, 1, n_basis)
        width = (x_centers[1] - x_centers[0]) / 2.
        for ii in range(n_basis):
            temporal_basis[:, ii] = np.exp(-(xx - x_centers[ii]) ** 2 / (2 * width ** 2))
        temporal_basis /= np.sum(temporal_basis, axis=1).reshape((-1, 1))
        temporal_basis = temporal_basis.T
        return torch.tensor(temporal_basis, dtype=torch.float32)

    def forward(self, X_noiseless):
        """
        주어진 X_noiseless 이미지에 대해 전체 모델을 실행하여 손실을 반환합니다.
        """
        cost = 0.
        for ii in range(self.n_t_per_minibatch):
            cost += self.cost_single_t(X_noiseless)
        return cost / self.n_t_per_minibatch

### train

In [7]:
# 🔹 Argument Parser
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--batch-size', default=512, type=int, help='Batch size')
    parser.add_argument('--lr', default=1e-3, type=float, help='Initial learning rate')
    parser.add_argument('--resume_file', default=None, type=str, help='Saved model to continue training')
    parser.add_argument('--suffix', default='', type=str, help='Optional suffix for model')
    parser.add_argument('--output-dir', type=str, default='./', help='Output directory')
    parser.add_argument('--epochs', type=int, default=100, help='Number of training epochs')
    parser.add_argument('--dropout_rate', type=float, default=0., help='Dropout rate')
    parser.add_argument('--dataset', type=str, default='MNIST', help='Dataset to use')
    args = parser.parse_args()
    return args

# 🔹 데이터셋 로딩 함수
def get_dataloader(dataset_name, batch_size):
    if dataset_name == 'MNIST':
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))  # MNIST는 흑백이므로 채널 1개
        ])
        train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
        test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)
        n_colors, spatial_width = 1, 28
    elif dataset_name == 'CIFAR10':
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # CIFAR10은 RGB
        ])
        train_dataset = datasets.CIFAR10(root='./data', train=True, transform=transform, download=True)
        test_dataset = datasets.CIFAR10(root='./data', train=False, transform=transform, download=True)
        n_colors, spatial_width = 3, 32
    else:
        raise ValueError("Unknown dataset: {}".format(dataset_name))

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, test_loader, n_colors, spatial_width

# 🔹 학습 함수
def train_model(args):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # 데이터 로딩
    train_loader, test_loader, n_colors, spatial_width = get_dataloader(args.dataset, args.batch_size)

    # 모델 초기화
    dpm = DiffusionModel(spatial_width, n_colors, uniform_noise=1./255.).to(device)
    
    # 옵티마이저 및 손실 함수
    optimizer = optim.RMSprop(dpm.parameters(), lr=args.lr)
    criterion = nn.MSELoss()  # 논문에 따라 손실함수를 맞춰야 함

    # 모델 이어서 학습하기
    if args.resume_file:
        print(f"Loading checkpoint: {args.resume_file}")
        dpm.load_state_dict(torch.load(args.resume_file, map_location=device))

    # 🔹 학습 루프
    num_epochs = args.epochs
    for epoch in range(num_epochs):
        dpm.train()
        total_loss = 0.0

        for batch in train_loader:
            inputs, _ = batch  # MNIST/CIFAR10 데이터는 (image, label) 형식
            inputs = inputs.to(device)

            optimizer.zero_grad()
            loss = dpm.cost(inputs)  # 모델의 cost() 함수 사용
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
        
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.6f}")

        # 모델 저장
        if (epoch + 1) % 25 == 0:
            save_path = os.path.join(args.output_dir, f'model_epoch{epoch+1}.pth')
            torch.save(dpm.state_dict(), save_path)
            print(f"Model saved: {save_path}")