Consolidate Ray content (#14171)
Co-authored-by: Jeff Hale <discdiver@users.noreply.github.com>
billpalombi and discdiver committed Jun 20, 2024
1 parent 27bdbb0 commit a0bb447
Showing 2 changed files with 22 additions and 161 deletions.
121 changes: 0 additions & 121 deletions docs/3.0rc/develop/write-tasks/dask-and-ray.mdx
@@ -242,25 +242,21 @@ Note that, because you're using `DaskTaskRunner` in a script, you must use `if _

Run `dask_flow.py`. If you get a warning about accepting incoming network connections, that's okay - everything is local in this example.

<div class="terminal">
```bash
python dask_flow.py
```
</div>

`DaskTaskRunner` automatically creates a local Dask cluster, then starts executing all of the task runs in parallel.
The results do not return in the same order as the sequential code above.

Abbreviated output:

<div class="terminal">
```bash
goodbye marvin
hello arthur
goodbye ford
hello trillian
```
</div>

Notice what happens if you do not use the `submit` method when calling tasks:

@@ -315,123 +311,6 @@ goodbye marvin

The task runs are not submitted to the `DaskTaskRunner`; instead, they run sequentially.
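
The difference between calling a task directly and calling it through `submit` can be illustrated with a minimal standard-library sketch. It uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for a task runner and is not Prefect's own API; `greet_sequentially` and `greet_submitted` are hypothetical helper names introduced only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def say_hello(name):
    return f"hello {name}"


def greet_sequentially(names):
    # Direct calls: each call blocks until it returns, so the work runs one item at a time.
    return [say_hello(name) for name in names]


def greet_submitted(names):
    # submit() hands the call to the pool and returns a future immediately,
    # so the pool is free to run the calls concurrently.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(say_hello, name) for name in names]
        # Reading results in submission order keeps the returned list deterministic,
        # even though the calls themselves may finish in any order.
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(greet_submitted(["arthur", "trillian", "ford", "marvin"]))
```

Both helpers return the same values; only the scheduling differs, which is why the output order of printed side effects can change once work is submitted rather than called directly.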

## Run tasks on Ray

The `RayTaskRunner` (installed separately as a
[Prefect Collection](/integrations/prefect-ray/index)) is a parallel task runner that submits tasks to [Ray](https://www.ray.io/).
By default, a temporary Ray instance is created for the duration of the flow run. If you already have a Ray instance running, you can
provide the connection URL with an `address` argument.

<Note>
**Remote storage and Ray tasks**

We recommend configuring [remote storage](/3.0rc/concepts/storage/) for task execution with the `RayTaskRunner`.
This ensures tasks executing in Ray have access to task result storage, particularly when accessing a Ray instance outside of your
execution environment.
</Note>

Configure your flow to use the `RayTaskRunner`:

1. Make sure the `prefect-ray` collection is installed: `pip install prefect-ray`.
2. In your flow code, import `RayTaskRunner` from `prefect_ray.task_runners`.
3. Assign it as the task runner when defining the flow using the `task_runner=RayTaskRunner` argument.

For example, this flow uses the `RayTaskRunner` configured to access an existing Ray instance at `ray://192.0.2.255:8786`. When submitting work to an existing Ray cluster, specify `runtime_env` to ensure that the Ray workers have the [dependencies](https://docs.ray.io/en/latest/ray-core/handling-dependencies.html#runtime-environments) they need.

```python hl_lines="4"
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(
    task_runner=RayTaskRunner(
        address="ray://192.0.2.255:8786",
        init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}},
    )
)
def my_flow():
    ...
```

`RayTaskRunner` accepts the following optional parameters:

| Parameter | Description |
| --- | --- |
| address | Address of a currently running Ray instance, starting with the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI. |
| init_kwargs | Additional kwargs to use when calling `ray.init`. |

Note that Ray Client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance.
If you don't provide the `address` of a Ray instance, Prefect creates a temporary instance automatically.

<Warning>
**Ray limitations**

Ray requires Prefect < 3.0 at the moment.

While we're excited about adding support for parallel task execution via Ray to Prefect, there are some limitations with Ray:

Ray's support for Python 3.11 is [experimental](https://docs.ray.io/en/latest/ray-overview/installation.html#install-nightlies).

Ray support for non-x86/64 architectures such as ARM/M1 processors with installation from `pip` alone and will be skipped during
installation of Prefect. It is possible to manually install the blocking component with `conda`. See the
[Ray documentation](https://docs.ray.io/en/latest/ray-overview/installation.html#m1-mac-apple-silicon-support) for instructions.

See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information.

</Warning>

### Run parallel tasks with Ray

To flexibly apply the appropriate task runner for your workflow, use the same flow as above, with a few minor changes to use the
[`RayTaskRunner`](https://prefecthq.github.io/prefect-ray/) where you previously configured `DaskTaskRunner`.


Configure your flow to use the `RayTaskRunner`:

1. Install `prefect-ray` into your environment with `pip install -U prefect-ray`.
2. In your flow code, import `RayTaskRunner` from `prefect_ray.task_runners`.
3. Specify the task runner when the flow is defined using the `task_runner=RayTaskRunner` argument.

<Warning>
**Ray environment limitations**

Ray requires Prefect < 3.0 at the moment.

While we're excited about parallel task execution through Ray to Prefect, there are some inherent limitations with Ray:

- Support for Python 3.11 is [experimental](https://docs.ray.io/en/latest/ray-overview/installation.html#install-nightlies).
- Ray's Windows support is currently in beta.

See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information.
</Warning>

Save this code in `ray_flow.py`.

```python hl_lines="2 12"
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner

@task
def say_hello(name):
    print(f"hello {name}")

@task
def say_goodbye(name):
    print(f"goodbye {name}")

@flow(task_runner=RayTaskRunner())
def greetings(names):
    for name in names:
        say_hello.submit(name)
        say_goodbye.submit(name)

if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
```

Now run `ray_flow.py` `RayTaskRunner`, which automatically creates a local Ray instance, then immediately starts executing all of the
tasks in parallel. If you have an existing Ray instance, you can provide the address as an argument to run tasks in the instance.
See [Running tasks on Ray](/3.0rc/develop/write-tasks/use-task-runners/#running_tasks_on_ray) for details.

## Use multiple task runners

Many workflows include a variety of tasks, and not all of them benefit from parallel execution.
62 changes: 22 additions & 40 deletions docs/integrations/prefect-ray/index.mdx
@@ -1,23 +1,16 @@
---
title: prefect-ray
description: Accelerate your workflows by running tasks in parallel with Ray
---

[Ray](https://docs.ray.io/en/latest/index.html) can run the tasks in your flow in parallel by distributing them over multiple machines.
The `prefect-ray` integration makes it easy speed up your flow runs by integrating Ray into your code.
[Ray](https://docs.ray.io/en/latest/index.html) can run your tasks in parallel by distributing them over multiple machines.
The `prefect-ray` integration makes it easy to accelerate your flow runs with Ray.

## Getting Started
## Install `prefect-ray`

### Prerequisites

- [Prefect installed](https://docs.prefect.io/latest/get-started/install/) in a virtual environment.

### Install prefect-ray

<div class="terminal">
```bash
pip install prefect-ray
```
</div>

<Warning>
**Ray limitations**
@@ -31,13 +24,11 @@ There are a few limitations with Ray:
See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information.
</Warning>

## Running tasks on Ray
## Run tasks on Ray

The `RayTaskRunner` is a [Prefect task runner](https://docs.prefect.io/develop/write-tasks/use-task-runners/) that submits tasks to [Ray](https://www.ray.io/) for parallel execution.

By default, a temporary Ray instance is created for the duration of the flow run.

For example, this flow counts to three in parallel.
For example, this flow counts to three in parallel:

```python
import time
@@ -96,9 +87,14 @@ This flow uses the `RayTaskRunner` configured to access an existing Ray instance
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner(address="ray://192.0.2.255:8786"))
@flow(
    task_runner=RayTaskRunner(
        address="ray://192.0.2.255:8786",
        init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}},
    )
)
def my_flow():
...
...
```

`RayTaskRunner` accepts the following optional parameters:
@@ -108,19 +104,13 @@ def my_flow():
| address | Address of a currently running Ray instance, starting with the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI. |
| init_kwargs | Additional kwargs to use when calling `ray.init`. |

Note that Ray Client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance. If you don't provide the `address` of a Ray instance, Prefect creates a temporary instance automatically.

<Warning>
**Ray environment limitations**
The Ray client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance.
If you don't provide the `address` of a Ray instance, Prefect creates a temporary instance automatically.
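
As a rough sketch of how the two optional parameters fit together, the helper below assembles keyword arguments that could be passed to `RayTaskRunner`. The function name `build_ray_runner_kwargs` and its `pip_packages` argument are hypothetical and not part of `prefect-ray`; only the `address` and `init_kwargs` keys, and the `runtime_env` shape, come from the examples in this document.

```python
def build_ray_runner_kwargs(address=None, pip_packages=None):
    """Assemble keyword arguments for RayTaskRunner (hypothetical helper)."""
    kwargs = {}
    if address is not None:
        # Connect to an existing Ray instance; when omitted, a temporary one is created.
        kwargs["address"] = address
    if pip_packages:
        # Forwarded to ray.init so that remote workers install these packages.
        kwargs["init_kwargs"] = {"runtime_env": {"pip": list(pip_packages)}}
    return kwargs


if __name__ == "__main__":
    # Assumed usage: RayTaskRunner(**build_ray_runner_kwargs(...))
    print(build_ray_runner_kwargs("ray://192.0.2.255:8786", ["prefect-ray"]))
```

Calling the helper with no arguments yields an empty dictionary, which corresponds to the default behavior of a temporary local instance.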

Ray support for non-x86/64 architectures such as ARM/M1 processors with installation from `pip` alone and will be skipped during installation of Prefect. It is possible to manually install the blocking component with `conda`. See the [Ray documentation](https://docs.ray.io/en/latest/ray-overview/installation.html#m1-mac-apple-silicon-support) for instructions.
## Run tasks on a remote Ray cluster

See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information.
</Warning>

## Running tasks on a Ray remote cluster

When using the `RayTaskRunner` with a remote Ray cluster, you may run into issues that are not seen when using a local Ray instance. To resolve these issues, we recommend taking the following steps when working with a remote Ray cluster:
When using the `RayTaskRunner` with a remote Ray cluster, you may run into issues that are not seen when using a local Ray instance.
To resolve these issues, we recommend taking the following steps when working with a remote Ray cluster:

1. By default, Prefect will not persist any data to the filesystem of the remote ray worker. However, if you want to take advantage of Prefect's caching ability, you will need to configure a remote result storage to persist results across task runs.

@@ -165,33 +155,25 @@ if __name__ == "__main__":

2. If you get an error stating that the module 'prefect' cannot be found, ensure `prefect` is installed on the remote cluster, with:

<div class="terminal">
```bash
pip install prefect
```
</div>

3. If you get an error with a message similar to "File system created with scheme 's3' could not be created", ensure the required Python modules are installed on **both local and remote machines**. For example, if using S3 for storage:

<div class="terminal">
```bash
pip install s3fs
```
</div>

4. If you are seeing timeout or other connection errors, double check the address provided to the `RayTaskRunner`. The address should look similar to: `address='ray://<head_node_ip_address>:10001'`:

<div class="terminal">
```bash
RayTaskRunner(address="ray://1.23.199.255:10001")
```
</div>
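
Before digging into timeouts, it can help to rule out a malformed address. The following is a minimal sketch using only the standard library; `check_ray_address` is a hypothetical helper, not part of `prefect-ray` or Ray, and it only checks the shape of the string (scheme, host, port), not whether the cluster is reachable.

```python
from urllib.parse import urlsplit


def check_ray_address(address):
    """Return a list of problems found in a Ray Client address (empty if none)."""
    problems = []
    parts = urlsplit(address)
    if parts.scheme != "ray":
        problems.append("address should start with ray://")
    if not parts.hostname:
        problems.append("address is missing a host")
    try:
        # Accessing .port raises ValueError for non-numeric or out-of-range ports.
        port = parts.port
    except ValueError:
        problems.append("port is not a valid number")
    else:
        if port is None:
            problems.append("address is missing a port")
    return problems


if __name__ == "__main__":
    print(check_ray_address("ray://1.23.199.255:10001"))
```

An empty list means the address has the expected `ray://<host>:<port>` shape; connection errors beyond that point are more likely to be network or cluster issues.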

## Specifying remote options

The `remote_options` context can be used to control the task’s remote options.
## Specify remote options

For example, we can set the number of CPUs and GPUs to use for the `process` task:
The `remote_options` context can be used to control the task’s remote options. For example, we can set the number of CPUs and GPUs to use for the `process` task:

```python
from prefect import flow, task
@@ -212,6 +194,6 @@ def my_flow():

## Resources

Refer to the prefect-ray API documentation linked in the sidebar to explore all the capabilities of the prefect-ray library.
Refer to the `prefect-ray` API documentation to explore all the capabilities of the prefect-ray library.

For assistance using Dask, consult the [Ray documentation](https://docs.ray.io/en/latest/index.html).
For further assistance using Ray, consult the [Ray documentation](https://docs.ray.io/en/latest/index.html).
