Hyperparameter Tuning with the HParams Dashboard and Mealpy
https://www.tensorflow.org/tensorboard/hyperparameter_tuning_with_hparams

In [None]:
import os
import tensorflow as tf
from tensorboard.plugins.hparams import api as hp

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder
from sklearn.metrics import accuracy_score

from keras.models import Sequential
from keras.layers import LSTM, Dense, Bidirectional, Dropout, BatchNormalization, InputLayer
from keras.callbacks import EarlyStopping
from keras.optimizers import Adam, Nadam, SGD, RMSprop, Adadelta, Adagrad, Adamax

from mealpy import FloatVar, StringVar, IntegerVar, Problem
from mealpy.swarm_based import AO

import time
from datetime import datetime
from collections import Counter
from imblearn.over_sampling import SMOTE

# **Load the data**

In [2]:
# Load the dataset
data = pd.read_csv('data_org.csv')

# dropping some columns 
data = data.drop(['p1', 'p2', 'p3', 'p4','stab'], axis=1)

# Select all columns except the last one for X
X = data.iloc[:, :-1]
y = data.iloc[:, -1].values

print("Original class distribution:", Counter(y))
X, y = SMOTE(random_state=42).fit_resample(X, y)
X = np.array(X)
y = np.array(y)
print("After SMOTE class distribution:", Counter(y))

# Encode the categorical target variable
encoder = OneHotEncoder(sparse_output=False)
y_encoded = encoder.fit_transform(y.reshape(-1, 1))

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.2, random_state=42)

# Normalize the features
# scaler = StandardScaler()
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Reshape input to be 3D [samples, timesteps, features] for LSTM
X_train_scaled = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1]))
X_test_scaled = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1]))

data = [X_train_scaled, X_test_scaled, y_train, y_test]

Original class distribution: Counter({'unstable': 6380, 'stable': 3620})
After SMOTE class distribution: Counter({'unstable': 6380, 'stable': 6380})


# **Define the bounds for hyperparameters**

In [3]:
# These are mealpy's hyperparameters for model's tunninig 
my_bounds = [
    IntegerVar(lb=5, ub=40, name="num_units"),
    IntegerVar(lb=5, ub=100, name="epochs"),
    IntegerVar(lb=16, ub=64, name="batch_size"),
    FloatVar(lb=0.0, ub=0.2, name="dropout"),
    StringVar(valid_sets=('softmax', 'relu', 'sigmoid', 'tanh', 'elu', 'LeakyReLU'), name="activation"),
    StringVar(valid_sets=('Adam', 'Nadam', 'SGD', 'RMSprop', 'Adadelta', 'Adagrad', 'Adamax'), name="optimizer"),
    # FloatVar(lb=0.0001, ub=0.1, name="learning_rate"),
]

In [4]:
# Save the logs to use them later with TensorBoard's Dashboard
SAVE_LOGS_TO_PATH = f"logs/hparam_tuning_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}"

if SAVE_LOGS_TO_PATH is not None and not os.path.exists(SAVE_LOGS_TO_PATH):
    os.makedirs(SAVE_LOGS_TO_PATH)

In [5]:
# Visualize the results in TensorBoard's HParams plugin

print(f"tensorboard --logdir {SAVE_LOGS_TO_PATH}")

tensorboard --logdir logs/hparam_tuning_2024_12_12_11_56_22


In [6]:
# These are HParams hyperparameters for logs
HP_NUM_UNITS = hp.HParam('num_units', hp.IntInterval(5, 40))
HP_EPOCHS = hp.HParam('epochs', hp.IntInterval(5, 100))
HP_BATCH_SIZE = hp.HParam('batch_size', hp.IntInterval(16, 64))
HP_DROPOUT = hp.HParam('dropout', hp.RealInterval(0.0, 0.2))
HP_ACTIVATION = hp.HParam('activation', hp.Discrete(['softmax', 'relu', 'sigmoid', 'tanh', 'elu', 'LeakyReLU']))
HP_OPTIMIZER = hp.HParam('optimizer', hp.Discrete(['Adam', 'Nadam', 'SGD', 'RMSprop', 'Adadelta', 'Adagrad', 'Adamax']))

METRIC_ACCURACY = 'accuracy'

