# XGBoost.Dask in many threads

Sometimes we want to train many large XGBoost models in parallel.  We do so in this example with ...

1.  The `xgboost.dask` project to do large training runs
2.  Optuna to do hyper-parameter-optimization
3.  A thread pool, to run many of these in parallel
4.  Coiled to launch Dask clusters (but you could swap in your favorite Dask deployment technology as you like)

Using `xgboost.dask` from many threads took a couple of small tweaks across projects.  This notebook resulted in the following PRs and issues:

-  https://github.com/dask/distributed/issues/7377
-  https://github.com/dask/dask/pull/9723
-  https://github.com/dask/distributed/pull/7369
-  https://github.com/dmlc/xgboost/pull/8558 (mostly cosmetic, not necessary)
-  Also something in Coiled to allow package_sync to be thread-safe, should be released by 2022-12-07

In [1]:
import datetime
import threading
from concurrent.futures import ThreadPoolExecutor

from distributed import Client
import dask.dataframe as dd
from coiled import Cluster
import coiled

import optuna
from sklearn.metrics import mean_squared_error
from dask_ml.metrics import mean_squared_error as lazy_mse
import xgboost as xgb
from xgboost.dask import DaskDMatrix

from dask_ml.datasets import make_classification_df
from dask_ml.model_selection import train_test_split, KFold
from dask_ml.preprocessing import OneHotEncoder
import dask.array as da


In [2]:
import dask, coiled
# Record the versions of the libraries this notebook was run with.
print("coiled:", coiled.__version__)
print("dask:", dask.__version__)
print("dask.distributed:", dask.distributed.__version__)
print("optuna:", optuna.__version__)
print("xgboost:", xgb.__version__)
# NOTE(review): coiled is reported a second time here; it looks redundant but
# matches the recorded cell output.
print("coiled:", coiled.__version__)

coiled: 0.2.58
dask: 2022.12.0+13.g0d8e12be
dask.distributed: 2022.12.0+17.gf8302593
optuna: 3.1.0.dev
xgboost: 1.7.2
coiled: 0.2.58


### Load data

In [3]:
import dask.dataframe as dd
from s3fs import S3FileSystem
import dask_optuna

# storage = dask_optuna.DaskStorage()

def load_data():
    """Load the NYC Uber/Lyft trip data and build the modelling frame.

    Reads the processed parquet dataset from S3, keeps only columns that are
    not string/category/object typed, drops the listed fare/payment columns
    and ``dropoff_datetime``, and derives:

    * ``accessible_vehicle`` -- 1 when ``on_scene_datetime`` is recorded,
      0 otherwise;
    * day-of-week and hour features from the request and pickup timestamps.

    Rows containing any missing value are removed, the frame is repartitioned
    to 128MB partitions, the index is reset, and the four derived time
    features are cast to ``category``.

    Returns:
        The resulting Dask DataFrame (not persisted by this function).

    Note:
        The final log line evaluates ``len(ddf.index)``, which computes the
        whole pipeline once just to report the row count.
    """
    start = datetime.datetime.now()
    print("loading data")
    to_exclude = ["string", "category", "object"]
    ddf = dd.read_parquet(
        "s3://prefect-dask-examples/nyc-uber-lyft/processed_files.parquet"
    ).select_dtypes(exclude=to_exclude)
    ddf = ddf.drop(
        columns=[
            "base_passenger_fare",
            "sales_tax",
            "bcf",
            "congestion_surcharge",
            "tips",
            "driver_pay",
            "dropoff_datetime",
        ]
    )

    print("Make accessible feature")
    # on_scene_datetime only applies if the vehicle is wheelchair accessible,
    # so a recorded (non-null) value marks an accessible vehicle.  The previous
    # `.where(isnull(), 0)` form kept the 1 for *missing* timestamps, i.e. the
    # flag was inverted.
    ddf = ddf.assign(
        accessible_vehicle=ddf.on_scene_datetime.notnull().astype("int64")
    )

    ddf = ddf.assign(request_dow=ddf.request_datetime.dt.dayofweek)
    ddf = ddf.assign(pickup_datetime_dow=ddf.pickup_datetime.dt.dayofweek)
    ddf = ddf.assign(request_hour=ddf.request_datetime.dt.hour)
    ddf = ddf.assign(pickup_datetime_hour=ddf.pickup_datetime.dt.hour)
    # The raw timestamps and location ids are no longer needed once the
    # derived features exist.
    ddf = ddf.drop(columns=['on_scene_datetime', 'request_datetime', 'pickup_datetime', "PULocationID", "DOLocationID"])

    ddf = ddf.dropna(how="any")
    ddf = ddf.repartition(partition_size="128MB")
    ddf = ddf.reset_index(drop=True)

    categories = ["request_dow", "request_hour", "pickup_datetime_hour", "pickup_datetime_dow"]
    for cat in categories:
        ddf[cat] = ddf[cat].astype('category')

    # Ideally we would categorize the data here, but splitting
    # causes us to lose that information, so it's a wasted operation

    print(f"Completed data preprocessing in {datetime.datetime.now() - start} with {len(ddf.index)} rows")
    return ddf

