Skip to content

Commit

Permalink
[with-resources changes 2/n] If resources collide when using with_resources, error. (#8330)
Browse files Browse the repository at this point in the history

* Error if resource collisions

* Fix asset job implications as a result of with_resources changes

* Use __str__ instead of to_string

* Fixed asset group resource dependency issue

* Fix repository def tests

* Fix linting, mypy errors
  • Loading branch information
dpeng817 committed Jun 13, 2022
1 parent 56fa984 commit c212e85
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 71 deletions.
9 changes: 5 additions & 4 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from dagster.core.definitions.executor_definition import in_process_executor
from dagster.core.errors import DagsterUnmetExecutorRequirementsError
from dagster.core.execution.execute_in_process_result import ExecuteInProcessResult
from dagster.core.execution.with_resources import with_resources
from dagster.core.selector.subset_selector import AssetSelectionData
from dagster.utils import merge_dicts
from dagster.utils.backcompat import ExperimentalWarning

from ..definitions.asset_layer import build_asset_selection_job
Expand All @@ -21,8 +21,9 @@
from ..definitions.partition import PartitionsDefinition
from ..definitions.resource_definition import ResourceDefinition
from ..errors import DagsterInvalidDefinitionError
from ..storage.fs_io_manager import fs_io_manager
from .assets import AssetsDefinition
from .assets_job import build_assets_job
from .assets_job import build_assets_job, check_resources_satisfy_requirements
from .load_assets_from_modules import (
assets_and_source_assets_from_modules,
assets_and_source_assets_from_package_module,
Expand Down Expand Up @@ -104,10 +105,10 @@ def __init__(
resource_defs = check.opt_mapping_param(
resource_defs, "resource_defs", key_type=str, value_type=ResourceDefinition
)
resource_defs = merge_dicts({"io_manager": fs_io_manager}, resource_defs)
executor_def = check.opt_inst_param(executor_def, "executor_def", ExecutorDefinition)

assets = with_resources(assets, resource_defs)
source_assets = with_resources(source_assets, resource_defs)
check_resources_satisfy_requirements(assets, source_assets, resource_defs)

self._assets = assets
self._source_assets = source_assets
Expand Down
22 changes: 22 additions & 0 deletions python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.partition import PartitionsDefinition
from dagster.core.definitions.utils import DEFAULT_GROUP_NAME, validate_group_name
from dagster.core.errors import DagsterInvalidInvocationError
from dagster.core.execution.context.compute import OpExecutionContext
from dagster.utils import merge_dicts
from dagster.utils.backcompat import deprecation_warning
Expand All @@ -20,6 +21,7 @@
ResourceAddable,
ResourceRequirement,
ensure_requirements_satisfied,
get_resource_key_conflicts,
)
from .partition_mapping import PartitionMapping
from .source_asset import SourceAsset
Expand Down Expand Up @@ -416,9 +418,29 @@ def get_resource_requirements(self) -> Iterator[ResourceRequirement]:
def required_resource_keys(self) -> Set[str]:
    """Collect the ``key`` of every requirement from ``get_resource_requirements()`` into a set."""
    keys: Set[str] = set()
    for requirement in self.get_resource_requirements():
        keys.add(requirement.key)
    return keys

def __str__(self):
if len(self.asset_keys) == 1:
return f"AssetsDefinition with key {self.asset_key.to_string()}"
else:
asset_keys = ", ".join(
sorted(list([asset_key.to_string() for asset_key in self.asset_keys]))
)
return f"AssetsDefinition with keys {asset_keys}"

def with_resources(self, resource_defs: Mapping[str, ResourceDefinition]) -> "AssetsDefinition":
from dagster.core.execution.resources_init import get_transitive_required_resource_keys

overlapping_keys = get_resource_key_conflicts(self.resource_defs, resource_defs)
if overlapping_keys:
overlapping_keys_str = ", ".join(sorted(list(overlapping_keys)))
raise DagsterInvalidInvocationError(
f"{str(self)} has conflicting resource "
"definitions with provided resources for the following keys: "
f"{overlapping_keys_str}. Either remove the existing "
"resources from the asset or change the resource keys so that "
"they don't overlap."
)

merged_resource_defs = merge_dicts(resource_defs, self.resource_defs)

# Ensure top-level resource requirements are met - except for
Expand Down
70 changes: 58 additions & 12 deletions python_modules/dagster/dagster/core/asset_defs/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
from dagster.core.definitions.output import OutputDefinition
from dagster.core.definitions.partition import PartitionedConfig, PartitionsDefinition
from dagster.core.definitions.resource_definition import ResourceDefinition
from dagster.core.definitions.resource_requirement import ensure_requirements_satisfied
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.execution.with_resources import with_resources
from dagster.core.selector.subset_selector import AssetSelectionData
from dagster.utils import merge_dicts
from dagster.utils.backcompat import experimental
Expand Down Expand Up @@ -102,9 +102,6 @@ def asset2(asset1):
resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")
resource_defs = merge_dicts({"io_manager": default_job_io_manager}, resource_defs)

assets = with_resources(assets, resource_defs)
source_assets = with_resources(source_assets, resource_defs)

source_assets_by_key = build_source_assets_by_key(source_assets)
deps, assets_defs_by_node_handle = build_deps(assets, source_assets_by_key.keys())

Expand Down Expand Up @@ -135,7 +132,7 @@ def asset2(asset1):
graph, assets_defs_by_node_handle, resolved_source_assets
)

all_resource_defs = get_all_resource_defs(assets, resolved_source_assets)
all_resource_defs = get_all_resource_defs(assets, resolved_source_assets, resource_defs)

return graph.to_job(
resource_defs=all_resource_defs,
Expand Down Expand Up @@ -348,20 +345,69 @@ def _dfs(name, cur_color):
return ret


def get_all_resource_defs(
assets: Sequence[AssetsDefinition], source_assets: Sequence[SourceAsset]
) -> Dict[str, ResourceDefinition]:
all_resource_defs = {}
def _ensure_resources_dont_conflict(
assets: Iterable[AssetsDefinition],
source_assets: Sequence[SourceAsset],
resource_defs: Mapping[str, ResourceDefinition],
) -> None:
"""Ensures that resources between assets, source assets, and provided resource dictionary do not conflict."""
resource_defs_from_assets = {}
all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets]
for asset in all_assets:
for resource_key, resource_def in asset.resource_defs.items():
if resource_key not in all_resource_defs:
all_resource_defs[resource_key] = resource_def
if all_resource_defs[resource_key] != resource_def:
if resource_key not in resource_defs_from_assets:
resource_defs_from_assets[resource_key] = resource_def
if resource_defs_from_assets[resource_key] != resource_def:
raise DagsterInvalidDefinitionError(
f"Conflicting versions of resource with key '{resource_key}' "
"were provided to different assets. When constructing a "
"job, all resource definitions provided to assets must "
"match by reference equality for a given key."
)
for resource_key, resource_def in resource_defs.items():
if (
resource_key != "io_manager"
and resource_key in resource_defs_from_assets
and resource_defs_from_assets[resource_key] != resource_def
):
raise DagsterInvalidDefinitionError(
f"resource with key '{resource_key}' provided to job "
"conflicts with resource provided to assets. When constructing a "
"job, all resource definitions provided must "
"match by reference equality for a given key."
)


def check_resources_satisfy_requirements(
    assets: Iterable[AssetsDefinition],
    source_assets: Sequence[SourceAsset],
    resource_defs: Mapping[str, ResourceDefinition],
) -> None:
    """Ensures that every asset's resource requirements are satisfied and that no resources conflict.

    Resource conflicts are checked first via ``_ensure_resources_dont_conflict``.
    Then, for each asset and source asset, its requirements are checked against
    the merge of ``resource_defs`` and that asset's own ``resource_defs``.

    Note that resources provided on assets cannot satisfy resource requirements provided on other assets.

    Args:
        assets: Asset definitions to check.
        source_assets: Source assets to check.
        resource_defs: Resource definitions supplied alongside the assets.
    """
    # `assets` is only promised to be an Iterable. Materialize it once so that a
    # single-pass iterator is not already exhausted by the conflict check when the
    # requirement loop below walks it again.
    assets = list(assets)

    _ensure_resources_dont_conflict(assets, source_assets, resource_defs)

    all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets]
    for asset in all_assets:
        ensure_requirements_satisfied(
            merge_dicts(resource_defs, asset.resource_defs), list(asset.get_resource_requirements())
        )


