<a href="https://colab.research.google.com/github/Ahtesham519/Genrative_Deep_learning_v2_2023/blob/main/ddm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Diffusion models


In [None]:
%load_ext autoreload
%autoreload 2
import numpy as np
import matplotlib.pyplot as plt

plt.style.use("seaborn-v0_8_colorblind")

import math
import tensorflow as tf
from tensorflow.keras import(
    layers,
    models,
    optimizers,
    utils,
    callbacks,
    metrics,
    losses,
    activations,

)

OSError: ignored

#0. Parameters

In [None]:
IMAGE_SIZE = 64
BATCH_SIZE = 64
DATASET_REPETITIONS = 5
LOAD_MODEL = False

NOISE_EMBEDDING_SIZE = 32
PLOT_DIFFUSION_STEPS = 20

#optimization
EMA = 0.999
LEARNING_RATE = 1e-3
WEIGHT_DECAY = 1e-4
EPOCHS = 50

from notebooks.utils import display, sample_batch

ModuleNotFoundError: ignored

#1. Prepare the data

In [None]:
#Load the data
train_data = utils.image_dataset_from_directory(
    "app/data/pytorch-challange-flower-dataset/dataset",
    labels = None,
    image_size = (IMAGE_SIZE,IMAGE_SIZE),
    batch_size = None,
    shuffle = True,
    seed = 42,
    interpolation = "bilinear",

)

NameError: ignored

In [None]:
#Prepare the data
def preprocess(img):
  img = tf.cast(img, "float32") / 255.0
  return img

train = train_data.map(lambda x: preprocess(x))
train = train.repeat(DATASET_REPETITIONS)
train = train.batch(BATCH_SIZE , drop_remainder = True)

NameError: ignored

In [None]:
#Show some items of the clothings from the training set
train_sample = sample_batch(train)
display(train_sample)

NameError: ignored

#1.1 Diffusion Schedules

In [None]:
def linear_diffusion_schedule(diffusion_times):
  min_rate = 0.0001
  max_rate = 0.02
  betas = min_rate +diffusion_times * (max_rate - min_rate)
  alphas = 1 - betas
  alphas_bars = tf.math.cumprod(alphas)
  signal_rates = tf.sqrt(alphas_bars)
  noise_rates = tf.sqrt(1 - alphas_bars)
  return noise_rates, signal_rates




In [None]:
def cosine_diffusion_schedule(diffusion_times):
  signal_rates = tf.cos(diffusion_times * math.pi / 2)
  noise_rates = tf.sin(diffusion_times * math.pi / 2)
  return noise_rates , signal_rates

In [None]:
def offset_cosine_diffusion_schedule(diffusion_times):
  min_signal_rate = 0.02
  max_signal_rate = 0.95
  start_angle = tf.acos(max_signal_rate)
  end_angle = tf.acos(min_signal_rate)

  diffusion_angles = start_angle + diffusion_times * (end_angle - start_angle)

  signal_rates = tf.cos(diffusion_angles)
  noise_rates = tf.sin(diffusion_angles)

  return noise_rates, signal_rates

In [None]:
T = 1000
diffusion_times = tf.convert_to_tensor([x / T for x in range(T)])
linear_noise_rates , linear_signal_rates = linear_diffusion_schedule(
    diffusion_times
)
cosine_noise_rates, cosine_signal_rates = cosine_diffusion_schedule(
    diffusion_times
)
(
    offset_cosine_noise_rates,
    offset_cosine_signal_rates,
) = offset_cosine_diffusion_schedule(diffusion_times)

NameError: ignored

In [None]:
plt.plot(
    diffusion_times, linear_signal_rates ** 2, linewidth = 1.5, label = "linear"
)
plt.plot(
    diffusion_times , cosine_signal_rates **2 , linewidth = 1.5 , label ="cosine"
)
plt.plot(
    diffusion_times,
    offset_cosine_signal_rates ** 2,
    linewidth = 1.5,
    label = "offset_cosine",
)

plt.xlabel("t/T" , fontsize = 12)
plt.ylabel(r"$\bar{\alpha_t} $ (signal)" , fontsize = 12)
plt.legend()
plt.show()

NameError: ignored

In [None]:
plt.plot(
    diffusion_times , linear_noise_rates ** 2, linewidth = 1.5 , label = "linear"
)
plt.plot(
    diffusion_times , cosine_noise_rates ** 2 , linewidth = 1.5 , label = "cosine"
)
plt.plot(
    diffusion_times,
    offset_cosine_noise_rates ** 2,
    linewidth = 1.5,
    label = "offset_cosine",


    )
plt.xlabel("t/T" , fontsize =12)
plt.ylabel(r"$1- \bar {\alpha_t}$ (noise)" , fontsize = 12)
plt.legend()
plt.show()

NameError: ignored

#2. Build the model

In [7]:
def sinusoidal_embedding(x):
  frequencies = tf.exp(
      tf.linspace(
          tf.math.log(1.0),
          tf.math.log(1000.0),
          NOISE_EMBEDDING_SIZE // 2,
      )
  )
  angular_speeds = 2.0 * math.pi * frequencies
  embeddings = tf.concat(
      [tf.sin(angular_speeds * x) , tf.cos(angular_speeds * x)] , axis =3
  )
  return embeddings


