Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
jlowin committed Apr 30, 2024
1 parent 3e95663 commit 46a0068
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 48 deletions.
7 changes: 5 additions & 2 deletions src/prefect/runner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
PREFECT_RUNNER_SERVER_PORT,
)
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.importtools import import_objects_by_type
from prefect.utilities.importtools import load_script_as_module

if TYPE_CHECKING:
from prefect.client.schemas.responses import DeploymentResponse
Expand Down Expand Up @@ -160,7 +160,10 @@ async def get_subflow_schemas(runner: "Runner") -> Dict[str, Dict]:
continue

script = deployment.entrypoint.split(":")[0]
subflows = import_objects_by_type(script, Flow)
module = load_script_as_module(script)
subflows = [
obj for obj in module.__dict__.values() if isinstance(obj, Flow)
]
for flow in subflows:
schemas[flow.name] = flow.parameters.dict()

Expand Down
10 changes: 0 additions & 10 deletions src/prefect/utilities/importtools.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,6 @@ def import_object(import_path: str):
return getattr(module, object_name)


def import_objects_by_type(script_path: str, obj_type: type):
"""
Load all objects of a given type from a module.
"""
module = load_script_as_module(script_path)
return {
name: obj for name, obj in module.__dict__.items() if isinstance(obj, obj_type)
}


class DelayedImportErrorModule(ModuleType):
"""
A fake module returned by `lazy_import` when the module cannot be found. When any
Expand Down
36 changes: 0 additions & 36 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2279,42 +2279,6 @@ def pretend_flow():
import_object_mock.assert_called_with("my.module.pretend_flow")


async def test_handling_script_with_unprotected_call_in_flow_script(
tmp_path,
caplog,
prefect_client,
):
flow_code_with_call = """
from prefect import flow, get_run_logger
@flow
def dog():
get_run_logger().warning("meow!")
return "woof!"
dog()
"""
fpath = tmp_path / "f.py"
fpath.write_text(dedent(flow_code_with_call))
with caplog.at_level("WARNING"):
flow = load_flow_from_entrypoint(f"{fpath}:dog")

# Make sure that warning is raised
assert (
"Script loading is in progress, flow 'dog' will not be executed. "
"Consider updating the script to only call the flow" in caplog.text
)

flow_runs = await prefect_client.read_flows()
assert len(flow_runs) == 0

# Make sure that flow runs when called
res = flow()
assert res == "woof!"
flow_runs = await prefect_client.read_flows()
assert len(flow_runs) == 1


class TestFlowRunName:
async def test_invalid_runtime_run_name(self):
class InvalidFlowRunNameArg:
Expand Down

0 comments on commit 46a0068

Please sign in to comment.