diff --git a/docs/3.0rc/develop/write-tasks/index.mdx b/docs/3.0rc/develop/write-tasks/index.mdx index 6db0fca147a4..17dcedf5f1de 100644 --- a/docs/3.0rc/develop/write-tasks/index.mdx +++ b/docs/3.0rc/develop/write-tasks/index.mdx @@ -67,6 +67,136 @@ Running that flow in the terminal results in something like this: This task run is tracked in the UI as well. +## Supported functions + +Almost any standard Python function can be turned into a Prefect task by adding the `@task` decorator. + + +Tasks are always executed in the main thread by default, unless a specific [task runner](/3.0rc/develop/write-tasks/use-task-runners) is used to execute them on different threads, processes, or infrastructure. This facilitates native Python debugging and profiling. + + +### Synchronous functions + +The simplest Prefect task is a synchronous Python function. Here's an example of a synchronous task that prints a message: + +```python +from prefect import task + +@task +def print_message(): + print("Hello, I'm a task") + +print_message() +``` + +### Async functions + +Prefect also supports async Python functions. +The resulting tasks are coroutines that can be awaited or run concurrently, following [standard async Python behavior](https://docs.python.org/3/library/asyncio-task.html). + +```python +from prefect import task +import asyncio + +@task +async def print_message(): + await asyncio.sleep(1) + print("Hello, I'm an async task") + +asyncio.run(print_message()) +``` + +### Methods + +Prefect supports snchronous and async methods as tasks, including instance methods, class methods, and static methods. For class methods and static methods, you must apply the appropriate method decorator _above_ the `@task` decorator: + +```python +from prefect import task + +class MyClass: + + @task + def my_instance_method(self): + pass + + @classmethod + @task + def my_class_method(cls): + pass + + @staticmethod + @task + def my_static_method(): + pass + +MyClass().my_instance_method() +MyClass.my_class_method() +MyClass.my_static_method() +``` + +### Generators + +Prefect supports synchronous and async generators as tasks. The task is considered to be `Running` as long as the generator is yielding values. When the generator is exhausted, the task is considered `Completed`. Any values yielded by the generator can be consumed by other tasks, and they will automatically record the generator task as their parent. + +```python +from prefect import task + +@task +def generator(): + for i in range(10): + yield i + +@task +def consumer(x): + print(x) + +for val in generator(): + consumer(val) +``` + + +**Generator functions are consumed when returned from tasks** + +The result of a completed task must be serializable, but generators cannot be serialized. +Therefore, if you return a generator from a task, the generator will be fully consumed and its yielded values will be returned as a list. +This can lead to unexpected behavior or blocking if the generator is infinite or very large. + +Here is an example of proactive generator consumption: + +```python +from prefect import task + +def gen(): + yield from [1, 2, 3] + print('Generator consumed!') + +@task +def f(): + return gen() + +f() # prints 'Generator consumed!' +``` + +If you need to return a generator without consuming it, you can `yield` it instead of using `return`. +Values yielded from generator tasks are not considered final results and do not face the same serialization constraints: + +```python +from prefect import task + +def gen(): + yield from [1, 2, 3] + print('Generator consumed!') + +@task +def f(): + yield gen() + +generator = next(f()) +list(generator) # prints 'Generator consumed!' + +``` + + ## Concurrency Tasks enable concurrency, allowing you to execute multiple tasks asynchronously. @@ -190,7 +320,7 @@ def my_flow(): **Call a task from another task** -As of `prefect 2.18.x`, you can call a task from within another task: +A task can be called from within another task: ```python from prefect import task @@ -662,36 +792,3 @@ def sum_it(numbers, static_iterable): sum_it([4, 5, 6], unmapped([1, 2, 3])) ``` - -## Async tasks - -Prefect supports asynchronous task and flow definitions by default. All of -[the standard rules of async](https://docs.python.org/3/library/asyncio-task.html) apply: - -```python -import asyncio - -from prefect import task, flow - -@task -async def print_values(values): - for value in values: - await asyncio.sleep(1) # yield - print(value, end=" ") - -@flow -async def async_flow(): - await print_values([1, 2]) # runs immediately - coros = [print_values("abcd"), print_values("6789")] - - # asynchronously gather the tasks - await asyncio.gather(*coros) - -asyncio.run(async_flow()) -``` - -If you are not using `asyncio.gather`, -calling [`.submit()`](/3.0rc/develop/write-tasks/use-task-runners/#using-a-task-runner) -is required for asynchronous execution on the `ConcurrentTaskRunner`. - - diff --git a/docs/3.0rc/develop/write-workflows/index.mdx b/docs/3.0rc/develop/write-workflows/index.mdx index 61e55a34f2a7..019f5c16febd 100644 --- a/docs/3.0rc/develop/write-workflows/index.mdx +++ b/docs/3.0rc/develop/write-workflows/index.mdx @@ -5,7 +5,7 @@ description: Learn the basics of defining and running flows. Flows are the most central Prefect object. A flow is a container for workflow logic as code and allows users to configure how their workflows behave. -Flows are defined as Python functions, and any Python function is eligible to be a flow. +Flows are defined as Python functions. Almost any Python function is eligible to be a flow. They can take inputs, perform work, and return an output. You can turn any function into a Prefect flow by adding the `@flow` decorator. @@ -70,6 +70,85 @@ Forks 🍴 : 1245 As shown above, flow definitions can contain arbitrary Python logic. +## Supported functions + +Almost any standard Python function can be turned into a Prefect flow by adding the `@flow` decorator. + + +Flows are always executed in the main thread by default to facilitate native Python debugging and profiling. + + +### Synchronous functions + +The simplest Prefect flow is a synchronous Python function. Here's an example of a synchronous flow that prints a message: + +```python +from prefect import flow + +@flow +def print_message(): + print("Hello, I'm a flow") + +print_message() +``` + +### Async functions + +Prefect also supports async functions. The resulting flows are coroutines that can be awaited or run concurrently, following [the standard rules of async Python](https://docs.python.org/3/library/asyncio-task.html). + +```python +import asyncio + +from prefect import task, flow + +@task +async def print_values(values): + for value in values: + await asyncio.sleep(1) + print(value, end=" ") + +@flow +async def async_flow(): + print("Hello, I'm an async flow") + + # runs immediately + await print_values([1, 2]) + + # runs concurrently + coros = [print_values("abcd"), print_values("6789")] + await asyncio.gather(*coros) + +asyncio.run(async_flow()) +``` + +### Methods + +Prefect supports synchronous and async methods as flows, including instance methods, class methods, and static methods. For class methods and static methods, you must apply the appropriate method decorator _above_ the `@flow` decorator: + +```python +from prefect import flow + +class MyClass: + + @flow + def my_instance_method(self): + pass + + @classmethod + @flow + def my_class_method(cls): + pass + + @staticmethod + @flow + def my_static_method(): + pass + +MyClass().my_instance_method() +MyClass.my_class_method() +MyClass.my_static_method() +``` + ## Parameters As with any Python function, you can pass arguments to a flow. @@ -150,7 +229,7 @@ When you run a flow that contains tasks or additional flows, Prefect tracks the ## Writing flows -The [`@flow`] decorator is used to designate a flow: +The `@flow` decorator is used to designate a flow: ```python hl_lines="3" from prefect import flow diff --git a/tests/events/client/test_events_emit_event.py b/tests/events/client/test_events_emit_event.py index 4fc8dac1e223..4bfb1f8594cd 100644 --- a/tests/events/client/test_events_emit_event.py +++ b/tests/events/client/test_events_emit_event.py @@ -1,4 +1,4 @@ -from datetime import timedelta +from datetime import timedelta, timezone from unittest import mock from uuid import UUID @@ -35,7 +35,7 @@ def test_emits_complex_event( emit_event( event="vogon.poetry.read", resource={"prefect.resource.id": "vogon.poem.oh-freddled-gruntbuggly"}, - occurred=DateTime(2023, 3, 1, 12, 39, 28), + occurred=DateTime(2023, 3, 1, 12, 39, 28, tzinfo=timezone.utc), related=[ { "prefect.resource.id": "vogon.ship.the-business-end", @@ -53,7 +53,7 @@ def test_emits_complex_event( event = asserting_events_worker._client.events[0] assert event.event == "vogon.poetry.read" assert event.resource.id == "vogon.poem.oh-freddled-gruntbuggly" - assert event.occurred == DateTime(2023, 3, 1, 12, 39, 28) + assert event.occurred == DateTime(2023, 3, 1, 12, 39, 28, tzinfo=timezone.utc) assert len(event.related) == 1 assert event.related[0].id == "vogon.ship.the-business-end" assert event.related[0].role == "locale" diff --git a/tests/test_flows.py b/tests/test_flows.py index c315ee0fbc4b..c264c62b2c64 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -864,6 +864,46 @@ def static_method(): assert Foo.static_method() == "static" assert isinstance(Foo.static_method, Flow) + @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel]) + async def test_flow_supports_async_instance_methods(self, T): + class Foo(T): + @flow + async def instance_method(self): + return self.x + + f = Foo(x=1) + assert await Foo(x=5).instance_method() == 5 + assert await f.instance_method() == 1 + assert isinstance(Foo(x=10).instance_method, Flow) + + @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel]) + async def test_flow_supports_async_class_methods(self, T): + class Foo(T): + def __init__(self, x): + self.x = x + + @classmethod + @flow + async def class_method(cls): + return cls.__name__ + + assert await Foo.class_method() == "Foo" + assert isinstance(Foo.class_method, Flow) + + @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel]) + async def test_flow_supports_async_static_methods(self, T): + class Foo(T): + def __init__(self, x): + self.x = x + + @staticmethod + @flow + async def static_method(): + return "static" + + assert await Foo.static_method() == "static" + assert isinstance(Foo.static_method, Flow) + def test_flow_supports_instance_methods_with_basemodel(self): class Foo(pydantic.BaseModel): model_config = pydantic.ConfigDict(ignored_types=(Flow,)) diff --git a/tests/test_tasks.py b/tests/test_tasks.py index 1ac3fbb92fb1..91d9525a7fe1 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -369,6 +369,42 @@ def static_method(): assert Foo.static_method() == "static" assert isinstance(Foo.static_method, Task) + @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel]) + async def test_task_supports_async_instance_methods(self, T): + class Foo(T): + @task + async def instance_method(self): + return self.x + + f = Foo(x=1) + assert await Foo(x=5).instance_method() == 5 + # ensure the instance binding is not global + assert await f.instance_method() == 1 + + assert isinstance(Foo(x=10).instance_method, Task) + + @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel]) + async def test_task_supports_async_class_methods(self, T): + class Foo(T): + @classmethod + @task + async def class_method(cls): + return cls.__name__ + + assert await Foo.class_method() == "Foo" + assert isinstance(Foo.class_method, Task) + + @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel]) + async def test_task_supports_async_static_methods(self, T): + class Foo(T): + @staticmethod + @task + async def static_method(): + return "static" + + assert await Foo.static_method() == "static" + assert isinstance(Foo.static_method, Task) + def test_error_message_if_decorate_classmethod(self): with pytest.raises( TypeError, match="@classmethod should be applied on top of @task"