Skip to content

Commit

Permalink
chore(docs): add RayTaskRunner doc example using runtime_env (#14138)
Browse files Browse the repository at this point in the history
  • Loading branch information
parkedwards committed Jun 19, 2024
1 parent 7d8ce75 commit f4fc69b
Showing 1 changed file with 45 additions and 40 deletions.
85 changes: 45 additions & 40 deletions docs/3.0rc/develop/write-tasks/dask-and-ray.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@ description: Learn how to use the Prefect Dask and Ray task runners for parallel

### Parallel task execution

Many real-world data workflows benefit from truly parallel, distributed task execution.
Many real-world data workflows benefit from truly parallel, distributed task execution.
Use Dask or Ray in your flows to choose the execution environment that fits your particular needs.

- `DaskTaskRunner` runs tasks requiring parallel execution using
- `DaskTaskRunner` runs tasks requiring parallel execution using
[`dask.distributed`](http://distributed.dask.org/).
- `RayTaskRunner` runs tasks requiring parallel execution using [Ray](https://www.ray.io/).

These task runners can spin up a local Dask cluster or Ray instance on the fly, or let you connect with a Dask or Ray environment you've
These task runners can spin up a local Dask cluster or Ray instance on the fly, or let you connect with a Dask or Ray environment you've
set up separately. This enables you to take advantage of massively parallel computing environments.

## Run tasks on Dask

The [`DaskTaskRunner`](/integrations/prefect-dask/index) is a parallel task runner that submits tasks to the
The [`DaskTaskRunner`](/integrations/prefect-dask/index) is a parallel task runner that submits tasks to the
[`dask.distributed`](http://distributed.dask.org/) scheduler. To use the `DaskTaskRunner`:

1. Make sure the `prefect-dask` collection is installed: `pip install prefect-dask`.
2. In your flow code, import `DaskTaskRunner` from `prefect_dask.task_runners`.
3. Assign it as the task runner when you define the flow with the `task_runner=DaskTaskRunner` argument.

By default, a temporary Dask cluster is created for the duration of the flow run.
By default, a temporary Dask cluster is created for the duration of the flow run.

If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL
with the `address` kwarg. For example, this flow uses the `DaskTaskRunner` configured to access an existing
If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL
with the `address` kwarg. For example, this flow uses the `DaskTaskRunner` configured to access an existing
Dask cluster at `http://my-dask-cluster`:

```python hl_lines="4"
Expand All @@ -53,11 +53,11 @@ def my_flow():
**Multiprocessing safety**

Because the `DaskTaskRunner` uses multiprocessing, calls to flows
in scripts must be guarded with `if __name__ == "__main__":` or you will encounter
in scripts must be guarded with `if __name__ == "__main__":` or you will encounter
warnings and errors.
</Warning>

If you don't provide the `address` of a Dask scheduler, Prefect creates a temporary local cluster automatically. The number of workers
If you don't provide the `address` of a Dask scheduler, Prefect creates a temporary local cluster automatically. The number of workers
used is based on the number of cores on your machine. The default provides a mix of processes and threads that work well for
most workloads. To specify this explicitly, pass values for `n_workers` or `threads_per_worker` to `cluster_kwargs`:

Expand All @@ -70,7 +70,7 @@ DaskTaskRunner(

### Use a temporary cluster

The `DaskTaskRunner` can create a temporary cluster using any of
The `DaskTaskRunner` can create a temporary cluster using any of
[Dask's cluster-manager options](https://docs.dask.org/en/latest/setup.html). This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.

To configure it, provide a `cluster_class`. This can be:
Expand All @@ -81,7 +81,7 @@ To configure it, provide a `cluster_class`. This can be:

You can also configure `cluster_kwargs`. This takes a dictionary of keyword arguments to pass to `cluster_class` when starting the flow run.

For example, to configure a flow to use a temporary `dask_cloudprovider.aws.FargateCluster` with four workers running with an image named
For example, to configure a flow to use a temporary `dask_cloudprovider.aws.FargateCluster` with four workers running with an image named
`my-prefect-image`:

```python
Expand All @@ -94,16 +94,16 @@ DaskTaskRunner(
### Connect to an existing cluster

Multiple Prefect flow runs can all use the same existing Dask cluster. You
might manage a single long-running Dask cluster (for example, using the Dask
might manage a single long-running Dask cluster (for example, using the Dask
[Helm Chart](https://docs.dask.org/en/latest/setup/kubernetes-helm.html)) and
configure flows to connect to it during execution. This has a couple downsides
when compared to using a temporary cluster:

- All workers in the cluster must have dependencies installed for all flows you intend to run.
- Multiple flow runs may compete for resources. Dask tries to do a good job
- Multiple flow runs may compete for resources. Dask tries to do a good job
sharing resources between tasks, but you may still run into issues.

That said, you may prefer managing a single long-running cluster.
That said, you may prefer managing a single long-running cluster.

To configure a `DaskTaskRunner` to connect to an existing cluster, pass in the address of the
scheduler to the `address` argument:
Expand All @@ -116,7 +116,7 @@ DaskTaskRunner(address="tcp://...")
### Configure adaptive scaling

A key feature of using a `DaskTaskRunner` is the ability to scale adaptively
to the workload. Instead of specifying `n_workers` as a fixed number,
to the workload. Instead of specifying `n_workers` as a fixed number,
you can specify a minimum and maximum number of workers to use, and the Dask
cluster scales up and down as needed.

Expand Down Expand Up @@ -186,7 +186,7 @@ def my_flow():
future = show(0) # this task requires 1 GPU resource on a worker

with dask.annotate(resources={'process': 1}):
# These tasks each require 1 process on a worker; because we've
# These tasks each require 1 process on a worker; because we've
# specified that our cluster has 1 process per worker and 1 worker,
# these tasks will run sequentially
future = show(1)
Expand All @@ -200,7 +200,7 @@ if __name__ == "__main__":

### Run parallel tasks with Dask

This simple flow doesn't gain much from parallel execution, but it illustrates how simple it is to take advantage of the
This simple flow doesn't gain much from parallel execution, but it illustrates how simple it is to take advantage of the
[`DaskTaskRunner`](https://prefecthq.github.io/prefect-dask/).

Configure your flow to use the `DaskTaskRunner`:
Expand All @@ -209,7 +209,7 @@ Configure your flow to use the `DaskTaskRunner`:
2. In your flow code, import `DaskTaskRunner` from `prefect_dask.task_runners`.
3. Assign it as the task runner when defining the flow using the `task_runner=DaskTaskRunner` argument.
4. Use the `.submit` method when calling task-decorated functions.

Install `prefect-dask`, make these changes, then save the updated code as `dask_flow.py`.

```python hl_lines="2 12 18"
Expand Down Expand Up @@ -248,7 +248,7 @@ python dask_flow.py
```
</div>

`DaskTaskRunner` automatically creates a local Dask cluster, then starts executing all of the task runs in parallel.
`DaskTaskRunner` automatically creates a local Dask cluster, then starts executing all of the task runs in parallel.
The results do not return in the same order as the sequential code above.

Abbreviated output:
Expand Down Expand Up @@ -317,16 +317,16 @@ The task runs are not submitted to the `DaskTaskRunner`; instead, they run seque

## Run tasks on Ray

The `RayTaskRunner` (installed separately as a
[Prefect Collection](/integrations/prefect-ray/index)) is a parallel task runner that submits tasks to [Ray](https://www.ray.io/).
By default, a temporary Ray instance is created for the duration of the flow run. If you already have a Ray instance running, you can
The `RayTaskRunner` (installed separately as a
[Prefect Collection](/integrations/prefect-ray/index)) is a parallel task runner that submits tasks to [Ray](https://www.ray.io/).
By default, a temporary Ray instance is created for the duration of the flow run. If you already have a Ray instance running, you can
provide the connection URL with an `address` argument.

<Note>
**Remote storage and Ray tasks**

We recommend configuring [remote storage](/3.0rc/concepts/storage/) for task execution with the `RayTaskRunner`.
This ensures tasks executing in Ray have access to task result storage, particularly when accessing a Ray instance outside of your
We recommend configuring [remote storage](/3.0rc/concepts/storage/) for task execution with the `RayTaskRunner`.
This ensures tasks executing in Ray have access to task result storage, particularly when accessing a Ray instance outside of your
execution environment.
</Note>

Expand All @@ -336,15 +336,20 @@ Configure your flow to use the `RayTaskRunner`:
2. In your flow code, import `RayTaskRunner` from `prefect_ray.task_runners`.
3. Assign it as the task runner when defining the flow using the `task_runner=RayTaskRunner` argument.

For example, this flow uses the `RayTaskRunner` configured to access an existing Ray instance at `ray://192.0.2.255:8786`.
For example, this flow uses the `RayTaskRunner` configured to access an existing Ray instance at `ray://192.0.2.255:8786`. When submitting work to an existing Ray cluster, we should specify `runtime_env` to ensure that the Ray workers have the [dependencies](https://docs.ray.io/en/latest/ray-core/handling-dependencies.html#runtime-environments) they need.

```python hl_lines="4"
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner(address="ray://192.0.2.255:8786"))
@flow(
task_runner=RayTaskRunner(
address="ray://192.0.2.255:8786",
init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}},
)
)
def my_flow():
...
...
```

`RayTaskRunner` accepts the following optional parameters:
Expand All @@ -354,20 +359,20 @@ def my_flow():
| address | Address of a currently running Ray instance, starting with the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI. |
| init_kwargs | Additional kwargs to use when calling `ray.init`. |

Note that Ray Client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance.
Note that Ray Client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance.
If you don't provide the `address` of a Ray instance, Prefect creates a temporary instance automatically.

<Warning>
<Warning>
**Ray limitations**

Ray requires Prefect < 3.0 at the moment.

While we're excited about adding support for parallel task execution via Ray to Prefect, there are some limitations with Ray:

Ray's support for Python 3.11 is [experimental](https://docs.ray.io/en/latest/ray-overview/installation.html#install-nightlies).

Ray support for non-x86/64 architectures such as ARM/M1 processors with installation from `pip` alone and will be skipped during
installation of Prefect. It is possible to manually install the blocking component with `conda`. See the
Ray support for non-x86/64 architectures such as ARM/M1 processors with installation from `pip` alone and will be skipped during
installation of Prefect. It is possible to manually install the blocking component with `conda`. See the
[Ray documentation](https://docs.ray.io/en/latest/ray-overview/installation.html#m1-mac-apple-silicon-support) for instructions.

See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information.
Expand All @@ -376,8 +381,8 @@ See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overv

### Run parallel tasks with Ray

To flexibly apply the appropriate task runner for your workflow, use the same flow as above, with a few minor changes to use the
[`RayTaskRunner`](https://prefecthq.github.io/prefect-ray/) where you previously configured `DaskTaskRunner`.
To flexibly apply the appropriate task runner for your workflow, use the same flow as above, with a few minor changes to use the
[`RayTaskRunner`](https://prefecthq.github.io/prefect-ray/) where you previously configured `DaskTaskRunner`.


Configure your flow to use the `RayTaskRunner`:
Expand All @@ -386,7 +391,7 @@ Configure your flow to use the `RayTaskRunner`:
2. In your flow code, import `RayTaskRunner` from `prefect_ray.task_runners`.
3. Specify the task runner when the flow is defined using the `task_runner=RayTaskRunner` argument.

<Warning>
<Warning>
**Ray environment limitations**

Ray requires Prefect < 3.0 at the moment.
Expand Down Expand Up @@ -423,18 +428,18 @@ if __name__ == "__main__":
greetings(["arthur", "trillian", "ford", "marvin"])
```

Now run `ray_flow.py` `RayTaskRunner`, which automatically creates a local Ray instance, then immediately starts executing all of the
tasks in parallel. If you have an existing Ray instance, you can provide the address as an argument to run tasks in the instance.
Now run `ray_flow.py` `RayTaskRunner`, which automatically creates a local Ray instance, then immediately starts executing all of the
tasks in parallel. If you have an existing Ray instance, you can provide the address as an argument to run tasks in the instance.
See [Running tasks on Ray](/3.0rc/develop/write-tasks/use-task-runners/#running_tasks_on_ray) for details.

## Use multiple task runners

Many workflows include a variety of tasks, and not all of them benefit from parallel execution.

Because task runners are specified on flows, you can assign different task runners to tasks by using
Because task runners are specified on flows, you can assign different task runners to tasks by using
[subflows](/3.0rc/develop/write-flows/#composing-flows) to organize those tasks.

This example uses the default `ConcurrentTaskRunner`. Then you call a `ray_greetings()` subflow that
This example uses the default `ConcurrentTaskRunner`. Then you call a `ray_greetings()` subflow that
uses the `RayTaskRunner` to execute the same tasks in a Ray instance.

```python
Expand Down Expand Up @@ -466,5 +471,5 @@ if __name__ == "__main__":
greetings(["arthur", "trillian", "ford", "marvin"])
```

If you save this as `ray_subflow.py` and run it, you'll see that the flow `greetings` runs as you'd expect for a concurrent flow. Then flow
If you save this as `ray_subflow.py` and run it, you'll see that the flow `greetings` runs as you'd expect for a concurrent flow. Then flow
`ray-greetings` spins up a Ray instance to run the tasks again.

0 comments on commit f4fc69b

Please sign in to comment.