In [None]:
# VAE training

from __future__ import absolute_import, division, print_function, unicode_literals
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
import numpy as np
import random
from keras.layers.convolutional import MaxPooling2D
from sklearn.metrics import mean_squared_error
import theano
from keras import optimizers
from tensorflow.keras import datasets, layers, models
from scipy.stats import pearsonr
from sklearn.model_selection import KFold
from IPython import display
import glob
import imageio
import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow_probability as tfp
import time
# Load the training table from the Excel workbook into a DataFrame.
# Its 'Promoter' column is the one that gets one-hot encoded further down.
data = pd.read_excel('Traning dataset.xlsx')

In [None]:
# 1. Define one_hot_encoding

def one_hot_encoding(df, seq_column, expression):
    """One-hot encode DNA sequences into a padded, right-aligned 4-D array.

    Every sequence is written into a row of width ``max_len + 20``: ten empty
    columns are reserved on each side of the longest sequence, and shorter
    sequences are shifted right so that all sequences end at the same column.

    Parameters
    ----------
    df : pandas.DataFrame
        Table holding the sequences.
    seq_column : str
        Name of the column containing the sequence strings (upper-case
        ``A``/``C``/``G``/``T`` only).
    expression : object
        Accepted for call compatibility; it is not used by the encoding.

    Returns
    -------
    X : numpy.ndarray
        Array of shape ``(len(df), 1, 4, total_width)`` and dtype ``float64``.
        Axis 2 is ordered A, C, G, T.
    total_width : int
        Width of the last axis, i.e. longest sequence length plus 20.

    Raises
    ------
    KeyError
        If a sequence contains a character other than ``A``, ``C``, ``G``, ``T``.
    """
    base_index = {'A': 0, 'C': 1, 'G': 2, 'T': 3}
    pad = 10  # empty columns kept on each side of the longest sequence

    seqs = df[seq_column].values
    # Derive the alignment width from the data itself instead of assuming that
    # the longest sequence is exactly 50 bases; `default=0` covers an empty frame.
    max_len = max((len(seq) for seq in seqs), default=0)
    total_width = max_len + 2 * pad

    # float64 is Theano's default floatX and is the dtype compute_loss needs for
    # its labels, because it casts the logits to float64 before the cross-entropy.
    X = np.zeros((len(seqs), 1, 4, total_width), dtype=np.float64)
    for i, seq in enumerate(seqs):
        # Right-align: the last base of every sequence lands on column
        # pad + max_len - 1.
        offset = pad + max_len - len(seq)
        for pos, base in enumerate(seq):
            X[i, 0, base_index[base], offset + pos] = 1
    return X, total_width

# Encode the 'Promoter' column; X has shape (n_sequences, 1, 4, total_width).
# 'Reads' fills the `expression` parameter, which one_hot_encoding never reads.
X, total_width = one_hot_encoding(data,'Promoter','Reads')

In [None]:
# 2. Build VAE frame

class CVAE(tf.keras.Model):
    """Convolutional variational autoencoder for one-hot encoded sequences.

    Both networks work on channels-first tensors of shape
    ``(batch, 1, 4, seq_width)``. The encoder maps an input to the
    concatenated mean and log-variance of a diagonal Gaussian over the latent
    space; the decoder maps a latent vector back to per-position logits with
    the same shape as the encoder input, so the reconstruction can be compared
    element-wise with the input (as the sigmoid cross-entropy in
    ``compute_loss`` requires).

    Parameters
    ----------
    latent_dim : int
        Dimensionality of the latent space.
    seq_width : int, optional
        Width of the last input axis. Defaults to 70. Must be at least 69 so
        the three un-padded encoder convolutions leave a non-empty feature map.

    Raises
    ------
    ValueError
        If ``seq_width`` is too small for the encoder convolutions.
    """

    # Each 'valid' convolution shrinks the width by (kernel_width - 1); with
    # kernel widths 35, 21 and 15 the input must be at least this wide.
    _MIN_SEQ_WIDTH = 35 + 21 + 15 - 2

    def __init__(self, latent_dim, seq_width=70):
        super(CVAE, self).__init__()
        if seq_width < self._MIN_SEQ_WIDTH:
            raise ValueError(
                'seq_width must be >= %d, got %d' % (self._MIN_SEQ_WIDTH, seq_width))
        self.latent_dim = latent_dim
        self.seq_width = seq_width
        self.encoder = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(1, 4, seq_width)),
                # The first kernel spans all four base rows, collapsing height to 1.
                tf.keras.layers.Conv2D(
                    filters=16, kernel_size=(4,35), strides=(1, 1), activation='relu',data_format = 'channels_first'),
                tf.keras.layers.Conv2D(
                    filters=16, kernel_size=(1,21), strides=(1, 1), activation='relu',data_format = 'channels_first'),
                tf.keras.layers.Conv2D(
                    filters=16, kernel_size=(1,15), strides=(1, 1), activation='relu',data_format = 'channels_first'),
                tf.keras.layers.Flatten(),
                # No activation: the output is split into mean and log-variance.
                tf.keras.layers.Dense(latent_dim + latent_dim),
            ]
        )
        self.decoder = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
                # Project to a full-size (1, 4, seq_width) map. Every following
                # layer uses stride 1 with 'same' padding, so this width is the
                # width of the final logits and has to equal the input width.
                tf.keras.layers.Dense(units=1 * 4 * seq_width, activation=tf.nn.relu),
                tf.keras.layers.Reshape(target_shape=(1, 4, seq_width)),
                tf.keras.layers.Conv2DTranspose(
                    filters=16, kernel_size=(1,15), strides=(1,1), padding='same',
                    activation='relu',data_format = 'channels_first'),
                tf.keras.layers.Conv2DTranspose(
                    filters=16, kernel_size=(1,21), strides=(1,1), padding='same',
                    activation='relu',data_format = 'channels_first'),
                tf.keras.layers.Conv2DTranspose(
                    filters=16, kernel_size=(1,35), strides=(1,1), padding='same',
                    activation='relu',data_format = 'channels_first'),
                # Single output channel, no activation: raw logits.
                tf.keras.layers.Conv2DTranspose(
                    filters=1, kernel_size=(4,29), strides=1, padding='same', data_format = 'channels_first'),
            ]
        )

    @tf.function
    def sample(self, eps=None):
        """Decode latent vectors into probabilities.

        When ``eps`` is omitted, 100 latent vectors are drawn from a standard
        normal distribution.
        """
        if eps is None:
            eps = tf.random.normal(shape=(100, self.latent_dim))
        return self.decode(eps, apply_sigmoid=True)

    def encode(self, x):
        """Return the ``(mean, logvar)`` halves of the encoder output for ``x``."""
        mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        """Draw ``z = mean + eps * exp(logvar / 2)`` with ``eps`` standard normal."""
        eps = tf.random.normal(shape=mean.shape)
        return eps * tf.exp(logvar * .5) + mean

    def decode(self, z, apply_sigmoid=False):
        """Run the decoder on ``z``.

        Returns raw logits, or sigmoid probabilities when ``apply_sigmoid`` is
        true.
        """
        logits = self.decoder(z)
        if apply_sigmoid:
            probs = tf.sigmoid(logits)
            return probs
        return logits

