Skip to content

Commit

Permalink
Consolidate task runner content (#14178)
Browse files Browse the repository at this point in the history
  • Loading branch information
billpalombi committed Jun 20, 2024
1 parent f898ada commit 8990e9a
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 102 deletions.
4 changes: 2 additions & 2 deletions docs/3.0rc/develop/write-flows/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ Flows allow a great deal of configuration by passing arguments to the decorator.
| `retries` | An optional number of times to retry on flow run failure. |
| <span class="no-wrap">`retry_delay_seconds`</span> | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if `retries` is nonzero. |
| `flow_run_name` | An optional name to distinguish runs of this flow; this name can be provided as a string template with the flow's parameters as variables; you can also provide this name as a function that returns a string. |
| `task_runner` | An optional [task runner](/3.0rc/develop/write-tasks/use-task-runners/) to use for task execution within the flow when you `.submit()` tasks. If not provided and you `.submit()` tasks, the `ThreadPoolTaskRunner` is used. |
| `task_runner` | An optional [task runner](/3.0rc/develop/write-tasks/task-runners/) to use for task execution within the flow when you `.submit()` tasks. If not provided and you `.submit()` tasks, the `ThreadPoolTaskRunner` is used. |
| `timeout_seconds` | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it is marked as failed. Flow execution may continue until the next task is called. |
| `validate_parameters` | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is `True`. |
| `version` | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. |
Expand Down Expand Up @@ -515,7 +515,7 @@ The primary flow is the "parent" flow. The flow created within the parent is the

Subflow runs behave like normal flow runs.
There is a full representation of the flow run in the backend as if it had been called separately.
When a subflow starts, it creates a new [task runner](/3.0rc/develop/write-tasks/use-task-runners/) for tasks within the subflow.
When a subflow starts, it creates a new [task runner](/3.0rc/develop/write-tasks/task-runners/) for tasks within the subflow.
When the subflow completes, the task runner shuts down.

Subflows block execution of the parent flow until completion.
Expand Down
58 changes: 0 additions & 58 deletions docs/3.0rc/develop/write-tasks/dask-and-ray.mdx

This file was deleted.

10 changes: 5 additions & 5 deletions docs/3.0rc/develop/write-tasks/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ This task run is tracked in the UI as well.
Almost any standard Python function can be turned into a Prefect task by adding the `@task` decorator.

<Tip>
Tasks are always executed in the main thread by default, unless a specific [task runner](/3.0rc/develop/write-tasks/use-task-runners) is used to execute them on different threads, processes, or infrastructure. This facilitates native Python debugging and profiling.
Tasks are always executed in the main thread by default, unless a specific [task runner](/3.0rc/develop/write-tasks/task-runners) is used to execute them on different threads, processes, or infrastructure. This facilitates native Python debugging and profiling.
</Tip>

### Synchronous functions
Expand Down Expand Up @@ -682,23 +682,23 @@ def show_timeouts():
## Task results

Depending on how you call tasks, they can return different types of results and optionally engage the use of
a [task runner](/3.0rc/develop/write-tasks/use-task-runners/).
a [task runner](/3.0rc/develop/write-tasks/task-runners/).

Any task can return:

- Data , such as `int`, `str`, `dict`, `list`. This is the default behavior any time you
call `your_task()`.
- [`PrefectFuture`](/3.0rc/api-ref/prefect/futures/#prefect.futures.PrefectFuture). This is achieved
by calling [`your_task.submit()`](/3.0rc/develop/write-tasks/use-task-runners/#using-a-task-runner).
by calling [`your_task.submit()`](/3.0rc/develop/write-tasks/task-runners/#using-a-task-runner).
A `PrefectFuture` contains both _data_ and _State_.
- Prefect [`State`](/3.0rc/api-ref/server/schemas/states/). Anytime you call your task or flow with
the argument `return_state=True`, it directly returns a state to build custom behavior based
on a state change you care about, such as task or flow failing or retrying.

To run your task with a [task runner](/3.0rc/develop/write-tasks/use-task-runners/), you must call the task
To run your task with a [task runner](/3.0rc/develop/write-tasks/task-runners/), you must call the task
with `.submit()`.

See [state returned values](/3.0rc/develop/write-tasks/use-task-runners/#using-results-from-submitted-tasks)
See [state returned values](/3.0rc/develop/write-tasks/task-runners/#using-results-from-submitted-tasks)
for examples.

<Tip>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
---
title: Task runners
description: Learn how to use Prefect task runners for parallel or distributed task execution.
title: Run tasks concurrently or in parallel
description: Learn how to use task runners for concurrent, parallel or distributed execution of tasks.
---

Task runners enable you to engage specific executors for Prefect tasks to enable concurrent, parallel, or
distributed execution of tasks.
Task runners are not required for task execution.
Calling a task function directly without a task runner executes the function in the main thread by default,
which blocks execution of its flow until the task completes.

Task runners are not required for task execution. Calling a task function directly without a task runner executes the
function in the main thread by default. Execution of a task function blocks execution of its flow until the task completes.

You can use the `.submit()` method on a task to submit that task to a _task runner_. Using a task runner
enables you to run tasks concurrently, or in parallel using a distributed execution library such as Dask or Ray.

### Built-in task runners and integrations

The default task runner in Prefect is the
[`ThreadPoolTaskRunner`](/3.0rc/api-ref/prefect/task-runners/#prefect.task_runners.ThreadPoolTaskRunner),
To enable concurrent, parallel, or distributed execution of tasks, use the `.submit()` method to submit a task to a _task runner_.
The default task runner in Prefect is the [`ThreadPoolTaskRunner`](/3.0rc/api-ref/prefect/task-runners/#prefect.task_runners.ThreadPoolTaskRunner),
which runs tasks concurrently within a thread pool.

The following Prefect-developed task runners for parallel or distributed task
execution are available as [integrations](/integrations/catalog/):
For parallel or distributed task execution, you must additionally install one of the following task runners, available as integrations:

- [`DaskTaskRunner`](https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-dask) can run tasks using [`dask.distributed`](http://distributed.dask.org/).
- [`RayTaskRunner`](https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-ray) can run tasks using [Ray](https://www.ray.io/).
Expand All @@ -38,13 +29,10 @@ such as for operations mapped across a dataset.

## Configure a task runner

To configure your flow to use a specific task runner, provide the runner to the `task_runner` keyword of
the flow decorator.
To configure your flow to use a specific task runner, provide the runner to the `task_runner` keyword of the flow decorator.

To submit work to the runner, use the task's `.submit()` method.
This method returns a
[`PrefectFuture`](/3.0rc/api-ref/prefect/futures/#prefect.futures.PrefectFuture), which is a
Prefect object that contains:
This method returns a [`PrefectFuture`](/3.0rc/api-ref/prefect/futures/#prefect.futures.PrefectFuture), which is a Prefect object that contains:
- a reference to the payload returned by the task;
- and a [`State`](/3.0rc/api-ref/server/schemas/states/), which is a Prefect object indicating the state of the task run.

Expand Down Expand Up @@ -84,14 +72,11 @@ Prefect uses the default `ThreadPoolTaskRunner`.

## Use multiple task runners

Each flow can only have one task runner, but sometimes you may want a subset of your tasks to run
using a different task runner than the one configured on the flow.
In this case, you can create [nested flows](/3.0rc/develop/write-flows/#composing-flows)
for tasks that need to use a different task runner.
Each flow can only have one task runner, but sometimes you may want a subset of your tasks to run using a different task runner than the one configured on the flow.
In this case, you can create [nested flows](/3.0rc/develop/write-flows/#composing-flows) for tasks that need to use a different task runner.

For example, you can have a flow (in the example below called `multiple_runner_flow`) that runs its tasks locally
using the `ThreadPoolTaskRunner`. If you have some tasks that can run more efficiently in parallel on a Dask cluster,
you can create a subflow (such as `dask_subflow`) to run those tasks using the `DaskTaskRunner`.
For example, you can have a flow (in the example below called `multiple_runner_flow`) that runs its tasks locally using the `ThreadPoolTaskRunner`.
If you have some tasks that can run more efficiently in parallel on a Dask cluster, you can create a subflow (such as `dask_subflow`) to run those tasks using the `DaskTaskRunner`.

```python
from prefect import flow, task
Expand Down Expand Up @@ -317,4 +302,4 @@ def sum_it(numbers, static_iterable):
return futures

sum_it([4, 5, 6], unmapped([1, 2, 3]))
```
```
2 changes: 1 addition & 1 deletion docs/3.0rc/get-started/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ With Prefect you gain:
* [Scheduling](/3.0rc/deploy/serve-flows/add-schedules)
* [Retries](/3.0rc/develop/write-tasks/index#task-arguments)
* [Logging](/3.0rc/develop/observe-workflows/logging/)
* [Convenient async functionality](/3.0rc/develop/write-tasks/use-task-runners/#task-runners)
* [Convenient async functionality](/3.0rc/develop/write-tasks/task-runners/#task-runners)
* [Caching](/3.0rc/develop/write-tasks/index#caching)
* [Notifications](/3.0rc/react/events/automations-triggers/)
* [Observability](/3.0rc/develop/observe-workflows/index/)
Expand Down
4 changes: 2 additions & 2 deletions docs/3.0rc/resources/big-data.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ storage, including:
1. Saving data to disk within a flow rather than using results.
1. Caching task results to save time and compute.
1. Compressing results written to disk to save space.
1. Using a [task runner](/3.0rc/develop/write-tasks/use-task-runners/) for parallelizable
1. Using a [task runner](/3.0rc/develop/write-tasks/task-runners/) for parallelizable
operations to save time.

### Remove task introspection
Expand Down Expand Up @@ -125,5 +125,5 @@ Note that compression takes time to compress and decompress the data.
Prefect's task runners allow you to use the Dask and Ray Python libraries to run tasks in parallel,
distributed across multiple machines.
This can save you time and compute when operating on large data structures.
See the [guide to working with Dask and Ray Task Runners](/3.0rc/develop/write-tasks/use-task-runners/)
See the [guide to working with Dask and Ray Task Runners](/3.0rc/develop/write-tasks/task-runners/)
for details.
2 changes: 1 addition & 1 deletion docs/integrations/prefect-dask/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ Using the `DaskTaskRunner` reduced the runtime to **5.7** seconds!

## Run tasks on Dask

The `DaskTaskRunner` is a [task runner](/3.0rc/develop/write-tasks/use-task-runners) that submits tasks to the [`dask.distributed`](http://distributed.dask.org/) scheduler.
The `DaskTaskRunner` is a [task runner](/3.0rc/develop/write-tasks/task-runners) that submits tasks to the [`dask.distributed`](http://distributed.dask.org/) scheduler.
By default, when the `DaskTaskRunner` is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run.

If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the `address` kwarg.
Expand Down
2 changes: 1 addition & 1 deletion docs/integrations/prefect-ray/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overv

## Run tasks on Ray

The `RayTaskRunner` is a [Prefect task runner](https://docs.prefect.io/develop/write-tasks/use-task-runners/) that submits tasks to [Ray](https://www.ray.io/) for parallel execution.
The `RayTaskRunner` is a [Prefect task runner](https://docs.prefect.io/develop/write-tasks/task-runners/) that submits tasks to [Ray](https://www.ray.io/) for parallel execution.
By default, a temporary Ray instance is created for the duration of the flow run.
For example, this flow counts to three in parallel:

Expand Down
3 changes: 1 addition & 2 deletions docs/mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@
"version": "3.0rc",
"pages": [
"3.0rc/develop/write-tasks/index",
"3.0rc/develop/write-tasks/use-task-runners",
"3.0rc/develop/write-tasks/dask-and-ray",
"3.0rc/develop/write-tasks/task-runners",
"3.0rc/develop/write-tasks/deferred-tasks"
]
},
Expand Down

0 comments on commit 8990e9a

Please sign in to comment.