Skip to content

Commit

Permalink
resource defs on asset def (#7918)
Browse files Browse the repository at this point in the history
Add test cases

Fix tests, lint, mypy
  • Loading branch information
dpeng817 committed May 24, 2022
1 parent 08e37e3 commit b1ab74b
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 9 deletions.
9 changes: 6 additions & 3 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,10 +664,13 @@ def _validate_resource_reqs_for_asset_group(
):
present_resource_keys = set(resource_defs.keys())
for asset_def in asset_list:
resource_keys: Set[str] = set()
provided_resource_keys = set(asset_def.resource_defs.keys())
present_resource_keys = present_resource_keys.union(provided_resource_keys)

required_resource_keys: Set[str] = set()
for op_def in asset_def.node_def.iterate_solid_defs():
resource_keys.update(set(op_def.required_resource_keys or {}))
missing_resource_keys = list(set(resource_keys) - present_resource_keys)
required_resource_keys.update(set(op_def.required_resource_keys or {}))
missing_resource_keys = list(set(required_resource_keys) - present_resource_keys)
if missing_resource_keys:
raise DagsterInvalidDefinitionError(
f"AssetGroup is missing required resource keys for asset '{asset_def.node_def.name}'. "
Expand Down
17 changes: 15 additions & 2 deletions python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
from typing import AbstractSet, Dict, Iterable, Mapping, Optional, Sequence, Set, cast

import dagster._check as check
from dagster.core.definitions import GraphDefinition, NodeDefinition, NodeHandle, OpDefinition
from dagster.core.definitions import (
GraphDefinition,
NodeDefinition,
NodeHandle,
OpDefinition,
ResourceDefinition,
)
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.partition import PartitionsDefinition
from dagster.utils.backcompat import ExperimentalWarning, experimental
Expand All @@ -22,7 +28,8 @@ def __init__(
asset_deps: Optional[Mapping[AssetKey, AbstractSet[AssetKey]]] = None,
selected_asset_keys: Optional[AbstractSet[AssetKey]] = None,
can_subset: bool = False,
# if adding new fields, make sure to handle them in both with_replaced_asset_keys and subset_for
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
# if adding new fields, make sure to handle them in the with_replaced_asset_keys method
):
self._node_def = node_def
self._asset_keys_by_input_name = check.dict_param(
Expand Down Expand Up @@ -54,6 +61,7 @@ def __init__(
f"asset_deps keys: {set(self._asset_deps.keys())} \n"
f"expected keys: {all_asset_keys}",
)
self._resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")

if selected_asset_keys is not None:
self._selected_asset_keys = selected_asset_keys
Expand Down Expand Up @@ -160,6 +168,10 @@ def asset_key(self) -> AssetKey:

return next(iter(self.asset_keys))

@property
def resource_defs(self) -> Mapping[str, ResourceDefinition]:
return self._resource_defs

@property
def asset_keys(self) -> AbstractSet[AssetKey]:
return self._selected_asset_keys
Expand Down Expand Up @@ -246,6 +258,7 @@ def with_replaced_asset_keys(
selected_asset_keys={
output_asset_key_replacements.get(key, key) for key in self._selected_asset_keys
},
resource_defs=self.resource_defs,
)

def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefinition":
Expand Down
15 changes: 14 additions & 1 deletion python_modules/dagster/dagster/core/asset_defs/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def asset2(asset1):
deps, assets_defs_by_node_handle = build_deps(assets, source_assets_by_key.keys())
root_manager = build_root_manager(source_assets_by_key)
partitioned_config = build_job_partitions_from_assets(assets, source_assets or [])
resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")

graph = GraphDefinition(
name=name,
Expand All @@ -115,9 +116,21 @@ def asset2(asset1):
config=None,
)

all_resource_defs = dict(resource_defs)
for asset_def in assets:
for resource_key, resource_def in asset_def.resource_defs.items():
if (
resource_key in all_resource_defs
and all_resource_defs[resource_key] != resource_def
):
raise DagsterInvalidDefinitionError(
f"When attempting to build job, asset {asset_def.asset_key} had a conflicting version of the same resource key {resource_key}. Please resolve this conflict by giving different keys to each resource definition."
)
all_resource_defs[resource_key] = resource_def

return graph.to_job(
resource_defs=merge_dicts(
{"io_manager": fs_asset_io_manager}, resource_defs or {}, {"root_manager": root_manager}
{"io_manager": fs_asset_io_manager}, all_resource_defs, {"root_manager": root_manager}
),
config=config or partitioned_config,
tags=tags,
Expand Down
19 changes: 17 additions & 2 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dagster.core.definitions.input import In
from dagster.core.definitions.output import Out
from dagster.core.definitions.partition import PartitionsDefinition
from dagster.core.definitions.resource_definition import ResourceDefinition
from dagster.core.definitions.utils import NoValueSentinel
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.types.dagster_type import DagsterType
Expand Down Expand Up @@ -50,6 +51,7 @@ def asset(
metadata: Optional[Mapping[str, Any]] = ...,
description: Optional[str] = ...,
required_resource_keys: Optional[Set[str]] = ...,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = ...,
io_manager_key: Optional[str] = ...,
compute_kind: Optional[str] = ...,
dagster_type: Optional[DagsterType] = ...,
Expand All @@ -69,6 +71,7 @@ def asset(
metadata: Optional[Mapping[str, Any]] = None,
description: Optional[str] = None,
required_resource_keys: Optional[Set[str]] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
io_manager_key: Optional[str] = None,
compute_kind: Optional[str] = None,
dagster_type: Optional[DagsterType] = None,
Expand Down Expand Up @@ -138,6 +141,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
metadata=metadata,
description=description,
required_resource_keys=required_resource_keys,
resource_defs=resource_defs,
io_manager_key=io_manager_key,
compute_kind=check.opt_str_param(compute_kind, "compute_kind"),
dagster_type=dagster_type,
Expand All @@ -159,6 +163,7 @@ def __init__(
metadata: Optional[Mapping[str, Any]] = None,
description: Optional[str] = None,
required_resource_keys: Optional[Set[str]] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
io_manager_key: Optional[str] = None,
compute_kind: Optional[str] = None,
dagster_type: Optional[DagsterType] = None,
Expand All @@ -173,13 +178,16 @@ def __init__(
self.non_argument_deps = non_argument_deps
self.metadata = metadata
self.description = description
self.required_resource_keys = required_resource_keys
self.required_resource_keys = check.opt_set_param(
required_resource_keys, "required_resource_keys"
)
self.io_manager_key = io_manager_key
self.compute_kind = compute_kind
self.dagster_type = dagster_type
self.partitions_def = partitions_def
self.partition_mappings = partition_mappings
self.op_tags = op_tags
self.resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")

def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__
Expand All @@ -197,12 +205,18 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
description=self.description,
)

required_resource_keys = set()
for key in self.required_resource_keys:
required_resource_keys.add(key)
for key in self.resource_defs.keys():
required_resource_keys.add(key)

op = _Op(
name="__".join(out_asset_key.path),
description=self.description,
ins=dict(asset_ins.values()),
out=out,
required_resource_keys=self.required_resource_keys,
required_resource_keys=required_resource_keys,
tags={
**({"kind": self.compute_kind} if self.compute_kind else {}),
**(self.op_tags or {}),
Expand All @@ -229,6 +243,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
}
if self.partition_mappings
else None,
resource_defs=self.resource_defs,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,18 @@
io_manager,
mem_io_manager,
multiprocess_executor,
op,
repository,
resource,
)
from dagster.core.asset_defs import AssetGroup, AssetIn, SourceAsset, asset, multi_asset
from dagster.core.asset_defs import (
AssetGroup,
AssetIn,
AssetsDefinition,
SourceAsset,
asset,
multi_asset,
)
from dagster.core.errors import DagsterInvalidSubsetError, DagsterUnmetExecutorRequirementsError
from dagster.core.test_utils import instance_for_test

Expand Down Expand Up @@ -921,3 +929,98 @@ def my_multi_asset():
SourceAsset(AssetKey(["my_asset_name"])),
SourceAsset(AssetKey(["my_other_asset"])),
]


def test_build_job_resource_defs_on_asset():
the_resource = ResourceDefinition.hardcoded_resource("blah")

@asset(required_resource_keys={"bar"}, resource_defs={"foo": the_resource})
def the_asset():
pass

@asset(resource_defs={"foo": the_resource})
def other_asset():
pass

group = AssetGroup([the_asset, other_asset], resource_defs={"bar": the_resource})
the_job = group.build_job("some_name")
assert the_job.execute_in_process().success


def test_build_job_diff_resource_defs():
the_resource = ResourceDefinition.hardcoded_resource("blah")
other_resource = ResourceDefinition.hardcoded_resource("baz")

@asset(resource_defs={"foo": the_resource})
def the_asset():
pass

@asset(resource_defs={"foo": other_resource})
def other_asset():
pass

group = AssetGroup([the_asset, other_asset])

assert group.build_job("some_name", selection="the_asset").execute_in_process().success


def test_repo_asset_group_diff_resource_defs():
the_resource = ResourceDefinition.hardcoded_resource("blah")
other_resource = ResourceDefinition.hardcoded_resource("baz")

@asset(resource_defs={"foo": the_resource})
def the_asset():
pass

@asset(resource_defs={"foo": other_resource})
def other_asset():
pass

group = AssetGroup([the_asset, other_asset])

# Demonstrate that repository construction with conflicting versions of
# same key fails
with pytest.raises(
DagsterInvalidDefinitionError,
match="had a conflicting version of the same resource key foo. Please resolve this conflict by giving different keys to each resource definition.",
):

@repository
def use_group():
return [group]


def test_graph_backed_asset_resources():
@op(required_resource_keys={"foo"})
def the_op():
pass

@graph
def basic():
return the_op()

the_resource = ResourceDefinition.hardcoded_resource("blah")
other_resource = ResourceDefinition.hardcoded_resource("baz")

the_asset = AssetsDefinition(
asset_keys_by_input_name={},
asset_keys_by_output_name={"result": AssetKey("the_asset")},
node_def=basic,
resource_defs={"foo": the_resource},
)
no_conflict_group = AssetGroup([the_asset])
assert no_conflict_group.materialize().success

other_asset = AssetsDefinition(
asset_keys_by_input_name={},
asset_keys_by_output_name={"result": AssetKey("other_asset")},
node_def=basic,
resource_defs={"foo": other_resource},
)

asset_group = AssetGroup([the_asset, other_asset])
with pytest.raises(
DagsterInvalidDefinitionError,
match="had a conflicting version of the same resource key foo.",
):
asset_group.materialize()

0 comments on commit b1ab74b

Please sign in to comment.