From 44a69c233932be9badb4c833af73517e4e66fc38 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Mon, 1 Jul 2019 11:00:45 -0400 Subject: [PATCH 1/5] Update Client with new log endpoint --- src/prefect/client/client.py | 51 +++++++++++++++++++ src/prefect/config.toml | 1 - src/prefect/engine/signals.py | 4 +- .../execution/cloud/environment.py | 20 ++++---- .../environments/execution/cloud/job.yaml | 2 - .../execution/cloud/worker_pod.yaml | 4 +- src/prefect/utilities/logging.py | 43 ++++++++-------- tests/client/test_client.py | 31 +++++++++++ .../execution/test_cloud_environment.py | 6 +-- tests/utilities/test_logging.py | 5 +- 10 files changed, 121 insertions(+), 46 deletions(-) diff --git a/src/prefect/client/client.py b/src/prefect/client/client.py index e72c6f03060c..b67eea464f80 100644 --- a/src/prefect/client/client.py +++ b/src/prefect/client/client.py @@ -730,3 +730,54 @@ def set_secret(self, name: str, value: Any) -> None: if not result.data.setSecret.success: raise ValueError("Setting secret failed.") + + def write_run_log( + self, + flow_run_id: str, + task_run_id: str = None, + timestamp: datetime.datetime = None, + name: str = None, + message: str = None, + level: str = None, + info: Any = None, + ) -> None: + """ + Writes a log to Cloud + + Args: + - flow_run_id (str): the flow run id + - task_run_id (str, optional): the task run id + - timestamp (datetime, optional): the timestamp; defaults to now + - name (str, optional): the name of the logger + - message (str, optional): the log message + - level (str, optional): the log level as a string. Defaults to INFO, should be one of + DEBUG, INFO, WARNING, ERROR, or CRITICAL. + - info (Any, optional): a JSON payload of additional information + + Raises: + - ValueError: if writing the log fails + """ + mutation = { + "mutation($input: writeRunLogInput!)": { + "writeRunLog(input: $input)": {"success"} + } + } + + if timestamp is None: + timestamp = pendulum.now("UTC") + timestamp_str = pendulum.instance(timestamp).isoformat() + result = self.graphql( + mutation, + input=dict( + flowRunId=flow_run_id, + taskRunId=task_run_id, + timestamp=timestamp_str, + name=name, + message=message, + level=level, + info=info, + ), + ) # type: Any + + if not result.data.writeRunLog.success: + raise ValueError("Writing log failed.") diff --git a/src/prefect/config.toml b/src/prefect/config.toml index e2fda4b76a22..2e71576e75f2 100644 --- a/src/prefect/config.toml +++ b/src/prefect/config.toml @@ -6,7 +6,6 @@ debug = false # the Prefect Server address api = "https://api.prefect.io" graphql = "${cloud.api}/graphql/alpha" -log = "${cloud.api}/log" result_handler = "${cloud.api}/result-handler" use_local_secrets = true heartbeat_interval = 30.0 diff --git a/src/prefect/engine/signals.py b/src/prefect/engine/signals.py index d23fe24601e4..48dd58d82d08 100644 --- a/src/prefect/engine/signals.py +++ b/src/prefect/engine/signals.py @@ -24,9 +24,7 @@ class PrefectStateSignal(PrefectError): def __init__(self, message: str = None, *args, **kwargs): # type: ignore super().__init__(message) # type: ignore kwargs.setdefault("result", self) - self.state = self._state_cls( # type: ignore - message=message, *args, **kwargs - ) + self.state = self._state_cls(message=message, *args, **kwargs) # type: ignore class FAIL(PrefectStateSignal): diff --git a/src/prefect/environments/execution/cloud/environment.py b/src/prefect/environments/execution/cloud/environment.py index 45dcf3f91b5f..de5ba40eaf17 100644 --- a/src/prefect/environments/execution/cloud/environment.py +++ b/src/prefect/environments/execution/cloud/environment.py @@ -258,13 +258,12 @@ def _populate_job_yaml( pod_spec["imagePullSecrets"].append({"name": namespace + "-docker"}) env[0]["value"] = prefect.config.cloud.graphql - env[1]["value"] = prefect.config.cloud.log - env[2]["value"] = prefect.config.cloud.result_handler - env[3]["value"] = prefect.config.cloud.auth_token - env[4]["value"] = flow_run_id - env[5]["value"] = prefect.context.get("namespace", "") - env[6]["value"] = docker_name - env[7]["value"] = flow_file_path + env[1]["value"] = prefect.config.cloud.result_handler + env[2]["value"] = prefect.config.cloud.auth_token + env[3]["value"] = flow_run_id + env[4]["value"] = prefect.context.get("namespace", "") + env[5]["value"] = docker_name + env[6]["value"] = flow_file_path # set image yaml_obj["spec"]["template"]["spec"]["containers"][0]["image"] = docker_name @@ -291,10 +290,9 @@ def _populate_worker_pod_yaml(self, yaml_obj: dict) -> dict: env = yaml_obj["spec"]["containers"][0]["env"] env[0]["value"] = prefect.config.cloud.graphql - env[1]["value"] = prefect.config.cloud.log - env[2]["value"] = prefect.config.cloud.result_handler - env[3]["value"] = prefect.config.cloud.auth_token - env[4]["value"] = prefect.context.get("flow_run_id", "") + env[1]["value"] = prefect.config.cloud.result_handler + env[2]["value"] = prefect.config.cloud.auth_token + env[3]["value"] = prefect.context.get("flow_run_id", "") if self.private_registry: namespace = prefect.context.get("namespace", "") diff --git a/src/prefect/environments/execution/cloud/job.yaml b/src/prefect/environments/execution/cloud/job.yaml index de923a3e4fc3..8ae39d0ca0e2 100644 --- a/src/prefect/environments/execution/cloud/job.yaml +++ b/src/prefect/environments/execution/cloud/job.yaml @@ -19,8 +19,6 @@ spec: env: - name: PREFECT__CLOUD__GRAPHQL value: $PREFECT__CLOUD__GRAPHQL - - name: PREFECT__CLOUD__LOG - value: PREFECT__CLOUD__LOG - name: PREFECT__CLOUD__RESULT_HANDLER value: PREFECT__CLOUD__RESULT_HANDLER - name: PREFECT__CLOUD__AUTH_TOKEN diff --git a/src/prefect/environments/execution/cloud/worker_pod.yaml b/src/prefect/environments/execution/cloud/worker_pod.yaml index 73c8cdd725e8..513c6791fb6c 100644 --- a/src/prefect/environments/execution/cloud/worker_pod.yaml +++ b/src/prefect/environments/execution/cloud/worker_pod.yaml @@ -12,8 +12,6 @@ spec: env: - name: PREFECT__CLOUD__GRAPHQL value: $PREFECT__CLOUD__GRAPHQL - - name: PREFECT__CLOUD__LOG - value: PREFECT__CLOUD__LOG - name: PREFECT__CLOUD__RESULT_HANDLER value: PREFECT__CLOUD__RESULT_HANDLER - name: PREFECT__CLOUD__AUTH_TOKEN @@ -40,4 +38,4 @@ spec: requests: cpu: "500m" limits: - cpu: "500m" \ No newline at end of file + cpu: "500m" diff --git a/src/prefect/utilities/logging.py b/src/prefect/utilities/logging.py index 24d39862440c..cf6d45c042ae 100644 --- a/src/prefect/utilities/logging.py +++ b/src/prefect/utilities/logging.py @@ -9,20 +9,18 @@ When running locally, log levels and message formatting are set via your Prefect configuration file. """ import logging -import os -import queue import time -from logging.handlers import QueueHandler, QueueListener from typing import Any +import pendulum + import prefect from prefect.configuration import config -class RemoteHandler(logging.StreamHandler): +class CloudHandler(logging.StreamHandler): def __init__(self) -> None: super().__init__() - self.logger_server = config.cloud.log self.client = None self.errored_out = False @@ -35,22 +33,29 @@ def emit(self, record) -> None: # type: ignore if self.client is None: self.client = Client() # type: ignore - assert isinstance(self.client, Client) # mypy assert - r = self.client.post(path="", server=self.logger_server, **record.__dict__) + assert isinstance(self.client, Client) # mypy asser + + record_dict = record.__dict__.copy() + flow_run_id = prefect.context.get("flow_run_id", None) + task_run_id = prefect.context.get("task_run_id", None) + timestamp = record_dict.pop("timestamp", pendulum.now("UTC")) + name = record_dict.pop("name", None) + message = record_dict.pop("message", None) + level = record_dict.pop("level", None) + + self.client.write_run_log( + flow_run_id=flow_run_id, + task_run_id=task_run_id, + timestamp=timestamp, + name=name, + message=message, + level=level, + info=record_dict, + ) except: self.errored_out = True -old_factory = logging.getLogRecordFactory() - - -def cloud_record_factory(*args: Any, **kwargs: Any) -> Any: - record = old_factory(*args, **kwargs) - record.flow_run_id = prefect.context.get("flow_run_id", "") # type: ignore - record.task_run_id = prefect.context.get("task_run_id", "") # type: ignore - return record - - def configure_logging(testing: bool = False) -> logging.Logger: """ Creates a "prefect" root logger with a `StreamHandler` that has level and formatting @@ -75,9 +80,7 @@ def configure_logging(testing: bool = False) -> logging.Logger: # send logs to server if config.logging.log_to_cloud: - logging.setLogRecordFactory(cloud_record_factory) - remote_handler = RemoteHandler() - logger.addHandler(remote_handler) + logger.addHandler(CloudHandler()) return logger diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 43942886de26..c90b2a57b5f3 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -647,3 +647,34 @@ def test_set_task_run_state_with_error(monkeypatch): with pytest.raises(ClientError) as exc: client.set_task_run_state(task_run_id="76-salt", version=0, state=Pending()) assert "something went wrong" in str(exc.value) + + +def test_write_log_successfully(monkeypatch): + response = {"data": {"writeRunLog": {"success": True}}} + post = MagicMock(return_value=MagicMock(json=MagicMock(return_value=response))) + + monkeypatch.setattr("requests.post", post) + with set_temporary_config( + {"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"} + ): + client = Client() + + assert client.write_run_log(flow_run_id="1") is None + + +def test_write_log_with_error(monkeypatch): + response = { + "data": {"writeRunLog": None}, + "errors": [{"message": "something went wrong"}], + } + post = MagicMock(return_value=MagicMock(json=MagicMock(return_value=response))) + + monkeypatch.setattr("requests.post", post) + with set_temporary_config( + {"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"} + ): + client = Client() + + with pytest.raises(ClientError) as exc: + client.write_run_log(flow_run_id="1") + assert "something went wrong" in str(exc.value) diff --git a/tests/environments/execution/test_cloud_environment.py b/tests/environments/execution/test_cloud_environment.py index 7c403e7b3346..462476099e57 100644 --- a/tests/environments/execution/test_cloud_environment.py +++ b/tests/environments/execution/test_cloud_environment.py @@ -181,7 +181,7 @@ def test_populate_job_yaml(): with set_temporary_config( { "cloud.graphql": "gql_test", - "cloud.log": "log_test", + "cloud.result_handler": "rh_test", "cloud.auth_token": "auth_test", } @@ -231,7 +231,7 @@ def test_populate_worker_pod_yaml(): with set_temporary_config( { "cloud.graphql": "gql_test", - "cloud.log": "log_test", + "cloud.result_handler": "rh_test", "cloud.auth_token": "auth_test", } @@ -266,7 +266,7 @@ def test_populate_worker_pod_yaml_with_private_registry(): with set_temporary_config( { "cloud.graphql": "gql_test", - "cloud.log": "log_test", + "cloud.result_handler": "rh_test", "cloud.auth_token": "auth_test", } diff --git a/tests/utilities/test_logging.py b/tests/utilities/test_logging.py index da06be28f634..3a7b96d0c34a 100644 --- a/tests/utilities/test_logging.py +++ b/tests/utilities/test_logging.py @@ -25,11 +25,10 @@ def test_root_logger_level_responds_to_config(): def test_remote_handler_is_configured_for_cloud(): try: with utilities.configuration.set_temporary_config( - {"logging.log_to_cloud": True, "cloud.log": "http://foo.bar:1800/log"} + {"logging.log_to_cloud": True} ): logger = utilities.logging.configure_logging(testing=True) assert hasattr(logger.handlers[-1], "client") - assert logger.handlers[-1].logger_server == "http://foo.bar:1800/log" finally: # reset root_logger logger = utilities.logging.configure_logging(testing=True) @@ -39,7 +38,7 @@ def test_remote_handler_is_configured_for_cloud(): def test_remote_handler_captures_errors_then_passes(): try: with utilities.configuration.set_temporary_config( - {"logging.log_to_cloud": True, "cloud.log": "http://foo.bar:1800/log"} + {"logging.log_to_cloud": True} ): logger = utilities.logging.configure_logging(testing=True) assert hasattr(logger.handlers[-1], "client") From d8788fa6c52051e0d6f98ac6a11e3b595030f2fd Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Mon, 1 Jul 2019 11:10:13 -0400 Subject: [PATCH 2/5] :black_circle: --- tests/environments/execution/test_cloud_environment.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/environments/execution/test_cloud_environment.py b/tests/environments/execution/test_cloud_environment.py index 462476099e57..28d3d6d979b3 100644 --- a/tests/environments/execution/test_cloud_environment.py +++ b/tests/environments/execution/test_cloud_environment.py @@ -181,7 +181,6 @@ def test_populate_job_yaml(): with set_temporary_config( { "cloud.graphql": "gql_test", - "cloud.result_handler": "rh_test", "cloud.auth_token": "auth_test", } @@ -231,7 +230,6 @@ def test_populate_worker_pod_yaml(): with set_temporary_config( { "cloud.graphql": "gql_test", - "cloud.result_handler": "rh_test", "cloud.auth_token": "auth_test", } @@ -266,7 +264,6 @@ def test_populate_worker_pod_yaml_with_private_registry(): with set_temporary_config( { "cloud.graphql": "gql_test", - "cloud.result_handler": "rh_test", "cloud.auth_token": "auth_test", } From ba23316c3531a5ecb9884dc73bf565fb565e605e Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Tue, 2 Jul 2019 14:50:09 -0400 Subject: [PATCH 3/5] Update record extraction to be nondestructive --- src/prefect/utilities/logging.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/prefect/utilities/logging.py b/src/prefect/utilities/logging.py index cf6d45c042ae..7a2b088793d9 100644 --- a/src/prefect/utilities/logging.py +++ b/src/prefect/utilities/logging.py @@ -38,10 +38,10 @@ def emit(self, record) -> None: # type: ignore record_dict = record.__dict__.copy() flow_run_id = prefect.context.get("flow_run_id", None) task_run_id = prefect.context.get("task_run_id", None) - timestamp = record_dict.pop("timestamp", pendulum.now("UTC")) - name = record_dict.pop("name", None) - message = record_dict.pop("message", None) - level = record_dict.pop("level", None) + timestamp = pendulum.from_timestamp(record_dict.get("created", time.time())) + name = record_dict.get("name", None) + message = record_dict.get("message", None) + level = record_dict.get("level", None) self.client.write_run_log( flow_run_id=flow_run_id, From 58c8b596b2c6522e87b724b39ca269115e468ec1 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Wed, 10 Jul 2019 14:09:32 -0400 Subject: [PATCH 4/5] Fix tests for requests session --- tests/client/test_client.py | 8 ++++++-- .../execution/test_cloud_environment.py | 15 +++------------ 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 1f0779bb9d73..f9d503b15054 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -696,8 +696,10 @@ def test_set_task_run_state_with_error(monkeypatch): def test_write_log_successfully(monkeypatch): response = {"data": {"writeRunLog": {"success": True}}} post = MagicMock(return_value=MagicMock(json=MagicMock(return_value=response))) + session = MagicMock() + session.return_value.post = post + monkeypatch.setattr("requests.Session", session) - monkeypatch.setattr("requests.post", post) with set_temporary_config( {"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"} ): @@ -712,8 +714,10 @@ def test_write_log_with_error(monkeypatch): "errors": [{"message": "something went wrong"}], } post = MagicMock(return_value=MagicMock(json=MagicMock(return_value=response))) + session = MagicMock() + session.return_value.post = post + monkeypatch.setattr("requests.Session", session) - monkeypatch.setattr("requests.post", post) with set_temporary_config( {"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"} ): diff --git a/tests/environments/execution/test_cloud_environment.py b/tests/environments/execution/test_cloud_environment.py index 41b8f260d19a..fbbecd637205 100644 --- a/tests/environments/execution/test_cloud_environment.py +++ b/tests/environments/execution/test_cloud_environment.py @@ -179,10 +179,7 @@ def test_populate_job_yaml(): job = yaml.safe_load(job_file) with set_temporary_config( - { - "cloud.graphql": "gql_test", - "cloud.auth_token": "auth_test", - } + {"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"} ): with prefect.context(flow_run_id="id_test", namespace="namespace_test"): yaml_obj = environment._populate_job_yaml( @@ -225,10 +222,7 @@ def test_populate_worker_pod_yaml(): pod = yaml.safe_load(pod_file) with set_temporary_config( - { - "cloud.graphql": "gql_test", - "cloud.auth_token": "auth_test", - } + {"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"} ): with prefect.context(flow_run_id="id_test", image="my_image"): yaml_obj = environment._populate_worker_pod_yaml(yaml_obj=pod) @@ -256,10 +250,7 @@ def test_populate_worker_pod_yaml_with_private_registry(): pod = yaml.safe_load(pod_file) with set_temporary_config( - { - "cloud.graphql": "gql_test", - "cloud.auth_token": "auth_test", - } + {"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"} ): with prefect.context( flow_run_id="id_test", image="my_image", namespace="foo-man" From 48e93cd34558769bcb10ed824cfcd2f35210d864 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Wed, 10 Jul 2019 14:16:01 -0400 Subject: [PATCH 5/5] Update CHANGELOG.md --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 590084fa1528..ff4ab0c49e74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,13 +32,14 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/ - Fix issue with passing results to Prefect signals - [#1163](https://github.com/PrefectHQ/prefect/issues/1163) - Fix issue with `flow.update` not preserving mapped edges - [#1164](https://github.com/PrefectHQ/prefect/issues/1164) - Fix issue with Parameters and Context not being raw dictionaries - [#1186](https://github.com/PrefectHQ/prefect/issues/1186) -- Fix issue with asynchronous, long-running mapped retries in Prefect Cloud - [#1208](https://github.com/PrefectHQ/prefect/pull/1208) +- Fix issue with asynchronous, long-running mapped retries in Prefect Cloud - [#1208](https://github.com/PrefectHQ/prefect/pull/1208) - Fix issue with automatically applied collections to task call arguments when using the imperative API - [#1211](https://github.com/PrefectHQ/prefect/issues/1211) ### Breaking Changes - The CLI command `prefect execute-flow` and `prefect execute-cloud-flow` no longer exist - [#1059](https://github.com/PrefectHQ/prefect/pull/1059) - The `slack_notifier` state handler now uses a `webhook_secret` kwarg to pull the URL from a Secret - [#1075](https://github.com/PrefectHQ/prefect/issues/1075) +- Use GraphQL for Cloud logging - [#1193](https://github.com/PrefectHQ/prefect/pull/1193) - Remove the `CloudResultHandler` default result handler - [#1198](https://github.com/PrefectHQ/prefect/pull/1198) ### Contributors