Skip to content

Commit

Permalink
Update Client with new GQL log endpoint (#1193)
Browse files Browse the repository at this point in the history
Update Client with new GQL log endpoint
  • Loading branch information
jlowin committed Jul 12, 2019
2 parents 9b6229e + 84987f6 commit 5e76dee
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 63 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -37,13 +37,14 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Fix issue with passing results to Prefect signals - [#1163](https://github.com/PrefectHQ/prefect/issues/1163)
- Fix issue with `flow.update` not preserving mapped edges - [#1164](https://github.com/PrefectHQ/prefect/issues/1164)
- Fix issue with Parameters and Context not being raw dictionaries - [#1186](https://github.com/PrefectHQ/prefect/issues/1186)
- Fix issue with asynchronous, long-running mapped retries in Prefect Cloud - [#1208](https://github.com/PrefectHQ/prefect/pull/1208)
- Fix issue with asynchronous, long-running mapped retries in Prefect Cloud - [#1208](https://github.com/PrefectHQ/prefect/pull/1208)
- Fix issue with automatically applied collections to task call arguments when using the imperative API - [#1211](https://github.com/PrefectHQ/prefect/issues/1211)

### Breaking Changes

- The CLI command `prefect execute-flow` and `prefect execute-cloud-flow` no longer exist - [#1059](https://github.com/PrefectHQ/prefect/pull/1059)
- The `slack_notifier` state handler now uses a `webhook_secret` kwarg to pull the URL from a Secret - [#1075](https://github.com/PrefectHQ/prefect/issues/1075)
- Use GraphQL for Cloud logging - [#1193](https://github.com/PrefectHQ/prefect/pull/1193)
- Remove the `CloudResultHandler` default result handler - [#1198](https://github.com/PrefectHQ/prefect/pull/1198)

### Contributors
Expand Down
51 changes: 51 additions & 0 deletions src/prefect/client/client.py
Expand Up @@ -745,3 +745,54 @@ def set_secret(self, name: str, value: Any) -> None:

if not result.data.setSecret.success:
raise ValueError("Setting secret failed.")

def write_run_log(
self,
flow_run_id: str,
task_run_id: str = None,
timestamp: datetime.datetime = None,
name: str = None,
message: str = None,
level: str = None,
info: Any = None,
) -> None:
"""
Writes a log to Cloud
Args:
- flow_run_id (str): the flow run id
- task_run_id (str, optional): the task run id
- timestamp (datetime, optional): the timestamp; defaults to now
- name (str, optional): the name of the logger
- message (str, optional): the log message
- level (str, optional): the log level as a string. Defaults to INFO, should be one of
DEBUG, INFO, WARNING, ERROR, or CRITICAL.
- info (Any, optional): a JSON payload of additional information
Raises:
- ValueError: if writing the log fails
"""
mutation = {
"mutation($input: writeRunLogInput!)": {
"writeRunLog(input: $input)": {"success"}
}
}

if timestamp is None:
timestamp = pendulum.now("UTC")
timestamp_str = pendulum.instance(timestamp).isoformat()
result = self.graphql(
mutation,
input=dict(
flowRunId=flow_run_id,
taskRunId=task_run_id,
timestamp=timestamp_str,
name=name,
message=message,
level=level,
info=info,
),
) # type: Any

if not result.data.writeRunLog.success:
raise ValueError("Writing log failed.")
1 change: 0 additions & 1 deletion src/prefect/config.toml
Expand Up @@ -6,7 +6,6 @@ debug = false
# the Prefect Server address
api = "https://api.prefect.io"
graphql = "${cloud.api}/graphql/alpha"
log = "${cloud.api}/log"
use_local_secrets = true
heartbeat_interval = 30.0

Expand Down
16 changes: 7 additions & 9 deletions src/prefect/environments/execution/cloud/environment.py
Expand Up @@ -258,12 +258,11 @@ def _populate_job_yaml(
pod_spec["imagePullSecrets"].append({"name": namespace + "-docker"})

env[0]["value"] = prefect.config.cloud.graphql
env[1]["value"] = prefect.config.cloud.log
env[2]["value"] = prefect.config.cloud.auth_token
env[3]["value"] = flow_run_id
env[4]["value"] = prefect.context.get("namespace", "")
env[5]["value"] = docker_name
env[6]["value"] = flow_file_path
env[1]["value"] = prefect.config.cloud.auth_token
env[2]["value"] = flow_run_id
env[3]["value"] = prefect.context.get("namespace", "")
env[4]["value"] = docker_name
env[5]["value"] = flow_file_path

# set image
yaml_obj["spec"]["template"]["spec"]["containers"][0]["image"] = docker_name
Expand All @@ -290,9 +289,8 @@ def _populate_worker_pod_yaml(self, yaml_obj: dict) -> dict:
env = yaml_obj["spec"]["containers"][0]["env"]

env[0]["value"] = prefect.config.cloud.graphql
env[1]["value"] = prefect.config.cloud.log
env[2]["value"] = prefect.config.cloud.auth_token
env[3]["value"] = prefect.context.get("flow_run_id", "")
env[1]["value"] = prefect.config.cloud.auth_token
env[2]["value"] = prefect.context.get("flow_run_id", "")

if self.private_registry:
namespace = prefect.context.get("namespace", "")
Expand Down
2 changes: 0 additions & 2 deletions src/prefect/environments/execution/cloud/job.yaml
Expand Up @@ -19,8 +19,6 @@ spec:
env:
- name: PREFECT__CLOUD__GRAPHQL
value: $PREFECT__CLOUD__GRAPHQL
- name: PREFECT__CLOUD__LOG
value: PREFECT__CLOUD__LOG
- name: PREFECT__CLOUD__AUTH_TOKEN
value: PREFECT__CLOUD__AUTH_TOKEN
- name: PREFECT__CONTEXT__FLOW_RUN_ID
Expand Down
4 changes: 1 addition & 3 deletions src/prefect/environments/execution/cloud/worker_pod.yaml
Expand Up @@ -12,8 +12,6 @@ spec:
env:
- name: PREFECT__CLOUD__GRAPHQL
value: $PREFECT__CLOUD__GRAPHQL
- name: PREFECT__CLOUD__LOG
value: PREFECT__CLOUD__LOG
- name: PREFECT__CLOUD__AUTH_TOKEN
value: PREFECT__CLOUD__AUTH_TOKEN
- name: PREFECT__CONTEXT__FLOW_RUN_ID
Expand All @@ -38,4 +36,4 @@ spec:
requests:
cpu: "500m"
limits:
cpu: "500m"
cpu: "500m"
43 changes: 23 additions & 20 deletions src/prefect/utilities/logging.py
Expand Up @@ -9,20 +9,18 @@
When running locally, log levels and message formatting are set via your Prefect configuration file.
"""
import logging
import os
import queue
import time
from logging.handlers import QueueHandler, QueueListener
from typing import Any

import pendulum

import prefect
from prefect.configuration import config


class RemoteHandler(logging.StreamHandler):
class CloudHandler(logging.StreamHandler):
def __init__(self) -> None:
super().__init__()
self.logger_server = config.cloud.log
self.client = None
self.errored_out = False

Expand All @@ -35,22 +33,29 @@ def emit(self, record) -> None: # type: ignore
if self.client is None:
self.client = Client() # type: ignore

assert isinstance(self.client, Client) # mypy assert
r = self.client.post(path="", server=self.logger_server, **record.__dict__)
assert isinstance(self.client, Client) # mypy asser

record_dict = record.__dict__.copy()
flow_run_id = prefect.context.get("flow_run_id", None)
task_run_id = prefect.context.get("task_run_id", None)
timestamp = pendulum.from_timestamp(record_dict.get("created", time.time()))
name = record_dict.get("name", None)
message = record_dict.get("message", None)
level = record_dict.get("level", None)

self.client.write_run_log(
flow_run_id=flow_run_id,
task_run_id=task_run_id,
timestamp=timestamp,
name=name,
message=message,
level=level,
info=record_dict,
)
except:
self.errored_out = True


old_factory = logging.getLogRecordFactory()


def cloud_record_factory(*args: Any, **kwargs: Any) -> Any:
record = old_factory(*args, **kwargs)
record.flow_run_id = prefect.context.get("flow_run_id", "") # type: ignore
record.task_run_id = prefect.context.get("task_run_id", "") # type: ignore
return record


def configure_logging(testing: bool = False) -> logging.Logger:
"""
Creates a "prefect" root logger with a `StreamHandler` that has level and formatting
Expand All @@ -75,9 +80,7 @@ def configure_logging(testing: bool = False) -> logging.Logger:

# send logs to server
if config.logging.log_to_cloud:
logging.setLogRecordFactory(cloud_record_factory)
remote_handler = RemoteHandler()
logger.addHandler(remote_handler)
logger.addHandler(CloudHandler())
return logger


Expand Down
35 changes: 35 additions & 0 deletions tests/client/test_client.py
Expand Up @@ -691,3 +691,38 @@ def test_set_task_run_state_with_error(monkeypatch):
with pytest.raises(ClientError) as exc:
client.set_task_run_state(task_run_id="76-salt", version=0, state=Pending())
assert "something went wrong" in str(exc.value)


def test_write_log_successfully(monkeypatch):
response = {"data": {"writeRunLog": {"success": True}}}
post = MagicMock(return_value=MagicMock(json=MagicMock(return_value=response)))
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
):
client = Client()

assert client.write_run_log(flow_run_id="1") is None


def test_write_log_with_error(monkeypatch):
response = {
"data": {"writeRunLog": None},
"errors": [{"message": "something went wrong"}],
}
post = MagicMock(return_value=MagicMock(json=MagicMock(return_value=response)))
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
):
client = Client()

with pytest.raises(ClientError) as exc:
client.write_run_log(flow_run_id="1")
assert "something went wrong" in str(exc.value)
34 changes: 10 additions & 24 deletions tests/environments/execution/test_cloud_environment.py
Expand Up @@ -179,11 +179,7 @@ def test_populate_job_yaml():
job = yaml.safe_load(job_file)

with set_temporary_config(
{
"cloud.graphql": "gql_test",
"cloud.log": "log_test",
"cloud.auth_token": "auth_test",
}
{"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"}
):
with prefect.context(flow_run_id="id_test", namespace="namespace_test"):
yaml_obj = environment._populate_job_yaml(
Expand All @@ -203,12 +199,11 @@ def test_populate_job_yaml():
env = yaml_obj["spec"]["template"]["spec"]["containers"][0]["env"]

assert env[0]["value"] == "gql_test"
assert env[1]["value"] == "log_test"
assert env[2]["value"] == "auth_test"
assert env[3]["value"] == "id_test"
assert env[4]["value"] == "namespace_test"
assert env[5]["value"] == "test1/test2:test3"
assert env[6]["value"] == "test4"
assert env[1]["value"] == "auth_test"
assert env[2]["value"] == "id_test"
assert env[3]["value"] == "namespace_test"
assert env[4]["value"] == "test1/test2:test3"
assert env[5]["value"] == "test4"

assert (
yaml_obj["spec"]["template"]["spec"]["containers"][0]["image"]
Expand All @@ -227,11 +222,7 @@ def test_populate_worker_pod_yaml():
pod = yaml.safe_load(pod_file)

with set_temporary_config(
{
"cloud.graphql": "gql_test",
"cloud.log": "log_test",
"cloud.auth_token": "auth_test",
}
{"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"}
):
with prefect.context(flow_run_id="id_test", image="my_image"):
yaml_obj = environment._populate_worker_pod_yaml(yaml_obj=pod)
Expand All @@ -242,9 +233,8 @@ def test_populate_worker_pod_yaml():
env = yaml_obj["spec"]["containers"][0]["env"]

assert env[0]["value"] == "gql_test"
assert env[1]["value"] == "log_test"
assert env[2]["value"] == "auth_test"
assert env[3]["value"] == "id_test"
assert env[1]["value"] == "auth_test"
assert env[2]["value"] == "id_test"

assert yaml_obj["spec"]["containers"][0]["image"] == "my_image"

Expand All @@ -260,11 +250,7 @@ def test_populate_worker_pod_yaml_with_private_registry():
pod = yaml.safe_load(pod_file)

with set_temporary_config(
{
"cloud.graphql": "gql_test",
"cloud.log": "log_test",
"cloud.auth_token": "auth_test",
}
{"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"}
):
with prefect.context(
flow_run_id="id_test", image="my_image", namespace="foo-man"
Expand Down
5 changes: 2 additions & 3 deletions tests/utilities/test_logging.py
Expand Up @@ -25,11 +25,10 @@ def test_root_logger_level_responds_to_config():
def test_remote_handler_is_configured_for_cloud():
try:
with utilities.configuration.set_temporary_config(
{"logging.log_to_cloud": True, "cloud.log": "http://foo.bar:1800/log"}
{"logging.log_to_cloud": True}
):
logger = utilities.logging.configure_logging(testing=True)
assert hasattr(logger.handlers[-1], "client")
assert logger.handlers[-1].logger_server == "http://foo.bar:1800/log"
finally:
# reset root_logger
logger = utilities.logging.configure_logging(testing=True)
Expand All @@ -39,7 +38,7 @@ def test_remote_handler_is_configured_for_cloud():
def test_remote_handler_captures_errors_then_passes():
try:
with utilities.configuration.set_temporary_config(
{"logging.log_to_cloud": True, "cloud.log": "http://foo.bar:1800/log"}
{"logging.log_to_cloud": True}
):
logger = utilities.logging.configure_logging(testing=True)
assert hasattr(logger.handlers[-1], "client")
Expand Down

0 comments on commit 5e76dee

Please sign in to comment.