Skip to content

Commit

Permalink
Reexecute run mutation using just parent run (#7510)
Browse files Browse the repository at this point in the history
Provide a simpler interface for the front end to re-execute: just pass a parent run id and selector, rather than putting together the various bits of a re-execution with tags, root run, etc.

If this looks like a reasonable approach, I will add tests.
  • Loading branch information
johannkm committed Apr 22, 2022
1 parent f160f7d commit 19dff51
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 10 deletions.
20 changes: 18 additions & 2 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,8 @@
LAUNCH_PIPELINE_REEXECUTION_MUTATION = (
ERROR_FRAGMENT
+ """
mutation($executionParams: ExecutionParams!) {
launchPipelineReexecution(executionParams: $executionParams) {
mutation($executionParams: ExecutionParams, $reexecutionParams: ReexecutionParams) {
launchPipelineReexecution(executionParams: $executionParams, reexecutionParams: $reexecutionParams) {
__typename
... on PythonError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
create_and_launch_partition_backfill,
resume_partition_backfill,
)
from .launch_execution import launch_pipeline_execution, launch_pipeline_reexecution
from .launch_execution import (
launch_pipeline_execution,
launch_pipeline_reexecution,
launch_reexecution_from_parent_run,
)


def _force_mark_as_canceled(instance, run_id):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from graphql.execution.base import ResolveInfo

from dagster import check
from dagster.core.host_representation.selector import PipelineSelector
from dagster.core.instance import DagsterInstance
from dagster.core.storage.pipeline_run import RunsFilter

from ..external import get_external_pipeline_or_raise
Expand Down Expand Up @@ -53,3 +55,44 @@ def _launch_pipeline_execution(graphene_info, execution_params, is_reexecuted=Fa
records = graphene_info.context.instance.get_run_records(RunsFilter(run_ids=[run.run_id]))

return GrapheneLaunchRunSuccess(run=GrapheneRun(records[0]))


@capture_error
def launch_reexecution_from_parent_run(graphene_info, parent_run_id: str, policy):
    """Create and submit a re-execution of an existing run, given only its run id.

    The pipeline selector is rebuilt from the external pipeline origin stored on the
    parent run, so the caller does not need to assemble execution params.

    Args:
        graphene_info: The graphene ``ResolveInfo`` for the current request.
        parent_run_id: Id of the run to re-execute.
        policy: Requested re-execution policy; only ``FROM_FAILURE`` is accepted.

    Returns:
        A ``GrapheneLaunchRunSuccess`` wrapping the newly submitted run.

    A ``check`` invariant fails when ``policy`` is anything other than
    ``FROM_FAILURE`` or when no run exists for ``parent_run_id``.
    """
    # Imported lazily inside the function, as in the rest of this module's launchers.
    from ...schema.inputs import GrapheneReexecutionPolicy
    from ...schema.pipelines.pipeline import GrapheneRun
    from ...schema.runs import GrapheneLaunchRunSuccess

    check.inst_param(graphene_info, "graphene_info", ResolveInfo)
    check.str_param(parent_run_id, "parent_run_id")
    check.invariant(
        policy == GrapheneReexecutionPolicy.FROM_FAILURE,
        "Only FROM_FAILURE is currently supported",
    )

    context = graphene_info.context
    dagster_instance: DagsterInstance = context.instance

    parent_run = dagster_instance.get_run_by_id(parent_run_id)
    check.invariant(parent_run, "Could not find parent run with id: %s" % parent_run_id)

    # Rebuild the selector for the whole pipeline (no solid selection) from the
    # origin recorded on the parent run.
    repository_origin = parent_run.external_pipeline_origin.external_repository_origin
    selector = PipelineSelector(
        location_name=repository_origin.repository_location_origin.location_name,
        repository_name=repository_origin.repository_name,
        pipeline_name=parent_run.pipeline_name,
        solid_selection=None,
    )

    repo_location = context.get_repository_location(selector.location_name)
    external_pipeline = get_external_pipeline_or_raise(graphene_info, selector)

    new_run = dagster_instance.create_reexecuted_run_from_failure(
        parent_run, repo_location, external_pipeline
    )
    context.instance.submit_run(new_run.run_id, workspace=context)

    # Fetch the run record after submission so the returned run carries updateTime.
    run_records = context.instance.get_run_records(RunsFilter(run_ids=[new_run.run_id]))
    return GrapheneLaunchRunSuccess(run=GrapheneRun(run_records[0]))
16 changes: 16 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,22 @@ class Meta:
name = "ExecutionParams"


class GrapheneReexecutionPolicy(graphene.Enum):
    # Strategies a client may request when re-executing a run from its parent run id.
    # NOTE(review): documented with comments rather than a class docstring because
    # graphene may surface a class docstring as the schema description -- confirm
    # before converting these to a docstring.

    class Meta:
        # Name under which this enum is exposed in the GraphQL schema.
        name = "ReexecutionPolicy"

    FROM_FAILURE = "FROM_FAILURE"
    ALL_OPS = "ALL_OPS"


class GrapheneReexecutionParams(graphene.InputObjectType):
    # Input for re-executing a run by reference to its parent run; both fields
    # are required (non-null).
    # NOTE(review): documented with comments rather than a class docstring because
    # graphene may surface a class docstring as the schema description -- confirm
    # before converting these to a docstring.

    class Meta:
        # Name under which this input type is exposed in the GraphQL schema.
        name = "ReexecutionParams"

    # Id of the run to re-execute.
    parentRunId = graphene.NonNull(graphene.String)
    # Which re-execution strategy to apply (a ReexecutionPolicy value).
    policy = graphene.NonNull(GrapheneReexecutionPolicy)


class GrapheneMarshalledInput(graphene.InputObjectType):
input_name = graphene.NonNull(graphene.String)
key = graphene.NonNull(graphene.String)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import graphene

from dagster import check
from dagster.core.definitions.events import AssetKey
from dagster.core.workspace.permissions import Permissions

Expand All @@ -9,6 +10,7 @@
delete_pipeline_run,
launch_pipeline_execution,
launch_pipeline_reexecution,
launch_reexecution_from_parent_run,
resume_partition_backfill,
terminate_pipeline_execution,
wipe_assets,
Expand Down Expand Up @@ -40,7 +42,12 @@
GrapheneUnauthorizedError,
)
from ..external import GrapheneWorkspace, GrapheneWorkspaceLocationEntry
from ..inputs import GrapheneAssetKeyInput, GrapheneExecutionParams, GrapheneLaunchBackfillParams
from ..inputs import (
GrapheneAssetKeyInput,
GrapheneExecutionParams,
GrapheneLaunchBackfillParams,
GrapheneReexecutionParams,
)
from ..pipelines.pipeline import GrapheneRun
from ..runs import (
GrapheneLaunchRunReexecutionResult,
Expand Down Expand Up @@ -293,7 +300,8 @@ class GrapheneLaunchRunReexecutionMutation(graphene.Mutation):
Output = graphene.NonNull(GrapheneLaunchRunReexecutionResult)

class Arguments:
executionParams = graphene.NonNull(GrapheneExecutionParams)
executionParams = graphene.Argument(GrapheneExecutionParams)
reexecutionParams = graphene.Argument(GrapheneReexecutionParams)

class Meta:
description = "Re-launch a run via the run launcher configured on the instance"
Expand All @@ -302,11 +310,25 @@ class Meta:
@capture_error
@check_permission(Permissions.LAUNCH_PIPELINE_REEXECUTION)
def mutate(self, graphene_info, **kwargs):
return create_execution_params_and_launch_pipeline_reexec(
graphene_info,
execution_params_dict=kwargs["executionParams"],
execution_params = kwargs.get("executionParams")
reexecution_params = kwargs.get("reexecutionParams")
check.invariant(
bool(execution_params) != bool(reexecution_params),
"Must only provide one of either executionParams or reexecutionParams",
)

if execution_params:
return create_execution_params_and_launch_pipeline_reexec(
graphene_info,
execution_params_dict=kwargs["executionParams"],
)
else:
return launch_reexecution_from_parent_run(
graphene_info,
reexecution_params["parentRunId"],
reexecution_params["policy"],
)


class GrapheneTerminateRunPolicy(graphene.Enum):
# Default behavior: Only mark as canceled if the termination is successful, and after all
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from collections import OrderedDict


{
"data": OrderedDict(
[
(
"launchPipelineReexecution",
{
"__typename": "PythonError",
"message": 'dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline\n Error 1: Received unexpected config entry "bad" at the root. Expected: "{ execution?: { in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } start_method?: { forkserver?: { preload_modules?: [String] } spawn?: { } } } } } loggers?: { console?: { config?: { log_level?: String name?: String } } } resources?: { io_manager?: { config?: { base_dir?: (String | { env: String }) } } } solids?: { after_failure?: { config?: Any outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } always_succeed?: { config?: Any outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } conditionally_fail?: { config?: Any outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } } }".\n',
"className": "DagsterInvalidConfigError",
"stack": [
' File "/Users/johann/dagster/python_modules/dagster/dagster/grpc/impl.py", line 366, in get_external_execution_plan_snapshot\n create_execution_plan(\n',
' File "/Users/johann/dagster/python_modules/dagster/dagster/core/execution/api.py", line 757, in create_execution_plan\n resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)\n',
' File "/Users/johann/dagster/python_modules/dagster/dagster/core/system_config/objects.py", line 160, in build\n raise DagsterInvalidConfigError(\n',
],
"cause": None,
},
)
]
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@
}
}

snapshots['TestReexecution.test_full_pipeline_reexecution_fs_storage_with_reexecution_params[sqlite_with_default_run_launcher_managed_grpc_env] 1'] = {
'launchPipelineExecution': {
'__typename': 'LaunchRunSuccess',
'run': {
'mode': 'default',
'pipeline': {
'name': 'csv_hello_world'
},
'resolvedOpSelection': None,
'runConfigYaml': '<runConfigYaml dummy value>',
'runId': '<runId dummy value>',
'status': 'STARTING',
'tags': [
]
}
}
}

snapshots['TestReexecution.test_full_pipeline_reexecution_in_memory_storage[postgres_with_default_run_launcher_deployed_grpc_env] 1'] = {
'launchPipelineExecution': {
'__typename': 'LaunchRunSuccess',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dagster.core.storage.tags import RESUME_RETRY_TAG
from dagster.core.test_utils import poll_for_finished_run
from dagster.core.utils import make_new_run_id
from dagster.seven.temp_dir import get_system_temp_directory

from .graphql_context_test_suite import ExecutingGraphQLContextTestMatrix
from .setup import csv_hello_world_solids_config, get_retry_multi_execution_params, retry_config
Expand Down Expand Up @@ -500,6 +501,93 @@ def test_retry_hard_failure(self, graphql_context):
assert step_did_succeed(logs, "hard_fail_or_0")
assert step_did_succeed(logs, "increment")

def test_retry_hard_failure_with_reexecution_params(self, graphql_context):
    """
    Re-execute a failed run by providing reexecutionParams (parent run id plus
    policy) rather than executionParams, and verify only the failed steps re-run.
    """
    selector = infer_pipeline_selector(graphql_context, "chained_failure_pipeline")
    launch_variables = {
        "executionParams": {
            "mode": "default",
            "selector": selector,
        }
    }

    # trigger failure in the conditionally_fail solid: the marker file exists
    # only while the initial run executes and is removed afterwards
    marker_path = os.path.join(
        get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail"
    )
    try:
        with open(marker_path, "w"):
            launch_result = execute_dagster_graphql_and_finish_runs(
                graphql_context,
                LAUNCH_PIPELINE_EXECUTION_MUTATION,
                variables=launch_variables,
            )
    finally:
        os.remove(marker_path)

    parent_run_id = launch_result.data["launchPipelineExecution"]["run"]["runId"]
    parent_run = graphql_context.instance.get_run_by_id(parent_run_id)
    assert parent_run.status == PipelineRunStatus.FAILURE

    retry_variables = {
        "reexecutionParams": {"parentRunId": parent_run_id, "policy": "FROM_FAILURE"}
    }
    retry_result = execute_dagster_graphql_and_finish_runs(
        graphql_context,
        LAUNCH_PIPELINE_REEXECUTION_MUTATION,
        variables=retry_variables,
    )

    retry_run_id = retry_result.data["launchPipelineReexecution"]["run"]["runId"]
    retry_run = graphql_context.instance.get_run_by_id(retry_run_id)
    assert retry_run.status == PipelineRunStatus.SUCCESS

    subscription_payload = get_all_logs_for_finished_run_via_subscription(
        graphql_context, retry_run_id
    )
    logs = subscription_payload["pipelineRunLogs"]["messages"]

    # the step that succeeded in the parent run is skipped; the failed step and
    # its downstream step run and succeed
    assert step_did_not_run(logs, "always_succeed")
    assert step_did_succeed(logs, "conditionally_fail")
    assert step_did_succeed(logs, "after_failure")

def test_retry_hard_failure_with_reexecution_params_run_config_changed(self, graphql_context):
    """
    Re-execution via reexecutionParams reports a config error when the parent
    run's stored run config is no longer valid for the pipeline.
    """
    selector = infer_pipeline_selector(graphql_context, "chained_failure_pipeline")
    launch_variables = {
        "executionParams": {
            "mode": "default",
            "selector": selector,
        }
    }

    # trigger failure in the conditionally_fail solid: the marker file exists
    # only while the initial run executes and is removed afterwards
    marker_path = os.path.join(
        get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail"
    )
    try:
        with open(marker_path, "w"):
            launch_result = execute_dagster_graphql_and_finish_runs(
                graphql_context,
                LAUNCH_PIPELINE_EXECUTION_MUTATION,
                variables=launch_variables,
            )
    finally:
        os.remove(marker_path)

    parent_run_id = launch_result.data["launchPipelineExecution"]["run"]["runId"]
    instance = graphql_context.instance
    parent_run = instance.get_run_by_id(parent_run_id)
    assert parent_run.status == PipelineRunStatus.FAILURE

    # override run config to make it fail: swap the stored parent run for a copy
    # whose run config the pipeline rejects
    graphql_context.instance.delete_run(parent_run_id)
    graphql_context.instance.add_run(parent_run._replace(run_config={"bad": "config"}))

    retry_variables = {
        "reexecutionParams": {"parentRunId": parent_run_id, "policy": "FROM_FAILURE"}
    }
    retry_result = execute_dagster_graphql_and_finish_runs(
        graphql_context,
        LAUNCH_PIPELINE_REEXECUTION_MUTATION,
        variables=retry_variables,
    )

    error_message = str(retry_result.data["launchPipelineReexecution"]["message"])
    assert "DagsterInvalidConfigError" in error_message


def _do_retry_intermediates_test(graphql_context, run_id, reexecution_run_id):
selector = infer_pipeline_selector(graphql_context, "eventually_successful")
Expand Down

0 comments on commit 19dff51

Please sign in to comment.