Skip to content

Commit

Permalink
[asset-resources 2/n][rfc] io manager defs directly on asset defs (#7920
Browse files Browse the repository at this point in the history
)

* Provide io manager def directly on asset def

Rebase + fix test conflicts

Fix test errors

* io_manager -> io_manager_def

* Remove dictionary approach

* cd

* Address comments
  • Loading branch information
dpeng817 committed May 25, 2022
1 parent d7129a5 commit 03b24cd
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 10 deletions.
37 changes: 29 additions & 8 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dagster.core.definitions.resource_definition import ResourceDefinition
from dagster.core.definitions.utils import NoValueSentinel
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.storage.io_manager import IOManagerDefinition
from dagster.core.types.dagster_type import DagsterType
from dagster.seven import funcsigs
from dagster.utils.backcompat import ExperimentalWarning, experimental_decorator
Expand All @@ -52,6 +53,7 @@ def asset(
description: Optional[str] = ...,
required_resource_keys: Optional[Set[str]] = ...,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = ...,
io_manager_def: Optional[IOManagerDefinition] = ...,
io_manager_key: Optional[str] = ...,
compute_kind: Optional[str] = ...,
dagster_type: Optional[DagsterType] = ...,
Expand All @@ -72,6 +74,7 @@ def asset(
description: Optional[str] = None,
required_resource_keys: Optional[Set[str]] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
io_manager_def: Optional[IOManagerDefinition] = None,
io_manager_key: Optional[str] = None,
compute_kind: Optional[str] = None,
dagster_type: Optional[DagsterType] = None,
Expand Down Expand Up @@ -101,9 +104,11 @@ def asset(
upstream dependencies, but do not pass an input to the asset.
metadata (Optional[Dict[str, Any]]): A dict of metadata entries for the asset.
required_resource_keys (Optional[Set[str]]): Set of resource handles required by the op.
io_manager_key (Optional[str]): The resource key of the IOManager used for storing the
output of the op as an asset, and for loading it in downstream ops
(default: "io_manager").
io_manager_key (Optional[str]): The resource key of the IOManager used
for storing the output of the op as an asset, and for loading it in downstream ops (default: "io_manager"). Only one of io_manager_key and io_manager_def can be provided.
io_manager_def (Optional[IOManagerDefinition]): The definition of the IOManager used for
storing the output of the op as an asset, and for loading it in
downstream ops. Only one of io_manager_def and io_manager_key can be provided.
compute_kind (Optional[str]): A string to represent the kind of computation that produces
the asset, e.g. "dbt" or "spark". It will be displayed in Dagit as a badge on the asset.
dagster_type (Optional[DagsterType]): Allows specifying type validation functions that
Expand Down Expand Up @@ -133,6 +138,10 @@ def my_asset(my_upstream_asset: int) -> int:
return _Asset()(name)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
check.invariant(
not (io_manager_key and io_manager_def),
"Both io_manager_key and io_manager_def were provided to `@asset` decorator. Please provide one or the other. ",
)
return _Asset(
name=cast(Optional[str], name), # (mypy bug that it can't infer name is Optional[str])
namespace=namespace,
Expand All @@ -142,7 +151,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
description=description,
required_resource_keys=required_resource_keys,
resource_defs=resource_defs,
io_manager_key=io_manager_key,
io_manager=io_manager_def or io_manager_key,
compute_kind=check.opt_str_param(compute_kind, "compute_kind"),
dagster_type=dagster_type,
partitions_def=partitions_def,
Expand All @@ -164,7 +173,7 @@ def __init__(
description: Optional[str] = None,
required_resource_keys: Optional[Set[str]] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
io_manager_key: Optional[str] = None,
io_manager: Optional[Union[str, IOManagerDefinition]] = None,
compute_kind: Optional[str] = None,
dagster_type: Optional[DagsterType] = None,
partitions_def: Optional[PartitionsDefinition] = None,
Expand All @@ -181,13 +190,13 @@ def __init__(
self.required_resource_keys = check.opt_set_param(
required_resource_keys, "required_resource_keys"
)
self.io_manager_key = io_manager_key
self.io_manager = io_manager
self.compute_kind = compute_kind
self.dagster_type = dagster_type
self.partitions_def = partitions_def
self.partition_mappings = partition_mappings
self.op_tags = op_tags
self.resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")
self.resource_defs = dict(check.opt_mapping_param(resource_defs, "resource_defs"))

def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__
Expand All @@ -198,9 +207,21 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)

if isinstance(self.io_manager, str):
io_manager_key = cast(str, self.io_manager)
elif self.io_manager is not None:
io_manager_def = check.inst_param(
self.io_manager, "io_manager", IOManagerDefinition
)
out_asset_resource_key = "__".join(out_asset_key.path)
io_manager_key = f"{out_asset_resource_key}__io_manager"
self.resource_defs[io_manager_key] = cast(ResourceDefinition, io_manager_def)
else:
io_manager_key = "io_manager"

out = Out(
metadata=self.metadata or {},
io_manager_key=self.io_manager_key,
io_manager_key=io_manager_key,
dagster_type=self.dagster_type if self.dagster_type else NoValueSentinel,
description=self.description,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import pytest

from dagster import AssetKey, Out, Output
from dagster import AssetKey, IOManager, Out, Output, io_manager
from dagster._check import CheckError
from dagster.core.asset_defs import AssetIn, SourceAsset, asset, multi_asset
from dagster.core.asset_defs import AssetGroup, AssetIn, SourceAsset, asset, multi_asset
from dagster.core.storage.mem_io_manager import InMemoryIOManager


def test_with_replaced_asset_keys():
Expand Down Expand Up @@ -208,3 +209,88 @@ def test_coerced_asset_keys():
@asset(ins={"input1": AssetIn(asset_key=["Asset", "1"])})
def asset1(input1):
assert input1


def test_asset_with_io_manager_def():
events = []

class MyIOManager(IOManager):
def handle_output(self, context, _obj):
events.append(f"entered for {context.step_key}")

def load_input(self, _context):
pass

@io_manager
def the_io_manager():
return MyIOManager()

@asset(io_manager_def=the_io_manager)
def the_asset():
pass

result = AssetGroup([the_asset]).materialize()
assert result.success
assert events == ["entered for the_asset"]


def test_multiple_assets_io_manager_defs():
io_manager_inst = InMemoryIOManager()
num_times = [0]

@io_manager
def the_io_manager():
num_times[0] += 1
return io_manager_inst

# Under the hood, these io managers are mapped to different asset keys, so
# we expect the io manager initialization to be called multiple times.
@asset(io_manager_def=the_io_manager)
def the_asset():
return 5

@asset(io_manager_def=the_io_manager)
def other_asset():
return 6

AssetGroup([the_asset, other_asset]).materialize()

assert num_times[0] == 2

the_asset_key = [key for key in io_manager_inst.values.keys() if key[1] == "the_asset"][0]
assert io_manager_inst.values[the_asset_key] == 5

other_asset_key = [key for key in io_manager_inst.values.keys() if key[1] == "other_asset"][0]
assert io_manager_inst.values[other_asset_key] == 6


def test_asset_with_io_manager_key_only():
io_manager_inst = InMemoryIOManager()

@io_manager
def the_io_manager():
return io_manager_inst

@asset(io_manager_key="the_key")
def the_asset():
return 5

AssetGroup([the_asset], resource_defs={"the_key": the_io_manager}).materialize()

assert list(io_manager_inst.values.values())[0] == 5


def test_asset_both_io_manager_args_provided():
@io_manager
def the_io_manager():
pass

with pytest.raises(
CheckError,
match="Both io_manager_key and io_manager_def were provided to `@asset` "
"decorator. Please provide one or the other.",
):

@asset(io_manager_key="the_key", io_manager_def=the_io_manager)
def the_asset():
pass

0 comments on commit 03b24cd

Please sign in to comment.