# Basic implementation

- load file
  - ~~load file with encoded function~~
  - ~~turn it into a numpy array~~
- noise (preprocessing)
  - ~~scale vector into the [-1, 1] range~~
  - add noise to the vector values
  - rescale vector back into the value range 0-246
- pass it through a model
  - create a simple model (does not have to be diffusion)
  - develop loss function
  - implement diffusion model

## Loading vectors

In [None]:
import numpy as np
import os
from multiprocessing.pool import ThreadPool

# IMPORTANT: both constants below are shared by later cells.
# DICTIONARY_SIZE is the divisor used by scale()/rescale() and by
# DiffusionModel.normalize()/denormalize() when mapping values to and from
# the [-1, 1] range.
DICTIONARY_SIZE = 246
# TOKENS_CAPACITY is the vector length handed to DiffusionModel.
TOKENS_CAPACITY = 2048

def prepare_record(parsed_file):
  """Read one numeric text file (or file-like object) into a float array.

  Args:
    parsed_file: anything ``np.loadtxt`` accepts as its source.

  Returns:
    The parsed values as a NumPy array with dtype ``float``.
  """
  return np.loadtxt(parsed_file, dtype=float)

'''
  Because code files weight very little they can all be loaded at once
'''
# Data directories, given relative to the current working directory.
# NOTE(review): c_dir is not read anywhere in the visible cells - confirm it
# is still needed.
c_dir = "./data/JL/"
# Directory that load_dataset() is pointed at in the sanity-check cell.
parsed_dir = "./data/parsed/" # might change ofc

def load_dataset(parsed_dir, limit=256) -> list:
  """Load the encoded vectors stored in every file of ``parsed_dir``.

  Args:
    parsed_dir: directory holding the parsed files; a trailing path
      separator is optional.
    limit: maximum number of files (taken after sorting by name) to load.
      ``None`` loads everything. The default of 256 keeps the development
      cap that was previously hard-coded.

  Returns:
    A two-element list ``[vectors, files]``: ``vectors`` is an ``np.array``
    built from the per-file arrays and ``files`` is the list of file paths
    in the same order. A list (not a tuple) is returned on purpose, because
    a later cell replaces element 0 in place.
  """
  file_names = sorted(os.listdir(parsed_dir))

  # TODO: when script is ready, call with limit=None (or change the default)
  if limit is not None:
    file_names = file_names[:limit]

  # Building paths is plain string work, so no thread pool is needed here;
  # os.path.join also copes with a directory given without a trailing slash.
  files = [os.path.join(parsed_dir, name) for name in file_names]

  with ThreadPool() as pool:
    # pool.map blocks until every file is read and returns a list whose
    # order matches the order of `files`
    vectors = pool.map(prepare_record, files)

  return [np.array(vectors), files]

## Sanity check

In [None]:
# Load the dataset once; a later cell overwrites `dataset[0]` in place.
dataset = load_dataset(parsed_dir)
# Shape of the array holding the loaded vectors.
print(dataset[0].shape)
# Number of file paths that were loaded.
print(len(dataset[1]))

# Preprocessing

In [None]:
def rescale(dataframe):
  """Inverse of ``scale``: map values from [-1, 1] to [0, DICTIONARY_SIZE]."""
  shifted = dataframe + 1
  return shifted * DICTIONARY_SIZE / 2

def scale(dataframe):
  """Map values from [0, DICTIONARY_SIZE] linearly onto [-1, 1]."""
  doubled = dataframe * 2
  return doubled / DICTIONARY_SIZE - 1

# Replace the raw vectors in place with their scaled counterparts.
# NOTE(review): DiffusionModel.train_step/test_step call normalize(), which
# applies the same formula - confirm the data is not scaled twice when this
# array is fed to the model.
dataset[0] = scale(dataset[0])

# Model based on [ddim](https://colab.research.google.com/github/keras-team/keras-io/blob/master/examples/generative/ipynb/ddim.ipynb#scrollTo=8s3L-z9pcdMc)

It is very important to set up the learning process and the architecture of my solution.
The exact network that is going to be used does not really matter; it will be a simple fully connected NN and will possibly change in the future.

