In [None]:
def fit_and_validate_lstm_model(
    X,
    y,
    train_index,
    val_index,
    params,
):
    """Train a single-layer LSTM regressor on one CV fold and score it on the held-out part.

    Parameters
    ----------
    X : numpy array
        Windowed inputs, indexed along the first axis by ``train_index`` and
        ``val_index``. Assumed to be shaped (samples, window_size, features)
        -- TODO confirm against ``create_sequences``.
    y : numpy array
        Targets aligned with the first axis of ``X``.
    train_index, val_index : index arrays
        Positions of the training and validation samples of this fold.
    params : dict
        Must provide ``window_size``, ``n_units_1``, ``dropout_1``,
        ``n_neurons``, ``learning_rate`` and ``batch_size``.

    Returns
    -------
    float
        RMSE between ``y[val_index]`` and the model predictions for
        ``X[val_index]``.
    """
    X_tr, X_val = X[train_index], X[val_index]
    y_tr, y_val = y[train_index], y[val_index]

    model = Sequential()
    model.add(Input(shape=(params["window_size"], X_tr.shape[-1])))
    model.add(LSTM(units=params["n_units_1"], return_sequences=False, seed=42))
    model.add(Dropout(params["dropout_1"]))
    # A second recurrent layer (n_units_2 / dropout_2) is currently disabled.
    model.add(Dense(params["n_neurons"]))
    model.add(Dense(1))
    model.compile(
        optimizer=Adam(learning_rate=params["learning_rate"]),
        loss=MeanSquaredError(),
        metrics=[RootMeanSquaredError()],
    )

    # NOTE(review): the validation fold drives early stopping *and* is the data
    # the returned RMSE is measured on, so the score is selected on the same
    # samples it is reported for.
    early_stopping = EarlyStopping(monitor='val_loss', patience=40, restore_best_weights=True)

    model.fit(
        X_tr,
        y_tr,
        epochs=100,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping],
        verbose=0,
        batch_size=params["batch_size"],
    )

    # Silence predict as well: it is called once per fold of every trial and
    # would otherwise print a progress bar each time.
    y_val_pred = model.predict(X_val, verbose=0)
    # Flatten with reshape rather than squeeze so a single-sample fold stays
    # 1-D instead of collapsing to a 0-d scalar.
    y_val_pred = np.reshape(y_val_pred, -1)

    return np.sqrt(mean_squared_error(y_val, y_val_pred))

In [None]:
def objective(trial: optuna.trial.Trial, X_cv, y_cv) -> float:
    """Optuna objective for the LSTM search: mean RMSE over 5 time-ordered folds.

    The hyperparameters are sampled from ``trial``, the leading 80% of
    ``X_cv`` / ``y_cv`` (unshuffled) is turned into windows with
    ``create_sequences`` and each ``TimeSeriesSplit`` fold is scored with
    ``fit_and_validate_lstm_model``.

    NOTE(review): this notebook defines ``objective`` more than once; a later
    definition replaces this one in the kernel namespace.

    Returns
    -------
    float
        Average of the per-fold validation RMSE values.
    """
    # Search space. The second recurrent layer (n_units_2 / dropout_2) is not tuned.
    hyperparams = {
        "n_units_1": trial.suggest_categorical("n_units_1", [20, 40, 60]),
        "n_neurons": trial.suggest_categorical("n_neurons", [20, 40, 60]),
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
        "window_size": trial.suggest_int("window_size", 1, 24, step=1),
        "batch_size": trial.suggest_categorical("batch_size", [16, 32, 64, 128]),
        "dropout_1": trial.suggest_float("dropout_1", 0.1, 0.5),
    }

    # Chronological split: only the leading part is used for tuning, the
    # trailing 20% is discarded here.
    X_dev, _, y_dev, _ = train_test_split(
        X_cv, y_cv, test_size=0.2, shuffle=False, random_state=42
    )

    # The window length is itself a hyperparameter, so sequences are rebuilt per trial.
    X_seq, y_seq, _ = create_sequences(X_dev, y_dev, hyperparams["window_size"])

    splitter = TimeSeriesSplit(n_splits=5)
    fold_rmse = [
        fit_and_validate_lstm_model(X_seq, y_seq, fit_idx, holdout_idx, hyperparams)
        for fit_idx, holdout_idx in splitter.split(X_seq, y_seq)
    ]

    # Enable the next line to store the individual fold metrics on the trial.
    # trial.set_user_attr("split_rmse", fold_rmse)

    return np.mean(fold_rmse)

In [None]:
lstm_studies = {}

