Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task subscription api retry queue #11789

Merged
merged 5 commits into from
Jan 31, 2024

Conversation

zzstoatzz
Copy link
Contributor

@zzstoatzz zzstoatzz commented Jan 31, 2024

adds a retry queue for when tasks are submitted and there is no running TaskServer
image

empirically this solves the problem I described earlier:

  • prefect server start
  • run a TaskServer
  • submit a task -> runs great :white-check-mark2:
  • stop the TaskServer
  • submit a task while no task server running
  • start the TaskServer -> subscription created
  • nothing gets picked up

Copy link

netlify bot commented Jan 31, 2024

Deploy Preview for prefect-docs-preview ready!

Name Link
🔨 Latest commit a8c3b64
🔍 Latest deploy log https://app.netlify.com/sites/prefect-docs-preview/deploys/65ba7f6095c1bf0008ad2bf0
😎 Deploy Preview https://deploy-preview-11789--prefect-docs-preview.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify site configuration.

@zzstoatzz zzstoatzz changed the base branch from main to task-subscription-api January 31, 2024 17:12
@zzstoatzz zzstoatzz marked this pull request as ready for review January 31, 2024 17:24
@zzstoatzz zzstoatzz requested review from zangell44 and a team as code owners January 31, 2024 17:24
@zzstoatzz zzstoatzz removed the request for review from a team January 31, 2024 17:24
Copy link
Collaborator

@chrisguidry chrisguidry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! This is exactly what I was thinking! We can also look into sending a ping and awaiting its pong before we accept that the task has been delivered. This looks like:

pong = await websocket.ping()
await pong

If we put that after the send_json call, we should be pretty confident that a task server got the message.

src/prefect/server/api/task_runs.py Outdated Show resolved Hide resolved
src/prefect/server/api/task_runs.py Outdated Show resolved Hide resolved
@zzstoatzz
Copy link
Contributor Author

hmm

look into sending a ping and awaiting its pong before we accept that the task has been delivered

looks like starlette's WebSocket doesn't have these out of the box, so i took a stab at an ack here - let me know if we maybe should hold off on this

Comment on lines +36 to +42
message = await self._websocket.recv()

message_data = orjson.loads(message)

if message_data.get("type") == "ping":
await self._websocket.send(orjson.dumps({"type": "pong"}).decode())
continue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh bummer, I thought we could use the built-in websocket.ping() but that doesn't seem to be available in the server-side sockets. This works great too!

@zzstoatzz zzstoatzz merged commit 254a89f into task-subscription-api Jan 31, 2024
38 of 52 checks passed
@zzstoatzz zzstoatzz deleted the task-subscription-api-retry-queue branch January 31, 2024 18:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants