Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FlowRunView task run queries when all task runs are cached #6572

Merged
merged 4 commits into from
Aug 26, 2022

Conversation

zanieb
Copy link
Contributor

@zanieb zanieb commented Aug 25, 2022

Closes bug reported in https://prefecthq.slack.com/archives/C028BPL85C3/p1661452093250219 where if all of the task runs are cached an error is thrown when retrieving cache runs

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/redacted.py", line 122, in redacted
    tasks = view.get_all_task_runs()
  File "/usr/local/lib/python3.9/site-packages/prefect/backend/flow_run.py", line 739, in get_all_task_runs
    task_run_data = TaskRunView._query_for_task_runs(
  File "/usr/local/lib/python3.9/site-packages/prefect/backend/task_run.py", line 367, in _query_for_task_runs
    raise ValueError(
ValueError: No task runs found while querying for task runs where ....

This is easily fixed by toggling the error_on_empty bool.
Includes a change that excludes of cached task runs from the load_static_tasks query.
Includes a fix that prevents duplicate task runs from being returned when the object is shared by multiple threads.

Example

from prefect.backend import FlowRunView

view = FlowRunView.from_flow_run_id("030ee8ba-3e2c-439f-96c4-44c0ef2ad618")

# Calling repeatedly does not fail
view.get_all_task_runs()
view.get_all_task_runs()  # Previously would fail here
view.get_all_task_runs()

# Getting the latest is fine
view.get_latest()

# Getting the latest and making a query for static tasks is fine
view.get_latest(load_static_tasks=True)

# Instantiating a new instance with static loading works still
view = FlowRunView.from_flow_run_id("030ee8ba-3e2c-439f-96c4-44c0ef2ad618", load_static_tasks=True)

Checklist

  • [ ] This pull request references any related issue by including "closes <link to issue>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • This pull request includes tests or only affects documentation.
  • This pull request includes a bug, feature, enhancement, docs, or maintenance label categorizing the change.

@github-actions github-actions bot added the v1 Related to Prefect 1.x label Aug 25, 2022
@zanieb zanieb added the bug Something isn't working label Aug 25, 2022
Copy link
Contributor

@bunchesofdonald bunchesofdonald left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming my assumptions are right this looks good to me.


if load_static_tasks:
task_run_data = TaskRunView._query_for_task_runs(
where={
"map_index": {"_eq": -1},
"flow_run_id": {"_eq": flow_run_id},
"id": {"_nin": [view.task_run_id for view in _cached_task_runs]},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgive my v1 ignorance, but I assume that _nin means not in? So this is effectively saying "Get all of the task runs associated with this flow run, except for the ones that are cached."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep! We only cache finished task runs so excluding them from the query results will reduce the amount of traffic with no change in behavior.

Comment on lines +741 to +745
# Freeze the cached task runs we will return to prevent race conditions if
# this object is retrieving task runs in multiple threads
cached_task_runs = list(self._cached_task_runs.values())
cached_task_run_ids = {task_run.task_run_id for task_run in cached_task_runs}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bunchesofdonald A new change here that prevents a race condition if self._cached_task_runs is updated by another thread while the query is running. We wouldn't include those task runs in the nin call so they'd be returned by the API then on L759 we'd append the cached runs resulting in duplicates.

@zanieb zanieb merged commit 25c0e50 into 1.x Aug 26, 2022
@zanieb zanieb deleted the flow-run-view-fix branch August 26, 2022 20:02
@zanieb zanieb added fix A fix for a bug in an existing feature and removed bug Something isn't working labels Aug 30, 2022
@zanieb zanieb mentioned this pull request Sep 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
fix A fix for a bug in an existing feature v1 Related to Prefect 1.x
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants