diff --git a/CHANGELOG.md b/CHANGELOG.md index c2fbc7e29e01..c5edc06226e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,13 +37,14 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/ - Fix issue with passing results to Prefect signals - [#1163](https://github.com/PrefectHQ/prefect/issues/1163) - Fix issue with `flow.update` not preserving mapped edges - [#1164](https://github.com/PrefectHQ/prefect/issues/1164) - Fix issue with Parameters and Context not being raw dictionaries - [#1186](https://github.com/PrefectHQ/prefect/issues/1186) -- Fix issue with asynchronous, long-running mapped retries in Prefect Cloud - [#1208](https://github.com/PrefectHQ/prefect/pull/1208) +- Fix issue with asynchronous, long-running mapped retries in Prefect Cloud - [#1208](https://github.com/PrefectHQ/prefect/pull/1208) - Fix issue with automatically applied collections to task call arguments when using the imperative API - [#1211](https://github.com/PrefectHQ/prefect/issues/1211) ### Breaking Changes - The CLI command `prefect execute-flow` and `prefect execute-cloud-flow` no longer exist - [#1059](https://github.com/PrefectHQ/prefect/pull/1059) - The `slack_notifier` state handler now uses a `webhook_secret` kwarg to pull the URL from a Secret - [#1075](https://github.com/PrefectHQ/prefect/issues/1075) +- Use GraphQL for Cloud logging - [#1193](https://github.com/PrefectHQ/prefect/pull/1193) - Remove the `CloudResultHandler` default result handler - [#1198](https://github.com/PrefectHQ/prefect/pull/1198) ### Contributors diff --git a/src/prefect/client/client.py b/src/prefect/client/client.py index 4859ffa10ea9..b8702045ef72 100644 --- a/src/prefect/client/client.py +++ b/src/prefect/client/client.py @@ -745,3 +745,54 @@ def set_secret(self, name: str, value: Any) -> None: if not result.data.setSecret.success: raise ValueError("Setting secret failed.") + + def write_run_log( + self, + flow_run_id: str, + task_run_id: str = None, + timestamp: datetime.datetime = None, + name: str = None, + message: str = None, + level: str = None, + info: Any = None, + ) -> None: + """ + Writes a log to Cloud + + Args: + - flow_run_id (str): the flow run id + - task_run_id (str, optional): the task run id + - timestamp (datetime, optional): the timestamp; defaults to now + - name (str, optional): the name of the logger + - message (str, optional): the log message + - level (str, optional): the log level as a string. Defaults to INFO, should be one of + DEBUG, INFO, WARNING, ERROR, or CRITICAL. + - info (Any, optional): a JSON payload of additional information + + Raises: + - ValueError: if writing the log fails + """ + mutation = { + "mutation($input: writeRunLogInput!)": { + "writeRunLog(input: $input)": {"success"} + } + } + + if timestamp is None: + timestamp = pendulum.now("UTC") + timestamp_str = pendulum.instance(timestamp).isoformat() + result = self.graphql( + mutation, + input=dict( + flowRunId=flow_run_id, + taskRunId=task_run_id, + timestamp=timestamp_str, + name=name, + message=message, + level=level, + info=info, + ), + ) # type: Any + + if not result.data.writeRunLog.success: + raise ValueError("Writing log failed.") diff --git a/src/prefect/config.toml b/src/prefect/config.toml index eefd0a0cd392..80e68f78bc7c 100644 --- a/src/prefect/config.toml +++ b/src/prefect/config.toml @@ -6,7 +6,6 @@ debug = false # the Prefect Server address api = "https://api.prefect.io" graphql = "${cloud.api}/graphql/alpha" -log = "${cloud.api}/log" use_local_secrets = true heartbeat_interval = 30.0 diff --git a/src/prefect/environments/execution/cloud/environment.py b/src/prefect/environments/execution/cloud/environment.py index 9f2129b18b29..455e60e3c6c2 100644 --- a/src/prefect/environments/execution/cloud/environment.py +++ b/src/prefect/environments/execution/cloud/environment.py @@ -258,12 +258,11 @@ def _populate_job_yaml( pod_spec["imagePullSecrets"].append({"name": namespace + "-docker"}) env[0]["value"] = prefect.config.cloud.graphql - env[1]["value"] = prefect.config.cloud.log - env[2]["value"] = prefect.config.cloud.auth_token - env[3]["value"] = flow_run_id - env[4]["value"] = prefect.context.get("namespace", "") - env[5]["value"] = docker_name - env[6]["value"] = flow_file_path + env[1]["value"] = prefect.config.cloud.auth_token + env[2]["value"] = flow_run_id + env[3]["value"] = prefect.context.get("namespace", "") + env[4]["value"] = docker_name + env[5]["value"] = flow_file_path # set image yaml_obj["spec"]["template"]["spec"]["containers"][0]["image"] = docker_name @@ -290,9 +289,8 @@ def _populate_worker_pod_yaml(self, yaml_obj: dict) -> dict: env = yaml_obj["spec"]["containers"][0]["env"] env[0]["value"] = prefect.config.cloud.graphql - env[1]["value"] = prefect.config.cloud.log - env[2]["value"] = prefect.config.cloud.auth_token - env[3]["value"] = prefect.context.get("flow_run_id", "") + env[1]["value"] = prefect.config.cloud.auth_token + env[2]["value"] = prefect.context.get("flow_run_id", "") if self.private_registry: namespace = prefect.context.get("namespace", "") diff --git a/src/prefect/environments/execution/cloud/job.yaml b/src/prefect/environments/execution/cloud/job.yaml index c9eb8c877d35..875756e59435 100644 --- a/src/prefect/environments/execution/cloud/job.yaml +++ b/src/prefect/environments/execution/cloud/job.yaml @@ -19,8 +19,6 @@ spec: env: - name: PREFECT__CLOUD__GRAPHQL value: $PREFECT__CLOUD__GRAPHQL - - name: PREFECT__CLOUD__LOG - value: PREFECT__CLOUD__LOG - name: PREFECT__CLOUD__AUTH_TOKEN value: PREFECT__CLOUD__AUTH_TOKEN - name: PREFECT__CONTEXT__FLOW_RUN_ID diff --git a/src/prefect/environments/execution/cloud/worker_pod.yaml b/src/prefect/environments/execution/cloud/worker_pod.yaml index d4a33e115ee1..18f97e7c100b 100644 --- a/src/prefect/environments/execution/cloud/worker_pod.yaml +++ b/src/prefect/environments/execution/cloud/worker_pod.yaml @@ -12,8 +12,6 @@ spec: env: - name: PREFECT__CLOUD__GRAPHQL value: $PREFECT__CLOUD__GRAPHQL - - name: PREFECT__CLOUD__LOG - value: PREFECT__CLOUD__LOG - name: PREFECT__CLOUD__AUTH_TOKEN value: PREFECT__CLOUD__AUTH_TOKEN - name: PREFECT__CONTEXT__FLOW_RUN_ID @@ -38,4 +36,4 @@ spec: requests: cpu: "500m" limits: - cpu: "500m" \ No newline at end of file + cpu: "500m" diff --git a/src/prefect/utilities/logging.py b/src/prefect/utilities/logging.py index 24d39862440c..7a2b088793d9 100644 --- a/src/prefect/utilities/logging.py +++ b/src/prefect/utilities/logging.py @@ -9,20 +9,18 @@ When running locally, log levels and message formatting are set via your Prefect configuration file. """ import logging -import os -import queue import time -from logging.handlers import QueueHandler, QueueListener from typing import Any +import pendulum + import prefect from prefect.configuration import config -class RemoteHandler(logging.StreamHandler): +class CloudHandler(logging.StreamHandler): def __init__(self) -> None: super().__init__() - self.logger_server = config.cloud.log self.client = None self.errored_out = False @@ -35,22 +33,29 @@ def emit(self, record) -> None: # type: ignore if self.client is None: self.client = Client() # type: ignore - assert isinstance(self.client, Client) # mypy assert - r = self.client.post(path="", server=self.logger_server, **record.__dict__) + assert isinstance(self.client, Client) # mypy asser + + record_dict = record.__dict__.copy() + flow_run_id = prefect.context.get("flow_run_id", None) + task_run_id = prefect.context.get("task_run_id", None) + timestamp = pendulum.from_timestamp(record_dict.get("created", time.time())) + name = record_dict.get("name", None) + message = record_dict.get("message", None) + level = record_dict.get("level", None) + + self.client.write_run_log( + flow_run_id=flow_run_id, + task_run_id=task_run_id, + timestamp=timestamp, + name=name, + message=message, + level=level, + info=record_dict, + ) except: self.errored_out = True -old_factory = logging.getLogRecordFactory() - - -def cloud_record_factory(*args: Any, **kwargs: Any) -> Any: - record = old_factory(*args, **kwargs) - record.flow_run_id = prefect.context.get("flow_run_id", "") # type: ignore - record.task_run_id = prefect.context.get("task_run_id", "") # type: ignore - return record - - def configure_logging(testing: bool = False) -> logging.Logger: """ Creates a "prefect" root logger with a `StreamHandler` that has level and formatting @@ -75,9 +80,7 @@ def configure_logging(testing: bool = False) -> logging.Logger: # send logs to server if config.logging.log_to_cloud: - logging.setLogRecordFactory(cloud_record_factory) - remote_handler = RemoteHandler() - logger.addHandler(remote_handler) + logger.addHandler(CloudHandler()) return logger diff --git a/tests/client/test_client.py b/tests/client/test_client.py index d0cbbe757bfb..f9d503b15054 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -691,3 +691,38 @@ def test_set_task_run_state_with_error(monkeypatch): with pytest.raises(ClientError) as exc: client.set_task_run_state(task_run_id="76-salt", version=0, state=Pending()) assert "something went wrong" in str(exc.value) + + +def test_write_log_successfully(monkeypatch): + response = {"data": {"writeRunLog": {"success": True}}} + post = MagicMock(return_value=MagicMock(json=MagicMock(return_value=response))) + session = MagicMock() + session.return_value.post = post + monkeypatch.setattr("requests.Session", session) + + with set_temporary_config( + {"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"} + ): + client = Client() + + assert client.write_run_log(flow_run_id="1") is None + + +def test_write_log_with_error(monkeypatch): + response = { + "data": {"writeRunLog": None}, + "errors": [{"message": "something went wrong"}], + } + post = MagicMock(return_value=MagicMock(json=MagicMock(return_value=response))) + session = MagicMock() + session.return_value.post = post + monkeypatch.setattr("requests.Session", session) + + with set_temporary_config( + {"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"} + ): + client = Client() + + with pytest.raises(ClientError) as exc: + client.write_run_log(flow_run_id="1") + assert "something went wrong" in str(exc.value) diff --git a/tests/environments/execution/test_cloud_environment.py b/tests/environments/execution/test_cloud_environment.py index 246210c54a52..fbbecd637205 100644 --- a/tests/environments/execution/test_cloud_environment.py +++ b/tests/environments/execution/test_cloud_environment.py @@ -179,11 +179,7 @@ def test_populate_job_yaml(): job = yaml.safe_load(job_file) with set_temporary_config( - { - "cloud.graphql": "gql_test", - "cloud.log": "log_test", - "cloud.auth_token": "auth_test", - } + {"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"} ): with prefect.context(flow_run_id="id_test", namespace="namespace_test"): yaml_obj = environment._populate_job_yaml( @@ -203,12 +199,11 @@ def test_populate_job_yaml(): env = yaml_obj["spec"]["template"]["spec"]["containers"][0]["env"] assert env[0]["value"] == "gql_test" - assert env[1]["value"] == "log_test" - assert env[2]["value"] == "auth_test" - assert env[3]["value"] == "id_test" - assert env[4]["value"] == "namespace_test" - assert env[5]["value"] == "test1/test2:test3" - assert env[6]["value"] == "test4" + assert env[1]["value"] == "auth_test" + assert env[2]["value"] == "id_test" + assert env[3]["value"] == "namespace_test" + assert env[4]["value"] == "test1/test2:test3" + assert env[5]["value"] == "test4" assert ( yaml_obj["spec"]["template"]["spec"]["containers"][0]["image"] @@ -227,11 +222,7 @@ def test_populate_worker_pod_yaml(): pod = yaml.safe_load(pod_file) with set_temporary_config( - { - "cloud.graphql": "gql_test", - "cloud.log": "log_test", - "cloud.auth_token": "auth_test", - } + {"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"} ): with prefect.context(flow_run_id="id_test", image="my_image"): yaml_obj = environment._populate_worker_pod_yaml(yaml_obj=pod) @@ -242,9 +233,8 @@ def test_populate_worker_pod_yaml(): env = yaml_obj["spec"]["containers"][0]["env"] assert env[0]["value"] == "gql_test" - assert env[1]["value"] == "log_test" - assert env[2]["value"] == "auth_test" - assert env[3]["value"] == "id_test" + assert env[1]["value"] == "auth_test" + assert env[2]["value"] == "id_test" assert yaml_obj["spec"]["containers"][0]["image"] == "my_image" @@ -260,11 +250,7 @@ def test_populate_worker_pod_yaml_with_private_registry(): pod = yaml.safe_load(pod_file) with set_temporary_config( - { - "cloud.graphql": "gql_test", - "cloud.log": "log_test", - "cloud.auth_token": "auth_test", - } + {"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"} ): with prefect.context( flow_run_id="id_test", image="my_image", namespace="foo-man" diff --git a/tests/utilities/test_logging.py b/tests/utilities/test_logging.py index da06be28f634..3a7b96d0c34a 100644 --- a/tests/utilities/test_logging.py +++ b/tests/utilities/test_logging.py @@ -25,11 +25,10 @@ def test_root_logger_level_responds_to_config(): def test_remote_handler_is_configured_for_cloud(): try: with utilities.configuration.set_temporary_config( - {"logging.log_to_cloud": True, "cloud.log": "http://foo.bar:1800/log"} + {"logging.log_to_cloud": True} ): logger = utilities.logging.configure_logging(testing=True) assert hasattr(logger.handlers[-1], "client") - assert logger.handlers[-1].logger_server == "http://foo.bar:1800/log" finally: # reset root_logger logger = utilities.logging.configure_logging(testing=True) @@ -39,7 +38,7 @@ def test_remote_handler_is_configured_for_cloud(): def test_remote_handler_captures_errors_then_passes(): try: with utilities.configuration.set_temporary_config( - {"logging.log_to_cloud": True, "cloud.log": "http://foo.bar:1800/log"} + {"logging.log_to_cloud": True} ): logger = utilities.logging.configure_logging(testing=True) assert hasattr(logger.handlers[-1], "client")