Skip to content

Commit

Permalink
[bug] fix input loading regression (#8885)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart authored Jul 18, 2022
1 parent f787d6d commit 1961e51
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,10 @@ def source_assets_by_key(self) -> Mapping[AssetKey, "SourceAsset"]:
def assets_defs_by_key(self) -> Mapping[AssetKey, "AssetsDefinition"]:
return self._assets_defs_by_key

@property
def has_assets_defs(self) -> bool:
return len(self.assets_defs_by_key) > 0

def assets_def_for_asset(self, asset_key: AssetKey) -> "AssetsDefinition":
return self._assets_defs_by_key[asset_key]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,12 @@ def get_inputs_field(
has_upstream = input_has_upstream(dependency_structure, inp_handle, solid, name)
if inp.input_manager_key:
input_field = get_input_manager_input_field(solid, inp, resource_defs)
elif asset_layer.asset_key_for_input(handle, name) and not has_upstream:
elif (
# if you have asset definitions, input will be loaded from the source asset
asset_layer.has_assets_defs
and asset_layer.asset_key_for_input(handle, name)
and not has_upstream
):
input_field = None
elif name in direct_inputs and not has_upstream:
input_field = None
Expand Down
5 changes: 4 additions & 1 deletion python_modules/dagster/dagster/core/execution/plan/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,10 @@ def get_step_input_source(
):
if input_def.root_manager_key or input_def.input_manager_key:
return FromRootInputManager(solid_handle=handle, input_name=input_name)
elif asset_layer.asset_key_for_input(handle, input_handle.input_name):
# can only load from source asset if assets defs are available
elif asset_layer.has_assets_defs and asset_layer.asset_key_for_input(
handle, input_handle.input_name
):
return FromSourceAsset(solid_handle=handle, input_name=input_name)

if dependency_structure.has_direct_dep(input_handle):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest

from dagster import (
AssetKey,
ConfigMapping,
DynamicOut,
DynamicOutput,
Expand Down Expand Up @@ -217,6 +218,39 @@ def testing_io():
assert subset_result.output_for_node("end") == 1


def test_unsatisfied_input_with_asset_key_use_config():
@op(ins={"x": In(asset_key=AssetKey("foo"))})
def start(_, x: int):
return x

@op(ins={"x": In(asset_key=AssetKey("bar"))})
def end(_, x: int):
return x

@graph
def testing_io():
end(start())

full_job = testing_io.to_job()
result = full_job.execute_in_process(
run_config={
"ops": {"start": {"inputs": {"x": 4}}},
},
)
assert result.success
assert result.output_for_node("end") == 4

# test to ensure that if start is not being executed its input config is used
subset_result = full_job.execute_in_process(
run_config={
"ops": {"end": {"inputs": {"x": 1}}},
},
op_selection=["end"],
)
assert subset_result.success
assert subset_result.output_for_node("end") == 1


def test_op_selection_on_dynamic_orchestration():
@op
def num_range():
Expand Down

0 comments on commit 1961e51

Please sign in to comment.