Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI command to read runs in WQ #11989

Merged
merged 3 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/prefect/cli/work_queue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Command line interface for working with work queues.
"""

from textwrap import dedent
from typing import List, Optional, Union
from uuid import UUID
Expand Down Expand Up @@ -626,3 +627,38 @@ async def delete(
else:
success_message = f"Successfully deleted work queue {name!r}"
exit_with_success(success_message)


@work_app.command("read-runs")
@experimental_parameter("pool", group="work_pools", when=lambda y: y is not None)
async def read_wq_runs(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice to have a couple tests around this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a few tests! Let me know if you think there are edge cases I should be testing for.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems to hit all the notes, thank you!

name: str = typer.Argument(..., help="The name or ID of the work queue to poll"),
pool: Optional[str] = typer.Option(
None,
"-p",
"--pool",
help="The name of the work pool containing the work queue to poll.",
),
):
"""
Get runs in a work queue. Note that this will trigger an artificial poll of
the work queue.
"""

queue_id = await _get_work_queue_id_from_name_or_id(
name_or_id=name,
work_pool_name=pool,
)
async with get_client() as client:
try:
runs = await client.get_runs_in_work_queue(id=queue_id)
except ObjectNotFound:
if pool:
error_message = f"No work queue found: {name!r} in work pool {pool!r}"
else:
error_message = f"No work queue found: {name!r}"
exit_with_error(error_message)
success_message = (
f"Read {len(runs)} runs for work queue {name!r} in work pool {pool}: {runs}"
)
exit_with_success(success_message)
49 changes: 49 additions & 0 deletions tests/cli/test_work_queues.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import uuid

import pytest

import prefect.exceptions
from prefect import flow
from prefect.testing.cli import invoke_and_assert
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible

Expand Down Expand Up @@ -586,3 +589,49 @@ def test_ls_with_bad_pool(
expected_code=1,
)
assert f"No work pool found: '{work_queue_1.work_pool.name}-bad'" in res.output


class TestReadRuns:
@sync_compatible
async def create_runs_in_queue(self, prefect_client, queue, count: int):
foo = flow(lambda: None, name="foo")
flow_id = await prefect_client.create_flow(foo)
deployment_id = await prefect_client.create_deployment(
flow_id=flow_id,
name="test-deployment",
manifest_path="file.json",
work_queue_name=queue.name,
)
for _ in range(count):
await prefect_client.create_flow_run_from_deployment(deployment_id)

def test_read_wq(self, prefect_client, work_queue):
n_runs = 3
self.create_runs_in_queue(prefect_client, work_queue, n_runs)
cmd = f"work-queue read-runs {work_queue.name}"
result = invoke_and_assert(command=cmd, expected_code=0)
assert f"Read {n_runs} runs for work queue" in result.output

def test_read_wq_with_pool(self, prefect_client, work_queue):
n_runs = 3
self.create_runs_in_queue(prefect_client, work_queue, n_runs)
cmd = (
f"work-queue read-runs --pool {work_queue.work_pool.name} {work_queue.name}"
)
result = invoke_and_assert(command=cmd, expected_code=0)
assert f"Read {n_runs} runs for work queue" in result.output

def test_read_missing_wq(self, work_queue):
bad_name = str(uuid.uuid4())
cmd = f"work-queue read-runs --pool {work_queue.work_pool.name} {bad_name}"
result = invoke_and_assert(command=cmd, expected_code=1)
assert f"No work queue found: '{bad_name}'" in result.output

def test_read_wq_with_missing_pool(self, work_queue):
bad_name = str(uuid.uuid4())
cmd = f"work-queue read-runs --pool {bad_name} {work_queue.name}"
result = invoke_and_assert(command=cmd, expected_code=1)
assert (
f"No work queue named '{work_queue.name}' found in work pool '{bad_name}'"
in result.output
)