def get_all_resource_defs(
    assets: Iterable[AssetsDefinition],
    source_assets: Sequence[SourceAsset],
    resource_defs: Mapping[str, ResourceDefinition],
) -> Dict[str, ResourceDefinition]:
    """Validate the resources, then combine them into a single dictionary.

    Args:
        assets: Asset definitions whose ``resource_defs`` are merged in.
        source_assets: Source assets whose ``resource_defs`` are merged in.
        resource_defs: Resource definitions supplied alongside the assets; the
            result starts as a copy of this mapping.

    Returns:
        A new dictionary built by merging each asset's and source asset's
        ``resource_defs`` into a copy of ``resource_defs`` with ``merge_dicts``.
    """
    # `assets` is only promised to be an Iterable. Materialize it once so that a
    # single-pass iterator is not already exhausted by the validation call when
    # the merge loop below walks it again.
    assets = list(assets)

    # Ensures that no resource keys conflict, and each asset has its resource requirements satisfied.
    check_resources_satisfy_requirements(assets, source_assets, resource_defs)

    all_resource_defs = dict(resource_defs)
    all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets]
    for asset in all_assets:
        all_resource_defs = merge_dicts(all_resource_defs, asset.resource_defs)
    return all_resource_defs
28 changes: 25 additions & 3 deletions python_modules/dagster/dagster/core/asset_defs/source_asset.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Mapping, NamedTuple, Optional, Sequence, Union, cast
from typing import Dict, Iterator, Mapping, NamedTuple, Optional, Sequence, Union, cast