In [4]:
def _make_cv(df, num_folds):
    frac = [1 / num_folds]*num_folds
    splits = df.random_split(frac)
    for i in range(num_folds):
        train = [splits[j] for j in range(num_folds) if j != i]
        test = splits[i]
        yield train, test


In [5]:
def train_model(trial_number, study_params, n_splits=5, cluster_name = None, teardown_cluster=False):
    """Cross-validate one XGBoost configuration on a Coiled cluster.

    Args:
        trial_number: Identifier of the Optuna trial.  Accepted for the
            caller's convenience; not read by this function.
        study_params: Mapping of hyper-parameters for this trial.  An
            ``n_estimators`` entry, if present, is used as the number of
            boosting rounds; every other entry is forwarded to XGBoost as a
            booster parameter.  The mapping is not modified.
        n_splits: Number of cross-validation folds.
        cluster_name: Name of the Coiled cluster to create or reuse.  When
            ``None`` a name derived from the calling thread's id is used, so
            each thread in a pool gets its own cluster.
        teardown_cluster: Currently not acted on; the cluster is created
            with ``shutdown_on_close=False`` and left running for reuse.

    Returns:
        The mean of the per-fold mean squared errors on ``trip_time``.
    """
    if cluster_name is None:
        cluster_name = "xgb-nyc-taxi-" + str(threading.get_ident())

    # Copy so the caller's dict is left untouched.
    booster_params = dict(study_params)
    # `n_estimators` is a scikit-learn-wrapper argument; the native train()
    # API ignores it as a booster parameter and takes the tree count through
    # `num_boost_round` instead.  Fall back to the previous fixed value of 4
    # when the study does not tune it.
    num_boost_round = booster_params.pop("n_estimators", 4)

    cluster = coiled.Cluster(
        worker_vm_types=["m6i.4xlarge"],
        scheduler_vm_types=["m6i.2xlarge"],
        package_sync=True, # copy local packages,
        name=cluster_name,
        shutdown_on_close=False,  # reuse cluster across runs
        show_widget=False,
        n_workers=6,
        use_best_zone=True,
        account="dask-engineering",
        backend_options={"region": "us-east-2", "spot": True, "spot_on_demand_fallback": True}
        )

    print("starting run")
    with Client(cluster) as client:
        print(client.id)
        # Make this client the current one for the Dask calls below; several
        # threads may each hold their own client at the same time.
        with client.as_current():

            # Load and pre-process the DataFrame
            ddf = load_data()
            ddf = ddf.persist()

            val_scores = 0
            categorical_vars = ['request_dow', 'pickup_datetime_dow', 'request_hour', 'pickup_datetime_hour']
            for i, (train, test) in enumerate(_make_cv(ddf, n_splits)):
                print(f"Starting training run {i}")
                start = datetime.datetime.now()
                train = dd.concat(train)

                # Doing CV forces df[col].cat.known == False
                # This is risky and needs a fix
                train = train.categorize(columns=categorical_vars)
                test = test.categorize(columns=categorical_vars)

                # Make the training data
                y_train = train['trip_time'].to_frame().astype(float).persist()
                X_train = train.drop(columns=['trip_time']).persist()

                # Make the test data
                y_test = test['trip_time'].to_frame().astype(float).persist()
                X_test = test.drop(columns='trip_time').persist()

                print("Make dtrain")
                dtrain = DaskDMatrix(client, X_train, y_train, enable_categorical=True)

                print("Training model")
                model = xgb.dask.train(
                    client,
                    {
                        'verbosity': 1,
                        'tree_method': 'hist',
                        "objective": "reg:squarederror",
                        **booster_params
                    },
                    dtrain,
                    num_boost_round=num_boost_round,
                    evals=[(dtrain, "train")],
                )

                print("Make predictions")
                # It's faster to run the prediction directly on the X_test
                # DataFrame, so no DaskDMatrix is built for the test fold.
                # We also still need to confirm that predictions on a
                # DaskDMatrix containing categoricals perform as expected.
                predictions = xgb.dask.predict(client, model, X_test)

                print("Score the model")
                score = lazy_mse(
                    y_test.astype("float32").to_dask_array(lengths=True).reshape(-1,),
                    predictions.to_dask_array(lengths=True),
                )
                print(score)
                val_scores += score
                print(f"Complete runtime:  {datetime.datetime.now() - start}")

        # if teardown_cluster is True:
        #     cluster.close()
        return val_scores / n_splits


