In [2]:
import  matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import math

import tensorflow as tf
from tensorflow import keras
from keras import(
    layers,
    models,
    optimizers,
    utils,
    callbacks,
    metrics,
    losses,
    activations,
)

In [3]:
IMAGE_SIZE=64
BATCH_SIZE=64
DATASET_REPETITION=5
LOAD_MODEL=False

NOISE_EMBEDDING_SIZE=32
PLOT_DIFFUSION_STEPS=20

EMA=0.99
LEARNING_RATE=1e-3
WEIGHT_DECAY=1e-4
EPOCHS=50

In [4]:
!wget https://raw.githubusercontent.com/rickiepark/Generative_Deep_Learning_2nd_Edition/main/notebooks/utils.py
!mkdir -p notebooks
!mv utils.py notebooks
# output 디렉토리를 만듭니다.
!mkdir output
!mkdir model

--2024-11-16 16:08:37--  https://raw.githubusercontent.com/rickiepark/Generative_Deep_Learning_2nd_Edition/main/notebooks/utils.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 771 [text/plain]
Saving to: ‘utils.py’


2024-11-16 16:08:38 (41.5 MB/s) - ‘utils.py’ saved [771/771]



In [5]:
from google.colab import files
files.upload()
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d nunenuh/pytorch-challange-flower-dataset
!unzip -q pytorch-challange-flower-dataset.zip
!mkdir output

Saving kaggle (1).json to kaggle (1).json
cp: cannot stat 'kaggle.json': No such file or directory
chmod: cannot access '/root/.kaggle/kaggle.json': No such file or directory
Dataset URL: https://www.kaggle.com/datasets/nunenuh/pytorch-challange-flower-dataset
License(s): CC0-1.0
Downloading pytorch-challange-flower-dataset.zip to /content
100% 330M/330M [00:01<00:00, 219MB/s]
100% 330M/330M [00:01<00:00, 193MB/s]
mkdir: cannot create directory ‘output’: File exists


In [6]:
train_data = utils.image_dataset_from_directory(
    "./dataset/",
    labels=None,
    image_size=(IMAGE_SIZE, IMAGE_SIZE),
    batch_size=None,  # 데이터를 한번에 로드
    shuffle=True,
    seed=42,
    interpolation="bilinear",  #주변 4개의 픽셀을 선형보간
)

Found 8189 files.


In [7]:
def preprocessing(img):
  img = tf.cast(img, "float32")/255.0  # 0-1 값의 소수로 변환
  return img

train = train_data.map(lambda x:preprocessing(x))
trian = train.repeat(DATASET_REPETITION) # 크기 DATASET_REPITITION만큼 복사
train = train.batch(BATCH_SIZE, drop_remainder=True)

In [8]:
def offset_cosine_diffusion_schedule(diffusion_times):
  min_signal_rate= 0.2
  max_signal_rate= 0.95
  start_angle = tf.acos(max_signal_rate)
  end_angle = tf.acos(min_signal_rate)

  diffusion_angles = start_angle + diffusion_times*(end_angle - start_angle)
  signal_rates = tf.cos(diffusion_angles)
  noise_rates = tf.sin(diffusion_angles)

  return signal_rates, noise_rates

In [9]:
# 사인파 임베딩
from keras.utils import register_keras_serializable

@register_keras_serializable(package="Custom")
def sinusoidal_embedding(x): # x 차원 (1, 1, 1, 1)
  frequencies = tf.exp(
      tf.linspace(
          tf.math.log(1.0), tf.math.log(1000.0), NOISE_EMBEDDING_SIZE//2,
          )  # ln1 ~ ln1000 을 16으로 분할
      )
  angular_speeds = 2.0 * math.pi * frequencies  # 차원 (16, )
  embeddings = tf.concat([tf.sin(angular_speeds*x), (tf.cos(angular_speeds*x))], axis=3)
   # 곱할 때 브로드캐스팅 사용됨
   # embeddings 차원 concat((1, 1, 1, 16) + 1, 1, 1, 16) -> (1, 1, 2, 16)
  return embeddings

