Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove PrefectObjectRegistry [Taylor's Version] #14172

Merged
merged 3 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions benches/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,3 @@ def handle_saving(*args, **kwargs):


pytest_benchmark.session.BenchmarkSession.handle_saving = handle_saving


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-aws/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,3 @@ def aws_client_parameters_empty():
@pytest.fixture
def aws_client_parameters_public_bucket():
return AwsClientParameters(config=Config(signature_version=UNSIGNED))


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-azure/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@ def prefect_db():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


class AsyncIter:
def __init__(self, items):
self.items = items
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-bitbucket/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,3 @@ def prefect_db():
"""
with prefect_test_harness():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-dask/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@ def prefect_db():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture(scope="session")
def event_loop(request):
"""
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-databricks/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,3 @@ def prefect_db():
"""
with prefect_test_harness():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-dbt/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,6 @@ def prefect_db():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture(autouse=True)
def google_auth_mock(monkeypatch):
"""
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-docker/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,6 @@ def prefect_db():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture(scope="session")
def event_loop(request):
"""
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-gcp/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,6 @@ def disable_logging():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture
def google_auth(monkeypatch):
google_auth_mock = MagicMock()
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-github/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,3 @@ def prefect_db():
"""
with prefect_test_harness():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-gitlab/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,3 @@ def prefect_db():
"""
with prefect_test_harness():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield
11 changes: 0 additions & 11 deletions src/integrations/prefect-kubernetes/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,6 @@ def disable_api_logging():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture
def kube_config_dict():
return yaml.safe_load(GOOD_CONFIG_FILE_PATH.read_text())
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-snowflake/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,6 @@ def prefect_db():
raise e


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture()
def credentials_params():
return {
Expand Down
11 changes: 0 additions & 11 deletions src/integrations/prefect-sqlalchemy/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,6 @@ def prefect_db():
yield


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture(autouse=True)
def fetch_state_result():
with temporary_settings(updates={PREFECT_ASYNC_FETCH_STATE_RESULT: True}):
Expand Down
42 changes: 19 additions & 23 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.schedules import SCHEDULE_TYPES
from prefect.client.utilities import client_injector
from prefect.context import PrefectObjectRegistry, registry_from_script
from prefect.context import registry_from_script
from prefect.deployments.runner import deploy
from prefect.deployments.steps.core import run_steps
from prefect.docker.docker_image import DockerImage
Expand Down Expand Up @@ -125,7 +125,6 @@
from prefect.flows import FlowRun


@PrefectObjectRegistry.register_instances
class Flow(Generic[P, R]):
"""
A Prefect workflow definition.
Expand Down Expand Up @@ -1699,29 +1698,26 @@ def load_flow_from_entrypoint(
FlowScriptError: If an exception is encountered while running the script
MissingFlowError: If the flow function specified in the entrypoint does not exist
"""
with PrefectObjectRegistry( # type: ignore
block_code_execution=True,
capture_failures=True,
):
if ":" in entrypoint:
# split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff
path, func_name = entrypoint.rsplit(":", maxsplit=1)
else:
path, func_name = entrypoint.rsplit(".", maxsplit=1)
try:
flow = import_object(entrypoint)
except AttributeError as exc:
raise MissingFlowError(
f"Flow function with name {func_name!r} not found in {path!r}. "
) from exc

if not isinstance(flow, Flow):
raise MissingFlowError(
f"Function with name {func_name!r} is not a flow. Make sure that it is "
"decorated with '@flow'."
)
if ":" in entrypoint:
# split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff
path, func_name = entrypoint.rsplit(":", maxsplit=1)
else:
path, func_name = entrypoint.rsplit(".", maxsplit=1)
try:
flow = import_object(entrypoint)
except AttributeError as exc:
raise MissingFlowError(
f"Flow function with name {func_name!r} not found in {path!r}. "
) from exc

return flow
if not isinstance(flow, Flow):
raise MissingFlowError(
f"Function with name {func_name!r} is not a flow. Make sure that it is "
"decorated with '@flow'."
)

return flow


def load_flow_from_text(script_contents: AnyStr, flow_name: str) -> Flow:
Expand Down
8 changes: 6 additions & 2 deletions src/prefect/runner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from prefect._internal.schemas.validators import validate_values_conform_to_schema
from prefect.client.orchestration import get_client
from prefect.exceptions import MissingFlowError, ScriptError
from prefect.flows import Flow, load_flow_from_entrypoint, load_flows_from_script
from prefect.flows import Flow, load_flow_from_entrypoint
from prefect.logging import get_logger
from prefect.runner.utils import (
inject_schemas_into_openapi,
Expand All @@ -24,6 +24,7 @@
PREFECT_RUNNER_SERVER_PORT,
)
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.importtools import load_script_as_module

if TYPE_CHECKING:
from prefect.client.schemas.responses import DeploymentResponse
Expand Down Expand Up @@ -155,7 +156,10 @@ async def get_subflow_schemas(runner: "Runner") -> Dict[str, Dict]:
continue

script = deployment.entrypoint.split(":")[0]
subflows = load_flows_from_script(script)
module = load_script_as_module(script)
subflows = [
obj for obj in module.__dict__.values() if isinstance(obj, Flow)
]
for flow in subflows:
schemas[flow.name] = flow.parameters.model_dump()

Expand Down
2 changes: 0 additions & 2 deletions src/prefect/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from prefect.client.schemas.objects import TaskRunInput, TaskRunResult
from prefect.context import (
FlowRunContext,
PrefectObjectRegistry,
TagsContext,
TaskRunContext,
serialize_context,
Expand Down Expand Up @@ -174,7 +173,6 @@ def _infer_parent_task_runs(
return parents


@PrefectObjectRegistry.register_instances
class Task(Generic[P, R]):
"""
A Prefect task definition.
Expand Down
11 changes: 0 additions & 11 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,17 +482,6 @@ def test_database_connection_url(generate_test_database_connection_url):
yield url


@pytest.fixture(autouse=True)
def reset_object_registry():
"""
Ensures each test has a clean object registry.
"""
from prefect.context import PrefectObjectRegistry

with PrefectObjectRegistry():
yield


@pytest.fixture(autouse=True)
def reset_registered_blocks():
"""
Expand Down
11 changes: 0 additions & 11 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
IntervalSchedule,
RRuleSchedule,
)
from prefect.context import PrefectObjectRegistry
from prefect.deployments.runner import RunnerDeployment
from prefect.docker.docker_image import DockerImage
from prefect.events import DeploymentEventTrigger, Posture
Expand Down Expand Up @@ -743,16 +742,6 @@ def foo():
with pytest.raises(ValueError, match="Test 2"):
await second.result()

@pytest.mark.skip(reason="Fails with new engine, passed on old engine")
async def test_call_execution_blocked_does_not_run_flow(self):
@flow(version="test")
def foo(x, y=3, z=3):
return x + y + z

with PrefectObjectRegistry(block_code_execution=True):
state = foo(1, 2)
assert state is None

def test_flow_can_end_in_paused_state(self):
@flow
def my_flow():
Expand Down