KnownState parent run info (#8030)
alangenfeld committed Jun 1, 2022
1 parent: bc3fde4 · commit: ac59da6
Showing 15 changed files with 196 additions and 215 deletions.
@@ -1,9 +1,11 @@
from graphql.execution.base import ResolveInfo

import dagster._check as check
from dagster.core.errors import DagsterRunNotFoundError
from dagster.core.execution.plan.resume_retry import get_retry_steps_from_parent_run
from dagster.core.execution.plan.state import KnownExecutionState
from dagster.core.storage.pipeline_run import PipelineRunStatus
from dagster.core.instance import DagsterInstance
from dagster.core.storage.pipeline_run import DagsterRun, PipelineRunStatus
from dagster.core.storage.tags import RESUME_RETRY_TAG
from dagster.core.utils import make_new_run_id
from dagster.utils import merge_dicts
@@ -13,24 +15,34 @@
from ..utils import ExecutionParams, UserFacingGraphQLError


def _get_run(instance: DagsterInstance, run_id: str) -> DagsterRun:
run = instance.get_run_by_id(run_id)
if not run:
raise DagsterRunNotFoundError(invalid_run_id=run_id)
return run


def compute_step_keys_to_execute(graphene_info, execution_params):
check.inst_param(graphene_info, "graphene_info", ResolveInfo)
check.inst_param(execution_params, "execution_params", ExecutionParams)

instance = graphene_info.context.instance
instance: DagsterInstance = graphene_info.context.instance

if not execution_params.step_keys and is_resume_retry(execution_params):
# Get step keys from parent_run_id if it's a resume/retry
parent_run = _get_run(instance, execution_params.execution_metadata.parent_run_id)
return get_retry_steps_from_parent_run(
instance, execution_params.execution_metadata.parent_run_id
instance,
parent_run,
)
else:
known_state = None
if execution_params.execution_metadata.parent_run_id and execution_params.step_keys:
known_state = KnownExecutionState.for_reexecution(
instance.all_logs(execution_params.execution_metadata.parent_run_id),
execution_params.step_keys,
)
parent_run = _get_run(instance, execution_params.execution_metadata.parent_run_id)
known_state = KnownExecutionState.build_for_reexecution(
instance,
parent_run,
).update_for_step_selection(execution_params.step_keys)

return execution_params.step_keys, known_state
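Note: a minimal sketch of the resolution flow introduced above, assuming a DagsterInstance and a parent run id taken from the execution metadata; the function name and local variables below are illustrative, not part of the commit.

    from dagster.core.execution.plan.resume_retry import get_retry_steps_from_parent_run
    from dagster.core.execution.plan.state import KnownExecutionState

    def resolve_step_keys_and_state(instance, parent_run_id, step_keys=None, resume_retry=False):
        # resolve the parent run up front (the new _get_run helper above raises if it is missing)
        parent_run = instance.get_run_by_id(parent_run_id)
        if resume_retry and not step_keys:
            # retry-from-failure: derive both the step selection and the known state from the parent
            return get_retry_steps_from_parent_run(instance, parent_run)
        # explicit step subset: build state from the parent run, then narrow it to the selection
        known_state = KnownExecutionState.build_for_reexecution(
            instance, parent_run
        ).update_for_step_selection(step_keys)
        return step_keys, known_state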

@@ -414,9 +414,9 @@ def test_pipeline_reexecution_info_query(self, graphql_context, snapshot):
def test_pipeline_reexecution_invalid_step_in_subset(self, graphql_context):
run_id = make_new_run_id()
selector = infer_pipeline_selector(graphql_context, "csv_hello_world")
execute_dagster_graphql_and_finish_runs(
result_one = execute_dagster_graphql_and_finish_runs(
graphql_context,
LAUNCH_PIPELINE_REEXECUTION_MUTATION,
LAUNCH_PIPELINE_EXECUTION_MUTATION,
variables={
"executionParams": {
"selector": selector,
@@ -426,6 +426,7 @@ def test_pipeline_reexecution_invalid_step_in_subset(self, graphql_context):
}
},
)
assert result_one.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"

# retry
new_run_id = make_new_run_id()
11 changes: 1 addition & 10 deletions python_modules/dagster/dagster/core/definitions/step_launcher.py
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Dict, Mapping, NamedTuple, Optional, Sequence
from typing import TYPE_CHECKING, Dict, Mapping, NamedTuple, Optional

import dagster._check as check
from dagster.core.definitions.reconstruct import ReconstructablePipeline
@@ -23,8 +23,6 @@ class StepRunRef(
("recon_pipeline", ReconstructablePipeline),
("prior_attempts_count", int),
("known_state", Optional["KnownExecutionState"]),
("run_group", Sequence[PipelineRun]),
("upstream_output_events", Sequence["EventLogEntry"]),
],
)
):
@@ -43,11 +41,8 @@ def __new__(
recon_pipeline: ReconstructablePipeline,
prior_attempts_count: int,
known_state: Optional["KnownExecutionState"],
run_group: Optional[Sequence[PipelineRun]],
upstream_output_events: Optional[Sequence["EventLogEntry"]],
):
from dagster.core.execution.plan.state import KnownExecutionState
from dagster.core.storage.event_log import EventLogEntry

return super(StepRunRef, cls).__new__(
cls,
@@ -59,10 +54,6 @@ def __new__(
check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline),
check.int_param(prior_attempts_count, "prior_attempts_count"),
check.opt_inst_param(known_state, "known_state", KnownExecutionState),
check.opt_list_param(run_group, "run_group", of_type=PipelineRun),
check.opt_list_param(
upstream_output_events, "upstream_output_events", of_type=EventLogEntry
),
)
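Note: with run_group and upstream_output_events removed from StepRunRef, the parent-run lineage an external step needs is expected to travel inside the retained known_state field. A hedged two-line illustration, with attribute names taken from the context/system.py hunk further down:

    # illustrative: lineage now hangs off known_state rather than separate StepRunRef fields
    parent_state = step_run_ref.known_state.parent_state if step_run_ref.known_state else None
    # parent_state (when present) exposes run_id, produced_outputs, and get_parent_state()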


17 changes: 12 additions & 5 deletions python_modules/dagster/dagster/core/execution/api.py
@@ -17,7 +17,7 @@
from dagster.core.execution.retries import RetryMode
from dagster.core.instance import DagsterInstance, InstanceRef
from dagster.core.selector import parse_step_selection
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus
from dagster.core.storage.pipeline_run import DagsterRun, PipelineRun, PipelineRunStatus
from dagster.core.system_config.objects import ResolvedRunConfig
from dagster.core.telemetry import log_repo_stats, telemetry_wrapper
from dagster.core.utils import str_format_set
@@ -780,6 +780,12 @@ def create_execution_plan(
check.opt_nullable_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)
check.opt_inst_param(instance_ref, "instance_ref", InstanceRef)
tags = check.opt_dict_param(tags, "tags", key_type=str, value_type=str)
known_state = check.opt_inst_param(
known_state,
"known_state",
KnownExecutionState,
default=KnownExecutionState(),
)

resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
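Note: with the opt_inst_param default above, callers that omit known_state now get an empty KnownExecutionState rather than None. A minimal sketch, assuming create_execution_plan accepts a plain job definition and that the resulting plan exposes known_state (as the plan/active.py hunk below suggests):

    from dagster import job, op
    from dagster.core.execution.api import create_execution_plan

    @op
    def emit_one():
        return 1

    @job
    def sketch_job():
        emit_one()

    # known_state is omitted here; per the change above it defaults to KnownExecutionState()
    plan = create_execution_plan(sketch_job)
    assert plan.known_state is not None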

@@ -1033,26 +1039,27 @@ def _resolve_reexecute_step_selection(
pipeline: IPipeline,
mode: Optional[str],
run_config: Optional[dict],
parent_pipeline_run: PipelineRun,
parent_pipeline_run: DagsterRun,
step_selection: List[str],
) -> ExecutionPlan:
if parent_pipeline_run.solid_selection:
pipeline = pipeline.subset_for_execution(parent_pipeline_run.solid_selection, None)

parent_logs = instance.all_logs(parent_pipeline_run.run_id)
state = KnownExecutionState.build_for_reexecution(instance, parent_pipeline_run)

parent_plan = create_execution_plan(
pipeline,
parent_pipeline_run.run_config,
mode,
known_state=KnownExecutionState.derive_from_logs(parent_logs),
known_state=state,
)
step_keys_to_execute = parse_step_selection(parent_plan.get_all_step_deps(), step_selection)
execution_plan = create_execution_plan(
pipeline,
run_config,
mode,
step_keys_to_execute=list(step_keys_to_execute),
known_state=KnownExecutionState.for_reexecution(parent_logs, step_keys_to_execute),
known_state=state.update_for_step_selection(step_keys_to_execute),
tags=parent_pipeline_run.tags,
)
return execution_plan
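Note: the parent state is now built once from the instance and shared by both plan constructions above. As a caller-level illustration only, assuming reexecute_pipeline is the public entry point that reaches _resolve_reexecute_step_selection when a step selection is given; the pipeline, run id, and instance names are placeholders:

    from dagster import reexecute_pipeline

    result = reexecute_pipeline(
        my_pipeline,                    # placeholder pipeline/job definition
        parent_run_id=parent_run_id,    # placeholder id of the run being re-executed
        step_selection=["my_solid"],    # triggers the step-selection resolution above
        instance=instance,
    )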
8 changes: 4 additions & 4 deletions python_modules/dagster/dagster/core/execution/backfill.py
@@ -247,10 +247,10 @@ def create_backfill_run(
)
step_keys_to_execute = backfill_job.reexecution_steps
if last_run and last_run.status == PipelineRunStatus.SUCCESS:
known_state = KnownExecutionState.for_reexecution(
instance.all_logs(parent_run_id),
step_keys_to_execute,
)
known_state = KnownExecutionState.build_for_reexecution(
instance,
last_run,
).update_for_step_selection(step_keys_to_execute)
else:
known_state = None

25 changes: 8 additions & 17 deletions python_modules/dagster/dagster/core/execution/context/system.py
@@ -644,28 +644,19 @@ def get_output_metadata(
return metadata

def _get_source_run_id_from_logs(self, step_output_handle: StepOutputHandle) -> Optional[str]:
from dagster.core.events import DagsterEventType

# walk through event logs to find the right run_id based on the run lineage
run_group = self.instance.get_run_group(self.run_id)
if run_group is None:
check.failed(f"Failed to load run group {self.run_id}")

_, runs = run_group
run_id_to_parent_run_id = {run.run_id: run.parent_run_id for run in runs}
source_run_id = self.pipeline_run.parent_run_id
while source_run_id:
# note: this would cost N db calls where N = number of parent runs
step_output_record = self.instance.all_logs(
source_run_id, of_type=DagsterEventType.STEP_OUTPUT
)

parent_state = self.get_known_state().parent_state
while parent_state:

# if the parent run has yielded an StepOutput event for the given step output,
# we find the source run id
for r in step_output_record:
if r.dagster_event.step_output_data.step_output_handle == step_output_handle:
return source_run_id
if step_output_handle in parent_state.produced_outputs:
return parent_state.run_id

# else, keep looking backwards
source_run_id = run_id_to_parent_run_id.get(source_run_id)
parent_state = parent_state.get_parent_state()

# When a fixed path is provided via io manager, it's able to run step subset using an execution
# plan when the ascendant outputs were not previously created by dagster-controlled
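Note: a self-contained restatement of the traversal above. Instead of issuing one all_logs query per ancestor run, the lookup now walks the parent-state chain carried by KnownExecutionState; the helper name is illustrative.

    from typing import Optional

    def find_source_run_id(known_state, step_output_handle) -> Optional[str]:
        # walk run lineage via the in-memory parent state rather than per-run event-log queries
        parent_state = known_state.parent_state
        while parent_state:
            if step_output_handle in parent_state.produced_outputs:
                return parent_state.run_id
            parent_state = parent_state.get_parent_state()
        return None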
8 changes: 1 addition & 7 deletions python_modules/dagster/dagster/core/execution/memoization.py
@@ -1,5 +1,5 @@
import dagster._check as check
from dagster.core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.execution.context.system import IPlanContext
from dagster.core.execution.plan.plan import ExecutionPlan

@@ -14,12 +14,6 @@ def validate_reexecution_memoization(
if parent_run_id is None:
return

if not plan_context.instance.has_run(parent_run_id):
raise DagsterRunNotFoundError(
"Run id {} set as parent run id was not found in instance".format(parent_run_id),
invalid_run_id=parent_run_id,
)

# exclude full pipeline re-execution
if len(execution_plan.step_keys_to_execute) == len(execution_plan.steps):
return
@@ -463,6 +463,7 @@ def get_known_state(self):
dynamic_mappings=dict(self._successful_dynamic_outputs),
ready_outputs=self._step_outputs,
step_output_versions=self._plan.known_state.step_output_versions,
parent_state=self._plan.known_state.parent_state,
)

def _prep_for_dynamic_outputs(self, step: ExecutionStep):
@@ -13,7 +13,7 @@
from dagster.core.definitions.resource_definition import resource
from dagster.core.definitions.step_launcher import StepLauncher, StepRunRef
from dagster.core.errors import raise_execution_interrupts
from dagster.core.events import DagsterEvent, DagsterEventType
from dagster.core.events import DagsterEvent
from dagster.core.execution.api import create_execution_plan
from dagster.core.execution.context.system import StepExecutionContext
from dagster.core.execution.context_creation_pipeline import PlanExecutionContextManager
@@ -106,44 +106,6 @@ def _module_in_package_dir(file_path: str, package_dir: str) -> str:
return ".".join(without_extension.split(os.sep))


def _upstream_events_and_runs(step_context: StepExecutionContext):
"Grabs the minimal set of output events and runs to inform a remote instance of how to load each output."
step_inputs = step_context.step.step_inputs
upstream_output_handles = set().union(
*(step_input.source.step_output_handle_dependencies for step_input in step_inputs)
)
current_run = step_context.pipeline_run
events = []
runs = []
while True:
runs.append(current_run)
# note: this would cost N db calls where N = number of parent runs
step_output_records = step_context.instance.all_logs(
current_run.run_id, of_type=DagsterEventType.STEP_OUTPUT
)
# if the parent run has yielded an StepOutput event for the given step output,
# we find the source run id
for r in step_output_records:
output_handle = r.dagster_event.step_output_data.step_output_handle
# if this output matches one of the required step outputs, add it to the list of
# required events
if output_handle in upstream_output_handles:
events.append(r)
upstream_output_handles.remove(output_handle)

if current_run.parent_run_id is None:
if upstream_output_handles:
step_context.log.warn(
f"Could not find outputs in the logs for output handles: {upstream_output_handles}"
)
break

# else, keep looking backwards
current_run = step_context.instance.get_run_by_id(current_run.parent_run_id)

return events, runs


def step_context_to_step_run_ref(
step_context: StepExecutionContext,
prior_attempts_count: int,
@@ -192,7 +154,6 @@ def step_context_to_step_run_ref(
solids_to_execute=recon_pipeline.solids_to_execute,
)

upstream_output_events, run_group = _upstream_events_and_runs(step_context)
return StepRunRef(
run_config=step_context.run_config,
pipeline_run=step_context.pipeline_run,
Expand All @@ -202,8 +163,6 @@ def step_context_to_step_run_ref(
recon_pipeline=recon_pipeline, # type: ignore
prior_attempts_count=prior_attempts_count,
known_state=step_context.get_known_state(),
run_group=run_group,
upstream_output_events=upstream_output_events,
)


@@ -223,14 +182,6 @@ def external_instance_from_step_run_ref(
DagsterInstance: A DagsterInstance that can be used to execute an external step.
"""
instance = DagsterInstance.ephemeral()
# re-execution expects the parent run(s) to be available on the instance, so add these
for run in step_run_ref.run_group:
# remove the pipeline_snapshot_id, as this instance doesn't have any snapshots
instance.add_run(run._replace(pipeline_snapshot_id=None))
# the can_load() function on the step context currently depends on reading output events
# from the instance, so we make sure the remote instance has the relevant events
for entry in step_run_ref.upstream_output_events:
instance.handle_new_event(entry)
if event_listener_fn:
instance.add_event_listener(step_run_ref.run_id, event_listener_fn)
return instance
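Note: a hedged round-trip sketch of the step-launcher path after this change; prior_attempts_count=0 is illustrative and step_context stands in for a real StepExecutionContext. The point is that the ephemeral instance no longer needs parent runs or upstream output events copied into it, since that information rides along in known_state.

    # host side: capture the step as a transferable ref
    ref = step_context_to_step_run_ref(step_context, prior_attempts_count=0)
    # remote side: rehydrate an instance without replaying run_group / upstream_output_events
    remote_instance = external_instance_from_step_run_ref(ref)
    # remote_instance is a bare DagsterInstance.ephemeral(); run lineage comes from ref.known_state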
27 changes: 16 additions & 11 deletions python_modules/dagster/dagster/core/execution/plan/resume_retry.py
@@ -10,7 +10,7 @@
from dagster.core.execution.plan.step import ResolvedFromDynamicStepHandle
from dagster.core.host_representation import ExternalExecutionPlan
from dagster.core.instance import DagsterInstance
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.core.storage.pipeline_run import DagsterRun


def _update_tracking_dict(tracking, handle):
@@ -37,19 +37,21 @@ class ReexecutionStrategy(enum.Enum):


def get_retry_steps_from_parent_run(
instance, parent_run_id: str = None, parent_run: PipelineRun = None
instance: DagsterInstance,
parent_run: DagsterRun,
) -> Tuple[List[str], Optional[KnownExecutionState]]:
check.inst_param(instance, "instance", DagsterInstance)
check.opt_inst_param(parent_run, "parent_run", DagsterRun)

check.invariant(
bool(parent_run_id) != bool(parent_run), "Must provide one of parent_run_id or parent_run"
)
check.opt_str_param(parent_run_id, "parent_run_id")
check.opt_inst_param(parent_run, "parent_run", PipelineRun)

parent_run = parent_run or instance.get_run_by_id(parent_run_id)
parent_run_id = parent_run.run_id
parent_run_logs = instance.all_logs(parent_run_id)
parent_run_logs = instance.all_logs(
parent_run_id,
of_type={
DagsterEventType.STEP_FAILURE,
DagsterEventType.STEP_SUCCESS,
DagsterEventType.STEP_SKIPPED,
},
)

execution_plan_snapshot = instance.get_execution_plan_snapshot(
parent_run.execution_plan_snapshot_id
@@ -140,4 +142,7 @@ def get_retry_steps_from_parent_run(
step_handle.to_key() for step_set in to_retry.values() for step_handle in step_set
]

return steps_to_retry, KnownExecutionState.for_reexecution(parent_run_logs, steps_to_retry)
return steps_to_retry, KnownExecutionState.build_for_reexecution(
instance,
parent_run,
).update_for_step_selection(steps_to_retry)
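Note: a brief caller sketch for the tightened signature, assuming the caller resolves the DagsterRun first (as the new graphql _get_run helper above now does); parent_run_id is a placeholder.

    parent_run = instance.get_run_by_id(parent_run_id)  # may return None if the run is missing
    if parent_run:
        steps_to_retry, known_state = get_retry_steps_from_parent_run(instance, parent_run)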
