Skip to content

Commit

Permalink
add delete and ls commands for workers of a work pool
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Apr 19, 2024
1 parent 3af4ee9 commit 2f37bbf
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 0 deletions.
62 changes: 62 additions & 0 deletions src/prefect/cli/work_pool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Command line interface for working with work queues.
"""

import json

import pendulum
Expand Down Expand Up @@ -32,6 +33,12 @@
work_pool_app = PrefectTyper(
name="work-pool", help="Commands for working with work pools."
)
work_pool_worker_app = PrefectTyper(
name="worker", help="Commands for interacting with a work pool's workers."
)

work_pool_app.add_typer(work_pool_worker_app, aliases=["workers"])

app.add_typer(work_pool_app, aliases=["work-pool"])


Expand Down Expand Up @@ -641,3 +648,58 @@ def sort_by_created_key(r):
),
style="yellow",
)


@work_pool_worker_app.command(aliases=["ls"])
async def _ls(
name: str = typer.Argument(
..., help="The name of the work pool to list workers for."
),
):
"""
List workers in a work pool.
\b
Examples:
$ prefect work-pool worker ls "my-pool"
"""
async with get_client() as client:
try:
workers = await client.read_work_pool_workers(work_pool_name=name)
except ObjectNotFound as exc:
exit_with_error(exc)

table = Table(title=f"Workers for work pool {name!r}")
table.add_column("ID", style="cyan", no_wrap=True)
table.add_column("Name", style="green", no_wrap=True)
table.add_column("Status", style="blue", no_wrap=True)

for worker in workers:
table.add_row(str(worker.id), worker.name, worker.status)

app.console.print(table)


@work_pool_worker_app.command(aliases=["delete"])
async def _delete(
name: str = typer.Argument(
..., help="The name of the work pool to delete workers from."
),
worker_name: str = typer.Argument(..., help="The ID of the worker to delete."),
):
"""
Delete a worker from a work pool.
\b
Examples:
$ prefect work-pool worker delete "my-pool" "my-worker"
"""
async with get_client() as client:
try:
await client.delete_work_pool_worker(
work_pool_name=name, worker_name=worker_name
)
except ObjectNotFound as exc:
exit_with_error(exc)

exit_with_success(f"Deleted worker {worker_name!r} from work pool {name!r}")
35 changes: 35 additions & 0 deletions src/prefect/client/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2697,6 +2697,41 @@ async def delete_work_pool(
else:
raise

async def read_work_pool_workers(
self,
work_pool_name: str,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List[Worker]:
"""
Lists workers in a work pool.
"""
try:
response = await self._client.post(
f"/work_pools/{work_pool_name}/workers/filter",
json={"limit": limit, "offset": offset},
)
return pydantic.parse_obj_as(List[Worker], response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise

async def delete_work_pool_worker(self, work_pool_name: str, worker_name: str):
"""
Deletes a worker from a work pool.
"""
try:
await self._client.delete(
f"/work_pools/{work_pool_name}/workers/{worker_name}"
)
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise

async def read_work_queues(
self,
work_pool_name: Optional[str] = None,
Expand Down

0 comments on commit 2f37bbf

Please sign in to comment.