In [None]:
import math
import matplotlib.pyplot as plt
import tensorflow as tf

from tensorflow import keras
from keras import layers

In [None]:
# data
num_epochs = 1  # train for at least 50 epochs for good results
# KID = Kernel Inception Distance
# NOTE(review): no KID metric exists in the visible code, and neither
# kid_diffusion_steps nor plot_diffusion_steps is read there - confirm
# they are still needed.
kid_diffusion_steps = 5
plot_diffusion_steps = 20

# sampling: bounds of the signal rate used by DiffusionModel.diffusion_schedule
min_signal_rate = 0.02
max_signal_rate = 0.95

# architecture
# embedding_dims / embedding_max_frequency are read by sinusoidal_embedding
embedding_dims = 32
embedding_max_frequency = 1000.0
# NOTE(review): widths and block_depth are not read by get_network in the
# visible code - confirm they are still needed.
widths = [32, 64, 96, 128]
block_depth = 2

# optimization
batch_size = 64
# decay factor of the exponential moving average of weights (train_step)
ema = 0.999
learning_rate = 1e-3
weight_decay = 1e-4

Get network with embeddings

In [None]:
def sinusoidal_embedding(x):
    """Embed ``x`` with sines and cosines at log-spaced frequencies.

    ``embedding_dims // 2`` frequencies are spaced logarithmically between
    1.0 and ``embedding_max_frequency``; the sine and cosine responses are
    concatenated along axis 1.

    Args:
        x: tensor to embed (get_network passes the noise-variance input).

    Returns:
        Tensor holding the sine features followed by the cosine features.
    """
    embedding_min_frequency = 1.0

    # log-spaced frequency grid
    log_low = tf.math.log(embedding_min_frequency)
    log_high = tf.math.log(embedding_max_frequency)
    frequencies = tf.exp(tf.linspace(log_low, log_high, embedding_dims // 2))

    angular_speeds = 2.0 * math.pi * frequencies
    sine_part = tf.sin(angular_speeds * x)
    cosine_part = tf.cos(angular_speeds * x)
    return tf.concat([sine_part, cosine_part], axis=1)
  
def get_network(tokens_capacity):
    """Build the simple fully connected noise-prediction network.

    Args:
        tokens_capacity: length of each input vector; the network output
            has the same width so it can be subtracted from the input.

    Returns:
        A ``keras.Model`` taking ``[noisy vectors, noise variances]`` and
        producing one prediction of width ``tokens_capacity`` per sample.
    """
    # `shape` must be a tuple: `(n)` without a trailing comma is just the int n
    noisy_images = keras.Input(shape=(tokens_capacity,))
    noise_variances = keras.Input(shape=(1,))

    e = layers.Lambda(sinusoidal_embedding)(noise_variances)

    x = layers.Dense(32)(noisy_images)
    x = layers.Concatenate()([x, e])
    # The hidden layer keeps the default initializer: with two stacked
    # zero-initialized kernels every kernel gradient is zero forever and only
    # the final bias could ever be trained.
    # NOTE(review): there is no activation between the Dense layers, so the
    # network is purely linear - confirm this is intended.
    x = layers.Dense(1024)(x)
    # Only the output layer starts at zero, so the initial prediction is zero.
    # Its width follows tokens_capacity instead of a hard-coded 2048.
    x = layers.Dense(tokens_capacity, kernel_initializer="zeros")(x)

    # The name must not contain spaces: it is used as a TensorFlow scope name.
    return keras.Model([noisy_images, noise_variances], x, name="simple_net")

Because I removed the KID metrics part, the sample is not generated inside.

In [None]:
class DiffusionModel(keras.Model):
    """Diffusion model over fixed-length token vectors.

    Holds the network built by ``get_network`` together with a structural
    clone whose weights track an exponential moving average (EMA) of the
    trained weights; the EMA network is the one used outside of training.
    Two running means are tracked: ``n_loss`` (noise loss, the training
    objective) and ``i_loss`` (sample loss, reported only).
    """

    def __init__(self, tokens_capacity):
        """Create the trainable network and its EMA clone.

        Args:
            tokens_capacity: length of every sample vector.
        """
        super().__init__()

        self.tokens_capacity = tokens_capacity
        self.network = get_network(tokens_capacity)
        self.ema_network = keras.models.clone_model(self.network)

    def compile(self, **kwargs):
        """Compile the model and create the two loss trackers."""
        super().compile(**kwargs)

        self.noise_loss_tracker = keras.metrics.Mean(name="n_loss")
        self.sample_loss_tracker = keras.metrics.Mean(name="i_loss")

    @property
    def metrics(self):
        """Trackers reported by ``train_step`` and ``test_step``."""
        return [self.noise_loss_tracker, self.sample_loss_tracker]

    def normalize(self, samples):
        """Map values from [0, DICTIONARY_SIZE] linearly onto [-1, 1].

        NOTE(review): the module-level ``scale`` applies the same formula -
        confirm the training data is not scaled twice.
        """
        return (samples * 2 / DICTIONARY_SIZE) - 1

    def denormalize(self, samples):
        """Inverse of ``normalize``: map [-1, 1] back to [0, DICTIONARY_SIZE]."""
        return (samples + 1) * DICTIONARY_SIZE / 2

    def diffusion_schedule(self, diffusion_times):
        """Turn diffusion times into noise and signal rates.

        Args:
            diffusion_times: tensor of times; 0 maps to ``max_signal_rate``
                and 1 maps to ``min_signal_rate``.

        Returns:
            ``(noise_rates, signal_rates)`` with the shape of the input.
        """
        # diffusion times -> angles
        start_angle = tf.acos(max_signal_rate)
        end_angle = tf.acos(min_signal_rate)

        diffusion_angles = start_angle + diffusion_times * (end_angle - start_angle)

        # angles -> signal and noise rates
        signal_rates = tf.cos(diffusion_angles)
        noise_rates = tf.sin(diffusion_angles)
        # note that their squared sum is always: sin^2(x) + cos^2(x) = 1

        return noise_rates, signal_rates

    def denoise(self, noisy_samples, noise_rates, signal_rates, training):
        """Split noisy samples into predicted noise and predicted sample.

        Args:
            noisy_samples: mixture of signal and noise.
            noise_rates: per-sample noise rates used in the mixture.
            signal_rates: per-sample signal rates used in the mixture.
            training: selects the trained network when true, the EMA
                network otherwise.

        Returns:
            ``(pred_noises, pred_samples)``.
        """
        # the exponential moving average weights are used at evaluation
        network = self.network if training else self.ema_network

        # predict noise component and calculate the sample component using it
        pred_noises = network([noisy_samples, noise_rates**2], training=training)
        pred_samples = (noisy_samples - noise_rates * pred_noises) / signal_rates

        return pred_noises, pred_samples

    def reverse_diffusion(self, initial_noise, diffusion_steps):
        """Sample by iteratively denoising ``initial_noise``.

        Args:
            initial_noise: tensor with one row of noise per sample.
            diffusion_steps: number of denoising iterations.

        Returns:
            The sample prediction of the last iteration (still normalized).
        """
        num_samples = initial_noise.shape[0]
        step_size = 1.0 / diffusion_steps

        # important line:
        # at the first sampling step, the "noisy sample" is pure noise
        # but its signal rate is assumed to be nonzero (min_signal_rate)
        next_noisy_samples = initial_noise
        for step in range(diffusion_steps):
            noisy_samples = next_noisy_samples

            # separate the current noisy sample to its components;
            # times are kept as a column so the rates broadcast over each row
            # and match the (batch, 1) input expected by the network
            diffusion_times = tf.ones((num_samples, 1)) - step * step_size
            noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
            # network used in eval mode
            pred_noises, pred_samples = self.denoise(
                noisy_samples, noise_rates, signal_rates, training=False
            )

            # remix the predicted components using the next signal and noise rates
            next_diffusion_times = diffusion_times - step_size
            next_noise_rates, next_signal_rates = self.diffusion_schedule(
                next_diffusion_times
            )
            # this new noisy sample will be used in the next step
            next_noisy_samples = (
                next_signal_rates * pred_samples + next_noise_rates * pred_noises
            )

        return pred_samples

    def generate(self, num_samples, diffusion_steps):
        """Generate ``num_samples`` vectors: noise -> samples -> denormalized."""
        initial_noise = tf.random.normal(shape=(num_samples, self.tokens_capacity))
        generated_sample = self.reverse_diffusion(initial_noise, diffusion_steps)
        generated_sample = self.denormalize(generated_sample)
        return generated_sample

    def _mix_with_noise(self, samples):
        """Mix normalized samples with Gaussian noise at random diffusion times."""
        # use the real batch size so a smaller final batch also works
        current_batch = tf.shape(samples)[0]
        # one noise value per sample element, so it can be mixed element-wise
        noises = tf.random.normal(shape=(current_batch, self.tokens_capacity))

        # sample uniform random diffusion times, one per sample
        diffusion_times = tf.random.uniform(
            shape=(current_batch, 1), minval=0.0, maxval=1.0
        )
        noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
        # mix the samples with noises accordingly
        noisy_samples = signal_rates * samples + noise_rates * noises

        return noises, noise_rates, signal_rates, noisy_samples

    def train_step(self, samples):
        """Run one optimization step on a batch and update the EMA weights.

        Args:
            samples: batch of unnormalized sample vectors.

        Returns:
            Mapping from metric name to its current running mean.
        """
        # scale samples into the [-1, 1] range
        samples = self.normalize(samples)
        noises, noise_rates, signal_rates, noisy_samples = self._mix_with_noise(
            samples
        )

        with tf.GradientTape() as tape:
            # train the network to separate noisy samples to their components
            pred_noises, pred_samples = self.denoise(
                noisy_samples, noise_rates, signal_rates, training=True
            )

            noise_loss = self.loss(noises, pred_noises)  # used for training
            sample_loss = self.loss(samples, pred_samples)  # only used as metric

        gradients = tape.gradient(noise_loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))

        self.noise_loss_tracker.update_state(noise_loss)
        self.sample_loss_tracker.update_state(sample_loss)

        # track the exponential moving averages of weights
        for weight, ema_weight in zip(self.network.weights, self.ema_network.weights):
            ema_weight.assign(ema * ema_weight + (1 - ema) * weight)

        # every tracker is reported: there is no trailing KID metric to skip
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, samples):
        """Evaluate one batch with the EMA network.

        Args:
            samples: batch of unnormalized sample vectors.

        Returns:
            Mapping from metric name to its current running mean.
        """
        # scale samples into the [-1, 1] range
        samples = self.normalize(samples)
        noises, noise_rates, signal_rates, noisy_samples = self._mix_with_noise(
            samples
        )

        # use the network to separate noisy samples to their components
        pred_noises, pred_samples = self.denoise(
            noisy_samples, noise_rates, signal_rates, training=False
        )

        noise_loss = self.loss(noises, pred_noises)
        sample_loss = self.loss(samples, pred_samples)

        self.sample_loss_tracker.update_state(sample_loss)
        self.noise_loss_tracker.update_state(noise_loss)

        return {m.name: m.result() for m in self.metrics}

# Training loop

In [None]:
# Build the model for vectors of TOKENS_CAPACITY values.
model = DiffusionModel(TOKENS_CAPACITY)
# below tensorflow 2.9:
# pip install tensorflow_addons
# import tensorflow_addons as tfa
# optimizer=tfa.optimizers.AdamW
model.compile(
    optimizer=keras.optimizers.experimental.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    ),
    loss=keras.losses.mean_absolute_error,
)
# element-wise mean absolute error is used as loss

# save the best weights based on the validation noise loss:
# the KID metric was removed, so "val_kid" is never logged; test_step
# reports "n_loss" and "i_loss", which Keras logs with a "val_" prefix
checkpoint_path = "checkpoints/diffusion_model"
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    monitor="val_n_loss",
    mode="min",
    save_best_only=True,
)

In [None]:
# Smoke test: generate 1 sample using 10 reverse-diffusion steps.
model.generate(1, 10)