In [8]:
embedding_list = []
for y in np.arrange(0,1,0.01):
  embedding_list.append(sinusoidal_embedding(np.array([[[[y]]]]))[0][0][0])
embedding_array = np.array(np.transpose(embedding_list))
fig, ax = plt.subplots()
ax.set_xticks(
    np.arange(0, 100, 10), labels = np.round(np.arange(0.0, 1.0 , 0.1) , 1)
)
ax.set_ylabel("embedding dimension" , fontsize = 8)
ax.set_xlabel("noise variance" , fontsize = 8)
plt.pcolor(embedding_array, cmap = "coolwarm")
plt.colorbar(orientation = "horizantal" , label = "embedding value")
ax.imshow(embedding_array, interpolation = "nearest" , origin = "lower")
plt.show()

NameError: ignored

In [9]:
def ResidualBlock(width):
  def apply(x):
    input_width = x.shape[3]
    if input_width == width:
      residual = x
    else:
      residual = layers.Conv2d(width, kernel_size = 1) (x)
    x = layers.BatchNormalization(center = False, scale = False)(x)
    x = layers.Conv2D(
        width, kernel_size = 3, padding = "same" , activation = activations.swish
    )(x)
    x = layers.Conv2D(width , kernel_size= 3 , padding = "same")(x)
    x = layers.Add()([x, residual])
    return x

  return apply


def DownBlock(width, block_depth):
  def apply(x):
    x, skips = x
    for _ in range(block_depth):
      x = ResidualBlock(width)(x)
      skips.append(x)
    x = layers.AveragePooling2D(pool_size = 2)(x)
    return x

  return apply


def UpBlock(width, block_depth):
  def apply(x):
    x, skips = x
    x = layers.UpSampling2D(size = 2 , interpolation = "bilinear")(x)
    for _ in range(block_depth):
      x = layers.Concatenate()([x , skips.pop()])
      x = ResidualBlock(width)(x)
    return x

  return apply

In [11]:
#Build the U-net

noisy_images = layers.Input(shape=(IMAGE_SIZE , IMAGE_SIZE , 3))
x = layers.Conv2D(32, kernel_size = 1)(noisy_images)

noise_variances = layers.Input(shape = (1,1,1))
noise_embedding = layers.Lambda(sinusoidal_embedding)(noise_variances)
noise_embedding = layers.UpSampling2D(size = IMAGE_SIZE , interpolation = "nearest")(
    noise_embedding
)

x = layers.Concatenate() ([x, noise_embedding])

skips = []

x = DownBlock(32, block_depth = 2)([x, skips])
x = DownBlock(64, block_depth = 2)([x, skips])
x = DownBlock(96, block_depth = 2)([x, skips])

x = ResidualBlock(128)(x)
x = ResidualBlock(128)(x)

x = UpBlock(96, block_depth = 2)([x, skips])
x = UpBlock(64, block_depth = 2)([x, skips])
x = UpBlock(32, block_depth = 2)([x, skips])

x = layers.Conv2D(3, kernel_size = 1, kernel_initializer = "zeros")(x)

unet = models.Model([noisy_images , noise_variances] , x , name = "unet")

NameError: ignored

In [None]:
from tensorflow.python.ops import signal

class DiffusionModel(models.Model):
  def __init__(self):
    super().__init__()

    self.normalizer = layers.Normalization()
    self.network = unet
    self.ema_network = models.clone_model(self.network)
    self.diffusion_schedule = offset_cosine_diffusion_schedule

  def compile(self, **kwargs):
    super().compile(**kwargs)
    self.noise_loss_tracker = metrics.Mean(name = "n_loss")

  @property
  def metrics(self):
    return [self.noise_loss_tracker]

  def denormalize(self, images):
    images = self.normalizer.mean + images * self.normalizer.variance** 0.5
    return tf.clip_by_value(images, 0.0 ,1.0)

  def denoise(self, noisy_images , noise_rates, signal_rates , training):
    if training:
      network = self.network
    else:
      network = self.ema_network
    pred_noises = network(
        [noisy_images, noise_rates** 2], training = training
    )
    pred_images = (noisy_images - noise_rates * pred_noises) / signal

    return pred_noises, pred_images

  def reverse_diffusion(self, initial_noise, diffusion_steps):
    num_images = initial_noise.shape[0]
    step_size = 1.0 / diffusion_steps
    current_images = initial_noise
    for step in range(diffusion_steps):
      diffusion_times = tf.ones((num_images ,1 , 1, 1 ))- step * step_size
      noise_rates , signal_rates = self.diffusion_schedule(diffusion_times)
      pred_noises, pred_images = self.denoise(
          current_images, noise_rates, signal_rates, training = False
      )
      next_diffusion_times = diffusion_times - step_size
      next_noise_rates, next_signal_rates = self.diffusion_schedule(
          next_diffusion_times
      )
      current_images = (
          next_signal_rates * pred_images + next_noise_rates * pred_noises
      )
      return pred_images

    def generate(self, num_images , diffusion_steps , initial_noise = None):
      if initial_noise is None:
        initial_noise = tf.random.normal(
            shape= (num_images, IMAGE_SIZE , IMAGE_SIZE , 3)
        )
      generated_images = self.reverse_diffusion(
          initial_noise, diffusion_steps
      )
      generated_images = self.denormalize(generated_images)
      return generated_images