In [10]:
embedding_list=[]
for y in np.arange(0, 1, 0.01):
  embedding_list.append(sinusoidal_embedding(np.array([[[[y]]]]))[0][0][0]) # (2, 16) 차원 요소 추가
embedding_array = np.array(np.transpose(embedding_list))
# embedding_array 차원 : (100, 2, 16) -> (16, 100, 2)
# 임베딩 배열에서 2요소는 0부터 1까지의 값을 가짐 & 2요소는 특정 노이즈 분산

In [11]:
"""잔차 블록
스킵 연결을 사용하는데 그레디언트 소실과 성능저하를 막고 신경망을 더 깊게 만들 수 있음
스킵연결이란 컨볼루션을 진행하기 전과 후의 이미지 요소들의 값들을 더하는 것이다.
즉 학습되기 이전의 이미지를 더해주므로서 그레디언트 소실을 예방한다."""
def ResidualBlock(width):
  def apply(x):
    input_width = x.shape[3]
    if input_width == width:
      residual = x
    else:
      residual = layers.Conv2D(width, kernel_size=1)(x)
      # 깊이가 width와 다르다면 깊이를 1로 만들어 브로드캐스팅을 한다
    x = layers.BatchNormalization(center=False, scale=False)(x) # 정규화
    x = layers.Conv2D(width, kernel_size = 3, padding= 'same', activation= activations.swish)(x)
    x = layers.Conv2D(width, kernel_size = 3, padding = 'same')(x)
    # 크기 유지하면서 깊이는 width로 2번 컨볼루션 진행
    x = layers.Add()([x, residual])
    return x
  return apply

In [12]:
"""block_depth만큼 스킵연결을 수행하고 Upsampling과 연결할 이미지를 저장한다.
    또한 이미지의 크기를 줄여서 특성이미지를 생성한다. """
def DownBlock(width, block_depth):
  def apply(x):
    x, skips = x
    for _ in range(block_depth):
      x = ResidualBlock(width)(x)
      skips.append(x) # UpSampling에서 concatenate할 이미지 저장
    x = layers.AveragePooling2D(pool_size=2)(x)
    return x
  return apply

In [13]:
"""Upsampling을 하여 pred nosie를 찾는다. skips.pop을 수행하여 이전 이미지와 연결한다 """
def UpBlock(width, block_depth):
  def apply(x):
    x, skips = x
    x = layers.UpSampling2D(size=2, interpolation="bilinear")(x) # 선형보간법으로 Upsampling
    for _ in range(block_depth):
      x = layers.Concatenate()([x, skips.pop()])
      x = ResidualBlock(width)(x)
    return x
  return apply

In [14]:
"""unet 구축"""

# noisy 이미지 입력
noisy_images = layers.Input(shape= (IMAGE_SIZE, IMAGE_SIZE, 3))
x = layers.Conv2D(32, kernel_size= 1)(noisy_images)

#  noise_variance 값에서 사인파 인베딩의 값
noise_variances = layers.Input(shape= (1, 1, 1))
noise_embedding = layers.Lambda(sinusoidal_embedding)(noise_variances) # 크기 = (1, 1, 32)
noise_embedding = layers.UpSampling2D(size=IMAGE_SIZE, interpolation="nearest")(noise_embedding)

# 노이즈 이미지와 노이즈 분산값을 연결
x = layers.Concatenate()([x, noise_embedding])

skips = []

x = DownBlock(32, block_depth=2)([x, skips])
x = DownBlock(64, block_depth=2)([x, skips])
x = DownBlock(96, block_depth=2)([x, skips])

x = ResidualBlock(128)(x)
x = ResidualBlock(128)(x)

x = UpBlock(96, block_depth=2)([x, skips])
x = UpBlock(64, block_depth=2)([x, skips])
x = UpBlock(32, block_depth=2)([x, skips])

x = layers.Conv2D(3, kernel_size=1, kernel_initializer="zeros")(x)
unet = models.Model([noisy_images, noise_variances], x, name="unet")

