In [None]:
# Mount Google Drive inside the Colab runtime so the project folder is reachable.
from google.colab import drive
drive.mount('/content/drive')
# Make the project folder the working directory.
# NOTE(review): the later `from configuration import ...` cell presumably
# resolves against this directory — confirm.
%cd /content/drive/MyDrive/Colab_Notebooks/footprints_mtr

In [None]:
!pip install optuna

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import mean_absolute_error
import optuna
from optuna.samplers import TPESampler
from optuna.visualization import plot_optimization_history, plot_param_importances, plot_contour, plot_parallel_coordinate
import joblib
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Input
from sklearn.linear_model import MultiTaskElasticNet
from sklearn.ensemble import RandomForestRegressor

# Seed the three global random number generators used in this notebook
# (Python's `random`, NumPy, TensorFlow) with the same value.
for _seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    _seed_fn(42)

In [None]:
from configuration import data_path as DATA_PATH
from configuration import results_path as RESULTS_PATH
from configuration import model_name as MODEL_NAME
from configuration import n_splits as N_SPLITS
from configuration import n_trials as N_TRIALS
from configuration import batchsize as BATCH_SIZE
from configuration import epochs as EPOCHS

In [None]:
# Display the run settings imported from `configuration` as a sanity check.
MODEL_NAME, N_SPLITS, N_TRIALS, BATCH_SIZE, EPOCHS

In [None]:
# Display the input-data and results locations imported from `configuration`.
DATA_PATH, RESULTS_PATH

In [None]:
# Function to create stratified train-validation splits
def train_validation_split(n_splits=5):
    """Load the training tables and cut them into stratified cross-validation folds.

    Reads ``X_train.csv`` and ``y_train.csv`` from ``DATA_PATH`` (both indexed by
    ``f_id`` and ``i_id``) and stratifies the folds on the ``f_id`` index level.
    Rows of the two tables are paired by position.

    Args:
        n_splits: Number of folds handed to ``StratifiedKFold``.

    Returns:
        A list with one ``(X_train, X_valid, y_train, y_valid)`` tuple per fold.
    """
    features = pd.read_csv(f"{DATA_PATH}/X_train.csv", index_col=["f_id", "i_id"])
    targets = pd.read_csv(f"{DATA_PATH}/y_train.csv", index_col=["f_id", "i_id"])

    # The first index level is the stratification label.
    strata = features.index.get_level_values("f_id")
    fold_maker = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

    folds = []
    for fit_rows, held_out_rows in fold_maker.split(features, strata):
        fold = (
            features.iloc[fit_rows],
            features.iloc[held_out_rows],
            targets.iloc[fit_rows],
            targets.iloc[held_out_rows],
        )
        folds.append(fold)
    return folds

# Function to create different models based on the trial's hyperparameters
def create_model(trial, num_features: int, num_targets: int):
    """Build an untrained model for ``MODEL_NAME`` from hyperparameters drawn from ``trial``.

    Args:
        trial: Optuna trial used to sample the hyperparameters.
        num_features: Number of input columns (only used by the neural network).
        num_targets: Number of output columns (only used by the neural network).

    Returns:
        A ``MultiTaskElasticNet``, a ``RandomForestRegressor`` or a compiled Keras
        ``Sequential`` model, depending on ``MODEL_NAME``.

    Raises:
        ValueError: If ``MODEL_NAME`` is not one of the three supported names.
    """

    if MODEL_NAME == "multitask_elastic_net":
        # suggest_float replaces the deprecated suggest_loguniform / suggest_uniform
        # and matches the API already used for the dropout rates below.
        alpha = trial.suggest_float("alpha", 1e-5, 1e5, log=True)
        l1_ratio = trial.suggest_float("l1_ratio", 0.0, 1.0)
        model = MultiTaskElasticNet(alpha=alpha, l1_ratio=l1_ratio)

    elif MODEL_NAME == "random_forest":
        n_estimators = trial.suggest_int('n_estimators', 10, 200)
        max_depth = trial.suggest_int("max_depth", 3, 9, step=2)
        max_features = trial.suggest_categorical('max_features', ['sqrt', 'log2', None, 1.0])
        min_samples_split = trial.suggest_int('min_samples_split', 2, 20)
        min_samples_leaf = trial.suggest_int('min_samples_leaf', 2, 20)
        model = RandomForestRegressor(
            n_estimators=n_estimators, max_depth=max_depth, max_features=max_features,
            min_samples_split=min_samples_split, min_samples_leaf=min_samples_leaf, random_state=42
        )

    elif MODEL_NAME == "neural_network":
        # The original referenced an undefined `seed_value` (NameError on the first
        # trial). Use the same seed as the rest of the notebook.
        dropout_seed = 42

        n_layers = trial.suggest_int("n_layers", 1, 3)
        model = Sequential()
        model.add(Input(shape=(num_features,)))

        for i in range(n_layers):
            # Width is sampled on a log scale, dropout rate uniformly.
            num_hidden = trial.suggest_int(f"n_units_l{i}", 4, 128, log=True)
            model.add(Dense(num_hidden, activation="relu", kernel_initializer="normal"))
            dropout = trial.suggest_float(f"dropout_l{i}", 0.2, 0.5)
            model.add(Dropout(rate=dropout, seed=dropout_seed))

        # Linear output layer: one unit per regression target.
        model.add(Dense(num_targets))
        model.compile(loss="mean_absolute_error", optimizer="adam")

    else:
        raise ValueError(f"Unknown model name: {MODEL_NAME}")

    return model


