Re-execute run instance method (#7417)
This will help avoid duplication when we add automatic re-execution from failure. We can follow up to support other re-execution modes. This logic is also duplicated in the front end; I'll put a backlog item to make the front end use this endpoint.
johannkm committed Apr 13, 2022
1 parent 8120421 commit e2de0b7
Showing 3 changed files with 104 additions and 32 deletions.
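For context, a minimal usage sketch of the new instance method this commit adds. This is hedged, not part of the diff: `instance`, `repo_location`, `external_pipeline`, and `failed_run` are hypothetical stand-ins for objects a caller would already have (e.g. from a workspace context).

    from dagster.core.storage.pipeline_run import PipelineRunStatus

    def retry_from_failure(instance, repo_location, external_pipeline, failed_run):
        # The new method asserts the parent run actually failed, so guard first.
        if failed_run.status != PipelineRunStatus.FAILURE:
            return None
        # Re-executes only the failed/unexecuted steps; run_config and mode
        # default to the parent run's values when omitted.
        return instance.create_reexecuted_run_from_failure(
            failed_run,
            repo_location,
            external_pipeline,
        )
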
43 changes: 17 additions & 26 deletions python_modules/dagster/dagster/core/execution/backfill.py
@@ -2,7 +2,6 @@
 from typing import Dict, List, NamedTuple, Optional
 
 from dagster import check
-from dagster.core.execution.plan.resume_retry import get_retry_steps_from_parent_run
 from dagster.core.execution.plan.state import KnownExecutionState
 from dagster.core.host_representation import (
     ExternalPartitionSet,
@@ -20,7 +19,6 @@
     PARENT_RUN_ID_TAG,
     PARTITION_NAME_TAG,
     PARTITION_SET_TAG,
-    RESUME_RETRY_TAG,
     ROOT_RUN_ID_TAG,
 )
 from dagster.core.telemetry import BACKFILL_RUN_CREATED, hash_name, log_action
@@ -195,6 +193,16 @@ def create_backfill_run(
     check.inst_param(backfill_job, "backfill_job", PartitionBackfill)
     check.inst_param(partition_data, "partition_data", ExternalPartitionExecutionParamData)
 
+    log_action(
+        instance,
+        BACKFILL_RUN_CREATED,
+        metadata={
+            "DAEMON_SESSION_ID": get_telemetry_daemon_session_id(),
+            "repo_hash": hash_name(repo_location.name),
+            "pipeline_name_hash": hash_name(external_pipeline.name),
+        },
+    )
+
     tags = merge_dicts(
         external_pipeline.tags,
         partition_data.tags,
@@ -217,21 +225,14 @@
         last_run = _fetch_last_run(instance, external_partition_set, partition_data.name)
         if not last_run or last_run.status != PipelineRunStatus.FAILURE:
             return None
-
-        parent_run_id = last_run.run_id
-        root_run_id = last_run.root_run_id or last_run.run_id
-        tags = merge_dicts(
-            tags,
-            {
-                RESUME_RETRY_TAG: "true",
-                PARENT_RUN_ID_TAG: parent_run_id,
-                ROOT_RUN_ID_TAG: root_run_id,
-            },
+        return instance.create_reexecuted_run_from_failure(
+            last_run,
+            repo_location,
+            external_pipeline,
+            tags=tags,
+            run_config=partition_data.run_config,
+            mode=external_partition_set.mode,
         )
-        solids_to_execute = last_run.solids_to_execute
-        solid_selection = last_run.solid_selection
-
-        step_keys_to_execute, known_state = get_retry_steps_from_parent_run(instance, parent_run_id)
 
     elif backfill_job.reexecution_steps:
         last_run = _fetch_last_run(instance, external_partition_set, partition_data.name)
@@ -263,16 +264,6 @@
             instance=instance,
         )
 
-    log_action(
-        instance,
-        BACKFILL_RUN_CREATED,
-        metadata={
-            "DAEMON_SESSION_ID": get_telemetry_daemon_session_id(),
-            "repo_hash": hash_name(repo_location.name),
-            "pipeline_name_hash": hash_name(external_pipeline.name),
-        },
-    )
-
     return instance.create_run(
         pipeline_snapshot=external_pipeline.pipeline_snapshot,
         execution_plan_snapshot=external_execution_plan.execution_plan_snapshot,
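
The hand-rolled retry tagging removed above now lives in the instance method. As a restatement of what the diff applies (a sketch mirroring the logic added to instance/__init__.py below, not new behavior), the merged tags on the child run come out like this:

    from dagster.core.storage.tags import PARENT_RUN_ID_TAG, RESUME_RETRY_TAG, ROOT_RUN_ID_TAG
    from dagster.utils import merge_dicts

    def reexecution_tags(parent_run, caller_tags, pipeline_tags):
        # The root run id points at the first run in the chain; a first-time
        # retry falls back to the parent itself.
        root_run_id = parent_run.root_run_id or parent_run.run_id
        return merge_dicts(
            caller_tags or {},
            pipeline_tags,
            {
                PARENT_RUN_ID_TAG: parent_run.run_id,
                ROOT_RUN_ID_TAG: root_run_id,
                RESUME_RETRY_TAG: "true",
            },
        )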
13 changes: 10 additions & 3 deletions python_modules/dagster/dagster/core/execution/plan/resume_retry.py
@@ -9,6 +9,7 @@
 from dagster.core.execution.plan.step import ResolvedFromDynamicStepHandle
 from dagster.core.host_representation import ExternalExecutionPlan
 from dagster.core.instance import DagsterInstance
+from dagster.core.storage.pipeline_run import PipelineRun
 
 
 def _update_tracking_dict(tracking, handle):
@@ -30,12 +31,18 @@ def _in_tracking_dict(handle, tracking):
 
 
 def get_retry_steps_from_parent_run(
-    instance, parent_run_id
+    instance, parent_run_id: str = None, parent_run: PipelineRun = None
 ) -> Tuple[List[str], Optional[KnownExecutionState]]:
     check.inst_param(instance, "instance", DagsterInstance)
-    check.str_param(parent_run_id, "parent_run_id")
-
-    parent_run = instance.get_run_by_id(parent_run_id)
+    check.invariant(
+        bool(parent_run_id) != bool(parent_run), "Must provide one of parent_run_id or parent_run"
+    )
+    check.opt_str_param(parent_run_id, "parent_run_id")
+    check.opt_inst_param(parent_run, "parent_run", PipelineRun)
+
+    parent_run = parent_run or instance.get_run_by_id(parent_run_id)
+    parent_run_id = parent_run.run_id
     parent_run_logs = instance.all_logs(parent_run_id)
 
     execution_plan_snapshot = instance.get_execution_plan_snapshot(
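
The signature change makes `parent_run_id` and `parent_run` mutually exclusive alternatives: the `check.invariant` on `bool(parent_run_id) != bool(parent_run)` is an exclusive-or. Both call styles, sketched (`instance`, `run_id`, and `run` are assumed to exist in the caller):

    # By id: the function looks the run up and derives parent_run_id from it.
    step_keys, known_state = get_retry_steps_from_parent_run(instance, parent_run_id=run_id)

    # By run object: skips the storage lookup, as the new instance method does.
    step_keys, known_state = get_retry_steps_from_parent_run(instance, parent_run=run)

    # Passing both, or neither, trips the invariant and raises a check error.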
80 changes: 77 additions & 3 deletions python_modules/dagster/dagster/core/instance/__init__.py
@@ -52,12 +52,12 @@
     RunsFilter,
     TagBucket,
 )
-from dagster.core.storage.tags import MEMOIZED_RUN_TAG
+from dagster.core.storage.tags import PARENT_RUN_ID_TAG, RESUME_RETRY_TAG, ROOT_RUN_ID_TAG
 from dagster.core.system_config.objects import ResolvedRunConfig
 from dagster.core.utils import str_format_list
 from dagster.serdes import ConfigurableClass
 from dagster.seven import get_current_datetime_in_utc
-from dagster.utils import traced
+from dagster.utils import merge_dicts, traced
 from dagster.utils.backcompat import experimental_functionality_warning
 from dagster.utils.error import serializable_error_info_from_exc_info
 
@@ -81,7 +81,11 @@
     from dagster.core.events import DagsterEvent, DagsterEventType
     from dagster.core.events.log import EventLogEntry
     from dagster.core.execution.stats import RunStepKeyStatsSnapshot
-    from dagster.core.host_representation import HistoricalPipeline
+    from dagster.core.host_representation import (
+        ExternalPipeline,
+        HistoricalPipeline,
+        RepositoryLocation,
+    )
     from dagster.core.launcher import RunLauncher
     from dagster.core.run_coordinator import RunCoordinator
     from dagster.core.scheduler import Scheduler
@@ -1009,6 +1013,76 @@ def create_run(
 
         return pipeline_run
 
+    def create_reexecuted_run_from_failure(
+        self,
+        parent_run: PipelineRun,
+        repo_location: "RepositoryLocation",
+        external_pipeline: "ExternalPipeline",
+        tags: Optional[Dict[str, Any]] = None,
+        run_config: Optional[Dict[str, Any]] = None,
+        mode: Optional[str] = None,
+    ) -> PipelineRun:
+        from dagster.core.execution.plan.resume_retry import get_retry_steps_from_parent_run
+        from dagster.core.host_representation import ExternalPipeline, RepositoryLocation
+
+        check.inst_param(parent_run, "parent_run", PipelineRun)
+        check.inst_param(repo_location, "repo_location", RepositoryLocation)
+        check.inst_param(external_pipeline, "external_pipeline", ExternalPipeline)
+        check.opt_dict_param(tags, "tags", key_type=str)
+        check.opt_dict_param(run_config, "run_config", key_type=str)
+        check.opt_str_param(mode, "mode")
+        check.invariant(
+            parent_run.status == PipelineRunStatus.FAILURE,
+            "Cannot reexecute from failure a run that is not failed",
+        )
+
+        root_run_id = parent_run.root_run_id or parent_run.run_id
+        parent_run_id = parent_run.run_id
+
+        new_tags = merge_dicts(
+            tags or {},
+            external_pipeline.tags,
+            {
+                PARENT_RUN_ID_TAG: parent_run_id,
+                ROOT_RUN_ID_TAG: root_run_id,
+                RESUME_RETRY_TAG: "true",
+            },
+        )
+        mode = cast(str, mode if mode is not None else parent_run.mode)
+        run_config = run_config if run_config is not None else parent_run.run_config
+
+        step_keys_to_execute, known_state = get_retry_steps_from_parent_run(
+            self, parent_run=parent_run
+        )
+
+        external_execution_plan = repo_location.get_external_execution_plan(
+            external_pipeline,
+            run_config,
+            mode=mode,
+            step_keys_to_execute=step_keys_to_execute,
+            known_state=known_state,
+            instance=self,
+        )
+
+        return self.create_run(
+            pipeline_name=parent_run.pipeline_name,
+            run_id=None,
+            run_config=run_config,
+            mode=mode,
+            solids_to_execute=parent_run.solids_to_execute,
+            step_keys_to_execute=step_keys_to_execute,
+            status=PipelineRunStatus.NOT_STARTED,
+            tags=new_tags,
+            root_run_id=root_run_id,
+            parent_run_id=parent_run_id,
+            pipeline_snapshot=external_pipeline.pipeline_snapshot,
+            execution_plan_snapshot=external_execution_plan.execution_plan_snapshot,
+            parent_pipeline_snapshot=external_pipeline.parent_pipeline_snapshot,
+            solid_selection=parent_run.solid_selection,
+            external_pipeline_origin=external_pipeline.get_external_origin(),
+            pipeline_code_origin=external_pipeline.get_python_origin(),
+        )
+
     def register_managed_run(
         self,
         pipeline_name,
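
One detail worth noting in the new method: `root_run_id = parent_run.root_run_id or parent_run.run_id` keeps retry chains flat, so a retry of a retry still points at the original run. A small illustrative check, with made-up run ids, that mirrors the fallback above:

    def expected_links(parent_run_id, parent_root_run_id):
        # Mirrors the fallback in create_reexecuted_run_from_failure.
        return {
            "parent_run_id": parent_run_id,
            "root_run_id": parent_root_run_id or parent_run_id,
        }

    # Run "a" fails and is retried as "b"; "b" fails and is retried as "c".
    assert expected_links("a", None) == {"parent_run_id": "a", "root_run_id": "a"}
    assert expected_links("b", "a") == {"parent_run_id": "b", "root_run_id": "a"}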
