In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
import scipy
import glob
import sklearn 
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from tensorflow import keras
from keras import layers, models, optimizers
from tensorflow.keras.layers import Input, Dense, LeakyReLU
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, Callback
from tensorflow.keras.optimizers import Adam, SGD
from keras_tuner import BayesianOptimization, HyperParameters

2023-12-29 21:19:09.449942: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-12-29 21:19:09.449980: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-12-29 21:19:09.450012: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-12-29 21:19:09.458363: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Data import
wt_filtered = ['wt_filtered_lcc_3_50.lccdata', 'wt_filtered_lcc_12_50.lccdata', 'wt_filtered_lcc_20_50.lccdata']

# filtered wt LCC data import
wt_f_var_names = ['wt_3f', 'wt_12f', 'wt_20f']

for var, file in zip(wt_f_var_names, wt_filtered):
    globals()[var] = pd.read_csv(file, sep='\t').drop(columns='Unnamed: 0')

# filtered mutant LCC data import
D132H_filtered = ['D132H_filtered_lcc_3_50.lccdata', 'D132H_filtered_lcc_12_50.lccdata', 'D132H_filtered_lcc_20_50.lccdata']
D132H_f_var_names = ['D132H_3f', 'D132H_12f', 'D132H_20f']

for var, file in zip(D132H_f_var_names, D132H_filtered):
    globals()[var] = pd.read_csv(file, sep='\t').drop(columns='Unnamed: 0')

In [3]:
# Concateneate wt and mutant dataframes and rename columns

wt_f = pd.concat([wt_3f, wt_12f, wt_20f], axis = 1)
    
D132H_f = pd.concat([D132H_3f, D132H_12f, D132H_20f], axis = 1)

colnames = [*range(0,12)]
colnames
wt_f.columns = colnames
D132H_f.columns = colnames

In [4]:
# Data pre processing

def preprocessing(wt, mutant):
    
    wt_label = np.zeros(len(wt)) # Set wt labels to 0
    
    mutant_label = np.ones(len(mutant))
    
    # Concatenate data frames and label arrays

    X_train_full = pd.concat([wt.reset_index(), mutant.reset_index()])
    y_train_full = np.concatenate((wt_label, mutant_label))
    
    #Drop index column and normalise training data
    X_train_full = X_train_full.drop(columns = 'index')
    
    X_train_full= X_train_full.div(100) ## When changed from 100 to 56.1035, errors generated.
    
    # Separate training and validation sets and print relevant shapes
    X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full, stratify=y_train_full, test_size=0.2)
    
    print(X_train.shape)
    print(X_valid.shape)
    print(y_train.shape)
    print(y_valid.shape)
    
    return X_train, X_valid, y_train, y_valid

In [5]:
X_train_f, X_valid_f, y_train_f, y_valid_f = preprocessing(wt_f, D132H_f)

(64000, 12)
(16000, 12)
(64000,)
(16000,)


In [6]:
# Initialize tracking for the best overall validation loss and hyperparameters
best_overall = {
    'loss': float('inf'),
    'hyperparameters': None,
    'step': None,
    'details': {}  # A dictionary to store detailed hyperparameters for easy access
}

# Function to update the overall best with results from a new tuning step
def update_best_overall(tuner, step, additional_details=None):
    global best_overall
    best_hp = tuner.get_best_hyperparameters()[0]
    best_loss = tuner.oracle.get_best_trials(1)[0].score  # Adjust based on how you retrieve best score
    
    # Update if the current step's best loss is better than the global best loss
    if best_loss < best_overall['loss']:
        best_overall['loss'] = best_loss
        best_overall['hyperparameters'] = best_hp
        best_overall['step'] = step
        if additional_details:
            best_overall['details'][step] = additional_details

In [7]:
from tensorflow import keras
from tensorflow.keras import layers, models, optimizers
from kerastuner.tuners import BayesianOptimization

def build_autoencoder(hp):
    input_shape = X_train_f.shape[1:]  

    # Encoder
    encoder_input = keras.Input(shape=input_shape)
    x = encoder_input
    encoder_layers = hp.Int('num_encoder_layers', 1, 5)
    for i in range(encoder_layers):  
        layer_size = hp.Int(f'encoder_nodes_{i}', min_value=16, max_value=336, step=16)
        x = layers.Dense(layer_size, activation=keras.layers.LeakyReLU(alpha=0.01))(x)

    # Latent space
    latent_space = layers.Dense(2)(x)

    # Decoder
    x = latent_space
    decoder_layers = hp.Int('num_decoder_layers', 1, 5)
    for i in range(decoder_layers):
        layer_size = hp.Int(f'decoder_nodes_{i}', min_value=16, max_value=336, step=16)
        x = layers.Dense(layer_size, activation=keras.layers.LeakyReLU(alpha=0.01))(x)

    decoder_output = layers.Dense(input_shape[0], activation='linear')(x)

    # Autoencoder model
    autoencoder = models.Model(encoder_input, decoder_output)

    # Compile model
    autoencoder.compile(optimizer=optimizers.Adam(learning_rate=0.005), loss='mse')
                        
    return autoencoder

