Skip to content

Commit

Permalink
Add a "log framework error" hook to execute_step invocations, similar…
Browse files Browse the repository at this point in the history
… to the one in execute_run (#6690)

Summary:
Today we had an issue where a call to execute_step failed in system code, and the event log was left blank for that step. This PR attempts to make it so that framework errors like that are still surfaced in the event log and associated with that step.
  • Loading branch information
gibsondan committed Feb 24, 2022
1 parent d90d1dd commit 4d89075
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 35 deletions.
102 changes: 67 additions & 35 deletions python_modules/dagster/dagster/cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
get_working_directory_from_kwargs,
python_origin_target_argument,
)
from dagster.core.errors import DagsterExecutionInterruptedError
from dagster.core.events import DagsterEventType, EngineEventData
from dagster.core.execution.api import create_execution_plan, execute_plan_iterator
from dagster.core.execution.run_cancellation_thread import start_run_cancellation_thread
Expand All @@ -22,6 +23,7 @@
from dagster.grpc.types import ExecuteRunArgs, ExecuteStepArgs, ResumeRunArgs
from dagster.serdes import deserialize_as, serialize_dagster_namedtuple
from dagster.seven import nullcontext
from dagster.utils.error import serializable_error_info_from_exc_info
from dagster.utils.hosted_user_process import recon_pipeline_from_origin
from dagster.utils.interrupts import capture_interrupts
from dagster.utils.log import configure_loggers
Expand Down Expand Up @@ -299,52 +301,82 @@ def execute_step_command(input_json):
else DagsterInstance.get()
) as instance:
pipeline_run = instance.get_run_by_id(args.pipeline_run_id)
check.inst(
pipeline_run,
PipelineRun,
"Pipeline run with id '{}' not found for step execution".format(
args.pipeline_run_id
),
)

if args.should_verify_step:
success = verify_step(
instance,
pipeline_run,
args.known_state.get_retry_state(),
args.step_keys_to_execute,
)
if not success:
return

recon_pipeline = recon_pipeline_from_origin(
args.pipeline_origin
).subset_for_execution_from_existing_pipeline(pipeline_run.solids_to_execute)

execution_plan = create_execution_plan(
recon_pipeline,
run_config=pipeline_run.run_config,
step_keys_to_execute=args.step_keys_to_execute,
mode=pipeline_run.mode,
known_state=args.known_state,
)

buff = []

