Skip to content

Commit

Permalink
[dock][dask] Unupdate notes about k8s.
Browse files Browse the repository at this point in the history
  • Loading branch information
trivialfis committed May 13, 2024
1 parent 8237920 commit 55c7e65
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 18 deletions.
72 changes: 55 additions & 17 deletions doc/tutorials/dask.rst
Original file line number Diff line number Diff line change
Expand Up @@ -237,53 +237,91 @@ For most of the use cases with GPUs, the `Dask-CUDA <https://docs.rapids.ai/api/
Working with other clusters
***************************

Using Dask's ``LocalCluster`` is convenient for getting started quickly on a single-machine. Once you're ready to scale your work, though, there are a number of ways to deploy Dask on a distributed cluster. You can use `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html>`_, for example, for GPUs and you can use Dask Cloud Provider to `deploy Dask clusters in the cloud <https://docs.dask.org/en/stable/deploying.html#cloud>`_. See the `Dask documentation for a more comprehensive list <https://docs.dask.org/en/stable/deploying.html#distributed-computing>`_.
Using Dask's ``LocalCluster`` is convenient for getting started quickly on a local machine. Once you're ready to scale your work, though, there are a number of ways to deploy Dask on a distributed cluster. You can use `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html>`_, for example, for GPUs and you can use Dask Cloud Provider to `deploy Dask clusters in the cloud <https://docs.dask.org/en/stable/deploying.html#cloud>`_. See the `Dask documentation for a more comprehensive list <https://docs.dask.org/en/stable/deploying.html#distributed-computing>`_.

In the example below, a ``KubeCluster`` is used for `deploying Dask on Kubernetes <https://docs.dask.org/en/stable/deploying-kubernetes.html>`_:

.. code-block:: python
from dask_kubernetes import KubeCluster # Need to install the ``dask-kubernetes`` package
from dask_kubernetes.operator import KubeCluster # Need to install the ``dask-kubernetes`` package
from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode
from dask.distributed import Client
from xgboost import dask as dxgb
import dask
import dask.array as da
dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer",
"kubernetes.scheduler-service-wait-timeout": 360,
"distributed.comm.timeouts.connect": 360})
def main():
'''Connect to a remote kube cluster with GPU nodes and run training on it.'''
'''Connect to a remote kube cluster with GPU nodes and run training on it.'''
m = 1000
n = 10
kWorkers = 2 # assuming you have 2 GPU nodes on that cluster.
# You need to work out the worker-spec yourself. See document in dask_kubernetes for
# its usage. Here we just want to show that XGBoost works on various clusters.
cluster = KubeCluster.from_yaml('worker-spec.yaml', deploy_mode='remote')
cluster.scale(kWorkers) # scale to use all GPUs
with Client(cluster) as client:
X = da.random.random(size=(m, n), chunks=100)
y = da.random.random(size=(m, ), chunks=100)
# See notes below for why we use pre-allocated cluster.
with KubeCluster(
name="xgboost-test",
image="my-image-name:latest",
n_workers=kWorkers,
create_mode=CreateMode.CONNECT_ONLY,
shutdown_on_close=False,
) as cluster:
with Client(cluster) as client:
X = da.random.random(size=(m, n), chunks=100)
y = X.sum(axis=1)
regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
regressor.client = client
regressor.set_params(tree_method='hist', device="cuda")
regressor.fit(X, y, eval_set=[(X, y)])
regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
regressor.client = client
regressor.set_params(tree_method='hist', device="cuda")
regressor.fit(X, y, eval_set=[(X, y)])
if __name__ == '__main__':
# Launch the kube cluster on somewhere like GKE, then run this as client process.
# main function will connect to that cluster and start training xgboost model.
main()
Different cluster classes might have subtle differences like network configuration, or
specific cluster implementation might contains bugs that we are not aware of. Open an
issue if such case is found and there's no documentation on how to resolve it in that
cluster implementation.

An interesting aspect of the Kubernetes cluster is that the pods may become available
after the Dask workflow has begun, which can cause issues with distributed XGBoost since
XGBoost expects the nodes used by input data to remain unchanged during training. To use
Kubernetes clusters, it is necessary to wait for all the pods to be online before
submitting XGBoost tasks. One can either create a wait function in Python or simply
pre-allocate a cluster with k8s tools (like ``kubectl``) before running dask workflows. To
pre-allocate a cluster, we can first generate the cluster spec using dask kubernetes:

.. code-block:: python
import json
from dask_kubernetes.operator import make_cluster_spec
spec = make_cluster_spec(name="xgboost-test", image="my-image-name:latest", n_workers=16)
with open("cluster-spec.json", "w") as fd:
json.dump(spec, fd, indent=2)
.. code-block:: sh
kubectl apply -f ./cluster-spec.json
Check whether the pods are available:

.. code-block:: sh
kubectl get pods
Once all pods have been initialized, the Dask XGBoost workflow can be run, as in the
previous example. It is important to ensure that the cluster sets the parameter
``create_mode=CreateMode.CONNECT_ONLY`` and optionally ``shutdown_on_close=False`` if you
do not want to shut down the cluster after a single job.

*******
Threads
*******
Expand Down
2 changes: 1 addition & 1 deletion python-package/xgboost/dask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def __init__(self, **args: Any) -> None:
# not the same as task ID is string and "10" is sorted before "2") with dask
# worker ID. This outsources the rank assignment to dask and prevents
# non-deterministic issue.
self.args["DMLC_TASK_ID"] = f"[xgboost.dask-{wid}]:" + str(worker.address)
self.args["DMLC_TASK_ID"] = f"[xgboost.dask-{worker.address}]"


def dconcat(value: Sequence[_T]) -> _T:
Expand Down

0 comments on commit 55c7e65

Please sign in to comment.