Copyright (c) 2023 Jinkyo Han

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

In [None]:
            ######################
            #    ANN Designer    #
            #     ver 1.0.0      #
            ######################

# Written by Jinkyo Han, OST, SNU NAOE
# 38jinkyo@snu.ac.kr
# Supervised by Do Kyun Kim, OST, SNU NAOE

# Input:
    # 1) your dataset
    # 2) parameters for optimization: search space, threshold, n_calls, ...
# Output:
    # 1) hyperparameters-model performance set, by csv file
    # 2) full-trained optimal model by '.h5'
    # 3) convergence plot for BOwGP optimization

# Algorithm
    # Search Space : manually defined
    # Search Strategy : BOwGP - "n_calls" should be adjusted as needed
    # Estimation : Early Stopping, by nu-SVR, at 25% learning curve - "threshold" should be adjusted as needed
        # ref: Baker et al. (2017), Accelerationg NAS using performance prediction.
        # NuSVR would be pretrained, with "n_samples" of dataset.
            # Learning Curve at 25% epochs - Final val_loss
    # Early Stopping : implemented by tensorflow.keras

1. Install requirements

In [None]:
!pip install tensorflow
!pip install scikit-optimize

2. Import basic packages

In [None]:
# Import Basics
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from itertools import product
import json
#-----------------------------------------------------------
# Add the required libraries for Bayesian optimization
import skopt
    # Caution : please open your scikit-optimize package,
    # and then replace every 'np.int' into 'int', in 'transformer.py'
    # Anaconda\Lib\site-packages\skopt\space\transformers.py
from skopt import gp_minimize
from skopt.utils import use_named_args
from skopt.space import Real, Categorical, Integer
#-----------------------------------------------------------

3. Apply GPU provided by Google Colaboratory.

Subscribing Colab PRO+ would be recommanded, for minimizing running time.
Thank you Google!

In [None]:
# Does GPU Works?
import os
import tensorflow as tf
from tensorflow.python.client import device_lib
#-----------------------------------------------------------
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
print(device_lib.list_local_devices())
#-----------------------------------------------------------

4. Import you Dataset

Indices should be clearly defined. Modify the **"num_of_input"**

Random state could be defined by user.

In [None]:
# Read Data
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
#-----------------------------------------------------------

# Number of inputs
num_of_input = 5
num_of_output = 1

file_name = 'MyInput.xlsx'
data = pd.read_excel(file_name)
data_clean = data.dropna()

# create a scaler object
scaler = MinMaxScaler()
scaler.fit(data_clean)
data_clean_normalized = scaler.transform(data_clean)

data_clean_normalized = pd.DataFrame(data_clean_normalized)

print(data_clean_normalized)

X_clean = data_clean_normalized.iloc[:, :num_of_input].values
y_clean = data_clean_normalized.iloc[:, num_of_input+num_of_output-1].values

print("\nShapes of enable datasets: ")
print("{}".format(data_clean.shape))

X_train, X_val, y_train, y_val = train_test_split(
    X_clean, y_clean, test_size=0.2, random_state=7)
#-----------------------------------------------------------

5. Basic functions

Modify **create_model** for further complexities, if needed.

In [None]:
# Generate earlyStopping callback
from tensorflow.keras.callbacks import EarlyStopping
#-----------------------------------------------------------
early_stopping = EarlyStopping(monitor='val_loss', patience=30, restore_best_weights=True)
#-----------------------------------------------------------


# Function : Create Model
#-----------------------------------------------------------
def create_model(layer_nodes, learning_rate, activation, dropout_rate, optimizer, loss_function):
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Input(shape=(num_of_input,)))

    for nodes in layer_nodes:
        model.add(tf.keras.layers.Dense(nodes, activation=activation))
        model.add(tf.keras.layers.Dropout(dropout_rate))

    model.add(tf.keras.layers.Dense(num_of_output))

    opt = None
    if optimizer == 'adam':
        opt = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    elif optimizer == 'sgd':
        opt = tf.keras.optimizers.SGD(learning_rate=learning_rate)
    elif optimizer == 'rmsprop':
        opt = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)

    model.compile(optimizer=opt, loss=loss_function, metrics=['mse'])
    return model