import dagster._check as check
from dagster.core.definitions.events import AssetKey, CoercibleToAssetKey
Expand All @@ -11,9 +11,14 @@
)
from dagster.core.definitions.partition import PartitionsDefinition
from dagster.core.definitions.resource_definition import ResourceDefinition
from dagster.core.definitions.resource_requirement import ResourceAddable
from dagster.core.definitions.resource_requirement import (
ResourceAddable,
ResourceRequirement,
SourceAssetIOManagerRequirement,
get_resource_key_conflicts,
)
from dagster.core.definitions.utils import DEFAULT_GROUP_NAME, validate_group_name
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvalidInvocationError
from dagster.core.storage.io_manager import IOManagerDefinition
from dagster.utils import merge_dicts

Expand Down Expand Up @@ -110,6 +115,16 @@ def io_manager_def(self) -> Optional[IOManagerDefinition]:
def with_resources(self, resource_defs) -> "SourceAsset":
from dagster.core.execution.resources_init import get_transitive_required_resource_keys

overlapping_keys = get_resource_key_conflicts(self.resource_defs, resource_defs)
if overlapping_keys:
raise DagsterInvalidInvocationError(
f"SourceAsset with key {self.key} has conflicting resource "
"definitions with provided resources for the following keys: "
f"{sorted(list(overlapping_keys))}. Either remove the existing "
"resources from the asset or change the resource keys so that "
"they don't overlap."
)

merged_resource_defs = merge_dicts(resource_defs, self.resource_defs)

io_manager_def = merged_resource_defs.get(self.get_io_manager_key())
Expand Down Expand Up @@ -156,3 +171,10 @@ def with_group_name(self, group_name: str) -> "SourceAsset":
group_name=group_name,
resource_defs=self.resource_defs,
)

def get_resource_requirements(self) -> Iterator[ResourceRequirement]:
    """Yield the resource requirements of this source asset.

    The first item is a ``SourceAssetIOManagerRequirement`` for the key returned
    by ``get_io_manager_key()``. After that, each entry in ``self.resource_defs``
    contributes the requirements it reports itself, with its resource key passed
    as ``outer_context``.
    """
    io_manager_requirement = SourceAssetIOManagerRequirement(
        key=self.get_io_manager_key(), asset_key=self.key.to_string()
    )
    yield io_manager_requirement

    for resource_key, definition in self.resource_defs.items():
        nested_requirements = definition.get_resource_requirements(outer_context=resource_key)
        yield from nested_requirements
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,29 @@ def describe_requirement(self) -> str:
return f"input manager with key '{self.key}' required by input '{self.input_name}' of {self.node_description}"


class SourceAssetIOManagerRequirement(
    NamedTuple(
        # The typename matches this class rather than reusing the input-manager name.
        "_SourceAssetIOManagerRequirement",
        [
            ("key", str),
            ("asset_key", Optional[str]),
        ],
    ),
    ResourceRequirement,
):
    """Requirement that an IO manager be provided under ``key`` for a source asset.

    Fields:
        key: The resource key under which the IO manager is required.
        asset_key: String form of the source asset's key, or ``None``; used
            only when describing the requirement.
    """

    @property
    def expected_type(self) -> Type:
        """Return ``IOManagerDefinition``, the type expected under ``key``."""
        # NOTE(review): imported inside the property, presumably to avoid a
        # circular import at module load; confirm before hoisting.
        from ..storage.io_manager import IOManagerDefinition

        return IOManagerDefinition

    def describe_requirement(self) -> str:
        """Return a human-readable description of the requirement for error messages."""
        # Fall back to a bare "SourceAsset" when no asset key was recorded.
        source_asset_descriptor = (
            f"SourceAsset with key {self.asset_key}" if self.asset_key else "SourceAsset"
        )
        return f"io manager with key '{self.key}' required by {source_asset_descriptor}"


