# Try out Ray Tune with scikit-learn & Keras

In [None]:
import random

import mlflow
import numpy as np
import tensorflow as tf
from ray import air, tune
from ray.air import session
from ray.air.integrations.mlflow import MLflowLoggerCallback
from ray.tune.integration.keras import TuneReportCallback
from ray.tune.schedulers import AsyncHyperBandScheduler, HyperBandScheduler
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from tensorflow.keras import layers, models, optimizers

from q2_time.engineer_features import transform_features
from q2_time.model import split_data_by_host
from q2_time.simulate_data import simulate_data

%matplotlib inline
%load_ext autoreload
%autoreload 2

In [None]:
# Simulate a dataset and define the constants shared by the experiments below
host_id = "host_id"
target = "age_days"
train_size = 0.8
seed_data = 12
seed_model = 12

# todo: potentially rename columns to track md and ft columns
ft, md = simulate_data(100)
# left-join the features onto the metadata, then order rows by host and target
data = md.join(ft, how="left").sort_values([host_id, target])

# (train+val) & test split; train_val is split again into train/val
# inside process_data
train_val, test = split_data_by_host(data, host_id, train_size, seed_data)

In [None]:
def process_data(
    config, train_val, target, seed_data, host_id="host_id", train_size=0.8
):
    """Transform the features and split the data into train/validation arrays.

    Args:
        config: Trial configuration. ``config["data_transform"]`` selects the
            feature transformation (``None`` keeps the features unchanged);
            ``config["alr_denom_idx"]`` is only read when a transformation is
            requested.
        train_val: DataFrame containing the target column, the host column and
            the feature columns. Every column that is neither ``target`` nor
            ``host_id`` is treated as a feature.
        target: Name of the target column.
        seed_data: Seed forwarded to ``split_data_by_host``.
        host_id: Name of the host column. It is excluded from the features and
            passed to ``split_data_by_host``.
        train_size: Train-size argument forwarded to ``split_data_by_host`` for
            the train/validation split.

    Returns:
        Tuple ``(X_train, y_train, X_val, y_val)`` of numpy arrays.
    """
    # the same column name is used for feature exclusion and for the split, so
    # this function no longer depends on a notebook-level global
    non_features_ls = [target, host_id]
    # todo: redefine features selection here -> config
    features_ls = [x for x in train_val.columns if x not in non_features_ls]

    # feature engineering method -> config
    if config["data_transform"] is None:
        ft_transformed = train_val[features_ls].copy()
    else:
        ft_transformed = transform_features(
            train_val[features_ls], config["data_transform"], config["alr_denom_idx"]
        )
    train_val_t = train_val[non_features_ls].join(ft_transformed)

    # train & val split - for training purposes
    train, val = split_data_by_host(train_val_t, host_id, train_size, seed_data)
    # the transformation may change the feature columns, so select by the
    # columns of the transformed frame rather than by features_ls
    X_train, y_train = train[ft_transformed.columns], train[target]
    X_val, y_val = val[ft_transformed.columns], val[target]
    return X_train.values, y_train.values, X_val.values, y_val.values


def predict_score(model, X, y):
    """Return the mean squared error between ``y`` and ``model.predict(X)``."""
    return mean_squared_error(y, model.predict(X))

In [None]:
# ! define model training functions


# Linear Regression (for consistency with other training)
def train_linreg(config, train_val, target, seed_data, seed_model):
    """Fit a linear regression for one trial and report its train/validation MSE.

    Args:
        config: Trial configuration; reads ``config["fit_intercept"]`` plus the
            keys consumed by ``process_data``.
        train_val: Data passed on to ``process_data``.
        target: Name of the target column.
        seed_data: Seed used for the train/validation split.
        seed_model: Seed applied to numpy's global random state before fitting.
    """
    # ! process dataset
    X_tr, y_tr, X_va, y_va = process_data(config, train_val, target, seed_data)

    # ! model
    np.random.seed(seed_model)
    estimator = LinearRegression(fit_intercept=config["fit_intercept"])
    estimator.fit(X_tr, y_tr)

    # ! evaluate and hand the metrics to the tuning session
    mse_train = predict_score(estimator, X_tr, y_tr)
    mse_val = predict_score(estimator, X_va, y_va)
    metrics = {"mse_val": mse_val, "mse_train": mse_train}
    session.report(metrics)


