Skip to content

Commit

Permalink
Merge pull request #1653 from PrefectHQ/healthchecks
Browse files Browse the repository at this point in the history
Environment Dependency Healthcheck
  • Loading branch information
joshmeek committed Oct 23, 2019
2 parents 780c704 + a4c8707 commit 6b69821
Show file tree
Hide file tree
Showing 17 changed files with 127 additions and 3 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Add the ability to delete task tag limits using the client - [#1622](https://github.com/PrefectHQ/prefect/pull/1622)
- Adds an "Ask for help" button with a link to the prefect.io support page - [#1637](https://github.com/PrefectHQ/prefect/pull/1637)
- Reduces the size of the `prefecthq/prefect` Docker image by ~400MB, which is now the base Docker image used in Flows - [#1648](https://github.com/PrefectHQ/prefect/pull/1648)
- Add a new healthcheck for environment dependencies - [#1653](https://github.com/PrefectHQ/prefect/pull/1653)

### Task Library

Expand All @@ -29,7 +30,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

### Breaking Changes

- None
- `kubernetes` is no longer installed by default in deployed flow images - [#1653](https://github.com/PrefectHQ/prefect/pull/1653)

### Contributors

Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ def __init__(
def __repr__(self) -> str:
return "<Environment: {}>".format(type(self).__name__)

@property
def dependencies(self) -> list:
return []

def setup(self, storage: "Storage") -> None:
"""
Sets up any infrastructure needed for this environment
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/dask/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ def __init__(

super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

@property
def dependencies(self) -> list:
return ["kubernetes"]

def setup(self, storage: "Docker") -> None: # type: ignore
if self.private_registry:
from kubernetes import client, config
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/fargate/fargate_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ def _parse_kwargs(self, user_kwargs: dict) -> tuple:

return task_definition_kwargs, task_run_kwargs

@property
def dependencies(self) -> list:
return ["boto3", "botocore"]

def setup(self, storage: "Docker") -> None: # type: ignore
"""
Register the task definition if it does not already exist.
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/k8s/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def __init__(

super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

@property
def dependencies(self) -> list:
return ["kubernetes"]

def execute( # type: ignore
self, storage: "Docker", flow_location: str, **kwargs: Any
) -> None:
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def __init__(
) -> None:
super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

@property
def dependencies(self) -> list:
return []

def execute(self, storage: "Storage", flow_location: str, **kwargs: Any) -> None:
"""
Executes the flow for this environment from the storage parameter,
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def __init__(
self.executor_kwargs = executor_kwargs or dict()
super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

@property
def dependencies(self) -> list:
return []

def execute( # type: ignore
self, storage: "Storage", flow_location: str, **kwargs: Any
) -> None:
Expand Down
23 changes: 23 additions & 0 deletions src/prefect/environments/storage/_healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import ast
import importlib
import sys
import warnings

Expand Down Expand Up @@ -93,11 +94,33 @@ def result_handler_check(flows: list):
print("Result Handler check: OK")


def environment_dependency_check(flows: list):
# Test for imports that are required by certain environments
for flow in flows:
# Load all required dependencies for an environment
if not hasattr(flow.environment, "dependencies"):
continue

required_imports = flow.environment.dependencies
for dependency in required_imports:
try:
importlib.import_module(dependency)
except ModuleNotFoundError:
raise ModuleNotFoundError(
"Using {} requires the `{}` dependency".format(
flow.environment.__class__.__name__, dependency
)
)

print("Environment dependency check: OK")


if __name__ == "__main__":
flow_file_path, python_version = sys.argv[1:3]

print("Beginning health checks...")
system_check(python_version)
flows = cloudpickle_deserialization_check(flow_file_path)
result_handler_check(flows)
environment_dependency_check(flows)
print("All health checks passed.")
2 changes: 1 addition & 1 deletion src/prefect/environments/storage/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __init__(
self.extra_commands.extend(
[
"apt update && apt install -y gcc git && rm -rf /var/lib/apt/lists/*",
"pip install git+https://github.com/PrefectHQ/prefect.git@{}#egg=prefect[kubernetes]".format(
"pip install git+https://github.com/PrefectHQ/prefect.git@{}".format(
self.prefect_version
),
]
Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_base_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ def f():
assert environment.on_exit is f


def test_environment_dependencies():
environment = Environment()
assert environment.dependencies == []


def test_setup_environment_passes():
environment = Environment()
environment.setup(storage=Docker())
Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_dask_k8s_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ def f():
assert environment.on_exit is f


def test_dask_environment_dependencies():
environment = DaskKubernetesEnvironment()
assert environment.dependencies == ["kubernetes"]


def test_create_dask_environment_identifier_label():
environment = DaskKubernetesEnvironment()
assert environment.identifier_label
Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_fargate_task_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def f():
assert environment.on_exit is f


def test_fargate_task_environment_dependencies():
environment = FargateTaskEnvironment()
assert environment.dependencies == ["boto3", "botocore"]


def test_create_fargate_task_environment_aws_creds_provided():
environment = FargateTaskEnvironment(
labels=["foo"],
Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_k8s_job_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def f():
assert environment.on_exit is f


def test_k8s_job_environment_dependencies():
environment = KubernetesJobEnvironment()
assert environment.dependencies == ["kubernetes"]


def test_create_k8s_job_environment_identifier_label():
with tempfile.TemporaryDirectory() as directory:

Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_local_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ def f():
assert environment.logger.name == "prefect.LocalEnvironment"


def test_environment_dependencies():
environment = LocalEnvironment()
assert environment.dependencies == []


def test_setup_environment_passes():
environment = LocalEnvironment()
environment.setup(storage=Docker())
Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_remote_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ def f():
assert environment.on_exit is f


def test_remote_environment_dependencies():
environment = RemoteEnvironment()
assert environment.dependencies == []


def test_environment_execute():
with tempfile.TemporaryDirectory() as directory:

Expand Down
46 changes: 46 additions & 0 deletions tests/environments/storage/test_docker_healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import os
import sys
import tempfile
from unittest.mock import MagicMock

import cloudpickle
import pytest

from prefect import Flow, Task, task
from prefect.environments import Environment, RemoteEnvironment
from prefect.environments.storage import _healthcheck as healthchecks


Expand Down Expand Up @@ -154,3 +156,47 @@ def down():
result = down(upstream_tasks=[up])

assert healthchecks.result_handler_check([f]) is None


class TestEnvironmentDependencyCheck:
def test_no_raise_on_normal_flow(self):
flow = Flow("THIS IS A TEST")

assert healthchecks.environment_dependency_check([flow]) is None

def test_no_raise_on_remote_env(self):
flow = Flow("THIS IS A TEST", environment=RemoteEnvironment())

assert healthchecks.environment_dependency_check([flow]) is None

def test_no_raise_on_proper_imports(self):
class NewEnvironment(Environment):
@property
def dependencies(self) -> list:
return ["prefect"]

flow = Flow("THIS IS A TEST", environment=NewEnvironment())

assert healthchecks.environment_dependency_check([flow]) is None

def test_no_raise_on_missing_dependencies_property(self):
class NewEnvironment(Environment):
pass

flow = Flow("THIS IS A TEST", environment=NewEnvironment())

assert healthchecks.environment_dependency_check([flow]) is None

@pytest.mark.skipif(
sys.version_info < (3, 6), reason="3.5 does not support ModuleNotFoundError"
)
def test_raise_on_missing_imports(self, monkeypatch):
class NewEnvironment(Environment):
@property
def dependencies(self) -> list:
return ["TEST"]

flow = Flow("THIS IS A TEST", environment=NewEnvironment())

with pytest.raises(ModuleNotFoundError):
healthchecks.environment_dependency_check([flow])
2 changes: 1 addition & 1 deletion tests/environments/storage/test_docker_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def test_create_dockerfile_from_base_image():
(
"FROM python:3.6-slim",
"apt update && apt install -y gcc git && rm -rf /var/lib/apt/lists/*",
"pip install git+https://github.com/PrefectHQ/prefect.git@424be6b5ed8d3be85064de4b95b5c3d7cb665510#egg=prefect[kubernetes]",
"pip install git+https://github.com/PrefectHQ/prefect.git@424be6b5ed8d3be85064de4b95b5c3d7cb665510",
),
),
],
Expand Down

0 comments on commit 6b69821

Please sign in to comment.