Addresses two race conditions in background task runs #14115

Merged
merged 3 commits into main from wait-for-task-event-subscription on Jun 18, 2024

Conversation

chrisguidry
Collaborator

During reliability testing of background tasks, I kept experiencing hangs that
couldn't be explained by task deadlocking due to the `limit` on the worker (see
#14092). Here I'm addressing two causes of them:

1. Returning the first singleton instance of `TaskRunWaiter` before it is
   confirmed to be listening to the websocket. Using an `asyncio.Event` to
   signal that the socket is actually connected before proceeding (see the
   first sketch below).
2. When `PrefectDistributedFuture` was waiting for task runs to complete, it
   would ask the API if the task run was complete first, then go on to start
   waiting. If the task changed state in between those calls, it would likely
   be missed entirely (see the second sketch below).
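
The following is a minimal sketch of the first fix, not the actual Prefect implementation: the class, attribute, and method names (`TaskRunWaiterSketch`, `_connected`, `instance`) are illustrative. The idea is that the singleton is only handed out once an `asyncio.Event` confirms the websocket subscriber is actually listening.

```python
import asyncio
from typing import Optional


class TaskRunWaiterSketch:
    """Illustrative stand-in for a singleton that listens for task-run events."""

    _instance: Optional["TaskRunWaiterSketch"] = None

    def __init__(self) -> None:
        self._connected = asyncio.Event()  # set once the subscriber is live
        self._consumer_task: Optional[asyncio.Task] = None

    async def _consume_events(self) -> None:
        # Stand-in for the real websocket subscription loop.
        self._connected.set()  # signal "actually listening" before consuming
        while True:
            await asyncio.sleep(1)  # placeholder for receiving task-run events

    @classmethod
    async def instance(cls) -> "TaskRunWaiterSketch":
        if cls._instance is None:
            cls._instance = cls()
            cls._instance._consumer_task = asyncio.create_task(
                cls._instance._consume_events()
            )
        # Don't return the singleton until the subscription is confirmed live,
        # so callers can't race ahead of the listener.
        await cls._instance._connected.wait()
        return cls._instance
```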
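
And here is a sketch of the second race and one way to close the window; the helpers (`api.is_complete`, `waiter.add_done_callback`, `waiter.wait_for`) are hypothetical names, not the real `PrefectDistributedFuture` API. The racy ordering polls the API and only then starts waiting, so a completion that lands between the two steps is never observed; registering the waiter first means the terminal state is caught by one path or the other.

```python
import asyncio


async def wait_racy(task_run_id, api, waiter) -> None:
    if await api.is_complete(task_run_id):  # 1. poll the API first...
        return
    # <-- a completion landing here was never subscribed to...
    await waiter.wait_for(task_run_id)      # 2. ...so this wait can hang forever


async def wait_fixed(task_run_id, api, waiter) -> None:
    done = asyncio.Event()
    waiter.add_done_callback(task_run_id, done.set)  # 1. register interest first
    if await api.is_complete(task_run_id):           # 2. then poll the API
        return                                       #    (covers already-finished runs)
    await done.wait()                                # 3. covers runs that finish later
```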

With these changes in place, I've run a large number of my test tasks (see
PrefectHQ/nebula#7962, which has a task with 5 layers of dependencies) at
`TASKS=100` and have experienced no hangs. I did get a hang with `TASKS=1000`,
but this is a marked improvement from where we were before.

Part of #14098

@chrisguidry chrisguidry requested a review from a team as a code owner June 18, 2024 17:10
@desertaxle desertaxle added the fix A fix for a bug in an existing feature label Jun 18, 2024
@@ -76,8 +77,9 @@ def __init__(
limit: Optional[int] = 10,
):
self.tasks: List[Task] = list(tasks)
self.task_keys = set(t.task_key for t in tasks if isinstance(t, Task))
Collaborator Author

The refactoring in this file is just to make the `/status` endpoint more useful and isn't part of the fix here.

Member

@desertaxle desertaxle left a comment


LGTM!

@chrisguidry chrisguidry merged commit c0f803e into main Jun 18, 2024
26 checks passed
@chrisguidry chrisguidry deleted the wait-for-task-event-subscription branch June 18, 2024 19:27