# Define a training function for RandomForest
def train_rf(config, train_val, target, seed_data, seed_model):
    """Fit a random forest regressor for one trial and report train/validation MSE.

    Args:
        config: Trial configuration. ``n_estimators`` and ``max_depth`` are
            required. ``min_samples_split``, ``min_samples_leaf``,
            ``max_features``, ``min_impurity_decrease`` and ``bootstrap`` are
            forwarded to the estimator when present; otherwise the estimator's
            own defaults apply. The keys consumed by ``process_data`` are also
            read.
        train_val: Data passed on to ``process_data``.
        target: Name of the target column.
        seed_data: Seed used for the train/validation split.
        seed_model: Seed for numpy's global random state and for the forest's
            ``random_state``.
    """
    # ! process dataset
    X_train, y_train, X_val, y_val = process_data(config, train_val, target, seed_data)

    # ! model
    # Forward the tuned hyperparameters that the search space samples; without
    # this they are logged per trial but have no effect on the fitted model.
    optional_params = (
        "min_samples_split",
        "min_samples_leaf",
        "max_features",
        "min_impurity_decrease",
        "bootstrap",
    )
    rf_kwargs = {name: config[name] for name in optional_params if name in config}

    # setting seed for scikit library (global state and the estimator itself)
    np.random.seed(seed_model)
    rf = RandomForestRegressor(
        n_estimators=config["n_estimators"],
        max_depth=config["max_depth"],
        random_state=seed_model,
        **rf_kwargs,
    )
    rf.fit(X_train, y_train)

    score_train = predict_score(rf, X_train, y_train)
    score_val = predict_score(rf, X_val, y_val)

    session.report({"mse_val": score_val, "mse_train": score_train})


# Define a training function for Keras neural network
def train_nn(config, train_val, target, seed_data, seed_model):
    """Train a Keras dense network for one trial, reporting MSE after each epoch.

    Args:
        config: Trial configuration; reads ``n_layers``, one ``n_units_l{i}``
            entry per hidden layer, ``learning_rate`` and ``batch_size``, plus
            the keys consumed by ``process_data``.
        train_val: Data passed on to ``process_data``.
        target: Name of the target column.
        seed_data: Seed used for the train/validation split.
        seed_model: Seed applied to Python's, numpy's and TensorFlow's random
            state before the network is built.
    """
    # ! process dataset
    X_tr, y_tr, X_va, y_va = process_data(config, train_val, target, seed_data)

    # ! model
    # seed all random number generators before any layer is created
    random.seed(seed_model)
    np.random.seed(seed_model)
    tf.random.set_seed(seed_model)
    tf.compat.v1.set_random_seed(seed_model)

    # define neural network: input, n_layers hidden ReLU layers, 1-unit output
    n_features = X_tr.shape[1]
    net = models.Sequential()
    net.add(layers.Input(shape=(n_features,)))
    for layer_idx in range(config["n_layers"]):
        net.add(layers.Dense(config[f"n_units_l{layer_idx}"], activation="relu"))
    net.add(layers.Dense(1))

    # define learning
    net.compile(
        optimizer=optimizers.Adam(learning_rate=config["learning_rate"]),
        loss="mean_squared_error",
        metrics=["mse"],
    )

    # todo: reconsider adding early stopping
    mlflow.tensorflow.autolog()

    # Forward Keras' per-epoch metrics to Tune under the names used by the
    # other trainables; no separate final report is issued.
    epoch_reporter = TuneReportCallback(
        {"mse_val": "val_mse", "mse_train": "mse"}, on="epoch_end"
    )

    net.fit(
        X_tr,
        y_tr,
        validation_data=(X_va, y_va),
        epochs=100,
        batch_size=config["batch_size"],
        callbacks=[epoch_reporter],
        verbose=0,
    )

