Introducing a subscription API for autonomous task scheduling #11779
Conversation
Co-authored-by: Nathan Nowack <thrast36@gmail.com>
…11737) Co-authored-by: Chris Guidry <chris.g@prefect.io>
In earlier work, we've introduced autonomous task scheduling, where tasks outside a flow run are created as `Scheduled` and picked up by one or more processes running `Task.serve`. In our initial implementation, we used a polling approach where each `TaskServer` would make requests to the API looking for any tasks that were currently `Scheduled`, then move them to `Running` as they entered the task engine.

This work introduces a new mechanism for `TaskServer`s to get work from their Prefect Server: a long-lived websocket connection subscribed to a queue of `TaskRun`s to be worked. Because the Prefect Server is a singleton, it can govern an in-memory queue whose contents are distributed among the `TaskServer`s, making a simple task-brokering system.

The websocket implementation is modeled on the `events/in` and `events/out` websockets in Prefect Cloud, and it's expected that we'd negotiate authentication in a common way across all websockets.

Note: this does not address issues of resiliency, like what happens if the Prefect Server is restarted (in-flight tasks would be lost), if there are no `TaskServer`s draining the queue (the Prefect Server would eventually run out of memory), or if a `TaskServer` died before transitioning a task to `Running` (the task would remain `Scheduled` and never get picked up). These are some of the items I'd like to address in future work if we like this direction.

Note: there are no tests for this new subsystem yet.
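The brokering idea described above can be illustrated with a minimal sketch. This is not Prefect's actual implementation (no websockets, no real server); it just shows how a single in-memory queue naturally distributes scheduled task runs among however many subscribed consumers drain it:

```python
import asyncio


async def main():
    # The singleton server side: one in-memory queue of scheduled task runs.
    queue: asyncio.Queue = asyncio.Queue()
    for task_run_id in ("run-1", "run-2", "run-3", "run-4"):
        queue.put_nowait(task_run_id)

    received = {"server-a": [], "server-b": []}

    async def subscriber(name: str) -> None:
        # Stand-in for a long-lived websocket subscription: each consumer
        # pulls the next available task run off the shared queue.
        while True:
            try:
                task_run_id = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            received[name].append(task_run_id)
            await asyncio.sleep(0)  # yield so consumers interleave

    # Two consumers subscribed at once; each run is delivered exactly once.
    await asyncio.gather(subscriber("server-a"), subscriber("server-b"))
    return received


results = asyncio.run(main())
print(results)
```

Each task run is handed to exactly one consumer, which is the property a polling approach has to work harder to guarantee (each poller must atomically claim runs to avoid duplicates).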
src/prefect/task_server.py
Outdated
@@ -77,14 +77,7 @@ async def start(self) -> None:

        async with self as task_server:
            async with self._loops_task_group as tg:
                tg.start_soon(
Here I'm removing the old polling loop...
src/prefect/task_server.py
Outdated
                        jitter_range=0.3,
                    )
                )
                tg.start_soon(task_server._subscribe_to_task_scheduling)
...and replacing it with the subscription
        logger.info(f"Received task run: {task_run.id} - {task_run.name}")
        await self._submit_pending_task_run(task_run)

    async def _submit_pending_task_run(self, task_run: TaskRun):
This function is just the inner guts of the `_submit_pending_task_runs` loop, intended to operate on one task run at a time; no other changes were made.
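The refactor described here is a common pattern: extract the body of a batch loop into a per-item function so a push-based subscription can call it for one item at a time. A hypothetical sketch (names mirror the PR's functions, but the bodies are illustrative, not Prefect's actual code):

```python
from dataclasses import dataclass


@dataclass
class TaskRun:
    id: str
    name: str


submitted = []


def _submit_pending_task_run(task_run: TaskRun) -> None:
    # The inner guts of the old batch loop, now operating on a single run.
    submitted.append(task_run.id)


def _submit_pending_task_runs(task_runs: list) -> None:
    # The old polling path can delegate to the single-run function,
    # so both code paths share one implementation.
    for task_run in task_runs:
        _submit_pending_task_run(task_run)


_submit_pending_task_runs([TaskRun("a", "first"), TaskRun("b", "second")])
print(submitted)
```

This keeps the polling and subscription paths behaviorally identical, since both funnel through the same per-run function.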
Looking awesome. I'm inspired to go and switch out all the `RunInput` and pause/resume polling, as soon as I understand this pattern better.
        new_task_run: schemas.core.TaskRun = schemas.core.TaskRun.from_orm(model)

        # Place autonomously scheduled task runs onto a notification queue for the websocket
This feels like the heart of the matter, in some ways, and whatever we discover while exploring these waters should be useful for e.g. replacing workers polling work queues with websocket subscriptions. But other than saying I'm intrigued I should probably pass over this in silence for the moment.
Yes very much, and I think we can grow this functionality out to support objects in other states for other subscriptions. In Prefect Server, we should be able to handle a fairly significant number of in-flight objects in memory. For Prefect Cloud we'll need more of an external message broker, but the same concept applies.
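Generalizing the queue to subscriptions keyed by object state might look roughly like this. Everything here is a hedged sketch under the assumption of one in-memory queue per state (the `publish`/`subscribe_one` names are hypothetical, not Prefect APIs):

```python
import asyncio
from collections import defaultdict

# One in-memory queue per object state; a state transition publishes the
# object onto the matching queue, and subscribers drain the state they care about.
queues = defaultdict(asyncio.Queue)


def publish(state: str, obj: str) -> None:
    queues[state].put_nowait(obj)


async def subscribe_one(state: str) -> str:
    return await queues[state].get()


async def demo():
    publish("SCHEDULED", "task-run-1")
    publish("PAUSED", "flow-run-7")
    # Each subscription only sees objects in its own state.
    return await subscribe_one("SCHEDULED"), await subscribe_one("PAUSED")


got = asyncio.run(demo())
print(got)
```

For Prefect Cloud, as noted above, the per-state queues would live in an external message broker rather than process memory, but the subscription surface could stay the same.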