# Prepare Keras Tuner with Bayesian Optimization
tuner = BayesianOptimization(
    build_autoencoder,
    objective='val_loss',
    max_trials=50,
    executions_per_trial=2,
    num_initial_points=10,
    directory='AE_Tuning_Asym',
    project_name='AET_Nodes_Layers'
)

# Start tuning
tuner.search(
    X_train_f, X_train_f,
    epochs=50,
    batch_size=256,
    validation_data=(X_valid_f, X_valid_f)  
)

# Retrieve the best hyperparameters after tuning
best_hp = tuner.get_best_hyperparameters()[0]

# After nodes and layers tuning is complete
update_best_overall(tuner, 'nodes_layers', additional_details={
    'num_encoder_layers': best_hp.get('num_encoder_layers'),
    'encoder_nodes_per_layer': [best_hp.get(f'encoder_nodes_{i}') for i in range(best_hp.get('num_encoder_layers'))],
    'num_decoder_layers': best_hp.get('num_decoder_layers'),
    'decoder_nodes_per_layer': [best_hp.get(f'decoder_nodes_{i}') for i in range(best_hp.get('num_decoder_layers'))]
})

Reloading Tuner from AE_Tuning_Asym/AET_Nodes_Layers/tuner0.json


  from kerastuner.tuners import BayesianOptimization


In [8]:
# Get the best model and print summary
best_model = tuner.get_best_models(num_models=1)[0]
best_model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 12)]              0         
                                                                 
 dense (Dense)               (None, 176)               2288      
                                                                 
 dense_1 (Dense)             (None, 112)               19824     
                                                                 
 dense_2 (Dense)             (None, 336)               37968     
                                                                 
 dense_3 (Dense)             (None, 2)                 674       
                                                                 
 dense_4 (Dense)             (None, 240)               720       
                                                                 
 dense_5 (Dense)             (None, 12)                2892  

2023-12-29 21:19:13.056134: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:268] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected


In [9]:
# Fetch the full details of the top 5 trials
top_trials = tuner.oracle.get_best_trials(num_trials=5)

# Print the details of the top 5 trials
for i, trial in enumerate(top_trials):
    print(f"Rank: {i+1}")
    print("Hyperparameters:")
    for hp, value in trial.hyperparameters.values.items():
        print(f"{hp}: {value}")
    
    # Include the validation loss for each model
    val_loss = trial.score
    print(f"Validation Loss: {val_loss}\n")

Rank: 1
Hyperparameters:
num_encoder_layers: 3
encoder_nodes_0: 176
num_decoder_layers: 1
decoder_nodes_0: 240
encoder_nodes_1: 112
encoder_nodes_2: 336
encoder_nodes_3: 192
decoder_nodes_1: 336
decoder_nodes_2: 224
decoder_nodes_3: 176
decoder_nodes_4: 256
encoder_nodes_4: 80
Validation Loss: 0.00036727500264532864

Rank: 2
Hyperparameters:
num_encoder_layers: 3
encoder_nodes_0: 16
num_decoder_layers: 2
decoder_nodes_0: 16
encoder_nodes_1: 176
encoder_nodes_2: 336
encoder_nodes_3: 208
decoder_nodes_1: 336
decoder_nodes_2: 320
decoder_nodes_3: 160
decoder_nodes_4: 304
encoder_nodes_4: 192
Validation Loss: 0.0003689763107104227

Rank: 3
Hyperparameters:
num_encoder_layers: 2
encoder_nodes_0: 32
num_decoder_layers: 2
decoder_nodes_0: 272
encoder_nodes_1: 240
encoder_nodes_2: 336
encoder_nodes_3: 224
decoder_nodes_1: 208
decoder_nodes_2: 32
decoder_nodes_3: 240
decoder_nodes_4: 128
encoder_nodes_4: 272
Validation Loss: 0.0003738460800377652

