In [None]:
def lienar_diffusion_schedule(diffusion_times):
    """Return noise and signal rates for a linear beta schedule.

    Beta is interpolated linearly between ``min_rate`` and ``max_rate``
    according to ``diffusion_times``; ``alpha_bar`` is the cumulative product
    of ``1 - beta`` over the whole input, so the input is expected to be an
    ordered sequence of time steps (presumably 1-D and increasing).

    Args:
        diffusion_times: Values accepted by ``tf.convert_to_tensor``
            (presumably fractions in [0, 1]).

    Returns:
        A ``(noise_rates, signal_rates)`` tuple of tensors with
        ``noise_rates**2 + signal_rates**2 == 1``.

    Note:
        The misspelled function name is kept because existing code calls it;
        ``linear_diffusion_schedule`` is provided as an alias.
    """
    min_rate = 0.0001
    max_rate = 0.02
    betas = min_rate + tf.convert_to_tensor(diffusion_times) * (max_rate - min_rate)
    alphas = 1 - betas
    alpha_bars = tf.math.cumprod(alphas)
    # The rates scale the image and the noise directly (x_t = signal * x_0 +
    # noise * eps), so they are the square roots of the variances, which keeps
    # this schedule consistent with the cosine schedules (sin/cos pairs).
    signal_rates = tf.sqrt(alpha_bars)
    noise_rates = tf.sqrt(1 - alpha_bars)
    return noise_rates, signal_rates


# Correctly spelled alias; the original name stays available for callers.
linear_diffusion_schedule = lienar_diffusion_schedule

# Evaluate the linear schedule on T evenly spaced diffusion times in [0, 1).
T = 1000
diffusion_times = [step / T for step in range(T)]
linear_noise_rates, linear_signal_rates = lienar_diffusion_schedule(
    diffusion_times
)

In [None]:
def cosine_diffusion_schedule(diffusion_times):
    """Return ``(noise_rates, signal_rates)`` for the cosine schedule.

    Each diffusion time is mapped to an angle ``t * pi / 2``; the noise rate
    is the sine of that angle and the signal rate is its cosine.
    """
    angles = diffusion_times * math.pi / 2
    return tf.sin(angles), tf.cos(angles)

def offset_cosine_diffusion_schedule(diffusion_times):
    """Return ``(noise_rates, signal_rates)`` for the offset cosine schedule.

    The angle is interpolated linearly between ``acos(0.95)`` and
    ``acos(0.02)``, so the signal rate moves from 0.95 (at time 0) to 0.02
    (at time 1) instead of covering the full 1 -> 0 range.
    """
    min_signal_rate = 0.02
    max_signal_rate = 0.95

    # Angles whose cosines are the largest / smallest allowed signal rates.
    start_angle = tf.acos(max_signal_rate)
    end_angle = tf.acos(min_signal_rate)
    angle_span = end_angle - start_angle

    diffusion_angles = start_angle + diffusion_times * angle_span
    return tf.sin(diffusion_angles), tf.cos(diffusion_angles)

In [None]:
class DiffusionModel(models.Model):
    """Keras model that trains a noise-predicting network for diffusion.

    Holds the trainable ``network``, an exponential-moving-average copy
    (``ema_network``) used when ``training`` is False, a ``Normalization``
    layer for the input images, and the diffusion schedule function.
    """

    def __init__(self, ema_decay = 0.999):
        """Build the sub-networks.

        Args:
            ema_decay: Weight kept from the previous EMA value on each
                training step; ``1 - ema_decay`` is taken from the freshly
                updated network weights.
        """
        super().__init__()
        self.normalizer = layers.Normalization()
        # `unet` is the module-level U-Net; it is looked up when the model is
        # instantiated, so it must be defined by then.
        self.network = unet
        self.ema_network = models.clone_model(self.network)
        self.diffusion_schedule = cosine_diffusion_schedule
        self.ema_decay = ema_decay
        # Running mean of the noise loss reported by train_step. Referenced
        # through tf.keras so no additional import is needed.
        self.noise_loss_tracker = tf.keras.metrics.Mean(name = "n_loss")

    @property
    def metrics(self):
        """Metrics reported by ``train_step`` (and reset by Keras per epoch)."""
        return [self.noise_loss_tracker]

    def denoise(self, noisy_images, noise_rates, signal_rates, training):
        """Predict the noise in ``noisy_images`` and the implied clean images.

        Args:
            noisy_images: Images mixed with noise.
            noise_rates: Noise scale used to build ``noisy_images``; its
                square is fed to the network as the noise variance.
            signal_rates: Signal scale used to build ``noisy_images``.
            training: When true the trainable network is used, otherwise the
                EMA network.

        Returns:
            ``(pred_noises, pred_images)``.
        """
        network = self.network if training else self.ema_network
        pred_noises = network(
            [noisy_images, noise_rates**2], training = training
        )
        # Invert noisy = signal * image + noise * eps for the image.
        pred_images = (noisy_images - noise_rates * pred_noises) / signal_rates

        return pred_noises, pred_images

    def train_step(self, images):
        """Run one optimisation step on a batch and update the EMA network.

        Args:
            images: Batch of training images (presumably 4-D, since the
                sampled diffusion times have shape ``(batch, 1, 1, 1)``).

        Returns:
            Dict mapping each metric name to its current value.
        """
        images = self.normalizer(images, training = True)
        noises = tf.random.normal(shape = tf.shape(images))
        batch_size = tf.shape(images)[0]
        # One random diffusion time per image, broadcastable over the image.
        diffusion_times = tf.random.uniform(
            shape = (batch_size, 1, 1, 1), minval = 0.0, maxval = 1.0
        )
        noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
        noisy_images = signal_rates * images + noise_rates * noises
        with tf.GradientTape() as tape:
            pred_noises, pred_images = self.denoise(
                noisy_images, noise_rates, signal_rates, training = True
            )
            noise_loss = self.loss(noises, pred_noises)
        gradients = tape.gradient(noise_loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))
        self.noise_loss_tracker.update_state(noise_loss)

        # Exponential moving average; the two coefficients must sum to 1 or
        # the EMA weights drift towards zero.
        for weight, ema_weight in zip(self.network.weights, self.ema_network.weights):
            ema_weight.assign(
                self.ema_decay * ema_weight + (1 - self.ema_decay) * weight
            )

        return {m.name: m.result() for m in self.metrics}

