# Distributed XGBoost on Dask in the cloud

This is the accompanying notebook to the blog post [XGBoost – frictionless training on datasets too big for the memory](https://coiled.io/blog/xgboost-frictionless-training/).

Swap in your dataset, spin up a cluster in 2 minutes and train at any scale!

### Cluster setup 

In [None]:
from dask import dataframe as dd
import coiled

Request the cluster and watch it come up in your [Coiled dashboard](https://cloud.coiled.io/):

In [None]:
%%time
cluster = coiled.Cluster(n_workers=12, software="blog-notebooks/xgboost-on-coiled")

Connect to the cluster:

In [None]:
from dask.distributed import Client, progress
client = Client(cluster)
client

### Data preprocessing

The dataset needs a little work: the categorical columns must be converted to a format that XGBoost supports.

The columns we'll be working with:

In [5]:
columns = [
    "delinquency_12",
    "interest_rate",
    "loan_age",
    "adj_remaining_months_to_maturity",
    "longest_ever_deliquent",
    "orig_channel",
    "num_borrowers",
    "borrower_credit_score",
    "first_home_buyer",
    "loan_purpose",
    "property_type",
    "num_units",
    "occupancy_status",
    "property_state",
    "zip",
    "mortgage_insurance_percent",
    "coborrow_credit_score",
    "relocation_mortgage_indicator",
]
categorical = [
    "orig_channel",
    "occupancy_status",
    "property_state",
    "first_home_buyer",
    "loan_purpose",
    "property_type",
    "zip",
    "relocation_mortgage_indicator",
    "delinquency_12",
]

Create a column categorizer:

In [6]:
from dask_ml.preprocessing import Categorizer

ce = Categorizer(columns=categorical)

Load the dataset sample:

In [35]:
mortgage_data = dd.read_parquet(
    "s3://coiled-data/mortgage-2000.parq/*",
    compression="gzip",
    columns=columns,
    storage_options={"anon": True},
)

mortgage_data

Apply the column categorizer:

In [39]:
mortgage_data = ce.fit_transform(mortgage_data)

mortgage_data.dtypes

delinquency_12                      category
interest_rate                        float64
loan_age                             float64
adj_remaining_months_to_maturity     float64
longest_ever_deliquent                 int32
orig_channel                        category
num_borrowers                        float64
borrower_credit_score                float64
first_home_buyer                    category
loan_purpose                        category
property_type                       category
num_units                              int32
occupancy_status                    category
property_state                      category
zip                                 category
mortgage_insurance_percent           float64
coborrow_credit_score                float64
relocation_mortgage_indicator       category
dtype: object

In [40]:
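# Every column except the target is a feature.
# Index.difference expects a list-like, and labels are selected with [] rather than iloc.
feature_cols = mortgage_data.columns.difference(["delinquency_12"])
X, y = mortgage_data[list(feature_cols)], mortgage_data["delinquency_12"]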

In [42]:
# Dask categorical columns are not yet available in XGBoost:
# https://github.com/dmlc/xgboost/blame/9a0399e8981b2279d921fe2312f7ab1b880fd3c3/python-package/xgboost/dask.py#L227
# The relevant commit is already in master and can be expected in release 1.4.0.

# Until then, cast the categorical columns to their integer codes.
y = y.cat.codes
for col in categorical:
    if col in X.columns:  # the target "delinquency_12" is not part of X
        X[col] = X[col].cat.codes
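
To make the cast above concrete, here is a minimal plain-Python sketch of what integer category codes look like. It is an illustration only, not the pandas or Dask implementation: it assumes categories are sorted and that missing values map to `-1`, which is how pandas behaves when it infers categories. The helper `encode_categories` and the sample values are hypothetical.

```python
def encode_categories(values):
    """Map each distinct value to an integer code; missing values (None) become -1."""
    # Assumption: categories are ordered by sorting the distinct non-missing values.
    categories = sorted({v for v in values if v is not None})
    lookup = {cat: code for code, cat in enumerate(categories)}
    codes = [lookup.get(v, -1) for v in values]
    return categories, codes


# Hypothetical sample resembling a column such as "orig_channel".
channels = ["R", "C", "B", "R", None, "C"]
categories, codes = encode_categories(channels)
print(categories)  # ['B', 'C', 'R']
print(codes)       # [2, 1, 0, 2, -1, 1]
```

Note that integer codes impose an arbitrary ordering on the categories, which the trees will treat like any other numeric feature.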

In [40]:
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=True, random_state=2
)

In [41]:
import xgboost as xgb

In [44]:
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)    
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)    

In [46]:
params = {
    "max_depth": 8,
    "max_leaves": 2 ** 8,
    "alpha": 0.9,
    "gamma": 0.1,
    "learning_rate": 0.1,  # alias of "eta", so only one of the two is needed
    "subsample": 1,
    "reg_lambda": 1,
    "scale_pos_weight": 2,
    "min_child_weight": 30,
    # "tree_method": "gpu_hist",  # uncomment to train on GPU workers
    "objective": "binary:logistic",
    "grow_policy": "lossguide",
}
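
As a rough intuition for two of these settings, the sketch below shows how a `binary:logistic` objective with `scale_pos_weight` can be thought of: the raw score is passed through a sigmoid, and the gradient and Hessian of positive examples are scaled up by the weight. This is a conceptual sketch under that assumption, not XGBoost's actual code, and the function names are hypothetical.

```python
import math


def sigmoid(margin):
    """Convert a raw score (margin) into a probability."""
    return 1.0 / (1.0 + math.exp(-margin))


def weighted_logloss_grad_hess(margin, label, scale_pos_weight=2.0):
    """Gradient and Hessian of weighted log loss with respect to the margin.

    Assumption: positive examples (label == 1) are weighted by
    scale_pos_weight and negative examples by 1.
    """
    p = sigmoid(margin)
    w = scale_pos_weight if label == 1 else 1.0
    grad = w * (p - label)
    hess = w * p * (1.0 - p)
    return grad, hess


# At a margin of 0 the predicted probability is 0.5 for both classes,
# but the positive example pulls twice as hard on the next tree.
pos = weighted_logloss_grad_hess(0.0, 1)
neg = weighted_logloss_grad_hess(0.0, 0)
print(pos)  # (-1.0, 0.5)
print(neg)  # (0.5, 0.25)
```

With `scale_pos_weight` set to 2, a misclassified delinquent loan counts twice as much as a misclassified non-delinquent one, which can help when the positive class is rare.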

In [None]:
%%time
output = xgb.dask.train(
    client,
    params,
    dtrain,
    num_boost_round=20,
    evals=[(dtrain, 'train'), (dtest, 'test')]
)

In [None]:
booster = output['booster']  # the trained model
history = output['history']  # a dictionary containing evaluation results

In [None]:
booster

In [None]:
history
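
The history is keyed by the names given in `evals` ("train" and "test" here), each holding a list of metric values per boosting round. Below is a small sketch for picking the best round from such a dictionary. The helper `best_round` and the numbers in `example_history` are made up for illustration, the metric name depends on the XGBoost version and objective, and the helper assumes a metric where lower is better.

```python
def best_round(history, eval_name="test", metric=None):
    """Return (metric, round_index, score) for the lowest score in one eval set."""
    results = history[eval_name]
    if metric is None:
        # Assumption: take the first metric recorded for this eval set.
        metric = next(iter(results))
    scores = results[metric]
    best = min(range(len(scores)), key=scores.__getitem__)
    return metric, best, scores[best]


# Hypothetical values, shaped like the history returned by training.
example_history = {
    "train": {"logloss": [0.62, 0.55, 0.50, 0.47]},
    "test": {"logloss": [0.63, 0.57, 0.54, 0.55]},
}
metric, round_index, score = best_round(example_history)
print(metric, round_index, score)  # logloss 2 0.54
```

A test score that stops improving while the train score keeps falling, as in the made-up numbers above, is a typical sign that further rounds are overfitting.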

In [None]:
client.close()