#-----------------------------------------------------------

# Function: show plot
#-----------------------------------------------------------
def show_plot(params, val_mse, epoch_rate, predicted_performance, toshow=False):
    if toshow==True:
        plt.plot(np.arange(int(params['epochs']*epoch_rate)), val_mse, label='Actual')
        plt.axhline(y=predicted_performance, color='r', linestyle='dashed', label='Predicted')
        plt.xlabel('Epoch')
        plt.ylabel('Validation MSE')
        plt.legend()
        plt.show()
#-----------------------------------------------------------

6. Search Space

The most basic settings:
Clearly define the space as needed.

The default settings are for "basic feed-forward ANN model".

In [None]:
# Define the search space
#-----------------------------------------------------------
    # Define the hyperparameter search space
    # Caution: this
max_layers = 4
layer_nodes_space = [Integer(16, 256, name=f'layer_{i+1}_nodes') for i in range(max_layers)]
space = [
    Integer(2, max_layers, name='number_of_layers'),
    *layer_nodes_space,
    Integer(16, 256, name='batch_size'),
    Categorical([400], name='epochs'),
    Real(1e-6, 1e-1, "log-uniform", name='learning_rate'),
    Categorical(['relu','sigmoid','gelu'], name='activation'),
    Categorical(['huber','mae','logcosh'], name='loss_function'),
    Categorical(['adam','sgd','rmsprop'], name='optimizer'),
    Real(0, 0.5, name='dropout_rate')
]
#-----------------------------------------------------------


# Function: Print Hyperparameters
#-----------------------------------------------------------
def print_model(params, layer_nodes):
    print("----------------------------------------")
    print("Model Specifications:")
    print(f"Number of Layers: {params['number_of_layers']}")
    print(f"Layer Nodes: {layer_nodes}")
    print(f"Learning Rate: {params['learning_rate']}")
    print(f"Activation Function: {params['activation']}")
    print(f"Loss Function: {params['loss_function']}")
    print(f"Optimizer: {params['optimizer']}")
    print(f"Epochs: {params['epochs']}")
    print(f"Dropout Rate: {params['dropout_rate']}")
    print(f"Batch Size: {params['batch_size']}")
    print("----------------------------------------")
#-----------------------------------------------------------


# Function: SVR Trainer
from sklearn.svm import NuSVR
from sklearn.svm import SVR
from skopt.sampler import Lhs
from joblib import dump
#-----------------------------------------------------------
# Function: Model Performance Estimation Strategy
    # by nu-SVR, would predict model performance
def predict_final_performance(val_loss, model=None, total_epochs=None):
    x_pred = np.array(val_loss).reshape(1,-1)
    prediction = model.predict(x_pred)
    return prediction

def pretrain_Estimator(X, y, model=None):
    if model is None:
        model = NuSVR(kernel='rbf', nu=0.5, gamma=0.1, C=100)
    model.fit(X, y)
    dump(model, 'estimator.joblib')
    return model
#-----------------------------------------------------------

7. Train SVR

For reasonable estimation of final validation loss of each model,

"n_samples" of random models would be evaluated.

After training, SVR could estimate the final val_loss of models

with high accuracy, at 25% of learning curve.

In [None]:
# Train SVR - from random samples
    # models: 25% learning curve - final_val_loss
#-----------------------------------------------------------
    # Create a sampler
sampler = Lhs(lhs_type="classic", criterion=None)
    # Sample "n_samples" points from the search space
samples = sampler.generate(space, n_samples=100)
    # Convert samples to list of dictionaries
random_datasets = [{dim.name: sample[i] for i, dim in enumerate(space)} for sample in samples]
    # Initialize the lists to store the results
val_losses_25pct = []
final_val_losses = []
    # Train each model and record the results
