Skip to content

Commit

Permalink
make resource config available when loading from assets outside job (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Mar 14, 2022
1 parent c7bfb9c commit 1327a6d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/core/asset_defs/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,16 @@ def _root_manager(input_context: InputContext) -> Any:
def _op():
pass

resource_config = input_context.step_context.resolved_run_config.resources[
source_asset.io_manager_key
].config

output_context = build_output_context(
name=source_asset_key.path[-1],
step_key="none",
solid_def=_op,
metadata=cast(Dict[str, Any], source_asset.metadata),
resource_config=resource_config,
)
input_context_with_upstream = build_input_context(
name=input_context.name,
Expand All @@ -263,6 +268,7 @@ def _op():
upstream_output=output_context,
op_def=input_context.op_def,
step_context=input_context.step_context,
resource_config=resource_config,
)

io_manager = getattr(cast(Any, input_context.resources), source_asset.io_manager_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,18 @@ def handle_output(self, context, obj):
pass

def load_input(self, context):
assert context.resource_config["a"] == 7
return 5

@io_manager
@io_manager(config_schema={"a": int})
def my_io_manager(_):
return MyIOManager()

job = build_assets_job(
"a",
[asset1],
source_assets=[SourceAsset(AssetKey("source1"), io_manager_key="special_io_manager")],
resource_defs={"special_io_manager": my_io_manager},
resource_defs={"special_io_manager": my_io_manager.configured({"a": 7})},
)
assert job.graph.node_defs == [asset1.op]
assert job.execute_in_process().success
Expand Down

0 comments on commit 1327a6d

Please sign in to comment.