In [20]:

class DiffusionModel(models.Model):
  def __init__(self):
    super().__init__()
    self.normalizer = layers.Normalization()
    self.network = unet
    self.ema_network = models.clone_model(self.network)
    self.diffusion_schedule = offset_cosine_diffusion_schedule

  def compile(self, **kwargs):
    super().compile(**kwargs)
    self.noise_loss_tracker = metrics.Mean(name = "n_loss")

  @property
  def metrics(self):
    return [self.noise_loss_tracker]

  # 잡음 제거 예상 이미지에 잡음을 첨가하여 새로운 이미지를 생성한다.
  def denormalize(self, images): # images <- pred_images
    images = self.normalizer.mean + images*self.normalizer.variance**0.5 # 초기 잡은 생성
    return tf.clip_by_value(images, 0.0, 1.0) # 0-1 사이의 값을 가진 이미지 생성

  # noise 예측, 초기 이미지 예측
  def denoise(self, noisy_images, noise_rates, signal_rates, training):
  # noise_rates, signal_rates <- offset_cosine_diffusion_schedule
    if training:
      network = self.network  #unet을 이용하여 pred_noise 도출
    else:
      network = self.ema_network
    pred_noises = network([noisy_images, noise_rates**2], training = training)
    pred_images = (noisy_images - noise_rates*pred_noises) / signal_rates
    """noisy_images = signal_rates * pred_images + noise_rates * pred_noise"""
    return pred_noises, pred_images

  # 노이즈를 정확도를 높여서 초기 이미지를 더 정교하게 예측하여 반환한다.
  def reverse_diffusion(self, initial_noise, diffusion_steps):
    num_images = initial_noise.shape[0] # batch size
    step_size = 1.0 / diffusion_steps # diffusion_steps = PLOT_DIFFUSION_STEPS = 20
    current_images = initial_noise
    for step in range(diffusion_steps):
      diffusion_times = tf.ones((num_images, 1, 1, 1)) - step * step_size
      noise_rates, signal_rates  = self.diffusion_schedule(diffusion_times) # offset_cosine_diffusion_schedule
      pred_noises, pred_images = self.denoise(current_images, noise_rates, signal_rates, training = False)
      # n 번째 노이즈 이미지로부터 pred_noise와 pred_image를 얻음
      next_diffusion_times = diffusion_times - step_size
      next_noise_rates, next_image_rates = self.diffusion_schedule(next_diffusion_times)
      current_images = next_image_rates * pred_images + next_noise_rates * pred_noises
      # 예측된 이미지로부터 n - 1번째 노이즈 이미지를 얻음
    return pred_images

    """ n 번째 이미지로부터 초기 이미지를 예측하고 예측한 이미지는 n - 1번째 노이즈 이미지를 예측하고
      다시 이를 이용하여 n - 2 ...을 반복하면서 노이즈 예측의 정확성을 점진적으로 높여 pred_images
      를 얻는다."""

  def generate(self, num_images, diffusion_steps, initial_noise=None):
    if initial_noise is None:
      initial_noise = tf.random.normal(shape=(num_images, IMAGE_SIZE, IMAGE_SIZE, 3))
      # 초기 노이즈가 존재하지 않는 경우 노이즈를 랜덤한 값으로 생성한다.
    generated_images = self.reverse_diffusion(initial_noise, diffusion_steps)
    generated_images = self.denormalize(generated_images)
    return generated_images

  def train_step(self, images):
    images = self.normalizer(images, training=True)
    noises = tf.random.normal(shape=(BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, 3))

    diffusion_times = tf.random.uniform(shape=(BATCH_SIZE, 1, 1, 1), minval = 0.0, maxval = 1.0)
    noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
    noisy_images = signal_rates * images + noise_rates * noises # 노이즈 이미지 생성

    """노이즈 예측 오차 학습"""
    with tf.GradientTape() as tape:
      pred_noises, pred_images = self.denoise(noisy_images, noise_rates, signal_rates, training=True)
      noise_loss = self.loss(noises, pred_noises)

    gradients = tape.gradient(noise_loss, self.network.trainable_weights)
    self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))
    self.noise_loss_tracker.update_state(noise_loss)

    for weight, ema_weight in zip(self.network.weights, self.ema_network.weights):
      ema_weight.assign(EMA * ema_weight + (1-EMA)*weight)
    return {m.name: m.result() for m in self.metrics}

  def test_step(self, images):
        images = self.normalizer(images, training=False)
        noises = tf.random.normal(shape=(BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, 3))
        diffusion_times = tf.random.uniform(
            shape=(BATCH_SIZE, 1, 1, 1), minval=0.0, maxval=1.0
        )
        noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
        noisy_images = signal_rates * images + noise_rates * noises
        pred_noises, pred_images = self.denoise(
            noisy_images, noise_rates, signal_rates, training=False
        )
        noise_loss = self.loss(noises, pred_noises)
        self.noise_loss_tracker.update_state(noise_loss)

        return {m.name: m.result() for m in self.metrics}