Rank: 4
Hyperparameters:
num_encoder_layers: 3


In [10]:
# Tuning Batch Size and Learning Rate
# 'tuner' is the previous tuner instance
best_hp = tuner.get_best_hyperparameters()[0]

# Retrieve best architecture parameters for both encoder and decoder
best_num_encoder_layers = best_hp.get('num_encoder_layers')
best_encoder_nodes_per_layer = [best_hp.get(f'encoder_nodes_{i}') for i in range(best_num_encoder_layers)]
best_num_decoder_layers = best_hp.get('num_decoder_layers')
best_decoder_nodes_per_layer = [best_hp.get(f'decoder_nodes_{i}') for i in range(best_num_decoder_layers)]



def build_autoencoder_for_tuning(hp):
    input_shape = X_train_f.shape[1:]  

     # Encoder: Fixed architecture from previous tuning
    encoder_input = keras.Input(shape=input_shape)
    x = encoder_input
    for layer_size in best_encoder_nodes_per_layer:
        x = layers.Dense(layer_size, activation=keras.layers.LeakyReLU(alpha=0.01))(x) 

    # Latent space
    latent_space = layers.Dense(2)(x)

    # Decoder: Fixed architecture from previous tuning
    x = latent_space
    for layer_size in reversed(best_decoder_nodes_per_layer):
        x = layers.Dense(layer_size, activation=keras.layers.LeakyReLU(alpha=0.01))(x)  

    decoder_output = layers.Dense(input_shape[0], activation='linear')(x)

    # Autoencoder model
    autoencoder = models.Model(encoder_input, decoder_output)

    # Hyperparameters for optimizer: learning rate
    lr = hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='log')
    batch_size = hp.Int('batch_size', min_value=32, max_value=528, step=16)
    
    # Compile the model
    autoencoder.compile(optimizer=optimizers.Adam(learning_rate=lr), loss='mse')

    return autoencoder

# Set up a new tuner instance for the extended search
batch_size_tuner = BayesianOptimization(
    build_autoencoder_for_tuning,
    objective='val_loss',
    max_trials=20,
    executions_per_trial=1,
    num_initial_points=5,
    directory='AE_Tuning_Asym',
    project_name='AET_LR_BS'
)

# Start the new tuning session
batch_size_tuner.search(
    X_train_f, X_train_f,  
    validation_data=(X_valid_f, X_valid_f),  
    epochs=50
)

# Retrieve the best hyperparameters after batch size and learning rate tuning
best_batch_lr_hp = batch_size_tuner.get_best_hyperparameters()[0]

# After batch size and learning rate tuning is complete
update_best_overall(batch_size_tuner, 'batch_size_lr', additional_details={
    'batch_size': best_batch_lr_hp.get('batch_size'),
    'learning_rate': best_batch_lr_hp.get('learning_rate')
})

Reloading Tuner from AE_Tuning_Asym/AET_LR_BS/tuner0.json


In [11]:
# Fetching the best hyperparameters after the search is completed
best_hp = batch_size_tuner.get_best_hyperparameters()[0]

# Retrieve and print best batch size & learning rate
best_batch_size = best_hp.get('batch_size')
best_learning_rate = best_hp.get('learning_rate')

print(f"Best batch size: {best_batch_size}")
print(f"Best learning rate: {best_learning_rate}")

Best batch size: 304
Best learning rate: 0.00018792358749218356


In [13]:
# Testing Activation Functions
# 'tuner' is tuner instance for architecture
best_hp = tuner.get_best_hyperparameters()[0]

# 'batch_size_tuner' tuner instance for batch size and learning rate
best_batch_lr_hp = batch_size_tuner.get_best_hyperparameters()[0]

# Retrieve best architecture parameters for both encoder and decoder
best_num_encoder_layers = best_hp.get('num_encoder_layers')
best_encoder_nodes_per_layer = [best_hp.get(f'encoder_nodes_{i}') for i in range(best_num_encoder_layers)]
best_num_decoder_layers = best_hp.get('num_decoder_layers')
best_decoder_nodes_per_layer = [best_hp.get(f'decoder_nodes_{i}') for i in range(best_num_decoder_layers)]

# Retrieve best batch size and learning rate
best_batch_size = best_batch_lr_hp.get('batch_size')
best_learning_rate = best_batch_lr_hp.get('learning_rate')


