In [None]:
#################################################################################
## hypertunes several NN architectures on fitness data by Papkou et al., 2023
## for nonlinear fitness regression
#################################################################################

import numpy as np
import pandas as pd
from pathlib import Path

import random

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import regularizers
import keras_tuner as kt

import sys

#for correlation coefficients
from scipy import stats

import deep_funcs_pub as aw
#in case aw has changed since last load
import importlib
importlib.reload(aw)

In [None]:
# Global training / tuning settings shared by every architecture in this notebook.

# upper bound on epochs when the best tuned model is trained further
max_epochs_glob = 100
# upper bound on epochs handed to the Hyperband tuner
max_epochs_hptuner_glob = 10

# number of samples per training batch
batch_size_glob = 128

# early stopping: epochs without val_loss improvement that are tolerated
patience_glob = 5

# candidate learning rates explored during tuning
lrate_arr_glob = [1e-4, 1e-3, 1e-2]





In [None]:
#########################################################
### extract fitness data
#########################################################

# Locate and read the tab-separated fitness table.
fitdatfile = "fitness_data_science_papkou2023.tsv"
pathstr = ""
filepath = pathstr + fitdatfile
infile = Path(filepath)
df = pd.read_csv(infile, sep="\t")

# One list of column values per table row, keyed by the row index.
fitdatall = df.T.to_dict("list")

# Lookup tables keyed by the value in column 0 of each row.
# NOTE(review): columns 0-3 are presumably nucleotide sequence, amino acid
# sequence, fitness and standard error of fitness -- confirm against the file.
fit = {}
sefit = {}
aaseq = {}
for n, row in fitdatall.items():
    ntseq = row[0]
    aaseq[ntseq], fit[ntseq], sefit[ntseq] = row[1], row[2], row[3]

print("\nextracted fitness data", flush=True)

In [None]:
################################################################
# hypertune a stack of dense layers
###################################################################

print("\n****************************",
      "Now hypertuning dense stack",
      "\n****************************",
      sep="\n", flush=True)

# Re-seed every RNG at the start of each architecture section so that the
# section can be run on its own with reproducible results.
random.seed(637281)
np.random.seed(123784)
tf.random.set_seed(243924)

# Random train / validation / test split of the flattened one-hot encoded data.
tmpdat = aw.prep_dhfr_data_onehot_ran(
    aaseq, fit, sefit,
    flattenflag=True,
    fitshift=2,
    f_tr=0.5, f_va=0.25, f_te=0.25,
    hilothresh=1.5,
)

(
    alldat_ntseq, alldat_aaseq, alldat_fit, alldat_sefit,
    traindat_ntseq, traindat_aaseq, traindat_fit, traindat_sefit,
    valdat_ntseq, valdat_aaseq, valdat_fit, valdat_sefit,
    testdat_ntseq, testdat_aaseq, testdat_fit, testdat_sefit,
    traindat_ntseq_lo, traindat_aaseq_lo, traindat_fit_lo, traindat_sefit_lo,
    valdat_ntseq_lo, valdat_aaseq_lo, valdat_fit_lo, valdat_sefit_lo,
    testdat_ntseq_lo, testdat_aaseq_lo, testdat_fit_lo, testdat_sefit_lo,
    traindat_ntseq_hi, traindat_aaseq_hi, traindat_fit_hi, traindat_sefit_hi,
    valdat_ntseq_hi, valdat_aaseq_hi, valdat_fit_hi, valdat_sefit_hi,
    testdat_ntseq_hi, testdat_aaseq_hi, testdat_fit_hi, testdat_sefit_hi,
) = tmpdat

# Short aliases for the data used below: the "_hi" sequences are the model
# inputs and the "_hi" fitness values are the regression targets.
# NOTE(review): "_hi" presumably denotes the subset above hilothresh --
# confirm in deep_funcs_pub.
tr_s, tr_f = traindat_ntseq_hi, traindat_fit_hi
va_s, va_f = valdat_ntseq_hi, valdat_fit_hi
te_s, te_f = testdat_ntseq_hi, testdat_fit_hi


#define the architecture to be hypertuned
def dense_stack_tuner_regr(hp):
    """Build and compile a residual dense-stack regression model for KerasTuner.

    The network is: Dense + Dropout on the 36-feature input, followed by
    ``n_stacks`` residual blocks (Dense-Dropout-Dense-Dropout, skip connection,
    LayerNormalization) and a single linear output unit.

    Parameters
    ----------
    hp : keras_tuner.HyperParameters
        Search-space handle. Registers the hyperparameters ``n_stacks``,
        ``units``, ``reg``, ``learn_rate`` and ``dropout``.

    Returns
    -------
    keras.Model
        Model compiled with RMSprop, MSE loss and the metrics MAE and MAPE.
    """
    n_stacks = hp.Choice("n_stacks", values=[1, 2, 3, 4])
    units = hp.Choice("units", values=[8, 16, 32, 64])
    # apply regularization to weights, but do so uniformly across all layers
    regu = hp.Choice("reg", values=[0.0, 1e-04, 1e-03])
    learn_rate = hp.Choice("learn_rate", values=lrate_arr_glob)
    dropout = hp.Choice("dropout", values=[0.0, 0.1, 0.2])

    # because data is flattened and 1-hot encoded
    inputs = keras.Input(shape=(36,))
    x = layers.Dense(units=units, kernel_regularizer=regularizers.l2(regu), activation="relu")(inputs)
    stackin = layers.Dropout(dropout)(x)

    # Every Dense layer uses the same width so the skip connection can be
    # added element-wise to the block output.
    for _ in range(n_stacks):
        x = layers.Dense(units=units, kernel_regularizer=regularizers.l2(regu), activation="relu")(stackin)
        x = layers.Dropout(dropout)(x)
        x = layers.Dense(units=units, kernel_regularizer=regularizers.l2(regu), activation="relu")(x)
        x = layers.Dropout(dropout)(x)
        # residual connection and normalization step; layers.Add is used
        # instead of tf.add because raw TF ops are rejected on symbolic Keras
        # tensors by Keras 3
        x = layers.Add()([stackin, x])
        stackin = layers.LayerNormalization()(x)

    outputs = layers.Dense(1)(stackin)

    model = keras.Model(inputs, outputs)
    model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=learn_rate),
                  loss="mse",
                  metrics=["mae", "mape"])
    return model

