Skip to content

Commit

Permalink
Remove PrefectObjectRegistry [Taylor's Version] (#14172)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaazzam committed Jun 20, 2024
1 parent c2a647c commit 90c89de
Show file tree
Hide file tree
Showing 19 changed files with 25 additions and 203 deletions.
11 changes: 0 additions & 11 deletions benches/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,3 @@ def handle_saving(*args, **kwargs):


pytest_benchmark.session.BenchmarkSession.handle_saving = handle_saving


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-aws/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,3 @@ def aws_client_parameters_empty():
@pytest.fixture
def aws_client_parameters_public_bucket():
return AwsClientParameters(config=Config(signature_version=UNSIGNED))


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-azure/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@ def prefect_db():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


class AsyncIter:
def __init__(self, items):
self.items = items
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-bitbucket/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,3 @@ def prefect_db():
"""
with prefect_test_harness():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-dask/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@ def prefect_db():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture(scope="session")
def event_loop(request):
"""
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-databricks/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,3 @@ def prefect_db():
"""
with prefect_test_harness():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-dbt/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,6 @@ def prefect_db():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture(autouse=True)
def google_auth_mock(monkeypatch):
"""
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-docker/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,6 @@ def prefect_db():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture(scope="session")
def event_loop(request):
"""
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-gcp/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,6 @@ def disable_logging():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture
def google_auth(monkeypatch):
google_auth_mock = MagicMock()
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-github/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,3 @@ def prefect_db():
"""
with prefect_test_harness():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-gitlab/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,3 @@ def prefect_db():
"""
with prefect_test_harness():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-kubernetes/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,6 @@ def disable_api_logging():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture
def kube_config_dict():
return yaml.safe_load(GOOD_CONFIG_FILE_PATH.read_text())
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-snowflake/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,6 @@ def prefect_db():
raise e


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture()
def credentials_params():
return {
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-sqlalchemy/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,6 @@ def prefect_db():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture(autouse=True)
def fetch_state_result():
with temporary_settings(updates={PREFECT_ASYNC_FETCH_STATE_RESULT: True}):
Expand Down
42 changes: 19 additions & 23 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.schedules import SCHEDULE_TYPES
from prefect.client.utilities import client_injector
from prefect.context import PrefectObjectRegistry, registry_from_script
from prefect.context import registry_from_script
from prefect.deployments.runner import deploy
from prefect.deployments.steps.core import run_steps
from prefect.docker.docker_image import DockerImage
Expand Down Expand Up @@ -125,7 +125,6 @@
from prefect.flows import FlowRun


@PrefectObjectRegistry.register_instances
class Flow(Generic[P, R]):
"""
A Prefect workflow definition.
Expand Down Expand Up @@ -1699,29 +1698,26 @@ def load_flow_from_entrypoint(
FlowScriptError: If an exception is encountered while running the script
MissingFlowError: If the flow function specified in the entrypoint does not exist
"""
with PrefectObjectRegistry( # type: ignore
block_code_execution=True,
capture_failures=True,
):
if ":" in entrypoint:
# split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff
path, func_name = entrypoint.rsplit(":", maxsplit=1)
else:
path, func_name = entrypoint.rsplit(".", maxsplit=1)
try:
flow = import_object(entrypoint)
except AttributeError as exc:
raise MissingFlowError(
f"Flow function with name {func_name!r} not found in {path!r}. "
) from exc

if not isinstance(flow, Flow):
raise MissingFlowError(
f"Function with name {func_name!r} is not a flow. Make sure that it is "
"decorated with '@flow'."
)
if ":" in entrypoint:
# split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff
path, func_name = entrypoint.rsplit(":", maxsplit=1)
else:
path, func_name = entrypoint.rsplit(".", maxsplit=1)
try:
flow = import_object(entrypoint)
except AttributeError as exc:
raise MissingFlowError(
f"Flow function with name {func_name!r} not found in {path!r}. "
) from exc

return flow
if not isinstance(flow, Flow):
raise MissingFlowError(
f"Function with name {func_name!r} is not a flow. Make sure that it is "
"decorated with '@flow'."
)

return flow


def load_flow_from_text(script_contents: AnyStr, flow_name: str) -> Flow:
Expand Down
8 changes: 6 additions & 2 deletions src/prefect/runner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from prefect._internal.schemas.validators import validate_values_conform_to_schema
from prefect.client.orchestration import get_client
from prefect.exceptions import MissingFlowError, ScriptError
from prefect.flows import Flow, load_flow_from_entrypoint, load_flows_from_script
from prefect.flows import Flow, load_flow_from_entrypoint
from prefect.logging import get_logger
from prefect.runner.utils import (
inject_schemas_into_openapi,
Expand All @@ -24,6 +24,7 @@
PREFECT_RUNNER_SERVER_PORT,
)
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.importtools import load_script_as_module

if TYPE_CHECKING:
from prefect.client.schemas.responses import DeploymentResponse
Expand Down Expand Up @@ -155,7 +156,10 @@ async def get_subflow_schemas(runner: "Runner") -> Dict[str, Dict]:
continue

script = deployment.entrypoint.split(":")[0]
subflows = load_flows_from_script(script)
module = load_script_as_module(script)
subflows = [
obj for obj in module.__dict__.values() if isinstance(obj, Flow)
]
for flow in subflows:
schemas[flow.name] = flow.parameters.model_dump()

Expand Down
2 changes: 0 additions & 2 deletions src/prefect/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from prefect.client.schemas.objects import TaskRunInput, TaskRunResult
from prefect.context import (
FlowRunContext,
PrefectObjectRegistry,
TagsContext,
TaskRunContext,
serialize_context,
Expand Down Expand Up @@ -174,7 +173,6 @@ def _infer_parent_task_runs(
return parents


@PrefectObjectRegistry.register_instances
class Task(Generic[P, R]):
"""
A Prefect task definition.
Expand Down
11 changes: 0 additions & 11 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,17 +482,6 @@ def test_database_connection_url(generate_test_database_connection_url):
yield url


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture(autouse=True)
def reset_registered_blocks():
"""
Expand Down
11 changes: 0 additions & 11 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
IntervalSchedule,
RRuleSchedule,
)
from prefect.context import PrefectObjectRegistry
from prefect.deployments.runner import RunnerDeployment
from prefect.docker.docker_image import DockerImage
from prefect.events import DeploymentEventTrigger, Posture
Expand Down Expand Up @@ -743,16 +742,6 @@ def foo():
with pytest.raises(ValueError, match="Test 2"):
await second.result()

@pytest.mark.skip(reason="Fails with new engine, passed on old engine")
async def test_call_execution_blocked_does_not_run_flow(self):
@flow(version="test")
def foo(x, y=3, z=3):
return x + y + z

with PrefectObjectRegistry(block_code_execution=True):
state = foo(1, 2)
assert state is None

def test_flow_can_end_in_paused_state(self):
@flow
def my_flow():
Expand Down

0 comments on commit 90c89de

Please sign in to comment.