diff --git a/benches/conftest.py b/benches/conftest.py index 386e4ce98723..a8f8313c0128 100644 --- a/benches/conftest.py +++ b/benches/conftest.py @@ -20,14 +20,3 @@ def handle_saving(*args, **kwargs): pytest_benchmark.session.BenchmarkSession.handle_saving = handle_saving - - -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield diff --git a/src/integrations/prefect-aws/tests/conftest.py b/src/integrations/prefect-aws/tests/conftest.py index f6a9132ff519..c53607460b61 100644 --- a/src/integrations/prefect-aws/tests/conftest.py +++ b/src/integrations/prefect-aws/tests/conftest.py @@ -44,14 +44,3 @@ def aws_client_parameters_empty(): @pytest.fixture def aws_client_parameters_public_bucket(): return AwsClientParameters(config=Config(signature_version=UNSIGNED)) - - -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield diff --git a/src/integrations/prefect-azure/tests/conftest.py b/src/integrations/prefect-azure/tests/conftest.py index 4cb3d9f0a38d..21f8fbf8eea9 100644 --- a/src/integrations/prefect-azure/tests/conftest.py +++ b/src/integrations/prefect-azure/tests/conftest.py @@ -17,17 +17,6 @@ def prefect_db(): yield -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield - - class AsyncIter: def __init__(self, items): self.items = items diff --git a/src/integrations/prefect-bitbucket/tests/conftest.py b/src/integrations/prefect-bitbucket/tests/conftest.py index 8f7797306f5d..a06c90e85232 100644 --- a/src/integrations/prefect-bitbucket/tests/conftest.py +++ b/src/integrations/prefect-bitbucket/tests/conftest.py @@ -10,14 +10,3 @@ def prefect_db(): """ with prefect_test_harness(): yield - - -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield diff --git a/src/integrations/prefect-dask/tests/conftest.py b/src/integrations/prefect-dask/tests/conftest.py index c95d93952428..7292781b3329 100644 --- a/src/integrations/prefect-dask/tests/conftest.py +++ b/src/integrations/prefect-dask/tests/conftest.py @@ -17,17 +17,6 @@ def prefect_db(): yield -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield - - @pytest.fixture(scope="session") def event_loop(request): """ diff --git a/src/integrations/prefect-databricks/tests/conftest.py b/src/integrations/prefect-databricks/tests/conftest.py index 8f7797306f5d..a06c90e85232 100644 --- a/src/integrations/prefect-databricks/tests/conftest.py +++ b/src/integrations/prefect-databricks/tests/conftest.py @@ -10,14 +10,3 @@ def prefect_db(): """ with prefect_test_harness(): yield - - -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield diff --git a/src/integrations/prefect-dbt/tests/conftest.py b/src/integrations/prefect-dbt/tests/conftest.py index af1df7b14299..64263c1b7b43 100644 --- a/src/integrations/prefect-dbt/tests/conftest.py +++ b/src/integrations/prefect-dbt/tests/conftest.py @@ -36,17 +36,6 @@ def prefect_db(): yield -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield - - @pytest.fixture(autouse=True) def google_auth_mock(monkeypatch): """ diff --git a/src/integrations/prefect-docker/tests/conftest.py b/src/integrations/prefect-docker/tests/conftest.py index b1b41748ef6b..de600cde9c4a 100644 --- a/src/integrations/prefect-docker/tests/conftest.py +++ b/src/integrations/prefect-docker/tests/conftest.py @@ -32,17 +32,6 @@ def prefect_db(): yield -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield - - @pytest.fixture(scope="session") def event_loop(request): """ diff --git a/src/integrations/prefect-gcp/tests/conftest.py b/src/integrations/prefect-gcp/tests/conftest.py index d20a8754c7e6..1b46093ecfd7 100644 --- a/src/integrations/prefect-gcp/tests/conftest.py +++ b/src/integrations/prefect-gcp/tests/conftest.py @@ -24,17 +24,6 @@ def disable_logging(): yield -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield - - @pytest.fixture def google_auth(monkeypatch): google_auth_mock = MagicMock() diff --git a/src/integrations/prefect-github/tests/conftest.py b/src/integrations/prefect-github/tests/conftest.py index 8f7797306f5d..a06c90e85232 100644 --- a/src/integrations/prefect-github/tests/conftest.py +++ b/src/integrations/prefect-github/tests/conftest.py @@ -10,14 +10,3 @@ def prefect_db(): """ with prefect_test_harness(): yield - - -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield diff --git a/src/integrations/prefect-gitlab/tests/conftest.py b/src/integrations/prefect-gitlab/tests/conftest.py index 8f7797306f5d..a06c90e85232 100644 --- a/src/integrations/prefect-gitlab/tests/conftest.py +++ b/src/integrations/prefect-gitlab/tests/conftest.py @@ -10,14 +10,3 @@ def prefect_db(): """ with prefect_test_harness(): yield - - -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield diff --git a/src/integrations/prefect-kubernetes/tests/conftest.py b/src/integrations/prefect-kubernetes/tests/conftest.py index 1cec19320f58..477440141f88 100644 --- a/src/integrations/prefect-kubernetes/tests/conftest.py +++ b/src/integrations/prefect-kubernetes/tests/conftest.py @@ -34,17 +34,6 @@ def disable_api_logging(): yield -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield - - @pytest.fixture def kube_config_dict(): return yaml.safe_load(GOOD_CONFIG_FILE_PATH.read_text()) diff --git a/src/integrations/prefect-snowflake/tests/conftest.py b/src/integrations/prefect-snowflake/tests/conftest.py index bcb517a19554..1cf1f6d78aa3 100644 --- a/src/integrations/prefect-snowflake/tests/conftest.py +++ b/src/integrations/prefect-snowflake/tests/conftest.py @@ -35,17 +35,6 @@ def prefect_db(): raise e -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield - - @pytest.fixture() def credentials_params(): return { diff --git a/src/integrations/prefect-sqlalchemy/tests/conftest.py b/src/integrations/prefect-sqlalchemy/tests/conftest.py index 69455a2fde59..70ecf5f1c256 100644 --- a/src/integrations/prefect-sqlalchemy/tests/conftest.py +++ b/src/integrations/prefect-sqlalchemy/tests/conftest.py @@ -13,17 +13,6 @@ def prefect_db(): yield -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield - - @pytest.fixture(autouse=True) def fetch_state_result(): with temporary_settings(updates={PREFECT_ASYNC_FETCH_STATE_RESULT: True}): diff --git a/src/prefect/flows.py b/src/prefect/flows.py index d48ac84caf6c..eb8670ec512d 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -56,7 +56,7 @@ from prefect.client.schemas.objects import FlowRun from prefect.client.schemas.schedules import SCHEDULE_TYPES from prefect.client.utilities import client_injector -from prefect.context import PrefectObjectRegistry, registry_from_script +from prefect.context import registry_from_script from prefect.deployments.runner import deploy from prefect.deployments.steps.core import run_steps from prefect.docker.docker_image import DockerImage @@ -125,7 +125,6 @@ from prefect.flows import FlowRun -@PrefectObjectRegistry.register_instances class Flow(Generic[P, R]): """ A Prefect workflow definition. @@ -1699,29 +1698,26 @@ def load_flow_from_entrypoint( FlowScriptError: If an exception is encountered while running the script MissingFlowError: If the flow function specified in the entrypoint does not exist """ - with PrefectObjectRegistry( # type: ignore - block_code_execution=True, - capture_failures=True, - ): - if ":" in entrypoint: - # split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff - path, func_name = entrypoint.rsplit(":", maxsplit=1) - else: - path, func_name = entrypoint.rsplit(".", maxsplit=1) - try: - flow = import_object(entrypoint) - except AttributeError as exc: - raise MissingFlowError( - f"Flow function with name {func_name!r} not found in {path!r}. " - ) from exc - if not isinstance(flow, Flow): - raise MissingFlowError( - f"Function with name {func_name!r} is not a flow. Make sure that it is " - "decorated with '@flow'." - ) + if ":" in entrypoint: + # split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff + path, func_name = entrypoint.rsplit(":", maxsplit=1) + else: + path, func_name = entrypoint.rsplit(".", maxsplit=1) + try: + flow = import_object(entrypoint) + except AttributeError as exc: + raise MissingFlowError( + f"Flow function with name {func_name!r} not found in {path!r}. " + ) from exc - return flow + if not isinstance(flow, Flow): + raise MissingFlowError( + f"Function with name {func_name!r} is not a flow. Make sure that it is " + "decorated with '@flow'." + ) + + return flow def load_flow_from_text(script_contents: AnyStr, flow_name: str) -> Flow: diff --git a/src/prefect/runner/server.py b/src/prefect/runner/server.py index 70376c473a0a..018558067a6c 100644 --- a/src/prefect/runner/server.py +++ b/src/prefect/runner/server.py @@ -10,7 +10,7 @@ from prefect._internal.schemas.validators import validate_values_conform_to_schema from prefect.client.orchestration import get_client from prefect.exceptions import MissingFlowError, ScriptError -from prefect.flows import Flow, load_flow_from_entrypoint, load_flows_from_script +from prefect.flows import Flow, load_flow_from_entrypoint from prefect.logging import get_logger from prefect.runner.utils import ( inject_schemas_into_openapi, @@ -24,6 +24,7 @@ PREFECT_RUNNER_SERVER_PORT, ) from prefect.utilities.asyncutils import sync_compatible +from prefect.utilities.importtools import load_script_as_module if TYPE_CHECKING: from prefect.client.schemas.responses import DeploymentResponse @@ -155,7 +156,10 @@ async def get_subflow_schemas(runner: "Runner") -> Dict[str, Dict]: continue script = deployment.entrypoint.split(":")[0] - subflows = load_flows_from_script(script) + module = load_script_as_module(script) + subflows = [ + obj for obj in module.__dict__.values() if isinstance(obj, Flow) + ] for flow in subflows: schemas[flow.name] = flow.parameters.model_dump() diff --git a/src/prefect/tasks.py b/src/prefect/tasks.py index d2387db025a6..a5f5fd65e3a1 100644 --- a/src/prefect/tasks.py +++ b/src/prefect/tasks.py @@ -37,7 +37,6 @@ from prefect.client.schemas.objects import TaskRunInput, TaskRunResult from prefect.context import ( FlowRunContext, - PrefectObjectRegistry, TagsContext, TaskRunContext, serialize_context, @@ -174,7 +173,6 @@ def _infer_parent_task_runs( return parents -@PrefectObjectRegistry.register_instances class Task(Generic[P, R]): """ A Prefect task definition. diff --git a/tests/conftest.py b/tests/conftest.py index adc3ceefcf79..f188692c2322 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -482,17 +482,6 @@ def test_database_connection_url(generate_test_database_connection_url): yield url -@pytest.fixture(autouse=True) -def reset_object_registry(): - """ - Ensures each test has a clean object registry. - """ - from prefect.context import PrefectObjectRegistry - - with PrefectObjectRegistry(): - yield - - @pytest.fixture(autouse=True) def reset_registered_blocks(): """ diff --git a/tests/test_flows.py b/tests/test_flows.py index 82f4503d18b6..3895445f878e 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -31,7 +31,6 @@ IntervalSchedule, RRuleSchedule, ) -from prefect.context import PrefectObjectRegistry from prefect.deployments.runner import RunnerDeployment from prefect.docker.docker_image import DockerImage from prefect.events import DeploymentEventTrigger, Posture @@ -743,16 +742,6 @@ def foo(): with pytest.raises(ValueError, match="Test 2"): await second.result() - @pytest.mark.skip(reason="Fails with new engine, passed on old engine") - async def test_call_execution_blocked_does_not_run_flow(self): - @flow(version="test") - def foo(x, y=3, z=3): - return x + y + z - - with PrefectObjectRegistry(block_code_execution=True): - state = foo(1, 2) - assert state is None - def test_flow_can_end_in_paused_state(self): @flow def my_flow():