Skip to content

Commit

Permalink
Fix deployment & work queue statuses (#14097)
Browse files Browse the repository at this point in the history
  • Loading branch information
collincchoy committed Jun 20, 2024
1 parent df37fa0 commit 62f0db6
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 13 deletions.
15 changes: 7 additions & 8 deletions src/prefect/server/api/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,20 +376,19 @@ async def get_scheduled_flow_runs(
limit=limit,
)

polled_work_queue_ids = [wq.id for wq in work_queues]
ready_work_queue_ids = [
wq.id for wq in work_queues if wq.status != WorkQueueStatus.READY
]

background_tasks.add_task(
mark_work_queues_ready,
polled_work_queue_ids=polled_work_queue_ids,
ready_work_queue_ids=ready_work_queue_ids,
polled_work_queue_ids=[
wq.id for wq in work_queues if wq.status != WorkQueueStatus.NOT_READY
],
ready_work_queue_ids=[
wq.id for wq in work_queues if wq.status == WorkQueueStatus.NOT_READY
],
)

background_tasks.add_task(
mark_deployments_ready,
work_queue_ids=ready_work_queue_ids,
work_queue_ids=[wq.id for wq in work_queues],
)

return queue_response
Expand Down
77 changes: 72 additions & 5 deletions tests/server/api/test_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pendulum

from prefect._internal.pydantic import HAS_PYDANTIC_V2
from prefect.server.schemas.statuses import WorkQueueStatus
from prefect.server.schemas.statuses import DeploymentStatus, WorkQueueStatus

if HAS_PYDANTIC_V2:
import pydantic.v1 as pydantic
Expand Down Expand Up @@ -1835,16 +1835,22 @@ async def test_updates_last_polled_on_a_multiple_work_queues(
assert work_queue.last_polled is None

async def test_updates_last_polled_on_a_full_work_pool(
self, client, work_queues, work_pools
self, client, session, work_queues, work_pools
):
work_pool = work_pools["wp_a"]
work_queues["wq_aa"].status = WorkQueueStatus.NOT_READY
work_queues["wq_ab"].status = WorkQueueStatus.PAUSED
work_queues["wq_ac"].status = WorkQueueStatus.READY
await session.commit()

now = pendulum.now("UTC")
poll_response = await client.post(
f"/work_pools/{work_pools['wp_a'].name}/get_scheduled_flow_runs",
f"/work_pools/{work_pool.name}/get_scheduled_flow_runs",
)
assert poll_response.status_code == status.HTTP_200_OK

work_queues_response = await client.post(
f"/work_pools/{work_pools['wp_a'].name}/queues/filter"
f"/work_pools/{work_pool.name}/queues/filter"
)
assert work_queues_response.status_code == status.HTTP_200_OK

Expand All @@ -1853,9 +1859,70 @@ async def test_updates_last_polled_on_a_full_work_pool(
)

for work_queue in work_queues:
assert work_queue.last_polled is not None
assert (
work_queue.last_polled is not None
), "Work queue should have updated last_polled"
assert work_queue.last_polled > now

async def test_updates_statuses_on_a_full_work_pool(
self, client, session, work_queues, work_pools, flow
):
async def create_deployment_for_work_queue(work_queue_id):
return await models.deployments.create_deployment(
session=session,
deployment=schemas.core.Deployment(
name="My Deployment",
tags=["test"],
flow_id=flow.id,
work_queue_id=work_queue_id,
),
)

work_pool = work_pools["wp_a"]

wq_not_ready = work_queues["wq_aa"]
wq_not_ready.status = WorkQueueStatus.NOT_READY

wq_paused = work_queues["wq_ab"]
wq_paused.status = WorkQueueStatus.PAUSED

wq_ready = work_queues["wq_ac"]
wq_ready.status = WorkQueueStatus.READY

deployments = [
await create_deployment_for_work_queue(wq.id)
for wq in (wq_not_ready, wq_paused, wq_ready)
]

await session.commit()

poll_response = await client.post(
f"/work_pools/{work_pool.name}/get_scheduled_flow_runs",
)
assert poll_response.status_code == status.HTTP_200_OK

work_queues_response = await client.post(
f"/work_pools/{work_pool.name}/queues/filter"
)
assert work_queues_response.status_code == status.HTTP_200_OK

work_queues = pydantic.parse_obj_as(
List[WorkQueue], work_queues_response.json()
)

for work_queue in work_queues:
if work_queue.id == wq_not_ready.id:
assert work_queue.status == WorkQueueStatus.READY
elif work_queue.id == wq_paused.id:
# paused work queues should stay paused
assert work_queue.status == WorkQueueStatus.PAUSED
elif work_queue.id == wq_ready.id:
assert work_queue.status == WorkQueueStatus.READY

for deployment in deployments:
await session.refresh(deployment)
assert deployment.status == DeploymentStatus.READY

async def test_ensure_deployments_associated_with_work_pool_have_deployment_status_of_ready(
self, client, work_pools, deployment
):
Expand Down

0 comments on commit 62f0db6

Please sign in to comment.