<a href="https://colab.research.google.com/github/evan-placenis/Denoising_Diffusion_Generative_Model/blob/main/Denoising_Diffusion_Models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install kaggle
from google.colab import drive
drive.mount('/content/drive')
! mkdir ~/.kaggle
!cp /content/drive/MyDrive/KaggleAPI/kaggle.json ~/.kaggle/kaggle.json
! chmod 600 ~/.kaggle/kaggle.json

Mounted at /content/drive


In [None]:
# Download the flower dataset archive with the Kaggle CLI; the recorded output
# shows it landing in /content as pytorch-challange-flower-dataset.zip.
! kaggle datasets download nunenuh/pytorch-challange-flower-dataset

Downloading pytorch-challange-flower-dataset.zip to /content
 99% 328M/330M [00:17<00:00, 22.9MB/s]
100% 330M/330M [00:17<00:00, 19.7MB/s]


In [None]:
!unzip /content/pytorch-challange-flower-dataset.zip

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: dataset/train/48/image_04685.jpg  
  inflating: dataset/train/48/image_04686.jpg  
  inflating: dataset/train/48/image_04689.jpg  
  inflating: dataset/train/48/image_04692.jpg  
  inflating: dataset/train/48/image_04694.jpg  
  inflating: dataset/train/48/image_04695.jpg  
  inflating: dataset/train/49/image_06198.jpg  
  inflating: dataset/train/49/image_06199.jpg  
  inflating: dataset/train/49/image_06200.jpg  
  inflating: dataset/train/49/image_06201.jpg  
  inflating: dataset/train/49/image_06203.jpg  
  inflating: dataset/train/49/image_06204.jpg  
  inflating: dataset/train/49/image_06205.jpg  
  inflating: dataset/train/49/image_06206.jpg  
  inflating: dataset/train/49/image_06207.jpg  
  inflating: dataset/train/49/image_06208.jpg  
  inflating: dataset/train/49/image_06211.jpg  
  inflating: dataset/train/49/image_06212.jpg  
  inflating: dataset/train/49/image_06214.jpg  
  inflating: dataset/tr

In [None]:
import tensorflow as tf
# Read every image under /content/dataset as an unlabelled, unbatched stream,
# resized to 64x64 with bilinear interpolation and shuffled with a fixed seed.
train_data = tf.keras.preprocessing.image_dataset_from_directory(
    directory="/content/dataset",
    labels=None,
    image_size=(64, 64),
    interpolation="bilinear",
    batch_size=None,
    shuffle=True,
    seed=42,
)

Found 8189 files belonging to 1 classes.


In [None]:
def preprocess(img):
  """Scale raw pixel values to float32 in the range [0, 1].

  The [0, 1] range matches `DiffusionModel.denormalize`, which clips
  generated images to [0, 1]. Scaling to [-1, 1] here would make that clip
  throw away every pixel below the middle of the range.

  Args:
    img: image tensor; assumed to hold pixel values in [0, 255].

  Returns:
    A float32 tensor of the same shape with values divided by 255.
  """
  return tf.cast(img, "float32") / 255.0

# Apply `preprocess` to every image, cycle through the dataset 5 times and
# emit batches of exactly 64 images (a trailing partial batch is dropped).
train = (
    train_data
    .map(preprocess)
    .repeat(5)
    .batch(64, drop_remainder=True)
)

**Diffusion Schedules**

In [None]:
import math
def offset_cosine_diffusion_schedule(diffusion_times):
  """Cosine schedule whose signal rate is confined to [0.02, 0.95].

  Diffusion times are mapped linearly onto an angle between
  acos(0.95) and acos(0.02); the signal rate is the cosine of that angle
  and the noise rate its sine.

  Args:
    diffusion_times: diffusion time(s); 0 maps to the highest signal rate
      and 1 to the lowest.

  Returns:
    A `(noise_rates, signal_rates)` tuple.
  """
  max_signal_rate, min_signal_rate = 0.95, 0.02

  # Angles whose cosines are the two extreme signal rates.
  angle_at_start = tf.acos(max_signal_rate)
  angle_at_end = tf.acos(min_signal_rate)
  angle_span = angle_at_end - angle_at_start

  angles = angle_at_start + diffusion_times * angle_span
  return tf.sin(angles), tf.cos(angles)

def cosine_diffusion_schedule(diffusion_times):
  """Plain cosine schedule over a quarter circle.

  The diffusion time is mapped to an angle in [0, pi/2]; the signal rate is
  its cosine and the noise rate its sine, so signal**2 + noise**2 == 1 for
  every time, the signal rate is 1 at time 0 and 0 at time 1.

  Args:
    diffusion_times: diffusion time(s), expected in [0, 1].

  Returns:
    A `(noise_rates, signal_rates)` tuple.
  """
  # Both rates must use the same angle; using 2*pi for the signal rate breaks
  # the unit-circle relationship and makes the signal rate negative.
  diffusion_angles = diffusion_times * math.pi / 2
  signal_rates = tf.cos(diffusion_angles)
  noise_rates = tf.sin(diffusion_angles)
  return noise_rates, signal_rates

**Build Model**

In [None]:
# Data settings.
# NOTE(review): the data-loading cells above use the literals (64, 64), 64 and 5
# rather than these constants — keep the values in sync when changing them.
IMAGE_SIZE = 64  # side length in pixels of the square images the U-Net input is declared with
BATCH_SIZE = 64  # number of images per training batch
DATASET_REPETITIONS = 5
LOAD_MODEL = False

NOISE_EMBEDDING_SIZE = 32  # channels produced by sinusoidal_embedding (half sine, half cosine)
PLOT_DIFFUSION_STEPS = 20

# Optimisation settings.
EMA = 0.999  # decay used when blending network weights into the EMA network
LEARNING_RATE = 1e-3  # passed to the AdamW optimiser
WEIGHT_DECAY = 1e-4  # passed to the AdamW optimiser
EPOCHS = 50

In [None]:
def sinusoidal_embedding(x):
    """Encode `x` as sines and cosines at geometrically spaced frequencies.

    NOISE_EMBEDDING_SIZE // 2 frequencies are spread evenly in log space
    between 1 and 1000. The sine features come first, followed by the cosine
    features, concatenated along axis 3.

    Args:
        x: tensor that broadcasts against the frequency vector on its last
           axis (the U-Net feeds it a (batch, 1, 1, 1) input).

    Returns:
        Tensor with the sine and cosine features stacked on axis 3.
    """
    half_dim = NOISE_EMBEDDING_SIZE // 2
    log_frequencies = tf.linspace(
        tf.math.log(1.0), tf.math.log(1000.0), half_dim
    )
    # Angular speed = 2 * pi * frequency; the phase is shared by sin and cos.
    phase = (2.0 * math.pi * tf.exp(log_frequencies)) * x
    return tf.concat([tf.sin(phase), tf.cos(phase)], axis=3)

Define the `ResidualBlock`, `DownBlock` and `UpBlock` building blocks for the U-Net

In [None]:
def ResidualBlock(width):
    """Build a residual block that outputs `width` channels.

    Returns a callable taking a feature map `x` (channels on axis 3). The
    input is batch-normalised (no learned scale/offset), passed through two
    3x3 convolutions (the first with swish activation) and added to a
    shortcut of the input.
    """

    def apply(x):
        # The shortcut is the input itself, or a 1x1 projection of it when
        # the channel count has to change to `width`.
        shortcut = (
            x if x.shape[3] == width else layers.Conv2D(width, kernel_size=1)(x)
        )

        hidden = layers.BatchNormalization(center=False, scale=False)(x)
        hidden = layers.Conv2D(
            width, kernel_size=3, padding="same", activation="swish"
        )(hidden)
        hidden = layers.Conv2D(width, kernel_size=3, padding="same")(hidden)
        return layers.Add()([hidden, shortcut])

    return apply


def DownBlock(width, block_depth):
    """Build an encoder stage of `block_depth` residual blocks plus 2x pooling.

    The returned callable takes `[x, skips]`; every residual block's output is
    appended to the `skips` list (mutated in place) before the feature map is
    average-pooled by a factor of 2 and returned.
    """
    make_block = ResidualBlock(width)

    def apply(x):
        features, skip_stack = x
        for _ in range(block_depth):
            features = make_block(features)
            skip_stack.append(features)
        return layers.AveragePooling2D(pool_size=2)(features)

    return apply


def UpBlock(width, block_depth):
    """Build a decoder stage: 2x bilinear upsampling plus residual blocks.

    The returned callable takes `[x, skips]`; after upsampling, each of the
    `block_depth` iterations pops the most recent entry from `skips`
    (mutating the list), concatenates it with the features and applies a
    residual block of `width` channels.
    """
    make_block = ResidualBlock(width)

    def apply(x):
        features, skip_stack = x
        features = layers.UpSampling2D(size=2, interpolation="bilinear")(features)
        for _ in range(block_depth):
            merged = layers.Concatenate()([features, skip_stack.pop()])
            features = make_block(merged)
        return features

    return apply

Build the U-Net

In [None]:
from tensorflow.keras import layers, models
# Image branch: a 1x1 convolution lifts the 3-channel noisy image to 32 feature channels.
noisy_images = layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
x = layers.Conv2D(32, kernel_size=1)(noisy_images)

# Noise branch: one scalar per sample is expanded by sinusoidal_embedding and
# then tiled (nearest-neighbour upsampling by IMAGE_SIZE) to the image's spatial size.
noise_variances = layers.Input(shape=(1, 1, 1))
noise_embedding = layers.Lambda(sinusoidal_embedding)(noise_variances)
noise_embedding = layers.UpSampling2D(size=IMAGE_SIZE, interpolation="nearest")(
    noise_embedding
)

# Merge the image features with the tiled noise embedding.
x = layers.Concatenate()([x, noise_embedding])

# Shared stack of skip connections: DownBlocks append to it, UpBlocks pop from it.
skips = []

# Encoder: three stages, each adding 2 skip tensors and halving the spatial size.
x = DownBlock(32, block_depth=2)([x, skips])
x = DownBlock(64, block_depth=2)([x, skips])
x = DownBlock(96, block_depth=2)([x, skips])

# Bottleneck at the lowest resolution.
x = ResidualBlock(128)(x)
x = ResidualBlock(128)(x)

# Decoder: mirrors the encoder widths in reverse, consuming the skips last-in first-out.
x = UpBlock(96, block_depth=2)([x, skips])
x = UpBlock(64, block_depth=2)([x, skips])
x = UpBlock(32, block_depth=2)([x, skips])

# Project back to 3 channels; the kernel starts at zero.
x = layers.Conv2D(3, kernel_size=1, kernel_initializer="zeros")(x)

# Two-input model: (noisy images, noise variances) -> 3-channel prediction.
unet = models.Model([noisy_images, noise_variances], x, name="unet")

In [None]:
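# NOTE: superseded draft of DiffusionModel, kept commented out for reference only;
# the live definition is in the next cell.
# from tensorflow.keras import metrics, losses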
# class DiffusionModel(models.Model):
#   def __init__(self):
#     super().__init__()
#     self.normalizer=  layers.Normalization()
#     self.network = unet
#     self.ema_network = models.clone_model(self.network)
#     self.diffusion_schedule = offset_cosine_diffusion_schedule

#   def compile(self, **kwargs):
#     super().compile(**kwargs)
#     self.noise_loss_tracker = metrics.Mean(name="n_loss")

#   @property
#   def metrics(self):
#     return [self.noise_loss_tracker]

#   def denormalize(self, images):
#       images = self.normalizer.mean + images * self.normalizer.variance**0.5
#       return tf.clip_by_value(images, 0.0, 1.0)

#   def denoise(self, noisy_images, noise_rates, signal_rates, training):
#     if training:
#       network = self.network
#     else:
#       network = self.ema_network
#     pred_noises = network([noisy_images, noise_rates**2], training = training)
#     pred_images = (noisy_images - noise_rates * pred_noises) / signal_rates

#     return pred_noises, pred_images

#     def reverse_diffusion(self, initial_noise, diffusion_steps):
#       num_images = initial_noise.shape[0]
#       step_size = 1.0 / diffusion_steps
#       current_images = initial_noise
#       for step in range(diffusion_steps):
#           diffusion_times = tf.ones((num_images, 1, 1, 1)) - step * step_size
#           noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
#           pred_noises, pred_images = self.denoise(
#               current_images, noise_rates, signal_rates, training=False
#           )
#           next_diffusion_times = diffusion_times - step_size
#           next_noise_rates, next_signal_rates = self.diffusion_schedule(
#               next_diffusion_times
#           )
#           current_images = (
#               next_signal_rates * pred_images + next_noise_rates * pred_noises
#           )
#       return pred_images

#     def generate(self, num_images, diffusion_steps, initial_noise=None):
#       if initial_noise is None:
#           initial_noise = tf.random.normal(
#               shape=(num_images, IMAGE_SIZE, IMAGE_SIZE, 3)
#           )
#       generated_images = self.reverse_diffusion(
#           initial_noise, diffusion_steps
#       )
#       generated_images = self.denormalize(generated_images)
#       return generated_images

#   def train_step(self, images):
#     images = self.normalizer(images, training = True)
#     noises = tf.random.normal(shape = tf.shape(images))
#     batch_size = tf.shape(images)[0]
#     diffusion_times = tf.random.uniform(shape = (batch_size, 1, 1, 1), minval=0.0, maxval = 1.0)
#     noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
#     noisy_images = signal_rates*images + noise_rates * noises

#     with tf.GradientTape() as tape:
#       pred_noises, pred_images = self.denoise(noisy_images, noise_rates, signal_rates, training = True)
#       noise_loss = self.loss(noises, pred_noises)

#     gradients = tape.gradient(noise_loss, self.network.trainable_weights)
#     self.optimizer.apply_gradients(
#         zip(gradients, self.network.trainable_weights)
#     )
#     self.noise_loss_tracker.update_state(noise_loss)

#     for weight, ema_weight in zip(
#         self.network.weights, self.ema_network.weights
#     ):
#       ema_weight.assign(0.999 * ema_weight + (1 - 0.9990) * weight)

#     return {m.name: m.result() for m in self.metrics}

In [None]:
class DiffusionModel(models.Model):
    """Denoising diffusion model built around the module-level `unet`.

    Holds the trainable network, an exponential-moving-average (EMA) copy of
    it that is used whenever `training` is False, a normalisation layer for
    the input images, and the diffusion schedule that maps diffusion times to
    noise and signal rates.
    """

    def __init__(self):
        super().__init__()

        self.normalizer = layers.Normalization()
        self.network = unet
        # Same architecture as `network`; its weights are blended towards
        # `network`'s weights after every training step (see train_step).
        self.ema_network = models.clone_model(self.network)
        self.diffusion_schedule = offset_cosine_diffusion_schedule

    def compile(self, **kwargs):
        """Compile the model and create the tracker for the noise loss."""
        super().compile(**kwargs)
        # Referenced through `tf` so the class does not depend on a
        # `metrics` import that happens in a later notebook cell.
        self.noise_loss_tracker = tf.keras.metrics.Mean(name="n_loss")

    @property
    def metrics(self):
        """Metrics reported by `fit`; only available after `compile`."""
        return [self.noise_loss_tracker]

    def denormalize(self, images):
        """Undo the normalisation layer and clip the result to [0, 1].

        NOTE(review): the clip presumes the training pixels were scaled to
        [0, 1] before normalisation — confirm against the preprocessing cell.
        """
        images = self.normalizer.mean + images * self.normalizer.variance**0.5
        return tf.clip_by_value(images, 0.0, 1.0)

    def denoise(self, noisy_images, noise_rates, signal_rates, training):
        """Split noisy images into predicted noise and predicted clean images.

        Args:
            noisy_images: images mixed with noise.
            noise_rates: noise rates used for the mix; their square (the
                noise variance) is fed to the network.
            signal_rates: signal rates used for the mix.
            training: selects the trainable network when True and the EMA
                network otherwise; also forwarded to the network call.

        Returns:
            A `(pred_noises, pred_images)` tuple.
        """
        network = self.network if training else self.ema_network
        pred_noises = network(
            [noisy_images, noise_rates**2], training=training
        )
        # Invert noisy = signal_rate * image + noise_rate * noise.
        pred_images = (noisy_images - noise_rates * pred_noises) / signal_rates

        return pred_noises, pred_images

    def reverse_diffusion(self, initial_noise, diffusion_steps):
        """Iteratively denoise `initial_noise` in `diffusion_steps` equal steps.

        Diffusion time runs from 1 down towards 0. At every step the current
        images are split into predicted noise and predicted images, which are
        then re-mixed with the rates of the next (smaller) diffusion time.

        Returns:
            The predicted clean images from the final step.
        """
        num_images = initial_noise.shape[0]
        step_size = 1.0 / diffusion_steps
        current_images = initial_noise
        for step in range(diffusion_steps):
            diffusion_times = tf.ones((num_images, 1, 1, 1)) - step * step_size
            noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
            pred_noises, pred_images = self.denoise(
                current_images, noise_rates, signal_rates, training=False
            )
            next_diffusion_times = diffusion_times - step_size
            next_noise_rates, next_signal_rates = self.diffusion_schedule(
                next_diffusion_times
            )
            current_images = (
                next_signal_rates * pred_images + next_noise_rates * pred_noises
            )
        return pred_images

    def generate(self, num_images, diffusion_steps, initial_noise=None):
        """Generate images by reverse diffusion and denormalise them.

        Args:
            num_images: number of images to sample. Only used when
                `initial_noise` is None; otherwise the batch size is taken
                from `initial_noise` itself.
            diffusion_steps: number of reverse-diffusion steps.
            initial_noise: optional starting noise of shape
                (batch, IMAGE_SIZE, IMAGE_SIZE, 3).

        Returns:
            Generated images clipped to [0, 1].
        """
        if initial_noise is None:
            initial_noise = tf.random.normal(
                shape=(num_images, IMAGE_SIZE, IMAGE_SIZE, 3)
            )
        generated_images = self.reverse_diffusion(
            initial_noise, diffusion_steps
        )
        generated_images = self.denormalize(generated_images)
        return generated_images

    def train_step(self, images):
        """Run one optimisation step on a batch of images.

        The batch is normalised, mixed with Gaussian noise at random
        diffusion times, and the network is trained to predict that noise.
        Afterwards the EMA network's weights are moved towards the trained
        network's weights.

        Returns:
            Dict mapping metric names to their current values.
        """
        images = self.normalizer(images, training=True)

        # Derive the shapes from the batch itself so any batch size works
        # (including a final partial batch), not only BATCH_SIZE.
        batch_size = tf.shape(images)[0]
        noises = tf.random.normal(shape=tf.shape(images))

        diffusion_times = tf.random.uniform(
            shape=(batch_size, 1, 1, 1), minval=0.0, maxval=1.0
        )
        noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)

        noisy_images = signal_rates * images + noise_rates * noises

        with tf.GradientTape() as tape:
            # train the network to separate noisy images to their components
            pred_noises, pred_images = self.denoise(
                noisy_images, noise_rates, signal_rates, training=True
            )

            noise_loss = self.loss(noises, pred_noises)  # used for training

        gradients = tape.gradient(noise_loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(
            zip(gradients, self.network.trainable_weights)
        )

        self.noise_loss_tracker.update_state(noise_loss)

        # Exponential moving average of the weights, used for inference.
        for weight, ema_weight in zip(
            self.network.weights, self.ema_network.weights
        ):
            ema_weight.assign(EMA * ema_weight + (1 - EMA) * weight)

        return {m.name: m.result() for m in self.metrics}


In [None]:
ddm = DiffusionModel()
# Fit the normalisation layer's mean and variance on the training batches;
# DiffusionModel.denormalize reads these statistics back when generating images.
ddm.normalizer.adapt(train)

In [None]:
from tensorflow.keras import metrics, losses
# `tf.keras.optimizers.experimental` is not available in newer TensorFlow/Keras
# releases, where AdamW lives directly under `tf.keras.optimizers`; prefer the
# stable location and fall back to the experimental one on older versions.
AdamW = getattr(tf.keras.optimizers, "AdamW", None)
if AdamW is None:
    AdamW = tf.keras.optimizers.experimental.AdamW

ddm.compile(
    optimizer=AdamW(learning_rate=LEARNING_RATE, weight_decay=WEIGHT_DECAY),
    loss=losses.mean_absolute_error,
)

# NOTE(review): the epoch count is the literal 100 while the config cell
# defines EPOCHS = 50 — confirm which value is intended.
ddm.fit(train, epochs=100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100


**Inference**

In [None]:
import matplotlib.pyplot as plt
def display(
    images, n=10, size=(20, 3), cmap="gray_r", as_type="float32", save_to=None
):
    """
    Displays the first n images of the supplied array in a single row.

    Pixel values are rescaled heuristically before plotting: arrays whose
    maximum exceeds 1.0 are divided by 255, otherwise arrays containing
    negative values are mapped from [-1, 1] to [0, 1].

    Args:
        images: array of images indexed along its first axis.
        n: maximum number of images to show; fewer are shown when the array
           holds fewer than n images.
        size: matplotlib figure size.
        cmap: colormap passed to imshow.
        as_type: dtype each image is cast to before plotting.
        save_to: optional path; when given, the figure is saved there.
    """
    if images.max() > 1.0:
        images = images / 255.0
    elif images.min() < 0.0:
        images = (images + 1.0) / 2.0

    # Never index past the end of the array when fewer than n images exist.
    n = min(n, len(images))

    plt.figure(figsize=size)
    for i in range(n):
        _ = plt.subplot(1, n, i + 1)
        plt.imshow(images[i].astype(as_type), cmap=cmap)
        plt.axis("off")

    if save_to:
        plt.savefig(save_to)
        print(f"\nSaved to {save_to}")

    plt.show()

In [None]:
# Sample 10 images using 20 reverse-diffusion steps and show them in one row.
generated_images = ddm.generate(num_images=10, diffusion_steps=20).numpy()
display(generated_images)

In [None]:
# View improvement over greater number of diffusion steps
import numpy as np
# Step counts to compare: 1 to 5, then 20 and 100.
for diffusion_steps in [*np.arange(1, 6), 20, 100]:
    # Reset the global seed before each run so the rows are comparable.
    tf.random.set_seed(42)
    generated_images = ddm.generate(
        diffusion_steps=diffusion_steps, num_images=10
    ).numpy()
    display(generated_images)

In [None]:

# Fix TensorFlow's global seed before the random noise pairs are drawn below.
tf.random.set_seed(100)


def spherical_interpolation(a, b, t):
    """Blend `a` and `b` along a quarter circle.

    The weights are sin(t * pi / 2) for `a` and cos(t * pi / 2) for `b`, so
    the result equals `b` at t = 0 and `a` at t = 1.
    """
    angle = t * math.pi / 2
    weight_a, weight_b = np.sin(angle), np.cos(angle)
    return weight_a * a + weight_b * b


# For 5 random noise pairs, generate images along the interpolation path
# between the two noise samples and show each path as one row.
for i in range(5):
    a = tf.random.normal(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
    b = tf.random.normal(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
    # linspace guarantees exactly 11 blend factors with both endpoints
    # included; a float-step arange does not guarantee its length.
    blend_factors = np.linspace(0.0, 1.0, 11)
    initial_noise = np.array(
        [spherical_interpolation(a, b, t) for t in blend_factors]
    )
    # Keep the requested count consistent with the noise batch actually passed.
    generated_images = ddm.generate(
        num_images=len(initial_noise),
        diffusion_steps=20,
        initial_noise=initial_noise,
    ).numpy()
    display(generated_images, n=len(generated_images))