Fix celery docker jobs that involve writing directly to command-line output (#7665)

Summary:
This brings the celery docker executor in line with the celery k8s executor's logic for turning process logs into structured events. Previously, any logging in an op or resource would surface as a cryptic JSON decode error.

Test Plan: BK
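
For context on the failure mode: user output (a print() in a resource, say) is interleaved with serialized events in the step container's stdout, and the old docker executor handed every such line to the event deserializer. A minimal reproduction sketch, assuming only that deserialize_json_to_dagster_namedtuple (the real dagster.serdes entry point used elsewhere in this diff) receives a stray stdout line:

    from json import JSONDecodeError

    from dagster.serdes import deserialize_json_to_dagster_namedtuple

    # A line that a user-defined resource printed to stdout, interleaved
    # with the serialized events the executor actually wants.
    stray_line = "writing to stdout"

    try:
        deserialize_json_to_dagster_namedtuple(stray_line)
    except JSONDecodeError as exc:
        # This is the cryptic error celery docker runs surfaced before the fix.
        print(f"JSONDecodeError: {exc}")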
gibsondan committed May 2, 2022
1 parent 87c0f27 commit 90b4739
Showing 7 changed files with 78 additions and 57 deletions.
@@ -251,20 +251,39 @@ def demo_job_celery():
    return demo_job_celery


@solid(required_resource_keys={"buggy_resource"})
def hello(context):
    context.log.info("Hello, world from IMAGE 1")


def define_docker_celery_pipeline():
    from dagster_celery_docker import celery_docker_executor

    @resource
    def resource_with_output():
        print("writing to stdout")  # pylint: disable=print-call
        return 42

    @solid(required_resource_keys={"resource_with_output"})
    def use_resource_with_output_solid():
        pass

    @pipeline(
        mode_defs=[
            ModeDefinition(
-               resource_defs={"s3": s3_resource, "io_manager": s3_pickle_io_manager},
+               resource_defs={
+                   "s3": s3_resource,
+                   "io_manager": s3_pickle_io_manager,
+                   "resource_with_output": resource_with_output,
+               },
                executor_defs=default_executors + [celery_docker_executor],
            )
        ]
    )
    def docker_celery_pipeline():
        count_letters(multiply_the_word())
        get_environment_solid()
        use_resource_with_output_solid()

    return docker_celery_pipeline

47 changes: 47 additions & 0 deletions python_modules/dagster/dagster/core/events/utils.py
@@ -0,0 +1,47 @@
from json import JSONDecodeError

from dagster import check
from dagster.serdes import deserialize_json_to_dagster_namedtuple


def filter_dagster_events_from_cli_logs(log_lines):
    """
    Filters the raw log lines from a dagster-cli invocation to return only the lines containing json.
    - Log lines don't necessarily come back in order
    - Something else might log JSON
    - Docker appears to silently split very long log lines -- this is undocumented behavior
    TODO: replace with reading event logs from the DB
    """
    check.list_param(log_lines, "log_lines", str)

    coalesced_lines = []
    buffer = []
    in_split_line = False
    for line in log_lines:
        line = line.strip()
        if not in_split_line and line.startswith("{"):
            if line.endswith("}"):
                coalesced_lines.append(line)
            else:
                buffer.append(line)
                in_split_line = True
        elif in_split_line:
            buffer.append(line)
            if line.endswith("}"):  # Note: hack, this may not have been the end of the full object
                coalesced_lines.append("".join(buffer))
                buffer = []
                in_split_line = False

    events = []
    for line in coalesced_lines:
        try:
            events.append(deserialize_json_to_dagster_namedtuple(line))
        except JSONDecodeError:
            pass
        except check.CheckError:
            pass

    return events
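
A hypothetical usage sketch of the new helper (input lines fabricated for illustration): lines that are not JSON fail json decoding, JSON that does not unpack to a whitelisted Dagster namedtuple fails serdes checks, and both are silently skipped rather than raised -- exactly the tolerance the docker executor was missing. Split lines, as exercised by the coalesce test below, are buffered and re-joined before deserialization.

    from dagster.core.events.utils import filter_dagster_events_from_cli_logs

    # Hypothetical stdout capture from a step container: user print() noise
    # plus JSON that did not come from dagster.
    lines = [
        "writing to stdout",      # not JSON: JSONDecodeError, silently dropped
        '{"unrelated": "json"}',  # JSON, but not a Dagster namedtuple: dropped
    ]

    # Only deserializable Dagster events are returned; here, none.
    assert filter_dagster_events_from_cli_logs(lines) == []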
@@ -1,11 +1,10 @@
-from dagster_k8s import utils

from dagster import check
from dagster.core.events import DagsterEvent
+from dagster.core.events.utils import filter_dagster_events_from_cli_logs
from dagster.core.execution.plan.objects import StepSuccessData


-def test_filter_dagster_events_from_pod_logs():
+def test_filter_dagster_events_from_cli_logs():

    sample_output = """
2020-07-17 11:31:58 - dagster - DEBUG - foo - new_run - STEP_START - Started execution of step "do_something".
@@ -58,7 +57,7 @@ def test_filter_dagster_events_from_pod_logs():
""".split(
"\n"
)
-    res = utils.filter_dagster_events_from_pod_logs(sample_output)
+    res = filter_dagster_events_from_cli_logs(sample_output)

    assert len(res) == 7

@@ -67,7 +66,7 @@ def test_filter_dagster_events_from_pod_logs():
    check.inst(last_event.event_specific_data, StepSuccessData)


-def test_filter_dagster_events_from_pod_logs_coalesce():
+def test_filter_dagster_events_from_cli_logs_coalesce():
logs = """
{"__class__": "DagsterEvent", "event_specific
_data": {"__class__": "StepSuccessData", "duration_ms": 13.923579000000075}, "event_typ
@@ -76,7 +75,7 @@ def test_filter_dagster_events_from_pod_logs_coalesce():
""".split(
"\n"
)
-    res = utils.filter_dagster_events_from_pod_logs(logs)
+    res = filter_dagster_events_from_cli_logs(logs)
    assert len(res) == 1

    event = res[0]
@@ -20,6 +20,7 @@
)
from dagster.cli.api import ExecuteStepArgs
from dagster.core.events import EngineEventData
+from dagster.core.events.utils import filter_dagster_events_from_cli_logs
from dagster.core.execution.retries import RetryMode
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.serdes import pack_value, serialize_dagster_namedtuple, unpack_value
@@ -328,7 +329,8 @@ def _execute_step_docker(
    if res is None:
        raise Exception("No response from execute_step in CeleryDockerExecutor")

-    serialized_events += [event for event in res.split("\n") if event]
+    events = filter_dagster_events_from_cli_logs(res.split("\n"))
+    serialized_events += [serialize_dagster_namedtuple(event) for event in events]

    return serialized_events
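
A before/after sketch of why this change is the core of the fix (the res value below is fabricated; in a real run it is the collected stdout of the step container):

    # Before: every non-empty line was forwarded as if it were a serialized
    # event, so print() noise later blew up with a JSON decode error.
    res = 'writing to stdout\n{"unrelated": "json"}\n'
    naive = [event for event in res.split("\n") if event]
    assert naive == ["writing to stdout", '{"unrelated": "json"}']

    # After: the shared helper parses defensively, keeping only real events.
    from dagster.core.events.utils import filter_dagster_events_from_cli_logs

    assert filter_dagster_events_from_cli_logs(res.split("\n")) == []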

2 changes: 1 addition & 1 deletion python_modules/libraries/dagster-celery-docker/tox.ini
@@ -5,7 +5,7 @@ envlist = py{39,38,37,36}-{unix,windows},pylint
usedevelop = true
setenv =
  VIRTUALENV_PIP=21.3.1
-passenv = CI_* COVERALLS_REPO_TOKEN BUILDKITE AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID DAGSTER_DOCKER_* POSTGRES_TEST_DB_HOST
+passenv = CI_* COVERALLS_REPO_TOKEN GOOGLE_APPLICATION_CREDENTIALS BUILDKITE AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID DAGSTER_DOCKER_* POSTGRES_TEST_DB_HOST
deps =
  -e ../../dagster[test]
  -e ../../dagster-graphql
@@ -22,7 +22,6 @@
)
from dagster_k8s.utils import (
    delete_job,
-    filter_dagster_events_from_pod_logs,
    get_pod_names_in_job,
    retrieve_pod_logs,
    wait_for_job_success,
@@ -42,6 +41,7 @@
from dagster.core.errors import DagsterUnmetExecutorRequirementsError
from dagster.core.events import EngineEventData
from dagster.core.events.log import EventLogEntry
+from dagster.core.events.utils import filter_dagster_events_from_cli_logs
from dagster.core.execution.plan.objects import StepFailureData, UserFailureData
from dagster.core.execution.retries import RetryMode
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus
@@ -546,7 +546,7 @@ def _execute_step_k8s_job(
        step_key=step_key,
    )

-    events += filter_dagster_events_from_pod_logs(logs)
+    events += filter_dagster_events_from_cli_logs(logs)
    serialized_events = [serialize_dagster_namedtuple(event) for event in events]
    return serialized_events

46 changes: 0 additions & 46 deletions python_modules/libraries/dagster-k8s/dagster_k8s/utils.py
@@ -1,8 +1,5 @@
import re

-from dagster import check, seven
-from dagster.serdes import deserialize_json_to_dagster_namedtuple

from .client import (
    DEFAULT_JOB_POD_COUNT,
    DEFAULT_WAIT_BETWEEN_ATTEMPTS,
@@ -79,46 +76,3 @@ def wait_for_pod(
    return DagsterKubernetesClient.production_client().wait_for_pod(
        pod_name, namespace, wait_for_state, wait_timeout, wait_time_between_attempts
    )


def filter_dagster_events_from_pod_logs(log_lines):
    """
    Filters the raw log lines from a dagster-cli invocation to return only the lines containing json.
    - Log lines don't necessarily come back in order
    - Something else might log JSON
    - Docker appears to silently split very long log lines -- this is undocumented behavior
    TODO: replace with reading event logs from the DB
    """
    check.list_param(log_lines, "log_lines", str)

    coalesced_lines = []
    buffer = []
    in_split_line = False
    for line in log_lines:
        line = line.strip()
        if not in_split_line and line.startswith("{"):
            if line.endswith("}"):
                coalesced_lines.append(line)
            else:
                buffer.append(line)
                in_split_line = True
        elif in_split_line:
            buffer.append(line)
            if line.endswith("}"):  # Note: hack, this may not have been the end of the full object
                coalesced_lines.append("".join(buffer))
                buffer = []
                in_split_line = False

    events = []
    for line in coalesced_lines:
        try:
            events.append(deserialize_json_to_dagster_namedtuple(line))
        except seven.JSONDecodeError:
            pass
        except check.CheckError:
            pass

    return events