In [None]:
# U-Net that predicts the noise contained in a 64x64 RGB image.
# NOTE: sinusoidal_embedding, ResidualBlock, DownBlock and UpBlock must already
# be defined when this code runs.
noisy_images = layers.Input(shape = (64, 64, 3))
x = layers.Conv2D(32, kernel_size = 1)(noisy_images)

# Embed the scalar noise variance and tile it over the 64x64 grid so it can be
# concatenated with the image features along the channel axis.
noise_variances = layers.Input(shape = (1, 1, 1))
noise_embedding = layers.Lambda(sinusoidal_embedding)(noise_variances)
noise_embedding = layers.UpSampling2D(size = 64, interpolation = "nearest")(noise_embedding)

x = layers.Concatenate()([x, noise_embedding])

# Stack shared by the encoder (push) and decoder (pop) for skip connections.
skips = []

# Encoder: each DownBlock halves the spatial size (64 -> 32 -> 16 -> 8).
x = DownBlock(32, block_depth = 2)([x, skips])
x = DownBlock(64, block_depth = 2)([x, skips])
x = DownBlock(96, block_depth = 2)([x, skips])

# Bottleneck.
x = ResidualBlock(128)(x)
x = ResidualBlock(128)(x)

# Decoder: each UpBlock doubles the spatial size and consumes the matching
# skip tensors (8 -> 16 -> 32 -> 64).
x = UpBlock(96, block_depth = 2)([x, skips])
x = UpBlock(64, block_depth = 2)([x, skips])
x = UpBlock(32, block_depth = 2)([x, skips])

# Zero-initialised output layer, so the initial prediction is all zeros.
x = layers.Conv2D(3, kernel_size = 1, kernel_initializer = "zeros")(x)

unet = models.Model([noisy_images, noise_variances], x, name = "unet")

In [None]:
def sinusoidal_embedding(x):
    """Encode ``x`` as sines and cosines at 16 log-spaced frequencies.

    The frequencies run from 1 to 1000 (evenly spaced in log space). The sine
    and cosine responses are concatenated along axis 3, so ``x`` must have at
    least four dimensions.
    """
    log_lowest = tf.math.log(1.0)
    log_highest = tf.math.log(1000.0)
    frequencies = tf.exp(tf.linspace(log_lowest, log_highest, 16))
    angular_speeds = 2.0 * math.pi * frequencies
    return tf.concat(
        [tf.sin(angular_speeds * x), tf.cos(angular_speeds * x)], axis = 3
    )

In [None]:
def ResidualBlock(width):
    """Return a function applying a two-convolution residual block.

    The returned callable takes a tensor whose axis 3 is the channel axis and
    produces a tensor with ``width`` channels.
    """
    def apply(x):
        # A 1x1 convolution matches the channel count on the shortcut path
        # only when the input does not already have `width` channels.
        if x.shape[3] != width:
            shortcut = layers.Conv2D(width, kernel_size = 1)(x)
        else:
            shortcut = x

        features = layers.BatchNormalization(center = False, scale = False)(x)
        features = layers.Conv2D(
            width, kernel_size = 3, padding = "same", activation = activations.swish
        )(features)
        features = layers.Conv2D(width, kernel_size = 3, padding = "same")(features)
        return layers.Add()([features, shortcut])

    return apply

