Submitting multiple dask.xgboost calls causes lower worker utilization #5644

@rudra0713

Description

I have created a random classification dataset with 100,000 rows and 30 columns, and I am training distributed xgboost on it. My system has 8 workers, so I created 8 partitions of the dataset. When I run dask.xgboost only once on this dataset, each of the 8 workers gets one part of the DaskDMatrix (it has 8 parts in total). But when I submit multiple dask.xgboost calls, the 8 partitions get randomly divided among a subset of the workers. As a result, only those workers utilize CPU, and the runtime becomes very high.

Here is a reproducible example:

from dask.distributed import Client
import xgboost as xgb
from sklearn.datasets import make_classification
import dask.array as da
from dask.distributed import get_client
import dask

n_samples, n_features, num_test_samples = 100000, 30, 100
dask.config.set({'distributed.worker.daemon': False})

def invoke_dis_xgboost(X, y, number_of_classes, number_of_estimators):
    client_xgboost = get_client()
    dtrain = xgb.dask.DaskDMatrix(client_xgboost, X, y)
    xgb_params = {
        'n_estimators': number_of_estimators,
        'num_class': number_of_classes,
        ## all other xgb parameters
    }
    output = xgb.dask.train(client_xgboost, xgb_params, dtrain, num_boost_round=100)
    return output


def main(client):
    print(f'n_samples={n_samples}, n_features={n_features}')
    X_local, y_local = make_classification(n_samples=n_samples, n_features=n_features, random_state=12345)
    number_of_classes = len(set(y_local))
    X = da.from_array(X_local, chunks=(n_samples//8,n_features), name='train_feature')
    y = da.from_array(y_local, chunks=(n_samples//8,), name='train_label')

    futures = []
    results = []
    
    for i in range(100, 105):
        f1 = client.submit(invoke_dis_xgboost, X, y, number_of_classes, i)
        futures.append(f1)
    for i, f in enumerate(futures):
        results.append(f.result())

    return


if __name__ == '__main__':
    client = Client('127.0.0.1:8786')
    main(client)
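The chunking arithmetic in main() is intended to produce exactly one dask.array block per worker. A quick stand-alone sanity check of that arithmetic (plain Python, no cluster needed):

```python
import math

n_samples, n_workers = 100_000, 8
chunk_size = n_samples // n_workers        # 12_500 rows per block
# dask.array splits the first axis into ceil(n_samples / chunk_size) blocks
n_blocks = math.ceil(n_samples / chunk_size)
assert n_blocks == n_workers               # 8 blocks for 8 workers
```

So the data itself is partitioned correctly; the imbalance comes from where the scheduler places those blocks, not from how many there are.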

Looking at this part of the source code of xgboost.dask.train:

key_to_partition = {part.key: part for part in parts}
who_has = await client.scheduler.who_has(keys=[part.key for part in parts])
worker_map: Dict[str, "distributed.Future"] = defaultdict(list)
for key, workers in who_has.items():
    worker_map[next(iter(workers))].append(key_to_partition[key])
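The grouping logic above can be reproduced in isolation. This is a minimal sketch with a hypothetical who_has mapping (shortened keys, invented placements mirroring the shape that client.scheduler.who_has returns), showing how duplicate placements shrink the set of workers that receive a training task:

```python
from collections import defaultdict

# Hypothetical who_has result: partition key -> tuple of worker addresses.
# Two partitions landed on the same worker here.
who_has = {
    "tuple-a": ("tcp://127.0.0.1:43719",),
    "tuple-b": ("tcp://127.0.0.1:46133",),
    "tuple-c": ("tcp://127.0.0.1:32889",),
    "tuple-d": ("tcp://127.0.0.1:32889",),  # duplicate placement
}

worker_map = defaultdict(list)
for key, workers in who_has.items():
    # next(iter(workers)) picks the first worker holding a replica of the key
    worker_map[next(iter(workers))].append(key)

# One dispatched_train task is launched per distinct worker in worker_map,
# so 4 partitions on 3 distinct workers means only 3 busy workers.
print(len(worker_map))  # → 3
```

This is why, once the scheduler co-locates partitions, the number of training tasks drops below the worker count.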

I was expecting each part to be distributed to a different worker. However, I am getting the following output:

('tuple-4d2a6701-77c1-41f6-9566-b5d360cde213', ('tcp://127.0.0.1:43719',)),
('tuple-f301e12c-b53d-4520-a764-383d3a5e1785', ('tcp://127.0.0.1:46133',)),
('tuple-4f5d4be3-4e51-4b49-87ea-ef4bf2460f21', ('tcp://127.0.0.1:32889',)),
('tuple-2ad89ca9-2e8a-4486-bfd4-5e36eed92576', ('tcp://127.0.0.1:32889',)),
('tuple-5856e611-f229-4c41-bf49-a39e4d3c6f9a', ('tcp://127.0.0.1:46133',)),
('tuple-d176cd30-4049-40c2-927e-49d06088dc8b', ('tcp://127.0.0.1:46133',)),
('tuple-aba3b2a6-d245-44a9-b3ec-9231b6637c88', ('tcp://127.0.0.1:43719',)),
('tuple-f82b4ee7-47f4-4ccf-a431-efe5b930bd34', ('tcp://127.0.0.1:32889',))

Since only 3 unique workers hold all the partitions, the number of dispatched_train tasks is 3, and therefore only 3 workers are utilizing CPU according to the dask dashboard.
