Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
1 contributor

Users who have contributed to this file

43 lines (34 sloc) 1.38 KB
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import dask.array as da
import numpy as np
import xgboost as xgb
import GPUtil
import time
# Define the function to be executed on each worker
def train(X, y, available_devices):
    """Train an XGBoost model on this worker's share of the data using one GPU.

    Intended to be dispatched to every worker through ``xgb.dask.run``.

    Args:
        X: Feature data handed to ``xgb.dask.create_worker_dmatrix``
            (presumably a dask array -- confirm against the caller).
        y: Label data handed to ``xgb.dask.create_worker_dmatrix``.
        available_devices: Sequence of GPU ids, indexed by this worker's
            rabit rank to choose the device it trains on.
    """
    worker_dmatrix = xgb.dask.create_worker_dmatrix(X, y)

    # The rabit rank identifies this worker; it selects the GPU and labels
    # the progress messages below.
    rank = xgb.rabit.get_rank()
    device_id = available_devices[rank]

    # Specify the GPU algorithm and device for this worker
    booster_params = {
        "tree_method": "gpu_hist",
        "gpu_id": device_id,
    }

    print(
        "Worker {} starting training on {} rows".format(
            rank, worker_dmatrix.num_row()
        )
    )
    started = time.time()
    xgb.train(booster_params, worker_dmatrix, num_boost_round=500)
    elapsed = time.time() - started
    print("Worker {} finished training in {:0.2f}s".format(rank, elapsed))
def main():
    """Run distributed GPU training on random data with a local Dask cluster.

    Starts one Dask worker per GPU reported available by GPUtil (at most
    ``max_devices``), builds a random regression problem as dask arrays, and
    runs ``train`` on every worker via ``xgb.dask.run``.

    Raises:
        RuntimeError: If GPUtil reports no available GPU devices. Without
            this check the cluster would be created with zero workers and
            no training would take place.
    """
    max_devices = 16
    # Check which devices we have locally
    available_devices = GPUtil.getAvailable(limit=max_devices)
    if not available_devices:
        raise RuntimeError(
            "No available GPU devices found; cannot start training workers"
        )

    # Set up a relatively large regression problem
    n = 100
    m = 10000000
    partition_size = 100000

    # Use one worker per device. The context managers shut down the client
    # and the cluster (scheduler + worker processes) even if training fails.
    with LocalCluster(
        n_workers=len(available_devices), threads_per_worker=4
    ) as cluster:
        with Client(cluster) as client:
            X = da.random.random((m, n), partition_size)
            y = da.random.random(m, partition_size)
            xgb.dask.run(client, train, X, y, available_devices)
# Run the example only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
You can’t perform that action at this time.