for params in random_datasets:
        # Create and compile the model
    number_of_layers = params['number_of_layers']
    layer_nodes = [params[f'layer_{i+1}_nodes'] for i in range(number_of_layers)]

    model = create_model(
        layer_nodes=layer_nodes,
        learning_rate=params['learning_rate'],
        activation=params['activation'],
        dropout_rate=params['dropout_rate'],
        optimizer=params['optimizer'],
        loss_function=params['loss_function']
    )

    model.compile(optimizer=params['optimizer'], loss=params['loss_function'])

        # Train the model
    history = model.fit(X_train, y_train, epochs=int(params['epochs']*0.25), batch_size=params['batch_size'],
                        validation_data=(X_val, y_val), verbose=2)

        # Record the 25% val_loss
    val_losses_25pct.append(history.history['val_loss'])

    # Continue training to 100%
    history = model.fit(X_train, y_train, epochs=params['epochs'],
                        batch_size=params['batch_size'],
                        validation_data=(X_val, y_val), verbose=2,
                        callbacks=[early_stopping],
                        initial_epoch=int(params['epochs']*0.25))

    # Record the final val_loss
    final_val_losses.append(history.history['val_loss'][-1])

# Train the NuSVR model
Learning_Curve_train = np.array(val_losses_25pct)
Model_Performance_train = np.array(final_val_losses)

estimator = pretrain_Estimator(
    Learning_Curve_train,
    Model_Performance_train)
#-----------------------------------------------------------

8. Objective Function for Bayesian Optimization with Gaussian Process

Thresholds could be modified, but modifying the epoch_rate would *not* be welcomed.

In [None]:
# Function: Objective Function
from joblib import load
estimator = load('estimator.joblib')
#-----------------------------------------------------------
@use_named_args(space)
def objective(**params):
    # User Input for Performance estimation
    threshold=0.00015
    epoch_rate=0.25 # When the model to be estimated
    #-------------------------------------------------------

    number_of_layers = params['number_of_layers']
    layer_nodes = [params[f'layer_{i+1}_nodes'] for i in range(number_of_layers)]

    model = create_model(
        layer_nodes=layer_nodes,
        learning_rate=params['learning_rate'],
        activation=params['activation'],
        dropout_rate=params['dropout_rate'],
        optimizer=params['optimizer'],
        loss_function=params['loss_function']
    )

    # Print the model structure
    print_model(params, layer_nodes)

    val_mse = []
    for epoch in range(params['epochs']):
        history = model.fit(X_train, y_train, epochs=1,
                            batch_size=params['batch_size'],
                            validation_data=(X_val, y_val),
                            callbacks=[early_stopping],verbose=2)
        val_mse.append(history.history['val_mse'][-1])

        if epoch == int(params['epochs'] * epoch_rate-1):
            predicted_performance = predict_final_performance(val_mse, estimator)
            print("\nPredicted Performance: {}\n".format(predicted_performance))

            # Plot actual vs. predicted learning curve
            #show_plot(params, val_mse, epoch_rate, predicted_performance, toshow=False)

            if predicted_performance >= threshold:
                break

    return val_mse[-1]
#-----------------------------------------------------------

9. MAIN

for n_calls of iterations, BOwGP will find out the optimal model.

logs(without verboses) would be automatically generated in .csv format.

In [None]:
# MAIN: Run Bayesian optimization
import time
#-----------------------------------------------------------
    # n_calls : iterations for iptimization
    # Recommand : n_calls might be large enough, dependent to Search Space
n_calls = 200
res_gp = gp_minimize(objective, dimensions=space, n_calls=n_calls, random_state=7, verbose=True)

    # Extract and print the best parameters
best_params = res_gp.x
print("\nBest parameters:", best_params)
print("Best MSE:", res_gp.fun)

    # Convert results to DataFrame and save as CSV
results_df = pd.DataFrame(res_gp.x_iters, columns=[dimension.name for dimension in space])
results_df['MSE'] = res_gp.func_vals

    # Optionally, add a column to indicate which configurations are better than a certain threshold