def build_autoencoder_for_activation_tuning(hp):
    input_shape = X_train_f.shape[1:]  

    # Encoder
    encoder_input = keras.Input(shape=input_shape)
    x = encoder_input
    
    # Lists to hold activation functions for each layer in the encoder and decoder
    encoder_activation_functions = []
    decoder_activation_functions = []

    # Encoder architecture based on previous tuning
    for i in range(best_num_encoder_layers):  
        layer_size = best_encoder_nodes_per_layer[i]
        # Define hyperparameter for activation function for each layer in the encoder
        activation_choice = hp.Choice(f'encoder_activation_{i}', values=['relu', 'sigmoid', 'tanh', 'LeakyReLU'])
        encoder_activation_functions.append(activation_choice)

        if activation_choice == 'LeakyReLU':
            x = layers.Dense(layer_size)(x)
            x = layers.LeakyReLU(alpha=0.01)(x)
        else:
            x = layers.Dense(layer_size, activation=activation_choice)(x)

    # Latent space
    latent_space = layers.Dense(2)(x)  # Assuming a latent space size of 2

    # Decoder (not mirroring the encoder)
    x = latent_space
    for i in range(best_num_decoder_layers):
        layer_size = best_decoder_nodes_per_layer[i]
        # Define hyperparameter for activation function for each layer in the decoder
        activation_choice = hp.Choice(f'decoder_activation_{i}', values=['relu', 'sigmoid', 'tanh', 'LeakyReLU'])
        decoder_activation_functions.append(activation_choice)

        if activation_choice == 'LeakyReLU':
            x = layers.Dense(layer_size)(x)
            x = layers.LeakyReLU(alpha=0.01)(x)
        else:
            x = layers.Dense(layer_size, activation=activation_choice)(x)

    decoder_output = layers.Dense(input_shape[0], activation='linear')(x)

    # Autoencoder model
    autoencoder = models.Model(encoder_input, decoder_output)

    # Compile the model with fixed learning rate from previous tuning
    autoencoder.compile(optimizer=optimizers.Adam(learning_rate=best_learning_rate), loss='mse')

    return autoencoder

# Set up a new tuner instance for the activation function search
activation_tuner = BayesianOptimization(
    build_autoencoder_for_activation_tuning,
    objective='val_loss',
    max_trials=50,     
    executions_per_trial=1,
    num_initial_points=10,
    directory='AE_Tuning_Asym',
    project_name='AET_AF'
)

# Start the new tuning session with fixed batch size
activation_tuner.search(
    X_train_f, X_train_f,  
    validation_data=(X_valid_f, X_valid_f),  
    epochs=50,
    batch_size=best_batch_size  # Fixed batch size from previous tuning
)

# Retrieve the best hyperparameters after activation function tuning
best_activation_hp = activation_tuner.get_best_hyperparameters()[0]

# After activation function tuning is complete
update_best_overall(activation_tuner, 'activation', additional_details={
    'encoder_activations': [best_activation_hp.get(f'encoder_activation_{i}') for i in range(best_num_encoder_layers)],
    'decoder_activations': [best_activation_hp.get(f'decoder_activation_{i}') for i in range(best_num_decoder_layers)]
})

Trial 50 Complete [00h 00m 39s]
val_loss: 0.00043184214155189693

Best val_loss So Far: 0.00040717588854022324
Total elapsed time: 00h 32m 03s


In [14]:
# Tuning Loss Function and Optimizer
# 'tuner' is tuner instance for architecture
best_hp = tuner.get_best_hyperparameters()[0]

# 'batch_size_tuner' is tuner instance for batch size and learning rate
best_batch_lr_hp = batch_size_tuner.get_best_hyperparameters()[0]

# 'activation_tuner' is tuner instance for activation
best_activation_hp = activation_tuner.get_best_hyperparameters()[0]

# Retrieve best architecture hyperparameters for encoder and decoder
best_num_encoder_layers = best_hp.get('num_encoder_layers')
best_encoder_nodes_per_layer = [best_hp.get(f'encoder_nodes_{i}') for i in range(best_num_encoder_layers)]
best_num_decoder_layers = best_hp.get('num_decoder_layers')
best_decoder_nodes_per_layer = [best_hp.get(f'decoder_nodes_{i}') for i in range(best_num_decoder_layers)]

# Retrieve the best activation functions for each layer from the activation tuner
best_encoder_activations = [best_activation_hp.get(f'encoder_activation_{i}') for i in range(best_num_encoder_layers)]
best_decoder_activations = [best_activation_hp.get(f'decoder_activation_{i}') for i in range(best_num_decoder_layers)]

# Retrieve best batch size and learning rate
best_batch_size = best_batch_lr_hp.get('batch_size')
best_learning_rate = best_batch_lr_hp.get('learning_rate')