In [6]:
# Settings shared by every trial; read by `objective`.
train_options = {"n_splits": 5}

In [7]:
def objective(trial):
    """Optuna objective: sample hyper-parameters and return the CV score.

    Six values are drawn from ``trial`` (three integer, three float ranges)
    and handed to ``train_model`` together with the trial number and the
    fold count from ``train_options``.  The value returned by
    ``train_model`` is returned unchanged.
    """
    suggest_int = trial.suggest_int
    suggest_float = trial.suggest_float

    # Draw order is kept fixed so the sampled dict has a stable key order.
    sampled = {}
    sampled['n_estimators'] = suggest_int('n_estimators', 5, 50)
    sampled['learning_rate'] = suggest_float('learning_rate', 0.1, 0.9)
    sampled['subsample'] = suggest_float('subsample', 0.1, 0.9)
    sampled['max_depth'] = suggest_int('max_depth', 1, 6)
    sampled['colsample_bytree'] = suggest_float('colsample_bytree', 0.1, 0.9)
    sampled['min_child_weight'] = suggest_int('min_child_weight', 1, 9)

    return train_model(
        trial_number=trial.number,
        study_params=sampled,
        n_splits=train_options["n_splits"],
    )

In [8]:

# create a single study
# (in-memory; every worker thread below reports its trials to this one object)
study = optuna.create_study()

# Pool of 4 threads: at most four `study.optimize` calls run at once.
executor = ThreadPoolExecutor(4)

# 20 submissions x 3 trials each = up to 60 trials in total.  The cell returns
# immediately; the futures complete in the background.  An exception raised
# inside a worker thread is stored on its future and only surfaces when
# `.result()` is called on it.
futures = [
    executor.submit(study.optimize, objective, n_trials=3) for _ in range(20)
]

[I 2022-12-23 10:07:56,945] A new study created in memory with name: no-name-194efc27-5b22-4598-a59c-0b13804d5c54


In [12]:
futures