# Number of completed trials the stored study should contain.
n_trials_target = 100

for target_variable, (X, y) in lstm_datasets.items():

    # Only this target is tuned in this cell; every other dataset is skipped.
    if target_variable != 'HNAC (1_mL)':
        continue

    storage_url = f"sqlite:///{feltre_sqlites_folder}/LSTM - {target_variable}.sqlite3"

    # Opens the stored study when it exists and creates it otherwise, so there
    # is no need to probe the file system first (a file without the study in
    # it no longer raises).
    study = optuna.create_study(
        direction="minimize",
        storage=storage_url,
        study_name="Hyperparameter Tuning - LSTM - " + target_variable,
        load_if_exists=True,
    )

    # Resume instead of assuming an existing file means a finished search:
    # an interrupted run is topped up to the target, a complete one is reused.
    completed_trials = study.get_trials(
        deepcopy=False, states=(optuna.trial.TrialState.COMPLETE,)
    )
    remaining_trials = n_trials_target - len(completed_trials)
    if remaining_trials > 0:
        # NOTE(review): `objective` is defined several times in this notebook;
        # the LSTM definition must be the most recently executed one here.
        study.optimize(
            lambda trial: objective(trial, X.copy(), y.copy()),
            n_trials=remaining_trials,
            show_progress_bar=True,
        )

    lstm_studies[target_variable] = study

## GRU

In [None]:
def fit_and_validate_gru_model(
    X,
    y,
    train_index,
    val_index,
    params,
):
    """Train a single-layer GRU regressor on one CV fold and score it on the held-out part.

    Parameters
    ----------
    X : numpy array
        Windowed inputs, indexed along the first axis by ``train_index`` and
        ``val_index``. Assumed to be shaped (samples, window_size, features)
        -- TODO confirm against ``create_sequences``.
    y : numpy array
        Targets aligned with the first axis of ``X``.
    train_index, val_index : index arrays
        Positions of the training and validation samples of this fold.
    params : dict
        Must provide ``window_size``, ``n_units_1``, ``dropout_1``,
        ``n_neurons``, ``learning_rate`` and ``batch_size``.

    Returns
    -------
    float
        RMSE between ``y[val_index]`` and the model predictions for
        ``X[val_index]``.
    """
    X_tr, X_val = X[train_index], X[val_index]
    y_tr, y_val = y[train_index], y[val_index]

    model = Sequential()
    model.add(Input(shape=(params["window_size"], X_tr.shape[-1])))
    model.add(GRU(units=params["n_units_1"], return_sequences=False, seed=42))
    model.add(Dropout(params["dropout_1"]))
    # A second recurrent layer (n_units_2 / dropout_2) is currently disabled.
    model.add(Dense(params["n_neurons"]))
    model.add(Dense(1))
    model.compile(
        optimizer=Adam(learning_rate=params["learning_rate"]),
        loss=MeanSquaredError(),
        metrics=[RootMeanSquaredError()],
    )

    # NOTE(review): the validation fold drives early stopping *and* is the data
    # the returned RMSE is measured on, so the score is selected on the same
    # samples it is reported for.
    early_stopping = EarlyStopping(monitor='val_loss', patience=40, restore_best_weights=True)

    model.fit(
        X_tr,
        y_tr,
        epochs=100,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping],
        verbose=0,
        batch_size=params["batch_size"],
    )

    # Silence predict as well: it is called once per fold of every trial and
    # would otherwise print a progress bar each time.
    y_val_pred = model.predict(X_val, verbose=0)
    # Flatten with reshape rather than squeeze so a single-sample fold stays
    # 1-D instead of collapsing to a 0-d scalar.
    y_val_pred = np.reshape(y_val_pred, -1)

    return np.sqrt(mean_squared_error(y_val, y_val_pred))

