In [None]:
# diffusion modelの実装
# 参考サイト：https://qiita.com/CabbageRoll/items/7c79ae63ba417271226e

In [None]:
import os
import math
import random
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn

# ハイパラをdataclassとjsonを用いて管理
import json
from dataclasses import dataclass, field

# 漢字生成
# from fontTools import ttLib
# from PIL import Image, ImageFont, ImageDraw

# UNetはこちらを利用しています
# ファイルをダウンロードしてインポートするか、コピペしてください
# https://github.com/tcapelle/Diffusion-Models-pytorch/blob/main/modules.py
from modules import UNet

In [None]:
# 今回はDiffusion Modelを理解するためなので実データの投入は後回し

In [None]:
# Denoising Diffuion Probabilistic Model
class DDPM(nn.Module):
  def __init__(self, T, device):
    super().__init__()
    self.T = T
    self.device = device

    # β, αは頻出なのでイニシャライズ段階で書いておく
    self.beta_1 = 1e-4 # ddpm reportの値，変更可能
    self.beta_T = 0.02 # ddpm reportの値，変更可能

    self.betas = torch.linspace(self.beta_1, self.beta_T, T, device=device) # 要素数，開始値，終了値，step数を指定し連番や等差数列を生成（線形補間）
    self.alphas = 1.0 - self.betas
    self.alphas_bars = torch.cumprod(self.alphas, dim=0) # Παi，dim=0は横方向

  # 拡散過程(x_0からノイズ入りのx_tを生成)
  def diffusion_process(self, x0, t=None):
    if t is None:
      # バッチの各データについてランダムに時間を決定
      t = torch.randint(low=1, high=self.T, size=(x0.shape[0],), device=self.device) # t：[1, T]のランダムな整数値，配列の大きさは size=(batch_size, )
    noise = torch.randn_like(x0, device=self.device)
    alpha_bar = self.alpha_bars[t].reshape(-1,1,1,1)
    # x_t = √(1-β_t) * x_(t-1) + √β_t * ε = √α_bar * x_0 + √(1 - α_bar) * ε
    xt = torch.sqrt(alpha_bar) * x0 + torch.sqrt(1 - alpha_bar) * noise
    return xt, t, noise

  # 逆拡散過程
  # MEMO: "model"はノイズを予測する学習済みモデル
  # MEMO: "img"はノイズを除去する画像(x_T)
  # MEMO: "ts"はx_Tの時刻T
  def denoising_process(self, model, img, ts):
    batch_size = img.shape[0]
    model.eval()
    with torch.no_grad():
      time_step_bar = tqdm(reversed(range(1,ts)), leave=False, position=0)
      for t in time_step_bar: # ts, ts-1, .... 3, 2, 1
        # 時刻をテンソル変換　サイズは(batch_size, )
        time_tensor = (torch.ones(batch_size, device=self.device) * t).long()
        # ノイズを予測
        prediction_noise = model(img, time_tensor)
        # ノイズを少し取り除く
        img = self._calc_denoising_one_step(img, time_tensor, prediction_noise)
    model.train()

  # TODO: なぜreshapeしているのか調べる
  def _calc_denoising_one_step(self, img, time_tensor, prediction_noise):
    beta = self.betas[time_tensor].reshape(-1, 1, 1, 1)
    sqrt_alpha = torch.sqrt(self.alphas[time_tensor].reshape(-1, 1, 1, 1))
    alpha_bar = self.alpha_bars[time_tensor].reshape(-1, 1, 1, 1)
    sigma_t = torch.sqrt(beta)
    noise = torch.rand_like(img, device=self.device) if time_tensor[0].item() > 1 else torch.zeros_like(img, device=self.device)
    # x_(t-1) = 1/(√1-β_t) * (x_t - β_t / (√1-α_bar)) * ε_θ+σ_t * Z
    img = 1 / sqrt_alpha * (img - (beta / (torch.sqrt(1 - alpha_bar))) * prediction_noise) + sigma_t * noise
    return img