def build_autoencoder_for_final_tuning(hp):
    input_shape = X_train_f.shape[1:]

    # Encoder
    encoder_input = keras.Input(shape=input_shape)
    x = encoder_input
    
    # Encoder architecture based on previous tuning, with respective activation functions
    for i in range(best_num_encoder_layers):  
        layer_size = best_encoder_nodes_per_layer[i]
        activation_choice = best_encoder_activations[i]  # Use best activation from the activation tuner

        if activation_choice == 'LeakyReLU':
            x = layers.Dense(layer_size)(x)
            x = layers.LeakyReLU(alpha=0.01)(x)
        else:
            x = layers.Dense(layer_size, activation=activation_choice)(x)

    # Latent space
    latent_space = layers.Dense(2)(x)

    # Decoder (not mirroring the encoder)
    x = latent_space
    for i in range(best_num_decoder_layers):
        layer_size = best_decoder_nodes_per_layer[i]
        activation_choice = best_decoder_activations[i]  # Use best activation from the activation tuner for decoder

        if activation_choice == 'LeakyReLU':
            x = layers.Dense(layer_size)(x)
            x = layers.LeakyReLU(alpha=0.01)(x)
        else:
            x = layers.Dense(layer_size, activation=activation_choice)(x)

    decoder_output = layers.Dense(input_shape[0], activation='linear')(x)

    # Autoencoder model
    autoencoder = models.Model(encoder_input, decoder_output)

    # Hyperparameters for optimizer and loss function
    optimizer_name = hp.Choice('optimizer', values=['adam', 'sgd', 'rmsprop'])
    loss_function = hp.Choice('loss_function', values=['mse', 'binary_crossentropy', 'mae'])

    # Compile the model with hyperparameters
    if optimizer_name == 'adam':
        optimizer = optimizers.Adam(learning_rate=best_learning_rate)
    elif optimizer_name == 'sgd':
        optimizer = optimizers.SGD(learning_rate=best_learning_rate)
    elif optimizer_name == 'rmsprop':
        optimizer = optimizers.RMSprop(learning_rate=best_learning_rate)
    
    autoencoder.compile(optimizer=optimizer, loss=loss_function)

    return autoencoder

# New tuner instance for the loss function and optimizer search
final_tuner = BayesianOptimization(
    build_autoencoder_for_final_tuning,
    objective='val_loss',
    max_trials=50,
    executions_per_trial=1,
    num_initial_points=10,
    directory='AE_Tuning_Asym',
    project_name='AET_LF_Opt'
)

# Start new tuning session with fixed batch size
final_tuner.search(
    X_train_f, X_train_f,
    validation_data=(X_valid_f, X_valid_f),
    epochs=50,
    batch_size=best_batch_size  # Fixed batch size from previous tuning
)

# Retrieve the best hyperparameters after loss function and optimizer tuning
best_final_hyperparams = final_tuner.get_best_hyperparameters()[0]

# After loss function and optimizer tuning is complete
update_best_overall(final_tuner, 'loss_optimizer', additional_details={
    'loss_function': best_final_hyperparams.get('loss_function'),
    'optimizer': best_final_hyperparams.get('optimizer')
})

Trial 9 Complete [00h 00m 37s]
val_loss: 0.009039172902703285

Best val_loss So Far: 0.00042053364450111985
Total elapsed time: 00h 05m 56s


In [15]:
# Fetch the best hyperparameters after the final tuning session
best_final_hp = final_tuner.get_best_hyperparameters()[0]

# Retrieve the best loss function and optimizer
best_loss_function = best_final_hp.get('loss_function')
best_optimizer = best_final_hp.get('optimizer')

# Print the results
print(f"Best Loss Function: {best_loss_function}")
print(f"Best Optimizer: {best_optimizer}")

Best Loss Function: mse
Best Optimizer: adam


In [16]:
# Print Best Global Validation Loss and Hyperparameters
print(f"Best global validation loss achieved at step: {best_overall['step']}")
print(f"With Validation Loss: {best_overall['loss']}")

# Printing details depending on the step
if best_overall['step'] in best_overall['details']:
    print("Best Hyperparameters:")
    for key, value in best_overall['details'][best_overall['step']].items():
        print(f"{key}: {value}")

Best global validation loss achieved at step: batch_size_lr
With Validation Loss: 0.0003387618635315448
Best Hyperparameters:
batch_size: 304
learning_rate: 0.00018792358749218356