# Objective function for Optuna hyperparameter optimization
def objective(trial):
    """Optuna objective: mean validation MAE of one hyperparameter set over all folds.

    A fresh model is created and fitted for every fold, so each fold uses the
    same hyperparameters sampled by ``trial``.

    Args:
        trial: Optuna trial that supplies the hyperparameters.

    Returns:
        The mean absolute error averaged over the cross-validation folds.
    """
    # Honour the configured fold count instead of silently using the default of 5.
    splits = train_validation_split(n_splits=N_SPLITS)
    scores = []

    for X_train, X_valid, y_train, y_valid in splits:
        model = create_model(trial, num_features=X_train.shape[1], num_targets=y_train.shape[1])

        if MODEL_NAME == "neural_network":
            model.fit(X_train, y_train, batch_size=BATCH_SIZE, epochs=EPOCHS, verbose=0)
            # Silence the Keras progress bar, which would otherwise print per fold and trial.
            predictions = model.predict(X_valid, verbose=0)
        else:
            model.fit(X_train, y_train)
            predictions = model.predict(X_valid)

        scores.append(mean_absolute_error(y_valid, predictions))

    return float(np.mean(scores))


# Function to run hyperparameter optimization
def run_hyperparameter_optimization():
    """Run the Optuna search for ``MODEL_NAME`` and persist its results.

    Creates a minimizing study with a seeded TPE sampler, enqueues a few
    hand-picked starting points for the elastic net and random forest, runs
    ``N_TRIALS`` trials of ``objective``, then writes the best parameters, the
    study object, a CSV report and the diagnostic plots to ``RESULTS_PATH``.

    Returns:
        dict: The best hyperparameters found (``study.best_params``).
    """
    import os

    # Create the output folder up front so a missing directory fails (or is fixed)
    # before the search runs, not when the results are finally written.
    os.makedirs(RESULTS_PATH, exist_ok=True)

    study = optuna.create_study(direction="minimize", sampler=TPESampler(n_startup_trials=40, seed=42))

    # Enqueue some manually tested values
    if MODEL_NAME == "multitask_elastic_net":
        # NOTE(review): the 1e-8 / 1e-7 alphas lie below the 1e-5 lower bound used
        # in create_model — confirm that evaluating them is intended.
        trials = [
            {"alpha": 1.0, "l1_ratio": 0.5},
            {"alpha": 0.5, "l1_ratio": 1},
            {"alpha": 1e-8, "l1_ratio": 0.0},
            {"alpha": 1.0, "l1_ratio": 0.0},
            {"alpha": 1e-7, "l1_ratio": 1.0},
            {"alpha": 1.0, "l1_ratio": 1.0},
            {"alpha": 1e-7, "l1_ratio": 0.5},
            {"alpha": 0.5, "l1_ratio": 0.0}
        ]
        for t in trials:
            study.enqueue_trial(t)

    elif MODEL_NAME == "random_forest":
        # NOTE(review): create_model never suggests 'min_impurity_decrease',
        # 'ccp_alpha' or 'random_state', and 'max_depth' is not fixed here, so
        # presumably only the first four keys influence this trial — confirm.
        study.enqueue_trial({
            'n_estimators': 100, 'min_samples_split': 2, 'min_samples_leaf': 2,
            'max_features': 1.0, 'min_impurity_decrease': 0.0, 'ccp_alpha': 0.0, 'random_state': 42
        })

    study.optimize(objective, n_trials=N_TRIALS)

    print(f"Best trial number: {study.best_trial.number}")
    print(f"Best params: {study.best_params}")
    print(f"Best score: {study.best_value}")

    # Save the best parameters and study results
    joblib.dump(study.best_params, f"{RESULTS_PATH}/best_params.joblib")
    joblib.dump(study, f"{RESULTS_PATH}/optuna_study.pkl")
    study.trials_dataframe().to_csv(f"{RESULTS_PATH}/optimization_report.csv")

    # Visualize the hyperparameter search process
    visualize_hyperparameter_search(study, save_path=RESULTS_PATH)

    return study.best_params


