# Training models with XGBoost, Optuna, and Dask



In [None]:
import datetime
import joblib
import uuid

from distributed import Client, wait
import dask.dataframe as dd
import coiled

import dask
import numpy as np
import pandas as pd
import optuna
from dask_ml.metrics import mean_squared_error as lazy_mse
import xgboost as xgb
from xgboost.dask import DaskDMatrix

import dask.dataframe as dd
from xgboost.core import XGBoostError

## Start Cluster

In [None]:
# Coiled cluster: ten m6i.4xlarge workers and an m6i.2xlarge scheduler,
# launched in the us-east-2 region.
cluster = coiled.Cluster(
    backend_options={"region": "us-east-2"},
    scheduler_vm_types=["m6i.2xlarge"],
    worker_vm_types=["m6i.4xlarge"],
    n_workers=10,
    # Copy the locally installed packages onto the cluster.
    package_sync=True,
)

In [None]:
# Connect a Dask distributed client to the cluster created above.
client = Client(cluster)
# Bare expression: in a notebook the client is rendered as the cell output.
client

## Load Data

1.  Load data from S3
2.  Convert categorical columns to known categories with `categorize`
3.  Down-cast to `float32` for space-efficiency

In [None]:
# Read the feature table from S3 and hold it in cluster memory.
ddf = dd.read_parquet(
    "s3://coiled-datasets/prefect-dask/nyc-uber-lyft/feature_table.parquet"
)
ddf = ddf.persist()

# Every column of category dtype is passed through `categorize`.
categorical_vars = ddf.select_dtypes(include="category").columns.tolist()
ddf = ddf.categorize(columns=categorical_vars)

# Under the hood, XGBoost converts floats to `float32`, so down-cast the float
# columns up front and persist the smaller frame.
float_cols = ddf.select_dtypes(include="float").columns.tolist()
downcast_floats = ddf[float_cols].astype(np.float32)
ddf[float_cols] = downcast_floats
ddf = ddf.persist()

## Train Model

In [None]:
def make_cv(df, num_folds):
    """Yield ``(train, test)`` pairs for k-fold cross-validation.

    ``df`` is split into ``num_folds`` equally weighted random parts with
    ``df.random_split(..., shuffle=True)``. Each part serves once as the test
    set; ``train`` is the list of all remaining parts, in their original order.
    """
    fold_weights = [1 / num_folds] * num_folds
    folds = df.random_split(fold_weights, shuffle=True)
    for held_out in range(num_folds):
        training_parts = [
            part
            for position, part in enumerate(folds[:num_folds])
            if position != held_out
        ]
        yield training_parts, folds[held_out]

        
def _categories_known(frame, columns):
    """Return True when every column in *columns* has known categories.

    A column that is missing from *frame*, or that has no ``.cat`` accessor,
    counts as "not known" instead of raising.
    """
    try:
        return all(frame[col].cat.known for col in columns)
    except (AttributeError, KeyError):
        return False


