diff --git a/src/prefect/__init__.py b/src/prefect/__init__.py index 0316ec63f847..f71ee4fef429 100644 --- a/src/prefect/__init__.py +++ b/src/prefect/__init__.py @@ -8,6 +8,8 @@ import warnings import sys +print("HI THERE") + __version_info__ = _version.get_versions() __version__ = __version_info__["version"] diff --git a/src/prefect/cli/agent.py b/src/prefect/cli/agent.py index e5061a845d0c..ed5f4ec3ad8b 100644 --- a/src/prefect/cli/agent.py +++ b/src/prefect/cli/agent.py @@ -7,24 +7,19 @@ from typing import List, Optional from uuid import UUID -import anyio import typer -import prefect -from prefect.agent import PrefectAgent from prefect.cli._types import PrefectTyper, SettingsOption -from prefect.cli._utilities import exit_with_error from prefect.cli.root import app -from prefect.client import get_client -from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName -from prefect.exceptions import ObjectNotFound from prefect.settings import ( PREFECT_AGENT_PREFETCH_SECONDS, PREFECT_AGENT_QUERY_INTERVAL, PREFECT_API_URL, ) -from prefect.utilities.processutils import setup_signal_handlers_agent -from prefect.utilities.services import critical_service_loop +from prefect.utilities.importtools import lazy_import + +anyio = lazy_import("anyio") +prefect = lazy_import("prefect") agent_app = PrefectTyper( name="agent", @@ -36,7 +31,6 @@ ) app.add_typer(agent_app) - ascii_name = r""" ___ ___ ___ ___ ___ ___ _____ _ ___ ___ _ _ _____ | _ \ _ \ __| __| __/ __|_ _| /_\ / __| __| \| |_ _| @@ -102,6 +96,13 @@ async def start( """ Start an agent process to poll one or more work queues for flow runs. """ + from prefect.agent import PrefectAgent + from prefect.cli._utilities import exit_with_error + from prefect.client import get_client + from prefect.exceptions import ObjectNotFound + from prefect.utilities.processutils import setup_signal_handlers_agent + from prefect.utilities.services import critical_service_loop + work_queues = work_queues or [] if work_queue is not None: @@ -261,6 +262,10 @@ async def _check_work_queues_paused( Returns: - bool: True if work queues are paused, False otherwise """ + from prefect.client import get_client + from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName + from prefect.exceptions import ObjectNotFound + work_queues_list = work_queues or ["default"] try: work_queues_filter = WorkQueueFilter( diff --git a/src/prefect/cli/artifact.py b/src/prefect/cli/artifact.py index 0593ef81dddc..2f610fecbd9d 100644 --- a/src/prefect/cli/artifact.py +++ b/src/prefect/cli/artifact.py @@ -1,17 +1,12 @@ from typing import Optional -import pendulum import typer -from rich.pretty import Pretty -from rich.table import Table -from prefect import get_client from prefect.cli._types import PrefectTyper -from prefect.cli._utilities import exit_with_error, exit_with_success from prefect.cli.root import app -from prefect.client.schemas.filters import ArtifactFilter, ArtifactFilterKey -from prefect.client.schemas.sorting import ArtifactCollectionSort, ArtifactSort -from prefect.exceptions import ObjectNotFound +from prefect.utilities.importtools import lazy_import + +rich = lazy_import("rich") artifact_app = PrefectTyper( name="artifact", help="Commands for starting and interacting with artifacts." @@ -36,7 +31,12 @@ async def list_artifacts( """ List artifacts. """ - table = Table( + import pendulum + + from prefect import get_client + from prefect.client.schemas.sorting import ArtifactCollectionSort, ArtifactSort + + table = rich.Table( title="Artifacts", caption="List Artifacts using `prefect artifact ls`", show_header=True, @@ -123,6 +123,10 @@ async def inspect( } ] """ + from prefect import get_client + from prefect.cli._utilities import exit_with_error + from prefect.client.schemas.filters import ArtifactFilter, ArtifactFilterKey + from prefect.client.schemas.sorting import ArtifactSort async with get_client() as client: artifacts = await client.read_artifacts( @@ -135,7 +139,7 @@ async def inspect( artifacts = [a.dict(json_compatible=True) for a in artifacts] - app.console.print(Pretty(artifacts)) + app.console.print(rich.Pretty(artifacts)) @artifact_app.command("delete") @@ -156,6 +160,11 @@ async def delete( Examples: $ prefect artifact delete "my-artifact" """ + from prefect import get_client + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client.schemas.filters import ArtifactFilter, ArtifactFilterKey + from prefect.exceptions import ObjectNotFound + if key and artifact_id: exit_with_error("Please provide either a key or an artifact_id but not both.") diff --git a/src/prefect/cli/block.py b/src/prefect/cli/block.py index b63a83db23ac..d7eedc19a4ff 100644 --- a/src/prefect/cli/block.py +++ b/src/prefect/cli/block.py @@ -1,30 +1,29 @@ """ Command line interface for working with blocks. """ + import inspect -from importlib import import_module from pathlib import Path from types import ModuleType from typing import List, Optional, Type import typer -from rich.table import Table +from typing_extensions import TYPE_CHECKING -from prefect.blocks.core import Block, InvalidBlockRegistration from prefect.cli._types import PrefectTyper -from prefect.cli._utilities import exit_with_error, exit_with_success from prefect.cli.root import app -from prefect.client import get_client from prefect.exceptions import ( ObjectNotFound, PrefectHTTPStatusError, - ProtectedBlockError, ScriptError, exception_traceback, ) -from prefect.settings import PREFECT_UI_URL -from prefect.utilities.asyncutils import run_sync_in_worker_thread -from prefect.utilities.importtools import load_script_as_module +from prefect.utilities.importtools import lazy_import + +if TYPE_CHECKING: + from prefect.blocks.core import Block + +rich = lazy_import("rich") blocks_app = PrefectTyper(name="block", help="Commands for working with blocks.") blocktypes_app = PrefectTyper( @@ -36,7 +35,7 @@ def display_block(block_document): block_slug = f"{block_document.block_type.slug}/{block_document.name}" - block_table = Table( + block_table = rich.Table( title=block_slug, show_header=False, show_footer=False, expand=True ) block_table.add_column(style="italic cyan") @@ -51,7 +50,7 @@ def display_block(block_document): def display_block_type(block_type): - block_type_table = Table( + block_type_table = rich.Table( title=block_type.name, show_header=False, show_footer=False, expand=True ) block_type_table.add_column(style="italic cyan") @@ -72,7 +71,9 @@ def display_block_type(block_type): return block_type_table -async def _register_blocks_in_module(module: ModuleType) -> List[Type[Block]]: +async def _register_blocks_in_module(module: ModuleType) -> List[Type["Block"]]: + from prefect.blocks.core import Block, InvalidBlockRegistration + registered_blocks = [] for _, cls in inspect.getmembers(module): if Block.is_block_class(cls): @@ -85,8 +86,8 @@ async def _register_blocks_in_module(module: ModuleType) -> List[Type[Block]]: return registered_blocks -def _build_registered_blocks_table(registered_blocks: List[Type[Block]]): - table = Table("Registered Blocks") +def _build_registered_blocks_table(registered_blocks: List[Type["Block"]]): + table = rich.Table("Registered Blocks") for block in registered_blocks: table.add_row(block.get_block_type_name()) return table @@ -123,6 +124,13 @@ async def register( Register block types in a .py file: $ prefect block register -f my_blocks.py """ + from importlib import import_module + + from prefect.cli._utilities import exit_with_error + from prefect.settings import PREFECT_UI_URL + from prefect.utilities.asyncutils import run_sync_in_worker_thread + from prefect.utilities.importtools import load_script_as_module + # Handles if both options are specified or if neither are specified if not (bool(file_path) ^ bool(module_name)): exit_with_error( @@ -182,10 +190,12 @@ async def block_ls(): """ View all configured blocks. """ + from prefect.client import get_client + async with get_client() as client: blocks = await client.read_block_documents() - table = Table( + table = rich.Table( title="Blocks", caption="List Block Types using `prefect block type ls`" ) table.add_column("ID", style="cyan", no_wrap=True) @@ -214,6 +224,9 @@ async def block_delete( """ Delete a configured block. """ + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client import get_client + async with get_client() as client: if slug is None and block_id is not None: try: @@ -250,6 +263,10 @@ async def block_create( """ Generate a link to the Prefect UI to create a block. """ + from prefect.cli._utilities import exit_with_error + from prefect.client import get_client + from prefect.settings import PREFECT_UI_URL + async with get_client() as client: try: block_type = await client.read_block_type_by_slug(block_type_slug) @@ -284,6 +301,9 @@ async def block_inspect( """ Displays details about a configured block. """ + from prefect.cli._utilities import exit_with_error + from prefect.client import get_client + async with get_client() as client: if slug is None and block_id is not None: try: @@ -314,10 +334,12 @@ async def list_types(): """ List all block types. """ + from prefect.client import get_client + async with get_client() as client: block_types = await client.read_block_types() - table = Table( + table = rich.Table( title="Block Types", show_lines=True, ) @@ -349,6 +371,9 @@ async def blocktype_inspect( """ Display details about a block type. """ + from prefect.cli._utilities import exit_with_error + from prefect.client import get_client + async with get_client() as client: try: block_type = await client.read_block_type_by_slug(slug) @@ -365,6 +390,13 @@ async def blocktype_delete( """ Delete an unprotected Block Type. """ + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client import get_client + from prefect.exceptions import ( + ObjectNotFound, + ProtectedBlockError, + ) + async with get_client() as client: try: block_type = await client.read_block_type_by_slug(slug) diff --git a/src/prefect/cli/concurrency_limit.py b/src/prefect/cli/concurrency_limit.py index 4dced52942bc..b19d67237a3d 100644 --- a/src/prefect/cli/concurrency_limit.py +++ b/src/prefect/cli/concurrency_limit.py @@ -1,25 +1,12 @@ """ Command line interface for working with concurrency limits. """ -import textwrap - -import pendulum - -try: - from rich.console import Group -except ImportError: - # Name changed in https://github.com/Textualize/rich/blob/master/CHANGELOG.md#1100---2022-01-09 - from rich.console import RenderGroup as Group - -from rich.panel import Panel -from rich.pretty import Pretty -from rich.table import Table from prefect.cli._types import PrefectTyper -from prefect.cli._utilities import exit_with_error, exit_with_success from prefect.cli.root import app -from prefect.client import get_client -from prefect.exceptions import ObjectNotFound +from prefect.utilities.importtools import lazy_import + +pendulum = lazy_import("pendulum") concurrency_limit_app = PrefectTyper( name="concurrency-limit", @@ -36,6 +23,9 @@ async def create(tag: str, concurrency_limit: int): This limit controls how many task runs with that tag may simultaneously be in a Running state. """ + import textwrap + + from prefect.client import get_client async with get_client() as client: await client.create_concurrency_limit( @@ -67,6 +57,20 @@ async def inspect(tag: str): which are currently using a concurrency slot. """ + from prefect.cli._utilities import exit_with_error + from prefect.client import get_client + from prefect.exceptions import ObjectNotFound + + try: + from rich.console import Group + except ImportError: + # Name changed in https://github.com/Textualize/rich/blob/master/CHANGELOG.md#1100---2022-01-09 + from rich.console import RenderGroup as Group + + from rich.panel import Panel + from rich.pretty import Pretty + from rich.table import Table + async with get_client() as client: try: result = await client.read_concurrency_limit_by_tag(tag=tag) @@ -104,6 +108,10 @@ async def ls(limit: int = 15, offset: int = 0): """ View all concurrency limits. """ + from rich.table import Table + + from prefect.client import get_client + table = Table( title="Concurrency Limits", caption="inspect a concurrency limit to show active task run IDs", @@ -134,6 +142,9 @@ async def reset(tag: str): """ Resets the concurrency limit slots set on the specified tag. """ + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client import get_client + from prefect.exceptions import ObjectNotFound async with get_client() as client: try: @@ -149,6 +160,9 @@ async def delete(tag: str): """ Delete the concurrency limit set on the specified tag. """ + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client import get_client + from prefect.exceptions import ObjectNotFound async with get_client() as client: try: diff --git a/src/prefect/cli/deployment.py b/src/prefect/cli/deployment.py index 8d642a736d5f..1681475d6499 100644 --- a/src/prefect/cli/deployment.py +++ b/src/prefect/cli/deployment.py @@ -11,28 +11,13 @@ from typing import Any, Dict, List, Optional, Union from uuid import UUID -import pendulum import typer import yaml -from rich.pretty import Pretty -from rich.table import Table +from typing_extensions import TYPE_CHECKING -from prefect._internal.compatibility.experimental import experiment_enabled from prefect.blocks.core import Block from prefect.cli._types import PrefectTyper -from prefect.cli._utilities import exit_with_error, exit_with_success from prefect.cli.root import app -from prefect.client.orchestration import PrefectClient, ServerType, get_client -from prefect.client.schemas.filters import FlowFilter -from prefect.client.schemas.objects import DeploymentSchedule -from prefect.client.schemas.schedules import ( - CronSchedule, - IntervalSchedule, - RRuleSchedule, -) -from prefect.client.utilities import inject_client -from prefect.context import PrefectObjectRegistry, registry_from_script -from prefect.deployments import Deployment, load_deployments_from_yaml from prefect.exceptions import ( ObjectAlreadyExists, ObjectNotFound, @@ -40,14 +25,15 @@ ScriptError, exception_traceback, ) -from prefect.flow_runs import wait_for_flow_run -from prefect.flows import load_flow_from_entrypoint -from prefect.settings import PREFECT_UI_URL -from prefect.states import Scheduled -from prefect.utilities.asyncutils import run_sync_in_worker_thread from prefect.utilities.collections import listrepr from prefect.utilities.dispatch import get_registry_for_type -from prefect.utilities.filesystem import create_default_ignore_file +from prefect.utilities.importtools import lazy_import + +if TYPE_CHECKING: + from prefect.client.orchestration import PrefectClient + from prefect.context import PrefectObjectRegistry + +pendulum = lazy_import("pendulum") def str_presenter(dumper, data): @@ -75,13 +61,17 @@ def str_presenter(dumper, data): def assert_deployment_name_format(name: str) -> None: + from prefect.cli._utilities import exit_with_error + if "/" not in name: exit_with_error( "Invalid deployment name. Expected '/'" ) -async def get_deployment(client: PrefectClient, name, deployment_id): +async def get_deployment(client: "PrefectClient", name, deployment_id): + from prefect.cli._utilities import exit_with_error + if name is None and deployment_id is not None: try: deployment = await client.read_deployment(deployment_id) @@ -103,11 +93,14 @@ async def get_deployment(client: PrefectClient, name, deployment_id): async def create_work_queue_and_set_concurrency_limit( work_queue_name, work_pool_name, work_queue_concurrency ): + from prefect.cli._utilities import exit_with_error + from prefect.client.orchestration import get_client + async with get_client() as client: if work_queue_concurrency is not None and work_queue_name: try: try: - await check_work_pool_exists(work_pool_name) + await check_work_pool_exists(work_pool_name, client=client) res = await client.create_work_queue( name=work_queue_name, work_pool_name=work_pool_name ) @@ -167,10 +160,11 @@ async def create_work_queue_and_set_concurrency_limit( ) -@inject_client async def check_work_pool_exists( - work_pool_name: Optional[str], client: PrefectClient = None + work_pool_name: Optional[str], client: "PrefectClient" = None ): + from prefect.cli._utilities import exit_with_error + if work_pool_name is not None: try: await client.read_work_pool(work_pool_name=work_pool_name) @@ -189,10 +183,11 @@ async def check_work_pool_exists( exit_with_error("Work pool not found!") -@inject_client async def _print_deployment_work_pool_instructions( - work_pool_name: str, client: PrefectClient = None + work_pool_name: str, client: "PrefectClient" = None ): + from prefect._internal.compatibility.experimental import experiment_enabled + work_pool = await client.read_work_pool(work_pool_name) blurb = ( "\nTo execute flow runs from this deployment, start an agent " @@ -277,6 +272,11 @@ async def inspect(name: str): } """ + from rich.pretty import Pretty + + from prefect.cli._utilities import exit_with_error + from prefect.client.orchestration import get_client + assert_deployment_name_format(name) async with get_client() as client: @@ -355,6 +355,14 @@ async def create_schedule( """ Create a schedule for a given deployment. """ + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client.orchestration import get_client + from prefect.client.schemas.schedules import ( + CronSchedule, + IntervalSchedule, + RRuleSchedule, + ) + assert_deployment_name_format(name) if sum(option is not None for option in [interval, rrule_string, cron_string]) != 1: @@ -459,6 +467,9 @@ async def delete_schedule( """ Delete a deployment schedule. """ + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client.orchestration import get_client + assert_deployment_name_format(deployment_name) async with get_client() as client: @@ -490,6 +501,9 @@ async def pause_schedule(deployment_name: str, schedule_id: UUID): """ Pause a deployment schedule. """ + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client.orchestration import get_client + assert_deployment_name_format(deployment_name) async with get_client() as client: @@ -521,6 +535,9 @@ async def resume_schedule(deployment_name: str, schedule_id: UUID): """ Resume a deployment schedule. """ + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client.orchestration import get_client + assert_deployment_name_format(deployment_name) async with get_client() as client: @@ -550,6 +567,17 @@ async def list_schedules(deployment_name: str): """ View all schedules for a deployment. """ + from rich.table import Table + + from prefect.cli._utilities import exit_with_error + from prefect.client.orchestration import get_client + from prefect.client.schemas.objects import DeploymentSchedule + from prefect.client.schemas.schedules import ( + CronSchedule, + IntervalSchedule, + RRuleSchedule, + ) + assert_deployment_name_format(deployment_name) async with get_client() as client: try: @@ -601,6 +629,9 @@ async def clear_schedules( """ Clear all schedules for a deployment. """ + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client.orchestration import get_client + assert_deployment_name_format(deployment_name) async with get_client() as client: try: @@ -673,6 +704,9 @@ async def _set_schedule( This command is deprecated. Use 'prefect deployment schedule create' instead. """ + from prefect.cli._utilities import exit_with_error + from prefect.client.orchestration import get_client + assert_deployment_name_format(name) exclusive_options = sum( @@ -736,6 +770,9 @@ async def _pause_schedule( This command is deprecated. Use `prefect deployment schedule pause` instead. """ + from prefect.cli._utilities import exit_with_error + from prefect.client.orchestration import get_client + assert_deployment_name_format(name) async with get_client() as client: @@ -771,6 +808,9 @@ async def _resume_schedule( This command is deprecated. Use `prefect deployment schedule resume` instead. """ + from prefect.cli._utilities import exit_with_error + from prefect.client.orchestration import get_client + # TODO only work if there is one schedule, otherwise error assert_deployment_name_format(name) async with get_client() as client: @@ -796,6 +836,11 @@ async def ls(flow_name: Optional[List[str]] = None, by_created: bool = False): """ View all deployments or deployments for specific flows. """ + from rich.table import Table + + from prefect.client.orchestration import get_client + from prefect.client.schemas.filters import FlowFilter + async with get_client() as client: deployments = await client.read_deployments( flow_filter=FlowFilter(name={"any_": flow_name}) if flow_name else None @@ -914,6 +959,12 @@ async def run( """ import dateparser + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client.orchestration import get_client + from prefect.flow_runs import wait_for_flow_run + from prefect.settings import PREFECT_UI_URL + from prefect.states import Scheduled + now = pendulum.now("UTC") multi_params = {} @@ -1072,11 +1123,15 @@ async def run( ) -def _load_deployments(path: Path, quietly=False) -> PrefectObjectRegistry: +def _load_deployments(path: Path, quietly=False) -> "PrefectObjectRegistry": """ Load deployments from the path the user gave on the command line, giving helpful error messages if they cannot be loaded. """ + from prefect.cli._utilities import exit_with_error + from prefect.context import registry_from_script + from prefect.deployments import load_deployments_from_yaml + if path.suffix == ".py": from_msg = "python script" loader = registry_from_script @@ -1137,6 +1192,11 @@ async def apply( """ Create or update a deployment from a YAML file. """ + from prefect.cli._utilities import exit_with_error + from prefect.client.orchestration import ServerType, get_client + from prefect.deployments import Deployment + from prefect.settings import PREFECT_UI_URL + deployment = None async with get_client() as client: for path in paths: @@ -1252,6 +1312,9 @@ async def delete( $ prefect deployment delete test_flow/test_deployment $ prefect deployment delete --id dfd3e220-a130-4149-9af6-8d487e02fea6 """ + from prefect.cli._utilities import exit_with_error, exit_with_success + from prefect.client.orchestration import get_client + async with get_client() as client: if name is None and deployment_id is not None: try: @@ -1457,6 +1520,19 @@ async def build( """ Generate a deployment YAML from /path/to/file.py:flow_function """ + + from prefect.cli._utilities import exit_with_error + from prefect.client.orchestration import get_client + from prefect.client.schemas.schedules import ( + CronSchedule, + IntervalSchedule, + RRuleSchedule, + ) + from prefect.deployments import Deployment + from prefect.flows import load_flow_from_entrypoint + from prefect.utilities.asyncutils import run_sync_in_worker_thread + from prefect.utilities.filesystem import create_default_ignore_file + # validate inputs if not name: exit_with_error( @@ -1723,6 +1799,8 @@ def _load_json_key_values( Returns: A mapping of keys -> parsed values """ + from prefect.cli._utilities import exit_with_error + parsed = {} def cast_value(value: str) -> Any: