Skip to content

Commit

Permalink
Test test_create_reexecuted_run_from_failure (#7566)
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Apr 26, 2022
1 parent 36e7e6a commit 38c650e
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 27 deletions.
26 changes: 26 additions & 0 deletions python_modules/dagster/dagster/core/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import tempfile
import time
from collections import defaultdict
from contextlib import ExitStack, contextmanager

import pendulum
Expand Down Expand Up @@ -496,6 +497,31 @@ def get_logger_output_from_capfd(capfd, logger_name):
)


def _step_events(instance, run):
events_by_step = defaultdict(set)
logs = instance.all_logs(run.run_id)
for record in logs:
if not record.is_dagster_event or not record.step_key:
continue
events_by_step[record.step_key] = record.dagster_event.event_type_value
return events_by_step


def step_did_not_run(instance, run, step_name):
step_events = _step_events(instance, run)[step_name]
return len(step_events) == 0


def step_succeeded(instance, run, step_name):
step_events = _step_events(instance, run)[step_name]
return "STEP_SUCCESS" in step_events


def step_failed(instance, run, step_name):
step_events = _step_events(instance, run)[step_name]
return "STEP_FAILURE" in step_events


def test_counter():
@traced
async def foo():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import os

import pytest

from dagster import DagsterInstance, execute_pipeline, job, op, reconstructable, repository
from dagster.core.storage.pipeline_run import PipelineRunStatus
from dagster.core.storage.tags import RESUME_RETRY_TAG
from dagster.core.test_utils import (
environ,
instance_for_test,
poll_for_finished_run,
step_did_not_run,
step_succeeded,
)
from dagster.core.workspace.context import WorkspaceProcessContext
from dagster.core.workspace.load_target import PythonFileTarget
from dagster.seven import get_system_temp_directory

CONDITIONAL_FAIL_ENV = "DAGSTER_CONDIIONAL_FAIL"


@op
def before_failure():
return "hello"


@op
def conditional_fail(_, input_value):
if os.environ.get(CONDITIONAL_FAIL_ENV):
raise Exception("env set, failing!")

return input_value


@op
def after_failure(_, input_value):
return input_value


@job
def conditional_fail_job():
after_failure(conditional_fail(before_failure()))


@repository
def repo():
return [conditional_fail_job]


@pytest.fixture
def instance():
with instance_for_test() as instance:
yield instance


@pytest.fixture
def workspace(instance):
with WorkspaceProcessContext(
instance,
PythonFileTarget(
python_file=__file__,
attribute=None,
working_directory=None,
location_name="repo_loc",
),
) as workspace_process_context:
yield workspace_process_context.create_request_context()


def test_create_reexecuted_run_from_failure(instance: DagsterInstance, workspace):
# trigger failure in the conditionally_fail op
with environ({CONDITIONAL_FAIL_ENV: "1"}):
result = execute_pipeline(reconstructable(conditional_fail_job), instance=instance)

assert not result.success

parent_run = instance.get_run_by_id(result.run_id)
repository_location = workspace.get_repository_location("repo_loc")
external_pipeline = repository_location.get_repository("repo").get_full_external_pipeline(
"conditional_fail_job"
)

run = instance.create_reexecuted_run_from_failure(
parent_run, repository_location, external_pipeline
)

assert run.tags[RESUME_RETRY_TAG] == "true"
assert set(run.step_keys_to_execute) == {"conditional_fail", "after_failure"} # type: ignore

instance.launch_run(run.run_id, workspace)
run = poll_for_finished_run(instance, run.run_id)

assert run.status == PipelineRunStatus.SUCCESS
assert step_did_not_run(instance, run, "before_failure")
assert step_succeeded(instance, run, "conditional_fail")
assert step_succeeded(instance, run, "after_failure")
34 changes: 7 additions & 27 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import string
import sys
import time
from collections import defaultdict
from contextlib import contextmanager

import pendulum
Expand All @@ -29,7 +28,13 @@
)
from dagster.core.storage.pipeline_run import PipelineRunStatus, RunsFilter
from dagster.core.storage.tags import BACKFILL_ID_TAG, PARTITION_NAME_TAG, PARTITION_SET_TAG
from dagster.core.test_utils import create_test_daemon_workspace, instance_for_test
from dagster.core.test_utils import (
create_test_daemon_workspace,
instance_for_test,
step_did_not_run,
step_failed,
step_succeeded,
)
from dagster.core.types.loadable_target_origin import LoadableTargetOrigin
from dagster.core.workspace.load_target import PythonFileTarget
from dagster.daemon import get_default_daemon_logger
Expand All @@ -45,16 +50,6 @@ def _failure_flag_file():
return os.path.join(get_system_temp_directory(), "conditionally_fail")


def _step_events(instance, run):
events_by_step = defaultdict(set)
logs = instance.all_logs(run.run_id)
for record in logs:
if not record.is_dagster_event or not record.step_key:
continue
events_by_step[record.step_key] = record.dagster_event.event_type_value
return events_by_step


@solid
def always_succeed(_):
return 1
Expand Down Expand Up @@ -237,21 +232,6 @@ def instance_for_context(external_repo_context, overrides=None):
yield (instance, workspace, external_repo)


def step_did_not_run(instance, run, step_name):
step_events = _step_events(instance, run)[step_name]
return len(step_events) == 0


def step_succeeded(instance, run, step_name):
step_events = _step_events(instance, run)[step_name]
return "STEP_SUCCESS" in step_events


def step_failed(instance, run, step_name):
step_events = _step_events(instance, run)[step_name]
return "STEP_FAILURE" in step_events


def wait_for_all_runs_to_start(instance, timeout=10):
start_time = time.time()
while True:
Expand Down

0 comments on commit 38c650e

Please sign in to comment.