In [None]:
# 学習コード
# MEMO: ハイパラはdataclassで定義
# MEMO: UNetでベースをつくる
# MEMO: 損失関数はMSEでOK
def ddpm_train(params):
  device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
  print(f"{device=}")
  log_dir = make_save_dir_and_save_params(params) # ログディレクトリの作成とパラメータの保存
  model_path = os.path.join(log_dir, f"model_weight_on_{device}") # モデルの重みを保存するパスを指定

  # dataset = create_dataset(params.pix, params.extst_load=True) # TODO: 使用するデータに合わせて変更
  dataloader = torch.utils.data.DataLoader(dataset, batch_size=params.batch_size, shuffle=True, drop_last=True) # 訓練データのロード、バッチ処理やシャッフルの準備
  ddpm = DDPM(params.time_steps, device)
  model = UNet(params.image_ch, params.image_ch).to(device)
  optimizer = torch.optim.AdamW(model.parameters(), lr=params.lr)
  loss_fn = torch.nn.MSELoss()

  start_epoch = 1
  loss_longger = []
  loss_min = 9e+9

  # 継続して計算する場合はロード
  if params.load_file and os.path.exists(model_path):
    model, optimizer, start_epoch, loss_longer, loss_min = load_checkpoint(params, model, optimizer, model_path, device)

  model.train()
  epoch_bar = tqdm(range(start_epoch, params.epoch + 1))
  # MEMO: "loss_tmp" はエポックごとの損失の合計を保持
  # MEMO: "out" はx_tとtをUNetに代入し得られた出力で，"loss" で元のノイズとの差を計算する
  for epoch in epoch_bar:
    epoch_bar.set_description(f"Epoch:{epoch}")
    loss_tmp = 0
    iter_bar = tqdm(dataloader, leave=False)
    for iter, x in enumerate(iter_bar): # 各バッチごとの処理
      x = x.to(device)
      xt, t, noise = ddpm.diffusion_process(x)
      out = model(xt, t)
      loss = loss_fn(noise, out)
      optimizer.zero_grad() # 以前のバッチで計算された勾配を初期化
      loss.backward() # 損失を逆伝播し各パラメータの勾配を計算
      optimizer.step() # パラメータ更新

      iter_bar.set_postfix({"loss=":f"{loss.item():.2e}"})
      loss_tmp += loss.item()

    loss_logger.append(loss_tmp / (iter + 1)) # 各エポックでの損失を記録
    epoch_bar.set_postfix({"loss=": f"{loss_logger[-1]:.2e}"}) # 進捗バーの右側に最新のエポックでの損失値を表示

    # 保存処理
    # lossの経過グラフを出力
    save_loss_logger_and_graph(log_dir, loss_logger)
    if loss_min >= loss_logger[-1]:
      torch.save({'epoch': epoch,
                  'model_state_dict': model.state_dict(),
                  'optimizer_state_dict': optimizer.state_dict(),
                  'loss': loss_logger,
                  }, model_path
      )
      loss_min = loss_logger[-1]

    # 指定したstepで逆拡散による画像生成
    if epoch % params.img_save_steps == 0:
      x0 = torch.randn([32, params.image_ch, params.pix, params.pix], device=device)
      img = ddpm.denoising_process(model, x0, params.time_steps).to("cpu")
      save_images_plt(img, log_dir=log_dir, epoch=epoch)

# ここからコピペ
def load_checkpoint(params, model, optimizer, model_parh, device):
  print(f"load model {model_path}")
  checkpoint = torch.load(model_path, map_location=device)
  model.load_state_dict(checkpoint['model_state_dict'])
  optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
  start_epoch = checkpoint['epoch']
  loss_logger = checkpoint["loss"]
  loss_min = min(loss_logger)
  print(start_epoch)
  return model, optimizer, start_epoch, loss_logger, loss_min

def make_save_dir_and_save_params(params):
    # タスクの保存フォルダ
    log_dir = os.path.join(r"./", "log", params.task_name)
    os.makedirs(log_dir, exist_ok=True)
    # epoch, iter毎のデータは多くなるので別フォルダを作る
    log_dir_hist = os.path.join(log_dir, "hist")
    os.makedirs(log_dir_hist, exist_ok=True)
    # 設定ファイルの保存
    with open(os.path.join(log_dir, "parameters.json"), 'w') as f:
        json.dump(vars(params), f, indent=4)
    return log_dir

def save_loss_logger_and_graph(log_dir, loss_logger):
    # loss履歴情報を保管しつつ、グラフにして画像としても書き出す
    torch.save(loss_logger, os.path.join(log_dir, "loss_logger.pt"))
    fig, ax = plt.subplots(1,1)
    epoch = range(len(loss_logger))
    ax.plot(epoch, loss_logger, label="train_loss")
    ax.set_ylim(0, loss_logger[-1]*5)
    ax.legend()
    fig.savefig(os.path.join(log_dir, "loss_history.jpg"))
    plt.clf()
    plt.close()

def save_images_plt(images, log_dir, epoch, s=2):
    # 生成した画像を並べた図を作成して保存する
    num_img = images.shape[0]
    img_arr = 255 - images.detach().numpy()
    num_row = int(num_img ** 0.5)
    num_col = (num_img - 1) // num_row + 1
    fig, ax = plt.subplots(num_row, num_col,
                          figsize=(num_col*s, num_row*s),
                          tight_layout=True,
                          sharex=True, sharey=True )
    axs = ax.ravel() if num_img > 1 else [ax]
    for i, img in enumerate(img_arr):
        if img.shape[0] == 1:
            img = img[0, :, :]
            axs[i].imshow(img, cmap="gray")
        else:
            img = np.transpose(img, [1, 2, 0])
            axs[i].imshow(img)
    if isinstance(epoch, int):
        epoch = f"{epoch:06d}"
    fig.suptitle(f"epoch={epoch}", size=15)
    fig.savefig(os.path.join(log_dir, "hist", f"kanji_imgs_epoch_{epoch}.jpg"))
    fig.savefig(os.path.join(log_dir, f"kanji_imgs_epoch_latest.jpg"))
    plt.clf()
    plt.close()

@dataclass # TODO: タスクによって変更
class HyperParameters:
    task_name: str = "kanji_diffusion"
    epochs: int = 500
    img_save_steps: int = 10
    batch_size: int = 128
    lr: float = 3e-4
    time_steps: int = 1000  # T もう少し小さくても良いはず
    load_file: bool = True
    pix: int = 32
    font_file: str = r"./ヒラギノ角ゴシック W5.ttc"
    image_ch: int = 1


In [None]:
params = HyperParameters()
ddqm_train(params)

In [None]:
# 拡散過程をお試し

In [None]:
# 学習結果をお試し