From 8990e9ae7c386af747b686de4577031ae558f0b3 Mon Sep 17 00:00:00 2001 From: Bill Palombi Date: Thu, 20 Jun 2024 12:32:43 -0400 Subject: [PATCH] Consolidate task runner content (#14178) --- docs/3.0rc/develop/write-flows/index.mdx | 4 +- .../develop/write-tasks/dask-and-ray.mdx | 58 ------------------- docs/3.0rc/develop/write-tasks/index.mdx | 10 ++-- ...{use-task-runners.mdx => task-runners.mdx} | 45 +++++--------- docs/3.0rc/get-started/index.mdx | 2 +- docs/3.0rc/resources/big-data.mdx | 4 +- docs/integrations/prefect-dask/index.mdx | 2 +- docs/integrations/prefect-ray/index.mdx | 2 +- docs/mint.json | 3 +- 9 files changed, 28 insertions(+), 102 deletions(-) delete mode 100644 docs/3.0rc/develop/write-tasks/dask-and-ray.mdx rename docs/3.0rc/develop/write-tasks/{use-task-runners.mdx => task-runners.mdx} (81%) diff --git a/docs/3.0rc/develop/write-flows/index.mdx b/docs/3.0rc/develop/write-flows/index.mdx index 6f8f55c371e5..e95dca2c7d94 100644 --- a/docs/3.0rc/develop/write-flows/index.mdx +++ b/docs/3.0rc/develop/write-flows/index.mdx @@ -409,7 +409,7 @@ Flows allow a great deal of configuration by passing arguments to the decorator. | `retries` | An optional number of times to retry on flow run failure. | | `retry_delay_seconds` | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if `retries` is nonzero. | | `flow_run_name` | An optional name to distinguish runs of this flow; this name can be provided as a string template with the flow's parameters as variables; you can also provide this name as a function that returns a string. | -| `task_runner` | An optional [task runner](/3.0rc/develop/write-tasks/use-task-runners/) to use for task execution within the flow when you `.submit()` tasks. If not provided and you `.submit()` tasks, the `ThreadPoolTaskRunner` is used. | +| `task_runner` | An optional [task runner](/3.0rc/develop/write-tasks/task-runners/) to use for task execution within the flow when you `.submit()` tasks. If not provided and you `.submit()` tasks, the `ThreadPoolTaskRunner` is used. | | `timeout_seconds` | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it is marked as failed. Flow execution may continue until the next task is called. | | `validate_parameters` | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is `True`. | | `version` | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. | @@ -515,7 +515,7 @@ The primary flow is the "parent" flow. The flow created within the parent is the Subflow runs behave like normal flow runs. There is a full representation of the flow run in the backend as if it had been called separately. -When a subflow starts, it creates a new [task runner](/3.0rc/develop/write-tasks/use-task-runners/) for tasks within the subflow. +When a subflow starts, it creates a new [task runner](/3.0rc/develop/write-tasks/task-runners/) for tasks within the subflow. When the subflow completes, the task runner shuts down. Subflows block execution of the parent flow until completion. diff --git a/docs/3.0rc/develop/write-tasks/dask-and-ray.mdx b/docs/3.0rc/develop/write-tasks/dask-and-ray.mdx deleted file mode 100644 index e20c3a64bb1d..000000000000 --- a/docs/3.0rc/develop/write-tasks/dask-and-ray.mdx +++ /dev/null @@ -1,58 +0,0 @@ ---- -title: Run tasks in parallel on Dask or Ray -description: Learn how to use the Prefect Dask and Ray task runners for parallel or distributed task execution. ---- - -## Parallel task execution - -Many real-world data workflows benefit from truly parallel, distributed task execution. -Use Dask or Ray in your flows to choose the execution environment that fits your particular needs. - -- `DaskTaskRunner` runs tasks requiring parallel execution using -[`dask.distributed`](http://distributed.dask.org/). -- `RayTaskRunner` runs tasks requiring parallel execution using [Ray](https://www.ray.io/). - -These task runners can spin up a local Dask cluster or Ray instance on the fly, or let you connect with a Dask or Ray environment you've -set up separately. This enables you to take advantage of massively parallel computing environments. - -## Use multiple task runners - -Many workflows include a variety of tasks, and not all of them benefit from parallel execution. - -Because task runners are specified on flows, you can assign different task runners to tasks by using -[subflows](/3.0rc/develop/write-flows/#composing-flows) to organize those tasks. - -This example uses the default `ThreadPoolTaskRunner`. Then you call a `ray_greetings()` subflow that -uses the `RayTaskRunner` to execute the same tasks in a Ray instance. - -```python -from prefect import flow, task -from prefect_ray.task_runners import RayTaskRunner - -@task -def say_hello(name): - print(f"hello {name}") - -@task -def say_goodbye(name): - print(f"goodbye {name}") - -@flow(task_runner=RayTaskRunner()) -def ray_greetings(names): - for name in names: - say_hello.submit(name) - say_goodbye.submit(name) - -@flow() -def greetings(names): - for name in names: - say_hello.submit(name) - say_goodbye.submit(name) - ray_greetings(names) - -if __name__ == "__main__": - greetings(["arthur", "trillian", "ford", "marvin"]) -``` - -If you save this as `ray_subflow.py` and run it, you'll see that the flow `greetings` runs as you'd expect for a concurrent flow. Then flow -`ray-greetings` spins up a Ray instance to run the tasks again. diff --git a/docs/3.0rc/develop/write-tasks/index.mdx b/docs/3.0rc/develop/write-tasks/index.mdx index 66a1bdd20270..37ee78d4fbe1 100644 --- a/docs/3.0rc/develop/write-tasks/index.mdx +++ b/docs/3.0rc/develop/write-tasks/index.mdx @@ -72,7 +72,7 @@ This task run is tracked in the UI as well. Almost any standard Python function can be turned into a Prefect task by adding the `@task` decorator. -Tasks are always executed in the main thread by default, unless a specific [task runner](/3.0rc/develop/write-tasks/use-task-runners) is used to execute them on different threads, processes, or infrastructure. This facilitates native Python debugging and profiling. +Tasks are always executed in the main thread by default, unless a specific [task runner](/3.0rc/develop/write-tasks/task-runners) is used to execute them on different threads, processes, or infrastructure. This facilitates native Python debugging and profiling. ### Synchronous functions @@ -682,23 +682,23 @@ def show_timeouts(): ## Task results Depending on how you call tasks, they can return different types of results and optionally engage the use of -a [task runner](/3.0rc/develop/write-tasks/use-task-runners/). +a [task runner](/3.0rc/develop/write-tasks/task-runners/). Any task can return: - Data , such as `int`, `str`, `dict`, `list`. This is the default behavior any time you call `your_task()`. - [`PrefectFuture`](/3.0rc/api-ref/prefect/futures/#prefect.futures.PrefectFuture). This is achieved -by calling [`your_task.submit()`](/3.0rc/develop/write-tasks/use-task-runners/#using-a-task-runner). +by calling [`your_task.submit()`](/3.0rc/develop/write-tasks/task-runners/#using-a-task-runner). A `PrefectFuture` contains both _data_ and _State_. - Prefect [`State`](/3.0rc/api-ref/server/schemas/states/). Anytime you call your task or flow with the argument `return_state=True`, it directly returns a state to build custom behavior based on a state change you care about, such as task or flow failing or retrying. -To run your task with a [task runner](/3.0rc/develop/write-tasks/use-task-runners/), you must call the task +To run your task with a [task runner](/3.0rc/develop/write-tasks/task-runners/), you must call the task with `.submit()`. -See [state returned values](/3.0rc/develop/write-tasks/use-task-runners/#using-results-from-submitted-tasks) +See [state returned values](/3.0rc/develop/write-tasks/task-runners/#using-results-from-submitted-tasks) for examples. diff --git a/docs/3.0rc/develop/write-tasks/use-task-runners.mdx b/docs/3.0rc/develop/write-tasks/task-runners.mdx similarity index 81% rename from docs/3.0rc/develop/write-tasks/use-task-runners.mdx rename to docs/3.0rc/develop/write-tasks/task-runners.mdx index 2bbd4445e1ba..b2b6a7ac4330 100644 --- a/docs/3.0rc/develop/write-tasks/use-task-runners.mdx +++ b/docs/3.0rc/develop/write-tasks/task-runners.mdx @@ -1,25 +1,16 @@ --- -title: Task runners -description: Learn how to use Prefect task runners for parallel or distributed task execution. +title: Run tasks concurrently or in parallel +description: Learn how to use task runners for concurrent, parallel or distributed execution of tasks. --- -Task runners enable you to engage specific executors for Prefect tasks to enable concurrent, parallel, or -distributed execution of tasks. +Task runners are not required for task execution. +Calling a task function directly without a task runner executes the function in the main thread by default, +which blocks execution of its flow until the task completes. -Task runners are not required for task execution. Calling a task function directly without a task runner executes the -function in the main thread by default. Execution of a task function blocks execution of its flow until the task completes. - -You can use the `.submit()` method on a task to submit that task to a _task runner_. Using a task runner -enables you to run tasks concurrently, or in parallel using a distributed execution library such as Dask or Ray. - -### Built-in task runners and integrations - -The default task runner in Prefect is the -[`ThreadPoolTaskRunner`](/3.0rc/api-ref/prefect/task-runners/#prefect.task_runners.ThreadPoolTaskRunner), +To enable concurrent, parallel, or distributed execution of tasks, use the `.submit()` method to submit a task to a _task runner_. +The default task runner in Prefect is the [`ThreadPoolTaskRunner`](/3.0rc/api-ref/prefect/task-runners/#prefect.task_runners.ThreadPoolTaskRunner), which runs tasks concurrently within a thread pool. - -The following Prefect-developed task runners for parallel or distributed task -execution are available as [integrations](/integrations/catalog/): +For parallel or distributed task execution, you must additionally install one of the following task runners, available as integrations: - [`DaskTaskRunner`](https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-dask) can run tasks using [`dask.distributed`](http://distributed.dask.org/). - [`RayTaskRunner`](https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-ray) can run tasks using [Ray](https://www.ray.io/). @@ -38,13 +29,10 @@ such as for operations mapped across a dataset. ## Configure a task runner -To configure your flow to use a specific task runner, provide the runner to the `task_runner` keyword of -the flow decorator. +To configure your flow to use a specific task runner, provide the runner to the `task_runner` keyword of the flow decorator. To submit work to the runner, use the task's `.submit()` method. -This method returns a -[`PrefectFuture`](/3.0rc/api-ref/prefect/futures/#prefect.futures.PrefectFuture), which is a -Prefect object that contains: +This method returns a [`PrefectFuture`](/3.0rc/api-ref/prefect/futures/#prefect.futures.PrefectFuture), which is a Prefect object that contains: - a reference to the payload returned by the task; - and a [`State`](/3.0rc/api-ref/server/schemas/states/), which is a Prefect object indicating the state of the task run. @@ -84,14 +72,11 @@ Prefect uses the default `ThreadPoolTaskRunner`. ## Use multiple task runners -Each flow can only have one task runner, but sometimes you may want a subset of your tasks to run -using a different task runner than the one configured on the flow. -In this case, you can create [nested flows](/3.0rc/develop/write-flows/#composing-flows) -for tasks that need to use a different task runner. +Each flow can only have one task runner, but sometimes you may want a subset of your tasks to run using a different task runner than the one configured on the flow. +In this case, you can create [nested flows](/3.0rc/develop/write-flows/#composing-flows) for tasks that need to use a different task runner. -For example, you can have a flow (in the example below called `multiple_runner_flow`) that runs its tasks locally -using the `ThreadPoolTaskRunner`. If you have some tasks that can run more efficiently in parallel on a Dask cluster, -you can create a subflow (such as `dask_subflow`) to run those tasks using the `DaskTaskRunner`. +For example, you can have a flow (in the example below called `multiple_runner_flow`) that runs its tasks locally using the `ThreadPoolTaskRunner`. +If you have some tasks that can run more efficiently in parallel on a Dask cluster, you can create a subflow (such as `dask_subflow`) to run those tasks using the `DaskTaskRunner`. ```python from prefect import flow, task @@ -317,4 +302,4 @@ def sum_it(numbers, static_iterable): return futures sum_it([4, 5, 6], unmapped([1, 2, 3])) -``` +``` \ No newline at end of file diff --git a/docs/3.0rc/get-started/index.mdx b/docs/3.0rc/get-started/index.mdx index 7b0bd7663c8e..4eb93232e8da 100644 --- a/docs/3.0rc/get-started/index.mdx +++ b/docs/3.0rc/get-started/index.mdx @@ -13,7 +13,7 @@ With Prefect you gain: * [Scheduling](/3.0rc/deploy/serve-flows/add-schedules) * [Retries](/3.0rc/develop/write-tasks/index#task-arguments) * [Logging](/3.0rc/develop/observe-workflows/logging/) -* [Convenient async functionality](/3.0rc/develop/write-tasks/use-task-runners/#task-runners) +* [Convenient async functionality](/3.0rc/develop/write-tasks/task-runners/#task-runners) * [Caching](/3.0rc/develop/write-tasks/index#caching) * [Notifications](/3.0rc/react/events/automations-triggers/) * [Observability](/3.0rc/develop/observe-workflows/index/) diff --git a/docs/3.0rc/resources/big-data.mdx b/docs/3.0rc/resources/big-data.mdx index fa0b648cada9..e2c653b74e33 100644 --- a/docs/3.0rc/resources/big-data.mdx +++ b/docs/3.0rc/resources/big-data.mdx @@ -13,7 +13,7 @@ storage, including: 1. Saving data to disk within a flow rather than using results. 1. Caching task results to save time and compute. 1. Compressing results written to disk to save space. -1. Using a [task runner](/3.0rc/develop/write-tasks/use-task-runners/) for parallelizable +1. Using a [task runner](/3.0rc/develop/write-tasks/task-runners/) for parallelizable operations to save time. ### Remove task introspection @@ -125,5 +125,5 @@ Note that compression takes time to compress and decompress the data. Prefect's task runners allow you to use the Dask and Ray Python libraries to run tasks in parallel, distributed across multiple machines. This can save you time and compute when operating on large data structures. -See the [guide to working with Dask and Ray Task Runners](/3.0rc/develop/write-tasks/use-task-runners/) +See the [guide to working with Dask and Ray Task Runners](/3.0rc/develop/write-tasks/task-runners/) for details. diff --git a/docs/integrations/prefect-dask/index.mdx b/docs/integrations/prefect-dask/index.mdx index 3e12a164cab4..b8a39ea62671 100644 --- a/docs/integrations/prefect-dask/index.mdx +++ b/docs/integrations/prefect-dask/index.mdx @@ -123,7 +123,7 @@ Using the `DaskTaskRunner` reduced the runtime to **5.7** seconds! ## Run tasks on Dask -The `DaskTaskRunner` is a [task runner](/3.0rc/develop/write-tasks/use-task-runners) that submits tasks to the [`dask.distributed`](http://distributed.dask.org/) scheduler. +The `DaskTaskRunner` is a [task runner](/3.0rc/develop/write-tasks/task-runners) that submits tasks to the [`dask.distributed`](http://distributed.dask.org/) scheduler. By default, when the `DaskTaskRunner` is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run. If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the `address` kwarg. diff --git a/docs/integrations/prefect-ray/index.mdx b/docs/integrations/prefect-ray/index.mdx index a4cb5a2ed654..fe2b24170c35 100644 --- a/docs/integrations/prefect-ray/index.mdx +++ b/docs/integrations/prefect-ray/index.mdx @@ -26,7 +26,7 @@ See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overv ## Run tasks on Ray -The `RayTaskRunner` is a [Prefect task runner](https://docs.prefect.io/develop/write-tasks/use-task-runners/) that submits tasks to [Ray](https://www.ray.io/) for parallel execution. +The `RayTaskRunner` is a [Prefect task runner](https://docs.prefect.io/develop/write-tasks/task-runners/) that submits tasks to [Ray](https://www.ray.io/) for parallel execution. By default, a temporary Ray instance is created for the duration of the flow run. For example, this flow counts to three in parallel: diff --git a/docs/mint.json b/docs/mint.json index 3fc5aa7d00d4..fa03ea974c07 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -83,8 +83,7 @@ "version": "3.0rc", "pages": [ "3.0rc/develop/write-tasks/index", - "3.0rc/develop/write-tasks/use-task-runners", - "3.0rc/develop/write-tasks/dask-and-ray", + "3.0rc/develop/write-tasks/task-runners", "3.0rc/develop/write-tasks/deferred-tasks" ] },