NOTE(review): this patch was rebuilt from a copy whose newlines and leading
whitespace had been collapsed. Indentation and blank context lines are
reconstructed from the hunk line counts and Python syntax, so confirm them
against the target files; if a hunk is rejected, retry with
`git apply --ignore-whitespace`. The `__repr__` context line in base.py is
reproduced exactly as found and may have lost text. The `index` lines are
unchanged from the original patch.

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 221fcaeab6a6..b805fca36570 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
 - Add the ability to delete task tag limits using the client - [#1622](https://github.com/PrefectHQ/prefect/pull/1622)
 - Adds an "Ask for help" button with a link to the prefect.io support page - [#1637](https://github.com/PrefectHQ/prefect/pull/1637)
 - Reduces the size of the `prefecthq/prefect` Docker image by ~400MB, which is now the base Docker image used in Flows - [#1648](https://github.com/PrefectHQ/prefect/pull/1648)
+- Add a new healthcheck for environment dependencies - [#1653](https://github.com/PrefectHQ/prefect/pull/1653)
 
 ### Task Library
 
@@ -29,7 +30,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
 
 ### Breaking Changes
 
-- None
+- `kubernetes` is no longer installed by default in deployed flow images - [#1653](https://github.com/PrefectHQ/prefect/pull/1653)
 
 ### Contributors
 
diff --git a/src/prefect/environments/execution/base.py b/src/prefect/environments/execution/base.py
index 6245fb72fa9a..b3b3796d87f0 100644
--- a/src/prefect/environments/execution/base.py
+++ b/src/prefect/environments/execution/base.py
@@ -52,6 +52,17 @@ def __init__(
     def __repr__(self) -> str:
         return "".format(type(self).__name__)
 
+    @property
+    def dependencies(self) -> list:
+        """
+        Names of the modules that must be importable for this environment to run.
+
+        Returns:
+            - list: module names accepted by `importlib.import_module`; the base
+                environment requires none
+        """
+        return []
+
     def setup(self, storage: "Storage") -> None:
         """
         Sets up any infrastructure needed for this environment
diff --git a/src/prefect/environments/execution/dask/k8s.py b/src/prefect/environments/execution/dask/k8s.py
index ec6ae9e51410..173819e9c440 100644
--- a/src/prefect/environments/execution/dask/k8s.py
+++ b/src/prefect/environments/execution/dask/k8s.py
@@ -88,6 +88,10 @@ def __init__(
 
         super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)
 
+    @property
+    def dependencies(self) -> list:
+        return ["kubernetes"]
+
     def setup(self, storage: "Docker") -> None:  # type: ignore
         if self.private_registry:
             from kubernetes import client, config
diff --git a/src/prefect/environments/execution/fargate/fargate_task.py b/src/prefect/environments/execution/fargate/fargate_task.py
index 5ff4b8ee5db0..e6cb2578e677 100644
--- a/src/prefect/environments/execution/fargate/fargate_task.py
+++ b/src/prefect/environments/execution/fargate/fargate_task.py
@@ -142,6 +142,10 @@ def _parse_kwargs(self, user_kwargs: dict) -> tuple:
 
         return task_definition_kwargs, task_run_kwargs
 
+    @property
+    def dependencies(self) -> list:
+        return ["boto3", "botocore"]
+
     def setup(self, storage: "Docker") -> None:  # type: ignore
         """
         Register the task definition if it does not already exist.
diff --git a/src/prefect/environments/execution/k8s/job.py b/src/prefect/environments/execution/k8s/job.py
index 79b17f178e2f..070597e54841 100644
--- a/src/prefect/environments/execution/k8s/job.py
+++ b/src/prefect/environments/execution/k8s/job.py
@@ -59,6 +59,10 @@ def __init__(
 
         super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)
 
+    @property
+    def dependencies(self) -> list:
+        return ["kubernetes"]
+
     def execute(  # type: ignore
         self, storage: "Docker", flow_location: str, **kwargs: Any
     ) -> None:
diff --git a/src/prefect/environments/execution/local.py b/src/prefect/environments/execution/local.py
index fcec0d8d7c85..2bffab0d50a4 100644
--- a/src/prefect/environments/execution/local.py
+++ b/src/prefect/environments/execution/local.py
@@ -27,6 +27,10 @@ def __init__(
     ) -> None:
         super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)
 
+    @property
+    def dependencies(self) -> list:
+        return []
+
     def execute(self, storage: "Storage", flow_location: str, **kwargs: Any) -> None:
         """
         Executes the flow for this environment from the storage parameter,
diff --git a/src/prefect/environments/execution/remote.py b/src/prefect/environments/execution/remote.py
index a81e434b5ffa..9489296bcbfc 100644
--- a/src/prefect/environments/execution/remote.py
+++ b/src/prefect/environments/execution/remote.py
@@ -46,6 +46,10 @@ def __init__(
         self.executor_kwargs = executor_kwargs or dict()
         super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)
 
+    @property
+    def dependencies(self) -> list:
+        return []
+
     def execute(  # type: ignore
         self, storage: "Storage", flow_location: str, **kwargs: Any
     ) -> None:
diff --git a/src/prefect/environments/storage/_healthcheck.py b/src/prefect/environments/storage/_healthcheck.py
index 58fc1569889e..b737bb13f900 100644
--- a/src/prefect/environments/storage/_healthcheck.py
+++ b/src/prefect/environments/storage/_healthcheck.py
@@ -5,6 +5,7 @@
 """
 
 import ast
+import importlib
 import sys
 import warnings
 
@@ -93,6 +94,37 @@ def result_handler_check(flows: list):
     print("Result Handler check: OK")
 
 
+def environment_dependency_check(flows: list):
+    # Verify that every module named by a flow environment's `dependencies`
+    # can be imported by the interpreter running this healthcheck
+    try:
+        # ModuleNotFoundError only exists on Python 3.6+
+        missing_module_error = ModuleNotFoundError
+    except NameError:
+        # Python 3.5 raises a plain ImportError for a missing module
+        missing_module_error = ImportError  # type: ignore
+
+    for flow in flows:
+        # Environments that do not declare `dependencies` have nothing to check
+        if not hasattr(flow.environment, "dependencies"):
+            continue
+
+        required_imports = flow.environment.dependencies
+        for dependency in required_imports:
+            try:
+                importlib.import_module(dependency)
+            except missing_module_error as exc:
+                # Name the environment in the error and keep the original
+                # failure attached as the explicit cause
+                raise missing_module_error(
+                    "Using {} requires the `{}` dependency".format(
+                        flow.environment.__class__.__name__, dependency
+                    )
+                ) from exc
+
+    print("Environment dependency check: OK")
+
+
 if __name__ == "__main__":
     flow_file_path, python_version = sys.argv[1:3]
 
@@ -100,4 +132,5 @@ def result_handler_check(flows: list):
     system_check(python_version)
     flows = cloudpickle_deserialization_check(flow_file_path)
     result_handler_check(flows)
+    environment_dependency_check(flows)
     print("All health checks passed.")
diff --git a/src/prefect/environments/storage/docker.py b/src/prefect/environments/storage/docker.py
index a27ca2ea951e..3f3fb18b0d8f 100644
--- a/src/prefect/environments/storage/docker.py
+++ b/src/prefect/environments/storage/docker.py
@@ -105,7 +105,7 @@ def __init__(
                 self.extra_commands.extend(
                     [
                         "apt update && apt install -y gcc git && rm -rf /var/lib/apt/lists/*",
-                        "pip install git+https://github.com/PrefectHQ/prefect.git@{}#egg=prefect[kubernetes]".format(
+                        "pip install git+https://github.com/PrefectHQ/prefect.git@{}".format(
                             self.prefect_version
                         ),
                     ]
diff --git a/tests/environments/execution/test_base_environment.py b/tests/environments/execution/test_base_environment.py
index cdf3b7f649bc..162671b87933 100644
--- a/tests/environments/execution/test_base_environment.py
+++ b/tests/environments/execution/test_base_environment.py
@@ -29,6 +29,11 @@ def f():
     assert environment.on_exit is f
 
 
+def test_environment_dependencies():
+    environment = Environment()
+    assert environment.dependencies == []
+
+
 def test_setup_environment_passes():
     environment = Environment()
     environment.setup(storage=Docker())
diff --git a/tests/environments/execution/test_dask_k8s_environment.py b/tests/environments/execution/test_dask_k8s_environment.py
index bb2820e9e95d..49af5e46fbcf 100644
--- a/tests/environments/execution/test_dask_k8s_environment.py
+++ b/tests/environments/execution/test_dask_k8s_environment.py
@@ -40,6 +40,11 @@ def f():
     assert environment.on_exit is f
 
 
+def test_dask_environment_dependencies():
+    environment = DaskKubernetesEnvironment()
+    assert environment.dependencies == ["kubernetes"]
+
+
 def test_create_dask_environment_identifier_label():
     environment = DaskKubernetesEnvironment()
     assert environment.identifier_label
diff --git a/tests/environments/execution/test_fargate_task_environment.py b/tests/environments/execution/test_fargate_task_environment.py
index 6733dc191f11..5ab6a20a4313 100644
--- a/tests/environments/execution/test_fargate_task_environment.py
+++ b/tests/environments/execution/test_fargate_task_environment.py
@@ -38,6 +38,11 @@ def f():
     assert environment.on_exit is f
 
 
+def test_fargate_task_environment_dependencies():
+    environment = FargateTaskEnvironment()
+    assert environment.dependencies == ["boto3", "botocore"]
+
+
 def test_create_fargate_task_environment_aws_creds_provided():
     environment = FargateTaskEnvironment(
         labels=["foo"],
diff --git a/tests/environments/execution/test_k8s_job_environment.py b/tests/environments/execution/test_k8s_job_environment.py
index 83803e433bf7..f5aaf8b2e946 100644
--- a/tests/environments/execution/test_k8s_job_environment.py
+++ b/tests/environments/execution/test_k8s_job_environment.py
@@ -63,6 +63,11 @@ def f():
         assert environment.on_exit is f
 
 
+def test_k8s_job_environment_dependencies():
+    environment = KubernetesJobEnvironment()
+    assert environment.dependencies == ["kubernetes"]
+
+
 def test_create_k8s_job_environment_identifier_label():
     with tempfile.TemporaryDirectory() as directory:
 
diff --git a/tests/environments/execution/test_local_environment.py b/tests/environments/execution/test_local_environment.py
index b6e44a303625..f0deb62cfa52 100644
--- a/tests/environments/execution/test_local_environment.py
+++ b/tests/environments/execution/test_local_environment.py
@@ -28,6 +28,11 @@ def f():
     assert environment.logger.name == "prefect.LocalEnvironment"
 
 
+def test_environment_dependencies():
+    environment = LocalEnvironment()
+    assert environment.dependencies == []
+
+
 def test_setup_environment_passes():
     environment = LocalEnvironment()
     environment.setup(storage=Docker())
diff --git a/tests/environments/execution/test_remote_environment.py b/tests/environments/execution/test_remote_environment.py
index c396e8c1ea01..c6d6c5ff21ca 100644
--- a/tests/environments/execution/test_remote_environment.py
+++ b/tests/environments/execution/test_remote_environment.py
@@ -40,6 +40,11 @@ def f():
     assert environment.on_exit is f
 
 
+def test_remote_environment_dependencies():
+    environment = RemoteEnvironment()
+    assert environment.dependencies == []
+
+
 def test_environment_execute():
     with tempfile.TemporaryDirectory() as directory:
 
diff --git a/tests/environments/storage/test_docker_healthcheck.py b/tests/environments/storage/test_docker_healthcheck.py
index 989bfcef5e3f..b38d9752ba56 100644
--- a/tests/environments/storage/test_docker_healthcheck.py
+++ b/tests/environments/storage/test_docker_healthcheck.py
@@ -2,11 +2,13 @@
 import os
 import sys
 import tempfile
+from unittest.mock import MagicMock
 
 import cloudpickle
 import pytest
 
 from prefect import Flow, Task, task
+from prefect.environments import Environment, RemoteEnvironment
 from prefect.environments.storage import _healthcheck as healthchecks
 
 
@@ -154,3 +156,44 @@ def down():
             result = down(upstream_tasks=[up])
 
         assert healthchecks.result_handler_check([f]) is None
+
+
+class TestEnvironmentDependencyCheck:
+    def test_no_raise_on_normal_flow(self):
+        flow = Flow("THIS IS A TEST")
+
+        assert healthchecks.environment_dependency_check([flow]) is None
+
+    def test_no_raise_on_remote_env(self):
+        flow = Flow("THIS IS A TEST", environment=RemoteEnvironment())
+
+        assert healthchecks.environment_dependency_check([flow]) is None
+
+    def test_no_raise_on_proper_imports(self):
+        class NewEnvironment(Environment):
+            @property
+            def dependencies(self) -> list:
+                return ["prefect"]
+
+        flow = Flow("THIS IS A TEST", environment=NewEnvironment())
+
+        assert healthchecks.environment_dependency_check([flow]) is None
+
+    def test_no_raise_on_missing_dependencies_property(self):
+        # `Environment` subclasses always inherit `dependencies`, so use a
+        # stand-in whose environment exposes no attributes at all
+        flow = MagicMock(environment=MagicMock(spec=[]))
+
+        assert healthchecks.environment_dependency_check([flow]) is None
+
+    def test_raise_on_missing_imports(self):
+        class NewEnvironment(Environment):
+            @property
+            def dependencies(self) -> list:
+                return ["TEST"]
+
+        flow = Flow("THIS IS A TEST", environment=NewEnvironment())
+
+        # ImportError also covers ModuleNotFoundError, its 3.6+ subclass
+        with pytest.raises(ImportError, match="requires the `TEST` dependency"):
+            healthchecks.environment_dependency_check([flow])
diff --git a/tests/environments/storage/test_docker_storage.py b/tests/environments/storage/test_docker_storage.py
index d43498a95c8c..6c3b2fb5497e 100644
--- a/tests/environments/storage/test_docker_storage.py
+++ b/tests/environments/storage/test_docker_storage.py
@@ -324,7 +324,7 @@ def test_create_dockerfile_from_base_image():
             (
                 "FROM python:3.6-slim",
                 "apt update && apt install -y gcc git && rm -rf /var/lib/apt/lists/*",
-                "pip install git+https://github.com/PrefectHQ/prefect.git@424be6b5ed8d3be85064de4b95b5c3d7cb665510#egg=prefect[kubernetes]",
+                "pip install git+https://github.com/PrefectHQ/prefect.git@424be6b5ed8d3be85064de4b95b5c3d7cb665510",
             ),
         ),
     ],