for event in execute_plan_iterator(
execution_plan,
recon_pipeline,
pipeline_run,
for event in _execute_step_command_body(
args,
instance,
run_config=pipeline_run.run_config,
retry_mode=args.retry_mode,
pipeline_run,
):
buff.append(serialize_dagster_namedtuple(event))

for line in buff:
click.echo(line)


def _execute_step_command_body(
    args: ExecuteStepArgs, instance: DagsterInstance, pipeline_run: PipelineRun
):
    """Yield the events produced while executing the step(s) named in ``args``.

    This is a generator. It validates ``pipeline_run``, optionally runs
    ``verify_step`` (ending the generator without yielding when verification
    fails), builds an execution plan, and then yields every event from
    ``execute_plan_iterator``.

    If an interrupt or any other ``Exception`` is raised along the way, an
    engine event is reported through ``instance.report_engine_event``, the
    value returned by that call is yielded, and the original exception is
    re-raised. The reported event carries a step key only when exactly one
    step key was requested.
    """
    # An engine event can only be tied to a step when exactly one step key
    # was requested; otherwise it is reported without a step key.
    requested_keys = args.step_keys_to_execute
    single_step_key = None
    if requested_keys and len(requested_keys) == 1:
        single_step_key = requested_keys[0]

    try:
        check.inst(
            pipeline_run,
            PipelineRun,
            "Pipeline run with id '{}' not found for step execution".format(args.pipeline_run_id),
        )

        # Verification is opt-in; a failed verification ends the generator
        # without producing any events.
        if args.should_verify_step and not verify_step(
            instance,
            pipeline_run,
            args.known_state.get_retry_state(),
            args.step_keys_to_execute,
        ):
            return

        full_pipeline = recon_pipeline_from_origin(args.pipeline_origin)
        recon_pipeline = full_pipeline.subset_for_execution_from_existing_pipeline(
            pipeline_run.solids_to_execute
        )

        plan = create_execution_plan(
            recon_pipeline,
            run_config=pipeline_run.run_config,
            step_keys_to_execute=args.step_keys_to_execute,
            mode=pipeline_run.mode,
            known_state=args.known_state,
        )

        yield from execute_plan_iterator(
            plan,
            recon_pipeline,
            pipeline_run,
            instance,
            run_config=pipeline_run.run_config,
            retry_mode=args.retry_mode,
        )
    except (KeyboardInterrupt, DagsterExecutionInterruptedError):
        # Record the interruption against the run (and step, if known) before
        # letting the interrupt propagate.
        yield instance.report_engine_event(
            message="Step execution terminated by interrupt",
            pipeline_run=pipeline_run,
            step_key=single_step_key,
        )
        raise
    except Exception:
        # Anything else that escapes is reported with the serialized exception
        # attached, then propagated unchanged.
        failure_message = "An exception was thrown during step execution that is likely a framework error, rather than an error in user code."
        error_info = serializable_error_info_from_exc_info(sys.exc_info())
        yield instance.report_engine_event(
            failure_message,
            pipeline_run,
            EngineEventData.engine_error(error_info),
            step_key=single_step_key,
        )
        raise


@api_cli.command(name="grpc", help="Serve the Dagster inter-process API over GRPC")
@click.option(
"--port",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from click.testing import CliRunner
from dagster.cli import api
from dagster.cli.api import ExecuteRunArgs, ExecuteStepArgs, verify_step
from dagster.core.execution.plan.state import KnownExecutionState
from dagster.core.execution.retries import RetryState
from dagster.core.execution.stats import RunStepKeyStatsSnapshot
from dagster.core.host_representation import PipelineHandle
Expand Down Expand Up @@ -298,3 +299,58 @@ def test_execute_step_verify_step():
# # Check that verify fails for step that has already run (case 1)
retries = RetryState()
assert not verify_step(instance, run, retries, step_keys_to_execute=["do_something"])


@mock.patch("dagster.cli.api.verify_step")
def test_execute_step_verify_step_framework_error(mock_verify_step):
    """An exception raised by ``verify_step`` is recorded in the run's event log.

    ``verify_step`` is patched to raise, the ``execute_step`` CLI command is
    invoked, and the test asserts that the command exits non-zero and that the
    first log entry for the run is the framework-error engine event, tagged
    with the requested step key and containing the raised exception's text.
    """
    expected_message = "An exception was thrown during step execution that is likely a framework error, rather than an error in user code."

    with get_foo_pipeline_handle() as pipeline_handle:
        runner = CliRunner()

        mock_verify_step.side_effect = Exception("Unexpected framework error text")

        noop_compute_logs = {
            "compute_logs": {
                "module": "dagster.core.storage.noop_compute_log_manager",
                "class": "NoOpComputeLogManager",
            }
        }

        with instance_for_test(overrides=noop_compute_logs) as instance:
            run = create_run_for_test(
                instance,
                pipeline_name="foo",
                run_id="new_run",
            )

            known_state = KnownExecutionState(
                {},
                {
                    "blah": {"result": ["0", "1", "2"]},
                },
            )
            step_args = ExecuteStepArgs(
                pipeline_origin=pipeline_handle.get_python_origin(),
                pipeline_run_id=run.run_id,
                step_keys_to_execute=["fake_step"],
                instance_ref=instance.get_ref(),
                should_verify_step=True,
                known_state=known_state,
            )
            input_json = serialize_dagster_namedtuple(step_args)

            result = runner.invoke(api.execute_step_command, [input_json])

            assert result.exit_code != 0

            # The first log entry for the run should be the framework error.
            log_entry = instance.all_logs(run.run_id)[0]

            assert log_entry.message == expected_message
            assert log_entry.step_key == "fake_step"

            recorded_error = log_entry.dagster_event.event_specific_data.error
            assert "Unexpected framework error text" in str(recorded_error)

0 comments on commit 4d89075

Please sign in to comment.