In [None]:
def DownBlock(width, block_depth):
    """Return a function applying ``block_depth`` residual blocks, then pooling.

    The returned callable takes ``[tensor, skips]``; the output of every
    residual block is appended to the ``skips`` list (mutated in place) before
    the result is downsampled by 2x average pooling.
    """
    def apply(inputs):
        features, skips = inputs
        for _ in range(block_depth):
            features = ResidualBlock(width)(features)
            skips.append(features)
        return layers.AveragePooling2D(pool_size = 2)(features)

    return apply

def UpBlock(width, block_depth):
    """Return a function that upsamples, then applies residual blocks.

    The returned callable takes ``[tensor, skips]``. After 2x bilinear
    upsampling, each of the ``block_depth`` residual blocks is preceded by a
    concatenation with the most recent entry popped from ``skips`` (the list
    is mutated in place).
    """
    def apply(inputs):
        features, skips = inputs
        features = layers.UpSampling2D(size = 2, interpolation = "bilinear")(features)
        for _ in range(block_depth):
            merged = layers.Concatenate()([features, skips.pop()])
            features = ResidualBlock(width)(merged)
        return features

    return apply

In [None]:
# Instantiate the diffusion model and configure AdamW with mean absolute error.
model = DiffusionModel()
model.compile(
    loss = losses.mean_absolute_error,
    optimizer = optimizers.experimental.AdamW(
        learning_rate = 1e-3,
        weight_decay = 1e-4,
    ),
)

# Fit the normalization statistics on the training data before training.
model.normalizer.adapt(train)

model.fit(train, epochs = 50)

In [None]:
class DiffusionModel(models.Model):
    # NOTE(review): this statement defines a new class named DiffusionModel; as
    # written it replaces any earlier class of that name rather than extending
    # it. Presumably this is an excerpt meant to be merged into the main class
    # (it relies on self.diffusion_schedule and self.denoise) - confirm.

    def reverse_diffusion(self, initial_noise, diffusion_steps):
        """Iteratively denoise ``initial_noise`` over ``diffusion_steps`` steps.

        Starting at diffusion time 1, each step predicts the noise and the
        clean image for the current sample, then re-mixes them using the
        rates of the next (smaller) diffusion time.

        Args:
            initial_noise: Starting tensor; its first dimension is the number
                of images.
            diffusion_steps: Number of denoising steps; must be at least 1.

        Returns:
            The clean-image prediction made at the final step.

        Raises:
            ValueError: If ``diffusion_steps`` is less than 1.
        """
        if diffusion_steps < 1:
            raise ValueError("diffusion_steps must be at least 1")

        num_images = initial_noise.shape[0]
        step_size = 1.0 / diffusion_steps
        current_images = initial_noise
        for step in range(diffusion_steps):
            # Diffusion time runs from 1 down towards 0.
            diffusion_times = tf.ones((num_images, 1, 1, 1)) - step * step_size
            noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
            pred_noises, pred_images = self.denoise(
                current_images, noise_rates, signal_rates, training = False
            )
            next_diffusion_times = diffusion_times - step_size
            next_noise_rates, next_signal_rates = self.diffusion_schedule(
                next_diffusion_times
            )
            # Re-noise the predicted image to the next, lower noise level.
            current_images = (
                next_signal_rates * pred_images + next_noise_rates * pred_noises
            )
        return pred_images

    # Backward-compatible alias for the original, misspelled method name.
    reverser_diffusion = reverse_diffusion

In [None]:
class DiffusionModel(models.Model):
    # NOTE(review): this statement defines a new class named DiffusionModel; as
    # written it replaces any earlier class of that name rather than extending
    # it. Presumably this is an excerpt meant to be merged into the main class
    # (it relies on self.normalizer and self.reverse_diffusion) - confirm.

    def denormalize(self, images):
        """Undo the normalizer's scaling and clip the result to [0, 1].

        Args:
            images: Tensor in the normalized space of ``self.normalizer``.

        Returns:
            ``mean + images * sqrt(variance)``, clipped to the range [0, 1].
        """
        images = self.normalizer.mean + images * self.normalizer.variance**0.5
        return tf.clip_by_value(images, 0.0, 1.0)

    def generate(self, num_images, diffusion_steps):
        """Generate ``num_images`` images from random 64x64x3 noise.

        Args:
            num_images: Number of images to sample.
            diffusion_steps: Number of reverse-diffusion steps to run.

        Returns:
            Denormalized generated images with values in [0, 1].
        """
        initial_noise = tf.random.normal(shape = (num_images, 64, 64, 3))
        generated_images = self.reverse_diffusion(initial_noise, diffusion_steps)
        generated_images = self.denormalize(generated_images)
        return generated_images