# Scaling XGBoost with Dask and Coiled
This notebook shows you how to avoid the common **MemoryError** that is raised when you try to train an XGBoost model on a dataset that doesn't fit into your machine's memory.

You'll learn how to leverage **distributed [XGBoost](https://xgboost.readthedocs.io/en/latest/) training** for effective modelling on datasets that exceed the hardware limitations of your local machine.

Specifically, you will learn to write code to:
1. Train a distributed XGBoost model locally on a small dataset using [Dask](https://dask.org/), 
2. Scale your distributed XGBoost model to the cloud using Dask and [Coiled](https://coiled.io/) to train on a larger-than-memory dataset,
3. Speed up your training with pro tips from the Dask core team.

### About the Dataset
We'll be using a ~20GB subset of the ARCOS dataset released by the Washington Post.
You can download the complete dataset [here](https://www.washingtonpost.com/national/2019/07/18/how-download-use-dea-pain-pills-database/).

For more context on the dataset, including descriptions of the columns, check out the 
Washington Post GitHub repository [here](https://github.com/wpinvestigative/arcos-api/).

Note that the original dataset is stored in .tsv format. This notebook uses a preprocessed version stored in the more efficient Parquet file format.


In [2]:
import warnings
warnings.filterwarnings('ignore')

import logging
logger = logging.getLogger("distributed.utils_perf")
logger.setLevel(logging.ERROR)

## 1. Local Distributed XGBoost Model using Dask

By default, XGBoost trains models on a single machine. This is fine for smaller projects, but when the size of your dataset and/or ML model exceeds the limitations of your local machine, you will want to leverage the potential of distributed computing.

Starting from version 1.0, XGBoost comes with a native Dask integration that makes this possible. 

It only requires two changes to your regular XGBoost code:
1. substitute `dtrain = xgb.DMatrix(X_train, y_train)` with `dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)`, and
2. substitute `xgb.train(params, dtrain, ...)` with `xgb.dask.train(client, params, dtrain, ...)`.

Let's see this in action with an actual dataset.

### Instantiate Dask Cluster

We'll begin by instantiating a local version of the Dask distributed scheduler, which will orchestrate the distributed processing of our model. Read more about the Dask schedulers [here](https://distributed.dask.org/en/latest/).

In [3]:
from dask.distributed import Client, LocalCluster

# local dask cluster
cluster = LocalCluster(n_workers=4)
client = Client(cluster)
client

Client
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
Workers: 4 (2 threads and 4.00 GiB memory each)
Total threads: 8
Total memory: 16.00 GiB
Status: running
Using processes: True


### Import the Data
To reduce preprocessing to a minimum, we'll work with a subset of the dataset by only importing selected columns and loading those into a Dask dataframe.

We are able to do this because we've already converted the dataset (originally in .tsv format) into Parquet, which allows for [column pruning](https://coiled.io/blog/parquet-column-pruning-predicate-pushdown/).

In [3]:
# define the columns we want to import
columns = [
    "QUANTITY",
    "CALC_BASE_WT_IN_GM",
    "DOSAGE_UNIT",
]

categorical = [
    "REPORTER_BUS_ACT",
    "REPORTER_CITY",
    "REPORTER_STATE",
    "REPORTER_ZIP",
    "BUYER_BUS_ACT",
    "BUYER_CITY",
    "BUYER_STATE",
    "BUYER_ZIP",
    "DRUG_NAME",
]

In [5]:
import dask.dataframe as dd

# download data from S3
data = dd.read_parquet(
    "s3://coiled-datasets/dea-opioid/arcos_washpost_comp.parquet", 
    compression="lz4",
    storage_options={"anon": True, 'use_ssl': True},
    columns=columns+categorical,
)

In [6]:
data

Dask DataFrame Structure:

| | QUANTITY | CALC_BASE_WT_IN_GM | DOSAGE_UNIT | REPORTER_BUS_ACT | REPORTER_CITY | REPORTER_STATE | REPORTER_ZIP | BUYER_BUS_ACT | BUYER_CITY | BUYER_STATE | BUYER_ZIP | DRUG_NAME |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| npartitions=3750 | float64 | float64 | float64 | object | object | object | int64 | object | object | object | int64 | object |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


Since we're working locally to begin with, we won't be able to process the entire 20GB dataset.

We'll subset the first 50 partitions.

In [7]:
# select the first 50 partitions
data_local = data.partitions[0:50]
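
As a rough back-of-the-envelope check of how much data this local subset covers, the sketch below assumes (hypothetically) that all 3750 partitions are about the same size. Real partitions will vary, so treat the result as an order-of-magnitude estimate only.

```python
# Rough size estimate for the local subset.
# Assumption (not measured on the dataset): partitions are roughly equal in size.
TOTAL_SIZE_GB = 20        # approximate size of the full dataset
N_PARTITIONS = 3750       # npartitions reported by Dask above
N_LOCAL_PARTITIONS = 50   # partitions selected with data.partitions[0:50]

fraction = N_LOCAL_PARTITIONS / N_PARTITIONS
approx_local_gb = TOTAL_SIZE_GB * fraction

print(f"{fraction:.1%} of the partitions, roughly {approx_local_gb:.2f} GB")
```

Under that assumption the local subset is a little over 1% of the data, which is why it fits comfortably on a laptop.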

In [8]:
# inspect the first 5 entries
data_local.head()

| | QUANTITY | CALC_BASE_WT_IN_GM | DOSAGE_UNIT | REPORTER_BUS_ACT | REPORTER_CITY | REPORTER_STATE | REPORTER_ZIP | BUYER_BUS_ACT | BUYER_CITY | BUYER_STATE | BUYER_ZIP | DRUG_NAME |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 0.6054 | 100.0 | DISTRIBUTOR | BROCKTON | MA | 2301 | PRACTITIONER | MALDEN | MA | 2148 | HYDROCODONE |
| 1 | 4.0 | 0.12108 | 40.0 | DISTRIBUTOR | PHOENIX | AZ | 85006 | RETAIL PHARMACY | PHOENIX | AZ | 85085 | HYDROCODONE |
| 2 | 40.0 | 3.6324 | 1200.0 | DISTRIBUTOR | PHOENIX | AZ | 85006 | PRACTITIONER | GILBERT | AZ | 85233 | HYDROCODONE |
| 3 | 20.0 | 2.7243 | 600.0 | DISTRIBUTOR | PHOENIX | AZ | 85006 | PRACTITIONER | GILBERT | AZ | 85233 | HYDROCODONE |
| 4 | 10.0 | 0.9081 | 300.0 | DISTRIBUTOR | PHOENIX | AZ | 85006 | PRACTITIONER | GILBERT | AZ | 85233 | HYDROCODONE |


This is looking good.

### Preprocessing

Before we can start training our XGBoost model, we'll have to conduct some basic preprocessing steps:
1. Deal with any missing values
2. Cast our categorical columns to the correct types (XGBoost only accepts float, integer and boolean dtypes)
3. Create our train and test splits

*Note: we're using the **[dask_ml](https://ml.dask.org/)** library for this, which mimics the familiar scikit-learn API*

In [9]:
# count missing values
data_local.isna().sum()

Dask Series Structure:
npartitions=1
BUYER_BUS_ACT    int64
REPORTER_ZIP       ...
dtype: int64
Dask Name: dataframe-sum-agg, 3901 tasks

The output above is only the lazy Dask Series; calling `.compute()` on it returns the actual counts, which show 3 missing values in the DOSAGE_UNIT column.

We'll use **fillna** to replace these missing values with 0.

In [10]:
data_local.DOSAGE_UNIT = data_local.DOSAGE_UNIT.fillna(value=0)
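
To see what counting and filling missing values does on data small enough to inspect by eye, here is a minimal pandas sketch. The three-row frame is made up for illustration. A Dask dataframe exposes the same `isna`, `sum` and `fillna` methods but evaluates them lazily, so you need `.compute()` to see the counts.

```python
import numpy as np
import pandas as pd

# toy data (hypothetical values) with one missing DOSAGE_UNIT
toy = pd.DataFrame(
    {"QUANTITY": [1.0, 4.0, 40.0], "DOSAGE_UNIT": [100.0, np.nan, 1200.0]}
)

# count missing values per column
missing_before = toy.isna().sum()

# replace missing DOSAGE_UNIT values with 0, as in the cell above
toy["DOSAGE_UNIT"] = toy["DOSAGE_UNIT"].fillna(value=0)
missing_after = toy.isna().sum()

print(missing_before)
print(missing_after)
```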

Next let's cast our categorical features to the correct dtypes.

E.g. the strings containing names of cities in the REPORTER_CITY column will be replaced with integers.

> *NOTE: to focus on implementing XGBoost in the cloud, we'll use a simple Categorizer here. In practice, you may want to consider one-hot encoding your categorical variables to avoid XGBoost treating these features as ordinal.*

In [11]:
from dask_ml.preprocessing import Categorizer

# cast categorical columns to the correct type
ce = Categorizer(columns=categorical)
data_local = ce.fit_transform(data_local)
for col in categorical:
    data_local[col] = data_local[col].cat.codes
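
The difference between integer codes and one-hot encoding mentioned in the note above is easy to see on a toy column. This is a plain-pandas sketch with made-up values, not the `Categorizer` itself, and the specific integers it assigns may differ from the ones Dask produces.

```python
import pandas as pd

# toy column (hypothetical values)
cities = pd.Series(["BROCKTON", "PHOENIX", "PHOENIX", "GILBERT"], name="BUYER_CITY")

# integer codes: compact, but they impose an arbitrary order on the cities
codes = cities.astype("category").cat.codes

# one-hot encoding: one indicator column per city, no implied order
one_hot = pd.get_dummies(cities, prefix="BUYER_CITY")

print(codes.tolist())
print(one_hot.columns.tolist())
```

The trade-off is width: one-hot encoding a high-cardinality column such as BUYER_CITY adds one column per distinct value, while integer codes keep a single column.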

In [12]:
# verify
data_local.head()

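| | QUANTITY | CALC_BASE_WT_IN_GM | DOSAGE_UNIT | REPORTER_BUS_ACT | REPORTER_CITY | REPORTER_STATE | REPORTER_ZIP | BUYER_BUS_ACT | BUYER_CITY | BUYER_STATE | BUYER_ZIP | DRUG_NAME |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 0.6054 | 100.0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 1 | 4.0 | 0.12108 | 40.0 | 0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 0 |
| 2 | 40.0 | 3.6324 | 1200.0 | 0 | 1 | 1 | 1 | 0 | 2 | 1 | 2 | 0 |
| 3 | 20.0 | 2.7243 | 600.0 | 0 | 1 | 1 | 1 | 0 | 2 | 1 | 2 | 0 |
| 4 | 10.0 | 0.9081 | 300.0 | 0 | 1 | 1 | 1 | 0 | 2 | 1 | 2 | 0 |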


The next step is to define our train and test splits. This means we also need to decide on our target and predictor features.

Let's create a model that will **predict the total active weight of the drug in the transaction** ("CALC_BASE_WT_IN_GM") from the remaining features in our dataset.

We'll begin by rearranging the dataframe so that the target feature is located in the last column.

In [13]:
# rearrange columns
cols = data_local.columns.to_list()
cols_new = [cols[0]] + cols[2:] + [cols[1]]
data_local = data_local[cols_new]
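
The index arithmetic above relies on the target sitting at position 1. The sketch below replays it on the column list used in this notebook and compares it with a name-based variant (the `target` variable is introduced here for illustration) that keeps working if the column order ever changes.

```python
# column order as loaded: numerical columns first, then the categorical ones
cols = [
    "QUANTITY", "CALC_BASE_WT_IN_GM", "DOSAGE_UNIT",
    "REPORTER_BUS_ACT", "REPORTER_CITY", "REPORTER_STATE", "REPORTER_ZIP",
    "BUYER_BUS_ACT", "BUYER_CITY", "BUYER_STATE", "BUYER_ZIP", "DRUG_NAME",
]

# positional version, as used in the cell above
cols_positional = [cols[0]] + cols[2:] + [cols[1]]

# name-based version: move the target to the end regardless of its position
target = "CALC_BASE_WT_IN_GM"
cols_by_name = [c for c in cols if c != target] + [target]

print(cols_by_name[-1])
```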

In [14]:
from dask_ml.model_selection import train_test_split

# Create the train-test split
X, y = data_local.iloc[:, :-1], data_local["CALC_BASE_WT_IN_GM"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, shuffle=True, random_state=21
)

### Train XGBoost Model

Now we're all set to start training our XGBoost model.

First, we'll create the XGBoost DMatrix and set the model parameters. We'll stick to the XGBoost defaults for this example, apart from `min_child_weight`, which is raised from its default of 1 to 30.

For more information on training XGBoost models and setting model parameters, have a look at the [XGBoost documentation](https://xgboost.readthedocs.io/en/latest/get_started.html).

In [15]:
import xgboost as xgb

In [16]:
# Create the XGBoost DMatrix for our training and testing splits
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)

# Set model parameters (XGBoost defaults, except min_child_weight)
params = {
    "max_depth": 6,
    "gamma": 0,
    "eta": 0.3,
    "min_child_weight": 30,
    "objective": "reg:squarederror",
    "grow_policy": "depthwise"
}
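
A quick way to see which of these values actually differ from XGBoost's defaults is to compare two dictionaries. The `xgb_defaults` dictionary below is typed in by hand from the XGBoost parameter documentation for illustration; it is not read from the library.

```python
# documented XGBoost defaults for the parameters used here (typed in by hand)
xgb_defaults = {
    "max_depth": 6,
    "gamma": 0,
    "eta": 0.3,
    "min_child_weight": 1,
    "objective": "reg:squarederror",
    "grow_policy": "depthwise",
}

# parameters used in this notebook
params = {
    "max_depth": 6,
    "gamma": 0,
    "eta": 0.3,
    "min_child_weight": 30,
    "objective": "reg:squarederror",
    "grow_policy": "depthwise",
}

# map each overridden parameter to (default value, value used here)
overrides = {
    key: (xgb_defaults[key], value)
    for key, value in params.items()
    if xgb_defaults[key] != value
}
print(overrides)
```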

Then let's go ahead and train the model.

In [17]:
%%time 
# train the model
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=5,
    evals=[(dtrain, 'train')]
)

[11:33:21] task [xgboost.dask]:tcp://127.0.0.1:51513 got new rank 0
[11:33:21] task [xgboost.dask]:tcp://127.0.0.1:51516 got new rank 1
[11:33:21] task [xgboost.dask]:tcp://127.0.0.1:51522 got new rank 2
[11:33:21] task [xgboost.dask]:tcp://127.0.0.1:51519 got new rank 3


[0]	train-rmse:11.44700
[1]	train-rmse:10.76580
[2]	train-rmse:10.32878
[3]	train-rmse:10.04225
[4]	train-rmse:9.79066
CPU times: user 80.4 ms, sys: 25 ms, total: 105 ms
Wall time: 2.02 s


And use our trained model together with our testing split to make predictions.

In [18]:
# make predictions
y_pred = xgb.dask.predict(client, output, dtest)

And finally, let's evaluate our results by computing the mean absolute error (MAE) on the test split.

In [19]:
from sklearn.metrics import mean_absolute_error

In [20]:
mae = mean_absolute_error(y_test, y_pred)
print(f"Mean Absolute Error: {mae}")

Mean Absolute Error: 1.5837309113674591
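
Note that the training log reports RMSE while the evaluation uses MAE, so the two numbers are not directly comparable. The NumPy sketch below computes both by hand on four made-up values to show how they relate.

```python
import numpy as np

# toy targets and predictions (hypothetical values)
y_true = np.array([0.6, 0.1, 3.6, 2.7])
y_hat = np.array([1.0, 0.5, 3.0, 2.0])

errors = y_true - y_hat

# mean absolute error: average size of the errors
mae = np.mean(np.abs(errors))

# root mean squared error: penalises large errors more heavily
rmse = np.sqrt(np.mean(errors ** 2))

print(f"MAE:  {mae:.3f}")
print(f"RMSE: {rmse:.3f}")
```

RMSE is never smaller than MAE on the same data, and a large gap between the two may hint at a small number of very large errors.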


### Try Locally with Entire Dataset... if you dare...

Unless you're running this on a supercomputer, uncommenting and running the cell below will likely not complete.

But don't just take our word for it, of course ;)

In [20]:
# # fill NaN values
# data.BUYER_CITY = data.BUYER_CITY.fillna(value="Unknown")
# data.DOSAGE_UNIT = data.DOSAGE_UNIT.fillna(value=0)

# # instantiate categorizer
# ce = Categorizer(columns=categorical)

# # fit categorizer and transform data
# data = ce.fit_transform(data)

# # replace values in categorical columns with their numerical codes
# for col in categorical:
#     data[col] = data[col].cat.codes

# # rearrange columns
# cols = data.columns.to_list()
# cols_new = [cols[0]] + cols[2:] + [cols[1]]
# data = data[cols_new]

# # Create the train-test split
# X, y = data.iloc[:, :-1], data["CALC_BASE_WT_IN_GM"]
# X_train, X_test, y_train, y_test = train_test_split(
#     X, y, test_size=0.3, shuffle=True, random_state=2
# )

# # Create DaskDMatrices
# dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
# dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)

```
distributed.batched - ERROR - Error in batched write
MemoryError
distributed.worker - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 1.49 GiB -- Worker memory limit: 1.86 GiB
```

## 2. Distributed XGBoost in the Cloud using Dask and Coiled

Let's now expand this workflow to process the entire dataset (~20 GB). 

We'll use the same code as above except for **2 changes**:
1. We'll connect Dask to a Coiled cluster in the cloud, instead of to our local CPU cores,
2. We'll work with the entire 20GB dataset, instead of the first 50 partitions.

In the section below we've copied and pasted the cells from above so that you can run this notebook from top to bottom in one go. Alternatively, you could run the cell below (where we instantiate the Coiled Cluster) and then simply re-run the cells above -- making sure to adjust the cell that downloads the data as well, of course.

### Instantiate Coiled Cluster
Let's create our Coiled cluster in the cloud. 

The cell below requests a cluster of 25 workers, with 4 CPU cores and 16GB of RAM each. That allows the entire dataset to fit into the cluster's memory comfortably and should make for quick training.

> *Note: 25 workers keeps you within the Total Core limit of the Coiled Free Tier. If your account allows for more cores, you can increase **n_workers** (to 50, for example) to speed up training further.*

In [None]:
import coiled

coiled.create_software_environment(
    account="coiled-examples",
    name="xgboost-coiled",
    conda="environment.yml",
)

Creating new software environment
Creating new ecr build
STEP 1: FROM coiled/default:sha-6b4e896
STEP 2: COPY environment.yml environment.yml
--> 96b0c6407f5
STEP 3: RUN conda env update -n coiled -f environment.yml     && rm environment.yml     && conda clean --all -y     && echo "conda activate coiled" >> ~/.bashrc
Collecting package metadata (repodata.json): ...working... done
Solving environment: ...working... done



In [1]:
import coiled

cluster = coiled.Cluster(
    name="xgboost-2",
    software="coiled-examples/xgboost-coiled",
    n_workers=25,
    worker_memory='16GiB',
    shutdown_on_close=False,
)
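
Before launching, it can help to sanity-check what a given `n_workers` adds up to. The helper below is a hypothetical convenience function, not part of the Coiled API, and it assumes 4 cores and 16 GiB per worker as described above.

```python
def cluster_totals(n_workers, cores_per_worker=4, memory_gib_per_worker=16):
    """Return (total cores, total memory in GiB) for a homogeneous cluster."""
    return n_workers * cores_per_worker, n_workers * memory_gib_per_worker


for n in (25, 50):
    cores, memory = cluster_totals(n)
    print(f"{n} workers -> {cores} cores, {memory} GiB")
```

The memory that Dask reports per worker is usually a little lower than the requested amount; the run shown in this notebook reports 15.49 GiB per worker.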


In [2]:
from distributed import Client

client = Client(cluster)
client

Client
Connection method: Cluster object
Cluster type: coiled.ClusterBeta
Dashboard: http://3.236.39.79:8787
Workers: 25 (4 threads and 15.49 GiB memory each)
Total threads: 100
Total memory: 387.24 GiB


### Inspecting Entire Dataset

Let's load the entire dataset into our Dask dataframe **data**.

As you can see below, it consists of 3750 partitions.

In [4]:
import dask.dataframe as dd

In [5]:
# download data from S3
data = dd.read_parquet(
    "s3://coiled-datasets/dea-opioid/arcos_washpost_comp.parquet", 
    compression="lz4",
    storage_options={"anon": True},
    columns=columns+categorical,
)

data

Dask DataFrame Structure:

| | QUANTITY | CALC_BASE_WT_IN_GM | DOSAGE_UNIT | REPORTER_BUS_ACT | REPORTER_CITY | REPORTER_STATE | REPORTER_ZIP | BUYER_BUS_ACT | BUYER_CITY | BUYER_STATE | BUYER_ZIP | DRUG_NAME |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| npartitions=3750 | float64 | float64 | float64 | object | object | object | int64 | object | object | object | int64 | object |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


In [None]:
data.shape[0].compute()

In [6]:
data.head()



In [None]:
data.BUYER_STATE.value_counts().compute()

### Preprocessing

Below we apply the same preprocessing steps as the ones we performed on the smaller, local subset.

In [None]:
# count missing values per column
data.isna().sum().compute()

In [None]:
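# imports needed when running this section in a fresh kernel
from dask_ml.preprocessing import Categorizer
from dask_ml.model_selection import train_test_split

# fill NaN values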
data.BUYER_CITY = data.BUYER_CITY.fillna(value="Unknown")
data.DOSAGE_UNIT = data.DOSAGE_UNIT.fillna(value=0)

# instantiate categorizer
ce = Categorizer(columns=categorical)

# fit categorizer and transform data
data = ce.fit_transform(data)

# replace values in categorical columns with their numerical codes
for col in categorical:
    data[col] = data[col].cat.codes

# rearrange columns
cols = data.columns.to_list()
cols_new = [cols[0]] + cols[2:] + [cols[1]]
data = data[cols_new]

# Create the train-test split
X, y = data.iloc[:, :-1], data["CALC_BASE_WT_IN_GM"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, shuffle=True, random_state=13
)

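# persist the train/test splits to cluster memory to speed up training;
# dask.persist returns new collections, so keep references to them
import dask
X_train, X_test, y_train, y_test = dask.persist(X_train, X_test, y_train, y_test)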

### XGBoost Training
Alright, the moment we've all been waiting for!

You're now all set to train your distributed XGBoost model on the entire 20GB dataset.

The cells below will create the DaskDMatrix, set the model parameters (the same values as in the local run) and train your XGBoost model.

In [None]:
import xgboost as xgb

# Create the XGBoost DMatrices
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)

In [None]:
# Set model parameters (XGBoost defaults, except min_child_weight)
params = {
    "max_depth": 6,
    "gamma": 0,
    "eta": 0.3,
    "min_child_weight": 30,
    "objective": "reg:squarederror",
    "grow_policy": "depthwise"
}

In [None]:
%%time 
# train the model 
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=4,
    evals=[(dtrain, 'train')]
)

In [None]:
# make predictions and keep the persisted result in cluster memory
y_pred = xgb.dask.predict(client, output, dtest)
y_pred = y_pred.persist()

In [None]:
# evaluate model performance
from sklearn.metrics import mean_absolute_error

mae = mean_absolute_error(y_test, y_pred)
print(f"Mean Absolute Error: {mae}")


Great work! You just trained an XGBoost model on 20GB of data in less than 20 seconds.

### Shutting down the cluster
After our training is done, we can close down the cluster, releasing the resources. Should you forget to do so for whatever reason, Coiled automatically shuts down clusters after 20 minutes of inactivity, to help avoid unnecessary costs.


In [29]:
# Close the client connection and the cluster
client.close()
cluster.close()

## 3. Pro Tips to Speed Up Training
Below we’ve collected some pro tips straight from the Dask core team to help you speed up your XGBoost training:

- Re-cast numerical columns to less memory-intensive dtypes. For example, convert float64 into int16 whenever the values allow it (whole numbers within the int16 range). This will reduce the memory load of your dataframe and thereby speed up training.
- The Dask Dashboard is a great way to spot bottlenecks and identify opportunities for increased performance in your code. Watch the initial author of Dask, Matt Rocklin, explain how to get the most out of the Dask Dashboard [here](https://www.youtube.com/watch?v=N_GqzcuGLCY).
- Read Matthew Powers’ blog on setting up the Dask Dashboard in your JupyterLab environment [here](https://coiled.io/blog/dask-jupyterlab-workflow/). 
- Read Dask core contributor Guido Imperiale’s blog on how to tackle the specific issue of unmanaged memory in Dask workers [here](https://coiled.io/blog/tackling-unmanaged-memory-with-dask/). 
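
As an illustration of the first tip, here is a small pandas sketch on made-up data. It only downcasts when the conversion is lossless, which is an extra safety check added here rather than something the tip prescribes. The same `astype` call is available on Dask dataframes.

```python
import numpy as np
import pandas as pd

# toy frame (hypothetical values): whole-number floats and small integers
df = pd.DataFrame(
    {
        "DOSAGE_UNIT": (np.arange(100_000) % 1000).astype("float64"),
        "REPORTER_BUS_ACT": (np.arange(100_000) % 5).astype("int64"),
    }
)


def downcast_if_lossless(series, dtype):
    """Cast to dtype only if every value survives unchanged (assumes no NaNs)."""
    converted = series.astype(dtype)
    return converted if (converted == series).all() else series


before = df.memory_usage(deep=True).sum()

small = df.copy()
small["DOSAGE_UNIT"] = downcast_if_lossless(small["DOSAGE_UNIT"], "int16")
small["REPORTER_BUS_ACT"] = downcast_if_lossless(small["REPORTER_BUS_ACT"], "int8")

after = small.memory_usage(deep=True).sum()

print(f"{before / 1e6:.2f} MB -> {after / 1e6:.2f} MB")
```

A column with fractional values, such as CALC_BASE_WT_IN_GM, would fail the lossless check for integer dtypes and be left unchanged by this helper.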



## 4. Recap

Let’s recap what we’ve discussed in this notebook:
- When training XGBoost with large datasets, running out of local memory can be a challenge. 
- Connecting XGBoost to a local Dask cluster allows you to make the most out of the multiple cores in your machine.
- If that’s still not enough, you can connect Dask to Coiled and burst to the cloud as and when needed.
- You can tweak your distributed XGBoost performance by inspecting the Dask Dashboard.

We’d love to see you apply distributed XGBoost to a dataset that’s meaningful to you. If you’d like to try, swap your dataset into this notebook and see how well it does! 

Let us know how you get on in our [Coiled Community Slack channel](https://join.slack.com/t/coiled-users/shared_invite/zt-hx1fnr7k-In~Q8ui3XkQfvQon0yN5WQ) or by [tweeting](https://twitter.com/coiledhq) at us.