Consolidates Dask content in Dask integration page (#14177)
billpalombi committed Jun 20, 2024
1 parent 0c6e754 commit 30c00ba
Showing 2 changed files with 176 additions and 323 deletions.
298 changes: 1 addition & 297 deletions docs/3.0rc/develop/write-tasks/dask-and-ray.mdx
@@ -3,7 +3,7 @@
title: Run tasks in parallel on Dask or Ray
description: Learn how to use the Prefect Dask and Ray task runners for parallel or distributed task execution.
---

### Parallel task execution
## Parallel task execution

Many real-world data workflows benefit from truly parallel, distributed task execution.
Use Dask or Ray in your flows to choose the execution environment that fits your particular needs.
@@ -15,302 +15,6 @@
These task runners can spin up a local Dask cluster or Ray instance on the fly, or let you connect with a Dask or Ray environment you've
set up separately. This enables you to take advantage of massively parallel computing environments.

## Run tasks on Dask

The [`DaskTaskRunner`](/integrations/prefect-dask/index) is a parallel task runner that submits tasks to the
[`dask.distributed`](http://distributed.dask.org/) scheduler. To use the `DaskTaskRunner`:

1. Make sure the `prefect-dask` collection is installed: `pip install prefect-dask`.
2. In your flow code, import `DaskTaskRunner` from `prefect_dask.task_runners`.
3. Assign it as the task runner when you define the flow with the `task_runner=DaskTaskRunner` argument.

By default, a temporary Dask cluster is created for the duration of the flow run.

If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL
with the `address` kwarg. For example, this flow uses the `DaskTaskRunner` configured to access an existing
Dask cluster at `http://my-dask-cluster`:

```python hl_lines="4"
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address="http://my-dask-cluster"))
def my_flow():
    ...
```

`DaskTaskRunner` accepts the following optional parameters:

| Parameter | Description |
| --- | --- |
| address | Address of a currently running Dask scheduler. |
| cluster_class | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, `"distributed.LocalCluster"`), or the class itself. |
| cluster_kwargs | Additional kwargs to pass to the `cluster_class` when creating a temporary Dask cluster. |
| adapt_kwargs | Additional kwargs to pass to `cluster.adapt` when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided. |
| client_kwargs | Additional kwargs to use when creating a [`dask.distributed.Client`](https://distributed.dask.org/en/latest/api.html#client). |
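
As a minimal sketch of how these parameters might be chosen at runtime, the helper below returns either an `address` or a set of `cluster_kwargs`. It treats the two as alternatives because the table describes the `cluster_*` options as applying only when a temporary cluster is created. The helper name `dask_runner_kwargs` and the environment variable `DASK_SCHEDULER_ADDRESS` are hypothetical, chosen for illustration only.

```python
import os


def dask_runner_kwargs(env=None):
    """Return keyword arguments intended for DaskTaskRunner (illustrative helper)."""
    env = os.environ if env is None else env
    # DASK_SCHEDULER_ADDRESS is a hypothetical variable name, not a Prefect or Dask setting.
    address = env.get("DASK_SCHEDULER_ADDRESS")
    if address:
        # An existing scheduler is available: connect to it.
        return {"address": address}
    # Otherwise fall back to a temporary local cluster with explicit sizing.
    return {"cluster_kwargs": {"n_workers": 4, "threads_per_worker": 2}}


if __name__ == "__main__":
    print(dask_runner_kwargs({"DASK_SCHEDULER_ADDRESS": "tcp://10.0.0.5:8786"}))
    print(dask_runner_kwargs({}))
```

The result would then be unpacked into the runner, for example `DaskTaskRunner(**dask_runner_kwargs())`.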

<Warning>
**Multiprocessing safety**

Because the `DaskTaskRunner` uses multiprocessing, calls to flows
in scripts must be guarded with `if __name__ == "__main__":` or you will encounter
warnings and errors.
</Warning>

If you don't provide the `address` of a Dask scheduler, Prefect creates a temporary local cluster automatically. The number of workers
used is based on the number of cores on your machine. The default provides a mix of processes and threads that works well for
most workloads. To specify this explicitly, pass values for `n_workers` or `threads_per_worker` to `cluster_kwargs`:

```python
# Use 4 worker processes, each with 2 threads
DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
)
```

### Use a temporary cluster

The `DaskTaskRunner` can create a temporary cluster using any of
[Dask's cluster-manager options](https://docs.dask.org/en/latest/setup.html). This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.

To configure it, provide a `cluster_class`. This can be:

- A string specifying the import path to the cluster class (for example, `"dask_cloudprovider.aws.FargateCluster"`)
- The cluster class itself
- A function for creating a custom cluster

You can also configure `cluster_kwargs`. This takes a dictionary of keyword arguments to pass to `cluster_class` when starting the flow run.

For example, to configure a flow to use a temporary `dask_cloudprovider.aws.FargateCluster` with four workers running with an image named
`my-prefect-image`:

```python
DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)
```
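
The third option in the list above, a function that creates the cluster, could look roughly like the following sketch. The names `merged_options` and `make_local_cluster` and the default values are hypothetical. The sketch assumes that the values in `cluster_kwargs` are passed to the function as keyword arguments, in the same way they are passed to a cluster class.

```python
def merged_options(**overrides):
    """Combine illustrative project defaults with caller-supplied overrides."""
    options = {"n_workers": 2, "threads_per_worker": 1}
    options.update(overrides)  # explicit values win over the defaults
    return options


def make_local_cluster(**kwargs):
    """Hypothetical factory that builds a LocalCluster from merged options."""
    # Imported lazily so that importing this module does not require Dask.
    from distributed import LocalCluster

    return LocalCluster(**merged_options(**kwargs))
```

Under that assumption, it would be used as `DaskTaskRunner(cluster_class=make_local_cluster, cluster_kwargs={"n_workers": 4})`.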

### Connect to an existing cluster

Multiple Prefect flow runs can all use the same existing Dask cluster. You
might manage a single long-running Dask cluster (for example, using the Dask
[Helm Chart](https://docs.dask.org/en/latest/setup/kubernetes-helm.html)) and
configure flows to connect to it during execution. This has a couple of downsides
when compared to using a temporary cluster:

- All workers in the cluster must have dependencies installed for all flows you intend to run.
- Multiple flow runs may compete for resources. Dask tries to do a good job
sharing resources between tasks, but you may still run into issues.

That said, you may prefer managing a single long-running cluster.

To configure a `DaskTaskRunner` to connect to an existing cluster, pass in the address of the
scheduler to the `address` argument:

```python
# Connect to an existing cluster running at a specified address
DaskTaskRunner(address="tcp://...")
```

### Configure adaptive scaling

A key feature of using a `DaskTaskRunner` is the ability to scale adaptively
to the workload. Instead of specifying `n_workers` as a fixed number,
you can specify a minimum and maximum number of workers to use, and the Dask
cluster scales up and down as needed.

To do this, pass `adapt_kwargs` to `DaskTaskRunner`. This takes the
following fields:

- `maximum` (`int` or `None`, optional): the maximum number of workers to scale to. Set to `None` for no maximum.
- `minimum` (`int` or `None`, optional): the minimum number of workers to scale to. Set to `None` for no minimum.

For example, this configures a flow to run on a `FargateCluster` scaling up
to a maximum of 10 workers:

```python
DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10},
)
```
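
When the bounds come from configuration, it can help to validate them before handing them to the runner. The helper below is a small sketch of that idea. The name `build_adapt_kwargs` and its validation rules are assumptions of this example, not part of `prefect-dask`. It requires at least one bound because, as noted earlier, adaptive scaling is only enabled when `adapt_kwargs` are provided.

```python
def build_adapt_kwargs(minimum=None, maximum=None):
    """Return a dict suitable for `adapt_kwargs` (illustrative helper)."""
    if minimum is None and maximum is None:
        raise ValueError("provide at least one of minimum or maximum")
    if minimum is not None and minimum < 0:
        raise ValueError("minimum must not be negative")
    if minimum is not None and maximum is not None and minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    # Leave out unset bounds so that only explicit values are forwarded.
    bounds = {"minimum": minimum, "maximum": maximum}
    return {key: value for key, value in bounds.items() if value is not None}
```

For example, `DaskTaskRunner(adapt_kwargs=build_adapt_kwargs(minimum=1, maximum=10))`.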

### Use Dask annotations

Use Dask annotations to further control the behavior of tasks.

For example, set the [priority](http://distributed.dask.org/en/stable/priority.html) of tasks in the Dask scheduler:

```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)


@flow(task_runner=DaskTaskRunner())
def my_flow():
    with dask.annotate(priority=-10):
        future = show.submit(1)  # low priority task

    with dask.annotate(priority=10):
        future = show.submit(2)  # high priority task
```

Another common use case is [resource](http://distributed.dask.org/en/stable/resources.html) annotations:

```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)


# Create a `LocalCluster` with some resource annotations.
# Resources are abstract in Dask and not inferred from your system.
# Here, we claim that our system has 1 GPU and 1 process available per worker.
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    with dask.annotate(resources={"GPU": 1}):
        # Submit the task so it runs on Dask; it requires 1 GPU resource on a worker
        future = show.submit(0)

    with dask.annotate(resources={"process": 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially
        future = show.submit(1)
        future = show.submit(2)
        future = show.submit(3)


if __name__ == "__main__":
    my_flow()
```

### Run parallel tasks with Dask

The following flow is too small to gain much from parallel execution, but it shows how little code is needed to use the
[`DaskTaskRunner`](https://prefecthq.github.io/prefect-dask/).

Configure your flow to use the `DaskTaskRunner`:

1. Ensure the `prefect-dask` collection is installed by running `pip install prefect-dask`.
2. In your flow code, import `DaskTaskRunner` from `prefect_dask.task_runners`.
3. Assign it as the task runner when defining the flow using the `task_runner=DaskTaskRunner` argument.
4. Use the `.submit` method when calling task-decorated functions.

Install `prefect-dask`, make these changes, then save the updated code as `dask_flow.py`.

```python hl_lines="2 12 18"
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def say_hello(name):
    print(f"hello {name}")


@task
def say_goodbye(name):
    print(f"goodbye {name}")


@flow(task_runner=DaskTaskRunner())
def greetings(names):
    for name in names:
        say_hello.submit(name)
        say_goodbye.submit(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
```

Note that, because you're using `DaskTaskRunner` in a script, you must use `if __name__ == "__main__":` or you'll see warnings and errors.

Run `dask_flow.py`. If you get a warning about accepting incoming network connections, that's okay - everything is local in this example.

```bash
python dask_flow.py
```

`DaskTaskRunner` automatically creates a local Dask cluster, then starts executing all of the task runs in parallel.
Because the task runs execute in parallel, their output does not necessarily appear in the order in which the tasks were submitted.

Abbreviated output:

```bash
goodbye marvin
hello arthur
goodbye ford
hello trillian
```

Notice what happens if you do not use the `submit` method when calling tasks:

```python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def say_hello(name):
    print(f"hello {name}")


@task
def say_goodbye(name):
    print(f"goodbye {name}")


@flow(task_runner=DaskTaskRunner())
def greetings(names):
    for name in names:
        say_hello(name)
        say_goodbye(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
```

Run the script:

<div class="terminal">
```bash
python dask_flow.py
```
</div>

This time, the tasks run sequentially. Here's the output with logs removed:

<div class="terminal">
```bash
hello arthur
goodbye arthur
hello trillian
goodbye trillian
hello ford
goodbye ford
hello marvin
goodbye marvin
```
</div>

The task runs are not submitted to the `DaskTaskRunner`; instead, they run sequentially.
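
The difference between calling a function directly and submitting it can be illustrated with an analogy from the Python standard library. The sketch below uses `concurrent.futures.ThreadPoolExecutor`, not Prefect or Dask, and the `greet` function is made up for this example. A direct call blocks until it returns, while `submit` hands the work to the executor and immediately returns a future.

```python
from concurrent.futures import ThreadPoolExecutor


def greet(name):
    return f"hello {name}"


names = ["arthur", "trillian", "ford", "marvin"]

# Direct calls: each call finishes before the next one starts.
direct = [greet(name) for name in names]

# Submitted calls: the executor schedules the work and returns futures at once.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(greet, name) for name in names]
    # Reading the futures in submission order keeps the results in that order,
    # even though the calls themselves may complete in any order.
    submitted = [future.result() for future in futures]
```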

## Use multiple task runners

Many workflows include a variety of tasks, and not all of them benefit from parallel execution.