In [21]:
ddm = DiffusionModel()
ddm.normalizer.adapt(train)

In [22]:
if LOAD_MODEL:
    ddm.built = True
    ddm.load_weights("./checkpoint/checkpoint.weights.h5")

In [23]:
from tensorflow.keras.optimizers import Adam
ddm.compile(
    optimizer=optimizers.Adam(
        learning_rate=LEARNING_RATE, weight_decay=WEIGHT_DECAY
    ),
    loss=losses.mean_absolute_error,
)

In [25]:
sns.set_palette('colorblind')
from notebooks.utils import display, sample_batch

In [26]:
# 훈련을 실행하고 생성된 이미지를 주기적으로 출력합니다.
model_checkpoint_callback = callbacks.ModelCheckpoint(
    filepath="./checkpoint/checkpoint.weights.h5",
    save_weights_only=True,
    save_freq="epoch",
    verbose=0,
)

tensorboard_callback = callbacks.TensorBoard(log_dir="./logs")


class ImageGenerator(callbacks.Callback):
    def __init__(self, num_img):
        self.num_img = num_img

    def on_epoch_end(self, epoch, logs=None):
        generated_images = self.model.generate(
            num_images=self.num_img,
            diffusion_steps=PLOT_DIFFUSION_STEPS,
        ).numpy()
        display(
            generated_images,
            save_to="./output/generated_img_%03d.png" % (epoch),
        )


image_generator_callback = ImageGenerator(num_img=10)

ddm.fit(
    train,
    epochs=EPOCHS,
    callbacks=[
        model_checkpoint_callback,
        tensorboard_callback,
        image_generator_callback,
    ],
)

Output hidden; open in https://colab.research.google.com to view.

In [36]:
!git clone https://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@github.com/JaeYunChung/pythonProject.git

fatal: destination path 'pythonProject' already exists and is not an empty directory.


In [37]:
%cd ..
%ls

/content
[0m[01;34mdrive[0m/  [01;34mpythonProject[0m/  [01;34msample_data[0m/


In [20]:
%cp /content/drive/MyDrive/Colab\ Notebooks/diffusion_model.ipynb /content/pythonProject/

In [21]:
%cd pythonProject/
%

/content/pythonProject


In [22]:
!git add *

In [23]:
!git commit -m "diffusoin model commit"

Author identity unknown

*** Please tell me who you are.

Run

  git config --global user.email "you@example.com"
  git config --global user.name "Your Name"

to set your account's default identity.
Omit --global to set the identity only in this repository.

fatal: unable to auto-detect email address (got 'root@c0d717ee1109.(none)')


In [30]:
!git config --global user.email "wjdwodbs2334@gmail.com"
!git config --global user.name "JaeYunChung"

In [26]:
!git commit -m "diffusion model commit"

[main (root-commit) 527d86e] diffusion model commit
 1 file changed, 1 insertion(+)
 create mode 100644 diffusion_model.ipynb


In [35]:
!git push

fatal: could not read Username for 'https://github.com': No such device or address