def train_model(trial_number, study_params, n_splits=2):
    """Cross-validate one XGBoost configuration and return its mean RMSE.

    For each fold produced by ``make_cv`` over the module-level ``ddf``, a
    booster is trained on the training parts with ``xgb.dask.train`` and
    scored on the held-out part.

    Args:
        trial_number: Identifier of the calling trial. Currently unused.
        study_params: XGBoost parameters overlaid on the fixed defaults
            (``verbosity``, ``tree_method``, ``objective``). The mapping is
            not modified. An ``n_estimators`` entry, if present, is used as
            the number of boosting rounds instead of being forwarded as a
            booster parameter.
        n_splits: Number of cross-validation folds; must be at least 2.

    Returns:
        The mean of the per-fold scores.

    Raises:
        ValueError: If ``n_splits`` is smaller than 2.
        RuntimeError: If a categorical column of a fold does not have known
            categories. The cluster is shut down before raising.
    """
    if n_splits < 2:
        # With fewer than two folds there is no training data left once the
        # test fold is held out.
        raise ValueError(f"n_splits must be at least 2, got {n_splits!r}")

    params = {
        "verbosity": 1,
        "tree_method": "hist",
        "objective": "reg:squarederror",
        **study_params,
    }
    # `n_estimators` is the scikit-learn wrapper's name for the number of
    # trees. The native training API takes it as `num_boost_round`, so it is
    # pulled out of the booster parameters here. When the caller does not
    # supply it, fall back to the previous fixed value of 4 rounds.
    default_rounds = 4
    num_boost_round = int(params.pop("n_estimators", default_rounds))

    scores = []

    for train_parts, test in make_cv(ddf, n_splits):
        print("Starting run")
        start = datetime.datetime.now()
        train = dd.concat(train_parts)

        # Explicit check instead of `assert`, which is stripped under `-O`.
        if not (
            _categories_known(train, categorical_vars)
            and _categories_known(test, categorical_vars)
        ):
            cluster.shutdown()
            raise RuntimeError("Categorical_vars are not known")

        y_train = train["trip_time"].to_frame().persist()
        X_train = train.drop(columns=["trip_time"]).persist()

        # Make the test data
        y_test = test["trip_time"].to_frame().persist()
        X_test = test.drop(columns="trip_time").persist()

        dtrain = DaskDMatrix(client, X_train, y_train, enable_categorical=True)

        print("Training... ", end="")
        model = xgb.dask.train(
            client,
            params,
            dtrain,
            num_boost_round=num_boost_round,
            evals=[(dtrain, "train")],
        )

        print("Predict... ", end="")
        predictions = xgb.dask.predict(client, model, X_test)

        print("Scoring... ")
        # `squared=False` asks for the root of the mean squared error.
        score = lazy_mse(
            y_test.to_dask_array(lengths=True).reshape(-1,),
            predictions.to_dask_array(lengths=True),
            squared=False,
        )
        # NOTE(review): kept from the original; presumably a no-op when the
        # metric has already been computed eagerly - confirm.
        wait(score)
        scores.append(score)
        print(f"Scores:  {scores}")
        elapsed = datetime.datetime.now() - start
        # Report real seconds; the raw timedelta prints as H:MM:SS.
        print(f"Duration:  {elapsed.total_seconds():.1f} seconds\n")

    return np.mean(scores)

In [None]:
def objective(trial):
    """Optuna objective: sample one hyperparameter set and score it.

    Ten hyperparameters are drawn from ``trial`` and handed to
    ``train_model`` together with the trial number, using 5 folds. The value
    returned by ``train_model`` is returned unchanged.
    """
    suggest_int = trial.suggest_int
    suggest_float = trial.suggest_float

    # Keyword arguments are evaluated in order, so the parameters are
    # suggested in the order listed here.
    # NOTE(review): the colsample_* ranges include 0 as a lower bound -
    # confirm that XGBoost accepts values at or near 0 for them.
    search_space = dict(
        n_estimators=suggest_int('n_estimators', 50, 125),
        learning_rate=suggest_float('learning_rate', 0.1, 0.9),
        subsample=suggest_float('subsample', 0.1, 1),
        max_depth=suggest_int('max_depth', 3, 9),
        colsample_bytree=suggest_float('colsample_bytree', 0, 1),
        min_child_weight=suggest_int('min_child_weight', 1, 3),
        colsample_bynode=suggest_float('colsample_bynode', 0, 1),
        colsample_bylevel=suggest_float('colsample_bylevel', 0, 1),
        reg_alpha=suggest_float('reg_alpha', 0, 0.5),
        reg_lambda=suggest_float('reg_lambda', 0, 1),
    )

    return train_model(
        trial_number=trial.number,
        study_params=search_space,
        n_splits=5,
    )

In [None]:
# One Optuna study, five trials. The direction is spelled out explicitly
# ("minimize" is also what Optuna uses when none is given).
study = optuna.create_study(
    study_name="nyc-travel-time-model",
    direction="minimize",
)
study.optimize(objective, n_trials=5)

In [None]:
# Number of trials recorded in the study.
len(study.trials)

In [None]:
# Hyperparameters of the best trial.
study.best_params

In [None]:
# Objective value of the best trial (the value returned by `objective`).
study.best_value

In [None]:
# Full record of the best trial.
study.best_trial