From 1c12a7e0e3e16927b202be32629ee5bcab36da9b Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Sat, 15 Jun 2024 10:08:40 -0400
Subject: [PATCH 1/5] Add documentation about supported function types
---
docs/3.0rc/develop/write-tasks/index.mdx | 161 +++++++++++++++----
docs/3.0rc/develop/write-workflows/index.mdx | 85 +++++++++-
tests/test_flows.py | 40 +++++
tests/test_tasks.py | 36 +++++
4 files changed, 286 insertions(+), 36 deletions(-)
diff --git a/docs/3.0rc/develop/write-tasks/index.mdx b/docs/3.0rc/develop/write-tasks/index.mdx
index 6db0fca147a4..63ead081a4b0 100644
--- a/docs/3.0rc/develop/write-tasks/index.mdx
+++ b/docs/3.0rc/develop/write-tasks/index.mdx
@@ -67,6 +67,132 @@ Running that flow in the terminal results in something like this:
This task run is tracked in the UI as well.
+## Supported functions
+
+Almost any standard Python function can be turned into a Prefect task by adding the `@task` decorator.
+
+
+Tasks are always executed in the main thread by default, unless a specific [task runner](/3.0rc/develop/write-tasks/use-task-runners) is used to execute them on different threads, processes, or infrastructure. This facilitates native Python debugging and profiling.
+
+
+### Synchronous functions
+
+The simplest Prefect task is a synchronous Python function. Here's an example of a synchronous task that prints a message:
+
+```python
+from prefect import task
+
+@task
+def print_message():
+ print("Hello, I'm a task")
+
+print_message()
+```
+
+### Async functions
+
+Prefect also supports asynchronous functions. The resulting tasks are coroutines that can be awaited or run concurrently, following [the standard rules of async Python](https://docs.python.org/3/library/asyncio-task.html).
+
+```python
+from prefect import task
+import asyncio
+
+@task
+async def print_message():
+ await asyncio.sleep(1)
+ print("Hello, I'm an async task")
+
+asyncio.run(print_message())
+```
+
+### Methods
+
+Prefect supports instance methods, class methods, and static methods as tasks. For class methods and static methods, you must apply the appropriate method decorator _above_ the `@task` decorator:
+
+```python
+from prefect import task
+
+class MyClass:
+
+ @task
+ def my_instance_method(self):
+ pass
+
+ @classmethod
+ @task
+ def my_class_method(cls):
+ pass
+
+ @staticmethod
+ @task
+ def my_static_method():
+ pass
+
+MyClass().my_instance_method()
+MyClass.my_class_method()
+MyClass.my_static_method()
+```
+
+### Generators
+
+Prefect supports synchronous and asynchronous generators as tasks. The task is considered to be `Running` as long as the generator is yielding values. When the generator is exhausted, the task is considered `Completed`. Any values yielded by the generator can be consumed by other tasks, and they will automatically record the generator task as their parent.
+
+```python
+from prefect import task
+
+@task
+def generator():
+ for i in range(10):
+ yield i
+
+@task
+def consumer(x):
+ print(x)
+
+for val in generator():
+ consumer(val)
+```
+
+
+**Generator functions are consumed when returned from tasks**
+
+The result of a completed task must be serializable, but generators can not be serialized. Therefore, if you return a generator from a task, the generator will be fully consumed and its yielded values will be returned as a list. This can lead to unexpected behavior or blocking if the generator is infinite or very large.
+
+Here is an example of proactive generator consumption:
+
+```python
+from prefect import task
+
+def gen():
+ yield from [1, 2, 3]
+ print('Generator consumed!')
+
+@task
+def f():
+ return gen()
+
+f() # prints 'Generator consumed!'
+```
+
+If you need to return a generator without consuming it, you can `yield` it instead of using `return`. Values yielded from generator tasks are not considered final results and do not face the same serialization constraints:
+
+```python
+from prefect import task
+
+def gen():
+ yield from [1, 2, 3]
+ print('Generator consumed!')
+
+@task
+def f():
+ yield gen()
+
+generator = next(f())
+list(generator) # prints 'Generator consumed!'
+
+```
+
+
## Concurrency
Tasks enable concurrency, allowing you to execute multiple tasks asynchronously.
@@ -190,7 +316,7 @@ def my_flow():
**Call a task from another task**
-As of `prefect 2.18.x`, you can call a task from within another task:
+Task can be called from within another task:
```python
from prefect import task
@@ -662,36 +788,3 @@ def sum_it(numbers, static_iterable):
sum_it([4, 5, 6], unmapped([1, 2, 3]))
```
-
-## Async tasks
-
-Prefect supports asynchronous task and flow definitions by default. All of
-[the standard rules of async](https://docs.python.org/3/library/asyncio-task.html) apply:
-
-```python
-import asyncio
-
-from prefect import task, flow
-
-@task
-async def print_values(values):
- for value in values:
- await asyncio.sleep(1) # yield
- print(value, end=" ")
-
-@flow
-async def async_flow():
- await print_values([1, 2]) # runs immediately
- coros = [print_values("abcd"), print_values("6789")]
-
- # asynchronously gather the tasks
- await asyncio.gather(*coros)
-
-asyncio.run(async_flow())
-```
-
-If you are not using `asyncio.gather`,
-calling [`.submit()`](/3.0rc/develop/write-tasks/use-task-runners/#using-a-task-runner)
-is required for asynchronous execution on the `ConcurrentTaskRunner`.
-
-
diff --git a/docs/3.0rc/develop/write-workflows/index.mdx b/docs/3.0rc/develop/write-workflows/index.mdx
index 1c25ff05c9a0..27b30f9ca34a 100644
--- a/docs/3.0rc/develop/write-workflows/index.mdx
+++ b/docs/3.0rc/develop/write-workflows/index.mdx
@@ -5,7 +5,7 @@ description: Learn the basics of defining and running flows.
Flows are the most central Prefect object. A flow is a container for workflow logic as code and allows users to configure
how their workflows behave.
-Flows are defined as Python functions, and any Python function is eligible to be a flow.
+Flows are defined as Python functions, and almost any Python function is eligible to be a flow.
They can take inputs, perform work, and return an output.
You can turn any function into a Prefect flow by adding the `@flow` decorator.
@@ -70,6 +70,87 @@ Forks 🍴 : 1245
As shown above, flow definitions can contain arbitrary Python logic.
+## Supported functions
+
+Almost any standard Python function can be turned into a Prefect flow by adding the `@flow` decorator.
+
+
+Flows are always executed in the main thread by default to facilitate native Python debugging and profiling.
+
+
+### Synchronous functions
+
+The simplest Prefect flow is a synchronous Python function. Here's an example of a synchronous flow that prints a message:
+
+```python
+from prefect import flow
+
+@flow
+def print_message():
+ print("Hello, I'm a flow")
+
+print_message()
+```
+
+### Async functions
+
+Prefect also supports asynchronous functions. The resulting flows are coroutines that can be awaited or run concurrently, following [the standard rules of async Python](https://docs.python.org/3/library/asyncio-task.html).
+
+
+
+```python
+import asyncio
+
+from prefect import task, flow
+
+@task
+async def print_values(values):
+ for value in values:
+ await asyncio.sleep(1)
+ print(value, end=" ")
+
+@flow
+async def async_flow():
+ print("Hello, I'm an async flow")
+
+ # runs immediately
+ await print_values([1, 2])
+
+ # runs concurrently
+ coros = [print_values("abcd"), print_values("6789")]
+ await asyncio.gather(*coros)
+
+asyncio.run(async_flow())
+```
+
+### Methods
+
+Prefect supports instance methods, class methods, and static methods as flows. For class methods and static methods, you must apply the appropriate method decorator _above_ the `@flow` decorator:
+
+```python
+from prefect import flow
+
+class MyClass:
+
+ @flow
+ def my_instance_method(self):
+ pass
+
+ @classmethod
+ @flow
+ def my_class_method(cls):
+ pass
+
+ @staticmethod
+ @flow
+ def my_static_method():
+ pass
+
+MyClass().my_instance_method()
+MyClass.my_class_method()
+MyClass.my_static_method()
+```
+
## Parameters
As with any Python function, you can pass arguments to a flow.
@@ -150,7 +231,7 @@ When you run a flow that contains tasks or additional flows, Prefect tracks the
## Writing flows
-The [`@flow`] decorator is used to designate a flow:
+The `@flow` decorator is used to designate a flow:
```python hl_lines="3"
from prefect import flow
diff --git a/tests/test_flows.py b/tests/test_flows.py
index c315ee0fbc4b..c264c62b2c64 100644
--- a/tests/test_flows.py
+++ b/tests/test_flows.py
@@ -864,6 +864,46 @@ def static_method():
assert Foo.static_method() == "static"
assert isinstance(Foo.static_method, Flow)
+ @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel])
+ async def test_flow_supports_async_instance_methods(self, T):
+ class Foo(T):
+ @flow
+ async def instance_method(self):
+ return self.x
+
+ f = Foo(x=1)
+ assert await Foo(x=5).instance_method() == 5
+ assert await f.instance_method() == 1
+ assert isinstance(Foo(x=10).instance_method, Flow)
+
+ @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel])
+ async def test_flow_supports_async_class_methods(self, T):
+ class Foo(T):
+ def __init__(self, x):
+ self.x = x
+
+ @classmethod
+ @flow
+ async def class_method(cls):
+ return cls.__name__
+
+ assert await Foo.class_method() == "Foo"
+ assert isinstance(Foo.class_method, Flow)
+
+ @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel])
+ async def test_flow_supports_async_static_methods(self, T):
+ class Foo(T):
+ def __init__(self, x):
+ self.x = x
+
+ @staticmethod
+ @flow
+ async def static_method():
+ return "static"
+
+ assert await Foo.static_method() == "static"
+ assert isinstance(Foo.static_method, Flow)
+
def test_flow_supports_instance_methods_with_basemodel(self):
class Foo(pydantic.BaseModel):
model_config = pydantic.ConfigDict(ignored_types=(Flow,))
diff --git a/tests/test_tasks.py b/tests/test_tasks.py
index 1ac3fbb92fb1..91d9525a7fe1 100644
--- a/tests/test_tasks.py
+++ b/tests/test_tasks.py
@@ -369,6 +369,42 @@ def static_method():
assert Foo.static_method() == "static"
assert isinstance(Foo.static_method, Task)
+ @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel])
+ async def test_task_supports_async_instance_methods(self, T):
+ class Foo(T):
+ @task
+ async def instance_method(self):
+ return self.x
+
+ f = Foo(x=1)
+ assert await Foo(x=5).instance_method() == 5
+ # ensure the instance binding is not global
+ assert await f.instance_method() == 1
+
+ assert isinstance(Foo(x=10).instance_method, Task)
+
+ @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel])
+ async def test_task_supports_async_class_methods(self, T):
+ class Foo(T):
+ @classmethod
+ @task
+ async def class_method(cls):
+ return cls.__name__
+
+ assert await Foo.class_method() == "Foo"
+ assert isinstance(Foo.class_method, Task)
+
+ @pytest.mark.parametrize("T", [BaseFoo, BaseFooModel])
+ async def test_task_supports_async_static_methods(self, T):
+ class Foo(T):
+ @staticmethod
+ @task
+ async def static_method():
+ return "static"
+
+ assert await Foo.static_method() == "static"
+ assert isinstance(Foo.static_method, Task)
+
def test_error_message_if_decorate_classmethod(self):
with pytest.raises(
TypeError, match="@classmethod should be applied on top of @task"
From 8a2f8096d422737c61cb720228fa444c9bd1459c Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Sat, 15 Jun 2024 10:21:15 -0400
Subject: [PATCH 2/5] Note async support
---
docs/3.0rc/develop/write-tasks/index.mdx | 6 +++---
docs/3.0rc/develop/write-workflows/index.mdx | 6 ++----
2 files changed, 5 insertions(+), 7 deletions(-)
diff --git a/docs/3.0rc/develop/write-tasks/index.mdx b/docs/3.0rc/develop/write-tasks/index.mdx
index 63ead081a4b0..35bae1c806be 100644
--- a/docs/3.0rc/develop/write-tasks/index.mdx
+++ b/docs/3.0rc/develop/write-tasks/index.mdx
@@ -91,7 +91,7 @@ print_message()
### Async functions
-Prefect also supports asynchronous functions. The resulting tasks are coroutines that can be awaited or run concurrently, following [the standard rules of async Python](https://docs.python.org/3/library/asyncio-task.html).
+Prefect also supports async functions. The resulting tasks are coroutines that can be awaited or run concurrently, following [the standard rules of async Python](https://docs.python.org/3/library/asyncio-task.html).
```python
from prefect import task
@@ -107,7 +107,7 @@ asyncio.run(print_message())
### Methods
-Prefect supports instance methods, class methods, and static methods as tasks. For class methods and static methods, you must apply the appropriate method decorator _above_ the `@task` decorator:
+Prefect supports snchronous and async methods as tasks, including instance methods, class methods, and static methods. For class methods and static methods, you must apply the appropriate method decorator _above_ the `@task` decorator:
```python
from prefect import task
@@ -135,7 +135,7 @@ MyClass.my_static_method()
### Generators
-Prefect supports synchronous and asynchronous generators as tasks. The task is considered to be `Running` as long as the generator is yielding values. When the generator is exhausted, the task is considered `Completed`. Any values yielded by the generator can be consumed by other tasks, and they will automatically record the generator task as their parent.
+Prefect supports synchronous and async generators as tasks. The task is considered to be `Running` as long as the generator is yielding values. When the generator is exhausted, the task is considered `Completed`. Any values yielded by the generator can be consumed by other tasks, and they will automatically record the generator task as their parent.
```python
from prefect import task
diff --git a/docs/3.0rc/develop/write-workflows/index.mdx b/docs/3.0rc/develop/write-workflows/index.mdx
index 27b30f9ca34a..0b19e0cea034 100644
--- a/docs/3.0rc/develop/write-workflows/index.mdx
+++ b/docs/3.0rc/develop/write-workflows/index.mdx
@@ -94,9 +94,7 @@ print_message()
### Async functions
-Prefect also supports asynchronous functions. The resulting flows are coroutines that can be awaited or run concurrently, following [the standard rules of async Python](https://docs.python.org/3/library/asyncio-task.html).
-
-
+Prefect also supports async functions. The resulting flows are coroutines that can be awaited or run concurrently, following [the standard rules of async Python](https://docs.python.org/3/library/asyncio-task.html).
```python
import asyncio
@@ -125,7 +123,7 @@ asyncio.run(async_flow())
### Methods
-Prefect supports instance methods, class methods, and static methods as flows. For class methods and static methods, you must apply the appropriate method decorator _above_ the `@flow` decorator:
+Prefect supports synchronous and async methods as flows, including instance methods, class methods, and static methods. For class methods and static methods, you must apply the appropriate method decorator _above_ the `@flow` decorator:
```python
from prefect import flow
From 39008a48c4e68484dac86beccf84177171b01d08 Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Sat, 15 Jun 2024 13:07:49 -0400
Subject: [PATCH 3/5] Update test_events_emit_event.py
---
tests/events/client/test_events_emit_event.py | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/tests/events/client/test_events_emit_event.py b/tests/events/client/test_events_emit_event.py
index 4fc8dac1e223..4bfb1f8594cd 100644
--- a/tests/events/client/test_events_emit_event.py
+++ b/tests/events/client/test_events_emit_event.py
@@ -1,4 +1,4 @@
-from datetime import timedelta
+from datetime import timedelta, timezone
from unittest import mock
from uuid import UUID
@@ -35,7 +35,7 @@ def test_emits_complex_event(
emit_event(
event="vogon.poetry.read",
resource={"prefect.resource.id": "vogon.poem.oh-freddled-gruntbuggly"},
- occurred=DateTime(2023, 3, 1, 12, 39, 28),
+ occurred=DateTime(2023, 3, 1, 12, 39, 28, tzinfo=timezone.utc),
related=[
{
"prefect.resource.id": "vogon.ship.the-business-end",
@@ -53,7 +53,7 @@ def test_emits_complex_event(
event = asserting_events_worker._client.events[0]
assert event.event == "vogon.poetry.read"
assert event.resource.id == "vogon.poem.oh-freddled-gruntbuggly"
- assert event.occurred == DateTime(2023, 3, 1, 12, 39, 28)
+ assert event.occurred == DateTime(2023, 3, 1, 12, 39, 28, tzinfo=timezone.utc)
assert len(event.related) == 1
assert event.related[0].id == "vogon.ship.the-business-end"
assert event.related[0].role == "locale"
From d9d61236401c90c7323d31787abc2384c48e342a Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Sat, 15 Jun 2024 13:08:54 -0400
Subject: [PATCH 4/5] Update docs/3.0rc/develop/write-tasks/index.mdx
Co-authored-by: Bill Palombi
---
docs/3.0rc/develop/write-tasks/index.mdx | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/docs/3.0rc/develop/write-tasks/index.mdx b/docs/3.0rc/develop/write-tasks/index.mdx
index 35bae1c806be..cea67f0ec447 100644
--- a/docs/3.0rc/develop/write-tasks/index.mdx
+++ b/docs/3.0rc/develop/write-tasks/index.mdx
@@ -91,7 +91,8 @@ print_message()
### Async functions
-Prefect also supports async functions. The resulting tasks are coroutines that can be awaited or run concurrently, following [the standard rules of async Python](https://docs.python.org/3/library/asyncio-task.html).
+Prefect also supports async Python functions.
+The resulting tasks are coroutines that can be awaited or run concurrently, following [standard async Python behavior](https://docs.python.org/3/library/asyncio-task.html).
```python
from prefect import task
From ac47061813bf27986946285c8ede63d2bbf06d5e Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Sat, 15 Jun 2024 13:09:29 -0400
Subject: [PATCH 5/5] Apply suggestions from code review
Co-authored-by: Bill Palombi
---
docs/3.0rc/develop/write-tasks/index.mdx | 9 ++++++---
docs/3.0rc/develop/write-workflows/index.mdx | 2 +-
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/docs/3.0rc/develop/write-tasks/index.mdx b/docs/3.0rc/develop/write-tasks/index.mdx
index cea67f0ec447..17dcedf5f1de 100644
--- a/docs/3.0rc/develop/write-tasks/index.mdx
+++ b/docs/3.0rc/develop/write-tasks/index.mdx
@@ -157,7 +157,9 @@ for val in generator():
**Generator functions are consumed when returned from tasks**
-The result of a completed task must be serializable, but generators can not be serialized. Therefore, if you return a generator from a task, the generator will be fully consumed and its yielded values will be returned as a list. This can lead to unexpected behavior or blocking if the generator is infinite or very large.
+The result of a completed task must be serializable, but generators cannot be serialized.
+Therefore, if you return a generator from a task, the generator will be fully consumed and its yielded values will be returned as a list.
+This can lead to unexpected behavior or blocking if the generator is infinite or very large.
Here is an example of proactive generator consumption:
@@ -175,7 +177,8 @@ def f():
f() # prints 'Generator consumed!'
```
-If you need to return a generator without consuming it, you can `yield` it instead of using `return`. Values yielded from generator tasks are not considered final results and do not face the same serialization constraints:
+If you need to return a generator without consuming it, you can `yield` it instead of using `return`.
+Values yielded from generator tasks are not considered final results and do not face the same serialization constraints:
```python
from prefect import task
@@ -317,7 +320,7 @@ def my_flow():
**Call a task from another task**
-Task can be called from within another task:
+A task can be called from within another task:
```python
from prefect import task
diff --git a/docs/3.0rc/develop/write-workflows/index.mdx b/docs/3.0rc/develop/write-workflows/index.mdx
index d6a5486a6ab8..019f5c16febd 100644
--- a/docs/3.0rc/develop/write-workflows/index.mdx
+++ b/docs/3.0rc/develop/write-workflows/index.mdx
@@ -5,7 +5,7 @@ description: Learn the basics of defining and running flows.
Flows are the most central Prefect object. A flow is a container for workflow logic as code and allows users to configure
how their workflows behave.
-Flows are defined as Python functions, and almost any Python function is eligible to be a flow.
+Flows are defined as Python functions. Almost any Python function is eligible to be a flow.
They can take inputs, perform work, and return an output.
You can turn any function into a Prefect flow by adding the `@flow` decorator.