class OutputManagerRequirement(
NamedTuple(
"_OutputManagerRequirement", [("key", str), ("node_description", str), ("output_name", str)]
Expand Down Expand Up @@ -205,3 +228,12 @@ def ensure_requirements_satisfied(
raise DagsterInvalidDefinitionError(
f"{requirement.describe_requirement()} was not provided{mode_descriptor}. Please provide a {str(requirement.expected_type)} to key '{requirement.key}', or change the required key to one of the following keys which points to an {str(requirement.expected_type)}: {requirement.keys_of_expected_type(resource_defs)}"
)


def get_resource_key_conflicts(
    resource_defs: Mapping[str, "ResourceDefinition"],
    other_resource_defs: Mapping[str, "ResourceDefinition"],
) -> AbstractSet[str]:
    """Return the resource keys present in both mappings, excluding ``"io_manager"``.

    Only keys are compared; the definitions stored under them are not inspected.

    Args:
        resource_defs: First mapping of resource key to definition.
        other_resource_defs: Second mapping of resource key to definition.

    Returns:
        The set of keys that appear in both mappings, with ``"io_manager"``
        always left out. Empty when there is no other overlap.
    """
    # Mapping key views support set intersection directly, so no intermediate
    # set copies are needed.
    shared_keys = resource_defs.keys() & other_resource_defs.keys()
    return {key for key in shared_keys if key != "io_manager"}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
Out,
Output,
ResourceDefinition,
fs_io_manager,
graph,
in_process_executor,
io_manager,
Expand Down Expand Up @@ -156,7 +155,7 @@ def asset_foo(context):

with pytest.raises(
DagsterInvalidDefinitionError,
match=r"SourceAsset with asset key AssetKey\(\['foo'\]\) requires IO manager with key 'foo', but none was provided.",
match=r"io manager with key 'foo' required by SourceAsset with key \[\"foo\"\] was not provided.",
):
AssetGroup([], source_assets=[source_asset_io_req])

Expand Down Expand Up @@ -1282,18 +1281,15 @@ def my_multi_asset():
SourceAsset(
AssetKey(["my_asset"]),
io_manager_key="io_manager",
resource_defs={"io_manager": fs_io_manager},
group_name="abc",
),
SourceAsset(
AssetKey(["my_asset_name"]),
io_manager_key="io_manager",
resource_defs={"io_manager": fs_io_manager},
),
SourceAsset(
AssetKey(["my_other_asset"]),
io_manager_key="io_manager",
resource_defs={"io_manager": fs_io_manager},
),
]

Expand Down Expand Up @@ -1326,44 +1322,14 @@ def the_asset():
def other_asset():
pass

group = AssetGroup([the_asset, other_asset])

with pytest.raises(
DagsterInvalidDefinitionError,
match="Conflicting versions of resource with key 'foo' were provided to "
"different assets. When constructing a job, all resource definitions "
"provided to assets must match by reference equality for a given key.",
):

group.build_job("some_name", selection="the_asset")


def test_repo_asset_group_diff_resource_defs():
    """A repository built from a group whose assets disagree on resource 'foo' must fail."""
    the_resource = ResourceDefinition.hardcoded_resource("blah")
    other_resource = ResourceDefinition.hardcoded_resource("baz")

    @asset(resource_defs={"foo": the_resource})
    def the_asset():
        pass

    @asset(resource_defs={"foo": other_resource})
    def other_asset():
        pass

    group = AssetGroup([the_asset, other_asset])

    expected_message = (
        "Conflicting versions of resource with key 'foo' were provided to "
        "different assets. When constructing a job, all resource definitions "
        "provided to assets must match by reference equality for a given key."
    )

    # Demonstrate that repository construction with conflicting versions of
    # same key fails
    with pytest.raises(DagsterInvalidDefinitionError, match=expected_message):

        @repository
        def use_group():
            return [group]
AssetGroup([the_asset, other_asset])


def test_graph_backed_asset_resources():
Expand Down Expand Up @@ -1394,9 +1360,8 @@ def basic():
resource_defs={"foo": other_resource},
)

asset_group = AssetGroup([the_asset, other_asset])
with pytest.raises(
DagsterInvalidDefinitionError,
match="Conflicting versions of resource with key 'foo' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.",
):
asset_group.materialize()
AssetGroup([the_asset, other_asset])
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def asset1(source1):

with pytest.raises(
DagsterInvalidDefinitionError,
match=r"SourceAsset with asset key AssetKey\(\['source1'\]\) requires IO manager with key 'special_io_manager', but none was provided.",
match=r"io manager with key 'special_io_manager' required by SourceAsset with key \[\"source1\"\] was not provided.",
):
build_assets_job(
"a",
Expand Down Expand Up @@ -1595,7 +1595,7 @@ def my_derived_asset(my_source_asset): # pylint: disable=unused-argument
pass

with pytest.raises(
DagsterInvariantViolationError,
match="Resource with key 'foo' required by resource with key 'my_source_asset__io_manager', but not provided.",
DagsterInvalidDefinitionError,
match="resource with key 'foo' required by resource with key 'my_source_asset__io_manager' was not provided.",
):
build_assets_job(name="test", assets=[my_derived_asset], source_assets=[my_source_asset])

0 comments on commit c212e85

Please sign in to comment.