In [None]:
def objective(trial: optuna.trial.Trial, X_cv, y_cv) -> float:
    """Optuna objective for the GRU search: mean RMSE over 5 time-ordered folds.

    The hyperparameters are sampled from ``trial``, the leading 80% of
    ``X_cv`` / ``y_cv`` (unshuffled) is turned into windows with
    ``create_sequences`` and each ``TimeSeriesSplit`` fold is scored with
    ``fit_and_validate_gru_model``.

    NOTE(review): this notebook defines ``objective`` more than once; running
    this cell replaces any earlier definition in the kernel namespace, and a
    later definition replaces this one.

    Returns
    -------
    float
        Average of the per-fold validation RMSE values.
    """
    # Search space. The second recurrent layer (n_units_2 / dropout_2) is not tuned.
    hyperparams = {
        "n_units_1": trial.suggest_categorical("n_units_1", [20, 40, 60]),
        "n_neurons": trial.suggest_categorical("n_neurons", [20, 40, 60]),
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
        "window_size": trial.suggest_int("window_size", 1, 24, step=1),
        "batch_size": trial.suggest_categorical("batch_size", [16, 32, 64, 128]),
        "dropout_1": trial.suggest_float("dropout_1", 0.1, 0.5),
    }

    # Chronological split: only the leading part is used for tuning, the
    # trailing 20% is discarded here.
    X_dev, _, y_dev, _ = train_test_split(
        X_cv, y_cv, test_size=0.2, shuffle=False, random_state=42
    )

    # The window length is itself a hyperparameter, so sequences are rebuilt per trial.
    X_seq, y_seq, _ = create_sequences(X_dev, y_dev, hyperparams["window_size"])

    splitter = TimeSeriesSplit(n_splits=5)
    fold_rmse = [
        fit_and_validate_gru_model(X_seq, y_seq, fit_idx, holdout_idx, hyperparams)
        for fit_idx, holdout_idx in splitter.split(X_seq, y_seq)
    ]

    # Enable the next line to store the individual fold metrics on the trial.
    # trial.set_user_attr("split_rmse", fold_rmse)

    return np.mean(fold_rmse)

In [None]:
gru_studies = {}

# Number of completed trials the stored study should contain.
n_trials_target = 100

# The GRU search reuses the windowless datasets prepared under `lstm_datasets`.
for target_variable, (X, y) in lstm_datasets.items():

    # Only this target is tuned in this cell; every other dataset is skipped.
    if target_variable != 'HNAC (1_mL)':
        continue

    storage_url = f"sqlite:///{feltre_sqlites_folder}/GRU - {target_variable}.sqlite3"

    # Opens the stored study when it exists and creates it otherwise, so there
    # is no need to probe the file system first (a file without the study in
    # it no longer raises).
    study = optuna.create_study(
        direction="minimize",
        storage=storage_url,
        study_name="Hyperparameter Tuning - GRU - " + target_variable,
        load_if_exists=True,
    )

    # Resume instead of assuming an existing file means a finished search:
    # an interrupted run is topped up to the target, a complete one is reused.
    completed_trials = study.get_trials(
        deepcopy=False, states=(optuna.trial.TrialState.COMPLETE,)
    )
    remaining_trials = n_trials_target - len(completed_trials)
    if remaining_trials > 0:
        # NOTE(review): `objective` is defined several times in this notebook;
        # the GRU definition must be the most recently executed one here.
        study.optimize(
            lambda trial: objective(trial, X.copy(), y.copy()),
            n_trials=remaining_trials,
            show_progress_bar=True,
        )

    gru_studies[target_variable] = study

## Bidirectional LSTM

In [None]:
def fit_and_validate_bi_lstm_model(
    X,
    y,
    train_index,
    val_index,
    params,
):
    """Train a single-layer bidirectional LSTM regressor on one CV fold and score it.

    Parameters
    ----------
    X : numpy array
        Windowed inputs, indexed along the first axis by ``train_index`` and
        ``val_index``. Assumed to be shaped (samples, window_size, features)
        -- TODO confirm against ``create_sequences``.
    y : numpy array
        Targets aligned with the first axis of ``X``.
    train_index, val_index : index arrays
        Positions of the training and validation samples of this fold.
    params : dict
        Must provide ``window_size``, ``n_units_1``, ``dropout_1``,
        ``n_neurons``, ``learning_rate`` and ``batch_size``.

    Returns
    -------
    float
        RMSE between ``y[val_index]`` and the model predictions for
        ``X[val_index]``.
    """
    X_tr, X_val = X[train_index], X[val_index]
    y_tr, y_val = y[train_index], y[val_index]

    model = Sequential()
    model.add(Input(shape=(params["window_size"], X_tr.shape[-1])))
    # The LSTM layer is wrapped in Bidirectional; `n_units_1` is the size passed
    # to the wrapped layer.
    model.add(Bidirectional(LSTM(units=params["n_units_1"], return_sequences=False, seed=42)))
    model.add(Dropout(params["dropout_1"]))
    # A second recurrent layer (n_units_2 / dropout_2) is currently disabled.
    model.add(Dense(params["n_neurons"]))
    model.add(Dense(1))
    model.compile(
        optimizer=Adam(learning_rate=params["learning_rate"]),
        loss=MeanSquaredError(),
        metrics=[RootMeanSquaredError()],
    )

    # NOTE(review): the validation fold drives early stopping *and* is the data
    # the returned RMSE is measured on, so the score is selected on the same
    # samples it is reported for.
    early_stopping = EarlyStopping(monitor='val_loss', patience=40, restore_best_weights=True)

    model.fit(
        X_tr,
        y_tr,
        epochs=100,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping],
        verbose=0,
        batch_size=params["batch_size"],
    )

    # Silence predict as well: it is called once per fold of every trial and
    # would otherwise print a progress bar each time.
    y_val_pred = model.predict(X_val, verbose=0)
    # Flatten with reshape rather than squeeze so a single-sample fold stays
    # 1-D instead of collapsing to a 0-d scalar.
    y_val_pred = np.reshape(y_val_pred, -1)

    return np.sqrt(mean_squared_error(y_val, y_val_pred))

