Skip to content

Commit

Permalink
Log failed compute log to event log rather than buried in a logging o…
Browse files Browse the repository at this point in the history
…utput (#7093)

Summary:
This surfaces more aggressively when there's a failure while uploading logs.

Test Plan:
BK
  • Loading branch information
gibsondan committed Mar 16, 2022
1 parent 862fe7e commit 69f5e3b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 21 deletions.
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ def engine_event(
pipeline_context: IPlanContext,
message: str,
event_specific_data: Optional["EngineEventData"] = None,
step_handle: Optional[StepHandle] = None,
step_handle: Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]] = None,
) -> "DagsterEvent":
return DagsterEvent.from_pipeline(
DagsterEventType.ENGINE_EVENT,
Expand Down
31 changes: 17 additions & 14 deletions python_modules/dagster/dagster/core/execution/plan/execute_plan.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import logging
import sys
from contextlib import ExitStack
from typing import Iterator, List, cast
Expand All @@ -12,7 +11,7 @@
HookExecutionError,
user_code_error_boundary,
)
from dagster.core.events import DagsterEvent
from dagster.core.events import DagsterEvent, EngineEventData
from dagster.core.execution.context.system import PlanExecutionContext, StepExecutionContext
from dagster.core.execution.plan.execute_step import core_dagster_event_sequence_for_step
from dagster.core.execution.plan.objects import (
Expand Down Expand Up @@ -70,13 +69,15 @@ def inner_plan_execution_iterator(
)
)
except Exception as e:
log_capture_error = e
logging.exception(
"Exception while setting up compute log capture for step %s in run %s: %s",
step_context.step.key,
step_context.pipeline_run.run_id,
e,
yield DagsterEvent.engine_event(
pipeline_context=pipeline_context,
message="Exception while setting up compute log capture",
event_specific_data=EngineEventData(
error=serializable_error_info_from_exc_info(sys.exc_info())
),
step_handle=step_context.step.handle,
)
log_capture_error = e

if not log_capture_error:
yield DagsterEvent.capture_logs(
Expand All @@ -93,12 +94,14 @@ def inner_plan_execution_iterator(

try:
stack.close()
except Exception as e:
logging.exception(
"Exception while cleaning up compute log capture for step %s in run %s: %s",
step_context.step.key,
step_context.pipeline_run.run_id,
e,
except Exception:
yield DagsterEvent.engine_event(
pipeline_context=pipeline_context,
message="Exception while cleaning up compute log capture",
event_specific_data=EngineEventData(
error=serializable_error_info_from_exc_info(sys.exc_info())
),
step_handle=step_context.step.handle,
)

# process skips from failures or uncovered inputs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ def broken_compute_log_manager_instance(fail_on_setup=False, fail_on_teardown=Fa
)


def _has_setup_exception(execute_result):
return any(
[
"Exception while setting up compute log capture" in str(event)
for event in execute_result.all_events
]
)


def _has_teardown_exception(execute_result):
return any(
[
"Exception while cleaning up compute log capture" in str(event)
for event in execute_result.all_events
]
)


def test_broken_compute_log_manager():
@op
def yay(context):
Expand All @@ -94,13 +112,31 @@ def boo_job():
boo()

with broken_compute_log_manager_instance(fail_on_setup=True) as instance:
assert yay_job.execute_in_process(instance=instance).success
assert not boo_job.execute_in_process(instance=instance, raise_on_error=False).success
yay_result = yay_job.execute_in_process(instance=instance)
assert yay_result.success
assert _has_setup_exception(yay_result)

boo_result = boo_job.execute_in_process(instance=instance, raise_on_error=False)
assert not boo_result.success
assert _has_setup_exception(boo_result)

with broken_compute_log_manager_instance(fail_on_teardown=True) as instance:
assert yay_job.execute_in_process(instance=instance).success
assert not boo_job.execute_in_process(instance=instance, raise_on_error=False).success
yay_result = yay_job.execute_in_process(instance=instance)
assert yay_result.success
assert _has_teardown_exception(yay_result)

boo_result = boo_job.execute_in_process(instance=instance, raise_on_error=False)

assert not boo_result.success
assert _has_teardown_exception(boo_result)

with broken_compute_log_manager_instance() as instance:
assert yay_job.execute_in_process(instance=instance).success
assert not boo_job.execute_in_process(instance=instance, raise_on_error=False).success
yay_result = yay_job.execute_in_process(instance=instance)
assert yay_result.success
assert not _has_setup_exception(yay_result)
assert not _has_teardown_exception(yay_result)

boo_result = boo_job.execute_in_process(instance=instance, raise_on_error=False)
assert not boo_result.success
assert not _has_setup_exception(boo_result)
assert not _has_teardown_exception(boo_result)

0 comments on commit 69f5e3b

Please sign in to comment.