### Note

This notebook works through a dask-xgboost example on the airlines dataset, adapted with some modifications from the dask-ml documentation: https://dask-ml.readthedocs.io/en/stable/examples/xgboost.html.
It uses the **GPU** variant of XGBoost (`py-xgboost-gpu`).

The dataset used in this notebook can be downloaded from http://stat-computing.org/dataexpo/2009/the-data.html. For demonstration, only the data for the year 2000 is used.


### Versions of packages to be used in this notebook

In [1]:
import dask; print('Dask Version:', dask.__version__)
import distributed; print('Distributed Version:', distributed.__version__)
import xgboost; print('XGBoost Version:', xgboost.__version__)
import dask_xgboost; print('Dask-XGBoost Version:', dask_xgboost.__version__)
import dask_cuda; print('Dask CUDA Version:', dask_cuda.__version__)

Dask Version: 2.9.2
Distributed Version: 2.9.3
XGBoost Version: 0.90
Dask-XGBoost Version: 0.1.9
Dask CUDA Version: v0.11.0


### Import dask packages

In [2]:
from dask.distributed import Client, LocalCluster, progress
from dask import compute, persist
from dask_cuda import LocalCUDACluster
import dask_cuda

### Create a single node dask CUDA cluster
Note: this notebook also works with a multi-node Dask cluster. In that case, start `dask-scheduler` and `dask-cuda-worker` on your machines, then pass the scheduler address to the Dask `Client` object.

In [None]:
# Alternative to the next cell: start a single-node Dask CUDA cluster on this
# machine instead of connecting to an existing scheduler. Uncomment to use it.
#cluster = LocalCUDACluster()
#cluster
#client = Client(cluster)

### Create dask client

In [3]:
import os

# Address of an already-running dask-scheduler. Set the DASK_SCHEDULER_ADDRESS
# environment variable to point at your own scheduler instead of editing this
# cell; the fallback is the address used when this notebook was recorded.
scheduler_address = os.environ.get('DASK_SCHEDULER_ADDRESS', 'tcp://10.3.64.217:18786')
client = Client(scheduler_address)
client

0,1
Client  Scheduler: tcp://10.3.64.217:18786  Dashboard: http://10.3.64.217:18787/status,Cluster  Workers: 1  Cores: 1  Memory: 100.00 GB


### Create dask dataframe from the CSV files

In [4]:
import os

import dask.dataframe as dd

# Subset of the columns to use
cols = ['Year', 'Month', 'DayOfWeek', 'Distance',
        'DepDelay', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']

# Path to the year-2000 airlines CSV. Override it with the AIRLINES_CSV
# environment variable rather than editing this cell; the default is the
# location used when this notebook was recorded.
data_path = os.environ.get(
    'AIRLINES_CSV',
    '/dlidata/distributed_dask/5fn2lfccfbhfoin51fr094dz6bdgipl3.csv',
)

# Create the dataframe
df = dd.read_csv(data_path, usecols=cols)


In [5]:
# Show the dataframe's structure (column dtypes and partition count) without loading rows
df

Unnamed: 0_level_0,Year,Month,DayOfWeek,CRSDepTime,UniqueCarrier,DepDelay,Origin,Dest,Distance
npartitions=9,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,int64,int64,int64,int64,object,float64,object,object,int64
,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...


In [6]:
# Compute and display the first few rows of the raw data
df.head()

Unnamed: 0,Year,Month,DayOfWeek,CRSDepTime,UniqueCarrier,DepDelay,Origin,Dest,Distance
0,2000,1,5,1647,HP,0.0,ATL,PHX,1587
1,2000,1,6,1647,HP,1.0,ATL,PHX,1587
2,2000,1,7,1647,HP,,ATL,PHX,1587
3,2000,1,1,1647,HP,-2.0,ATL,PHX,1587
4,2000,1,6,846,HP,-4.0,ATL,PHX,1587


In [7]:
# Number of partitions the CSV was split into
df.npartitions

9

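### Create the target classification label "is_delayed"

A flight counts as delayed when its departure delay exceeds 15 minutes. Rows with no recorded `DepDelay` are filled with 16, so they are counted as delayed as well.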


In [8]:
# Label: DepDelay above 15 counts as delayed. Missing DepDelay values are
# filled with 16 first, so those rows are labelled as delayed too.
is_delayed = df['DepDelay'].fillna(16).gt(15)

# Cap the scheduled departure time at 2399, then drop DepDelay so the label's
# source column is not used as a feature.
df = (
    df.assign(CRSDepTime=df['CRSDepTime'].clip(upper=2399))
      .drop('DepDelay', axis=1)
)

df, is_delayed = persist(df, is_delayed)
progress(df, is_delayed)

VBox()

![code](https://github.com/IBM/wmla-assets/raw/master/WMLA-learning-journey/shared-images/elastic_distributed_dask/Elastic_Distributed_Dask1.gif)

In [9]:
# Peek at the first label values
is_delayed.head()

0    False
1    False
2     True
3    False
4    False
Name: DepDelay, dtype: bool

## One-hot encode the categorical columns

In [10]:
# Convert the object (string) columns to categoricals, then expand them into
# one indicator column per category and keep the result in cluster memory.
categorized = df.categorize()
df = dd.get_dummies(categorized).persist()

In [11]:
# Number of feature columns after one-hot encoding
len(df.columns)

428

In [12]:
# Preview the one-hot encoded features
df.head()

Unnamed: 0,Year,Month,DayOfWeek,CRSDepTime,Distance,UniqueCarrier_HP,UniqueCarrier_WN,UniqueCarrier_CO,UniqueCarrier_US,UniqueCarrier_DL,...,Dest_DAY,Dest_DCA,Dest_DEN,Dest_BRO,Dest_LFT,Dest_LWB,Dest_GST,Dest_MIB,Dest_ITO,Dest_BQN
0,2000,1,5,1647,1587,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,2000,1,6,1647,1587,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,2000,1,7,1647,1587,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,2000,1,1,1647,1587,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,2000,1,6,846,1587,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


### Inspect how the Dask dataframe partitions are spread across worker nodes

In [13]:
# Map each persisted partition key to the worker address(es) holding it
client.who_has(df)

{"('get_dummies-e0afd93abc0c684ac402743fec2df6d6', 8)": ('tcp://10.3.64.217:44697',),
 "('get_dummies-e0afd93abc0c684ac402743fec2df6d6', 5)": ('tcp://10.3.64.217:44697',),
 "('get_dummies-e0afd93abc0c684ac402743fec2df6d6', 4)": ('tcp://10.3.64.217:44697',),
 "('get_dummies-e0afd93abc0c684ac402743fec2df6d6', 7)": ('tcp://10.3.64.217:44697',),
 "('get_dummies-e0afd93abc0c684ac402743fec2df6d6', 6)": ('tcp://10.3.64.217:44697',),
 "('get_dummies-e0afd93abc0c684ac402743fec2df6d6', 1)": ('tcp://10.3.64.217:44697',),
 "('get_dummies-e0afd93abc0c684ac402743fec2df6d6', 0)": ('tcp://10.3.64.217:44697',),
 "('get_dummies-e0afd93abc0c684ac402743fec2df6d6', 3)": ('tcp://10.3.64.217:44697',),
 "('get_dummies-e0afd93abc0c684ac402743fec2df6d6', 2)": ('tcp://10.3.64.217:44697',)}

In [14]:
# dask.persist returns new persisted collections instead of modifying its
# argument, so a bare `persist(df)` discards its result. Bind it back to df.
df = df.persist()
progress(df)

VBox()

## Split and train on the GPU

In [15]:
# Attach the label as a column and split once, so features and labels are
# guaranteed to receive exactly the same rows. Two independent random_split
# calls only stay aligned if the RNG happens to produce identical masks for
# both collections.
labelled = df.assign(is_delayed=is_delayed)
train_split, test_split = labelled.random_split([0.9, 0.1],
                                                random_state=1234)

data_train = train_split.drop('is_delayed', axis=1)
labels_train = train_split['is_delayed']
data_test = test_split.drop('is_delayed', axis=1)
labels_test = test_split['is_delayed']

In [16]:
# (rows, columns) of the training set, computed without gathering the whole
# frame to the client: len() counts rows on the cluster, and the column count
# comes from the dataframe's metadata.
(len(data_train), len(data_train.columns))

(5115379, 428)

In [17]:
%%time
import dask_xgboost as dxgb

params = {'objective': 'binary:logistic', 'nround': 10, 
          'max_depth': 16, 'eta': 0.01, 'subsample': 0.5, 
          'min_child_weight': 1,
          'tree_method': 'gpu_hist'}

CPU times: user 6 µs, sys: 0 ns, total: 6 µs
Wall time: 8.82 µs


In [18]:
%%time
bst = dxgb.train(client, params, data_train, labels_train)
bst

CPU times: user 60.2 ms, sys: 12.4 ms, total: 72.6 ms
Wall time: 52.8 s


## Predict and Evaluate

In [19]:
import xgboost as xgb

# Quick sanity check: score the first few test rows with the trained booster
# directly on the client, without going through the cluster.
sample_rows = data_test.head()
bst.predict(xgb.DMatrix(sample_rows))

array([0.49385586, 0.49536628, 0.47054937, 0.48235655, 0.49634877],
      dtype=float32)

In [20]:
# Or use dask-xgboost to predict in parallel across the cluster
predictions = dxgb.predict(client, bst, data_test).persist()
predictions

Unnamed: 0,Array,Chunk
Bytes,unknown,unknown
Shape,"(nan,)","(nan,)"
Count,9 Tasks,9 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes unknown unknown Shape (nan,) (nan,) Count 9 Tasks 9 Chunks Type float32 numpy.ndarray",,

Unnamed: 0,Array,Chunk
Bytes,unknown,unknown
Shape,"(nan,)","(nan,)"
Count,9 Tasks,9 Chunks
Type,float32,numpy.ndarray


In [21]:
# Gather all test-set predicted probabilities to the client as a NumPy array
prediction_sc = predictions.compute()
prediction_sc

array([0.49385586, 0.49536628, 0.47054937, ..., 0.48311645, 0.4729035 ,
       0.4720207 ], dtype=float32)

In [22]:
# Gather the matching test-set labels (booleans) to the client
label_sc = labels_test.compute()
label_sc

2          True
8         False
17        False
25        False
32        False
33        False
46         True
62        False
65        False
72        False
74        False
75        False
91        False
100        True
104       False
105       False
114       False
115       False
120       False
124        True
127        True
136       False
157       False
158       False
176       False
179       False
180       False
191        True
213       False
239       False
          ...  
575571    False
575590    False
575614     True
575630     True
575649     True
575655    False
575677     True
575680    False
575685    False
575686    False
575688     True
575716    False
575726    False
575734     True
575738    False
575789    False
575792    False
575801    False
575804     True
575816    False
575823    False
575839     True
575845     True
575869    False
575872    False
575873     True
575877    False
575888    False
575903    False
575905    False
Name: DepDelay, Length: 

In [23]:
from sklearn.metrics import roc_auc_score

# Predictions and labels were gathered from separate dask collections. Make
# sure they line up one-to-one before scoring them pairwise.
assert len(label_sc) == len(prediction_sc), (len(label_sc), len(prediction_sc))
print(roc_auc_score(label_sc,
                    prediction_sc))

0.7124789020664825


In [24]:
# Disconnect this client from the Dask scheduler
client.close()