Skip to content

Commit

Permalink
Add reexecution strategy to auto run retries (#8718)
Browse files Browse the repository at this point in the history
Closes #8710

This is in cloud but missed OSS as a casualty of forking the daemon- need to consolidate them
  • Loading branch information
johannkm committed Jul 6, 2022
1 parent bc227bb commit a57caf3
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

from dagster import DagsterEvent, DagsterEventType, EventLogEntry, PipelineRunStatus
from dagster.core.execution.api import create_execution_plan
from dagster.core.execution.plan.resume_retry import ReexecutionStrategy
from dagster.core.instance import DagsterInstance
from dagster.core.snap import snapshot_from_execution_plan
from dagster.core.storage.pipeline_run import RunsFilter
from dagster.core.storage.tags import MAX_RETRIES_TAG
from dagster.core.storage.tags import MAX_RETRIES_TAG, RETRY_STRATEGY_TAG
from dagster.core.test_utils import create_run_for_test, instance_for_test
from dagster.daemon.auto_run_reexecution.auto_run_reexecution import (
consume_new_runs_for_automatic_reexecution,
filter_runs_to_should_retry,
get_reexecution_strategy,
)
from dagster.daemon.auto_run_reexecution.event_log_consumer import EventLogConsumerDaemon

Expand Down Expand Up @@ -246,3 +249,32 @@ def test_daemon_enabled(instance):
)

assert EventLogConsumerDaemon.daemon_type() in instance.get_required_daemon_types()


def test_strategy(instance: DagsterInstance):
run = create_run(
instance,
status=PipelineRunStatus.FAILURE,
)
assert get_reexecution_strategy(run, instance) == None

run = create_run(
instance,
status=PipelineRunStatus.FAILURE,
tags={RETRY_STRATEGY_TAG: "FROM_FAILURE"},
)
assert get_reexecution_strategy(run, instance) == ReexecutionStrategy.FROM_FAILURE

run = create_run(
instance,
status=PipelineRunStatus.FAILURE,
tags={RETRY_STRATEGY_TAG: "ALL_STEPS"},
)
assert get_reexecution_strategy(run, instance) == ReexecutionStrategy.ALL_STEPS

run = create_run(
instance,
status=PipelineRunStatus.FAILURE,
tags={RETRY_STRATEGY_TAG: "not a strategy"},
)
assert get_reexecution_strategy(run, instance) == None
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from dagster.core.events import EngineEventData
from dagster.core.execution.plan.resume_retry import ReexecutionStrategy
from dagster.core.instance import DagsterInstance
from dagster.core.storage.pipeline_run import DagsterRunStatus, RunRecord
from dagster.core.storage.tags import MAX_RETRIES_TAG, RETRY_NUMBER_TAG
from dagster.core.storage.pipeline_run import DagsterRunStatus, PipelineRun, RunRecord
from dagster.core.storage.tags import MAX_RETRIES_TAG, RETRY_NUMBER_TAG, RETRY_STRATEGY_TAG
from dagster.core.workspace.workspace import IWorkspace
from dagster.utils.error import serializable_error_info_from_exc_info

DEFAULT_REEXECUTION_POLICY = ReexecutionStrategy.FROM_FAILURE


def filter_runs_to_should_retry(
runs: List[DagsterRun], instance: DagsterInstance, default_max_retries: int
Expand Down Expand Up @@ -59,6 +61,22 @@ def get_retry_number(run: DagsterRun) -> Optional[int]:
yield (run, retry_number)


def get_reexecution_strategy(
run: PipelineRun, instance: DagsterInstance
) -> Optional[ReexecutionStrategy]:
raw_strategy_tag = run.tags.get(RETRY_STRATEGY_TAG)
if raw_strategy_tag is None:
return None

if raw_strategy_tag not in ReexecutionStrategy.__members__:
instance.report_engine_event(
f"Error parsing retry strategy from tag '{RETRY_STRATEGY_TAG}: {raw_strategy_tag}'", run
)
return None
else:
return ReexecutionStrategy[raw_strategy_tag]


def retry_run(
failed_run: DagsterRun,
retry_number: int,
Expand Down Expand Up @@ -102,11 +120,13 @@ def retry_run(

external_pipeline = external_repo.get_full_external_pipeline(failed_run.pipeline_name)

strategy = get_reexecution_strategy(failed_run, instance) or DEFAULT_REEXECUTION_POLICY

new_run = instance.create_reexecuted_run(
failed_run,
repo_location,
external_pipeline,
strategy=ReexecutionStrategy.FROM_FAILURE,
strategy=strategy,
extra_tags=tags,
use_parent_run_tags=True,
)
Expand Down

0 comments on commit a57caf3

Please sign in to comment.