Skip to content

Commit

Permalink
RFC: Keep firing hooks even if a framework exception is raised mid-ex…
Browse files Browse the repository at this point in the history
…ecution (#7652)

Summary:
We should still fire a failure hook even if there's a framework error.

Test Plan:
Did a manual test by raising an exception in core_dagster_event_sequence_for_step - will write a real test next.
  • Loading branch information
gibsondan committed May 4, 2022
1 parent c81a825 commit 504aa08
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 23 deletions.
32 changes: 11 additions & 21 deletions python_modules/dagster/dagster/core/execution/plan/execute_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,15 @@ def dagster_event_sequence_for_step(
The run was interrupted in the middle of execution (typically by a
termination request).
(5) User error:
(5) Dagster framework error:
The framework raised a DagsterError that indicates a usage error
or some other error not communicated by a user-thrown exception. For example,
if the user yields an object out of a compute function that is not a
proper event (not an Output, ExpectationResult, etc).
(6) Framework failure:
An unexpected error occurred. This is a framework error. Either there
has been an internal error in the framework OR we have forgotten to put a
user code error boundary around invoked user-space code. These terminate
the computation immediately (by re-raising).
(6) All other errors:
An unexpected error occurred. Either there has been an internal error in the framework
OR we have forgotten to put a user code error boundary around invoked user-space code.
The "raised_dagster_errors" context manager can be used to force these errors to be
Expand Down Expand Up @@ -313,24 +311,16 @@ def dagster_event_sequence_for_step(
)
raise interrupt_error

# case (5) in top comment
except DagsterError as dagster_error:
step_context.capture_step_exception(dagster_error)
# cases (5) and (6) in top comment
except BaseException as error:
step_context.capture_step_exception(error)
yield step_failure_event_from_exc_info(
step_context,
sys.exc_info(),
error_source=ErrorSource.FRAMEWORK_ERROR,
error_source=ErrorSource.FRAMEWORK_ERROR
if isinstance(error, DagsterError)
else ErrorSource.UNEXPECTED_ERROR,
)

if step_context.raise_on_error:
raise dagster_error

# case (6) in top comment
except BaseException as unexpected_exception:
step_context.capture_step_exception(unexpected_exception)
yield step_failure_event_from_exc_info(
step_context,
sys.exc_info(),
error_source=ErrorSource.UNEXPECTED_ERROR,
)
raise unexpected_exception
raise error
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import defaultdict
from unittest import mock

import pytest

Expand All @@ -22,7 +23,7 @@
from dagster.core.definitions.decorators.hook_decorator import event_list_hook
from dagster.core.definitions.events import HookExecutionResult
from dagster.core.definitions.policy import RetryPolicy
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.errors import DagsterExecutionInterruptedError, DagsterInvalidDefinitionError
from dagster.core.test_utils import instance_for_test


Expand Down Expand Up @@ -271,6 +272,48 @@ def failed_solid(_):
assert "succeeded_solid_with_hook" not in called_hook_to_solids["a_named_failure_hook"]


def test_failure_hook_framework_exception():

called_hook_to_solids = defaultdict(list)

@failure_hook
def a_failure_hook(context):
called_hook_to_solids[context.hook_def.name].append(context.solid.name)

@op
def my_op(_):
# this solid shouldn't trigger failure hooks
pass

@job(hooks={a_failure_hook})
def my_job():
my_op()

with mock.patch(
"dagster.core.execution.plan.execute_plan.core_dagster_event_sequence_for_step"
) as mocked_event_sequence:
mocked_event_sequence.side_effect = Exception("Framework exception during execution")

result = my_job.execute_in_process(raise_on_error=False)
assert not result.success

# Hook runs when a framework error
assert "my_op" in called_hook_to_solids["a_failure_hook"]

called_hook_to_solids = defaultdict(list)

# Does not run if the execution is interrupted
mocked_event_sequence.side_effect = DagsterExecutionInterruptedError(
"Execution interrupted during execution"
)

result = my_job.execute_in_process(raise_on_error=False)
assert not result.success

# test if hooks are run for the given solids
assert "my_op" not in called_hook_to_solids["a_failure_hook"]


def test_success_hook_event():
@success_hook
def a_hook(_):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def test_exity_run(run_config): # pylint: disable=redefined-outer-name
assert _message_exists(event_records, 'Execution of step "exity_solid" failed.')
assert _message_exists(
event_records,
'Execution of run for "exity_pipeline" failed. An exception was thrown during execution.',
"Execution of run for \"exity_pipeline\" failed. Steps failed: ['exity_solid']",
)


Expand Down

0 comments on commit 504aa08

Please sign in to comment.