Commit 06149b6: Don't call it a 'framework error' when a run worker unexpectedly restarts (#7885)

Summary:
A user pointed out a confusing sequence: Dagster times out a run because it took too long to start, the run worker then eventually starts up anyway, and Dagster claims a framework error occurred. This change reports that specific case as an informational engine event instead of a framework error.

There's also a good case to be made that we shouldn't treat a run worker restarting after the run had already started as a 'framework error' either (although we should certainly still fail the run if the cluster interrupts a run and then tries to start it again from the beginning).
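
To make the new behavior concrete, here is a minimal sketch of the two cases this change distinguishes (illustrative only, not code from this commit; it assumes a DagsterInstance `instance` and a PipelineRun `pipeline_run`, and uses the report_engine_event / report_run_failed instance APIs that appear in the diff below):

    def handle_unexpected_run_worker(instance, pipeline_run):
        # Hypothetical helper name, for illustration only.
        if pipeline_run.is_finished:
            # The run already completed, so a late-starting worker is harmless:
            # report an informational engine event instead of a framework error.
            instance.report_engine_event(
                "Ignoring a run worker that started after the run had already finished.",
                pipeline_run,
            )
        else:
            # The run was still in progress, so a second worker means the first
            # one was interrupted: fail the run rather than raise a framework error.
            instance.report_engine_event(
                "A new run worker started while the run was already in progress. "
                "Marking the run as failed.",
                pipeline_run,
            )
            instance.report_run_failed(pipeline_run)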

Test Plan: Bk
gibsondan committed May 17, 2022
1 parent 252f285 commit 06149b6
Showing 3 changed files with 118 additions and 16 deletions.
28 changes: 22 additions & 6 deletions python_modules/dagster/dagster/core/execution/api.py
@@ -85,14 +85,30 @@ def gen_ignore_duplicate_run_worker():
                         pipeline_run,
                     )
 
                 return gen_ignore_duplicate_run_worker()
+            elif pipeline_run.is_finished:
+
+                def gen_ignore_duplicate_run_worker():
+                    yield instance.report_engine_event(
+                        "Ignoring a run worker that started after the run had already finished.",
+                        pipeline_run,
+                    )
+
+                return gen_ignore_duplicate_run_worker()
             else:
-                raise Exception(
-                    f"{pipeline_run.pipeline_name} ({pipeline_run.run_id}) started "
-                    f"a new run while the run was already in state {pipeline_run.status}. "
-                    "This most frequently happens when the run worker unexpectedly stops and is "
-                    "restarted by the cluster.",
-                )
+
+                def gen_fail_restarted_run_worker():
+                    yield instance.report_engine_event(
+                        f"{pipeline_run.pipeline_name} ({pipeline_run.run_id}) started "
+                        f"a new run worker while the run was already in state {pipeline_run.status}. "
+                        "This most frequently happens when the run worker unexpectedly stops and is "
+                        "restarted by the cluster. Marking the run as failed.",
+                        pipeline_run,
+                    )
+                    yield instance.report_run_failed(pipeline_run)
+
+                return gen_fail_restarted_run_worker()
+
     else:
         check.invariant(
             pipeline_run.status == PipelineRunStatus.STARTED
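
With this change, a run worker that starts for an already-finished run yields an informational engine event instead of raising. A sketch of what consuming the iterator now looks like (assumes a pipeline_def, a finished run, and an instance, mirroring the tests below; inspecting events via .message is an assumption based on those tests):

    # Sketch: a duplicate worker for a finished run no longer raises.
    events = list(
        execute_run_iterator(InMemoryPipeline(pipeline_def), finished_run, instance=instance)
    )
    assert any(
        "Ignoring a run worker that started after the run had already finished." in event.message
        for event in events
    )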
@@ -120,9 +120,27 @@ def test_execute_run_fail_pipeline():

assert "RUN_FAILURE" in result.stdout, "no match, result: {}".format(result)

# Framework errors (e.g. running a run that has already run) also result in a non-zero error code
result = runner.invoke(api.execute_run_command, [input_json_raise_on_failure])
assert result.exit_code != 0, str(result.stdout)
with mock.patch(
"dagster.core.execution.api.pipeline_execution_iterator"
) as _mock_pipeline_execution_iterator:
_mock_pipeline_execution_iterator.side_effect = Exception("Framework error")

run = create_run_for_test(
instance, pipeline_name="foo", run_id="new_run_framework_error"
)

input_json_raise_on_failure = serialize_dagster_namedtuple(
ExecuteRunArgs(
pipeline_origin=pipeline_handle.get_python_origin(),
pipeline_run_id=run.run_id,
instance_ref=instance.get_ref(),
set_exit_code_on_failure=True,
)
)

# Framework errors also result in a non-zero error code
result = runner.invoke(api.execute_run_command, [input_json_raise_on_failure])
assert result.exit_code != 0, str(result.stdout)


def test_execute_run_cannot_load():
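The test above forces a framework error by patching the internal pipeline_execution_iterator so that any call raises. For readers unfamiliar with the pattern, a minimal self-contained illustration of mock's side_effect:

    from unittest import mock

    # side_effect makes a mock raise when it is called, which is how the test
    # simulates a framework error deep inside run execution.
    fake_iterator = mock.MagicMock()
    fake_iterator.side_effect = Exception("Framework error")
    try:
        fake_iterator()
    except Exception as exc:
        assert str(exc) == "Framework error"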
@@ -123,14 +123,16 @@ def event_callback(record):
mode="default",
).with_status(PipelineRunStatus.SUCCESS)

with pytest.raises(
Exception,
match=r"basic_resource_pipeline \({}\) started a new "
r"run while the run was already in state DagsterRunStatus.SUCCESS.".format(
pipeline_run.run_id
),
):
events = list(
execute_run_iterator(InMemoryPipeline(pipeline_def), pipeline_run, instance=instance)
)

assert any(
[
"Ignoring a run worker that started after the run had already finished." in event
for event in events
]
)

with instance_for_test(
overrides={
@@ -180,6 +182,72 @@ def event_callback(record):
         )
 
 
+def test_restart_running_run_worker():
+    def event_callback(_record):
+        pass
+
+    with instance_for_test() as instance:
+        pipeline_def = PipelineDefinition(
+            name="basic_resource_pipeline",
+            solid_defs=[resource_solid],
+            mode_defs=[
+                ModeDefinition(
+                    resource_defs={"a": resource_a, "b": resource_b},
+                    logger_defs={"callback": construct_event_logger(event_callback)},
+                )
+            ],
+        )
+        pipeline_run = instance.create_run_for_pipeline(
+            pipeline_def=pipeline_def,
+            run_config={"loggers": {"callback": {}}},
+            mode="default",
+        ).with_status(PipelineRunStatus.STARTED)
+
+        events = list(
+            execute_run_iterator(InMemoryPipeline(pipeline_def), pipeline_run, instance=instance)
+        )
+
+        assert any(
+            [
+                f"{pipeline_run.pipeline_name} ({pipeline_run.run_id}) started a new run worker while the run was already in state DagsterRunStatus.STARTED. "
+                in event.message
+                for event in events
+            ]
+        )
+
+        assert instance.get_run_by_id(pipeline_run.run_id).status == PipelineRunStatus.FAILURE
+
+
+def test_start_run_worker_after_run_failure():
+    def event_callback(_record):
+        pass
+
+    with instance_for_test() as instance:
+        pipeline_def = PipelineDefinition(
+            name="basic_resource_pipeline",
+            solid_defs=[resource_solid],
+            mode_defs=[
+                ModeDefinition(
+                    resource_defs={"a": resource_a, "b": resource_b},
+                    logger_defs={"callback": construct_event_logger(event_callback)},
+                )
+            ],
+        )
+        pipeline_run = instance.create_run_for_pipeline(
+            pipeline_def=pipeline_def,
+            run_config={"loggers": {"callback": {}}},
+            mode="default",
+        ).with_status(PipelineRunStatus.FAILURE)
+
+        event = next(
+            execute_run_iterator(InMemoryPipeline(pipeline_def), pipeline_run, instance=instance)
+        )
+        assert (
+            "Ignoring a run worker that started after the run had already finished."
+            in event.message
+        )
+
 
 def test_execute_canceled_state():
     def event_callback(_record):
         pass
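
These tests simulate a restarted run worker by creating a run record and overriding its status before handing it to a fresh execute_run_iterator call; with_status produces a copy of the run record in the given state. A minimal sketch of that setup technique, reusing the names from the tests above:

    # Sketch: make a brand-new worker see the run as already STARTED, as if a
    # previous worker had been interrupted and restarted by the cluster.
    pipeline_run = instance.create_run_for_pipeline(
        pipeline_def=pipeline_def,
        mode="default",
    ).with_status(PipelineRunStatus.STARTED)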
