Input cacheing #78
My gut concern with making it an attribute of That said, I'm not sure I see a downside because we can check a state's type before trying to access its unique attributes -- by which I mean any attributes other than |
src/prefect/engine/flow_runner.py
Outdated
@@ -179,6 +182,9 @@ def get_run_state(
lambda s: s.data, task_states[edge.upstream_task]
)

if task in start_tasks and hasattr(task_states.get(task), "data"):
    upstream_inputs.update(task_states[task].data.get("cached_inputs"))
Comment that may not matter, just thinking here: is it better to use upstream_inputs.update(), which has the potential to let any existing data in upstream_inputs stick around, or to wholesale replace upstream_inputs with whatever is in cached_inputs?
I considered that; I don't think I have enough experience to make an informed decision here, which is why I went with prioritizing the cache without fully clearing upstream_inputs. Maybe there will one day be a situation where task is a start_task but for some reason the cached_inputs key is empty, or where upstream_inputs has more data in it than the cache tracks? Within our current setup it won't break anything either way.
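For reference, a minimal sketch of the two options discussed in this thread, using plain dicts. The helper name merge_inputs and its replace flag are hypothetical and not part of the PR. One thing the sketch makes visible: dict.update(None) raises a TypeError, so an absent cached_inputs key would need handling under either option.

```python
def merge_inputs(upstream_inputs, cached_inputs, replace=False):
    """Combine inputs computed from upstream edges with cached inputs.

    replace=False mirrors dict.update(): cached values win on key clashes,
    but keys that exist only in upstream_inputs survive.
    replace=True discards upstream_inputs whenever a cache is present.
    """
    # Treat a missing cache (None) as empty; dict.update(None) raises TypeError.
    cached = cached_inputs or {}
    if replace and cached:
        return dict(cached)
    merged = dict(upstream_inputs)
    merged.update(cached)
    return merged


upstream = {"x": 1, "y": 2}
cache = {"x": 10}

print(merge_inputs(upstream, cache))                # {'x': 10, 'y': 2}
print(merge_inputs(upstream, cache, replace=True))  # {'x': 10}
print(merge_inputs(upstream, None))                 # {'x': 1, 'y': 2}
```

With update semantics the cache takes priority but the uncached key y is kept; with replacement it is dropped.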
src/prefect/engine/flow_runner.py
Outdated
@@ -179,6 +182,9 @@ def get_run_state(
lambda s: s.data, task_states[edge.upstream_task]
)

if task in start_tasks and hasattr(task_states.get(task), "data"):
A State should always have a data attribute, so I'm not sure the second check in this line is ever going to fail (as long as it is, in fact, a state). However, the data attribute might not be a dict, which would cause the next line to error even if this check passed. To be clear, that would be unexpected (if you're telling the system to run a start_task, I think it would be wrong for it to be anything other than a Pending state with a dict data attribute) -- but certainly possible.
If task is not actually present in task_states, then task_states.get(task) will be None. Basically the check is equivalent to task in task_states -- maybe I should change it to that.
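A rough sketch of a check that covers both concerns raised in this thread: the task being absent from task_states, and data not being a dict. The State class and the cached_inputs_for helper are hypothetical stand-ins for illustration, not the code in the PR.

```python
class State:
    """Hypothetical stand-in: every state carries a `data` attribute."""

    def __init__(self, data=None):
        self.data = data


def cached_inputs_for(task, task_states):
    """Return cached inputs for `task`, or {} if none can be read safely."""
    state = task_states.get(task)
    # hasattr(None, "data") is False, so the original check only guards
    # against `task` being absent from task_states.
    if state is None:
        return {}
    # `data` may exist without being a dict (e.g. None or a plain value).
    if not isinstance(state.data, dict):
        return {}
    # A dict without the key, or with an empty value, also yields {}.
    return state.data.get("cached_inputs") or {}


states = {
    "a": State(data={"cached_inputs": {"x": 1}}),
    "b": State(data=5),
}
print(cached_inputs_for("a", states))  # {'x': 1}
print(cached_inputs_for("b", states))  # {}
print(cached_inputs_for("c", states))  # {}
```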
tests/engine/test_flow_runner.py
Outdated
assert isinstance(state, Success)


class TestInputCacheing:
Minor spelling error here and a couple other places - caching vs cacheing
Interesting; I thought it looked weird but Google told me cacheing was the correct gerund form. I'll change though. 👍
@@ -199,6 +199,9 @@ def __init__(self, name: str, default: Any = None, required: bool = True) -> Non

super().__init__(name=name, slug=name)

def __repr__(self) -> str:
Thanks ;)
Re: uniform attributes across states; while I definitely understand your concern, functionally it doesn't feel very different from EDIT: a benefit of having everything in |
@cicdw No consequences on the server side at all! As long as the State object is still serializable everything will work fine.
After looking through PR I think I understand why constantly accessing |
⚫ Remove attributes from base State class oof
State class refactor
src/prefect/engine/state.py
Outdated
for attr in self.__dict__:
    if attr.startswith("_") or attr == "message":
        continue
    eq &= getattr(self, attr) == getattr(other, attr)
This will raise errors if someone (for whatever reason) added an after-the-fact attribute to their state object that doesn't exist on the other one. I suggest getattr(self, attr) == getattr(other, attr, object()) -- this will allow missing (or extra) attributes to be compared safely, and object() is never equal to anything else so it won't give false matches.
Good idea; done.
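A small sketch of the suggested sentinel default, assuming a simplified State with only data and message attributes. The class body and the type check at the top of __eq__ are assumptions for illustration, not the code in state.py.

```python
class State:
    """Simplified, hypothetical stand-in for the State class under review."""

    def __init__(self, data=None, message=None):
        self.data = data
        self.message = message

    def __eq__(self, other):
        if type(self) is not type(other):
            return False
        eq = True
        for attr in self.__dict__:
            # Private attributes and the message do not affect equality.
            if attr.startswith("_") or attr == "message":
                continue
            # A fresh object() is the fallback when `other` lacks `attr`,
            # so the comparison is False instead of raising AttributeError.
            eq &= getattr(self, attr) == getattr(other, attr, object())
        return eq


a = State(data=1, message="first")
b = State(data=1, message="second")
print(a == b)   # True: messages are ignored

a.extra = 5     # attribute added after the fact
print(a == b)   # False, and no AttributeError
print(b == a)   # True: the loop only walks b's own attributes
```

The last line shows that the loop only visits the attributes of self, so the comparison is not symmetric when one side has extras. Whether that matters here is a separate question from the error the sentinel avoids.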
# a DONTRUN signal at any point breaks the chain and we return
# the most recently computed state
except signals.DONTRUN:
except signals.DONTRUN as exc:
    if "manual_only" in str(exc):
This feels fragile. I totally get why it's here and what it's accomplishing but I think the fact it's here suggests that we have a new type of state that needs managing.
Maybe the manual_only trigger should be raising a new TRIGGERDONTRUN signal (analogous to TRIGGERFAILED) which could be handled explicitly in handle_signals(). This new signal could automatically build and return a Pending(cached_inputs=inputs) state, rather than hoping to trap the error message itself.
If we want to punt on this (because this approach will work at the moment), I'm fine with that too. We can open a new issue to readdress it later. The core functionality here is more important!
Yeah, that makes sense; I think I'd rather punt on this and think about it a little more. Having handle_signals do something like inputs = kwargs.get('inputs') doesn't feel great either.
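For whenever this gets revisited, a rough sketch of the dedicated-signal idea. Every name below (DONTRUN, TRIGGERDONTRUN, Pending, check_trigger) is a simplified stand-in defined in the snippet itself; none of it is the actual prefect signals or state API. The point is only that matching on exception type avoids inspecting the message text.

```python
class DONTRUN(Exception):
    """Hypothetical stand-in for a generic 'do not run' signal."""


class TRIGGERDONTRUN(DONTRUN):
    """Hypothetical signal raised by a trigger that declines to run the task."""


class Pending:
    """Hypothetical stand-in for a Pending state that carries cached inputs."""

    def __init__(self, cached_inputs=None):
        self.cached_inputs = cached_inputs


def manual_only():
    """Trigger that always declines; a person has to start the task."""
    raise TRIGGERDONTRUN("waiting for a manual start")


def other_skip():
    """Trigger that declines for a reason unrelated to manual starts."""
    raise DONTRUN("skipped for some other reason")


def check_trigger(trigger, inputs, previous_state=None):
    """Return the state to report when the trigger stops the run, else None."""
    try:
        trigger()
    except TRIGGERDONTRUN:
        # Matched by exception type, so rewording the message cannot break it.
        return Pending(cached_inputs=inputs)
    except DONTRUN:
        # Any other DONTRUN keeps the most recently computed state.
        return previous_state
    return None


state = check_trigger(manual_only, {"x": 1})
print(type(state).__name__, state.cached_inputs)  # Pending {'x': 1}
```

The subclass handler has to come before the generic DONTRUN handler, otherwise the generic one would catch both.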
LGTM! So glad to see this
Addresses half of #56 w/ input cacheing.
Use cases:
- Retrying states
- manual_only triggers
Also updates Parameter repr.
Question for @jlowin: should we just let cached_inputs be an attribute of the State class (defaulting to None)? Reaching into the data attribute sometimes feels like unnecessary work.
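To make the question concrete, a hedged side-by-side sketch of the two access patterns. Both classes and both helper functions are hypothetical illustrations, not the State class in this PR.

```python
class DataOnlyState:
    """Hypothetical: cached inputs live inside the generic `data` dict."""

    def __init__(self, data=None):
        self.data = data


class AttributeState:
    """Hypothetical: cached_inputs is its own attribute, defaulting to None."""

    def __init__(self, data=None, cached_inputs=None):
        self.data = data
        self.cached_inputs = cached_inputs


def inputs_from_data(state):
    # Has to confirm `data` is a dict before reaching into it.
    data = state.data if isinstance(state.data, dict) else {}
    return data.get("cached_inputs") or {}


def inputs_from_attribute(state):
    # One attribute access; None simply means "nothing cached".
    return state.cached_inputs or {}


print(inputs_from_data(DataOnlyState(data={"cached_inputs": {"x": 1}})))  # {'x': 1}
print(inputs_from_attribute(AttributeState(cached_inputs={"x": 1})))      # {'x': 1}
print(inputs_from_attribute(AttributeState()))                            # {}
```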