Skip to content

Commit

Permalink
fix: use run status to determine success for an in-process result (#6784
Browse files Browse the repository at this point in the history
)
  • Loading branch information
rexledesma committed Feb 25, 2022
1 parent b4a1715 commit 308429b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ def core_execute_in_process(
mode=mode_def.name,
tags={**pipeline_def.tags, **(run_tags or {})},
)
run_id = pipeline_run.run_id

_execute_run_iterable = ExecuteRunWithPlanIterable(
execute_run_iterable = ExecuteRunWithPlanIterable(
execution_plan=execution_plan,
iterator=pipeline_execution_iterator,
execution_context_manager=PlanOrchestrationContextManager(
Expand All @@ -63,6 +64,15 @@ def core_execute_in_process(
raise_on_error=raise_on_error,
),
)
event_list = list(_execute_run_iterable)

return ExecuteInProcessResult(node, event_list, pipeline_run, output_capture)
event_list = []

for event in execute_run_iterable:
event_list.append(event)

if event.is_pipeline_event:
execute_instance.handle_run_event(run_id, event)

return ExecuteInProcessResult(
node, event_list, execute_instance.get_run_by_id(run_id), output_capture
)
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(
@property
def success(self) -> bool:
"""bool: Whether execution was successful."""
return all([not event.is_failure for event in self._event_list])
return self._dagster_run.is_success

@property
def all_node_events(self) -> List[DagsterEvent]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,29 @@ def my_job():
assert result.asset_observations_for_node("my_op") == [
AssetObservation(asset_key=AssetKey(["abc"]))
]


def test_dagster_run():
@op
def success_op():
return True

@job
def my_success_job():
success_op()

result = my_success_job.execute_in_process()
assert result.success
assert result.dagster_run.is_success

@op
def fail_op():
raise Exception

@job
def my_failure_job():
fail_op()

result = my_failure_job.execute_in_process(raise_on_error=False)
assert not result.success
assert not result.dagster_run.is_success

0 comments on commit 308429b

Please sign in to comment.