Skip to content

Commit

Permalink
All steps option for reexecute method (#7575)
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Apr 26, 2022
1 parent 69546e9 commit 7ef7384
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 28 deletions.
2 changes: 1 addition & 1 deletion js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion js_modules/dagit/packages/core/src/types/globalTypes.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from graphql.execution.base import ResolveInfo

from dagster import check
from dagster.core.execution.plan.resume_retry import ReexecutionPolicy
from dagster.core.host_representation.selector import PipelineSelector
from dagster.core.instance import DagsterInstance
from dagster.core.storage.pipeline_run import RunsFilter
Expand Down Expand Up @@ -62,16 +63,12 @@ def launch_reexecution_from_parent_run(graphene_info, parent_run_id: str, policy
"""
Launch a re-execution by referencing the parent run id
"""
from ...schema.inputs import GrapheneReexecutionPolicy
from ...schema.pipelines.pipeline import GrapheneRun
from ...schema.runs import GrapheneLaunchRunSuccess

check.inst_param(graphene_info, "graphene_info", ResolveInfo)
check.str_param(parent_run_id, "parent_run_id")

check.invariant(
policy == GrapheneReexecutionPolicy.FROM_FAILURE, "Only FROM_FAILURE is currently supported"
)
check.str_param(policy, "policy")

instance: DagsterInstance = graphene_info.context.instance
parent_run = instance.get_run_by_id(parent_run_id)
Expand All @@ -87,10 +84,11 @@ def launch_reexecution_from_parent_run(graphene_info, parent_run_id: str, policy
repo_location = graphene_info.context.get_repository_location(selector.location_name)
external_pipeline = get_external_pipeline_or_raise(graphene_info, selector)

run = instance.create_reexecuted_run_from_failure(
run = instance.create_reexecuted_run(
parent_run,
repo_location,
external_pipeline,
ReexecutionPolicy[policy],
use_parent_run_tags=True, # inherit whatever tags were set on the parent run at launch time
)
graphene_info.context.instance.submit_run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class Meta:

class GrapheneReexecutionPolicy(graphene.Enum):
FROM_FAILURE = "FROM_FAILURE"
ALL_OPS = "ALL_OPS"
ALL_STEPS = "ALL_STEPS"

class Meta:
name = "ReexecutionPolicy"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
LAUNCH_PIPELINE_REEXECUTION_MUTATION,
PIPELINE_REEXECUTION_INFO_QUERY,
)
from dagster_graphql.schema.inputs import GrapheneReexecutionPolicy
from dagster_graphql.test.utils import (
execute_dagster_graphql,
execute_dagster_graphql_and_finish_runs,
infer_pipeline_selector,
)

from dagster.core.execution.plan.resume_retry import ReexecutionPolicy
from dagster.core.storage.pipeline_run import PipelineRunStatus
from dagster.core.storage.tags import RESUME_RETRY_TAG
from dagster.core.test_utils import poll_for_finished_run
Expand Down Expand Up @@ -501,7 +503,7 @@ def test_retry_hard_failure(self, graphql_context):
assert step_did_succeed(logs, "hard_fail_or_0")
assert step_did_succeed(logs, "increment")

def test_retry_hard_failure_with_reexecution_params(self, graphql_context):
def test_retry_failure_all_steps_with_reexecution_params(self, graphql_context):
"""
Test with providing reexecutionParams rather than executionParams
"""
Expand Down Expand Up @@ -532,15 +534,15 @@ def test_retry_hard_failure_with_reexecution_params(self, graphql_context):
retry = execute_dagster_graphql_and_finish_runs(
graphql_context,
LAUNCH_PIPELINE_REEXECUTION_MUTATION,
variables={"reexecutionParams": {"parentRunId": run_id, "policy": "FROM_FAILURE"}},
variables={"reexecutionParams": {"parentRunId": run_id, "policy": "ALL_STEPS"}},
)

run_id = retry.data["launchPipelineReexecution"]["run"]["runId"]
assert graphql_context.instance.get_run_by_id(run_id).status == PipelineRunStatus.SUCCESS
logs = get_all_logs_for_finished_run_via_subscription(graphql_context, run_id)[
"pipelineRunLogs"
]["messages"]
assert step_did_not_run(logs, "always_succeed")
assert step_did_succeed(logs, "always_succeed")
assert step_did_succeed(logs, "conditionally_fail")
assert step_did_succeed(logs, "after_failure")

Expand Down Expand Up @@ -588,6 +590,55 @@ def test_retry_hard_failure_with_reexecution_params_run_config_changed(self, gra
retry.data["launchPipelineReexecution"]["message"]
)

def test_retry_failure_with_reexecution_params(self, graphql_context):
    """
    Re-execute a failed run through reexecutionParams using the FROM_FAILURE policy.

    The first launch is made to fail in the conditionally_fail solid. The retry is
    expected to finish successfully, with always_succeed not run again while
    conditionally_fail and after_failure both succeed.
    """
    pipeline_selector = infer_pipeline_selector(graphql_context, "chained_failure_pipeline")

    # The marker file exists only for the duration of the first launch; it is what
    # triggers the failure in the conditionally_fail solid.
    marker_path = os.path.join(
        get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail"
    )
    launch_variables = {
        "executionParams": {
            "mode": "default",
            "selector": pipeline_selector,
        }
    }
    try:
        with open(marker_path, "w", encoding="utf8"):
            initial_launch = execute_dagster_graphql_and_finish_runs(
                graphql_context,
                LAUNCH_PIPELINE_EXECUTION_MUTATION,
                variables=launch_variables,
            )
    finally:
        os.remove(marker_path)

    parent_run_id = initial_launch.data["launchPipelineExecution"]["run"]["runId"]
    parent_run = graphql_context.instance.get_run_by_id(parent_run_id)
    assert parent_run.status == PipelineRunStatus.FAILURE

    # Only the parent run id and the policy are supplied for the re-execution.
    reexecution_variables = {
        "reexecutionParams": {"parentRunId": parent_run_id, "policy": "FROM_FAILURE"}
    }
    reexecution = execute_dagster_graphql_and_finish_runs(
        graphql_context,
        LAUNCH_PIPELINE_REEXECUTION_MUTATION,
        variables=reexecution_variables,
    )

    retry_run_id = reexecution.data["launchPipelineReexecution"]["run"]["runId"]
    retry_run = graphql_context.instance.get_run_by_id(retry_run_id)
    assert retry_run.status == PipelineRunStatus.SUCCESS

    subscription_payload = get_all_logs_for_finished_run_via_subscription(
        graphql_context, retry_run_id
    )
    logs = subscription_payload["pipelineRunLogs"]["messages"]
    assert step_did_not_run(logs, "always_succeed")
    assert step_did_succeed(logs, "conditionally_fail")
    assert step_did_succeed(logs, "after_failure")


def test_graphene_reexecution_policy():
    """Every value of the graphene enum must name a member of the ReexecutionPolicy enum."""
    graphene_values = [member.value for member in GrapheneReexecutionPolicy.__enum__]
    for value in graphene_values:
        # Lookup by name raises KeyError when ReexecutionPolicy has no such member.
        assert ReexecutionPolicy[value]


def _do_retry_intermediates_test(graphql_context, run_id, reexecution_run_id):
selector = infer_pipeline_selector(graphql_context, "eventually_successful")
Expand Down
4 changes: 3 additions & 1 deletion python_modules/dagster/dagster/core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, List, NamedTuple, Optional

from dagster import check
from dagster.core.execution.plan.resume_retry import ReexecutionPolicy
from dagster.core.execution.plan.state import KnownExecutionState
from dagster.core.host_representation import (
ExternalPartitionSet,
Expand Down Expand Up @@ -225,10 +226,11 @@ def create_backfill_run(
last_run = _fetch_last_run(instance, external_partition_set, partition_data.name)
if not last_run or last_run.status != PipelineRunStatus.FAILURE:
return None
return instance.create_reexecuted_run_from_failure(
return instance.create_reexecuted_run(
last_run,
repo_location,
external_pipeline,
ReexecutionPolicy.FROM_FAILURE,
extra_tags=tags,
run_config=partition_data.run_config,
mode=external_partition_set.mode,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Expand Down Expand Up @@ -30,6 +31,11 @@ def _in_tracking_dict(handle, tracking):
return handle.to_key() in tracking


class ReexecutionPolicy(enum.Enum):
    """Policy selecting which steps of a parent run are executed again.

    Each member's value is identical to its name.
    """

    # Execute every step again.
    ALL_STEPS = "ALL_STEPS"
    # Resume from the point of failure of the parent run.
    FROM_FAILURE = "FROM_FAILURE"


def get_retry_steps_from_parent_run(
instance, parent_run_id: str = None, parent_run: PipelineRun = None
) -> Tuple[List[str], Optional[KnownExecutionState]]:
Expand Down
35 changes: 25 additions & 10 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from dagster.core.debug import DebugRunPayload
from dagster.core.events import DagsterEvent, DagsterEventType
from dagster.core.events.log import EventLogEntry
from dagster.core.execution.plan.resume_retry import ReexecutionPolicy
from dagster.core.execution.stats import RunStepKeyStatsSnapshot
from dagster.core.host_representation import (
ExternalPipeline,
Expand Down Expand Up @@ -1014,29 +1015,31 @@ def create_run(

return pipeline_run

def create_reexecuted_run_from_failure(
def create_reexecuted_run(
self,
parent_run: PipelineRun,
repo_location: "RepositoryLocation",
external_pipeline: "ExternalPipeline",
policy: "ReexecutionPolicy",
extra_tags: Optional[Dict[str, Any]] = None,
run_config: Optional[Dict[str, Any]] = None,
mode: Optional[str] = None,
use_parent_run_tags: bool = False,
) -> PipelineRun:
from dagster.core.execution.plan.resume_retry import get_retry_steps_from_parent_run
from dagster.core.execution.plan.resume_retry import (
ReexecutionPolicy,
get_retry_steps_from_parent_run,
)
from dagster.core.host_representation import ExternalPipeline, RepositoryLocation

check.inst_param(parent_run, "parent_run", PipelineRun)
check.inst_param(repo_location, "repo_location", RepositoryLocation)
check.inst_param(external_pipeline, "external_pipeline", ExternalPipeline)
check.inst_param(policy, "policy", ReexecutionPolicy)
check.opt_dict_param(extra_tags, "extra_tags", key_type=str)
check.opt_dict_param(run_config, "run_config", key_type=str)
check.opt_str_param(mode, "mode")
check.invariant(
parent_run.status == PipelineRunStatus.FAILURE,
"Cannot reexecute from failure a run that is not failed",
)

check.bool_param(use_parent_run_tags, "use_parent_run_tags")

root_run_id = parent_run.root_run_id or parent_run.run_id
Expand All @@ -1050,15 +1053,27 @@ def create_reexecuted_run_from_failure(
{
PARENT_RUN_ID_TAG: parent_run_id,
ROOT_RUN_ID_TAG: root_run_id,
RESUME_RETRY_TAG: "true",
},
)

mode = cast(str, mode if mode is not None else parent_run.mode)
run_config = run_config if run_config is not None else parent_run.run_config

step_keys_to_execute, known_state = get_retry_steps_from_parent_run(
self, parent_run=parent_run
)
if policy == ReexecutionPolicy.FROM_FAILURE:
check.invariant(
parent_run.status == PipelineRunStatus.FAILURE,
"Cannot reexecute from failure a run that is not failed",
)

step_keys_to_execute, known_state = get_retry_steps_from_parent_run(
self, parent_run=parent_run
)
tags[RESUME_RETRY_TAG] = "true"
elif policy == ReexecutionPolicy.ALL_STEPS:
step_keys_to_execute = None
known_state = None
else:
raise DagsterInvariantViolationError(f"Unknown reexecution policy: {policy}")

external_execution_plan = repo_location.get_external_execution_plan(
external_pipeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from dagster import DagsterInstance, execute_pipeline, job, op, reconstructable, repository
from dagster.core.execution.plan.resume_retry import ReexecutionPolicy
from dagster.core.storage.pipeline_run import PipelineRunStatus
from dagster.core.storage.tags import RESUME_RETRY_TAG
from dagster.core.test_utils import (
Expand Down Expand Up @@ -95,7 +96,9 @@ def failed_run_fixture(instance):
def test_create_reexecuted_run_from_failure(
instance: DagsterInstance, workspace, repo_location, external_pipeline, failed_run
):
run = instance.create_reexecuted_run_from_failure(failed_run, repo_location, external_pipeline)
run = instance.create_reexecuted_run(
failed_run, repo_location, external_pipeline, ReexecutionPolicy.FROM_FAILURE
)

assert run.tags[RESUME_RETRY_TAG] == "true"
assert set(run.step_keys_to_execute) == {"conditional_fail", "after_failure"} # type: ignore
Expand All @@ -112,25 +115,50 @@ def test_create_reexecuted_run_from_failure(
def test_create_reexecuted_run_from_failure_tags(
instance: DagsterInstance, workspace, repo_location, external_pipeline, failed_run
):
run = instance.create_reexecuted_run_from_failure(failed_run, repo_location, external_pipeline)
run = instance.create_reexecuted_run(
failed_run, repo_location, external_pipeline, ReexecutionPolicy.FROM_FAILURE
)

assert run.tags["foo"] == "bar"
assert "fizz" not in run.tags

run = instance.create_reexecuted_run_from_failure(
failed_run, repo_location, external_pipeline, use_parent_run_tags=True
run = instance.create_reexecuted_run(
failed_run,
repo_location,
external_pipeline,
ReexecutionPolicy.FROM_FAILURE,
use_parent_run_tags=True,
)

assert run.tags["foo"] == "not bar!"
assert run.tags["fizz"] == "buzz"

run = instance.create_reexecuted_run_from_failure(
run = instance.create_reexecuted_run(
failed_run,
repo_location,
external_pipeline,
ReexecutionPolicy.FROM_FAILURE,
use_parent_run_tags=True,
extra_tags={"fizz": "not buzz!!"},
)

assert run.tags["foo"] == "not bar!"
assert run.tags["fizz"] == "not buzz!!"


def test_create_reexecuted_run_all_steps(
    instance: DagsterInstance, workspace, repo_location, external_pipeline, failed_run
):
    """Re-executing a failed run with ALL_STEPS runs every step and omits the resume/retry tag."""
    reexecuted_run = instance.create_reexecuted_run(
        failed_run, repo_location, external_pipeline, ReexecutionPolicy.ALL_STEPS
    )

    assert RESUME_RETRY_TAG not in reexecuted_run.tags

    instance.launch_run(reexecuted_run.run_id, workspace)
    finished_run = poll_for_finished_run(instance, reexecuted_run.run_id)

    assert finished_run.status == PipelineRunStatus.SUCCESS
    # Every step is expected to succeed, including the one before the original failure.
    for step_key in ("before_failure", "conditional_fail", "after_failure"):
        assert step_succeeded(instance, finished_run, step_key)

0 comments on commit 7ef7384

Please sign in to comment.