From a0bb447ebef1ebff4fbaf70473c17c3f6755092c Mon Sep 17 00:00:00 2001 From: Bill Palombi Date: Thu, 20 Jun 2024 10:01:48 -0400 Subject: [PATCH] Consolidate Ray content (#14171) Co-authored-by: Jeff Hale --- .../develop/write-tasks/dask-and-ray.mdx | 121 ------------------ docs/integrations/prefect-ray/index.mdx | 62 ++++----- 2 files changed, 22 insertions(+), 161 deletions(-) diff --git a/docs/3.0rc/develop/write-tasks/dask-and-ray.mdx b/docs/3.0rc/develop/write-tasks/dask-and-ray.mdx index c3040a1a0949..fa7d70da2933 100644 --- a/docs/3.0rc/develop/write-tasks/dask-and-ray.mdx +++ b/docs/3.0rc/develop/write-tasks/dask-and-ray.mdx @@ -242,25 +242,21 @@ Note that, because you're using `DaskTaskRunner` in a script, you must use `if _ Run `dask_flow.py`. If you get a warning about accepting incoming network connections, that's okay - everything is local in this example. -
```bash
python dask_flow.py
```
-
`DaskTaskRunner` automatically creates a local Dask cluster, then starts executing all of the task runs in parallel. The results do not return in the same order as the sequential code above. Abbreviated output: -
```bash
goodbye marvin
hello arthur
goodbye ford
hello trillian
```
-
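The ordering behavior described above can be illustrated without a Dask cluster. The following is a minimal sketch that uses the standard library's `concurrent.futures` as a stand-in for a parallel task runner; it is not Prefect's or Dask's API, and the helper names `run_sequential` and `run_submitted` are hypothetical. Direct calls return results in call order, while submitted calls return results in completion order, which may differ from run to run.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def say_hello(name):
    return f"hello {name}"


def say_goodbye(name):
    return f"goodbye {name}"


def run_sequential(names):
    """Call each function directly: results come back in call order."""
    messages = []
    for name in names:
        messages.append(say_hello(name))
        messages.append(say_goodbye(name))
    return messages


def run_submitted(names):
    """Submit each call to a thread pool: results come back in completion order."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = []
        for name in names:
            futures.append(pool.submit(say_hello, name))
            futures.append(pool.submit(say_goodbye, name))
        # as_completed yields futures as they finish, not as they were submitted.
        return [future.result() for future in as_completed(futures)]


if __name__ == "__main__":
    names = ["arthur", "trillian", "ford", "marvin"]
    print(run_sequential(names))
    print(run_submitted(names))
```

Both functions produce the same set of messages; only the order is not guaranteed in the submitted case.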
Notice what happens if you do not use the `submit` method when calling tasks: @@ -315,123 +311,6 @@ goodbye marvin The task runs are not submitted to the `DaskTaskRunner`; instead, they run sequentially. -## Run tasks on Ray - -The `RayTaskRunner` (installed separately as a -[Prefect Collection](/integrations/prefect-ray/index)) is a parallel task runner that submits tasks to [Ray](https://www.ray.io/). -By default, a temporary Ray instance is created for the duration of the flow run. If you already have a Ray instance running, you can -provide the connection URL with an `address` argument. - - -**Remote storage and Ray tasks** - -We recommend configuring [remote storage](/3.0rc/concepts/storage/) for task execution with the `RayTaskRunner`. -This ensures tasks executing in Ray have access to task result storage, particularly when accessing a Ray instance outside of your -execution environment. - - -Configure your flow to use the `RayTaskRunner`: - -1. Make sure the `prefect-ray` collection is installed: `pip install prefect-ray`. -2. In your flow code, import `RayTaskRunner` from `prefect_ray.task_runners`. -3. Assign it as the task runner when defining the flow using the `task_runner=RayTaskRunner` argument. - -For example, this flow uses the `RayTaskRunner` configured to access an existing Ray instance at `ray://192.0.2.255:8786`. When submitting work to an existing Ray cluster, specify `runtime_env` to ensure that the Ray workers have the [dependencies](https://docs.ray.io/en/latest/ray-core/handling-dependencies.html#runtime-environments) they need. - -```python hl_lines="4" -from prefect import flow -from prefect_ray.task_runners import RayTaskRunner - -@flow( - task_runner=RayTaskRunner( - address="ray://192.0.2.255:8786", - init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}}, - ) -) -def my_flow(): - ... 
-``` - -`RayTaskRunner` accepts the following optional parameters: - -| Parameter | Description | -| --- | --- | -| address | Address of a currently running Ray instance, starting with the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI. | -| init_kwargs | Additional kwargs to use when calling `ray.init`. | - -Note that Ray Client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance. -If you don't provide the `address` of a Ray instance, Prefect creates a temporary instance automatically. - - -**Ray limitations** - -Ray requires Prefect < 3.0 at the moment. - -While we're excited about adding support for parallel task execution via Ray to Prefect, there are some limitations with Ray: - -Ray's support for Python 3.11 is [experimental](https://docs.ray.io/en/latest/ray-overview/installation.html#install-nightlies). - -Ray support for non-x86/64 architectures such as ARM/M1 processors with installation from `pip` alone and will be skipped during -installation of Prefect. It is possible to manually install the blocking component with `conda`. See the -[Ray documentation](https://docs.ray.io/en/latest/ray-overview/installation.html#m1-mac-apple-silicon-support) for instructions. - -See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information. - - - -### Run parallel tasks with Ray - -To flexibly apply the appropriate task runner for your workflow, use the same flow as above, with a few minor changes to use the -[`RayTaskRunner`](https://prefecthq.github.io/prefect-ray/) where you previously configured `DaskTaskRunner`. - - -Configure your flow to use the `RayTaskRunner`: - -1. Install `prefect-ray` into your environment with `pip install -U prefect-ray`. -2. In your flow code, import `RayTaskRunner` from `prefect_ray.task_runners`. -3. 
Specify the task runner when the flow is defined using the `task_runner=RayTaskRunner` argument. - - -**Ray environment limitations** - -Ray requires Prefect < 3.0 at the moment. - -While we're excited about parallel task execution through Ray to Prefect, there are some inherent limitations with Ray: - -- Support for Python 3.11 is [experimental](https://docs.ray.io/en/latest/ray-overview/installation.html#install-nightlies). -- Ray's Windows support is currently in beta. - -See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information. - - -Save this code in `ray_flow.py`. - -```python hl_lines="2 12" -from prefect import flow, task -from prefect_ray.task_runners import RayTaskRunner - -@task -def say_hello(name): - print(f"hello {name}") - -@task -def say_goodbye(name): - print(f"goodbye {name}") - -@flow(task_runner=RayTaskRunner()) -def greetings(names): - for name in names: - say_hello.submit(name) - say_goodbye.submit(name) - -if __name__ == "__main__": - greetings(["arthur", "trillian", "ford", "marvin"]) -``` - -Now run `ray_flow.py` `RayTaskRunner`, which automatically creates a local Ray instance, then immediately starts executing all of the -tasks in parallel. If you have an existing Ray instance, you can provide the address as an argument to run tasks in the instance. -See [Running tasks on Ray](/3.0rc/develop/write-tasks/use-task-runners/#running_tasks_on_ray) for details. - ## Use multiple task runners Many workflows include a variety of tasks, and not all of them benefit from parallel execution. 
diff --git a/docs/integrations/prefect-ray/index.mdx b/docs/integrations/prefect-ray/index.mdx index 6889c1c92549..a4cb5a2ed654 100644 --- a/docs/integrations/prefect-ray/index.mdx +++ b/docs/integrations/prefect-ray/index.mdx @@ -1,23 +1,16 @@ --- title: prefect-ray +description: Accelerate your workflows by running tasks in parallel with Ray --- -[Ray](https://docs.ray.io/en/latest/index.html) can run the tasks in your flow in parallel by distributing them over multiple machines. -The `prefect-ray` integration makes it easy speed up your flow runs by integrating Ray into your code. +[Ray](https://docs.ray.io/en/latest/index.html) can run your tasks in parallel by distributing them over multiple machines. +The `prefect-ray` integration makes it easy to accelerate your flow runs with Ray. -## Getting Started +## Install `prefect-ray` -### Prerequisites - -- [Prefect installed](https://docs.prefect.io/latest/get-started/install/) in a virtual environment. - -### Install prefect-ray - -
```bash
pip install prefect-ray
```
-
**Ray limitations** @@ -31,13 +24,11 @@ There are a few limitations with Ray: See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information. -## Running tasks on Ray +## Run tasks on Ray The `RayTaskRunner` is a [Prefect task runner](https://docs.prefect.io/develop/write-tasks/use-task-runners/) that submits tasks to [Ray](https://www.ray.io/) for parallel execution. - By default, a temporary Ray instance is created for the duration of the flow run. - -For example, this flow counts to three in parallel. +For example, this flow counts to three in parallel: ```python import time @@ -96,9 +87,14 @@ This flow uses the `RayTaskRunner` configured to access an existing Ray instance from prefect import flow from prefect_ray.task_runners import RayTaskRunner -@flow(task_runner=RayTaskRunner(address="ray://192.0.2.255:8786")) +@flow( + task_runner=RayTaskRunner( + address="ray://192.0.2.255:8786", + init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}}, + ) +) def my_flow(): - ... + ... ``` `RayTaskRunner` accepts the following optional parameters: @@ -108,19 +104,13 @@ def my_flow(): | address | Address of a currently running Ray instance, starting with the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI. | | init_kwargs | Additional kwargs to use when calling `ray.init`. | -Note that Ray Client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance. If you don't provide the `address` of a Ray instance, Prefect creates a temporary instance automatically. - - -**Ray environment limitations** +The Ray client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance. +If you don't provide the `address` of a Ray instance, Prefect creates a temporary instance automatically. 
-Ray support for non-x86/64 architectures such as ARM/M1 processors with installation from `pip` alone and will be skipped during installation of Prefect. It is possible to manually install the blocking component with `conda`. See the [Ray documentation](https://docs.ray.io/en/latest/ray-overview/installation.html#m1-mac-apple-silicon-support) for instructions. +## Run tasks on a remote Ray cluster -See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information. - - -## Running tasks on a Ray remote cluster - -When using the `RayTaskRunner` with a remote Ray cluster, you may run into issues that are not seen when using a local Ray instance. To resolve these issues, we recommend taking the following steps when working with a remote Ray cluster: +When using the `RayTaskRunner` with a remote Ray cluster, you may run into issues that are not seen when using a local Ray instance. +To resolve these issues, we recommend taking the following steps when working with a remote Ray cluster: 1. By default, Prefect will not persist any data to the filesystem of the remote ray worker. However, if you want to take advantage of Prefect's caching ability, you will need to configure a remote result storage to persist results across task runs. @@ -165,33 +155,25 @@ if __name__ == "__main__": 2. If you get an error stating that the module 'prefect' cannot be found, ensure `prefect` is installed on the remote cluster, with: -
```bash
pip install prefect
```
-
3. If you get an error with a message similar to "File system created with scheme 's3' could not be created", ensure the required Python modules are installed on **both local and remote machines**. For example, if using S3 for storage: -
```bash
pip install s3fs
```
-
4. If you are seeing timeout or other connection errors, double check the address provided to the `RayTaskRunner`. The address should look similar to: `address='ray://:10001'`: -
```bash
RayTaskRunner(address="ray://1.23.199.255:10001")
```
-
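One way to catch a malformed address before it surfaces as a timeout is to validate it up front. The sketch below uses only the standard library; `check_ray_address` is a hypothetical helper, not part of `prefect-ray` or Ray, and it assumes the address must have the `ray://` scheme, a host, and a numeric port, as in the example above.

```python
from urllib.parse import urlparse


def check_ray_address(address):
    """Return (host, port) if the address looks like ray://host:port, else raise ValueError."""
    parsed = urlparse(address)
    if parsed.scheme != "ray":
        raise ValueError(f"expected an address starting with ray://, got {address!r}")
    if not parsed.hostname:
        raise ValueError(f"no host found in {address!r}")
    # Accessing .port raises ValueError on its own if the port is not a valid number.
    if parsed.port is None:
        raise ValueError(f"no port found in {address!r}")
    return parsed.hostname, parsed.port


if __name__ == "__main__":
    print(check_ray_address("ray://1.23.199.255:10001"))
```

A check like this only confirms the shape of the address; it does not verify that a Ray cluster is reachable at that host and port.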
- -## Specifying remote options -The `remote_options` context can be used to control the task’s remote options. +## Specify remote options -For example, we can set the number of CPUs and GPUs to use for the `process` task: +The `remote_options` context can be used to control the task’s remote options. For example, we can set the number of CPUs and GPUs to use for the `process` task: ```python from prefect import flow, task @@ -212,6 +194,6 @@ def my_flow(): ## Resources -Refer to the prefect-ray API documentation linked in the sidebar to explore all the capabilities of the prefect-ray library. +Refer to the `prefect-ray` API documentation to explore all the capabilities of the prefect-ray library. -For assistance using Dask, consult the [Ray documentation](https://docs.ray.io/en/latest/index.html). +For further assistance using Ray, consult the [Ray documentation](https://docs.ray.io/en/latest/index.html).