From ea2dfeafcd7b2d17ad396eac7a4a1cd421f3ec45 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Sat, 15 Jun 2024 10:35:00 -0400 Subject: [PATCH 1/4] Add support for generator flows --- src/prefect/flow_engine.py | 84 ++++++++++- src/prefect/flows.py | 14 +- tests/test_flow_engine.py | 296 +++++++++++++++++++++++++++++++++++++ tests/test_task_engine.py | 28 ++++ 4 files changed, 418 insertions(+), 4 deletions(-) diff --git a/src/prefect/flow_engine.py b/src/prefect/flow_engine.py index 086dcfbbf657..60e7a1cc2541 100644 --- a/src/prefect/flow_engine.py +++ b/src/prefect/flow_engine.py @@ -6,6 +6,7 @@ from dataclasses import dataclass, field from typing import ( Any, + AsyncGenerator, Callable, Coroutine, Dict, @@ -50,12 +51,13 @@ return_value_to_state, ) from prefect.utilities.asyncutils import run_coro_as_sync -from prefect.utilities.callables import call_with_parameters +from prefect.utilities.callables import call_with_parameters, parameters_to_args_kwargs from prefect.utilities.collections import visit_collection from prefect.utilities.engine import ( _get_hook_name, _resolve_custom_flow_run_name, capture_sigterm, + link_state_to_result, propose_state_sync, resolve_to_final_result, ) @@ -632,6 +634,80 @@ async def run_flow_async( return engine.state if return_type == "state" else engine.result() +def run_generator_flow_sync( + flow: Flow[P, R], + flow_run: Optional[FlowRun] = None, + parameters: Optional[Dict[str, Any]] = None, + wait_for: Optional[Iterable[PrefectFuture]] = None, + return_type: Literal["state", "result"] = "result", +) -> Generator[R, None, None]: + if return_type != "result": + raise ValueError("The return_type for a generator flow must be 'result'") + + engine = FlowRunEngine[P, R]( + flow=flow, parameters=parameters, flow_run=flow_run, wait_for=wait_for + ) + + with engine.start(): + while engine.is_running(): + with engine.run_context(): + call_args, call_kwargs = parameters_to_args_kwargs( + 
flow.fn, engine.parameters or {} + ) + gen = flow.fn(*call_args, **call_kwargs) + try: + while True: + gen_result = next(gen) + # link the current state to the result for dependency tracking + link_state_to_result(engine.state, gen_result) + yield gen_result + except StopIteration as exc: + engine.handle_success(exc.value) + except GeneratorExit as exc: + engine.handle_success(None) + gen.throw(exc) + + return engine.result() + + +async def run_generator_flow_async( + flow: Flow[P, R], + flow_run: Optional[FlowRun] = None, + parameters: Optional[Dict[str, Any]] = None, + wait_for: Optional[Iterable[PrefectFuture]] = None, + return_type: Literal["state", "result"] = "result", +) -> AsyncGenerator[R, None]: + if return_type != "result": + raise ValueError("The return_type for a generator flow must be 'result'") + + engine = FlowRunEngine[P, R]( + flow=flow, parameters=parameters, flow_run=flow_run, wait_for=wait_for + ) + + with engine.start(): + while engine.is_running(): + with engine.run_context(): + call_args, call_kwargs = parameters_to_args_kwargs( + flow.fn, engine.parameters or {} + ) + gen = flow.fn(*call_args, **call_kwargs) + try: + while True: + # can't use anext in Python < 3.10 + gen_result = await gen.__anext__() + # link the current state to the result for dependency tracking + link_state_to_result(engine.state, gen_result) + yield gen_result + except (StopAsyncIteration, GeneratorExit) as exc: + engine.handle_success(None) + if isinstance(exc, GeneratorExit): + gen.throw(exc) + + # async generators can't return, but we can raise failures here + if engine.state.is_failed(): + engine.result() + + def run_flow( flow: Flow[P, R], flow_run: Optional[FlowRun] = None, @@ -646,7 +722,11 @@ def run_flow( wait_for=wait_for, return_type=return_type, ) - if flow.isasync: + if flow.isasync and flow.isgenerator: + return run_generator_flow_async(**kwargs) + elif flow.isgenerator: + return run_generator_flow_sync(**kwargs) + elif flow.isasync: return 
run_flow_async(**kwargs) else: return run_flow_sync(**kwargs) diff --git a/src/prefect/flows.py b/src/prefect/flows.py index 4b8491fed6e9..e678950bd1ec 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -89,7 +89,6 @@ from prefect.types import BANNED_CHARACTERS, WITHOUT_BANNED_CHARACTERS from prefect.utilities.annotations import NotSet from prefect.utilities.asyncutils import ( - is_async_fn, run_sync_in_worker_thread, sync_compatible, ) @@ -289,7 +288,18 @@ def __init__( self.description = description or inspect.getdoc(fn) update_wrapper(self, fn) self.fn = fn - self.isasync = is_async_fn(self.fn) + + # the flow is considered async if its function is async or an async + # generator + self.isasync = inspect.iscoroutinefunction( + self.fn + ) or inspect.isasyncgenfunction(self.fn) + + # the flow is considered a generator if its function is a generator or + # an async generator + self.isgenerator = inspect.isgeneratorfunction( + self.fn + ) or inspect.isasyncgenfunction(self.fn) raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"]) diff --git a/tests/test_flow_engine.py b/tests/test_flow_engine.py index b5df9845ff26..73be3a549c6e 100644 --- a/tests/test_flow_engine.py +++ b/tests/test_flow_engine.py @@ -1370,3 +1370,299 @@ async def suspending_flow(): ) assert age == 42 + + +class TestGenerators: + async def test_generator_flow(self): + """ + Test for generator behavior including StopIteration + """ + + @flow + def g(): + yield 1 + yield 2 + + gen = g() + assert next(gen) == 1 + assert next(gen) == 2 + with pytest.raises(StopIteration): + next(gen) + + async def test_generator_flow_requires_return_type_result(self): + @flow + def g(): + yield 1 + + with pytest.raises( + ValueError, match="The return_type for a generator flow must be 'result'" + ): + for i in g(return_state=True): + pass + + async def test_generator_flow_states(self, prefect_client: PrefectClient): + """ + Test for generator behavior including StopIteration + """ + + @flow + 
def g(): + yield FlowRunContext.get().flow_run.id + yield 2 + + gen = g() + tr_id = next(gen) + tr = await prefect_client.read_flow_run(tr_id) + assert tr.state.is_running() + + # exhaust the generator + for _ in gen: + pass + + tr = await prefect_client.read_flow_run(tr_id) + assert tr.state.is_completed() + + async def test_generator_flow_with_return(self): + """ + If a generator returns, the return value is trapped + in its StopIteration error + """ + + @flow + def g(): + yield 1 + return 2 + + gen = g() + assert next(gen) == 1 + with pytest.raises(StopIteration) as exc_info: + next(gen) + assert exc_info.value.value == 2 + + async def test_generator_flow_with_exception(self): + @flow + def g(): + yield 1 + raise ValueError("xyz") + + gen = g() + assert next(gen) == 1 + with pytest.raises(ValueError, match="xyz"): + next(gen) + + async def test_generator_flow_with_exception_is_failed( + self, prefect_client: PrefectClient + ): + @task + def g(): + yield TaskRunContext.get().task_run.id + raise ValueError("xyz") + + gen = g() + tr_id = next(gen) + with pytest.raises(ValueError, match="xyz"): + next(gen) + tr = await prefect_client.read_task_run(tr_id) + assert tr.state.is_failed() + + async def test_generator_retries(self): + """ + Test that a generator can retry and will re-emit its events + """ + + @flow(retries=2) + def g(): + yield 1 + yield 2 + raise ValueError() + + values = [] + try: + for v in g(): + values.append(v) + except ValueError: + pass + assert values == [1, 2, 1, 2, 1, 2] + + async def test_generator_timeout(self): + """ + Test that a generator can timeout + """ + + @flow(timeout_seconds=0.1) + def g(): + yield 1 + time.sleep(2) + yield 2 + + values = [] + with pytest.raises(TimeoutError): + for v in g(): + values.append(v) + assert values == [1] + + async def test_generator_doesnt_retry_on_generator_exception(self): + """ + Test that a generator doesn't retry for normal generator exceptions like StopIteration + """ + + @flow(retries=2) + def 
g(): + yield 1 + yield 2 + + values = [] + try: + for v in g(): + values.append(v) + except ValueError: + pass + assert values == [1, 2] + + +class TestAsyncGenerators: + async def test_generator_flow(self): + """ + Test for generator behavior including StopIteration + """ + + @flow + async def g(): + yield 1 + yield 2 + + counter = 0 + async for val in g(): + if counter == 0: + assert val == 1 + if counter == 1: + assert val == 2 + assert counter <= 1 + counter += 1 + + async def test_generator_flow_requires_return_type_result(self): + @flow + async def g(): + yield 1 + + with pytest.raises( + ValueError, match="The return_type for a generator flow must be 'result'" + ): + async for i in g(return_state=True): + pass + + async def test_generator_flow_states(self, prefect_client: PrefectClient): + """ + Test for generator behavior including StopIteration + """ + + @flow + async def g(): + yield FlowRunContext.get().flow_run.id + + async for val in g(): + tr_id = val + tr = await prefect_client.read_flow_run(tr_id) + assert tr.state.is_running() + + tr = await prefect_client.read_flow_run(tr_id) + assert tr.state.is_completed() + + async def test_generator_flow_with_exception(self): + @flow + async def g(): + yield 1 + raise ValueError("xyz") + + with pytest.raises(ValueError, match="xyz"): + async for val in g(): + assert val == 1 + + async def test_generator_flow_with_exception_is_failed( + self, prefect_client: PrefectClient + ): + @flow + async def g(): + yield FlowRunContext.get().flow_run.id + raise ValueError("xyz") + + with pytest.raises(ValueError, match="xyz"): + async for val in g(): + tr_id = val + + tr = await prefect_client.read_flow_run(tr_id) + assert tr.state.is_failed() + + async def test_generator_retries(self): + """ + Test that a generator can retry and will re-emit its events + """ + + @flow(retries=2) + async def g(): + yield 1 + yield 2 + raise ValueError() + + values = [] + try: + async for v in g(): + values.append(v) + except ValueError: + 
pass + assert values == [1, 2, 1, 2, 1, 2] + + @pytest.mark.xfail( + reason="Synchronous sleep in an async flow is not interruptible by async timeout" + ) + async def test_generator_timeout_with_sync_sleep(self): + """ + Test that a generator can timeout + """ + + @flow(timeout_seconds=0.1) + async def g(): + yield 1 + time.sleep(2) + yield 2 + + values = [] + with pytest.raises(TimeoutError): + async for v in g(): + values.append(v) + assert values == [1] + + async def test_generator_timeout_with_async_sleep(self): + """ + Test that a generator can timeout + """ + + @flow(timeout_seconds=0.1) + async def g(): + yield 1 + await asyncio.sleep(2) + yield 2 + + values = [] + with pytest.raises(TimeoutError): + async for v in g(): + values.append(v) + assert values == [1] + + async def test_generator_doesnt_retry_on_generator_exception(self): + """ + Test that a generator doesn't retry for normal generator exceptions like StopIteration + """ + + @flow(retries=2) + async def g(): + yield 1 + yield 2 + + values = [] + try: + async for v in g(): + values.append(v) + except ValueError: + pass + assert values == [1, 2] diff --git a/tests/test_task_engine.py b/tests/test_task_engine.py index 2fd90174b33e..fe33dfd3a333 100644 --- a/tests/test_task_engine.py +++ b/tests/test_task_engine.py @@ -1413,6 +1413,34 @@ def g(): pass assert values == [1, 2] + def test_generators_can_be_yielded_without_being_consumed(self): + CONSUMED = [] + + @task + def g(): + CONSUMED.append("g") + yield 1 + yield 2 + + @task + def f_return(): + return g() + + @task + def f_yield(): + yield g() + + # returning a generator automatically consumes it + # because it can't be serialized + f_return() + assert CONSUMED == ["g"] + CONSUMED.clear() + + gen = next(f_yield()) + assert CONSUMED == [] + list(gen) + assert CONSUMED == ["g"] + class TestAsyncGenerators: async def test_generator_task(self): From dea0ccc56fc8d68f9ea0771d92b55e77e59f6453 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin 
<153965+jlowin@users.noreply.github.com> Date: Sat, 15 Jun 2024 13:08:33 -0400 Subject: [PATCH 2/4] Update test_events_emit_event.py --- tests/events/client/test_events_emit_event.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/events/client/test_events_emit_event.py b/tests/events/client/test_events_emit_event.py index 4fc8dac1e223..4bfb1f8594cd 100644 --- a/tests/events/client/test_events_emit_event.py +++ b/tests/events/client/test_events_emit_event.py @@ -1,4 +1,4 @@ -from datetime import timedelta +from datetime import timedelta, timezone from unittest import mock from uuid import UUID @@ -35,7 +35,7 @@ def test_emits_complex_event( emit_event( event="vogon.poetry.read", resource={"prefect.resource.id": "vogon.poem.oh-freddled-gruntbuggly"}, - occurred=DateTime(2023, 3, 1, 12, 39, 28), + occurred=DateTime(2023, 3, 1, 12, 39, 28, tzinfo=timezone.utc), related=[ { "prefect.resource.id": "vogon.ship.the-business-end", @@ -53,7 +53,7 @@ def test_emits_complex_event( event = asserting_events_worker._client.events[0] assert event.event == "vogon.poetry.read" assert event.resource.id == "vogon.poem.oh-freddled-gruntbuggly" - assert event.occurred == DateTime(2023, 3, 1, 12, 39, 28) + assert event.occurred == DateTime(2023, 3, 1, 12, 39, 28, tzinfo=timezone.utc) assert len(event.related) == 1 assert event.related[0].id == "vogon.ship.the-business-end" assert event.related[0].role == "locale" From 3452deebb2a23af2288dfd0add74c7c0837baa45 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Sat, 15 Jun 2024 16:18:54 -0400 Subject: [PATCH 3/4] Add generator docs --- docs/3.0rc/develop/write-workflows/index.mdx | 64 ++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/docs/3.0rc/develop/write-workflows/index.mdx b/docs/3.0rc/develop/write-workflows/index.mdx index 019f5c16febd..4270e8d1a244 100644 --- a/docs/3.0rc/develop/write-workflows/index.mdx +++ 
b/docs/3.0rc/develop/write-workflows/index.mdx @@ -149,6 +149,70 @@ MyClass.my_class_method() MyClass.my_static_method() ``` +### Generators + +Prefect supports synchronous and async generators as flows. The flow is considered to be `Running` as long as the generator is yielding values. When the generator is exhausted, the flow is considered `Completed`. Any values yielded by the generator can be consumed by other flows or tasks. + +```python +from prefect import flow + +@flow +def generator(): + for i in range(10): + yield i + +@flow +def consumer(x): + print(x) + +for val in generator(): + consumer(val) +``` + + +**Generator functions are consumed when returned from flows** + +The result of a completed flow must be serializable, but generators cannot be serialized. +Therefore, if you return a generator from a flow, the generator will be fully consumed and its yielded values will be returned as a list. +This can lead to unexpected behavior or blocking if the generator is infinite or very large. + +Here is an example of proactive generator consumption: + +```python +from prefect import flow + +def gen(): + yield from [1, 2, 3] + print('Generator consumed!') + +@flow +def f(): + return gen() + +f() # prints 'Generator consumed!' +``` + +If you need to return a generator without consuming it, you can `yield` it instead of using `return`. +Values yielded from generator flows are not considered final results and do not face the same serialization constraints: + +```python +from prefect import flow + +def gen(): + yield from [1, 2, 3] + print('Generator consumed!') + +@flow +def f(): + yield gen() + +generator = next(f()) +list(generator) # prints 'Generator consumed!' + +``` + + + ## Parameters As with any Python function, you can pass arguments to a flow. 
From 3ef2423c6ead0c0fd06bee763cb134d42af3e304 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Mon, 17 Jun 2024 10:43:50 -0400 Subject: [PATCH 4/4] Update docs/3.0rc/develop/write-workflows/index.mdx Co-authored-by: Jeff Hale --- docs/3.0rc/develop/write-workflows/index.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/3.0rc/develop/write-workflows/index.mdx b/docs/3.0rc/develop/write-workflows/index.mdx index 4270e8d1a244..4afe2adb09cb 100644 --- a/docs/3.0rc/develop/write-workflows/index.mdx +++ b/docs/3.0rc/develop/write-workflows/index.mdx @@ -151,7 +151,7 @@ MyClass.my_static_method() ### Generators -Prefect supports synchronous and async generators as flows. The flow is considered to be `Running` as long as the generator is yielding values. When the generator is exhausted, the flow is considered `Completed`. Any values yielded by the generator can be consumed by other flows or tasks. +Prefect supports synchronous and asynchronous generators as flows. The flow is considered to be `Running` as long as the generator is yielding values. When the generator is exhausted, the flow is considered `Completed`. Any values yielded by the generator can be consumed by other flows or tasks. ```python from prefect import flow