Skip to content

Commit

Permalink
Remove remaining registry references (#14173)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaazzam committed Jun 20, 2024
1 parent 339c794 commit 0c6e754
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 184 deletions.
2 changes: 0 additions & 2 deletions src/prefect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
# Initialize the process-wide profile and registry at import time
import prefect.context

prefect.context.initialize_object_registry()

# Perform any forward-ref updates needed for Pydantic models
import prefect.client.schemas

Expand Down
114 changes: 0 additions & 114 deletions src/prefect/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,16 @@
import os
import sys
import warnings
from collections import defaultdict
from contextlib import ExitStack, contextmanager
from contextvars import ContextVar, Token
from functools import update_wrapper
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
ContextManager,
Dict,
Generator,
List,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
Expand All @@ -46,7 +41,6 @@
from prefect.states import State
from prefect.task_runners import TaskRunner
from prefect.utilities.asyncutils import run_coro_as_sync
from prefect.utilities.importtools import load_script_as_module

T = TypeVar("T")

Expand Down Expand Up @@ -180,86 +174,6 @@ def serialize(self) -> Dict[str, Any]:
return self.model_dump(exclude_unset=True)


class PrefectObjectRegistry(ContextModel):
"""
A context that acts as a registry for all Prefect objects that are
registered during load and execution.
Attributes:
start_time: The time the object registry was created.
block_code_execution: If set, flow calls will be ignored.
capture_failures: If set, failures during __init__ will be silenced and tracked.
"""

start_time: DateTime = Field(default_factory=lambda: pendulum.now("UTC"))

_instance_registry: Dict[Type[T], List[T]] = PrivateAttr(
default_factory=lambda: defaultdict(list)
)

# Failures will be a tuple of (exception, instance, args, kwargs)
_instance_init_failures: Dict[
Type[T], List[Tuple[Exception, T, Tuple, Dict]]
] = PrivateAttr(default_factory=lambda: defaultdict(list))

block_code_execution: bool = False
capture_failures: bool = False

__var__ = ContextVar("object_registry")

def get_instances(self, type_: Type[T]) -> List[T]:
instances = []
for registered_type, type_instances in self._instance_registry.items():
if type_ in registered_type.mro():
instances.extend(type_instances)
return instances

def get_instance_failures(
self, type_: Type[T]
) -> List[Tuple[Exception, T, Tuple, Dict]]:
failures = []
for type__ in type_.mro():
failures.extend(self._instance_init_failures[type__])
return failures

def register_instance(self, object):
# TODO: Consider using a 'Set' to avoid duplicate entries
self._instance_registry[type(object)].append(object)

def register_init_failure(
self, exc: Exception, object: Any, init_args: Tuple, init_kwargs: Dict
):
self._instance_init_failures[type(object)].append(
(exc, object, init_args, init_kwargs)
)

@classmethod
def register_instances(cls, type_: Type[T]) -> Type[T]:
"""
Decorator for a class that adds registration to the `PrefectObjectRegistry`
on initialization of instances.
"""
original_init = type_.__init__

def __register_init__(__self__: T, *args: Any, **kwargs: Any) -> None:
registry = cls.get()
try:
original_init(__self__, *args, **kwargs)
except Exception as exc:
if not registry or not registry.capture_failures:
raise
else:
registry.register_init_failure(exc, __self__, args, kwargs)
else:
if registry:
registry.register_instance(__self__)

update_wrapper(__register_init__, original_init)

type_.__init__ = __register_init__
return type_


class ClientContext(ContextModel):
"""
A context for managing the Prefect client instances.
Expand Down Expand Up @@ -594,23 +508,6 @@ def tags(*new_tags: str) -> Generator[Set[str], None, None]:
yield new_tags


def registry_from_script(
path: str,
block_code_execution: bool = True,
capture_failures: bool = True,
) -> PrefectObjectRegistry:
"""
Return a fresh registry with instances populated from execution of a script.
"""
with PrefectObjectRegistry(
block_code_execution=block_code_execution,
capture_failures=capture_failures,
) as registry:
load_script_as_module(path)

return registry


@contextmanager
def use_profile(
profile: Union[Profile, str],
Expand Down Expand Up @@ -711,14 +608,3 @@ def root_settings_context():


GLOBAL_SETTINGS_CONTEXT: SettingsContext = root_settings_context()
GLOBAL_OBJECT_REGISTRY: Optional[ContextManager[PrefectObjectRegistry]] = None


def initialize_object_registry():
global GLOBAL_OBJECT_REGISTRY

if GLOBAL_OBJECT_REGISTRY:
return

GLOBAL_OBJECT_REGISTRY = PrefectObjectRegistry()
GLOBAL_OBJECT_REGISTRY.__enter__()
68 changes: 0 additions & 68 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
from copy import copy
from functools import partial, update_wrapper
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import (
TYPE_CHECKING,
Any,
AnyStr,
Awaitable,
Callable,
Coroutine,
Expand Down Expand Up @@ -56,7 +54,6 @@
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.schedules import SCHEDULE_TYPES
from prefect.client.utilities import client_injector
from prefect.context import registry_from_script
from prefect.deployments.runner import deploy
from prefect.deployments.steps.core import run_steps
from prefect.docker.docker_image import DockerImage
Expand Down Expand Up @@ -1640,47 +1637,6 @@ def select_flow(
return list(flows_dict.values())[0]


def load_flows_from_script(path: str) -> List[Flow]:
"""
Load all flow objects from the given python script. All of the code in the file
will be executed.
Returns:
A list of flows
Raises:
FlowScriptError: If an exception is encountered while running the script
"""
return registry_from_script(path).get_instances(Flow)


def load_flow_from_script(path: str, flow_name: Optional[str] = None) -> Flow:
"""
Extract a flow object from a script by running all of the code in the file.
If the script has multiple flows in it, a flow name must be provided to specify
the flow to return.
Args:
path: A path to a Python script containing flows
flow_name: An optional flow name to look for in the script
Returns:
The flow object from the script
Raises:
FlowScriptError: If an exception is encountered while running the script
MissingFlowError: If no flows exist in the iterable
MissingFlowError: If a flow name is provided and that flow does not exist
UnspecifiedFlowError: If multiple flows exist but no flow name was provided
"""
return select_flow(
load_flows_from_script(path),
flow_name=flow_name,
from_message=f"in script '{path}'",
)


def load_flow_from_entrypoint(
entrypoint: str,
) -> Flow:
Expand Down Expand Up @@ -1720,30 +1676,6 @@ def load_flow_from_entrypoint(
return flow


def load_flow_from_text(script_contents: AnyStr, flow_name: str) -> Flow:
"""
Load a flow from a text script.
The script will be written to a temporary local file path so errors can refer
to line numbers and contextual tracebacks can be provided.
"""
with NamedTemporaryFile(
mode="wt" if isinstance(script_contents, str) else "wb",
prefix=f"flow-script-{flow_name}",
suffix=".py",
delete=False,
) as tmpfile:
tmpfile.write(script_contents)
tmpfile.flush()
try:
flow = load_flow_from_script(tmpfile.name, flow_name=flow_name)
finally:
# windows compat
tmpfile.close()
os.remove(tmpfile.name)
return flow


@sync_compatible
async def serve(
*args: "RunnerDeployment",
Expand Down

0 comments on commit 0c6e754

Please sign in to comment.