Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Environment Dependency Healthcheck #1653

Merged
merged 14 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Add the ability to delete task tag limits using the client - [#1622](https://github.com/PrefectHQ/prefect/pull/1622)
- Adds an "Ask for help" button with a link to the prefect.io support page - [#1637](https://github.com/PrefectHQ/prefect/pull/1637)
- Reduces the size of the `prefecthq/prefect` Docker image by ~400MB, which is now the base Docker image used in Flows - [#1648](https://github.com/PrefectHQ/prefect/pull/1648)
- Add a new healthcheck for environment dependencies - [#1653](https://github.com/PrefectHQ/prefect/pull/1653)
joshmeek marked this conversation as resolved.
Show resolved Hide resolved

### Task Library

Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ def __init__(
def __repr__(self) -> str:
    """
    Render the object as `<Environment: ClassName>`, where `ClassName` is the
    name of the instance's concrete class.
    """
    template = "<Environment: {}>"
    class_name = type(self).__name__
    return template.format(class_name)

@property
def dependencies(self) -> list:
    """
    Names of the modules that must be importable for this environment.

    The default is an empty list, i.e. nothing beyond the base install.
    """
    return list()

def setup(self, storage: "Storage") -> None:
"""
Sets up any infrastructure needed for this environment
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/dask/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ def __init__(

super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

@property
def dependencies(self) -> list:
    """
    Names of the modules that must be importable for this environment:
    only `kubernetes`.
    """
    required_modules = ["kubernetes"]
    return required_modules

def setup(self, storage: "Docker") -> None: # type: ignore
if self.private_registry:
from kubernetes import client, config
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/fargate/fargate_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ def _parse_kwargs(self, user_kwargs: dict) -> tuple:

return task_definition_kwargs, task_run_kwargs

@property
def dependencies(self) -> list:
    """
    Names of the modules that must be importable for this environment:
    `boto3` and `botocore`, in that order.
    """
    required_modules = ["boto3", "botocore"]
    return required_modules

def setup(self, storage: "Docker") -> None: # type: ignore
"""
Register the task definition if it does not already exist.
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/k8s/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def __init__(

super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

@property
def dependencies(self) -> list:
    """
    Names of the modules that must be importable for this environment:
    only `kubernetes`.
    """
    return list(("kubernetes",))

def execute( # type: ignore
self, storage: "Docker", flow_location: str, **kwargs: Any
) -> None:
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def __init__(
) -> None:
super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

@property
def dependencies(self) -> list:
    """
    Names of the modules that must be importable for this environment.

    This environment declares none, so an empty list is returned.
    """
    nothing_required = []  # type: list
    return nothing_required

def execute(self, storage: "Storage", flow_location: str, **kwargs: Any) -> None:
"""
Executes the flow for this environment from the storage parameter,
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/environments/execution/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def __init__(
self.executor_kwargs = executor_kwargs or dict()
super().__init__(labels=labels, on_start=on_start, on_exit=on_exit)

@property
def dependencies(self) -> list:
    """
    Names of the modules that must be importable for this environment.

    This environment declares none, so an empty list is returned.
    """
    required_modules = list()
    return required_modules

def execute( # type: ignore
self, storage: "Storage", flow_location: str, **kwargs: Any
) -> None:
Expand Down
23 changes: 23 additions & 0 deletions src/prefect/environments/storage/_healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import ast
import importlib
import sys
import warnings

Expand Down Expand Up @@ -93,11 +94,33 @@ def result_handler_check(flows: list):
print("Result Handler check: OK")


def environment_dependency_check(flows: list):
    """
    Check that every module required by each flow's environment can be imported.

    For each flow, the names listed in `flow.environment.dependencies` are
    imported one by one. Environments without a `dependencies` attribute are
    skipped.

    Args:
        - flows (list): objects exposing an `environment` attribute

    Raises:
        - ModuleNotFoundError: if a listed module cannot be imported; the
            original import failure is attached as the exception's cause
    """
    for flow in flows:
        environment = flow.environment

        # Nothing to verify when the environment declares no dependencies
        if not hasattr(environment, "dependencies"):
            continue

        for dependency in environment.dependencies:
            try:
                importlib.import_module(dependency)
            except ModuleNotFoundError as exc:
                # Chain the original error: the module that is actually missing
                # may be a transitive import rather than `dependency` itself.
                raise ModuleNotFoundError(
                    "Using {} requires the `{}` dependency".format(
                        environment.__class__.__name__, dependency
                    )
                ) from exc

    print("Environment dependency check: OK")


if __name__ == "__main__":
    # Two positional CLI arguments are required: the value handed to
    # `cloudpickle_deserialization_check`, then the one handed to `system_check`.
    cli_args = sys.argv[1:3]
    flow_file_path, python_version = cli_args

    print("Beginning health checks...")

    system_check(python_version)

    # The deserialized flows feed the remaining checks
    flows = cloudpickle_deserialization_check(flow_file_path)
    result_handler_check(flows)
    environment_dependency_check(flows)

    print("All health checks passed.")
2 changes: 1 addition & 1 deletion src/prefect/environments/storage/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __init__(
self.extra_commands.extend(
[
"apt update && apt install -y gcc git && rm -rf /var/lib/apt/lists/*",
"pip install git+https://github.com/PrefectHQ/prefect.git@{}#egg=prefect[kubernetes]".format(
"pip install git+https://github.com/PrefectHQ/prefect.git@{}".format(
joshmeek marked this conversation as resolved.
Show resolved Hide resolved
self.prefect_version
),
]
Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_base_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ def f():
assert environment.on_exit is f


def test_environment_dependencies():
    # A plain Environment declares no required modules
    assert Environment().dependencies == []


def test_setup_environment_passes():
environment = Environment()
environment.setup(storage=Docker())
Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_dask_k8s_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ def f():
assert environment.on_exit is f


def test_dask_environment_dependencies():
    # The environment should declare `kubernetes` and nothing else
    declared = DaskKubernetesEnvironment().dependencies
    assert declared == ["kubernetes"]


def test_create_dask_environment_identifier_label():
environment = DaskKubernetesEnvironment()
assert environment.identifier_label
Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_fargate_task_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def f():
assert environment.on_exit is f


def test_fargate_task_environment_dependencies():
    # The environment should declare `boto3` then `botocore`
    declared = FargateTaskEnvironment().dependencies
    assert declared == ["boto3", "botocore"]


def test_create_fargate_task_environment_aws_creds_provided():
environment = FargateTaskEnvironment(
labels=["foo"],
Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_k8s_job_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def f():
assert environment.on_exit is f


def test_k8s_job_environment_dependencies():
    # The environment should declare `kubernetes` and nothing else
    assert KubernetesJobEnvironment().dependencies == ["kubernetes"]


def test_create_k8s_job_environment_identifier_label():
with tempfile.TemporaryDirectory() as directory:

Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_local_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ def f():
assert environment.logger.name == "prefect.LocalEnvironment"


def test_environment_dependencies():
    # LocalEnvironment declares no required modules
    declared = LocalEnvironment().dependencies
    assert declared == []


def test_setup_environment_passes():
environment = LocalEnvironment()
environment.setup(storage=Docker())
Expand Down
5 changes: 5 additions & 0 deletions tests/environments/execution/test_remote_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ def f():
assert environment.on_exit is f


def test_remote_environment_dependencies():
    # RemoteEnvironment declares no required modules
    assert RemoteEnvironment().dependencies == []


def test_environment_execute():
with tempfile.TemporaryDirectory() as directory:

Expand Down
46 changes: 46 additions & 0 deletions tests/environments/storage/test_docker_healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import os
import sys
import tempfile
from unittest.mock import MagicMock

import cloudpickle
import pytest

from prefect import Flow, Task, task
from prefect.environments import Environment, RemoteEnvironment
from prefect.environments.storage import _healthcheck as healthchecks


Expand Down Expand Up @@ -154,3 +156,47 @@ def down():
result = down(upstream_tasks=[up])

assert healthchecks.result_handler_check([f]) is None


class TestEnvironmentDependencyCheck:
    """
    Tests for `healthchecks.environment_dependency_check`.
    """

    def test_no_raise_on_normal_flow(self):
        flow = Flow("THIS IS A TEST")

        assert healthchecks.environment_dependency_check([flow]) is None

    def test_no_raise_on_remote_env(self):
        flow = Flow("THIS IS A TEST", environment=RemoteEnvironment())

        assert healthchecks.environment_dependency_check([flow]) is None

    def test_no_raise_on_proper_imports(self):
        class NewEnvironment(Environment):
            @property
            def dependencies(self) -> list:
                return ["prefect"]

        flow = Flow("THIS IS A TEST", environment=NewEnvironment())

        assert healthchecks.environment_dependency_check([flow]) is None

    def test_no_raise_on_missing_dependencies_property(self):
        # Any `Environment` subclass inherits `dependencies` from the base
        # class, so a stand-in object is needed to reach the branch that skips
        # environments lacking the attribute. `MagicMock(spec=[])` raises
        # AttributeError for every attribute lookup.
        flow = MagicMock()
        flow.environment = MagicMock(spec=[])
        assert not hasattr(flow.environment, "dependencies")

        assert healthchecks.environment_dependency_check([flow]) is None

    @pytest.mark.skipif(
        sys.version_info < (3, 6), reason="3.5 does not support ModuleNotFoundError"
    )
    def test_raise_on_missing_imports(self):
        class NewEnvironment(Environment):
            @property
            def dependencies(self) -> list:
                return ["TEST"]

        flow = Flow("THIS IS A TEST", environment=NewEnvironment())

        # The error message must name the module that could not be imported
        with pytest.raises(ModuleNotFoundError, match="TEST"):
            healthchecks.environment_dependency_check([flow])
2 changes: 1 addition & 1 deletion tests/environments/storage/test_docker_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def test_create_dockerfile_from_base_image():
(
"FROM python:3.6-slim",
"apt update && apt install -y gcc git && rm -rf /var/lib/apt/lists/*",
"pip install git+https://github.com/PrefectHQ/prefect.git@424be6b5ed8d3be85064de4b95b5c3d7cb665510#egg=prefect[kubernetes]",
"pip install git+https://github.com/PrefectHQ/prefect.git@424be6b5ed8d3be85064de4b95b5c3d7cb665510",
),
),
],
Expand Down