Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WorkQueue.status to server and client side schemas #11829

Merged
merged 5 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ class DeploymentStatus(AutoEnum):
NOT_READY = AutoEnum.auto()


class WorkQueueStatus(AutoEnum):
"""Enumeration of work queue statuses."""

READY = AutoEnum.auto()
NOT_READY = AutoEnum.auto()
PAUSED = AutoEnum.auto()


class StateDetails(PrefectBaseModel):
flow_run_id: UUID = None
task_run_id: UUID = None
Expand Down Expand Up @@ -1222,6 +1230,9 @@ class WorkQueue(ObjectBaseModel):
last_polled: Optional[DateTimeTZ] = Field(
default=None, description="The last time an agent polled this queue for work."
)
status: Optional[WorkQueueStatus] = Field(
default=None, description="The queue status."
)

@validator("name", check_fields=False)
def validate_name_characters(cls, v):
Expand Down
5 changes: 4 additions & 1 deletion src/prefect/server/schemas/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Full schemas of Prefect REST API objects.
"""

import datetime
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
Expand All @@ -27,7 +28,9 @@
raise_on_name_alphanumeric_dashes_only,
raise_on_name_with_banned_characters,
)
from prefect.settings import PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH
from prefect.settings import (
PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH,
)
from prefect.utilities.collections import dict_to_flatdict, flatdict_to_dict, listrepr
from prefect.utilities.names import generate_slug, obfuscate, obfuscate_string

Expand Down
19 changes: 16 additions & 3 deletions src/prefect/server/schemas/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
if TYPE_CHECKING:
import prefect.server.database.orm_models

DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS = 60
WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS = 60


class SetStateStatus(AutoEnum):
"""Enumerates return statuses for setting run states."""
Expand Down Expand Up @@ -232,9 +235,6 @@ def __eq__(self, other: Any) -> bool:
return super().__eq__(other)


DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS = 60


@copy_model_fields
class DeploymentResponse(ORMBaseModel):
name: str = FieldFrom(schemas.core.Deployment)
Expand Down Expand Up @@ -303,13 +303,26 @@ class WorkQueueResponse(schemas.core.WorkQueue):
default=None,
description="The name of the work pool the work pool resides within.",
)
status: Optional[schemas.statuses.WorkQueueStatus] = Field(
default=None, description="The queue status."
)
urimandujano marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def from_orm(cls, orm_work_queue):
response = super().from_orm(orm_work_queue)
if orm_work_queue.work_pool:
response.work_pool_name = orm_work_queue.work_pool.name

if response.is_paused:
response.status = schemas.statuses.WorkQueueStatus.PAUSED
else:
unready_at = datetime.datetime.now(
tz=datetime.timezone.utc
) - datetime.timedelta(seconds=WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS)
if response.last_polled and response.last_polled > unready_at:
response.status = schemas.statuses.WorkQueueStatus.READY
else:
response.status = schemas.statuses.WorkQueueStatus.NOT_READY
urimandujano marked this conversation as resolved.
Show resolved Hide resolved
return response


Expand Down
8 changes: 8 additions & 0 deletions src/prefect/server/schemas/statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,11 @@ class DeploymentStatus(AutoEnum):

READY = AutoEnum.auto()
NOT_READY = AutoEnum.auto()


class WorkQueueStatus(AutoEnum):
"""Enumeration of work queue statuses."""

READY = AutoEnum.auto()
NOT_READY = AutoEnum.auto()
PAUSED = AutoEnum.auto()
22 changes: 22 additions & 0 deletions tests/client/test_prefect_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,15 @@ async def test_create_then_read_work_queue(self, prefect_client):
assert isinstance(lookup, WorkQueue)
assert lookup.name == "foo"

async def test_create_and_read_includes_status(self, prefect_client: PrefectClient):
queue = await prefect_client.create_work_queue(name="foo")
assert hasattr(queue, "status")
assert queue.status == "NOT_READY"

lookup = await prefect_client.read_work_queue(queue.id)
assert hasattr(lookup, "status")
assert lookup.status == "NOT_READY"

async def test_create_then_read_work_queue_by_name(self, prefect_client):
queue = await prefect_client.create_work_queue(name="foo")
assert isinstance(queue.id, UUID)
Expand Down Expand Up @@ -1597,6 +1606,19 @@ async def test_get_runs_from_queue_respects_limit(self, prefect_client, deployme
assert len(output) == 10
assert {o.id for o in output} == {r.id for r in runs}

async def test_get_runs_from_queue_updates_status(
self, prefect_client: PrefectClient
):
queue = await prefect_client.create_work_queue(name="foo")
assert queue.status == "NOT_READY"

# Trigger an operation that would update the queues last_polled status
await prefect_client.get_runs_in_work_queue(queue.id, limit=1)

# Verify that the polling results in a READY status
lookup = await prefect_client.read_work_queue(queue.id)
assert lookup.status == "READY"


async def test_delete_flow_run(prefect_client, flow_run):
# Note - the flow_run provided by the fixture is not of type `FlowRun`
Expand Down
80 changes: 80 additions & 0 deletions tests/server/api/test_work_queues.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from typing import List
from uuid import uuid4

Expand Down Expand Up @@ -114,6 +115,12 @@ async def test_create_work_queue_with_invalid_characters_fails(self, client, nam
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
assert b"contains an invalid character" in response.content

async def test_create_work_queue_initially_is_not_ready(self, client):
response = await client.post("/work_queues/", json=dict(name=str(uuid.uuid4())))
assert response.status_code == status.HTTP_201_CREATED
assert "status" in response.json()
assert response.json()["status"] == "NOT_READY"


class TestUpdateWorkQueue:
async def test_update_work_queue(
Expand Down Expand Up @@ -268,6 +275,25 @@ async def test_read_work_queues_returns_empty_list(self, client):
assert response.status_code == status.HTTP_200_OK
assert response.json() == []

async def test_work_queue_old_last_polled_is_in_not_ready_status(
self,
client,
work_queue,
session,
):
# Update the queue with an old last_polled time
new_data = WorkQueueUpdate(
last_polled=pendulum.now("UTC").subtract(days=1)
).dict(json_compatible=True, exclude_unset=True)
response = await client.patch(f"/work_queues/{work_queue.id}", json=new_data)
assert response.status_code == status.HTTP_204_NO_CONTENT

# Verify the work queue status is changed
wq_response = await client.get(f"/work_queues/{work_queue.id}")
assert wq_response.status_code == status.HTTP_200_OK
assert wq_response.json()["status"] == "NOT_READY"
assert wq_response.json()["is_paused"] is False


class TestGetRunsInWorkQueue:
@pytest.fixture
Expand Down Expand Up @@ -495,6 +521,60 @@ async def test_read_work_queue_runs_associated_deployments_return_status_of_read
assert updated_deployment_response.status_code == status.HTTP_200_OK
assert updated_deployment_response.json()["status"] == "READY"

async def test_read_work_queue_runs_updates_work_queue_status(
self,
client,
work_queue,
session,
):
# Verify the work queue is initially not ready
wq_response = await client.get(f"/work_queues/{work_queue.id}")
assert wq_response.status_code == status.HTTP_200_OK
assert wq_response.json()["status"] == "NOT_READY"

# Trigger a polling operation
response = await client.post(
f"/work_queues/{work_queue.id}/get_runs",
json=dict(agent_id=str(uuid.uuid4())),
)
assert response.status_code == status.HTTP_200_OK

# Verify the work queue is now ready
wq_response = await client.get(f"/work_queues/{work_queue.id}")
assert wq_response.status_code == status.HTTP_200_OK
assert wq_response.json()["status"] == "READY"

async def test_read_work_queue_runs_does_not_update_a_paused_work_queues_status(
self,
client,
work_queue,
session,
):
# Move the queue into a PAUSED state
new_data = WorkQueueUpdate(is_paused=True).dict(
json_compatible=True, exclude_unset=True
)
response = await client.patch(f"/work_queues/{work_queue.id}", json=new_data)
assert response.status_code == status.HTTP_204_NO_CONTENT

# Verify the work queue is PAUSED
wq_response = await client.get(f"/work_queues/{work_queue.id}")
assert wq_response.status_code == status.HTTP_200_OK
assert wq_response.json()["status"] == "PAUSED"
assert wq_response.json()["is_paused"] is True

# Trigger a polling operation
response = await client.post(
f"/work_queues/{work_queue.id}/get_runs",
json=dict(agent_id=str(uuid.uuid4())),
)
assert response.status_code == status.HTTP_200_OK

# Verify the work queue status is still PAUSED
wq_response = await client.get(f"/work_queues/{work_queue.id}")
assert wq_response.status_code == status.HTTP_200_OK
assert wq_response.json()["status"] == "PAUSED"

urimandujano marked this conversation as resolved.
Show resolved Hide resolved

class TestDeleteWorkQueue:
async def test_delete_work_queue(self, client, work_queue):
Expand Down