# Early stopping: end a fit once val_loss has not improved for patience_glob epochs.
callbacks_list = [
    keras.callbacks.EarlyStopping(patience=patience_glob, monitor="val_loss")
]

# Build the model once with a fresh HyperParameters object.
model = dense_stack_tuner_regr(kt.HyperParameters())

# Hyperband tuner that minimizes the validation loss.
tuner = kt.Hyperband(
    hypermodel=dense_stack_tuner_regr,
    objective="val_loss",
    max_epochs=max_epochs_hptuner_glob,
    factor=3,
    hyperband_iterations=3,
    seed=None,
    overwrite=True,
    project_name="dense_stack_tuner_regr",
)

tuner.search_space_summary()

In [None]:


# Run the hyperparameter search, validating on the validation split.
tuner.search(
    tr_s,
    tr_f,
    validation_data=(va_s, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_hptuner_glob,
    verbose=0,
)
tuner.results_summary()

# Retrieve the top model found by the search.
models = tuner.get_best_models(num_models=1)
best_tuned_model = models[0]
best_tuned_model.summary()

# Keep training the best model for up to max_epochs_glob further epochs
# (early stopping may end this sooner) to make sure it had enough training time.
print("\nnow training best model for more epochs")
history = best_tuned_model.fit(
    tr_s,
    tr_f,
    validation_data=(va_s, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_glob,
    verbose=0,
)
print("trained for additional ", len(history.history['loss']), " epochs")

# Report loss and metrics on the test split.
print("\nregression quality hypertuned dense stack")
eval_results = best_tuned_model.evaluate(te_s, te_f, return_dict=True)
for metric, score in eval_results.items():
    print(metric, " on test set ", score)

# Correlation between predicted and actual fitness; the predictions are
# flattened first to prevent an error in the correlation functions.
predict_fit = best_tuned_model.predict(te_s).flatten()
print("fitness act. vs. pred. for test set", stats.spearmanr(te_f, predict_fit), "n=", len(te_f))
print("fitness act. vs. pred. for test set", stats.pearsonr(te_f, predict_fit), "n=", len(te_f))

In [None]:
##########################################
## now build a hypertuner for the RNN model
###########################################

print("\n****************************",
      "Now hypertuning RNN",
      "\n****************************",
      sep="\n", flush=True)

# Re-seed every RNG at the start of each architecture section so that the
# section can be run on its own with reproducible results.
random.seed(637281)
np.random.seed(123784)
tf.random.set_seed(243924)

# Random train / validation / test split of the one-hot encoded data,
# this time without flattening.
tmpdat = aw.prep_dhfr_data_onehot_ran(
    aaseq, fit, sefit,
    flattenflag=False,
    fitshift=2,
    f_tr=0.5, f_va=0.25, f_te=0.25,
    hilothresh=1.5,
)

(
    alldat_ntseq, alldat_aaseq, alldat_fit, alldat_sefit,
    traindat_ntseq, traindat_aaseq, traindat_fit, traindat_sefit,
    valdat_ntseq, valdat_aaseq, valdat_fit, valdat_sefit,
    testdat_ntseq, testdat_aaseq, testdat_fit, testdat_sefit,
    traindat_ntseq_lo, traindat_aaseq_lo, traindat_fit_lo, traindat_sefit_lo,
    valdat_ntseq_lo, valdat_aaseq_lo, valdat_fit_lo, valdat_sefit_lo,
    testdat_ntseq_lo, testdat_aaseq_lo, testdat_fit_lo, testdat_sefit_lo,
    traindat_ntseq_hi, traindat_aaseq_hi, traindat_fit_hi, traindat_sefit_hi,
    valdat_ntseq_hi, valdat_aaseq_hi, valdat_fit_hi, valdat_sefit_hi,
    testdat_ntseq_hi, testdat_aaseq_hi, testdat_fit_hi, testdat_sefit_hi,
) = tmpdat

# Short aliases: "_hi" sequences are the inputs, "_hi" fitness the targets.
tr_s, tr_f = traindat_ntseq_hi, traindat_fit_hi
va_s, va_f = valdat_ntseq_hi, valdat_fit_hi
te_s, te_f = testdat_ntseq_hi, testdat_fit_hi


#define the architecture to be hypertuned
def RNN_tuner_regr(hp):
    """Build and compile a stacked bidirectional-LSTM regression model for KerasTuner.

    Layout: one bidirectional LSTM on the (9, 4) input, then
    ``num_intermediate_RNN_layers`` residual bidirectional LSTM blocks with
    batch normalization, one final bidirectional LSTM that returns only its
    last output, batch normalization, and a single linear output unit.

    Parameters
    ----------
    hp : keras_tuner.HyperParameters
        Search-space handle. Registers ``num_intermediate_RNN_layers``,
        ``reg``, ``RNNdropout``, ``learn rate`` and ``RNNunits``.

    Returns
    -------
    keras.Model
        Model compiled with RMSprop, MSE loss and the metrics MAE and MAPE.
    """
    num_RNN_layers = hp.Int("num_intermediate_RNN_layers", 1, 3)
    # apply regularization to weights, but do so uniformly across all layers
    regu = hp.Choice("reg", [0.0, 1e-04, 1e-03])
    RNN_dropout = hp.Float("RNNdropout", min_value=0, max_value=0.2, step=0.1)
    learn_rate = hp.Choice("learn rate", values=lrate_arr_glob)
    RNNunits = hp.Choice("RNNunits", values=[8, 16, 32, 48])

    def _bi_lstm(return_sequences):
        # One bidirectional LSTM layer with the tuned width, L2 penalties and
        # recurrent dropout shared by every layer of the stack.
        return layers.Bidirectional(layers.LSTM(
            units=RNNunits,
            kernel_regularizer=regularizers.l2(regu),
            recurrent_regularizer=regularizers.l2(regu),
            recurrent_dropout=RNN_dropout,
            return_sequences=return_sequences))

    # named "inputs" so that the builtin input() is not shadowed
    inputs = keras.Input(shape=(9, 4))
    stackin = _bi_lstm(True)(inputs)

    # num_RNN_layers intermediate blocks, each with a residual connection
    for _ in range(num_RNN_layers):
        x = _bi_lstm(True)(stackin)
        # residual connection and normalization step; layers.Add is used
        # instead of tf.add because raw TF ops are rejected on symbolic Keras
        # tensors by Keras 3
        x = layers.Add()([stackin, x])
        stackin = layers.BatchNormalization()(x)

    # the last layer must have return_sequences=False
    x = _bi_lstm(False)(stackin)
    # No residual connection here: with return_sequences=False the output no
    # longer has the same dimension as stackin.
    # The normalized tensor is now what feeds the output layer. Previously the
    # BatchNormalization result was assigned but never used, so the layer was
    # silently left out of the model.
    x = layers.BatchNormalization()(x)

    output = layers.Dense(1)(x)

    model = keras.Model(inputs=inputs, outputs=output)
    model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=learn_rate),
                  loss="mse",
                  metrics=["mae", "mape"])
    return model
    

