[doc][dask] Update notes about k8s. #10271

Merged: 3 commits, May 13, 2024
72 changes: 55 additions & 17 deletions doc/tutorials/dask.rst
@@ -237,53 +237,91 @@ For most of the use cases with GPUs, the `Dask-CUDA <https://docs.rapids.ai/api/
Working with other clusters
***************************

Using Dask's ``LocalCluster`` is convenient for getting started quickly on a local machine. Once you're ready to scale your work, though, there are a number of ways to deploy Dask on a distributed cluster. You can use `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html>`_, for example, for GPUs and you can use Dask Cloud Provider to `deploy Dask clusters in the cloud <https://docs.dask.org/en/stable/deploying.html#cloud>`_. See the `Dask documentation for a more comprehensive list <https://docs.dask.org/en/stable/deploying.html#distributed-computing>`_.

In the example below, a ``KubeCluster`` is used for `deploying Dask on Kubernetes <https://docs.dask.org/en/stable/deploying-kubernetes.html>`_:

.. code-block:: python

-   from dask_kubernetes import KubeCluster  # Need to install the ``dask-kubernetes`` package
+   from dask_kubernetes.operator import KubeCluster  # Need to install the ``dask-kubernetes`` package
+   from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode

    from dask.distributed import Client
    from xgboost import dask as dxgb
    import dask
    import dask.array as da

-   dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer",
-                    "kubernetes.scheduler-service-wait-timeout": 360,
-                    "distributed.comm.timeouts.connect": 360})


    def main():
        '''Connect to a remote kube cluster with GPU nodes and run training on it.'''
        m = 1000
        n = 10
        kWorkers = 2  # assuming you have 2 GPU nodes on that cluster.
        # You need to work out the worker spec yourself.  See the dask_kubernetes
        # documentation for its usage.  Here we just want to show that XGBoost works on
        # various clusters.
-       cluster = KubeCluster.from_yaml('worker-spec.yaml', deploy_mode='remote')
-       cluster.scale(kWorkers)  # scale to use all GPUs

-       with Client(cluster) as client:
-           X = da.random.random(size=(m, n), chunks=100)
-           y = da.random.random(size=(m, ), chunks=100)
+       # See notes below for why we use pre-allocated cluster.
+       with KubeCluster(
+           name="xgboost-test",
+           image="my-image-name:latest",
+           n_workers=kWorkers,
+           create_mode=CreateMode.CONNECT_ONLY,
+           shutdown_on_close=False,
+       ) as cluster:
+           with Client(cluster) as client:
+               X = da.random.random(size=(m, n), chunks=100)
+               y = X.sum(axis=1)

-           regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
-           regressor.client = client
-           regressor.set_params(tree_method='hist', device="cuda")
-           regressor.fit(X, y, eval_set=[(X, y)])
+               regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
+               regressor.client = client
+               regressor.set_params(tree_method='hist', device="cuda")
+               regressor.fit(X, y, eval_set=[(X, y)])


    if __name__ == '__main__':
        # Launch the kube cluster somewhere like GKE, then run this script as the client
        # process.  The main function will connect to that cluster and start training an
        # XGBoost model.
        main()


Different cluster classes might have subtle differences, such as network configuration, and
a specific cluster implementation might contain bugs that we are not aware of. Please open
an issue if such a case is found and there is no documentation on how to resolve it for
that cluster implementation.

An interesting aspect of Kubernetes clusters is that the pods may become available only
after the Dask workflow has begun, which can cause issues with distributed XGBoost since
XGBoost expects the nodes holding the input data to remain unchanged during training. To
use Kubernetes clusters, it is therefore necessary to wait for all the pods to be online
before submitting XGBoost tasks. One can either create a wait function in Python (a minimal
sketch is given at the end of this section) or simply pre-allocate a cluster with
Kubernetes tools (like ``kubectl``) before running Dask workflows. To pre-allocate a
cluster, we can first generate the cluster spec using dask-kubernetes:

.. code-block:: python

    import json

    from dask_kubernetes.operator import make_cluster_spec

    spec = make_cluster_spec(name="xgboost-test", image="my-image-name:latest", n_workers=16)
    with open("cluster-spec.json", "w") as fd:
        json.dump(spec, fd, indent=2)
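
The generated spec can be customized before it is applied. For instance, since the training
example above assumes GPU workers, one may want the worker containers to request GPUs, which
matches the ``device="cuda"`` setting used earlier. The snippet below is a minimal sketch of
such an adjustment; the nested keys are assumptions about the layout produced by
``make_cluster_spec`` and should be verified against the actual output for your
``dask-kubernetes`` version.

.. code-block:: python

    import json

    from dask_kubernetes.operator import make_cluster_spec

    spec = make_cluster_spec(name="xgboost-test", image="my-image-name:latest", n_workers=16)

    # Sketch only: ask Kubernetes to schedule one NVIDIA GPU per worker container.
    # The path into the spec dictionary is an assumption; inspect the generated
    # spec before relying on it.
    worker_container = spec["spec"]["worker"]["spec"]["containers"][0]
    worker_container.setdefault("resources", {})["limits"] = {"nvidia.com/gpu": "1"}

    with open("cluster-spec.json", "w") as fd:
        json.dump(spec, fd, indent=2)

The generated spec can then be applied with ``kubectl``: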

.. code-block:: sh

    kubectl apply -f ./cluster-spec.json


Check whether the pods are available:

.. code-block:: sh

    kubectl get pods

Once all pods have been initialized, the Dask XGBoost workflow can be run as in the previous
example. It is important to ensure that ``KubeCluster`` is constructed with
``create_mode=CreateMode.CONNECT_ONLY`` so that it connects to the pre-allocated cluster
instead of creating a new one, and optionally ``shutdown_on_close=False`` if you do not want
to shut down the cluster after a single job.
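
As an alternative to pre-allocating the cluster, the wait function mentioned above can be as
simple as blocking on ``Client.wait_for_workers`` before building any Dask collections, so
that the set of workers no longer changes once training starts. A minimal sketch, reusing
the cluster name, image, and worker count assumed in the earlier examples:

.. code-block:: python

    from dask.distributed import Client
    from dask_kubernetes.operator import KubeCluster

    kWorkers = 2  # same assumption as in the training example above

    with KubeCluster(
        name="xgboost-test",
        image="my-image-name:latest",
        n_workers=kWorkers,
    ) as cluster:
        with Client(cluster) as client:
            # Block until every requested worker pod has joined the scheduler;
            # raises an error if the timeout (in seconds) is exceeded.
            client.wait_for_workers(n_workers=kWorkers, timeout=600)
            # ... run the Dask XGBoost workflow from the first example here ...

Failing fast here is preferable to starting training on an incomplete cluster, since XGBoost
assumes a fixed set of workers for the duration of the job.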

*******
Threads
*******