Skip to content

Commit

Permalink
resource dependencies for root input managers (#7459)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Apr 15, 2022
1 parent a0cc868 commit 9ad8f54
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
26 changes: 24 additions & 2 deletions python_modules/dagster/dagster/core/asset_defs/assets_job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import itertools
import warnings
from typing import AbstractSet, Any, Dict, Mapping, Optional, Sequence, Tuple, Union, cast
from typing import (
AbstractSet,
Any,
Dict,
Mapping,
NamedTuple,
Optional,
Sequence,
Tuple,
Union,
cast,
)

from dagster import check
from dagster.core.definitions.config import ConfigMapping
Expand Down Expand Up @@ -254,16 +265,26 @@ def _root_manager(input_context: InputContext) -> Any:
def _op():
pass

resource_config = input_context.step_context.resolved_run_config.resources[
step_context = input_context.step_context
resource_config = step_context.resolved_run_config.resources[
source_asset.io_manager_key
].config
io_manager_def = (
step_context.pipeline.get_definition()
.mode_definitions[0]
.resource_defs[source_asset.io_manager_key]
)
resources = step_context.scoped_resources_builder.build(
io_manager_def.required_resource_keys
)

output_context = build_output_context(
name=source_asset_key.path[-1],
step_key="none",
solid_def=_op,
metadata=cast(Dict[str, Any], source_asset.metadata),
resource_config=resource_config,
resources=cast(NamedTuple, resources)._asdict(),
)
input_context_with_upstream = build_input_context(
name=input_context.name,
Expand All @@ -274,6 +295,7 @@ def _op():
op_def=input_context.op_def,
step_context=input_context.step_context,
resource_config=resource_config,
resources=cast(NamedTuple, resources)._asdict(),
)

io_manager = getattr(cast(Any, input_context.resources), source_asset.io_manager_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
DagsterInvalidDefinitionError,
DependencyDefinition,
IOManager,
ResourceDefinition,
io_manager,
)
from dagster.core.asset_defs import AssetIn, SourceAsset, asset, build_assets_job
Expand Down Expand Up @@ -206,17 +207,22 @@ def handle_output(self, context, obj):

def load_input(self, context):
assert context.resource_config["a"] == 7
assert context.resources.subresource == 9
assert context.upstream_output.resources.subresource == 9
return 5

@io_manager(config_schema={"a": int})
@io_manager(config_schema={"a": int}, required_resource_keys={"subresource"})
def my_io_manager(_):
return MyIOManager()

job = build_assets_job(
"a",
[asset1],
source_assets=[SourceAsset(AssetKey("source1"), io_manager_key="special_io_manager")],
resource_defs={"special_io_manager": my_io_manager.configured({"a": 7})},
resource_defs={
"special_io_manager": my_io_manager.configured({"a": 7}),
"subresource": ResourceDefinition.hardcoded_resource(9),
},
)
assert job.graph.node_defs == [asset1.op]
assert job.execute_in_process().success
Expand Down

0 comments on commit 9ad8f54

Please sign in to comment.