# Early stopping: end a fit once val_loss has not improved for patience_glob epochs.
callbacks_list = [
    keras.callbacks.EarlyStopping(patience=patience_glob, monitor="val_loss")
]

# Build the model once with a fresh HyperParameters object.
model = RNN_tuner_regr(kt.HyperParameters())

# Hyperband tuner that minimizes the validation loss.
tuner = kt.Hyperband(
    hypermodel=RNN_tuner_regr,
    objective="val_loss",
    max_epochs=max_epochs_hptuner_glob,
    factor=3,
    hyperband_iterations=3,
    seed=None,
    overwrite=True,
    project_name="RNN_tuner_regr",
)

tuner.search_space_summary()

In [None]:
# Run the hyperparameter search, validating on the validation split.
tuner.search(
    tr_s,
    tr_f,
    validation_data=(va_s, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_hptuner_glob,
    verbose=0,
)
tuner.results_summary()

# Retrieve the top model found by the search.
models = tuner.get_best_models(num_models=1)
best_tuned_model = models[0]
best_tuned_model.summary()

# Keep training the best model for up to max_epochs_glob further epochs
# (early stopping may end this sooner) to make sure it had enough training time.
print("\nnow training best model for more epochs")
history = best_tuned_model.fit(
    tr_s,
    tr_f,
    validation_data=(va_s, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_glob,
    verbose=0,
)
print("trained for additional ", len(history.history['loss']), " epochs")

# Report loss and metrics on the test split.
print("\nregression quality hypertuned RNN")
eval_results = best_tuned_model.evaluate(te_s, te_f, return_dict=True)
for metric, score in eval_results.items():
    print(metric, " on test set ", score)

# Correlation between predicted and actual fitness; the predictions are
# flattened first to prevent an error in the correlation functions.
predict_fit = best_tuned_model.predict(te_s).flatten()
print("fitness act. vs. pred. for test set", stats.spearmanr(te_f, predict_fit), "n=", len(te_f))
print("fitness act. vs. pred. for test set", stats.pearsonr(te_f, predict_fit), "n=", len(te_f))


In [None]:
############################################################
## now implement a hypertuner for a transformer stack with 
## positional embedding
##########################################################

print("\n****************************",
      "Now hypertuning transformer",
      "\n****************************",
      sep="\n", flush=True)

# Re-seed every RNG at the start of each architecture section so that the
# section can be run on its own with reproducible results.
random.seed(637281)
np.random.seed(123784)
tf.random.set_seed(243924)

# Random train / validation / test split of the integer encoded data.
tmpdat = aw.prep_dhfr_data_int_ran(
    aaseq, fit, sefit,
    fitshift=2,
    f_tr=0.5, f_va=0.25, f_te=0.25,
    hilothresh=1.5,
)

(
    alldat_ntseq, alldat_aaseq, alldat_fit, alldat_sefit,
    traindat_ntseq, traindat_aaseq, traindat_fit, traindat_sefit,
    valdat_ntseq, valdat_aaseq, valdat_fit, valdat_sefit,
    testdat_ntseq, testdat_aaseq, testdat_fit, testdat_sefit,
    traindat_ntseq_lo, traindat_aaseq_lo, traindat_fit_lo, traindat_sefit_lo,
    valdat_ntseq_lo, valdat_aaseq_lo, valdat_fit_lo, valdat_sefit_lo,
    testdat_ntseq_lo, testdat_aaseq_lo, testdat_fit_lo, testdat_sefit_lo,
    traindat_ntseq_hi, traindat_aaseq_hi, traindat_fit_hi, traindat_sefit_hi,
    valdat_ntseq_hi, valdat_aaseq_hi, valdat_fit_hi, valdat_sefit_hi,
    testdat_ntseq_hi, testdat_aaseq_hi, testdat_fit_hi, testdat_sefit_hi,
) = tmpdat

# Short aliases: "_hi" sequences are the inputs, "_hi" fitness the targets.
tr_s, tr_f = traindat_ntseq_hi, traindat_fit_hi
va_s, va_f = valdat_ntseq_hi, valdat_fit_hi
te_s, te_f = testdat_ntseq_hi, testdat_fit_hi

#defining the architecture to be hypertuned
def transf_tuner_regr(hp):
    """Build and compile a transformer-encoder regression model for KerasTuner.

    Layout: positional embedding of the length-9 input, ``n_stacks`` encoder
    blocks (self-attention, residual + LayerNormalization, two Dense layers,
    residual + LayerNormalization), optional dropout, flatten, and a single
    linear output unit.

    Parameters
    ----------
    hp : keras_tuner.HyperParameters
        Search-space handle. Registers ``emb_dim``, ``n_heads``,
        ``subsp_dim``, ``dense_dim``, ``n_stacks``, ``learn_rate`` and
        ``dropout_transformer``.

    Returns
    -------
    keras.Model
        Model compiled with RMSprop, MSE loss and the metrics MAE and MAPE.

    Raises
    ------
    ValueError
        If ``n_stacks`` is smaller than 1.
    """
    emb_dim = hp.Choice("emb_dim", values=[2, 3, 4, 8])
    n_heads = hp.Choice("n_heads", values=[2, 4, 6, 8])
    subsp_dim = hp.Choice("subsp_dim", values=[2, 3, 4, 8])
    dense_dim = hp.Choice("dense_dim", values=[4, 8, 16])
    n_stacks = hp.Choice("n_stacks", values=[1, 2, 4, 6])
    learn_rate = hp.Choice("learn_rate", values=lrate_arr_glob)

    if n_stacks < 1:
        # raise instead of print + exit(1), which would terminate the kernel
        raise ValueError("error_aw: number of stacks invalid")

    inputs = keras.Input(shape=(9,))
    stack_out = aw.PositionalEmbedding(sequence_length=9, input_dim=4, output_dim=emb_dim)(inputs)

    # Build exactly n_stacks encoder blocks. Previously the first block was
    # built unconditionally and the loop then added n_stacks more, so the
    # model contained n_stacks + 1 blocks.
    for _ in range(n_stacks):
        att_out = layers.MultiHeadAttention(num_heads=n_heads, key_dim=subsp_dim)(stack_out, stack_out, stack_out)
        # layers.Add is used instead of tf.add because raw TF ops are rejected
        # on symbolic Keras tensors by Keras 3
        x = layers.Add()([stack_out, att_out])
        dense_input = layers.LayerNormalization()(x)
        x = layers.Dense(units=dense_dim, activation='relu')(dense_input)
        # project back to emb_dim so the residual addition is shape-compatible
        dense_output = layers.Dense(units=emb_dim, activation='relu')(x)
        # a final residual connection and normalization step
        x = layers.Add()([dense_input, dense_output])
        stack_out = layers.LayerNormalization()(x)

    if hp.Boolean("dropout_transformer"):
        stack_out = layers.Dropout(rate=0.1)(stack_out)

    # the last part here is no longer part of the transformer proper:
    # flatten the layers for the final regression output
    x = layers.Flatten()(stack_out)
    outputs = layers.Dense(1)(x)

    model = keras.Model(inputs, outputs)
    model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=learn_rate),
                  loss="mse",
                  metrics=["mae", "mape"])
    return model
    

