Distributed computing with Dask #2032

Closed
mrocklin opened this Issue Feb 13, 2017 · 46 comments

5 participants
@mrocklin

commented Feb 13, 2017

Hello, I am an author of Dask, a library for parallel and distributed computing in Python. I am curious if there is interest within this community to collaborate on distributing XGBoost on Dask either for parallel training or for ETL.

There are probably two components of Dask that are relevant for this project:

  1. A generic system for parallel and distributed computing, built on arbitrary dynamic task scheduling. The relevant APIs here are probably dask.delayed and concurrent.futures.
  2. A parallel and distributed subset of the Pandas API, dask.dataframe, useful for feature engineering and data pre-processing. This doesn't implement the entire Pandas API, but comes decently close.

Is there interest in collaborating here?

@terrytangyuan

Member

commented Feb 13, 2017

@mrocklin I thought Dask has integrations with sklearn. Did you take a look at our sklearn wrapper to see if it will work with that?

@mrocklin

Author

commented Feb 13, 2017

Meaningfully integrating with a distributed system typically has to be done at the per-algorithm level rather than at the library level. There are some ways in which SKLearn and Dask can help each other, yes, but they're not particularly deep.

@terrytangyuan

Member

commented Feb 13, 2017

Dask dataframe would be a good start. In our code base we have a check for pandas DataFrames; that might be where dask dataframe support would fit, as a start.

@mrocklin

Author

commented Feb 13, 2017

So what happens if someone arrives with a multi-terabyte dask dataframe? Do you just convert it to Pandas and proceed? Or is there a way to parallelize XGBoost intelligently across a cluster, pointing to the various pandas dataframes that make up a dask dataframe?

@terrytangyuan

Member

commented Feb 13, 2017

Could users specify a batch size? I'd imagine users could benefit from partial_fit.

cc @tqchen who's more familiar with the distributed part of the code.

@tqchen

Member

commented Feb 13, 2017

The distributed version of xgboost can be hooked into a distributed job launcher: ideally, each data partition is fed into xgboost, which then continues from there.

@mrocklin I think the most relevant parts are the xgboost-spark and xgboost-flink modules, which embed xgboost into the mapPartition function of Spark/Flink. I guess there would be something similar in Dask.

The requirement from the xgboost side is that XGBoost handles inter-process connections via rabit, and a tracker (which connects the jobs) needs to be started from the client side.

@tqchen

Member

commented Feb 13, 2017

see relevant code in https://github.com/dmlc/xgboost/blob/master/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala#L112

Rabit is designed to be embedded into other distributed systems, so I think it might not be too hard to make the adjustment on the Python side.

@mrocklin

Author

commented Feb 13, 2017

Launching other distributed systems from Dask is usually pretty doable. How do you move data from the hosting distributed system (Spark/Flink/Dask) to XGBoost? Or is this for distributed training on small data?

@mrocklin

Author

commented Feb 13, 2017

More concretely, I expect to build a system as follows:

  • On every dask worker I start a Rabit server. Dask gives these Rabit servers enough information to find each other.
  • I create some local XGBoost state on every worker that represents the currently training model
  • I repeatedly feed this per-worker object pandas dataframes or numpy arrays
  • I listen for some signal from XGBoost that tells me to stop

Does this match your expectation? Is it easy for you to point me to relevant Python API?

@tqchen

Member

commented Feb 15, 2017

Yes, see relevant information here https://github.com/dmlc/xgboost/blob/master/tests/distributed/ for python API.

What you will additionally need to do is start a rabit tracker on the driver side (likely the place that drives dask); this is done in the dmlc-submit script here https://github.com/dmlc/dmlc-core/tree/master/tracker/dmlc_tracker

@mrocklin

Author

commented Feb 15, 2017

OK, filling out my outline from before:

Before running any XGBoost code we set up a Rabit network

On the driver/scheduler node we start a rabit tracker

from dmlc_tracker.tracker import RabitTracker  # tracker module from dmlc-core

envs = {'DMLC_NUM_WORKER' : num_workers,
        'DMLC_NUM_SERVER' : nserver}

rabit = RabitTracker(hostIP=ip_address, nslave=num_workers)
envs.update(rabit.slave_envs())
rabit.start(num_workers)  # manages connections in background thread

I may also go through a similar process to start a PSTracker. Should this be on the same centralized machine or should it be elsewhere within the network? Should there be a few of these? Should this be user configurable?

Eventually I have my tracker (and pstrackers?) join the rabit network and block.

rabit.join()  # join network

On worker nodes I need to dump these environment variables (which I'll move through normal dask channels) into the local environment. Then just calling xgboost.rabit.init() should suffice.

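import os
import xgboost

# os.environ only accepts string values, so convert e.g. integer ports first
os.environ.update({key: str(value) for key, value in envs.items()})
xgboost.rabit.init()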

Looking at the Rabit code it looks like environment variables are the only way to provide this information. Can you verify this? Is there a way to supply tracker host/port information as direct inputs?

Training

Then I convert my numpy arrays / pandas dataframes / scipy sparse arrays to DMatrix objects, this seems relatively straightforward. However I'm likely to have several batches of data per worker. Is there a clean way to call train several times with more data as it becomes available? I'm concerned about the comments on these lines:

# Run training, all the features in training API is available.
# Currently, this script only support calling train once for fault recovery purpose.
bst = xgb.train(param, dtrain, num_round, watchlist, early_stopping_rounds=2)

Do we need to wait for all data to arrive before starting training?

Example dataset / problem

Assuming that I have everything above correct then is there a standard distributed training example that people use for demonstration?

@tqchen

Member

commented Feb 15, 2017

There is no need to start pstracker.

  • The tracker only needs to be started in one place, likely on the scheduler (driver); it does no data-heavy work and only serves to connect the workers.
  • The env args can be passed as kwargs in rabit.init.
  • Since tree boosting is a batched algorithm, we do need to wait for all data to be ingested before starting training.
    • Note, however, that each worker only needs to take a shard (a subset of rows) of the data.
    • Ideally, we should use the data iter interface to pass the data into DMatrix in mini-batch fashion, so the entire dataset does not have to sit in memory.
    • This is done via https://github.com/dmlc/xgboost/blob/master/include/xgboost/c_api.h#L117, which does not yet have a Python wrapper.
    • For a first solution, I would recommend passing arrays directly.
@mrocklin

Author

commented Feb 18, 2017

I had some time to play with this this morning. Results here: https://github.com/mrocklin/dask-xgboost

So far it only handles the distributed learning of a single in-memory dataset. Some questions arose:

  1. What is the best way to serialize and pass around DMatrix objects?
  2. What is the best way to serialize and return a Booster result?
  3. How do the environment variables listed above map to arguments in rabit.init? What precisely is the expected form of inputs to rabit.init? Passing the result of slave_envs() to rabit.init obviously won't work because it expects a list. Should we convert each keyname to --key, perhaps dropping the DMLC prefix and converting to lowercase?
  4. Is there a good way to test correctness? How do we compare two Booster objects? Should we expect distributed training to produce precisely the same result as sequential training?
@tqchen

Member

commented Feb 18, 2017

  • You don't normally serialize a DMatrix; it is more of a training-time data holder. I assume data is passed around and shared by dask (array/dataframe), then passed to xgboost.
    • We can explore better ways to pass data than directly through in-memory arrays, possibly by exposing a data iterator to xgboost.
  • You can pickle a Booster, as long as xgboost is installed on both sides.
  • Sorry for not elaborating on how things are passed. It should be
rabit.init(['DMLC_KEY1=VALUE1', 'DMLC_KEY2=VALUE2'])
  • Normally the boosters trained in distributed and single-machine mode are not the same, but here are a few things to check:
    • The boosters returned from all workers should be identical.
    • The predictive validation error should be roughly as low as in the single-machine case.
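Following the answer above, a minimal sketch of turning the tracker's environment dict (for example the DMLC_* entries collected on the driver) into the list of "KEY=VALUE" entries that rabit.init expects. The helper name `rabit_args` is hypothetical, the example values are made up, and encoding each entry to bytes is an assumption about what the ctypes-based Python wrapper needs on Python 3.

```python
def rabit_args(envs):
    """Turn a tracker environment dict into rabit.init-style arguments.

    Hypothetical helper: each entry becomes a "KEY=VALUE" string, encoded to
    bytes on the assumption that the ctypes-based wrapper expects bytes.
    """
    return [("%s=%s" % (key, value)).encode() for key, value in envs.items()]


# Example values only: tracker address/port plus the worker count
envs = {"DMLC_TRACKER_URI": "192.168.0.10", "DMLC_TRACKER_PORT": 9091,
        "DMLC_NUM_WORKER": 2}
args = rabit_args(envs)
# On each worker one would then call xgboost.rabit.init(args)
```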
@mrocklin

Author

commented Feb 18, 2017

Two more questions generally about how this gets used (I have no experience with XGBoost and only a little experience with machine learning, please forgive my ignorance).

  1. Is it reasonable to use multiple workers on the same input data? (XGBoost is computationally bound?)
  2. If we operate on larger datasets do I have to do anything special to tell each XGBoost worker that its data differs from its peers?

Which use case is more common?

@tqchen

Member

commented Feb 18, 2017

Each worker should work on a different partition of the data (by rows); they should NOT look at the same input data.

  • If the data is not big enough, a multi-threaded version should do.
  • Each worker collects statistics separately on its partition and syncs with the others.

This normally corresponds to the mapPartition operation in frameworks like Spark/Flink.

Say my dataset has 8 rows and 4 columns. If we start two workers:

  • worker 0 reads rows 0-3
  • worker 1 reads rows 4-7
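The row split above generalizes to any number of workers. A small sketch, assuming contiguous, non-overlapping row ranges are wanted; the helper `row_shards` is hypothetical, and in practice a Dask collection's existing partitions would play this role.

```python
def row_shards(n_rows, n_workers):
    """Split row indices 0..n_rows-1 into contiguous, non-overlapping shards.

    Shard sizes differ by at most one row; earlier workers get the extra rows.
    """
    base, extra = divmod(n_rows, n_workers)
    shards = []
    start = 0
    for worker in range(n_workers):
        size = base + (1 if worker < extra else 0)
        shards.append(range(start, start + size))
        start += size
    return shards


for worker, rows in enumerate(row_shards(8, 2)):
    print("worker %d reads rows %d-%d" % (worker, rows[0], rows[-1]))
```

With 8 rows and 2 workers this prints the same assignment as the example: rows 0-3 for worker 0 and rows 4-7 for worker 1.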
@mrocklin

Author

commented Feb 18, 2017

OK, what's up there now is a bit cleaner. It would be nice if we had some ability to consume results as they were generated on each worker, but we've worked around it for now. Here is the current solution:

  1. Persist the dask array or dataframe on the cluster, wait for it to finish
  2. Find where each chunk/partition ended up
  3. Tell each worker to concatenate exactly those chunks/partitions and train on them

This solution appears to be manageable, but is not ideal. It would be convenient if xgboost-python could accept results as they arrived. However I think the next thing to do is to try it in practice.
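Steps 2 and 3 amount to inverting a "partition → workers holding it" mapping. In dask.distributed such a mapping can be obtained from the client (for example with `Client.who_has` on the persisted futures); the sketch below assumes that dict is already in hand, and the helper name and worker addresses are hypothetical.

```python
from collections import defaultdict


def partitions_by_worker(who_has):
    """Invert {partition_key: [worker_address, ...]} into
    {worker_address: [partition_key, ...]}.

    If a partition is replicated on several workers, it is assigned only to
    the first listed worker so that no row is trained on twice.
    """
    assignment = defaultdict(list)
    for key, workers in who_has.items():
        assignment[workers[0]].append(key)
    return dict(assignment)


who_has = {
    ("df", 0): ["tcp://10.0.0.1:4000"],
    ("df", 1): ["tcp://10.0.0.2:4000"],
    ("df", 2): ["tcp://10.0.0.1:4000", "tcp://10.0.0.2:4000"],
}
plan = partitions_by_worker(who_has)
# Each worker would then concatenate its own partitions and train on them.
```

Workers that hold no partitions do not appear in the result; since every rabit worker is expected to join the network, such workers would need separate handling.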

I'm going to look around on the internet for examples. If anyone happens to have an artificial problem that I can easily generate with the numpy or pandas API that would be welcome. Until then, here is a trivial example on my laptop with random data:

In [1]: import dask.dataframe as dd

In [2]: df = dd.demo.make_timeseries('2000', '2001', {'x': float, 'y': float, 'z': int}, freq='1s', partition_freq=
   ...: '1D')  # some random time series data

In [3]: df.head()
Out[3]: 
                            x         y     z
2000-01-01 00:00:00  0.778864  0.824796   977
2000-01-01 00:00:01 -0.019888 -0.173454  1023
2000-01-01 00:00:02  0.552826  0.051995  1083
2000-01-01 00:00:03 -0.761811  0.780124   959
2000-01-01 00:00:04 -0.643525  0.679375   980

In [4]: labels = df.z > 1000

In [5]: del df['z']

In [6]: df.head()
Out[6]: 
                            x         y
2000-01-01 00:00:00  0.778864  0.824796
2000-01-01 00:00:01 -0.019888 -0.173454
2000-01-01 00:00:02  0.552826  0.051995
2000-01-01 00:00:03 -0.761811  0.780124
2000-01-01 00:00:04 -0.643525  0.679375

In [7]: labels.head()
Out[7]: 
2000-01-01 00:00:00    False
2000-01-01 00:00:01     True
2000-01-01 00:00:02     True
2000-01-01 00:00:03    False
2000-01-01 00:00:04    False
Name: z, dtype: bool

In [8]: from dask.distributed import Client

In [9]: c = Client()  # creates a local "cluster" on my laptop

In [10]: from dask_xgboost import train
/home/mrocklin/Software/anaconda/lib/python3.5/site-packages/sklearn/cross_validation.py:44: DeprecationWarning: This module was deprecated in version 0.18 in favor of the model_selection module into which all the refactored classes and functions are moved. Also note that the interface of the new CV iterators are different from that of this module. This module will be removed in 0.20.
  "This module will be removed in 0.20.", DeprecationWarning)

In [11]: param = {'max_depth': 2, 'eta': 1, 'silent': 1, 'objective': 'binary:logistic'}  # taken from example

In [12]: bst = train(c, param, df, labels)
/home/mrocklin/Software/anaconda/lib/python3.5/site-packages/sklearn/cross_validation.py:44: DeprecationWarning: This module was deprecated in version 0.18 in favor of the model_selection module into which all the refactored classes and functions are moved. Also note that the interface of the new CV iterators are different from that of this module. This module will be removed in 0.20.
  "This module will be removed in 0.20.", DeprecationWarning)
[14:46:20] Tree method is automatically selected to be 'approx' for faster speed. to use old behavior(exact greedy algorithm on single machine), set tree_method to 'exact'
[14:46:20] Tree method is automatically selected to be 'approx' for faster speed. to use old behavior(exact greedy algorithm on single machine), set tree_method to 'exact'
[14:46:20] Tree method is automatically selected to be 'approx' for faster speed. to use old behavior(exact greedy algorithm on single machine), set tree_method to 'exact'
[14:46:20] Tree method is automatically selected to be 'approx' for faster speed. to use old behavior(exact greedy algorithm on single machine), set tree_method to 'exact'

In [13]: bst
Out[13]: <xgboost.core.Booster at 0x7fbaacfd17b8>
@mrocklin

Author

commented Feb 18, 2017

Relevant code is here if anyone wants to take a look: https://github.com/mrocklin/dask-xgboost/blob/master/dask_xgboost/core.py

As I said, I'm new to XGBoost, so I'm probably missing things.

@tqchen

Member

commented Feb 18, 2017

A typical toy example to try is in https://github.com/dmlc/xgboost/tree/master/demo/data
It is in libsvm format, though, and needs a bit of parsing to get it into numpy.
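A minimal, dependency-free sketch of that parsing step, assuming each line looks like `label index:value index:value ...` with feature indices used exactly as written. The function name is hypothetical. Absent entries are filled with `fill` (0.0 here); XGBoost itself treats entries missing from sparse input as missing values, so `float('nan')` may be the closer match.

```python
def parse_libsvm(lines, n_features=None, fill=0.0):
    """Parse libsvm-formatted lines ("label idx:val idx:val ...") into
    (labels, dense_rows). Feature indices are used exactly as written."""
    labels, sparse_rows = [], []
    max_index = -1
    for line in lines:
        parts = line.split()
        if not parts:
            continue  # skip blank lines
        labels.append(float(parts[0]))
        row = {}
        for item in parts[1:]:
            index, value = item.split(":")
            row[int(index)] = float(value)
            max_index = max(max_index, int(index))
        sparse_rows.append(row)
    # Width is either given explicitly or inferred from the largest index seen
    width = n_features if n_features is not None else max_index + 1
    dense = [[row.get(j, fill) for j in range(width)] for row in sparse_rows]
    return labels, dense


labels, rows = parse_libsvm(["1 0:1 3:0.5", "0 2:2"])
```

`numpy.asarray(rows)` then turns the result into a 2-D array.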

@mrocklin

Author

commented Feb 18, 2017

Anything larger (for which you would actually need a cluster)? Or is there a standard way to generate a dataset of arbitrary size?

@mrocklin

Author

commented Feb 18, 2017

Or, perhaps a better question is: "What would you (or anyone else reading this issue) like to see here?"

@mrocklin

Author

commented Feb 18, 2017

Building predict now. If I move the model back to a worker (going through pickle/unpickle process) and then call bst.predict on some data I get the following error:

Doing rabit call after Finalize

My assumption was that, at this point, the model is self-contained and no longer needs to use rabit. It seems to work fine on the client machine. Any thoughts why I might receive this error when calling predict?

@tqchen

Member

commented Feb 18, 2017

Some parts of predict still use rabit, mainly because the predictor still uses the learner, with some initialization routines that are shared with training. Eventually this should be fixed, but this is the case for now.

@tqchen

Member

commented Feb 18, 2017

I think as long as it works fine for the common dataset, it is an interesting starting point.

There are reasons to use a cluster for medium data anyway (ease of scheduling in a cluster environment); some PySpark users might be interested in trying it out if we advertise it a bit.

Testing on the datasets that really matter is hard (e.g. a dataset with 1 billion rows). Kaggle might have some relevant big datasets of around 10 million rows.

@mrocklin

Author

commented Feb 18, 2017

This repository shows experiments against the airlines dataset, which I think is in the tens of millions of rows and tens of columns (a thousand after one-hot encoding?). For their benchmark it looks like they took a sample of 100k rows and artificially generated larger datasets from this sample. Presumably we could scale this up if necessary.

Here is an example using this data with pandas and xgboost on a single core. Any recommendations on data prep, parameters, or how to do this properly would be welcome.

In [1]: import pandas as pd

In [2]: df = pd.read_csv('train-0.1m.csv')

In [3]: df.head()
Out[3]: 
  Month DayofMonth DayOfWeek  DepTime UniqueCarrier Origin Dest  Distance  \
0   c-8       c-21       c-7     1934            AA    ATL  DFW       732   
1   c-4       c-20       c-3     1548            US    PIT  MCO       834   
2   c-9        c-2       c-5     1422            XE    RDU  CLE       416   
3  c-11       c-25       c-6     1015            OO    DEN  MEM       872   
4  c-10        c-7       c-6     1828            WN    MDW  OMA       423   

  dep_delayed_15min  
0                 N  
1                 N  
2                 N  
3                 N  
4                 Y  

In [4]: labels = df.dep_delayed_15min == 'Y'

In [5]: del df['dep_delayed_15min']

In [6]: df = pd.get_dummies(df)

In [7]: len(df.columns)
Out[7]: 652

In [8]: import xgboost as xgb
/home/mrocklin/Software/anaconda/lib/python3.5/site-packages/sklearn/cross_validation.py:44: DeprecationWarning: This module was deprecated in version 0.18 in favor of the model_selection module into which all the refactored classes and functions are moved. Also note that the interface of the new CV iterators are different from that of this module. This module will be removed in 0.20.
  "This module will be removed in 0.20.", DeprecationWarning)

In [9]: dtrain = xgb.DMatrix(df, label=labels)

In [10]: param = {}  # Are there better choices for parameters?  I could use help here

In [11]: bst = xgb.train(param, dtrain)  # or other parameters here?
[17:50:28] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 124 extra nodes, 0 pruned nodes, max_depth=6
[17:50:30] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 120 extra nodes, 0 pruned nodes, max_depth=6
[17:50:32] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 120 extra nodes, 0 pruned nodes, max_depth=6
[17:50:33] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 116 extra nodes, 0 pruned nodes, max_depth=6
[17:50:35] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 112 extra nodes, 0 pruned nodes, max_depth=6
[17:50:36] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 114 extra nodes, 0 pruned nodes, max_depth=6
[17:50:38] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 106 extra nodes, 0 pruned nodes, max_depth=6
[17:50:39] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 116 extra nodes, 0 pruned nodes, max_depth=6
[17:50:41] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 104 extra nodes, 0 pruned nodes, max_depth=6
[17:50:43] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 100 extra nodes, 0 pruned nodes, max_depth=6

In [12]: test = pd.read_csv('test.csv')

In [13]: test.head()
Out[13]: 
  Month DayofMonth DayOfWeek  DepTime UniqueCarrier Origin Dest  Distance  \
0   c-7       c-25       c-3      615            YV    MRY  PHX       598   
1   c-4       c-17       c-2      739            WN    LAS  HOU      1235   
2  c-12        c-2       c-7      651            MQ    GSP  ORD       577   
3   c-3       c-25       c-7     1614            WN    BWI  MHT       377   
4   c-6        c-6       c-3     1505            UA    ORD  STL       258   

  dep_delayed_15min  
0                 N  
1                 N  
2                 N  
3                 N  
4                 Y  

In [14]: test_labels = test.dep_delayed_15min == 'Y'

In [16]: del test['dep_delayed_15min']

In [17]: test = pd.get_dummies(test)

In [18]: len(test.columns)  # oops, looks like the columns don't match up
Out[18]: 670

In [19]: dtest = xgb.DMatrix(test)

In [20]: predictions = bst.predict(dtest)  # this fails because of mismatched columns

Anyway, here is an option. The airlines dataset seems well known and can be inconveniently large in practice. Again though, machine learning isn't my specialty, so I don't know if this is appropriate or not.
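The column mismatch above happens because `get_dummies` only creates columns for categories that appear in the frame it is given, so train and test end up with different one-hot columns. One common fix, sketched here on made-up toy frames, is to reindex the test dummies onto the training columns:

```python
import pandas as pd

# Toy stand-ins for the airline data: "Origin" has a category in test that
# never appears in train, and vice versa.
train = pd.DataFrame({"Distance": [732, 834], "Origin": ["ATL", "PIT"]})
test = pd.DataFrame({"Distance": [598, 1235], "Origin": ["ATL", "MRY"]})

train_dummies = pd.get_dummies(train)
# Reindex the test dummies onto the training columns: categories unseen in
# training are dropped, and training categories absent from test become 0.
test_dummies = pd.get_dummies(test).reindex(columns=train_dummies.columns,
                                             fill_value=0)
```

The aligned frame can then be passed to `xgb.DMatrix` as before. Fixing the category set up front (as `categorize()` does before `dd.get_dummies` in the dask workflow later in this thread) is another way to avoid the problem.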

cc @TomAugspurger, who seems like the kind of guy who might have thoughts on this.

@mrocklin

Author

commented Feb 18, 2017

Regarding Dask and predict, I can always set up rabit again. This feels a little bit unclean though because it forces evaluation rather than keeping things lazy. But this isn't a serious blocker to use.

@mrocklin

Author

commented Feb 19, 2017

Running into some issues with predict. Two questions:

  1. Can I call Booster.predict multiple times within the same rabit session?
  2. Can I call rabit.init, Booster.predict and rabit.finalize on separate threads?

Currently I create a new tracker, and call rabit.init on the worker's main thread. This works fine. However when I call Booster.predict in worker threads (each dask worker maintains a thread pool for computation) I get errors like Doing rabit call after Finalize. Any recommendations?

@mrocklin

Author

commented Feb 20, 2017

> Some part of predict still uses rabit, mainly because of the predictor still uses learner with some initialization routines that is shared with training. Eventually this should be fixed, but this is the case for now.

I'm curious about this. After we serialize-transfer-deserialize the trained model from a worker to my client machine it seems to work fine on normal data even though there is no rabit network. It seems like a model trained with Rabit can be used to predict data without rabit. This also seems like it would be necessary in production. Can you say more about the constraints of using a rabit-trained model here?

@ogrisel


commented Feb 20, 2017

> Example dataset / problem
> Assuming that I have everything above correct then is there a standard distributed training example that people use for demonstration?

It would be nice to reproduce the results of this experiment:

https://github.com/Microsoft/LightGBM/wiki/Experiments#parallel-experiment

With the new binning + fast hist option from XGBoost (#1950), it should be possible to get similar results.

@ogrisel


commented Feb 20, 2017

> a typical toy example to try is in https://github.com/dmlc/xgboost/tree/master/demo/data
> It is in libsvm format though, and need a bit parsing to get it into numpy

You might be interested in this PR in sklearn: scikit-learn/scikit-learn#935

@tqchen

Member

commented Feb 20, 2017

@mrocklin There is no constraint on reusing the model, so a model trained with the distributed version can be used in the serial version. It is just a current limitation of the predictor (when compiled with rabit): it shares some functionality with the training code, so a rabit call happens.

Now that you mention it, I think we might have a solution to the problem. Simply calling rabit.init() without passing in anything (so the predictor thinks it is the only worker) before predict should solve the problem.

@mrocklin

Author

commented Feb 20, 2017

Yes. Indeed that resolves the problem. dask-xgboost now supports predict: mrocklin/dask-xgboost@827a03d

Thanks for the workaround @tqchen !

@mrocklin

Author

commented Feb 20, 2017

Here is a workflow with dask.dataframe and xgboost on a small sample of the airlines dataset on my local laptop. Does this look OK to everyone? Are there API elements of XGBoost that I'm missing here?

In [1]: import dask.dataframe as dd

In [2]: import dask_xgboost as dxgb

In [3]: df = dd.read_csv('train-0.1m.csv')

In [4]: df.head()
Out[4]: 
  Month DayofMonth DayOfWeek  DepTime UniqueCarrier Origin Dest  Distance  \
0   c-8       c-21       c-7     1934            AA    ATL  DFW       732   
1   c-4       c-20       c-3     1548            US    PIT  MCO       834   
2   c-9        c-2       c-5     1422            XE    RDU  CLE       416   
3  c-11       c-25       c-6     1015            OO    DEN  MEM       872   
4  c-10        c-7       c-6     1828            WN    MDW  OMA       423   

  dep_delayed_15min  
0                 N  
1                 N  
2                 N  
3                 N  
4                 Y  

In [5]: labels = df.dep_delayed_15min == 'Y'

In [6]: del df['dep_delayed_15min']

In [7]: df = df.categorize()

In [8]: df = dd.get_dummies(df)

In [9]: data_train, data_test = df.random_split([0.9, 0.1], random_state=123)

In [10]: labels_train, labels_test = labels.random_split([0.9, 0.1], random_state=123)

In [11]: from dask.distributed import Client

In [12]: client = Client()  # in a large-data situation I probably should have done this before calling categorize above (which requires computation)

In [13]: param = {}  # Are there better choices for parameters?

In [14]: bst = dxgb.train(client, {}, data_train, labels_train)
[14:00:46] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 120 extra nodes, 0 pruned nodes, max_depth=6
[14:00:48] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 120 extra nodes, 0 pruned nodes, max_depth=6
[14:00:50] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 122 extra nodes, 0 pruned nodes, max_depth=6
[14:00:53] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 118 extra nodes, 0 pruned nodes, max_depth=6
[14:00:55] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 120 extra nodes, 0 pruned nodes, max_depth=6
[14:00:57] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 114 extra nodes, 0 pruned nodes, max_depth=6
[14:00:59] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 118 extra nodes, 0 pruned nodes, max_depth=6
[14:01:01] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 118 extra nodes, 0 pruned nodes, max_depth=6
[14:01:04] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 94 extra nodes, 0 pruned nodes, max_depth=6
[14:01:06] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 102 extra nodes, 0 pruned nodes, max_depth=6

In [15]: bst
Out[15]: <xgboost.core.Booster at 0x7f689803af60>

In [16]: predictions = dxgb.predict(client, bst, data_test)

In [17]: predictions
Out[17]: 
Dask Series Structure:
npartitions=1
None    float32
None        ...
Name: predictions, dtype: float32
Dask Name: _predict_part, 9 tasks
@mrocklin

Author

commented Feb 20, 2017

My short term objective is to write a short blogpost about this so that hopefully someone else with more experience with XGBoost and with more time comes along to adopt this project and push it forward. (I, like everyone else here, am working on a few other projects like this at the same time.)

I am partial to the airlines dataset just because I already have it up in an S3 bucket. I agree though that the Criteo dataset would make for a better demonstration at scale.

I'm still not sure what parameters to use or how to judge the result. For parameters I can use the experiment from @szilard here. Is there a good way to judge the predictions? For example are we looking for predictions > 0.5 to match labels_test?

@szilard

Contributor

commented Feb 20, 2017

Perhaps the most common way to evaluate predictive performance for binary classification (especially in research or competition settings) is to use area under the ROC curve (AUC), though in real world applications one should use metrics that are aligned with the "business" values produced by using the models.

@ogrisel


commented Feb 20, 2017

> For example are we looking for predictions > 0.5 to match labels_test?

Yes. If you take the mean of that on the test set, this is the test accuracy. But the dataset is likely imbalanced (many more non-clicks than clicks). In that case the ROC AUC score is a better metric.

from sklearn.metrics import roc_auc_score
print(roc_auc_score(labels_test, predictions))

assuming predictions is a 1D array of positive probabilities estimated by the model for each row in the test set.
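Both checks are easy to compute once the predictions are concrete (with dask-xgboost, the lazy `predictions` series would first need `.compute()`). A dependency-free sketch, using the pairwise (Mann-Whitney) formulation of ROC AUC rather than sklearn's implementation; the function names and toy data are hypothetical.

```python
def accuracy(labels, probabilities, threshold=0.5):
    """Fraction of rows where (probability > threshold) matches the label."""
    hits = sum((p > threshold) == bool(y) for y, p in zip(labels, probabilities))
    return hits / len(labels)


def roc_auc(labels, scores):
    """ROC AUC as the probability that a random positive outscores a random
    negative, with ties counting half."""
    pos = [s for y, s in zip(labels, scores) if y]
    neg = [s for y, s in zip(labels, scores) if not y]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


labels = [False, False, True, True]
probs = [0.1, 0.4, 0.35, 0.8]
```

The pairwise loop is quadratic, so it only suits small sanity checks; `roc_auc_score` above is the practical choice for real test sets.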

@tqchen

Member

commented Feb 20, 2017

@mrocklin One follow-up question: does dask allow multi-threaded worker jobs? I know this is less relevant in Python due to the GIL, but xgboost can run multi-threaded training on each worker while the workers still coordinate with each other in a distributed fashion. We should always set xgboost's nthread argument to the number of cores available to that worker.

@mrocklin

Author

commented Feb 20, 2017

Short answer is "yes". Most use of Dask is with projects like NumPy, Pandas, SKLearn and others that are mostly just C and Fortran code, wrapped with Python. The GIL doesn't affect these libraries. Some people do use Dask for similar applications to the PySpark RDD (see dask.bag) and will be affected. This group is in the minority though.

So yes, Dask allows multi-threaded tasks. How do we tell XGBoost to use multiple threads? In my experiments so far I see high CPU use without changing any parameters, so maybe everything works well by default?

@tqchen

Member

commented Feb 20, 2017

XGBoost is multi-threaded by default and, if nthread is not set, will use all the available CPU threads on the machine (rather than only those allotted to that worker). This can cause resource contention when multiple workers are assigned to the same machine.

So it is always good to set the nthread parameter to the maximum number of cores the worker is allowed to use. A common practice is to use around 4 threads per worker.
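A minimal sketch of deriving `nthread` from the number of workers sharing a machine, assuming cores are split evenly between co-located workers. The helper is hypothetical; when each Dask worker is started with a fixed thread count (e.g. `dask-worker --nthreads 4`), passing that number directly is simpler.

```python
import os


def xgb_params_for_worker(workers_per_machine, base_params=None, cores=None):
    """Return a copy of base_params with nthread set to this worker's share
    of the machine's cores, so co-located workers don't oversubscribe CPUs."""
    cores = cores if cores is not None else (os.cpu_count() or 1)
    params = dict(base_params or {})
    # Never go below one thread, even if workers outnumber cores
    params["nthread"] = max(1, cores // workers_per_machine)
    return params


params = xgb_params_for_worker(4, {"objective": "binary:logistic"}, cores=16)
```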

@mrocklin

Author

commented Feb 21, 2017

@mrocklin

Author

commented Feb 21, 2017

Notebook: https://gist.github.com/19c89d78e34437e061876a9872f4d2df
Short screencast (six minutes): https://youtu.be/Cc4E-PdDSro

Critical feedback is very welcome. Again, please forgive my ignorance in this field.

@ogrisel


commented Feb 21, 2017

@mrocklin great demo! I think the runtime performance (and possibly the memory usage) could be greatly improved by using 'tree_method': 'hist', 'grow_policy': 'lossguide' in the param dict.

@mrocklin

Author

commented Feb 21, 2017

Thanks @ogrisel. With those parameters training time goes from six minutes to one minute. Memory usage seems to stay about the same though.

@mrocklin

Author

commented Feb 27, 2017

OK, coming back to this. Are there any XGBoost operations other than train and predict that we should implement?

@tqchen or @ogrisel if either of you have the time to look through the implementation at https://github.com/mrocklin/dask-xgboost/blob/master/dask_xgboost/core.py I would be grateful. I understand though that looking through a foreign codebase is not always high on priority lists.

If everything is ok then I'll add a bit more to the README, publish to PyPI, and then we can probably close this issue.

@tqchen

Member

commented Feb 27, 2017

I think only train and predict need to be distributed. Other things do not have to be distributed since they do not rely on the dataset.

@mrocklin

Author

commented Feb 27, 2017

I've pushed dask-xgboost to PyPI and moved it to https://github.com/dask/dask-xgboost

Thank you @tqchen and @ogrisel for your help here. The collaboration made this relatively easy.

I would be happy to help people if they wanted to run benchmarks. Until then, closing.

@mrocklin mrocklin closed this Feb 27, 2017

@lock lock bot locked as resolved and limited conversation to collaborators Oct 26, 2018