[<Future at 0x107e91990 state=finished returned NoneType>,
 <Future at 0x17be13ca0 state=finished returned NoneType>,
 <Future at 0x17be13760 state=finished returned NoneType>,
 <Future at 0x17be10850 state=finished returned NoneType>,
 <Future at 0x107e6f580 state=finished returned NoneType>,
 <Future at 0x17bd137f0 state=finished returned NoneType>,
 <Future at 0x17bd13a30 state=finished returned NoneType>,
 <Future at 0x17bd117e0 state=finished returned NoneType>,
 <Future at 0x17bd12ad0 state=finished returned NoneType>,
 <Future at 0x17bd12a70 state=finished returned NoneType>,
 <Future at 0x17bd129e0 state=finished returned NoneType>,
 <Future at 0x17bd118d0 state=finished returned NoneType>,
 <Future at 0x17bd13820 state=finished returned NoneType>,
 <Future at 0x17bd12200 state=finished returned NoneType>,
 <Future at 0x17bd12140 state=finished returned NoneType>,
 <Future at 0x17bd11b70 state=finished returned NoneType>,
 <Future at 0x17bd12ce0 state=finished returned NoneType

Traceback (most recent call last):
  File "/Users/greghayes/mambaforge/envs/xgboost_test/lib/python3.10/site-packages/distributed/comm/tcp.py", line 498, in connect
    stream = await self.client.connect(
  File "/Users/greghayes/mambaforge/envs/xgboost_test/lib/python3.10/site-packages/tornado/tcpclient.py", line 275, in connect
    af, addr, stream = await connector.start(connect_timeout=timeout)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/greghayes/mambaforge/envs/xgboost_test/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/greghayes/mambaforge/envs/xgboost_test/lib/python3.10/site-packages/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/Users/greg

## RUN IN NOTEBOOK

In [None]:
# Fixed hyper-parameters for a single manual run (no Optuna involved).
test_params = dict(
    n_estimators=5,
    learning_rate=0.99,
    subsample=0.9,
    max_depth=10,
    colsample_bytree=0.9,
    min_child_weight=9,
)

In [None]:
# Single manual run with the fixed parameters, against an explicitly named
# cluster instead of the per-thread default name.
# NOTE(review): the numeric suffix looks like a thread id from an earlier
# session -- confirm the cluster name before re-running.
results = train_model(trial_number = 1, study_params = test_params, cluster_name="xgb-nyc-taxi-10888441856")

In [None]:
results

In [None]:
## Ignore Below

def cv_estimate(trial_number, clf_params, n_splits=5):
    """Scratch/debug variant of the cross-validation training loop.

    Connects to a Coiled cluster with a fixed, hard-coded name, loads the
    data, and starts iterating over the CV folds.  For the first fold it
    trains an XGBoost model with hard-coded parameters and then returns
    immediately, because the ``return`` sits inside the loop body.

    Args:
        trial_number: Not read by this function.
        clf_params: Not read by this function; the parameters used for
            training are the hard-coded ``params`` dict below.
        n_splits: Number of folds passed to ``_make_cv``.

    Returns:
        ``(y_test, X_test, model)`` for the first fold.
    """
    thread_id = threading.get_ident()
    with coiled.Cluster(
        worker_vm_types=["m6i.4xlarge"],
        scheduler_vm_types=["m6i.2xlarge"],
        package_sync=True, # copy local packages
        # name="xgb-nyc-taxi-" + str(thread_id),
        name="xgb-nyc-taxi-6291025920",
        shutdown_on_close=False,  # reuse cluster across runs
        show_widget=False,
        n_workers=6,
        # worker_memory="16 GiB",
        use_best_zone=True,
        account="dask-engineering",
        backend_options={"region": "us-east-2", "spot": True, "spot_on_demand_fallback": True}
    ) as cluster:
        with Client(cluster) as client:
            print(client.id)
            with client.as_current():
                print(client.id)
                print(f"starting run in thread:  {thread_id}.  Scheduler at:  {client.dashboard_link}")

                ddf = load_data()
                # Size of the task graph before anything is persisted.
                print(f"Num tasks:  {len(ddf.dask.get_all_dependencies())}")
                ddf = ddf.persist()

                # NOTE(review): val_scores is never updated; the scoring code
                # further down is commented out.
                val_scores = 0
                categorical_vars = ['request_dow', 'pickup_datetime_dow', 'request_hour', 'pickup_datetime_hour']
                for i, (train, test) in enumerate(_make_cv(ddf, n_splits)):
                    print(f"Starting training run {i}")
                    start = datetime.datetime.now()
                    train = dd.concat(train)
                    
                    # Doing CV forces df[col].cat.known == False
                    # This is risky and needs a fix 
                    train = train.categorize(columns=categorical_vars)
                    test = test.categorize(columns=categorical_vars)
                    
                    # Make the training data
                    y_train = train['trip_time'].to_frame()
                    y_train = y_train.persist()
                    X_train = train.drop(columns=['trip_time']).persist()
                   
                    # Make the test data
                    y_test = test['trip_time'].to_frame()
                    y_test = y_test.persist()
                    X_test = test.drop(columns='trip_time').persist()
                    
                    dtrain = DaskDMatrix(client, X_train, y_train, enable_categorical=True)
                    print("created dtrain")
                    # NOTE(review): dtest is built but not used afterwards.
                    dtest = DaskDMatrix(client, X_test, y_test, enable_categorical=True)
                    # Hard-coded parameters; the clf_params argument is ignored.
                    params = {
                        'n_estimators': 5,
                        'learning_rate': 0.99,
                        'subsample':  0.9,
                        'max_depth': 10,
                        'colsample_bytree': 0.9,
                        'min_child_weight': 9,
                    }
                    print("training model")
                    model = xgb.dask.train(
                        client,
                        {
                            'verbosity': 1,
                            'tree_method': 'hist', 
                            "objective": "reg:squarederror",
                            # NOTE(review): XGBoost documents the thread-count
                            # parameter as `nthread`; confirm whether the
                            # `nthreads` spelling is recognised at all.
                            "nthreads": 8,
                            **params
                        },
                        dtrain,
                        num_boost_round=4, 
                    )
                    print("Finished training model")
                    # predictions = xgb.dask.predict(client, model, X_test)
                    # print("Done making predictions")

                    # actual, predictions = dask.compute(y_test, predictions)
                    # actual = dask.compute(y_test)
                    print("Computed actuals")
                    # predictions = dask.compute(predictions)
                    print("Computed unknowns")
                #     score = mean_squared_error(actual, predictions)
                #     val_scores += score
                #     print(f"Complete runtime:  {datetime.datetime.now() - start}")
                # print(f"Fold {i}, score: {score}")
                    # Returns from inside the loop, so only the first fold runs.
                    return y_test, X_test, model

    # cluster.close()
    # return val_scores / n_splits
    # return actual, predictions