result_threshold = 0.00005  # adjust this value as needed
results_df['is_better_than_threshold'] = results_df['MSE'] < result_threshold

    # Save to CSV
results_df.to_csv("NAS_Result_{}.csv".format(
    time.strftime('%Y_%m_%d_%H_%M', time.localtime(time.time()))
    ), index=False)
#-----------------------------------------------------------



10. Export

The best model would be exported as .h5 format.

In [None]:
# Export best '.h5' file
#-----------------------------------------------------------
best_number_of_layers = best_params[0]
best_layer_nodes = best_params[1:1+best_number_of_layers]
best_batch_size = best_params[1+max_layers]
best_epochs = best_params[2+max_layers]
best_learning_rate = best_params[3+max_layers]
best_activation = best_params[4+max_layers]
best_loss_function = best_params[5+max_layers]
best_optimizer = best_params[6+max_layers]
best_dropout_rate = best_params[7+max_layers]

best_model = create_model(
    layer_nodes=best_layer_nodes,
    learning_rate=best_learning_rate,
    activation=best_activation,
    dropout_rate=best_dropout_rate,
    optimizer=best_optimizer,
    loss_function=best_loss_function,
)

best_model.fit(X_train, y_train, epochs=best_epochs, batch_size=best_batch_size)
best_model.save("Best_Model_{}.h5".format(
    time.strftime('%Y_%m_%d_%H_%M', time.localtime(time.time()))
    ))

best_val_loss = min(best_model.history.history['val_loss'])
print(f"Best Validation Loss: {best_val_loss}")
#-----------------------------------------------------------

11. further visualizations of convergence curve. enjoy!

In [None]:
# Visualization
#-----------------------------------------------------------
from skopt.plots import plot_convergence, plot_objective, plot_evaluations

# Convergence plot
plot_convergence(res_gp)
plt.show()

'''
# Objective plot (can be computationally intensive for large search spaces)
plot_objective(res_gp)
plt.show()

# Evaluation plot
plot_evaluations(res_gp)
plt.show()
'''
#-----------------------------------------------------------

12. To rewind the normalization.

In [None]:
# create a scaler object
y_scaler = MinMaxScaler()
y_scaler.fit(data_clean.iloc[:,num_of_input].values.reshape(-1,1))

# Inverse Transform
def inverse_transform_y(y_normalized):
    y_reshaped = y_normalized.reshape(-1, num_of_output)
    return y_scaler.inverse_transform(y_reshaped)

13. Comparison between the dataset, and predicted values.

In [None]:
from sklearn.metrics import r2_score

# Generate prediction
y_pred = model.predict(X_val)

print(y_val)
print(y_pred)

# Rescale y : remind - those were previously normalized by MixMaxScaler
y_pred_rescaled = inverse_transform_y(y_pred)/315
y_val_rescaled = inverse_transform_y(y_val)/315

# for plot
min_axis = min(np.amin(y_pred_rescaled), np.amin(y_val_rescaled))
max_axis = max(np.amax(y_pred_rescaled), np.amax(y_val_rescaled))

print(y_val_rescaled)
print(y_pred_rescaled)

# R squared
r2 = r2_score(y_val_rescaled, y_pred_rescaled)

# COV
cov = (np.std(y_pred_rescaled - y_val_rescaled) / np.mean(y_val_rescaled))

# Show Plot: Data vs Predicted Value
plt.figure(figsize=(5,5))
plt.scatter(y_val_rescaled, y_pred_rescaled, alpha=0.4)
plt.plot([min_axis, max_axis], [min_axis, max_axis], color='red', linestyle='--')  # y=x line
plt.title(f"Data vs Predicted (R^2: {r2:.5f}, COV: {cov:.5f})")
plt.xlabel("Data")
plt.ylabel("Predicted Values")
plt.grid(True)
plt.show()


All tasks were finished.

Thank you for using!