Skip to content

Commit

Permalink
Add CLI command to read runs in WQ (#11989)
Browse files Browse the repository at this point in the history
  • Loading branch information
urimandujano committed Feb 16, 2024
1 parent 8b6a5aa commit 83c94c7
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
36 changes: 36 additions & 0 deletions src/prefect/cli/work_queue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Command line interface for working with work queues.
"""

from textwrap import dedent
from typing import List, Optional, Union
from uuid import UUID
Expand Down Expand Up @@ -626,3 +627,38 @@ async def delete(
else:
success_message = f"Successfully deleted work queue {name!r}"
exit_with_success(success_message)


@work_app.command("read-runs")
@experimental_parameter("pool", group="work_pools", when=lambda y: y is not None)
async def read_wq_runs(
name: str = typer.Argument(..., help="The name or ID of the work queue to poll"),
pool: Optional[str] = typer.Option(
None,
"-p",
"--pool",
help="The name of the work pool containing the work queue to poll.",
),
):
"""
Get runs in a work queue. Note that this will trigger an artificial poll of
the work queue.
"""

queue_id = await _get_work_queue_id_from_name_or_id(
name_or_id=name,
work_pool_name=pool,
)
async with get_client() as client:
try:
runs = await client.get_runs_in_work_queue(id=queue_id)
except ObjectNotFound:
if pool:
error_message = f"No work queue found: {name!r} in work pool {pool!r}"
else:
error_message = f"No work queue found: {name!r}"
exit_with_error(error_message)
success_message = (
f"Read {len(runs)} runs for work queue {name!r} in work pool {pool}: {runs}"
)
exit_with_success(success_message)
49 changes: 49 additions & 0 deletions tests/cli/test_work_queues.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import uuid

import pytest

import prefect.exceptions
from prefect import flow
from prefect.testing.cli import invoke_and_assert
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible

Expand Down Expand Up @@ -586,3 +589,49 @@ def test_ls_with_bad_pool(
expected_code=1,
)
assert f"No work pool found: '{work_queue_1.work_pool.name}-bad'" in res.output


class TestReadRuns:
@sync_compatible
async def create_runs_in_queue(self, prefect_client, queue, count: int):
foo = flow(lambda: None, name="foo")
flow_id = await prefect_client.create_flow(foo)
deployment_id = await prefect_client.create_deployment(
flow_id=flow_id,
name="test-deployment",
manifest_path="file.json",
work_queue_name=queue.name,
)
for _ in range(count):
await prefect_client.create_flow_run_from_deployment(deployment_id)

def test_read_wq(self, prefect_client, work_queue):
n_runs = 3
self.create_runs_in_queue(prefect_client, work_queue, n_runs)
cmd = f"work-queue read-runs {work_queue.name}"
result = invoke_and_assert(command=cmd, expected_code=0)
assert f"Read {n_runs} runs for work queue" in result.output

def test_read_wq_with_pool(self, prefect_client, work_queue):
n_runs = 3
self.create_runs_in_queue(prefect_client, work_queue, n_runs)
cmd = (
f"work-queue read-runs --pool {work_queue.work_pool.name} {work_queue.name}"
)
result = invoke_and_assert(command=cmd, expected_code=0)
assert f"Read {n_runs} runs for work queue" in result.output

def test_read_missing_wq(self, work_queue):
bad_name = str(uuid.uuid4())
cmd = f"work-queue read-runs --pool {work_queue.work_pool.name} {bad_name}"
result = invoke_and_assert(command=cmd, expected_code=1)
assert f"No work queue found: '{bad_name}'" in result.output

def test_read_wq_with_missing_pool(self, work_queue):
bad_name = str(uuid.uuid4())
cmd = f"work-queue read-runs --pool {bad_name} {work_queue.name}"
result = invoke_and_assert(command=cmd, expected_code=1)
assert (
f"No work queue named '{work_queue.name}' found in work pool '{bad_name}'"
in result.output
)

0 comments on commit 83c94c7

Please sign in to comment.