In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

2023-01-04 20:04:31.415754: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


In [32]:
class MLP(tf.keras.Model):
    
    def __init__(self,num_blocks = 6,re_dim = 512):
        """
    Args:
      num_blocks: numbers of building blocks for the dense layers.

      re_dim: dimension reduction. If int, then for each residue connection, concate with inputs after dimension reduction

    Returns:
      A `keras.Model` instance.
        """
        super().__init__()
        
        self.num_blocks = num_blocks
        self.re_dim = re_dim
        
        # init dense layers 
        self.init_dense = keras.Sequential(
    [
        layers.Dense(128, activation='relu'),
        layers.Dense(64, activation='relu'),
    ])     
        # You can add new dense block with different parameters and Dropout layers
        
        # dense layers for dimension reduction and concate (serves as PCA in stacking)
        self.dense2 = keras.Sequential(
    [
        layers.Dense(self.re_dim * 4, activation='relu'),
        layers.Dense(self.re_dim, activation='relu'),
        layers.Dense(self.re_dim, activation='relu'),
    ])

        # module list
        self.dense_block = [
                keras.Sequential(
            [  
                layers.Dense(128, activation='relu'),
                layers.Dense(128, activation='relu'),
                layers.LayerNormalization(),
                layers.Dense(128, activation='relu'),
            ]) for _ in range(self.num_blocks)
        ]
        
        # residue concate
        self.concate = layers.Concatenate()
            
        # final dense layer
        self.final_dense = keras.Sequential(
    [
        layers.Dense(128, activation='relu'),
        layers.Dense(64, activation='relu'),
        layers.Dense(1, activation='linear'),
    ] )
    
    def call(self, inputs):

        x = self.init_dense(inputs)
        x_dim = self.dense2(inputs)
        new_x = self.concate([x_dim,x])

        for layers in self.dense_block:

            x = layers(new_x)

            new_x = self.concate([new_x, x])

        return self.final_dense(new_x)

In [None]:
# Example
model = MLP()
inputs = tf.random.normal([12,1024], 0, 1, tf.float32, seed=1)
model(inputs)
model.compile(loss=root_mean_squared_error,optimizer=Adam(0.001))
# compile and train like normal APIs

In [3]:
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd
from sklearn.metrics import mean_squared_error
from math import sqrt
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Add
import matplotlib.pyplot as plt
from tensorflow.keras.optimizers import Adam
from tensorflow import cast,float32
from keras import backend as kb
from statistics import mean, stdev

In [4]:
def generate_kmer_multiple(seqlist,k):
    kmer_list = []
    n = -1
    for seq in seqlist:
        kmer_list.append(generate_kmer_single(seq,k))
    return kmer_list
    
def generate_kmer_single(seq,k):
    kmer = ""
    for i in range(0,len(seq)-k,1):
        kmer += seq[i:i+k]+" "
    return kmer[:-1]

def test_rmse(model,X_test,Y_test):
    test_preds = model.predict(X_test)
    mse = mean_squared_error(Y_test, test_preds)
    rmse = sqrt(mse)
    return rmse

def root_mean_squared_error(y_true, y_pred):
    y_true = cast(y_true,float32)
    return kb.sqrt(kb.mean(kb.square(y_pred - y_true)))

In [5]:
def split_test_train(da,i,multiplicand):
    X_test = da["sequence"].loc[i*multiplicand:(i+1)*multiplicand]
    Y_test = da["copy_number"].loc[i*multiplicand:(i+1)*multiplicand]
    X_train = pd.concat([da["sequence"].loc[0:i*multiplicand],
                         da["sequence"].loc[(i+1)*multiplicand:]],axis = 0)
    Y_train = pd.concat([da["copy_number"].loc[0:i*multiplicand],
                         da["copy_number"].loc[(i+1)*multiplicand:]],axis = 0)
    vectorizer = CountVectorizer()
    X_train = pd.DataFrame(X_train)
    X_test = pd.DataFrame(X_test)
    X_train = X_train.values.reshape(X_train.shape[0], )
    X_test = X_test.values.reshape(X_test.shape[0], )
    kmer_train = generate_kmer_multiple(X_train.tolist(), 6)
    kmer_test = generate_kmer_multiple(X_test.tolist(), 6)
    X_train = vectorizer.fit_transform(kmer_train).toarray()
    X_test = vectorizer.transform(kmer_test).toarray()
    return X_train,Y_train,X_test,Y_test

In [6]:
da = pd.read_csv("datasets/full_length_reads.csv")
multiplicand = int(da.shape[0]*0.2)

In [35]:
rmse = []
for i in range(0,5,1):
    X_train,Y_train,X_test,Y_test=split_test_train(da,i,multiplicand)
    model = MLP(num_blocks = 12,re_dim = 512)
    model.compile(loss=root_mean_squared_error,optimizer=Adam(0.001))
    model.fit(X_train,Y_train,validation_split=0.1, batch_size=100,epochs=50,verbose=0)
    pred = model.predict(X_test)
    rmse.append(sqrt(mean_squared_error(Y_test,pred)))
    print(rmse[i])

  return t[start:end]


0.7079146122285779


  return t[start:end]


0.7893465638895595


  return t[start:end]


0.6884118099897429


  return t[start:end]


0.7187855791058497
0.7256888219162198


In [36]:
mean(rmse)

0.72602947742599

In [37]:
rmse

[0.7079146122285779,
 0.7893465638895595,
 0.6884118099897429,
 0.7187855791058497,
 0.7256888219162198]