Skip to content

Commit

Permalink
KnownState.ready_outputs (#8016)
Browse files Browse the repository at this point in the history
### Summary & Motivation

Adds the set of already produced `StepOutputHandle` to the `KnownState` so that things like "which entries of this fan-in dependency should can i load" can be calculated from that.

Will address parent run loading in next PR

### How I Tested These Changes

BK, existing tests
  • Loading branch information
alangenfeld authored and OwenKephart committed Jun 1, 2022
1 parent 9a14f45 commit a558f1b
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 86 deletions.
33 changes: 19 additions & 14 deletions python_modules/dagster/dagster/core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Union,
cast,
Expand Down Expand Up @@ -60,6 +59,7 @@
from dagster.core.definitions.resource_definition import Resources
from dagster.core.events import DagsterEvent
from dagster.core.execution.plan.plan import ExecutionPlan
from dagster.core.execution.plan.state import KnownExecutionState
from dagster.core.instance import DagsterInstance

from .hook import HookContext
Expand Down Expand Up @@ -295,15 +295,19 @@ def plan_data(self) -> PlanData:
def output_capture(self) -> Optional[Dict[StepOutputHandle, Any]]:
return self._output_capture

def for_step(self, step: ExecutionStep, previous_attempt_count: int = 0) -> IStepContext:
def for_step(
self,
step: ExecutionStep,
known_state: Optional["KnownExecutionState"] = None,
) -> IStepContext:

return StepExecutionContext(
plan_data=self.plan_data,
execution_data=self._execution_data,
log_manager=self._log_manager.with_tags(**step.logging_tags),
step=step,
output_capture=self.output_capture,
previous_attempt_count=previous_attempt_count,
known_state=known_state,
)

@property
Expand Down Expand Up @@ -372,7 +376,7 @@ def __init__(
log_manager: DagsterLogManager,
step: ExecutionStep,
output_capture: Optional[Dict[StepOutputHandle, Any]],
previous_attempt_count: int,
known_state: Optional["KnownExecutionState"],
):
from dagster.core.execution.resources_init import get_required_resource_keys_for_step

Expand All @@ -391,7 +395,7 @@ def __init__(
self._resources = execution_data.scoped_resources_builder.build(
self._required_resource_keys
)
self._previous_attempt_count = previous_attempt_count
self._known_state = known_state
self._input_lineage: List[AssetLineageInfo] = []

resources_iter = cast(Iterable, self._resources)
Expand Down Expand Up @@ -548,19 +552,20 @@ def for_hook(self, hook_def: HookDefinition) -> "HookContext":

return HookContext(self, hook_def)

def get_known_state(self) -> "KnownExecutionState":
if not self._known_state:
check.failed(
"Attempted to access KnownExecutionState but it was not provided at context creation"
)
return self._known_state

def can_load(
self,
step_output_handle: StepOutputHandle,
step_output_events: Sequence["DagsterEvent"],
) -> bool:
# Whether IO Manager can load the source
# FIXME https://github.com/dagster-io/dagster/issues/3511
# This is a stopgap which asks the instance to check the event logs to find out step skipping

# can load from upstream in the same run
for event in step_output_events:
if step_output_handle == event.step_output_data.step_output_handle:
return True
if step_output_handle in self.get_known_state().ready_outputs:
return True

if (
self._should_load_from_previous_runs(step_output_handle)
Expand Down Expand Up @@ -716,7 +721,7 @@ def step_output_capture(self) -> Optional[Dict[StepOutputHandle, Any]]:

@property
def previous_attempt_count(self) -> int:
return self._previous_attempt_count
return self.get_known_state().get_retry_state().get_attempt_count(self._step.key)

@property
def op_config(self) -> Any:
Expand Down
17 changes: 6 additions & 11 deletions python_modules/dagster/dagster/core/execution/plan/active.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
)
from dagster.core.events import DagsterEvent
from dagster.core.execution.context.system import PlanOrchestrationContext
from dagster.core.execution.plan.state import KnownExecutionState, StepOutputVersionData
from dagster.core.execution.retries import RetryMode, RetryState
from dagster.core.execution.plan.state import KnownExecutionState
from dagster.core.execution.retries import RetryMode
from dagster.core.storage.tags import PRIORITY_TAG
from dagster.utils.interrupts import pop_captured_interrupt

Expand All @@ -30,15 +30,13 @@ def __init__(
self,
execution_plan: ExecutionPlan,
retry_mode: RetryMode,
retry_state: RetryState,
sort_key_fn: Optional[Callable[[ExecutionStep], float]] = None,
step_output_versions: Optional[List[StepOutputVersionData]] = None,
):
self._plan: ExecutionPlan = check.inst_param(
execution_plan, "execution_plan", ExecutionPlan
)
self._retry_mode = check.inst_param(retry_mode, "retry_mode", RetryMode)
self._retry_state = check.inst_param(retry_state, "retry_state", RetryState)
self._retry_state = self._plan.known_state.get_retry_state()

self._sort_key_fn: Callable[[ExecutionStep], float] = (
check.opt_callable_param(
Expand All @@ -48,14 +46,10 @@ def __init__(
or _default_sort_key
)

self._step_output_versions = check.opt_list_param(
step_output_versions, "step_output_versions", of_type=StepOutputVersionData
)

self._context_guard: bool = False # Prevent accidental direct use

# We decide what steps to skip based on what outputs are yielded by upstream steps
self._step_outputs: Set[StepOutputHandle] = set()
self._step_outputs: Set[StepOutputHandle] = set(self._plan.known_state.ready_outputs)

# All steps to be executed start out here in _pending
self._pending: Dict[str, Set[str]] = self._plan.get_executable_step_deps()
Expand Down Expand Up @@ -467,7 +461,8 @@ def get_known_state(self):
return KnownExecutionState(
previous_retry_attempts=self._retry_state.snapshot_attempts(),
dynamic_mappings=dict(self._successful_dynamic_outputs),
step_output_versions=self._step_output_versions,
ready_outputs=self._step_outputs,
step_output_versions=self._plan.known_state.step_output_versions,
)

def _prep_for_dynamic_outputs(self, step: ExecutionStep):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ def inner_plan_execution_iterator(
step = active_execution.get_next_step()
step_context = cast(
StepExecutionContext,
pipeline_context.for_step(
step, active_execution.retry_state.get_attempt_count(step.key)
),
pipeline_context.for_step(step, active_execution.get_known_state()),
)
step_event_list = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dagster.core.execution.context.system import StepExecutionContext
from dagster.core.execution.context_creation_pipeline import PlanExecutionContextManager
from dagster.core.execution.plan.execute_plan import dagster_event_sequence_for_step
from dagster.core.execution.plan.state import KnownExecutionState
from dagster.core.instance import DagsterInstance
from dagster.core.storage.file_manager import LocalFileHandle, LocalFileManager
from dagster.serdes import deserialize_value
Expand Down Expand Up @@ -200,7 +201,7 @@ def step_context_to_step_run_ref(
retry_mode=retry_mode,
recon_pipeline=recon_pipeline, # type: ignore
prior_attempts_count=prior_attempts_count,
known_state=step_context.execution_plan.known_state,
known_state=step_context.get_known_state(),
run_group=run_group,
upstream_output_events=upstream_output_events,
)
Expand Down Expand Up @@ -272,7 +273,8 @@ def step_run_ref_to_step_context(
execution_step = cast("ExecutionStep", execution_plan.get_step_by_key(step_run_ref.step_key))

step_execution_context = execution_context.for_step(
execution_step, step_run_ref.prior_attempts_count
execution_step,
step_run_ref.known_state or KnownExecutionState(),
)
# Since for_step is abstract for IPlanContext, its return type is IStepContext.
# Since we are launching from a PlanExecutionContext, the type will always be
Expand Down
13 changes: 2 additions & 11 deletions python_modules/dagster/dagster/core/execution/plan/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
from dagster.core.execution.context.input import InputContext
from dagster.core.execution.context.system import StepExecutionContext
from dagster.core.storage.input_manager import InputManager
from dagster.core.types.dagster_type import DagsterType


def _get_asset_lineage_from_fns(
Expand Down Expand Up @@ -754,22 +753,14 @@ def load_input_object(
step_context: "StepExecutionContext",
input_def: InputDefinition,
):
from dagster.core.events import DagsterEvent, DagsterEventType
from dagster.core.events import DagsterEvent

values = []

# https://github.com/dagster-io/dagster/issues/3511
step_output_events = [
record.dagster_event
for record in step_context.instance.all_logs(
step_context.run_id, of_type=DagsterEventType.STEP_OUTPUT
)
]

# some upstream steps may have skipped and we allow fan-in to continue in their absence
source_handles_to_skip = list(
filter(
lambda x: not step_context.can_load(x, step_output_events),
lambda x: not step_context.can_load(x),
self.step_output_handle_dependencies,
)
)
Expand Down

0 comments on commit a558f1b

Please sign in to comment.