# Early stopping: end a fit once val_loss has not improved for patience_glob epochs.
callbacks_list = [
    keras.callbacks.EarlyStopping(patience=patience_glob, monitor="val_loss")
]

# Build the model once with a fresh HyperParameters object.
model = transf_tuner_regr(kt.HyperParameters())

# Hyperband tuner that minimizes the validation loss.
tuner = kt.Hyperband(
    hypermodel=transf_tuner_regr,
    objective="val_loss",
    max_epochs=max_epochs_hptuner_glob,
    factor=3,
    hyperband_iterations=3,
    seed=None,
    overwrite=True,
    project_name="transf_tuner_regr",
)

tuner.search_space_summary()

In [None]:
# Run the hyperparameter search, validating on the validation split.
tuner.search(
    tr_s,
    tr_f,
    validation_data=(va_s, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_hptuner_glob,
    verbose=0,
)
tuner.results_summary()

# Retrieve the top model found by the search.
models = tuner.get_best_models(num_models=1)
best_tuned_model = models[0]
best_tuned_model.summary()

# Keep training the best model for up to max_epochs_glob further epochs
# (early stopping may end this sooner) to make sure it had enough training time.
print("\nnow training best model for more epochs")
history = best_tuned_model.fit(
    tr_s,
    tr_f,
    validation_data=(va_s, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_glob,
    verbose=0,
)
print("trained for additional ", len(history.history['loss']), " epochs")

# Report loss and metrics on the test split.
print("\nregression quality transformed tuned")
eval_results = best_tuned_model.evaluate(te_s, te_f, return_dict=True)
for metric, score in eval_results.items():
    print(metric, " on test set ", score)

# Correlation between predicted and actual fitness; the predictions are
# flattened first to prevent an error in the correlation functions.
predict_fit = best_tuned_model.predict(te_s).flatten()
print("fitness act. vs. pred. for test set", stats.spearmanr(te_f, predict_fit), "n=", len(te_f))
print("fitness act. vs. pred. for test set", stats.pearsonr(te_f, predict_fit), "n=", len(te_f))




In [None]:
###################################################
## now hypertune a convnet 
## based on only the nucleotide features, flattened
##########################################################

print("\n****************************",
      "Now hypertuning convnet",
      "\n****************************",
      sep="\n", flush=True)

# Re-seed every RNG at the start of each architecture section so that the
# section can be run on its own with reproducible results.
random.seed(637281)
np.random.seed(123784)
tf.random.set_seed(243924)

# Random train / validation / test split of the flattened one-hot encoded data.
tmpdat = aw.prep_dhfr_data_onehot_ran(
    aaseq, fit, sefit,
    flattenflag=True,
    fitshift=2,
    f_tr=0.5, f_va=0.25, f_te=0.25,
    hilothresh=1.5,
)

(
    alldat_ntseq, alldat_aaseq, alldat_fit, alldat_sefit,
    traindat_ntseq, traindat_aaseq, traindat_fit, traindat_sefit,
    valdat_ntseq, valdat_aaseq, valdat_fit, valdat_sefit,
    testdat_ntseq, testdat_aaseq, testdat_fit, testdat_sefit,
    traindat_ntseq_lo, traindat_aaseq_lo, traindat_fit_lo, traindat_sefit_lo,
    valdat_ntseq_lo, valdat_aaseq_lo, valdat_fit_lo, valdat_sefit_lo,
    testdat_ntseq_lo, testdat_aaseq_lo, testdat_fit_lo, testdat_sefit_lo,
    traindat_ntseq_hi, traindat_aaseq_hi, traindat_fit_hi, traindat_sefit_hi,
    valdat_ntseq_hi, valdat_aaseq_hi, valdat_fit_hi, valdat_sefit_hi,
    testdat_ntseq_hi, testdat_aaseq_hi, testdat_fit_hi, testdat_sefit_hi,
) = tmpdat

# Short aliases: "_hi" sequences are the inputs, "_hi" fitness the targets.
tr_s, tr_f = traindat_ntseq_hi, traindat_fit_hi
va_s, va_f = valdat_ntseq_hi, valdat_fit_hi
te_s, te_f = testdat_ntseq_hi, testdat_fit_hi

# Fixed settings of the first convolutional layer.

# kernel spans two nucleotides = 8 positions of the one-hot encoding
ker_size = 8
print("kernel size ", ker_size)

# one filter per possible dinucleotide combination
n_filters = 16
print("convolutional filters ", n_filters)

# stride of one nucleotide = 4 positions of the flattened encoding
convstride = 4
print("stride for convolution", convstride)

#define the architecture to be hypertuned
def convnet_tuner_regr(hp):
    """Build and compile a 1-D convolutional regression model for KerasTuner.

    The 36-feature input is reshaped to (36, 1), passed through a first
    Conv1D layer configured by the notebook-level ``n_filters``, ``ker_size``
    and ``convstride``, then through ``conv layers - 1`` further Conv1D layers
    of kernel size 2, flattened, passed through the tuned number of Dense
    layers, and reduced to a single linear output unit.

    Parameters
    ----------
    hp : keras_tuner.HyperParameters
        Search-space handle. Registers ``conv layers``, ``dense layers``,
        ``units``, ``learn_rate`` and ``regul``.

    Returns
    -------
    keras.Model
        Model compiled with RMSprop, MSE loss and the metrics MAE and MAPE.
    """
    n_conv_layers = hp.Choice("conv layers", values=[2, 3, 4, 5, 6, 7])
    n_dense_layers = hp.Choice("dense layers", values=[1, 2, 3])
    n_units_dense = hp.Choice("units", values=[8, 16, 32])
    learn_rate = hp.Choice("learn_rate", values=lrate_arr_glob)
    regul = hp.Choice("regul", values=[0.0, 1e-04, 1e-03])

    # a word is 9x4 nts long
    inputs = keras.Input(shape=(36,))

    # Conv1D expects a trailing channel axis, so add one of size 1
    feat = layers.Reshape((36, 1))(inputs)

    feat = layers.Conv1D(filters=n_filters,
                         kernel_size=ker_size,
                         strides=convstride,
                         activation='relu')(feat)

    # filter counts of the second and later convolutional layers, in order
    later_filters = [24, 32, 48, 64, 96, 128]
    for idx in range(n_conv_layers - 1):
        feat = layers.Conv1D(filters=later_filters[idx],
                             kernel_size=2,
                             strides=1,
                             activation='relu')(feat)

    # flatten the convolutional output so it can feed the dense layers
    feat = layers.Flatten()(feat)
    for _ in range(n_dense_layers):
        feat = layers.Dense(n_units_dense,
                            kernel_regularizer=regularizers.l2(regul),
                            activation="relu")(feat)

    outputs = layers.Dense(1)(feat)

    model = keras.Model(inputs, outputs)
    optimizer = tf.keras.optimizers.RMSprop(learning_rate=learn_rate)
    model.compile(optimizer=optimizer, loss="mse", metrics=["mae", "mape"])
    return model

 
# Early stopping: end a fit once val_loss has not improved for patience_glob epochs.
callbacks_list = [
    keras.callbacks.EarlyStopping(patience=patience_glob, monitor="val_loss")
]

# Build the model once with a fresh HyperParameters object.
model = convnet_tuner_regr(kt.HyperParameters())

# Hyperband tuner that minimizes the validation loss.
# NOTE(review): hyperband_iterations is 1 here but 3 for the other
# architectures -- confirm that this is intentional.
tuner = kt.Hyperband(
    hypermodel=convnet_tuner_regr,
    objective="val_loss",
    max_epochs=max_epochs_hptuner_glob,
    factor=3,
    hyperband_iterations=1,
    seed=None,
    overwrite=True,
    project_name="convnet_tuner_regr",
)

tuner.search_space_summary()

In [None]:
# Run the hyperparameter search, validating on the validation split.
tuner.search(
    tr_s,
    tr_f,
    validation_data=(va_s, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_hptuner_glob,
    verbose=0,
)
tuner.results_summary()

# Retrieve the top model found by the search.
models = tuner.get_best_models(num_models=1)
best_tuned_model = models[0]
best_tuned_model.summary()

# Keep training the best model for up to max_epochs_glob further epochs
# (early stopping may end this sooner) to make sure it had enough training time.
print("\nnow training best model for more epochs")
history = best_tuned_model.fit(
    tr_s,
    tr_f,
    validation_data=(va_s, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_glob,
    verbose=0,
)
print("trained for additional ", len(history.history['loss']), " epochs")

# Report loss and metrics on the test split.
print("\nregression quality hypertuned convnet")
eval_results = best_tuned_model.evaluate(te_s, te_f, return_dict=True)
for metric, score in eval_results.items():
    print(metric, " on test set ", score)

# Correlation between predicted and actual fitness; the predictions are
# flattened first to prevent an error in the correlation functions.
predict_fit = best_tuned_model.predict(te_s).flatten()
print("fitness act. vs. pred. for test set", stats.spearmanr(te_f, predict_fit), "n=", len(te_f))
print("fitness act. vs. pred. for test set", stats.pearsonr(te_f, predict_fit), "n=", len(te_f))


In [None]:
#######################################################
## hypertune codon-based RNN with positional embedding
#######################################################

print("\n*******************************************************",
      "Now hypertuning codon-based RNN with positional embedding",
      "\n********************************************************",
      sep="\n", flush=True)

# Re-seed every RNG at the start of each architecture section so that the
# section can be run on its own with reproducible results.
random.seed(637281)
np.random.seed(123784)
tf.random.set_seed(243924)

# Random train / validation / test split with integer encoded nucleotide,
# amino acid and codon sequences.
tmpdat = aw.prep_dhfr_data_int_codon_ran(
    aaseq, fit, sefit,
    fitshift=2,
    f_tr=0.5, f_va=0.25, f_te=0.25,
    hilothresh=1.5,
)

(
    alldat_ntseq, alldat_aaseq, alldat_codseq, alldat_fit, alldat_sefit,
    traindat_ntseq, traindat_aaseq, traindat_codseq, traindat_fit, traindat_sefit,
    valdat_ntseq, valdat_aaseq, valdat_codseq, valdat_fit, valdat_sefit,
    testdat_ntseq, testdat_aaseq, testdat_codseq, testdat_fit, testdat_sefit,
    traindat_ntseq_lo, traindat_aaseq_lo, traindat_codseq_lo, traindat_fit_lo, traindat_sefit_lo,
    valdat_ntseq_lo, valdat_aaseq_lo, valdat_codseq_lo, valdat_fit_lo, valdat_sefit_lo,
    testdat_ntseq_lo, testdat_aaseq_lo, testdat_codseq_lo, testdat_fit_lo, testdat_sefit_lo,
    traindat_ntseq_hi, traindat_aaseq_hi, traindat_codseq_hi, traindat_fit_hi, traindat_sefit_hi,
    valdat_ntseq_hi, valdat_aaseq_hi, valdat_codseq_hi, valdat_fit_hi, valdat_sefit_hi,
    testdat_ntseq_hi, testdat_aaseq_hi, testdat_codseq_hi, testdat_fit_hi, testdat_sefit_hi,
) = tmpdat

# Short aliases: s = nucleotide sequences, c = codon sequences, f = fitness.
tr_s, tr_c, tr_f = traindat_ntseq_hi, traindat_codseq_hi, traindat_fit_hi
va_s, va_c, va_f = valdat_ntseq_hi, valdat_codseq_hi, valdat_fit_hi
te_s, te_c, te_f = testdat_ntseq_hi, testdat_codseq_hi, testdat_fit_hi


#define the architecture to be hypertuned
#treat the codon data as if it was a 64-dimensional time series of length 3
def RNN_cod_pos_tuner_regr(hp):
    """Build and compile a codon-based bidirectional-LSTM regressor for KerasTuner.

    Layout: positional embedding of the length-3 input, one bidirectional
    LSTM, ``num_intermediate_RNN_layers`` residual bidirectional LSTM blocks
    with batch normalization, one final bidirectional LSTM that returns only
    its last output, batch normalization, and a single linear output unit.

    Parameters
    ----------
    hp : keras_tuner.HyperParameters
        Search-space handle. Registers ``emb_dim``,
        ``num_intermediate_RNN_layers``, ``RNNunits``, ``reg``,
        ``RNNdropout`` and ``learn rate``.

    Returns
    -------
    keras.Model
        Model compiled with RMSprop, MSE loss and the metrics MAE and MAPE.
    """
    emb_dim = hp.Choice("emb_dim", values=[4, 8, 16, 32])
    # note that the first RNN layer is treated differently from the
    # intermediate ones to allow for the residual connections
    num_RNN_layers = hp.Int("num_intermediate_RNN_layers", 1, 3)
    RNNunits = hp.Choice("RNNunits", values=[8, 16, 32, 48])

    # apply regularization to weights, but do so uniformly across all layers
    regu = hp.Choice("reg", [0.0, 1e-04, 1e-03])
    RNN_dropout = hp.Float("RNNdropout", min_value=0, max_value=0.2, step=0.1)
    learn_rate = hp.Choice("learn rate", values=lrate_arr_glob)

    def _bi_lstm(return_sequences):
        # One bidirectional LSTM layer with the tuned width, L2 penalties and
        # recurrent dropout shared by every layer of the stack.
        return layers.Bidirectional(layers.LSTM(
            units=RNNunits,
            kernel_regularizer=regularizers.l2(regu),
            recurrent_regularizer=regularizers.l2(regu),
            recurrent_dropout=RNN_dropout,
            return_sequences=return_sequences))

    inputs = keras.Input(shape=(3,))
    embedded = aw.PositionalEmbedding(sequence_length=3, input_dim=64, output_dim=emb_dim)(inputs)

    stackin = _bi_lstm(True)(embedded)

    # num_RNN_layers intermediate blocks, each with a residual connection
    for _ in range(num_RNN_layers):
        x = _bi_lstm(True)(stackin)
        # residual connection and normalization step; layers.Add is used
        # instead of tf.add because raw TF ops are rejected on symbolic Keras
        # tensors by Keras 3
        x = layers.Add()([stackin, x])
        stackin = layers.BatchNormalization()(x)

    # the last layer must have return_sequences=False
    x = _bi_lstm(False)(stackin)
    # No residual connection here: with return_sequences=False the
    # dimensionality is not preserved.
    # The normalized tensor is now what feeds the output layer. Previously the
    # BatchNormalization result was assigned but never used, so the layer was
    # silently left out of the model.
    x = layers.BatchNormalization()(x)

    output = layers.Dense(1)(x)

    model = keras.Model(inputs=inputs, outputs=output)
    model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=learn_rate),
                  loss="mse",
                  metrics=["mae", "mape"])
    return model

      
        
# Build the model once with a fresh HyperParameters object.
model = RNN_cod_pos_tuner_regr(kt.HyperParameters())

# Early stopping: end a fit once val_loss has not improved for patience_glob epochs.
callbacks_list = [
    keras.callbacks.EarlyStopping(patience=patience_glob, monitor="val_loss")
]

# Hyperband tuner that minimizes the validation loss.
tuner = kt.Hyperband(
    hypermodel=RNN_cod_pos_tuner_regr,
    objective="val_loss",
    max_epochs=max_epochs_hptuner_glob,
    factor=3,
    hyperband_iterations=3,
    seed=None,
    overwrite=True,
    project_name="RNN_cod_pos_tuner_regr",
)

tuner.search_space_summary()

In [None]:
# Run the hyperparameter search on the codon sequences, validating on the
# validation split.
tuner.search(
    tr_c,
    tr_f,
    validation_data=(va_c, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_hptuner_glob,
    verbose=0,
)
tuner.results_summary()

# Retrieve the top model found by the search.
models = tuner.get_best_models(num_models=1)
best_tuned_model = models[0]
best_tuned_model.summary()

# Keep training the best model for up to max_epochs_glob further epochs
# (early stopping may end this sooner) to make sure it had enough training time.
print("\nnow training best model for more epochs")
history = best_tuned_model.fit(
    tr_c,
    tr_f,
    validation_data=(va_c, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_glob,
    verbose=0,
)
print("trained for additional ", len(history.history['loss']), " epochs")

# Report loss and metrics on the test split.
print("\nregression quality codon-based RNN with pos embedding")
eval_results = best_tuned_model.evaluate(te_c, te_f, return_dict=True)
for metric, score in eval_results.items():
    print(metric, " on test set ", score)

# Correlation between predicted and actual fitness; the predictions are
# flattened first to prevent an error in the correlation functions.
predict_fit = best_tuned_model.predict(te_c).flatten()
print("fitness act. vs. pred. for test set", stats.spearmanr(te_f, predict_fit), "n=", len(te_f))
print("fitness act. vs. pred. for test set", stats.pearsonr(te_f, predict_fit), "n=", len(te_f))


In [None]:
#############################################################################
## hypertune  a codon-based transformer stack with positional embedding
#############################################################################

print("\n*******************************************************",
      "Now hypertuning codon-based transformer with positional embedding",
      "\n********************************************************",
      sep="\n", flush=True)

# Re-seed every RNG at the start of each architecture section so that the
# section can be run on its own with reproducible results.
random.seed(637281)
np.random.seed(123784)
tf.random.set_seed(243924)

# Random train / validation / test split with integer encoded nucleotide,
# amino acid and codon sequences.
tmpdat = aw.prep_dhfr_data_int_codon_ran(
    aaseq, fit, sefit,
    fitshift=2,
    f_tr=0.5, f_va=0.25, f_te=0.25,
    hilothresh=1.5,
)

(
    alldat_ntseq, alldat_aaseq, alldat_codseq, alldat_fit, alldat_sefit,
    traindat_ntseq, traindat_aaseq, traindat_codseq, traindat_fit, traindat_sefit,
    valdat_ntseq, valdat_aaseq, valdat_codseq, valdat_fit, valdat_sefit,
    testdat_ntseq, testdat_aaseq, testdat_codseq, testdat_fit, testdat_sefit,
    traindat_ntseq_lo, traindat_aaseq_lo, traindat_codseq_lo, traindat_fit_lo, traindat_sefit_lo,
    valdat_ntseq_lo, valdat_aaseq_lo, valdat_codseq_lo, valdat_fit_lo, valdat_sefit_lo,
    testdat_ntseq_lo, testdat_aaseq_lo, testdat_codseq_lo, testdat_fit_lo, testdat_sefit_lo,
    traindat_ntseq_hi, traindat_aaseq_hi, traindat_codseq_hi, traindat_fit_hi, traindat_sefit_hi,
    valdat_ntseq_hi, valdat_aaseq_hi, valdat_codseq_hi, valdat_fit_hi, valdat_sefit_hi,
    testdat_ntseq_hi, testdat_aaseq_hi, testdat_codseq_hi, testdat_fit_hi, testdat_sefit_hi,
) = tmpdat

# Short aliases: s = nucleotide sequences, c = codon sequences, f = fitness.
tr_s, tr_c, tr_f = traindat_ntseq_hi, traindat_codseq_hi, traindat_fit_hi
va_s, va_c, va_f = valdat_ntseq_hi, valdat_codseq_hi, valdat_fit_hi
te_s, te_c, te_f = testdat_ntseq_hi, testdat_codseq_hi, testdat_fit_hi

#define the architecture to be hypertuned
def transf_cod_pos_tuner_regr(hp):
    """Build and compile a codon-based transformer regression model for KerasTuner.

    The model takes an integer sequence of length 3, embeds it with
    aw.PositionalEmbedding, passes it through n_stacks transformer encoder
    blocks, optionally applies dropout, flattens, and ends in a single linear
    output unit. It is compiled with RMSprop, MSE loss, and MAE/MAPE metrics.

    Hyperparameters sampled from hp:
        emb_dim:    embedding width (also the output width of every block)
        n_heads:    number of attention heads
        subsp_dim:  key_dim passed to MultiHeadAttention
        dense_dim:  width of the hidden feed-forward layer inside each block
        n_stacks:   total number of encoder blocks
        learn_rate: RMSprop learning rate, drawn from lrate_arr_glob
        dropout_transformer: whether to add Dropout(0.1) after the last block

    Args:
        hp: a hyperparameter container offering Choice() and Boolean()
            (e.g. kt.HyperParameters).

    Returns:
        A compiled keras.Model mapping inputs of shape (batch, 3) to
        outputs of shape (batch, 1).

    Raises:
        ValueError: if the sampled number of stacks is smaller than 1.
    """
    emb_dim = hp.Choice("emb_dim", values=[8, 16, 32, 48])
    n_heads = hp.Choice("n_heads", values=[2, 4, 6, 8])
    subsp_dim = hp.Choice("subsp_dim", values=[4, 8, 16, 32, 48])
    dense_dim = hp.Choice("dense_dim", values=[4, 8, 16])
    n_stacks = hp.Choice("n_stacks", values=[1, 2, 4, 6])
    learn_rate = hp.Choice("learn_rate", values=lrate_arr_glob)

    #raise instead of print + exit(): an exception surfaces cleanly in a notebook
    #and does not depend on what exit() does inside the kernel
    if n_stacks < 1:
        raise ValueError("error_aw: number of stacks invalid")

    #NOTE(review): 3 tokens with a vocabulary of 64 presumably means 3 codons out of
    #the 64 possible ones -- confirm against aw.prep_dhfr_data_int_codon_ran
    inputs = keras.Input(shape=(3,))
    embedded = aw.PositionalEmbedding(sequence_length=3, input_dim=64, output_dim=emb_dim)(inputs)

    #build exactly n_stacks encoder blocks; the first block consumes the embedding,
    #every later block consumes the output of the block before it
    stack_out = embedded
    for _ in range(n_stacks):
        #self-attention sublayer with residual connection and layer normalization
        att_out = layers.MultiHeadAttention(num_heads=n_heads, key_dim=subsp_dim)(stack_out, stack_out, stack_out)
        #layers.Add keeps the residual sum inside the Keras layer graph
        x = layers.Add()([stack_out, att_out])
        dense_input = layers.LayerNormalization()(x)
        #feed-forward sublayer projecting to dense_dim and back to emb_dim
        x = layers.Dense(units=dense_dim, activation='relu')(dense_input)
        dense_output = layers.Dense(units=emb_dim, activation='relu')(x)
        #a final residual connection and normalization step
        x = layers.Add()([dense_input, dense_output])
        stack_out = layers.LayerNormalization()(x)

    if hp.Boolean("dropout_transformer"):
        stack_out = layers.Dropout(rate=0.1)(stack_out)

    #the last part here is no longer part of the transformer proper
    #flatten the layers for the final regression output
    x = layers.Flatten()(stack_out)
    outputs = layers.Dense(1)(x)

    model = keras.Model(inputs, outputs)
    model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=learn_rate),
                  loss="mse",
                  metrics=["mae", "mape"])
    return model
    

#early stopping: end a fit once val_loss has gone patience_glob epochs without improving
callbacks_list = [
    keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=patience_glob,
    )
]

#build the architecture once with a fresh HyperParameters object
#(the result is kept in `model` but is not used by the cells of this section)
model = transf_cod_pos_tuner_regr(kt.HyperParameters())

#Hyperband tuner that ranks trials by validation loss
#NOTE(review): seed=None leaves the tuner's seeding to keras_tuner -- confirm that
#the seeds set at the top of this section are enough for reproducible searches
tuner = kt.Hyperband(
    hypermodel=transf_cod_pos_tuner_regr,
    objective="val_loss",
    max_epochs=max_epochs_hptuner_glob,
    factor=3,
    hyperband_iterations=1,
    seed=None,
    project_name="transf_cod_pos_tuner_regr",
    overwrite=True,
)

tuner.search_space_summary()



In [None]:
#run the hyperparameter search on the codon-encoded training data,
#scoring each trial on the validation split
tuner.search(
    tr_c,
    tr_f,
    validation_data=(va_c, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_hptuner_glob,
    verbose=0,
)

tuner.results_summary()

#fetch the top-ranked model from the tuner
models = tuner.get_best_models(num_models=1)
best_tuned_model = models[0]

best_tuned_model.summary()

#continue training the best model for up to max_epochs_glob epochs
#(early stopping may end the run sooner) so that it has had enough training time
print("\nnow training best model for more epochs")
history = best_tuned_model.fit(
    tr_c,
    tr_f,
    validation_data=(va_c, va_f),
    callbacks=callbacks_list,
    batch_size=batch_size_glob,
    epochs=max_epochs_glob,
    verbose=0,
)
print("trained for additional ", len(history.history['loss']), " epochs")


#report loss and metrics on the held-out test split
print("\nregression quality hypertuned transformer with codon-based pos embedding")
eval_results = best_tuned_model.evaluate(te_c, te_f, return_dict=True)
for metric in eval_results:
    print(metric, " on test set ", eval_results[metric])

#predict test-set fitness; the prediction array is flattened to one dimension
#because the correlation functions below otherwise raise an error
predict_fit = best_tuned_model.predict(te_c).flatten()

#rank (Spearman) and linear (Pearson) correlation between actual and predicted fitness
print(
    "fitness act. vs. pred. for test set",
    stats.spearmanr(te_f, predict_fit),
    "n=",
    len(te_f),
)
print(
    "fitness act. vs. pred. for test set",
    stats.pearsonr(te_f, predict_fit),
    "n=",
    len(te_f),
)