# Function to visualize Optuna hyperparameter optimization results
def visualize_hyperparameter_search(study, save_path):
    """Render and save the standard Optuna diagnostic plots as PNG files.

    The plotters imported at the top of the file (``optuna.visualization.plot_*``)
    return Plotly figures, which ``plt.savefig`` cannot see, so the saved PNGs
    were just the empty matplotlib figure. This uses the matplotlib-backed
    plotters from ``optuna.visualization.matplotlib`` instead, so the drawn
    figure is the one that gets saved and shown.

    Args:
        study: Optuna study to visualize.
        save_path: Directory the ``<name>.png`` files are written to.
    """
    # Local import keeps this block self-contained; it is part of the optuna
    # package the notebook already depends on.
    from optuna.visualization import matplotlib as optuna_mpl

    plots = [
        (optuna_mpl.plot_optimization_history, "optimization_history"),
        (optuna_mpl.plot_param_importances, "param_importances"),
        (optuna_mpl.plot_contour, "contour_plot"),
        (optuna_mpl.plot_parallel_coordinate, "parallel_coordinate")
    ]

    for plot_fn, filename in plots:
        plot_fn(study)
        # Grab the current figure, which is expected to be the one the plotter just drew on.
        fig = plt.gcf()
        fig.set_size_inches(10, 10)
        fig.savefig(f"{save_path}/{filename}.png", bbox_inches='tight', dpi=300)
        plt.show()
        plt.close(fig)

RUN HYPERPARAMETER OPTIMIZATION

In [None]:
# Run the Optuna search; `params` receives the best hyperparameters it returns.
params = run_hyperparameter_optimization()

In [None]:
# Display the best hyperparameters found by the search.
params

In [None]:
# Function to create a sequential neural network model
def create_sequential_model(params, num_features, num_targets):
    """Rebuild the feed-forward network described by a set of tuned parameters.

    Args:
        params: Mapping with ``n_layers`` plus, for every hidden layer ``i``,
            ``n_units_l{i}`` and ``dropout_l{i}``.
        num_features: Width of the input layer.
        num_targets: Width of the linear output layer.

    Returns:
        The compiled Keras ``Sequential`` model (MAE loss, Adam optimizer).
        Its summary is printed as a side effect.
    """
    network = Sequential()
    network.add(Input(shape=(num_features,)))

    # One Dense + Dropout pair per tuned hidden layer.
    for layer_idx in range(params['n_layers']):
        hidden = Dense(
            params[f'n_units_l{layer_idx}'],
            activation="relu",
            kernel_initializer="normal",
        )
        network.add(hidden)
        network.add(Dropout(rate=params[f'dropout_l{layer_idx}']))

    # Linear read-out with one unit per target.
    network.add(Dense(num_targets))

    network.compile(optimizer="adam", loss="mean_absolute_error")
    network.summary()
    return network

def plot_learning_curve(params, save_path, epochs=100):
    """Train the tuned network on every CV fold and plot its learning curve.

    For each fold a fresh model is built from ``params``, fitted with the
    validation fold as ``validation_data``, evaluated on that fold, and the
    training/validation loss per epoch is plotted, saved and shown.

    Args:
        params: Tuned hyperparameters, as expected by ``create_sequential_model``.
        save_path: Directory the ``learning_curve_<k>.png`` files are written to.
        epochs: Number of training epochs per fold (defaults to the previously
            hard-coded 100).
    """
    # Use the configured fold count rather than the helper's default of 5.
    splits = train_validation_split(n_splits=N_SPLITS)

    for split_idx, (X_train, X_valid, y_train, y_valid) in enumerate(splits):
        # Create the model with the current parameters
        model = create_sequential_model(params=params, num_features=X_train.shape[1], num_targets=y_train.shape[1])

        # Fit the model and keep the per-epoch training history
        history = model.fit(
            X_train, y_train,
            epochs=epochs, batch_size=BATCH_SIZE,
            validation_data=(X_valid, y_valid), verbose=0,
        )

        # Evaluate model on the validation set
        model.evaluate(X_valid, y_valid, verbose=1)

        # Draw on an explicit figure so curves from different folds can never
        # pile up on one implicit pyplot figure, and so it can be closed.
        fig, ax = plt.subplots()
        ax.plot(history.history['loss'], label='Train Loss')
        ax.plot(history.history['val_loss'], label='Validation Loss')
        # Titles are 0-based while file names are 1-based; both are kept as they
        # were so existing artifact names stay stable.
        ax.set_title(f'Model Loss - Split {split_idx}')
        ax.set_xlabel('Epochs')
        ax.set_ylabel('Loss')
        ax.legend(loc='upper right')

        # Save the plot
        fig.savefig(f'{save_path}/learning_curve_{split_idx + 1}.png', bbox_inches='tight', dpi=300)
        plt.show()
        plt.close(fig)

PLOT LEARNING CURVE

In [None]:
# Learning curves are only drawn when the tuned model is the Keras network.
if MODEL_NAME == "neural_network":
    plot_learning_curve(params=params, save_path=RESULTS_PATH)