-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add more variables to context #1405
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Starting a discussion on tasks / flows in context and whether we should pursue that at this moment. Serialization costs + the fact that context
is thread.local
make me hesitant, without a strong known use case.
src/prefect/engine/flow_runner.py
Outdated
@@ -154,7 +154,7 @@ def initialize_run( # type: ignore | |||
for param, value in parameters.items(): | |||
context_params[param] = value | |||
|
|||
context.update(flow_name=self.flow.name) | |||
context.update(flow=self.flow, flow_name=self.flow.name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the only piece of the PR that makes me hesitate -> Flow objects could be quite large, and adding this to context guarantees that all Dask communication has to ship the Flow along with every task run submission. Without a strong argument for its inclusion, I vote to not include it for now until we better understand the implications.
It is definitely possible that Flows are already serialized since we submit a FlowRunner method to Dask, but this feels like something we shouldn't do off-the-cuff without thinking through it.
src/prefect/engine/task_runner.py
Outdated
@@ -166,7 +166,11 @@ def initialize_run( # type: ignore | |||
context.update(loop_context) | |||
|
|||
context.update( | |||
task_run_count=run_count, task_name=self.task.name, task_tags=self.task.tags | |||
task=self.task, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related discussion: #889
We definitely know tasks are serialized by Dask already, but including "heavy" objects in context without a known use case still makes me nervous - context is a thread.local
object too, so I honestly don't know if that introduces any complications for Tasks which use multithreading or not. It feels like something I'd rather learn during experimentation than from a user though.
@@ -155,6 +155,8 @@ def initialize_run( # type: ignore | |||
updated_context = context or {} | |||
updated_context.update(flow_run_info.context or {}) | |||
updated_context.update( | |||
flow_id=flow_run_info.id, | |||
flow_run_id=flow_run_info.id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to note: in the context of running w/ Cloud this is already added on to all deployments. (e.g. of use: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/execute.py#L39)
Co-Authored-By: Chris White <chris@prefect.io>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
…-ui/esbuild-0.14.27 Bump esbuild from 0.14.25 to 0.14.27 in /orion-ui
Thanks for contributing to Prefect!
Please describe your work and make sure your PR:
CHANGELOG.md
(if appropriate)docs/outline.toml
for API reference docs (if appropriate)Note that your PR will not be reviewed unless all three boxes are checked.
What does this PR change?
This PR pulls the non-breaking changes out of #1403:
task_slug
,task
, andflow
to contextflow_id
andflow_run_id
to context when running in CloudWhy is this PR important?