In [1]:
import math
import matplotlib.pyplot as plt
import tensorflow as tf
#tf.config.set_visible_devices([], 'GPU')
import numpy as np

from tensorflow import keras
from keras import layers


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
batch_size = 16
max_length = 25000

min_signal_rate = 0.02
max_signal_rate = 0.95
ema = 0.999

def sinusoidal_embedding(x):
    embedding_dims = 32
    embedding_max_frequency = 1000.0
    embedding_min_frequency = 1.0
    frequencies = tf.exp(
      tf.linspace(
          tf.math.log(embedding_min_frequency),
          tf.math.log(embedding_max_frequency),
          embedding_dims // 2,
      )
    )
    angular_speeds = 2.0 * math.pi * frequencies
    embeddings = tf.concat([tf.sin(angular_speeds * x), tf.cos(angular_speeds * x)], axis=2)
    return embeddings

def createNetwork():
    noisy_dna = keras.Input(shape=(max_length, 5))
    noise_variances = keras.Input(shape=(1, 1))

    e = layers.UpSampling1D(size=max_length)(sinusoidal_embedding(noise_variances))
    
    x = layers.Conv1D(10, kernel_size=1)(noisy_dna)
    x = layers.Concatenate()([x, e])
    
    skip1 = layers.Conv1D(10, kernel_size=3,padding="same",activation=keras.activations.swish,kernel_initializer=tf.keras.initializers.RandomNormal(mean=0., stddev=0.02))(x)
    x = layers.BatchNormalization(center=False, scale=False)(skip1)
    x = layers.AveragePooling1D(pool_size=2)(x)
    
    skip2 = layers.Conv1D(20, kernel_size=3,padding="same",activation=keras.activations.swish,kernel_initializer=tf.keras.initializers.RandomNormal(mean=0., stddev=0.02))(x)
    x = layers.BatchNormalization(center=False, scale=False)(skip2)
    x = layers.AveragePooling1D(pool_size=2)(x)
    
    x = layers.Conv1D(40, kernel_size=3,padding="same",activation=keras.activations.swish,kernel_initializer=tf.keras.initializers.RandomNormal(mean=0., stddev=0.02))(x)
    x = layers.BatchNormalization(center=False, scale=False)(x)
    
    x = layers.UpSampling1D(size=2)(x)
    x = layers.Conv1D(20, kernel_size=3,padding="same",activation=keras.activations.swish,kernel_initializer=tf.keras.initializers.RandomNormal(mean=0., stddev=0.02))(x)
    x = layers.Add()([x,skip2])
    x = layers.BatchNormalization(center=False, scale=False)(x)
    
    x = layers.UpSampling1D(size=2)(x)
    x = layers.Conv1D(10, kernel_size=3,padding="same",activation=keras.activations.swish,kernel_initializer=tf.keras.initializers.RandomNormal(mean=0., stddev=0.02))(x)
    x = layers.Add()([x,skip1])
    x = layers.BatchNormalization(center=False, scale=False)(x)
    
    x = layers.Conv1D(5, kernel_size=1, kernel_initializer="zeros")(x)

    return keras.Model([noisy_dna, noise_variances], x, name="residual_unet")

class DiffusionModel(keras.Model):
    def __init__(self):
        super().__init__()

        self.normalizer = layers.Normalization()
        self.network = createNetwork()
        print(self.network.summary())
        self.ema_network = keras.models.clone_model(self.network)

    def compile(self, **kwargs):
        super().compile(**kwargs)

        self.noise_loss_tracker = keras.metrics.Mean(name="n_loss")
        self.image_loss_tracker = keras.metrics.Mean(name="i_loss")

    @property
    def metrics(self):
        return [self.noise_loss_tracker, self.image_loss_tracker]
    
    def denormalize(self, images):
        return keras.utils.to_categorical(np.argmax(images, axis=2), 5)

    def diffusion_schedule(self, diffusion_times):
        # diffusion times -> angles
        start_angle = tf.acos(max_signal_rate)
        end_angle = tf.acos(min_signal_rate)

        diffusion_angles = start_angle + diffusion_times * (end_angle - start_angle)

        # angles -> signal and noise rates
        signal_rates = tf.cos(diffusion_angles)
        noise_rates = tf.sin(diffusion_angles)
        # note that their squared sum is always: sin^2(x) + cos^2(x) = 1

        return noise_rates, signal_rates

    def denoise(self, noisy_images, noise_rates, signal_rates, training):
        # the exponential moving average weights are used at evaluation
        if training:
            network = self.network
        else:
            network = self.ema_network

        # predict noise component and calculate the image component using it
        pred_noises = network([noisy_images, noise_rates**2], training=training)
        pred_images = (noisy_images - noise_rates * pred_noises) / signal_rates

        return pred_noises, pred_images

    def reverse_diffusion(self, initial_noise, diffusion_steps):
        # reverse diffusion = sampling
        num_images = initial_noise.shape[0]
        step_size = 1.0 / diffusion_steps

        # important line:
        # at the first sampling step, the "noisy image" is pure noise
        # but its signal rate is assumed to be nonzero (min_signal_rate)
        next_noisy_images = initial_noise
        for step in range(diffusion_steps):
            noisy_images = next_noisy_images

            # separate the current noisy image to its components
            diffusion_times = tf.ones((num_images, 1, 1)) - step * step_size
            noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
            pred_noises, pred_images = self.denoise(
                noisy_images, noise_rates, signal_rates, training=False
            )
            # network used in eval mode

            # remix the predicted components using the next signal and noise rates
            next_diffusion_times = diffusion_times - step_size
            next_noise_rates, next_signal_rates = self.diffusion_schedule(
                next_diffusion_times
            )
            next_noisy_images = (
                next_signal_rates * pred_images + next_noise_rates * pred_noises
            )
            # this new noisy image will be used in the next step

        return pred_images

    def train_step(self, images):
        #print(images.shape)
        # normalize images to have standard deviation of 1, like the noises
        images = self.normalizer(images, training=True)
        noises = tf.random.normal(shape=(batch_size, max_length, 5))

        # sample uniform random diffusion times
        diffusion_times = tf.random.uniform(
            shape=(batch_size, 1, 1), minval=0.0, maxval=1.0
        )
        noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
        
        noisy_images = signal_rates * images + noise_rates * noises
        

        with tf.GradientTape() as tape:
            # train the network to separate noisy images to their components
            pred_noises, pred_images = self.denoise(
                noisy_images, noise_rates, signal_rates, training=True
            )

            noise_loss = self.loss(noises, pred_noises)  # used for training
            image_loss = self.loss(images, pred_images)  # only used as metric

        gradients = tape.gradient(noise_loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))

        self.noise_loss_tracker.update_state(noise_loss)
        self.image_loss_tracker.update_state(image_loss)

        # track the exponential moving averages of weights
        for weight, ema_weight in zip(self.network.weights, self.ema_network.weights):
            ema_weight.assign(ema * ema_weight + (1 - ema) * weight)

        # KID is not measured during the training phase for computational efficiency
        return {m.name: m.result() for m in self.metrics[:-1]}

    def test_step(self, images):
        print(images.shape)
        # normalize images to have standard deviation of 1, like the noises
        images = self.normalizer(images, training=False)
        noises = tf.random.normal(shape=(batch_size, max_length, 5))

        # sample uniform random diffusion times
        diffusion_times = tf.random.uniform(
            shape=(batch_size, 1, 1), minval=0.0, maxval=1.0
        )
        noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
        # mix the images with noises accordingly
        noisy_images = signal_rates * images + noise_rates * noises

        # use the network to separate noisy images to their components
        pred_noises, pred_images = self.denoise(noisy_images, noise_rates, signal_rates, training=False)

        noise_loss = self.loss(noises, pred_noises)
        image_loss = self.loss(images, pred_images)

        self.image_loss_tracker.update_state(image_loss)
        self.noise_loss_tracker.update_state(noise_loss)

        return {m.name: m.result() for m in self.metrics}
    
    def generate(self):
        # noise -> images -> denormalized images
        initial_noise = tf.random.normal(shape=(1, max_length, 5))
        generated_images = self.reverse_diffusion(initial_noise, 20)
        generated_images = self.denormalize(generated_images)
        return generated_images


model = DiffusionModel()

lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-3,
    decay_steps=1000,
    decay_rate=0.99)

model.compile(
    optimizer=keras.optimizers.experimental.AdamW(
        learning_rate=lr_schedule, weight_decay=1e-4
    ),
    loss=keras.losses.mean_absolute_error,
)


dnaData = tf.data.Dataset.from_tensor_slices(np.load('oneHotDna.npy')).batch(batch_size, drop_remainder=True)


Model: "residual_unet"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 1, 1)]       0           []                               
                                                                                                  
 tf.math.multiply (TFOpLambda)  (None, 1, 16)        0           ['input_2[0][0]']                
                                                                                                  
 tf.math.multiply_1 (TFOpLambda  (None, 1, 16)       0           ['input_2[0][0]']                
 )                                                                                                
                                                                                                  
 tf.math.sin (TFOpLambda)       (None, 1, 16)        0           ['tf.math.multiply[0]

In [6]:
model.fit(dnaData,epochs=100)


Epoch 1/10

KeyboardInterrupt: 

In [4]:
def onehotToBP(onehot):
    if onehot[0] == 1:
        return 'A'
    if onehot[1] == 1:
        return 'T'
    if onehot[2] == 1:
        return 'C'
    if onehot[3] == 1:
        return 'G'
    return ''
print(''.join(list(map(lambda x: onehotToBP(x),model.generate()[0]))))

[[0. 1. 0. 0. 0.]
 [0. 0. 1. 0. 0.]
 [1. 0. 0. 0. 0.]
 ...
 [0. 0. 0. 1. 0.]
 [0. 0. 0. 1. 0.]
 [0. 1. 0. 0. 0.]]


NotImplementedError: Saving the model to HDF5 format requires the model to be a Functional model or a Sequential model. It does not work for subclassed models, because such models are defined via the body of a Python method, which isn't safely serializable. Consider saving to the Tensorflow SavedModel format (by setting save_format="tf") or using `save_weights`.