In [None]:
# ! define hyperparameter search spaces
# ! data engineering (shared by every model's search space)
data_eng_space = dict(
    # tune.grid_search evaluates every listed option, so the effective number
    # of trials becomes num_trials * number of data_transform options
    data_transform=tune.grid_search([None, "clr", "ilr", "alr"]),
    # todo: remove manual setting of max number of features (19 now)
    alr_denom_idx=tune.randint(0, 19),
)

# ! adding models
# linear regression
linreg_space = dict(
    **data_eng_space,
    fit_intercept=tune.choice([True]),
)

# scikit random forest
rf_space = dict(
    **data_eng_space,
    n_estimators=tune.randint(100, 1000),
    max_depth=tune.randint(2, 32),
    min_samples_split=tune.choice([0.0001, 0.001, 0.01, 0.1]),
    min_samples_leaf=tune.choice([0.00001, 0.0001, 0.001]),
    max_features=tune.choice([None, "sqrt", "log2", 0.1, 0.2, 0.5, 0.8]),
    min_impurity_decrease=tune.choice([0.0001, 0.001, 0.01]),
    bootstrap=tune.choice([True, False]),
)

# keras neural network
nn_space = dict(
    **data_eng_space,
    # Sample random uniformly between [1,9] rounding to multiples of 3
    n_layers=tune.qrandint(1, 9, 3),
    learning_rate=tune.loguniform(1e-5, 1e-1),
    batch_size=tune.choice([32, 64, 128]),
    # one unit-count entry per potential hidden layer (indices 0..8)
    **{f"n_units_l{layer_idx}": tune.randint(3, 64) for layer_idx in range(9)},
)

