# Distributed XGBoost with Dask

https://xgboost.readthedocs.io/en/stable/tutorials/dask.html

In [1]:
!pip3 install xgboost

Collecting xgboost
  Downloading xgboost-1.5.2-py3-none-manylinux2014_x86_64.whl (173.6 MB)
[K     |████████████████████████████████| 173.6 MB 8.1 kB/s  eta 0:00:01/s eta 0:00:10[K     |▋                               | 3.1 MB 18.6 MB/s eta 0:00:10                      | 7.7 MB 18.6 MB/s eta 0:00:09 0:00:0912.4 MB 18.6 MB/s eta 0:00:09      | 13.9 MB 18.6 MB/s eta 0:00:09:00:09   |███▍                            | 18.4 MB 18.6 MB/s eta 0:00:09                          | 20.7 MB 18.6 MB/s eta 0:00:09�                           | 23.2 MB 18.6 MB/s eta 0:00:09███▊                           | 25.5 MB 18.6 MB/s eta 0:00:08[K     |█████                           | 27.7 MB 18.6 MB/s eta 0:00:08    | 30.2 MB 18.6 MB/s eta 0:00:08         | 32.3 MB 18.6 MB/s eta 0:00:08��██▍                         | 34.5 MB 18.6 MB/s eta 0:00:088    |███████▏                        | 39.0 MB 44.1 MB/s eta 0:00:0400:0303 44.1 MB/s eta 0:00:03               | 50.0 MB 44.1 MB/s eta 0:00:03             | 51.6 MB 

In [1]:
import os
import time

import cdsw
import xgboost as xgb
import dask.array as da
from dask import dataframe as dd

import dask.distributed

from dask.distributed import Client

## Start up Dask Cluster

In [74]:
dask_scheduler = cdsw.launch_workers(
    n=1,
    cpu=1,
    memory=2,
    code=f"!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:8090 --scheduler-file /home/cdsw/_scheduler_/dask.log",
)

# Wait for the scheduler to start.
time.sleep(10)

In [75]:
def get_scheduler_url(dask_scheduler):
    scheduler_workers = cdsw.list_workers()
    scheduler_id = dask_scheduler[0]["id"]
    scheduler_ip = [
        worker["ip_address"] for worker in scheduler_workers if worker["id"] == scheduler_id
    ][0]

    return f"tcp://{scheduler_ip}:8786"

scheduler_url = get_scheduler_url(dask_scheduler)

In [76]:
N_WORKERS = 3

dask_workers = cdsw.launch_workers(
    n=N_WORKERS,
    cpu=1,
    memory=4,
    code=f"!dask-worker {scheduler_url} --local-directory /home/cdsw/_worker_",
)

# Wait for the workers to start.
time.sleep(10)

In [77]:
client = Client(scheduler_url)

## Get some data

In [13]:
# HIGGS data: https://archive.ics.uci.edu/ml/datasets/HIGGS

!curl https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz --output HIGGS.csv.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 2685M  100 2685M    0     0  85.5M      0  0:00:31  0:00:31 --:--:-- 97.1M


In [16]:
!gzip -d HIGGS.csv.gz

In [78]:
colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
dask_df = dd.read_csv("HIGGS.csv", header=None, names=colnames)

In [79]:
dask_df

Unnamed: 0_level_0,label,feature-01,feature-02,feature-03,feature-04,feature-05,feature-06,feature-07,feature-08,feature-09,feature-10,feature-11,feature-12,feature-13,feature-14,feature-15,feature-16,feature-17,feature-18,feature-19,feature-20,feature-21,feature-22,feature-23,feature-24,feature-25,feature-26,feature-27,feature-28
npartitions=125,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1
,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


This dataset contains 11 million examples and takes up nearly 8GiB on disk. As a Dask DataFrame, these 11 million entries are chunked into 125 partitions, with each partition containing about 88,000 examples each. None of these examples has been loaded into memory, though, because Dask DataFrames operate under lazy compute (which is why we don't see any data in the DataFrame above). 

Instead, when we run `head()` a `.compute()` operation is called which returns the head of the first partition. Now we see the data itself!

In [28]:
dask_df.head()

Unnamed: 0,label,feature-01,feature-02,feature-03,feature-04,feature-05,feature-06,feature-07,feature-08,feature-09,...,feature-19,feature-20,feature-21,feature-22,feature-23,feature-24,feature-25,feature-26,feature-27,feature-28
0,1.0,0.869293,-0.635082,0.22569,0.32747,-0.689993,0.754202,-0.248573,-1.092064,0.0,...,-0.010455,-0.045767,3.101961,1.35376,0.979563,0.978076,0.920005,0.721657,0.988751,0.876678
1,1.0,0.907542,0.329147,0.359412,1.49797,-0.31301,1.095531,-0.557525,-1.58823,2.173076,...,-1.13893,-0.000819,0.0,0.30222,0.833048,0.9857,0.978098,0.779732,0.992356,0.798343
2,1.0,0.798835,1.470639,-1.635975,0.453773,0.425629,1.104875,1.282322,1.381664,0.0,...,1.128848,0.900461,0.0,0.909753,1.10833,0.985692,0.951331,0.803252,0.865924,0.780118
3,0.0,1.344385,-0.876626,0.935913,1.99205,0.882454,1.786066,-1.646778,-0.942383,0.0,...,-0.678379,-1.360356,0.0,0.946652,1.028704,0.998656,0.728281,0.8692,1.026736,0.957904
4,1.0,1.105009,0.321356,1.522401,0.882808,-1.205349,0.681466,-1.070464,-0.921871,0.0,...,-0.373566,0.113041,0.0,0.755856,1.361057,0.98661,0.838085,1.133295,0.872245,0.808487


Next, we need to perform a train/test split. The authors of the dataset intended for the last 500,000 entries to act as the test set. We can approximate that by "breaking off" the last 6 partitions, which should contain approximately 530,000 examples. The remainder of the entries will be our train set. 

In [80]:
# The last 6 partitions contain ~ 530000 examples 
# dask_df_train, dask_df_test = dask_df.partitions[:-6], , dask_df.partitions[-6:]

dask_df_train, dask_df_test = dask_df.partitions[0], dask_df.partitions[-1]

In [81]:
dask_df_train.head()

Unnamed: 0,label,feature-01,feature-02,feature-03,feature-04,feature-05,feature-06,feature-07,feature-08,feature-09,...,feature-19,feature-20,feature-21,feature-22,feature-23,feature-24,feature-25,feature-26,feature-27,feature-28
0,1.0,0.869293,-0.635082,0.22569,0.32747,-0.689993,0.754202,-0.248573,-1.092064,0.0,...,-0.010455,-0.045767,3.101961,1.35376,0.979563,0.978076,0.920005,0.721657,0.988751,0.876678
1,1.0,0.907542,0.329147,0.359412,1.49797,-0.31301,1.095531,-0.557525,-1.58823,2.173076,...,-1.13893,-0.000819,0.0,0.30222,0.833048,0.9857,0.978098,0.779732,0.992356,0.798343
2,1.0,0.798835,1.470639,-1.635975,0.453773,0.425629,1.104875,1.282322,1.381664,0.0,...,1.128848,0.900461,0.0,0.909753,1.10833,0.985692,0.951331,0.803252,0.865924,0.780118
3,0.0,1.344385,-0.876626,0.935913,1.99205,0.882454,1.786066,-1.646778,-0.942383,0.0,...,-0.678379,-1.360356,0.0,0.946652,1.028704,0.998656,0.728281,0.8692,1.026736,0.957904
4,1.0,1.105009,0.321356,1.522401,0.882808,-1.205349,0.681466,-1.070464,-0.921871,0.0,...,-0.373566,0.113041,0.0,0.755856,1.361057,0.98661,0.838085,1.133295,0.872245,0.808487


In [82]:
dask_df_test.head()

Unnamed: 0,label,feature-01,feature-02,feature-03,feature-04,feature-05,feature-06,feature-07,feature-08,feature-09,...,feature-19,feature-20,feature-21,feature-22,feature-23,feature-24,feature-25,feature-26,feature-27,feature-28
0,1.0,0.711173,0.596989,0.425995,1.107701,-1.726694,2.323159,-1.236822,0.580592,0.0,...,1.094702,1.083582,0.0,3.467659,2.860081,1.14731,1.194971,0.952634,2.715921,2.041492
1,1.0,0.582701,1.450185,-1.468407,0.349005,-0.145752,1.000076,0.510933,1.109466,0.0,...,0.6075,-1.385882,3.101961,0.774237,0.89436,0.990923,0.786021,0.947854,0.932345,0.781177
2,1.0,1.85315,0.715813,-0.036307,0.276173,0.514204,1.326015,0.102958,-0.137981,2.173076,...,-0.702531,1.151281,0.0,0.795276,1.361374,0.990129,0.601279,0.635518,0.79517,0.720959
3,1.0,1.930746,-0.971101,0.083646,0.661669,0.268873,0.486618,-2.886546,1.293518,0.0,...,-1.821012,1.539719,0.0,0.609424,1.23362,0.988642,0.854114,0.967377,1.195336,0.98335
4,0.0,1.288384,0.163573,1.702176,0.290307,1.394569,0.731942,-0.215896,-0.219474,2.173076,...,-0.713357,-0.555179,0.0,0.877087,0.94202,0.979568,1.094009,0.948912,0.965075,0.836173


Good -- these two subsets of the dataset look like they contain different pieces of information and we still haven't needed to load the entire dataset into memory! 

Next we need to separate the `label` from the `features` so that we can properly pass the data into XGBoost. 

In [83]:
y_train = dask_df_train['label']
X_train = dask_df_train[dask_df_train.columns.difference(['label'])]

Now it's time to take the data partitions and load them into these weird DMatrix thingies required by XGBoost. 

<img src="https://miro.medium.com/max/1400/0*AX-9WEYvaCI2h86I">

This takes a while! 

In [84]:
# X and y must be Dask dataframes or arrays
#num_obs = 1e5
#num_features = 20
#X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
#y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))

dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)

In [86]:
output = xgb.dask.train(
    client,
    {"verbosity": 2, "tree_method": "hist", "objective": "reg:squarederror"},
    dtrain,
    num_boost_round=4,
    evals=[(dtrain, "train")],
)

In [87]:
output

{'booster': <xgboost.core.Booster at 0x7fcc1c2554d0>,
 'history': {'train': OrderedDict([('rmse',
                [0.475212, 0.461451, 0.451909, 0.445539])])}}

### Train score

We can score our model by passing our DaskDMatrix object to the xgb.dask.predict method. The result is another Dask Array so we must use `.compute()` to retrieve a non-distributed data object (e.g., a Numpy array)

In [88]:
prediction = xgb.dask.predict(client, output, dtrain)

In [89]:
thingy = prediction.compute()

In [99]:
labels = [round(t) for t in thingy]
len(labels)

88000

In [92]:
y_thingy = y_train.compute()

In [101]:
sum(labels == y_thingy)/len(labels)

0.7045340909090909

### Inference on test set


In [102]:
y_test = dask_df_test['label']
X_test = dask_df_test[dask_df_test.columns.difference(['label'])]

In [103]:
test_preds = xgb.dask.predict(client, output['booster'], X_test)


In [104]:
test_labels = [round(t) for t in test_preds.compute()]

In [105]:
sum(test_labels == y_test.compute()) / len(test_labels)

0.697659090909091

## Shut down workers

In [106]:
cdsw.stop_workers(*[worker["id"] for worker in dask_workers + dask_scheduler])

[<Response [204]>, <Response [204]>, <Response [204]>, <Response [204]>]

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