In [None]:
# 3. Define loss function & train step

# Adam optimizer with learning rate 1e-4; the training loop passes this one
# instance to every train_step call.
optimizer = tf.keras.optimizers.Adam(1e-4)

def log_normal_pdf(sample, mean, logvar, raxis=1):
    """Log-density of ``sample`` under a diagonal Gaussian.

    The Gaussian is parameterised by ``mean`` and log-variance ``logvar``;
    the per-element log-densities are summed over axis ``raxis``.
    """
    log_two_pi = tf.math.log(2. * np.pi)
    squared_dev = (sample - mean) ** 2.
    per_element = -.5 * (squared_dev * tf.exp(-logvar) + logvar + log_two_pi)
    return tf.reduce_sum(per_element, axis=raxis)

def compute_loss(model, x):
    """Return the negative single-sample ELBO estimate for batch ``x``.

    One latent sample is drawn per input through the model's
    reparameterisation. The reconstruction term is the summed sigmoid
    cross-entropy between the decoder logits and ``x``; the prior and
    posterior terms are Gaussian log-densities of the latent sample. All terms
    are cast to float64 before being combined and averaged over the batch.
    """
    mu, log_sigma2 = model.encode(x)
    latent = model.reparameterize(mu, log_sigma2)

    # Reconstruction term log p(x|z), summed over every non-batch axis.
    logits64 = tf.cast(model.decode(latent), tf.float64)
    bce = tf.nn.sigmoid_cross_entropy_with_logits(labels=x, logits=logits64)
    log_px_given_z = -tf.reduce_sum(bce, axis=[1, 2, 3])

    # Standard-normal prior log p(z) and approximate posterior log q(z|x).
    log_pz = tf.cast(log_normal_pdf(latent, 0., 0.), tf.float64)
    log_qz_given_x = tf.cast(log_normal_pdf(latent, mu, log_sigma2), tf.float64)

    elbo_per_example = log_px_given_z + log_pz - log_qz_given_x
    return -tf.reduce_mean(elbo_per_example)

@tf.function
def train_step(model, x, optimizer):
    """Apply one gradient update of ``model`` on batch ``x`` using ``optimizer``.

    The loss comes from ``compute_loss``; nothing is returned.
    """
    with tf.GradientTape() as tape:
        batch_loss = compute_loss(model, x)
    weights = model.trainable_variables
    grads = tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))

In [None]:
# 4. Set up a model

# Seed every random number generator in play before anything is sampled, so
# that the latent vectors drawn here and the weight initialisation can be
# repeated from a fresh kernel (some GPU kernels may still be nondeterministic).
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Training length and latent-space size.
epochs = 1000
latent_dim = 2

# Fixed bank of standard-normal latent vectors kept for generating sequences.
num_examples_to_generate = 10000
random_vector_for_generation = tf.random.normal(
    shape=[num_examples_to_generate, latent_dim])

model = CVAE(latent_dim)

In [None]:
# 5. Model training

# Add a leading axis so that iterating over X yields one full-data batch of
# shape (n, 1, 4, total_width). The ndim guard makes this cell safe to re-run:
# without it every execution would stack another axis onto X and the batches
# handed to train_step would no longer be 4-D.
if X.ndim == 4:
    X = np.expand_dims(X,axis = 0)

# ELBO after each epoch, stored as TensorFlow scalars.
elbolist = []
for epoch in range(1, epochs + 1):
    # start_time / end_time bracket the optimisation pass; nothing in this
    # cell reads them afterwards.
    start_time = time.time()
    for tot_X in X:
        train_step(model, tot_X, optimizer)
    end_time = time.time()

    # Re-evaluate the loss with the updated weights and record its negative
    # (the ELBO) for this epoch.
    loss = tf.keras.metrics.Mean()
    for tot_X in X:
        loss(compute_loss(model, tot_X))
    elbo = -loss.result()
    elbolist.append(elbo)

In [None]:
# 6. Save model

# Persist the encoder and decoder sub-networks as two separate files
# (the .h5 suffix selects Keras' HDF5 format).
model.encoder.save('cyano_encode.h5')
model.decoder.save('cyano_decode.h5')