Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add documentation about supported function types #14060

Merged
merged 7 commits into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 131 additions & 34 deletions docs/3.0rc/develop/write-tasks/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,136 @@ Running that flow in the terminal results in something like this:

This task run is tracked in the UI as well.

## Supported functions

Almost any standard Python function can be turned into a Prefect task by adding the `@task` decorator.

<Tip>
Tasks are always executed in the main thread by default, unless a specific [task runner](/3.0rc/develop/write-tasks/use-task-runners) is used to execute them on different threads, processes, or infrastructure. This facilitates native Python debugging and profiling.
</Tip>

### Synchronous functions

The simplest Prefect task is a synchronous Python function. Here's an example of a synchronous task that prints a message:

```python
from prefect import task

@task
def print_message():
print("Hello, I'm a task")

print_message()
```

### Async functions

Prefect also supports async Python functions.
The resulting tasks are coroutines that can be awaited or run concurrently, following [standard async Python behavior](https://docs.python.org/3/library/asyncio-task.html).

```python
from prefect import task
import asyncio

@task
async def print_message():
await asyncio.sleep(1)
print("Hello, I'm an async task")

asyncio.run(print_message())
```

### Methods

Prefect supports snchronous and async methods as tasks, including instance methods, class methods, and static methods. For class methods and static methods, you must apply the appropriate method decorator _above_ the `@task` decorator:

```python
from prefect import task

class MyClass:

@task
def my_instance_method(self):
pass

@classmethod
@task
def my_class_method(cls):
pass

@staticmethod
@task
def my_static_method():
pass

MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()
```

### Generators

Prefect supports synchronous and async generators as tasks. The task is considered to be `Running` as long as the generator is yielding values. When the generator is exhausted, the task is considered `Completed`. Any values yielded by the generator can be consumed by other tasks, and they will automatically record the generator task as their parent.

```python
from prefect import task

@task
def generator():
for i in range(10):
yield i

@task
def consumer(x):
print(x)

for val in generator():
consumer(val)
```

<Warning>
**Generator functions are consumed when returned from tasks**

The result of a completed task must be serializable, but generators cannot be serialized.
Therefore, if you return a generator from a task, the generator will be fully consumed and its yielded values will be returned as a list.
This can lead to unexpected behavior or blocking if the generator is infinite or very large.

Here is an example of proactive generator consumption:

```python
from prefect import task

def gen():
yield from [1, 2, 3]
print('Generator consumed!')

@task
def f():
return gen()

f() # prints 'Generator consumed!'
```

If you need to return a generator without consuming it, you can `yield` it instead of using `return`.
Values yielded from generator tasks are not considered final results and do not face the same serialization constraints:

```python
from prefect import task

def gen():
yield from [1, 2, 3]
print('Generator consumed!')

@task
def f():
yield gen()

generator = next(f())
list(generator) # prints 'Generator consumed!'

```
</Warning>

## Concurrency

Tasks enable concurrency, allowing you to execute multiple tasks asynchronously.
Expand Down Expand Up @@ -190,7 +320,7 @@ def my_flow():

**Call a task from another task**

As of `prefect 2.18.x`, you can call a task from within another task:
A task can be called from within another task:

```python
from prefect import task
Expand Down Expand Up @@ -662,36 +792,3 @@ def sum_it(numbers, static_iterable):

sum_it([4, 5, 6], unmapped([1, 2, 3]))
```

## Async tasks

Prefect supports asynchronous task and flow definitions by default. All of
[the standard rules of async](https://docs.python.org/3/library/asyncio-task.html) apply:

```python
import asyncio

from prefect import task, flow

@task
async def print_values(values):
for value in values:
await asyncio.sleep(1) # yield
print(value, end=" ")

@flow
async def async_flow():
await print_values([1, 2]) # runs immediately
coros = [print_values("abcd"), print_values("6789")]

# asynchronously gather the tasks
await asyncio.gather(*coros)

asyncio.run(async_flow())
```

If you are not using `asyncio.gather`,
calling [`.submit()`](/3.0rc/develop/write-tasks/use-task-runners/#using-a-task-runner)
is required for asynchronous execution on the `ConcurrentTaskRunner`.


83 changes: 81 additions & 2 deletions docs/3.0rc/develop/write-workflows/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description: Learn the basics of defining and running flows.

Flows are the most central Prefect object. A flow is a container for workflow logic as code and allows users to configure
how their workflows behave.
Flows are defined as Python functions, and any Python function is eligible to be a flow.
Flows are defined as Python functions. Almost any Python function is eligible to be a flow.

They can take inputs, perform work, and return an output.
You can turn any function into a Prefect flow by adding the `@flow` decorator.
Expand Down Expand Up @@ -70,6 +70,85 @@ Forks 🍴 : 1245
As shown above, flow definitions can contain arbitrary Python logic.
</Tip>

## Supported functions

Almost any standard Python function can be turned into a Prefect flow by adding the `@flow` decorator.

<Tip>
Flows are always executed in the main thread by default to facilitate native Python debugging and profiling.
</Tip>

### Synchronous functions

The simplest Prefect flow is a synchronous Python function. Here's an example of a synchronous flow that prints a message:

```python
from prefect import flow

@flow
def print_message():
print("Hello, I'm a flow")

print_message()
```

### Async functions

Prefect also supports async functions. The resulting flows are coroutines that can be awaited or run concurrently, following [the standard rules of async Python](https://docs.python.org/3/library/asyncio-task.html).

```python
import asyncio

from prefect import task, flow

@task
async def print_values(values):
for value in values:
await asyncio.sleep(1)
print(value, end=" ")

@flow
async def async_flow():
print("Hello, I'm an async flow")

# runs immediately
await print_values([1, 2])

# runs concurrently
coros = [print_values("abcd"), print_values("6789")]
await asyncio.gather(*coros)

asyncio.run(async_flow())
```

### Methods

Prefect supports synchronous and async methods as flows, including instance methods, class methods, and static methods. For class methods and static methods, you must apply the appropriate method decorator _above_ the `@flow` decorator:

```python
from prefect import flow

class MyClass:

@flow
def my_instance_method(self):
pass

@classmethod
@flow
def my_class_method(cls):
pass

@staticmethod
@flow
def my_static_method():
pass

MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()
```

## Parameters

As with any Python function, you can pass arguments to a flow.
Expand Down Expand Up @@ -150,7 +229,7 @@ When you run a flow that contains tasks or additional flows, Prefect tracks the

## Writing flows

The [`@flow`] decorator is used to designate a flow:
The `@flow` decorator is used to designate a flow:

```python hl_lines="3"
from prefect import flow
Expand Down
6 changes: 3 additions & 3 deletions tests/events/client/test_events_emit_event.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import timedelta
from datetime import timedelta, timezone
from unittest import mock
from uuid import UUID

Expand Down Expand Up @@ -35,7 +35,7 @@ def test_emits_complex_event(
emit_event(
event="vogon.poetry.read",
resource={"prefect.resource.id": "vogon.poem.oh-freddled-gruntbuggly"},
occurred=DateTime(2023, 3, 1, 12, 39, 28),
occurred=DateTime(2023, 3, 1, 12, 39, 28, tzinfo=timezone.utc),
related=[
{
"prefect.resource.id": "vogon.ship.the-business-end",
Expand All @@ -53,7 +53,7 @@ def test_emits_complex_event(
event = asserting_events_worker._client.events[0]
assert event.event == "vogon.poetry.read"
assert event.resource.id == "vogon.poem.oh-freddled-gruntbuggly"
assert event.occurred == DateTime(2023, 3, 1, 12, 39, 28)
assert event.occurred == DateTime(2023, 3, 1, 12, 39, 28, tzinfo=timezone.utc)
assert len(event.related) == 1
assert event.related[0].id == "vogon.ship.the-business-end"
assert event.related[0].role == "locale"
Expand Down
40 changes: 40 additions & 0 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,46 @@ def static_method():
assert Foo.static_method() == "static"
assert isinstance(Foo.static_method, Flow)

@pytest.mark.parametrize("T", [BaseFoo, BaseFooModel])
async def test_flow_supports_async_instance_methods(self, T):
class Foo(T):
@flow
async def instance_method(self):
return self.x

f = Foo(x=1)
assert await Foo(x=5).instance_method() == 5
assert await f.instance_method() == 1
assert isinstance(Foo(x=10).instance_method, Flow)

@pytest.mark.parametrize("T", [BaseFoo, BaseFooModel])
async def test_flow_supports_async_class_methods(self, T):
class Foo(T):
def __init__(self, x):
self.x = x

@classmethod
@flow
async def class_method(cls):
return cls.__name__

assert await Foo.class_method() == "Foo"
assert isinstance(Foo.class_method, Flow)

@pytest.mark.parametrize("T", [BaseFoo, BaseFooModel])
async def test_flow_supports_async_static_methods(self, T):
class Foo(T):
def __init__(self, x):
self.x = x

@staticmethod
@flow
async def static_method():
return "static"

assert await Foo.static_method() == "static"
assert isinstance(Foo.static_method, Flow)

def test_flow_supports_instance_methods_with_basemodel(self):
class Foo(pydantic.BaseModel):
model_config = pydantic.ConfigDict(ignored_types=(Flow,))
Expand Down
Loading