Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove remaining registry references #14173

Merged
merged 2 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/prefect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
# Initialize the process-wide profile and registry at import time
import prefect.context

prefect.context.initialize_object_registry()

# Perform any forward-ref updates needed for Pydantic models
import prefect.client.schemas

Expand Down
114 changes: 0 additions & 114 deletions src/prefect/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,16 @@
import os
import sys
import warnings
from collections import defaultdict
from contextlib import ExitStack, contextmanager
from contextvars import ContextVar, Token
from functools import update_wrapper
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
ContextManager,
Dict,
Generator,
List,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
Expand All @@ -46,7 +41,6 @@
from prefect.states import State
from prefect.task_runners import TaskRunner
from prefect.utilities.asyncutils import run_coro_as_sync
from prefect.utilities.importtools import load_script_as_module

T = TypeVar("T")

Expand Down Expand Up @@ -180,86 +174,6 @@ def serialize(self) -> Dict[str, Any]:
return self.model_dump(exclude_unset=True)


class PrefectObjectRegistry(ContextModel):
"""
A context that acts as a registry for all Prefect objects that are
registered during load and execution.

Attributes:
start_time: The time the object registry was created.
block_code_execution: If set, flow calls will be ignored.
capture_failures: If set, failures during __init__ will be silenced and tracked.
"""

start_time: DateTime = Field(default_factory=lambda: pendulum.now("UTC"))

_instance_registry: Dict[Type[T], List[T]] = PrivateAttr(
default_factory=lambda: defaultdict(list)
)

# Failures will be a tuple of (exception, instance, args, kwargs)
_instance_init_failures: Dict[
Type[T], List[Tuple[Exception, T, Tuple, Dict]]
] = PrivateAttr(default_factory=lambda: defaultdict(list))

block_code_execution: bool = False
capture_failures: bool = False

__var__ = ContextVar("object_registry")

def get_instances(self, type_: Type[T]) -> List[T]:
instances = []
for registered_type, type_instances in self._instance_registry.items():
if type_ in registered_type.mro():
instances.extend(type_instances)
return instances

def get_instance_failures(
self, type_: Type[T]
) -> List[Tuple[Exception, T, Tuple, Dict]]:
failures = []
for type__ in type_.mro():
failures.extend(self._instance_init_failures[type__])
return failures

def register_instance(self, object):
# TODO: Consider using a 'Set' to avoid duplicate entries
self._instance_registry[type(object)].append(object)

def register_init_failure(
self, exc: Exception, object: Any, init_args: Tuple, init_kwargs: Dict
):
self._instance_init_failures[type(object)].append(
(exc, object, init_args, init_kwargs)
)

@classmethod
def register_instances(cls, type_: Type[T]) -> Type[T]:
"""
Decorator for a class that adds registration to the `PrefectObjectRegistry`
on initialization of instances.
"""
original_init = type_.__init__

def __register_init__(__self__: T, *args: Any, **kwargs: Any) -> None:
registry = cls.get()
try:
original_init(__self__, *args, **kwargs)
except Exception as exc:
if not registry or not registry.capture_failures:
raise
else:
registry.register_init_failure(exc, __self__, args, kwargs)
else:
if registry:
registry.register_instance(__self__)

update_wrapper(__register_init__, original_init)

type_.__init__ = __register_init__
return type_


class ClientContext(ContextModel):
"""
A context for managing the Prefect client instances.
Expand Down Expand Up @@ -594,23 +508,6 @@ def tags(*new_tags: str) -> Generator[Set[str], None, None]:
yield new_tags


def registry_from_script(
path: str,
block_code_execution: bool = True,
capture_failures: bool = True,
) -> PrefectObjectRegistry:
"""
Return a fresh registry with instances populated from execution of a script.
"""
with PrefectObjectRegistry(
block_code_execution=block_code_execution,
capture_failures=capture_failures,
) as registry:
load_script_as_module(path)

return registry


@contextmanager
def use_profile(
profile: Union[Profile, str],
Expand Down Expand Up @@ -711,14 +608,3 @@ def root_settings_context():


GLOBAL_SETTINGS_CONTEXT: SettingsContext = root_settings_context()
GLOBAL_OBJECT_REGISTRY: Optional[ContextManager[PrefectObjectRegistry]] = None


def initialize_object_registry():
global GLOBAL_OBJECT_REGISTRY

if GLOBAL_OBJECT_REGISTRY:
return

GLOBAL_OBJECT_REGISTRY = PrefectObjectRegistry()
GLOBAL_OBJECT_REGISTRY.__enter__()
68 changes: 0 additions & 68 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
from copy import copy
from functools import partial, update_wrapper
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import (
TYPE_CHECKING,
Any,
AnyStr,
Awaitable,
Callable,
Coroutine,
Expand Down Expand Up @@ -56,7 +54,6 @@
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.schedules import SCHEDULE_TYPES
from prefect.client.utilities import client_injector
from prefect.context import registry_from_script
from prefect.deployments.runner import deploy
from prefect.deployments.steps.core import run_steps
from prefect.docker.docker_image import DockerImage
Expand Down Expand Up @@ -1640,47 +1637,6 @@ def select_flow(
return list(flows_dict.values())[0]


def load_flows_from_script(path: str) -> List[Flow]:
"""
Load all flow objects from the given python script. All of the code in the file
will be executed.

Returns:
A list of flows

Raises:
FlowScriptError: If an exception is encountered while running the script
"""
return registry_from_script(path).get_instances(Flow)


def load_flow_from_script(path: str, flow_name: Optional[str] = None) -> Flow:
"""
Extract a flow object from a script by running all of the code in the file.

If the script has multiple flows in it, a flow name must be provided to specify
the flow to return.

Args:
path: A path to a Python script containing flows
flow_name: An optional flow name to look for in the script

Returns:
The flow object from the script

Raises:
FlowScriptError: If an exception is encountered while running the script
MissingFlowError: If no flows exist in the iterable
MissingFlowError: If a flow name is provided and that flow does not exist
UnspecifiedFlowError: If multiple flows exist but no flow name was provided
"""
return select_flow(
load_flows_from_script(path),
flow_name=flow_name,
from_message=f"in script '{path}'",
)


def load_flow_from_entrypoint(
entrypoint: str,
) -> Flow:
Expand Down Expand Up @@ -1720,30 +1676,6 @@ def load_flow_from_entrypoint(
return flow


def load_flow_from_text(script_contents: AnyStr, flow_name: str) -> Flow:
"""
Load a flow from a text script.

The script will be written to a temporary local file path so errors can refer
to line numbers and contextual tracebacks can be provided.
"""
with NamedTemporaryFile(
mode="wt" if isinstance(script_contents, str) else "wb",
prefix=f"flow-script-{flow_name}",
suffix=".py",
delete=False,
) as tmpfile:
tmpfile.write(script_contents)
tmpfile.flush()
try:
flow = load_flow_from_script(tmpfile.name, flow_name=flow_name)
finally:
# windows compat
tmpfile.close()
os.remove(tmpfile.name)
return flow


@sync_compatible
async def serve(
*args: "RunnerDeployment",
Expand Down
Loading