In [None]:
def run_trials(
    mlflow_tracking_uri,  # MLflow with MLflowLoggerCallback
    exp_name,
    trainable,
    search_space,
    train_val,
    target,
    seed_data,
    seed_model,
    fully_reproducible=False,  # if true hyperband instead of ASHA scheduler is used
    num_trials=2,  # todo: increase default num_trials
    scheduler_grace_period=5,
    scheduler_max_t=100,
    resources=None,
):
    """Run a Ray Tune experiment for ``trainable`` and log the trials to MLflow.

    Args:
        mlflow_tracking_uri: Tracking URI passed to ``MLflowLoggerCallback``.
        exp_name: MLflow experiment name.
        trainable: Training function accepting ``config`` plus the keyword
            arguments ``train_val``, ``target``, ``seed_data`` and
            ``seed_model``.
        search_space: Hyperparameter search space passed as ``param_space``.
        train_val: Data handed to the trainable.
        target: Name of the target column, handed to the trainable.
        seed_data: Data-split seed, handed to the trainable.
        seed_model: Seed for the global random states, the search algorithm
            and the trainable.
        fully_reproducible: If True, use ``HyperBandScheduler`` instead of
            ``AsyncHyperBandScheduler``.
        num_trials: Value passed as ``num_samples`` to ``TuneConfig``.
        scheduler_grace_period: ``grace_period`` of the asynchronous scheduler
            (unused when ``fully_reproducible`` is True).
        scheduler_max_t: ``max_t`` of the selected scheduler.
        resources: Resources per trial; defaults to ``{"cpu": 1}`` when None.

    Returns:
        The object returned by ``Tuner.fit()``.
    """
    # resolve the default here instead of using a shared mutable default argument
    if resources is None:
        resources = {"cpu": 1}

    # set seed for search algorithms/schedulers
    random.seed(seed_model)
    np.random.seed(seed_model)
    tf.random.set_seed(seed_model)

    if not fully_reproducible:
        # AsyncHyperBand enables aggressive early stopping of bad trials.
        # ! efficient & fast BUT
        # ! not fully reproducible with seeds (caused by system load, network
        # ! communication and other factors in env) due to asynchronous mode only
        scheduler = AsyncHyperBandScheduler(
            # stop trials at least this old in time (measured in training iteration)
            grace_period=scheduler_grace_period,
            # stopping trials after max_t iterations have passed
            max_t=scheduler_max_t,
        )
    else:
        # ! slower BUT
        # ! improves the reproducibility of experiments by ensuring that all trials
        # ! are evaluated in the same order.
        scheduler = HyperBandScheduler(max_t=scheduler_max_t)

    tuner = tune.Tuner(
        # trainable with input parameters passed and set resources
        tune.with_resources(
            tune.with_parameters(
                trainable,
                train_val=train_val,
                target=target,
                seed_data=seed_data,
                seed_model=seed_model,
            ),
            resources,
        ),
        # mlflow
        run_config=air.RunConfig(
            # experiment name: with subfolders with trials within
            name="mlflow",
            callbacks=[
                MLflowLoggerCallback(
                    tracking_uri=mlflow_tracking_uri,
                    experiment_name=exp_name,
                    save_artifact=True,
                )
            ],
        ),
        # hyperparameter space
        param_space=search_space,
        tune_config=tune.TuneConfig(
            # todo: consider taking RMSE loss
            metric="mse_val",
            mode="min",
            # define the scheduler
            scheduler=scheduler,
            # number of trials to run
            num_samples=num_trials,
            # ! set seed: seed the sampler explicitly rather than relying only
            # ! on the global numpy state
            search_alg=tune.search.BasicVariantGenerator(random_state=seed_model),
        ),
    )
    return tuner.fit()

In [None]:
# MLflow tracking URI passed to run_trials for every experiment below
mlflow_tracking_uri = "mlruns"

In [None]:
# tune the random forest with the asynchronous scheduler
results_rf = run_trials(
    mlflow_tracking_uri=mlflow_tracking_uri,
    exp_name="rf",
    trainable=train_rf,
    search_space=rf_space,
    train_val=train_val,
    target=target,
    seed_data=seed_data,
    seed_model=seed_model,
    fully_reproducible=False,
)
print("Best hyperparameters found were: ", results_rf.get_best_result().config)

In [None]:
# first test: tune the neural network with the asynchronous scheduler
results_nn = run_trials(
    mlflow_tracking_uri=mlflow_tracking_uri,
    exp_name="nn",
    trainable=train_nn,
    search_space=nn_space,
    train_val=train_val,
    target=target,
    seed_data=seed_data,
    seed_model=seed_model,
    fully_reproducible=False,
)
print("Best hyperparameters found were: ", results_nn.get_best_result().config)

In [None]:
# second test: identical arguments to the first neural-network run; the result
# overwrites results_nn (presumably a repeatability check - confirm intent)
results_nn = run_trials(
    mlflow_tracking_uri=mlflow_tracking_uri,
    exp_name="nn",
    trainable=train_nn,
    search_space=nn_space,
    train_val=train_val,
    target=target,
    seed_data=seed_data,
    seed_model=seed_model,
    fully_reproducible=False,
)
print("Best hyperparameters found were: ", results_nn.get_best_result().config)

In [None]:
# tune the linear regression with the asynchronous scheduler
results_linreg = run_trials(
    mlflow_tracking_uri=mlflow_tracking_uri,
    exp_name="linreg",
    trainable=train_linreg,
    search_space=linreg_space,
    train_val=train_val,
    target=target,
    seed_data=seed_data,
    seed_model=seed_model,
    fully_reproducible=False,
)
print("Best hyperparameters found were: ", results_linreg.get_best_result().config)