-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cloud Caching #885
Cloud Caching #885
Changes from 10 commits
dbb8301
0ee6a45
6f693b8
173f64f
dc12803
4b6b0eb
828df15
5600741
b05117b
9744682
dc2a476
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -567,6 +567,35 @@ def set_flow_run_state( | |
|
||
self.graphql(mutation, state=serialized_state) # type: Any | ||
|
||
def get_latest_cached_states( | ||
self, task_id: str, created_after: datetime.datetime | ||
) -> List["prefect.engine.state.State"]: | ||
""" | ||
Pulls all Cached states for the given task which were created after the provided date. | ||
|
||
Args: | ||
- task_id (str): the task id for this task run | ||
- created_after (datetime.datetime): the earliest date the state should have been created at | ||
|
||
Returns: | ||
- List[State]: a list of Cached states created after the given date | ||
""" | ||
where_clause = { | ||
"where": { | ||
"state": {"_eq": "Cached"}, | ||
"task_id": {"_eq": task_id}, | ||
"created": {"_gte": created_after.isoformat()}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I didn't catch this last night -- this field "created" is when the As an example, a task run might be created today in anticipation of a scheduled flow run, but that run might not start for a month. In a month's time, the task might enter a cached state, and that time (in a month) is the quantity of interest. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That makes too much sense -- good catch and sorry for the oversight! |
||
}, | ||
"order_by": {"created": EnumValue("desc")}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment regarding There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated |
||
} | ||
query = {"query": {with_args("task_run", where_clause): "serialized_state"}} | ||
cicdw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
result = self.graphql(query) # type: Any | ||
deserializer = prefect.engine.state.State.deserialize | ||
valid_states = [ | ||
deserializer(res.serialized_state) for res in result.data.task_run | ||
] | ||
return valid_states | ||
|
||
def get_task_run_info( | ||
self, flow_run_id: str, task_id: str, map_index: Optional[int] = None | ||
) -> TaskRunInfoResult: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated this because otherwise it would break if new classes / methods were added in the master branch vs. the latest release