Consolidates Dask content in Dask integration page #14177

Merged 4 commits on Jun 20, 2024

Changes from 2 commits
298 changes: 1 addition & 297 deletions docs/3.0rc/develop/write-tasks/dask-and-ray.mdx
@@ -3,7 +3,7 @@ title: Run tasks in parallel on Dask or Ray
description: Learn how to use the Prefect Dask and Ray task runners for parallel or distributed task execution.
---

### Parallel task execution
## Parallel task execution

Many real-world data workflows benefit from truly parallel, distributed task execution.
Use Dask or Ray in your flows to choose the execution environment that fits your particular needs.
@@ -15,302 +15,6 @@ Use Dask or Ray in your flows to choose the execution environment that fits your
These task runners can spin up a local Dask cluster or Ray instance on the fly, or let you connect with a Dask or Ray environment you've
set up separately. This enables you to take advantage of massively parallel computing environments.

## Run tasks on Dask

The [`DaskTaskRunner`](/integrations/prefect-dask/index) is a parallel task runner that submits tasks to the
[`dask.distributed`](http://distributed.dask.org/) scheduler. To use the `DaskTaskRunner`:

1. Make sure the `prefect-dask` collection is installed: `pip install prefect-dask`.
2. In your flow code, import `DaskTaskRunner` from `prefect_dask.task_runners`.
3. Assign it as the task runner when you define the flow with the `task_runner=DaskTaskRunner` argument.

By default, a temporary Dask cluster is created for the duration of the flow run.
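
A minimal sketch of this default behavior, with no address or cluster configuration supplied:

```python
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner


@flow(task_runner=DaskTaskRunner())  # creates a temporary local Dask cluster
def my_flow():
    ...
```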

If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL
with the `address` kwarg. For example, this flow uses the `DaskTaskRunner` configured to access an existing
Dask cluster at `http://my-dask-cluster`:

```python hl_lines="4"
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address="http://my-dask-cluster"))
def my_flow():
    ...
```

`DaskTaskRunner` accepts the following optional parameters:

| Parameter | Description |
| --- | --- |
| address | Address of a currently running Dask scheduler. |
| cluster_class | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, `"distributed.LocalCluster"`), or the class itself. |
| cluster_kwargs | Additional kwargs to pass to the `cluster_class` when creating a temporary Dask cluster. |
| adapt_kwargs | Additional kwargs to pass to `cluster.adapt` when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided. |
| client_kwargs | Additional kwargs to use when creating a [`dask.distributed.Client`](https://distributed.dask.org/en/latest/api.html#client). |
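
For example, a sketch that forwards `client_kwargs` to the underlying `dask.distributed.Client`; the `set_as_default` option shown here is a standard `Client` keyword, used purely for illustration:

```python
from prefect_dask.task_runners import DaskTaskRunner

# Forward kwargs to dask.distributed.Client when the runner connects
DaskTaskRunner(client_kwargs={"set_as_default": True})
```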

<Warning>
**Multiprocessing safety**

Because the `DaskTaskRunner` uses multiprocessing, calls to flows
in scripts must be guarded with `if __name__ == "__main__":` or you will encounter
warnings and errors.
</Warning>
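
A minimal sketch of the guard, assuming your flow is named `my_flow`:

```python
if __name__ == "__main__":
    my_flow()  # call the flow only when the file runs as a script
```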

If you don't provide the `address` of a Dask scheduler, Prefect creates a temporary local cluster automatically. The number of workers
used is based on the number of cores on your machine. The default provides a mix of processes and threads that work well for
most workloads. To specify this explicitly, pass values for `n_workers` or `threads_per_worker` to `cluster_kwargs`:

```python
# Use 4 worker processes, each with 2 threads
DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
)
```

### Use a temporary cluster

The `DaskTaskRunner` can create a temporary cluster using any of
[Dask's cluster-manager options](https://docs.dask.org/en/latest/setup.html). This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.

To configure it, provide a `cluster_class`. This can be:

- A string specifying the import path to the cluster class (for example, `"dask_cloudprovider.aws.FargateCluster"`)
- The cluster class itself
- A function for creating a custom cluster

You can also configure `cluster_kwargs`. This takes a dictionary of keyword arguments to pass to `cluster_class` when starting the flow run.

For example, to configure a flow to use a temporary `dask_cloudprovider.aws.FargateCluster` with four workers running with an image named
`my-prefect-image`:

```python
DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)
```
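
Because `cluster_class` also accepts a callable, you can pass a factory function instead of an import path. A hypothetical sketch; the `make_cluster` helper below is illustrative, not part of `prefect-dask`:

```python
from distributed import LocalCluster
from prefect_dask.task_runners import DaskTaskRunner


def make_cluster():
    # Any callable that returns a Dask cluster works here
    return LocalCluster(n_workers=2, threads_per_worker=1)


DaskTaskRunner(cluster_class=make_cluster)
```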

### Connect to an existing cluster

Multiple Prefect flow runs can all use the same existing Dask cluster. You
might manage a single long-running Dask cluster (for example, using the Dask
[Helm Chart](https://docs.dask.org/en/latest/setup/kubernetes-helm.html)) and
configure flows to connect to it during execution. This approach has a couple of downsides
compared to using a temporary cluster:

- All workers in the cluster must have dependencies installed for all flows you intend to run.
- Multiple flow runs may compete for resources. Dask tries to do a good job
sharing resources between tasks, but you may still run into issues.

That said, you may prefer managing a single long-running cluster.

To configure a `DaskTaskRunner` to connect to an existing cluster, pass in the address of the
scheduler to the `address` argument:

```python
# Connect to an existing cluster running at a specified address
DaskTaskRunner(address="tcp://...")
```

### Configure adaptive scaling

A key feature of using a `DaskTaskRunner` is the ability to scale adaptively
to the workload. Instead of specifying `n_workers` as a fixed number,
you can specify a minimum and maximum number of workers to use, and the Dask
cluster scales up and down as needed.

To do this, pass `adapt_kwargs` to `DaskTaskRunner`. This takes the
following fields:

- `maximum` (`int` or `None`, optional): the maximum number of workers to scale to. Set to `None` for no maximum.
- `minimum` (`int` or `None`, optional): the minimum number of workers to scale to. Set to `None` for no minimum.

For example, this configures a flow to run on a `FargateCluster` scaling up
to a maximum of 10 workers:

```python
DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10}
)
```
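
You can also set both bounds. A sketch using the default local cluster, assuming you want Dask to keep between one and five workers alive:

```python
from prefect_dask.task_runners import DaskTaskRunner

# Scale the temporary cluster between 1 and 5 workers as the workload changes
DaskTaskRunner(adapt_kwargs={"minimum": 1, "maximum": 5})
```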

### Use Dask annotations

Use Dask annotations to further control the behavior of tasks.

For example, set the [priority](http://distributed.dask.org/en/stable/priority.html) of tasks in the Dask scheduler:

```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)


@flow(task_runner=DaskTaskRunner())
def my_flow():
    with dask.annotate(priority=-10):
        future = show.submit(1)  # low priority task

    with dask.annotate(priority=10):
        future = show.submit(2)  # high priority task
```

Another common use case is [resource](http://distributed.dask.org/en/stable/resources.html) annotations:

```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)


# Create a `LocalCluster` with some resource annotations.
# Annotations are abstract in Dask and not inferred from your system.
# Here, we claim that our system has 1 GPU and 1 process available per worker.
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    with dask.annotate(resources={'GPU': 1}):
        future = show.submit(0)  # this task requires 1 GPU resource on a worker

    with dask.annotate(resources={'process': 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks run sequentially
        future = show.submit(1)
        future = show.submit(2)
        future = show.submit(3)


if __name__ == "__main__":
    my_flow()
```

### Run parallel tasks with Dask

This simple flow doesn't gain much from parallel execution, but it illustrates how little work it takes to use the
[`DaskTaskRunner`](/integrations/prefect-dask/index).

Configure your flow to use the `DaskTaskRunner`:

1. Ensure the `prefect-dask` collection is installed by running `pip install prefect-dask`.
2. In your flow code, import `DaskTaskRunner` from `prefect_dask.task_runners`.
3. Assign it as the task runner when defining the flow using the `task_runner=DaskTaskRunner` argument.
4. Use the `.submit` method when calling task-decorated functions.

Install `prefect-dask`, make these changes, then save the updated code as `dask_flow.py`.

```python hl_lines="2 12 18"
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def say_hello(name):
    print(f"hello {name}")


@task
def say_goodbye(name):
    print(f"goodbye {name}")


@flow(task_runner=DaskTaskRunner())
def greetings(names):
    for name in names:
        say_hello.submit(name)
        say_goodbye.submit(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
```

Note that, because you're using `DaskTaskRunner` in a script, you must use `if __name__ == "__main__":` or you'll see warnings and errors.

Run `dask_flow.py`. If you get a warning about accepting incoming network connections, that's okay - everything is local in this example.

```bash
python dask_flow.py
```

`DaskTaskRunner` automatically creates a local Dask cluster, then starts executing all of the task runs in parallel.
The results do not return in the same order as they would with the sequential version shown below.

Abbreviated output:

```bash
goodbye marvin
hello arthur
goodbye ford
hello trillian
```

Notice what happens if you do not use the `submit` method when calling tasks:

```python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def say_hello(name):
    print(f"hello {name}")


@task
def say_goodbye(name):
    print(f"goodbye {name}")


@flow(task_runner=DaskTaskRunner())
def greetings(names):
    for name in names:
        say_hello(name)
        say_goodbye(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
```

Run the script:

```bash
python dask_flow.py
```

This time, the tasks run sequentially. Here's the output with logs removed:

```bash
hello arthur
goodbye arthur
hello trillian
goodbye trillian
hello ford
goodbye ford
hello marvin
goodbye marvin
```

The task runs are not submitted to the `DaskTaskRunner`; instead, they run sequentially.

## Use multiple task runners

Many workflows include a variety of tasks, and not all of them benefit from parallel execution.