-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introducing a subscription API for autonomous task scheduling (#11779)
In earlier work, we've introduced autonomous task scheduling, where tasks outside a flow run are created as scheduled and picked up by one or more processes running `Task.serve`. In our initial implementation, we used a polling approach where each `TaskSever` would make requests from the API to look for any tasks that were currently `Scheduled`, and then move them to `Running` as they entered the task engine. This work introduces a new mechanism for `TaskServer`s to get work from their Prefect Server: a long-lived websocket connection subscribed to a queue of `TaskRun`s to be worked. Because the Prefect Server is a singleton, it can govern a queue in-memory that will be distributed out among each of the `TaskServer`s to make a simple task brokering system. The websocket implementation is modeled on the `events/in` and `events/out` websockets in Prefect Cloud, and it's expected that we'd negotiate authentication in a common way across all websockets. Note: this does not address issues of resiliency, like what happens if the Prefect Server is restarted (in-flight tasks would be lost), or if there are no `TaskServer`s draining the Queue (the Prefect Server would eventually run out of memory), or if a `TaskServer` died before transitioning a task to `Running` (the task would remain `Scheduled` and never get picked up). These are some of the items I'd like to address in future work if we like this direction. Co-authored-by: Nathan Nowack <thrast36@gmail.com> Co-authored-by: Andrew Brookins <andrew.b@prefect.io>
- Loading branch information
1 parent
0f442b3
commit e92af4b
Showing
15 changed files
with
718 additions
and
37 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
import asyncio | ||
from typing import Generic, Type, TypeVar | ||
|
||
import orjson | ||
import websockets | ||
import websockets.exceptions | ||
from starlette.status import WS_1008_POLICY_VIOLATION | ||
from typing_extensions import Self | ||
|
||
from prefect._internal.schemas.bases import IDBaseModel | ||
from prefect.settings import PREFECT_API_KEY, PREFECT_API_URL | ||
|
||
S = TypeVar("S", bound=IDBaseModel) | ||
|
||
|
||
class Subscription(Generic[S]): | ||
def __init__(self, model: Type[S], path: str): | ||
self.model = model | ||
|
||
base_url = PREFECT_API_URL.value().replace("http", "ws", 1) | ||
self.subscription_url = f"{base_url}{path}" | ||
|
||
self._connect = websockets.connect( | ||
self.subscription_url, | ||
subprotocols=["prefect"], | ||
) | ||
self._websocket = None | ||
|
||
def __aiter__(self) -> Self: | ||
return self | ||
|
||
async def __anext__(self) -> S: | ||
while True: | ||
try: | ||
await self._ensure_connected() | ||
message = await self._websocket.recv() | ||
|
||
message_data = orjson.loads(message) | ||
|
||
if message_data.get("type") == "ping": | ||
await self._websocket.send(orjson.dumps({"type": "pong"}).decode()) | ||
continue | ||
|
||
return self.model.parse_raw(message) | ||
except ( | ||
ConnectionRefusedError, | ||
websockets.exceptions.ConnectionClosedError, | ||
): | ||
self._websocket = None | ||
if hasattr(self._connect, "protocol"): | ||
await self._connect.__aexit__(None, None, None) | ||
await asyncio.sleep(0.5) | ||
|
||
async def _ensure_connected(self): | ||
if self._websocket: | ||
return | ||
|
||
websocket = await self._connect.__aenter__() | ||
|
||
await websocket.send( | ||
orjson.dumps({"type": "auth", "token": PREFECT_API_KEY.value()}).decode() | ||
) | ||
|
||
try: | ||
auth = orjson.loads(await websocket.recv()) | ||
assert auth["type"] == "auth_success" | ||
except ( | ||
AssertionError, | ||
websockets.exceptions.ConnectionClosedError, | ||
) as e: | ||
if isinstance(e, AssertionError) or e.code == WS_1008_POLICY_VIOLATION: | ||
raise Exception( | ||
"Unable to authenticate to the subscription. Please " | ||
"ensure the provided `PREFECT_API_KEY` you are using is " | ||
"valid for this environment." | ||
) from e | ||
raise | ||
else: | ||
self._websocket = websocket | ||
|
||
def __repr__(self) -> str: | ||
return f"{type(self).__name__}[{self.model.__name__}]" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.