-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for generator tasks #13820
Conversation
values.append(v) | ||
except ValueError: | ||
pass | ||
assert values == [1, 2, 1, 2, 1, 2] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👏
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔋
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the tests for me
This PR leverages #13793 to add native support for generator tasks to the new engine, both sync and async. In Prefect 2.x you can decorate generators as tasks, but they are not preserved as generators. Instead, when the task is called the generator is fully consumed and the resulting list is returned as the task's result. This PR preserves true generator semantics -- a task generator must be iterated over to produce values incrementally. The same goes for async generators.
Generator tasks inherit all properties of regular tasks, including retries and timeouts.
This PR also enhances dependency tracking to automatically mark generators as "parent" tasks of any tasks that consume the values they yield.
This PR does not add support for generator flows.
Here is a simple demo that shows generator yields interspaced with dependent task run executions: