In [None]:
!pip install optuna
!pip install optuna-integration



In [None]:
!git clone https://github.com/Djinho/EvoNet-CNN-Insight.git

# Navigate to the cloned repository
%cd EvoNet-CNN-Insight/Optimisation_notebooks

In [None]:
%run -i ../ImaGene.py

In [None]:
import optuna
import tensorflow as tf
from tensorflow.keras import models, layers, regularizers, callbacks
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
import pandas as pd
import logging
import os
import time
import gzip
import skimage.transform
from optuna.integration import TFKerasPruningCallback

# Define the dataset paths with absolute paths and the correct simulation folders
SIMULATIONS_FOLDERS = [
    '/content/EvoNet-CNN-Insight/model_training/Ancient_moderate/AM',
    '/content/EvoNet-CNN-Insight/model_training/Ancient_weak/AW',
    '/content/EvoNet-CNN-Insight/model_training/Ancient_strong/AS'
]

# Configure logging to file in the current directory with the specified name
logging.basicConfig(filename='Ancient_Scenarios.log', level=logging.INFO,
                    format='%(asctime)s:%(levelname)s:%(message)s')

# Function to preprocess data
def preprocess_data(simulations_folder, batch_number):
    # Create ImaFile object for the specified simulation folder and model
    file_sim = ImaFile(simulations_folder=os.path.join(simulations_folder, f'Simulations{batch_number}'), nr_samples=198, model_name='Marth-3epoch-CEU')

    # Read the simulation data
    gene_sim = file_sim.read_simulations(parameter_name='selection_coeff_hetero', max_nrepl=4000)

    # Filter simulations based on frequency threshold
    gene_sim.filter_freq(0.01)

    # Sort simulations by row frequency
    gene_sim.sort('rows_freq')

    # Resize simulation data to the required dimensions
    gene_sim.resize((198, 192))

    # Convert simulation data (flip=True allows for data augmentation)
    gene_sim.convert(flip=True)

    # Get a random subset of indices and subset the simulation data
    gene_sim.subset(get_index_random(gene_sim))

    # Convert targets to binary format
    gene_sim.targets = to_binary(gene_sim.targets)

    return gene_sim

# Objective function for Optuna optimization
def objective(trial, simulations_folder, epochs=1):
    # Set random seed
    np.random.seed(42)
    tf.random.set_seed(42)

    # Hyperparameters to be optimized
    num_layers = trial.suggest_int('num_layers', 3, 6)  # Limit the number of layers to avoid excessive reduction
    filters = [trial.suggest_int(f'filters_{i}', 32, 128) for i in range(num_layers)]  # Adjust filters range
    kernel_size = trial.suggest_int('kernel_size', 2, 3)  # Limit the kernel size to avoid excessive reduction
    l1 = trial.suggest_float('l1', 1e-6, 1e-2, log=True)
    l2 = trial.suggest_float('l2', 1e-6, 1e-2, log=True)
    dense_units = trial.suggest_int('dense_units', 64, 256)  # Adjust dense units range
    dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5)
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-3, log=True)

    # Log the hyperparameters at the start of each trial
    logging.info(f'Trial {trial.number} started with hyperparameters: num_layers={num_layers}, filters={filters}, '
                 f'kernel_size={kernel_size}, l1={l1}, l2={l2}, dense_units={dense_units}, '
                 f'dropout_rate={dropout_rate}, learning_rate={learning_rate}')
    print(f'Trial {trial.number} started with hyperparameters: num_layers={num_layers}, filters={filters}, '
          f'kernel_size={kernel_size}, l1={l1}, l2={l2}, dense_units={dense_units}, '
          f'dropout_rate={dropout_rate}, learning_rate={learning_rate}')

    # Building the model based on sampled hyperparameters
    model = models.Sequential()
    model.add(layers.Input(shape=(198, 192, 1)))  # Define the input layer
    for i in range(num_layers):
        model.add(layers.Conv2D(filters=filters[i], kernel_size=(kernel_size, kernel_size), strides=(1, 1), activation='relu',
                                kernel_regularizer=regularizers.l1_l2(l1=l1, l2=l2), padding='same'))  # Use 'same' padding to avoid dimension issues
        model.add(layers.MaxPooling2D(pool_size=(2, 2)))

    model.add(layers.Flatten())  # Flatten the output for the Dense layers
    model.add(layers.Dense(units=dense_units, activation='relu'))  # Add Dense layer
    model.add(layers.Dropout(rate=dropout_rate))  # Add Dropout layer
    model.add(layers.Dense(units=1, activation='sigmoid'))  # Output layer with sigmoid activation

    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)  # Define optimizer
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])  # Compile the model

    # Pruning callback to terminate unpromising trials
    pruning_callback = TFKerasPruningCallback(trial, 'val_loss')

    # Early stopping to prevent overfitting
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

    # Incremental training on 9 batches
    for batch_number in range(1, 10):
        gene_sim = preprocess_data(simulations_folder, batch_number)  # Preprocess data
        history = model.fit(gene_sim.data, gene_sim.targets, batch_size=64, epochs=epochs, verbose=1, validation_split=0.2,
                            callbacks=[early_stopping, pruning_callback])  # Train the model with callbacks

    # Evaluate the model on the 10th batch
    gene_sim = preprocess_data(simulations_folder, 10)
    evaluation = model.evaluate(gene_sim.data, gene_sim.targets, verbose=1)
    return evaluation[1]  # Return the accuracy

# Function to run the optimization process
def run_optimization():
    start_time = time.time()  # Record start time

    # Iterate through the simulation folders and run optimization
    for simulations_folder in SIMULATIONS_FOLDERS:
        logging.info(f"Starting optimization for {simulations_folder}")
        print(f"Starting optimization for {simulations_folder}")

        # Create or load an Optuna study and optimize the objective function
        study_name = f"study_{simulations_folder.replace('/', '_')}.db"  # Unique name for each study
        storage = f"sqlite:///{study_name}"

        # Load existing study if it exists, otherwise create a new one
        study = optuna.create_study(study_name=study_name, storage=storage, load_if_exists=True, direction='maximize', pruner=optuna.pruners.MedianPruner())

        study.optimize(lambda trial: objective(trial, simulations_folder), n_trials=30)  # Set to 30 trials

        try:
            trial = study.best_trial
            print('Best trial:')
            print('Value: {}'.format(trial.value))
            print('Params: ')
            for key, value in trial.params.items():
                print('    {}: {}'.format(key, value))  # Print best trial parameters

            logging.info('Best trial:')
            logging.info('Value: {}'.format(trial.value))
            logging.info('Params: ')
            for key, value in trial.params.items():
                logging.info('    {}: {}'.format(key, value))  # Log best trial parameters

        except ValueError as e:
            logging.error(f"No completed trials: {e}")  # Log error if no trials are completed

    elapsed_time = time.time() - start_time  # Calculate elapsed time
    print(f"Elapsed time: {elapsed_time:.2f} seconds")
    logging.info(f"Elapsed time: {elapsed_time:.2f} seconds")  # Log elapsed time

# Run the optimization process
if __name__ == '__main__':
    run_optimization()  # Execute the optimization function