In [None]:
def objective(trial: optuna.trial.Trial, X_cv, y_cv) -> float:
    """Optuna objective for the bidirectional-LSTM search: mean RMSE over 5 time-ordered folds.

    The hyperparameters are sampled from ``trial``, the leading 80% of
    ``X_cv`` / ``y_cv`` (unshuffled) is turned into windows with
    ``create_sequences`` and each ``TimeSeriesSplit`` fold is scored with
    ``fit_and_validate_bi_lstm_model``.

    NOTE(review): this notebook defines ``objective`` more than once; running
    this cell replaces any earlier definition in the kernel namespace.

    Returns
    -------
    float
        Average of the per-fold validation RMSE values.
    """
    # Search space. The second recurrent layer (n_units_2 / dropout_2) is not tuned.
    hyperparams = {
        "n_units_1": trial.suggest_categorical("n_units_1", [20, 40, 60]),
        "n_neurons": trial.suggest_categorical("n_neurons", [20, 40, 60]),
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
        "window_size": trial.suggest_int("window_size", 1, 24, step=1),
        "batch_size": trial.suggest_categorical("batch_size", [16, 32, 64, 128]),
        "dropout_1": trial.suggest_float("dropout_1", 0.1, 0.5),
    }

    # Chronological split: only the leading part is used for tuning, the
    # trailing 20% is discarded here.
    X_dev, _, y_dev, _ = train_test_split(
        X_cv, y_cv, test_size=0.2, shuffle=False, random_state=42
    )

    # The window length is itself a hyperparameter, so sequences are rebuilt per trial.
    X_seq, y_seq, _ = create_sequences(X_dev, y_dev, hyperparams["window_size"])

    splitter = TimeSeriesSplit(n_splits=5)
    fold_rmse = [
        fit_and_validate_bi_lstm_model(X_seq, y_seq, fit_idx, holdout_idx, hyperparams)
        for fit_idx, holdout_idx in splitter.split(X_seq, y_seq)
    ]

    # Enable the next line to store the individual fold metrics on the trial.
    # trial.set_user_attr("split_rmse", fold_rmse)

    return np.mean(fold_rmse)

In [None]:
bi_lstm_studies = {}

# Number of completed trials the stored study should contain.
n_trials_target = 100

# The bidirectional search reuses the datasets prepared under `lstm_datasets`.
for target_variable, (X, y) in lstm_datasets.items():

    # Only this target is tuned in this cell; every other dataset is skipped.
    if target_variable != 'HNAC (1_mL)':
        continue

    storage_url = f"sqlite:///{feltre_sqlites_folder}/BI_LSTM - {target_variable}.sqlite3"

    # Opens the stored study when it exists and creates it otherwise, so there
    # is no need to probe the file system first (a file without the study in
    # it no longer raises).
    study = optuna.create_study(
        direction="minimize",
        storage=storage_url,
        study_name="Hyperparameter Tuning - BI_LSTM - " + target_variable,
        load_if_exists=True,
    )

    # Resume instead of assuming an existing file means a finished search:
    # an interrupted run is topped up to the target, a complete one is reused.
    completed_trials = study.get_trials(
        deepcopy=False, states=(optuna.trial.TrialState.COMPLETE,)
    )
    remaining_trials = n_trials_target - len(completed_trials)
    if remaining_trials > 0:
        # NOTE(review): `objective` is defined several times in this notebook;
        # the bidirectional-LSTM definition must be the most recently executed one here.
        study.optimize(
            lambda trial: objective(trial, X.copy(), y.copy()),
            n_trials=remaining_trials,
            show_progress_bar=True,
        )

    bi_lstm_studies[target_variable] = study