Skip to content

Commit

Permalink
[events] can_load mitigation (#7955)
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed May 18, 2022
1 parent 780f771 commit b8f3816
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
14 changes: 9 additions & 5 deletions python_modules/dagster/dagster/core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Union,
cast,
Expand Down Expand Up @@ -56,6 +57,7 @@
if TYPE_CHECKING:
from dagster.core.definitions.dependency import Node, NodeHandle
from dagster.core.definitions.resource_definition import Resources
from dagster.core.events import DagsterEvent
from dagster.core.execution.plan.plan import ExecutionPlan
from dagster.core.instance import DagsterInstance

Expand Down Expand Up @@ -511,16 +513,18 @@ def for_hook(self, hook_def: HookDefinition) -> "HookContext":

return HookContext(self, hook_def)

def can_load(self, step_output_handle: StepOutputHandle) -> bool:
def can_load(
self,
step_output_handle: StepOutputHandle,
step_output_events: Sequence["DagsterEvent"],
) -> bool:
# Whether IO Manager can load the source
# FIXME https://github.com/dagster-io/dagster/issues/3511
# This is a stopgap which asks the instance to check the event logs to find out step skipping

from dagster.core.events import DagsterEventType

# can load from upstream in the same run
for record in self.instance.all_logs(self.run_id, of_type=DagsterEventType.STEP_OUTPUT):
if step_output_handle == record.dagster_event.event_specific_data.step_output_handle:
for event in step_output_events:
if step_output_handle == event.step_output_data.step_output_handle:
return True

if (
Expand Down
15 changes: 13 additions & 2 deletions python_modules/dagster/dagster/core/execution/plan/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,13 +635,24 @@ def load_input_object(
step_context: "StepExecutionContext",
input_def: InputDefinition,
):
from dagster.core.events import DagsterEvent
from dagster.core.events import DagsterEvent, DagsterEventType

values = []

# https://github.com/dagster-io/dagster/issues/3511
step_output_events = [
record.dagster_event
for record in step_context.instance.all_logs(
step_context.run_id, of_type=DagsterEventType.STEP_OUTPUT
)
]

# some upstream steps may have skipped and we allow fan-in to continue in their absence
source_handles_to_skip = list(
filter(lambda x: not step_context.can_load(x), self.step_output_handle_dependencies)
filter(
lambda x: not step_context.can_load(x, step_output_events),
self.step_output_handle_dependencies,
)
)

for inner_source in self.sources:
Expand Down

0 comments on commit b8f3816

Please sign in to comment.