Dask plugin docs (#929)
* Add documentation for `dask` integration

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Fix typos in `pyspark_pi.py`

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Fix typo in `dask_example.py`

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Fix typos

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Update docs after `flyteidl` change

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Add documentation on interruptible behavior

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Address PR feedback

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Small fixes

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Add requirement files for dask

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Add git to Dockerfile

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Add note about local execution

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* minor typos

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>
Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
Co-authored-by: Niels Bantilan <niels.bantilan@gmail.com>
bstadlbauer and cosmicBboy committed Jan 23, 2023
1 parent 9a1b05a commit 2aefe89
Showing 11 changed files with 838 additions and 2 deletions.
3 changes: 3 additions & 0 deletions cookbook/docs/conf.py
@@ -119,6 +119,7 @@ class CustomSorter(FileNameSortKey):
"sql_alchemy.py",
"whylogs_example.py",
## Kubernetes
"dask.py",
"pod.py",
"pyspark_pi.py",
"dataframe_passing.py",
@@ -296,6 +297,7 @@ def __call__(self, filename):
"../integrations/flytekit_plugins/whylogs_examples",
"../integrations/flytekit_plugins/onnx_examples",
"../integrations/kubernetes/pod",
"../integrations/kubernetes/k8s_dask",
"../integrations/kubernetes/k8s_spark",
"../integrations/kubernetes/kftensorflow",
"../integrations/kubernetes/kfpytorch",
@@ -338,6 +340,7 @@ def __call__(self, filename):
"auto/integrations/flytekit_plugins/whylogs_examples",
"auto/integrations/flytekit_plugins/onnx_examples",
"auto/integrations/kubernetes/pod",
"auto/integrations/kubernetes/k8s_dask",
"auto/integrations/kubernetes/k8s_spark",
"auto/integrations/kubernetes/kftensorflow",
"auto/integrations/kubernetes/kfpytorch",
9 changes: 9 additions & 0 deletions cookbook/docs/integrations.rst
@@ -120,6 +120,15 @@ orchestrated by Flyte itself, within its provisioned Kubernetes clusters.

---

.. link-button:: auto/integrations/kubernetes/k8s_dask/index
:type: ref
:text: K8s Cluster Dask Jobs
:classes: btn-block stretched-link
^^^^^^^^^^^^
Run Dask jobs on a K8s Cluster.

---

.. link-button:: auto/integrations/kubernetes/k8s_spark/index
:type: ref
:text: K8s Cluster Spark Jobs
59 changes: 59 additions & 0 deletions cookbook/integrations/kubernetes/k8s_dask/Dockerfile
@@ -0,0 +1,59 @@
FROM ubuntu:focal
LABEL org.opencontainers.image.source https://github.com/flyteorg/flytesnacks

WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
ENV DEBIAN_FRONTEND=noninteractive

# Install Python3 and other basics
RUN apt-get update \
    && apt-get install -y \
        build-essential \
        curl \
        git \
        libssl-dev \
        make \
        python3-pip \
        python3.8 \
        python3.8-venv \
    && rm -rf /var/lib/apt/lists/* \
    && :

# Install AWS CLI to run on AWS (for GCS install GSutil). This will be removed
# in future versions to make it completely portable
RUN pip3 install awscli

WORKDIR /opt
RUN curl https://sdk.cloud.google.com > install.sh
RUN bash /opt/install.sh --install-dir=/opt
ENV PATH $PATH:/opt/google-cloud-sdk/bin
WORKDIR /root

ENV VENV /opt/venv
# Virtual environment
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"

RUN pip3 install wheel

# Install Python dependencies
COPY k8s_dask/requirements.txt /root
RUN pip install -r /root/requirements.txt

# Copy the makefile targets to expose on the container. This makes it easier to register.
# Delete this after we update CI
COPY in_container.mk /root/Makefile

# Delete this after we update CI to not serialize inside the container
COPY k8s_dask/sandbox.config /root

# Copy the actual code
COPY k8s_dask/ /root/k8s_dask

# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag
3 changes: 3 additions & 0 deletions cookbook/integrations/kubernetes/k8s_dask/Makefile
@@ -0,0 +1,3 @@
PREFIX=k8s_dask
include ../../../common/common.mk
include ../../../common/leaf.mk
212 changes: 212 additions & 0 deletions cookbook/integrations/kubernetes/k8s_dask/README.rst
@@ -0,0 +1,212 @@
.. _plugins-dask-k8s:

Kubernetes Dask Jobs
=====================

.. tags:: Dask, Integration, DistributedComputing, Data, Advanced

Flyte can execute ``dask`` jobs natively on a Kubernetes cluster and manages the lifecycle of a virtual ``dask`` cluster
for you. To do so, it leverages the open-source `Dask Kubernetes Operator <https://kubernetes.dask.org/en/latest/operator.html>`__,
which can be enabled without signing up for any service. This is like running an ephemeral ``dask`` cluster, which gets
created for the specific Flyte task and torn down after completion.

In Flyte/K8s, the cost of creating a cluster per task is amortized because pods are faster to create than machines, but
the penalty of downloading Docker images may affect performance. Also, remember that starting a pod is not as fast as
running a process.

Flytekit makes it possible to write ``dask`` code natively as a task and the ``dask`` cluster will be automatically
configured using the decorated ``Dask()`` config. The examples in this section provide a hands-on tutorial for writing
``dask`` Flyte tasks.

The plugin has been tested against the ``2022.12.0`` version of the ``dask-kubernetes-operator``.


Why use K8s dask?
-----------------

Managing Python dependencies is hard. Flyte makes it easy to version and manage dependencies using containers. The
K8s ``dask`` plugin brings all the benefits of containerization to ``dask`` without needing to manage special ``dask``
clusters.

**Pros:**

#. Extremely easy to get started; get complete isolation between workloads
#. Every job runs in isolation and has its own virtual cluster - no more nightmarish dependency management!
#. Flyte manages everything for you!

**Cons:**

#. Short-running, bursty jobs are not a great fit because of the container overhead
#. No interactive Dask capabilities are available with Flyte K8s dask, which is more suited for running ad hoc and
   scheduled jobs.


Step 1: Deploy the Dask Plugin in the Flyte Backend
---------------------------------------------------

Flyte dask uses the `Dask Kubernetes Operator <https://kubernetes.dask.org/en/latest/operator.html>`__ and a
custom-built `Flyte Dask Plugin <https://pkg.go.dev/github.com/flyteorg/flyteplugins@v1.0.28/go/tasks/plugins/k8s/dask>`__.
This is a backend plugin which has to be enabled in your deployment; you can follow the steps mentioned in the
:ref:`flyte:deployment-plugin-setup-k8s` section.


Step 2: Environment setup
-------------------------

#. Install ``flytekitplugins-dask`` using ``pip`` in your environment.

   .. code-block:: bash

      pip install flytekitplugins-dask

#. Ensure you have enough resources on your K8s cluster. Based on the resources required for your ``dask`` job (across job runner, scheduler and workers), you may have to tweak resource quotas for the namespace.
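
To judge whether a namespace quota is large enough, it can help to add up the requests of every pod that belongs to one ``dask`` job. The following is only a rough sketch and not part of Flyte or the plugin: the ``PodResources`` type, the ``total_job_resources`` helper and all numbers are hypothetical, and it assumes one job runner pod, one scheduler pod and a single group of identical worker pods.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PodResources:
    """Hypothetical per-pod request: CPU cores and memory in GiB."""

    cpu: float
    mem_gib: float


def total_job_resources(job_runner, scheduler, worker, n_workers):
    """Sum the requests of one job runner, one scheduler and n_workers workers."""
    if n_workers < 0:
        raise ValueError("n_workers must be non-negative")
    return PodResources(
        cpu=job_runner.cpu + scheduler.cpu + worker.cpu * n_workers,
        mem_gib=job_runner.mem_gib + scheduler.mem_gib + worker.mem_gib * n_workers,
    )


# Example numbers are made up for illustration.
total = total_job_resources(
    job_runner=PodResources(cpu=1, mem_gib=2),
    scheduler=PodResources(cpu=1, mem_gib=2),
    worker=PodResources(cpu=4, mem_gib=10),
    n_workers=3,
)
print(total)
```

Comparing the resulting totals with the namespace's resource quota gives a first indication of whether the quota needs to be raised before the job is launched.
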


Implementation details
----------------------

Local execution
^^^^^^^^^^^^^^^

When running the ``dask`` task locally, it will use a local `distributed Client
<https://distributed.dask.org/en/stable/client.html>`__. If you would like to connect to a remote cluster while
developing locally, you can set the ``DASK_SCHEDULER_ADDRESS`` environment variable to the URL of the remote
scheduler and the ``Client()`` will use the cluster automatically.
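
The behavior described above can be sketched as follows. This is a minimal illustration rather than the plugin's code: ``describe_dask_target`` and ``make_client`` are hypothetical helper names and the scheduler URL is made up. It assumes that ``DASK_SCHEDULER_ADDRESS`` is exported in the shell before Python starts, since ``dask`` reads its configuration from the environment when it is imported.

```python
import os


def describe_dask_target(env=None):
    """Report which cluster a default ``Client()`` is expected to use."""
    env = os.environ if env is None else env
    address = env.get("DASK_SCHEDULER_ADDRESS")
    if address:
        return f"remote scheduler at {address}"
    return "local cluster"


def make_client():
    """Create a client; requires ``dask[distributed]`` to be installed."""
    # Imported lazily so that the helper above also works without dask.
    from distributed import Client

    # No address is passed on purpose: Client() then falls back to the dask
    # configuration, which is where DASK_SCHEDULER_ADDRESS ends up.
    return Client()


print(describe_dask_target({}))
print(describe_dask_target({"DASK_SCHEDULER_ADDRESS": "tcp://my-scheduler:8786"}))
```

Inside a task body, calling ``make_client()`` (or simply ``Client()``) would then work unchanged both for purely local runs and for local runs against a remote scheduler.
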

Resource specification
^^^^^^^^^^^^^^^^^^^^^^

It is advised to set ``limits`` as this will set the ``--nthreads`` and ``--memory-limit`` arguments for the workers
as recommended by ``dask`` `best practices <https://kubernetes.dask.org/en/latest/kubecluster.html?highlight=--nthreads#best-practices>`__.
When specifying resources, the following precedence is followed for all components of the ``dask`` job (job-runner pod,
scheduler pod and worker pods):

#. If no resources are specified, the `platform resources <https://github.com/flyteorg/flyte/blob/1e3d515550cb338c2edb3919d79c6fa1f0da5a19/charts/flyte-core/values.yaml#L520-L531>`__ are used
#. When ``task`` resources are used, those will be applied to all components of the ``dask`` job

   .. code-block:: python

      from flytekit import Resources, task
      from flytekitplugins.dask import Dask

      @task(
          task_config=Dask(),
          limits=Resources(cpu="1", mem="10Gi"),  # Will be applied to all components
      )
      def my_dask_task():
          ...

#. When resources are specified for the individual components, they take the highest precedence

   .. code-block:: python

      from flytekit import Resources, task
      from flytekitplugins.dask import Dask, Scheduler, WorkerGroup

      @task(
          task_config=Dask(
              scheduler=Scheduler(
                  limits=Resources(cpu="1", mem="2Gi"),  # Will be applied to the scheduler pod
              ),
              workers=WorkerGroup(
                  limits=Resources(cpu="4", mem="10Gi"),  # Will be applied to the worker pods
              ),
          ),
      )
      def my_dask_task():
          ...

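
The precedence rules listed above can be summarized in a few lines of plain Python. This is a sketch for illustration only and not the backend plugin's implementation: ``effective_limits`` and ``PLATFORM_DEFAULTS`` are hypothetical names, the default values are made up, and the sketch assumes that limits are chosen as a whole rather than merged per resource.

```python
from typing import Optional

# Made-up platform defaults, used only for this illustration.
PLATFORM_DEFAULTS = {"cpu": "2", "mem": "1Gi"}


def effective_limits(
    component_limits: Optional[dict],
    task_limits: Optional[dict],
    platform_limits: dict = PLATFORM_DEFAULTS,
) -> dict:
    """Pick limits: component-level first, then task-level, then platform."""
    if component_limits is not None:
        return component_limits
    if task_limits is not None:
        return task_limits
    return platform_limits


task_limits = {"cpu": "1", "mem": "10Gi"}
worker_limits = {"cpu": "4", "mem": "10Gi"}

print(effective_limits(None, None))  # nothing specified: platform defaults
print(effective_limits(None, task_limits))  # only task-level limits set
print(effective_limits(worker_limits, task_limits))  # component-level wins
```

The same lookup would apply independently to the job runner, scheduler and worker pods, each with its own component-level value.
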
Images
^^^^^^
By default, all components of the deployed ``dask`` job (job runner pod, scheduler pod and worker pods) will use the
image that was used while registering (this image should have ``dask[distributed]`` installed in its Python
environment). This helps keep the Python environments of all cluster components in sync.
However, it is possible to specify different images for the components. This allows for use cases such as using
different images between tasks of the same workflow. While it is possible to use different images for the different
components of the ``dask`` job, it is not advised, as this can quickly lead to Python environments getting out of sync.

.. code-block:: python

   from flytekit import task
   from flytekitplugins.dask import Dask, Scheduler, WorkerGroup

   @task(
       task_config=Dask(
           scheduler=Scheduler(
               image="my_image:0.1.0",  # Will be used by the scheduler pod
           ),
           workers=WorkerGroup(
               image="my_image:0.1.0",  # Will be used by the worker pods
           ),
       ),
   )
   def my_dask_task():
       ...

Environment Variables
^^^^^^^^^^^^^^^^^^^^^
Environment variables set in the ``@task`` decorator will be passed on to all ``dask`` job components (job runner pod,
scheduler pod and worker pods).

.. code-block:: python

   from flytekit import task
   from flytekitplugins.dask import Dask

   @task(
       task_config=Dask(),
       env={"FOO": "BAR"},  # Will be applied to all components
   )
   def my_dask_task():
       ...

Labels and Annotations
^^^^^^^^^^^^^^^^^^^^^^

Labels and annotations set in a ``LaunchPlan`` will be passed on to all ``dask`` job components (job runner pod,
scheduler pod and worker pods).

.. code-block:: python

   from flytekit import Annotations, Labels, LaunchPlan, task, workflow
   from flytekitplugins.dask import Dask

   @task(task_config=Dask())
   def my_dask_task():
       ...

   @workflow
   def my_dask_workflow():
       my_dask_task()

   # Labels and annotations will be passed on to all dask cluster components
   my_launch_plan = LaunchPlan.get_or_create(
       workflow=my_dask_workflow,
       name="my_dask_launch_plan",  # Placeholder name; a name is needed when labels or annotations are set
       labels=Labels({"myexecutionlabel": "bar"}),  # Add further labels as needed
       annotations=Annotations({"region": "SEA"}),  # Add further annotations as needed
   )

Interruptible Tasks
^^^^^^^^^^^^^^^^^^^

The ``dask`` backend plugin supports running on interruptible nodes. When ``interruptible==True``, the plugin will add
the configured tolerations and node selectors to all worker pods. Please note that the job runner as well as the
scheduler will not be run on interruptible nodes.

.. code-block:: python

   from flytekit import task
   from flytekitplugins.dask import Dask

   @task(
       task_config=Dask(),
       interruptible=True,
   )
   def my_dask_task():
       ...

Code Examples
-------------