with tf.summary.create_file_writer(SAVE_LOGS_TO_PATH).as_default():
    hp.hparams_config(
        hparams=[HP_NUM_UNITS, HP_EPOCHS, HP_BATCH_SIZE, HP_DROPOUT, HP_ACTIVATION, HP_OPTIMIZER],
        metrics=[hp.Metric(METRIC_ACCURACY, display_name='Accuracy')],
    )

# **Define the custom problem for optimization**

In [7]:
class LstmOptimizationProblem(Problem):    
    def __init__(self, bounds=None, minmax="max", data=None, **kwargs):
        self.data = data
        super().__init__(bounds, minmax, obj_weights=(1.0, 0.), **kwargs)

    def obj_func(self, x):
        x_decoded = self.decode_solution(x)
    
        v_num_units = x_decoded["num_units"]
        v_epochs = x_decoded["epochs"]
        v_batch_size = x_decoded["batch_size"]
        v_dropout = x_decoded["dropout"]
        v_activation = x_decoded["activation"]
        v_optimizer = x_decoded["optimizer"]
        v_loss_function = "categorical_crossentropy"
        

        model = Sequential([
            InputLayer(input_shape=(X_train_scaled.shape[1], X_train_scaled.shape[2])),
            
            Bidirectional(LSTM(units=v_num_units, return_sequences=True), merge_mode='sum'),
            Dropout(v_dropout),
            
            Bidirectional(LSTM(units=v_num_units, return_sequences=True), merge_mode='sum'),
            Dropout(v_dropout),
            
            Bidirectional(LSTM(units=v_num_units, return_sequences=False), merge_mode='sum'),
            Dropout(v_dropout),
            
            Dense(units=y_encoded.shape[1], activation=v_activation)
        ])

        model.compile(optimizer=v_optimizer, loss=v_loss_function, metrics=['accuracy'])

        early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

        # Directory for TensorBoard logs specific to each trial
        trial_log_dir = f'{SAVE_LOGS_TO_PATH}/{datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}'

        hparams = {
            HP_NUM_UNITS: v_num_units,
            HP_EPOCHS: v_epochs,
            HP_BATCH_SIZE: v_batch_size,
            HP_DROPOUT: v_dropout,
            HP_ACTIVATION: v_activation,
            HP_OPTIMIZER: v_optimizer,
        }
        
        history = model.fit(X_train_scaled, 
                    y_train, 
                    epochs=v_epochs, 
                    batch_size=v_batch_size,
                    validation_data=(X_test_scaled, y_test), 
                    callbacks=[
                        early_stopping,
                        tf.keras.callbacks.TensorBoard(trial_log_dir),  # Log metrics
                        # hp.KerasCallback(trial_log_dir, hparams),  # log hparams
                    ], 
                    verbose=0)
        
        # val_loss, accuracy = model.evaluate(X_test_scaled, y_test, verbose=0)
         
        # Predict the test set results
        y_pred = model.predict(X_test_scaled)
        y_pred_classes = np.argmax(y_pred, axis=1)
        y_true = np.argmax(y_test, axis=1)
        accuracy = accuracy_score(y_true, y_pred_classes)
        
        print(f"Accuracy = {accuracy}, num_units = {v_num_units}, activation = {v_activation}, optimizer = {v_optimizer}, epochs = {v_epochs}, batch_size = {v_batch_size}")
        
        with tf.summary.create_file_writer(trial_log_dir).as_default():
            hp.hparams(hparams)  # record the values used in this trial
            tf.summary.scalar("accuracy", accuracy, step=1)
            
        return accuracy 

# **Run the Experiment**

In [None]:
try:
    # Define the optimization problem
    problem = LstmOptimizationProblem(bounds=my_bounds, minmax="max", data=data)
    
    # Define the Adaptive Aquila Optimizer (AAO) model for hyperparameter tuning with both k and m
    model = AO.AdaptiveAO(epoch=20, pop_size=15, sharpness=10, sigmoid_midpoint=0.5)
 
    start_time = datetime.now()
    print('Time Start', start_time)
    model.solve(problem)
    end_time = datetime.now()
    print('Time elapsed', end_time - start_time)
    
    best_agent = model.g_best
    best_solution = model.g_best.solution
    best_accuracy = model.g_best.target.fitness
    best_parameters = model.problem.decode_solution(model.g_best.solution)
    
    print(f"Best agent: {best_agent}")
    print(f"Best solution: {best_solution}")
    print(f"Best accuracy: {best_accuracy}")
    print(f"Best parameters: {best_parameters}")
    
except Exception as e:
    print(f"Error occured': {str(e)}")