diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6c79a970a1ec..def4d14de48f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
 - Add informative logs in the event that a heartbeat thread dies - [#1721](https://github.com/PrefectHQ/prefect/pull/1721)
 - Loosen Job spec requirements for `KubernetesJobEnvironment` - [#1713](https://github.com/PrefectHQ/prefect/pull/1713)
 - Loosen `containerDefinitions` requirements for `FargateTaskEnvironment` - [#1713](https://github.com/PrefectHQ/prefect/pull/1713)
+- Local Docker agent proactively fails flow runs if image cannot be pulled - [#1395](https://github.com/PrefectHQ/prefect/issues/1395)
 - Add graceful keyboard interrupt shutdown for all agents - [#1731](https://github.com/PrefectHQ/prefect/pull/1731)
 
 ### Task Library
diff --git a/src/prefect/agent/local/agent.py b/src/prefect/agent/local/agent.py
index 04ca6ad102fd..b82b3b7f8471 100644
--- a/src/prefect/agent/local/agent.py
+++ b/src/prefect/agent/local/agent.py
@@ -5,6 +5,7 @@
 
 from prefect import config, context
 from prefect.agent import Agent
+from prefect.engine.state import Failed
 from prefect.environments.storage import Docker
 from prefect.serialization.storage import StorageSchema
 from prefect.utilities.graphql import GraphQLResult
@@ -78,9 +79,14 @@ def deploy_flows(self, flow_runs: list) -> None:
 
             storage = StorageSchema().load(flow_run.flow.storage)
             if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker):
-                self.logger.error(
-                    "Storage for flow run {} is not of type Docker.".format(flow_run.id)
+                msg = "Storage for flow run {} is not of type Docker.".format(
+                    flow_run.id
                 )
+                state_msg = "Agent {} failed to run flow: ".format(self.name) + msg
+                self.client.set_flow_run_state(
+                    flow_run.id, version=flow_run.version, state=Failed(state_msg)
+                )
+                self.logger.error(msg)
                 continue
 
             env_vars = self.populate_env_vars(flow_run=flow_run)
@@ -97,7 +103,15 @@ def deploy_flows(self, flow_runs: list) -> None:
                         "Successfully pulled image {}...".format(storage.name)
                     )
                 except docker.errors.APIError as exc:
-                    self.logger.error("Issue pulling image {}".format(storage.name))
+                    msg = "Issue pulling image {}".format(storage.name)
+                    state_msg = (
+                        "Agent {} failed to pull image for flow: ".format(self.name)
+                        + msg
+                    )
+                    self.client.set_flow_run_state(
+                        flow_run.id, version=flow_run.version, state=Failed(state_msg)
+                    )
+                    self.logger.error(msg)
 
             # Create a container
             self.logger.debug("Creating Docker container {}".format(storage.name))
diff --git a/tests/agent/test_local_agent.py b/tests/agent/test_local_agent.py
index 567dbc29fe1a..78406afa9c2f 100644
--- a/tests/agent/test_local_agent.py
+++ b/tests/agent/test_local_agent.py
@@ -189,6 +189,7 @@ def test_local_agent_deploy_flows(monkeypatch, runner_token):
 
 
 def test_local_agent_deploy_flows_storage_continues(monkeypatch, runner_token):
+    monkeypatch.setattr("prefect.agent.agent.Client", MagicMock())
     api = MagicMock()
     api.ping.return_value = True
     api.create_container.return_value = {"Id": "container_id"}
@@ -200,7 +201,11 @@ def test_local_agent_deploy_flows_storage_continues(monkeypatch, runner_token):
     agent.deploy_flows(
         flow_runs=[
             GraphQLResult(
-                {"flow": GraphQLResult({"storage": Local().serialize()}), "id": "id"}
+                {
+                    "flow": GraphQLResult({"storage": Local().serialize()}),
+                    "id": "id",
+                    "version": "version",
+                }
             )
         ]
     )